From 87415c0df6bbf5170a88f48cd80721fa5aad9888 Mon Sep 17 00:00:00 2001 From: Vladyslav_Prykhodko Date: Wed, 31 Jul 2024 14:46:20 +0300 Subject: [PATCH 1/5] UI: Fixed load platform in iOs 16 or later (fixed noLeadTrailSpacesRegex) --- .../home/components/widget/lib/gateway/gateway-widget.models.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ui-ngx/src/app/modules/home/components/widget/lib/gateway/gateway-widget.models.ts b/ui-ngx/src/app/modules/home/components/widget/lib/gateway/gateway-widget.models.ts index 0e143cff36..125d14fd3a 100644 --- a/ui-ngx/src/app/modules/home/components/widget/lib/gateway/gateway-widget.models.ts +++ b/ui-ngx/src/app/modules/home/components/widget/lib/gateway/gateway-widget.models.ts @@ -19,7 +19,7 @@ import { Observable } from 'rxjs'; import { ValueTypeData } from '@shared/models/constants'; import { Validators } from '@angular/forms'; -export const noLeadTrailSpacesRegex: RegExp = /^(?! )[\S\s]*(? Date: Thu, 1 Aug 2024 11:52:43 +0300 Subject: [PATCH 2/5] removed dataformat xml exclusion --- ...appingJackson2XmlHttpMessageConverter.java | 64 +++++++++++++++++++ .../system/RestTemplateConvertersTest.java | 36 ----------- pom.xml | 4 -- 3 files changed, 64 insertions(+), 40 deletions(-) create mode 100644 application/src/main/java/org/springframework/http/converter/xml/MappingJackson2XmlHttpMessageConverter.java delete mode 100644 application/src/test/java/org/thingsboard/server/system/RestTemplateConvertersTest.java diff --git a/application/src/main/java/org/springframework/http/converter/xml/MappingJackson2XmlHttpMessageConverter.java b/application/src/main/java/org/springframework/http/converter/xml/MappingJackson2XmlHttpMessageConverter.java new file mode 100644 index 0000000000..0074526ebc --- /dev/null +++ b/application/src/main/java/org/springframework/http/converter/xml/MappingJackson2XmlHttpMessageConverter.java @@ -0,0 +1,64 @@ +/** + * Copyright © 2016-2024 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.http.converter.xml; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.dataformat.xml.XmlMapper; +import org.springframework.http.MediaType; +import org.springframework.http.converter.json.AbstractJackson2HttpMessageConverter; +import org.springframework.http.converter.json.Jackson2ObjectMapperBuilder; +import org.springframework.util.Assert; + +import java.lang.reflect.Type; +import java.nio.charset.StandardCharsets; +import java.util.Collections; +import java.util.List; + +public class MappingJackson2XmlHttpMessageConverter extends AbstractJackson2HttpMessageConverter { + private static final List problemDetailMediaTypes; + + public MappingJackson2XmlHttpMessageConverter() { + this(Jackson2ObjectMapperBuilder.xml().build()); + } + + public MappingJackson2XmlHttpMessageConverter(ObjectMapper objectMapper) { + super(objectMapper, new MediaType[]{new MediaType("application", "xml", StandardCharsets.UTF_8), new MediaType("text", "xml", StandardCharsets.UTF_8), new MediaType("application", "*+xml", StandardCharsets.UTF_8)}); + Assert.isInstanceOf(XmlMapper.class, objectMapper, "XmlMapper required"); + } + + public void setObjectMapper(ObjectMapper objectMapper) { + Assert.isInstanceOf(XmlMapper.class, objectMapper, "XmlMapper required"); + super.setObjectMapper(objectMapper); + } + + protected List getMediaTypesForProblemDetail() { + return problemDetailMediaTypes; + } + + static { + problemDetailMediaTypes = Collections.singletonList(MediaType.APPLICATION_PROBLEM_XML); + } + + @Override + public boolean canRead(Type type, Class contextClass, MediaType mediaType) { + return false; + } + + @Override + public boolean canWrite(Class clazz, MediaType mediaType) { + return false; + } +} diff --git a/application/src/test/java/org/thingsboard/server/system/RestTemplateConvertersTest.java b/application/src/test/java/org/thingsboard/server/system/RestTemplateConvertersTest.java deleted file mode 100644 index 74c2dfda37..0000000000 --- a/application/src/test/java/org/thingsboard/server/system/RestTemplateConvertersTest.java +++ /dev/null @@ -1,36 +0,0 @@ -/** - * Copyright © 2016-2024 The Thingsboard Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.thingsboard.server.system; - -import lombok.extern.slf4j.Slf4j; -import org.junit.Test; -import org.junit.jupiter.api.Assertions; -import org.springframework.util.ClassUtils; -import org.springframework.web.client.RestTemplate; - - -@Slf4j -public class RestTemplateConvertersTest { - - @Test - public void testJacksonXmlConverter() { - ClassLoader classLoader = RestTemplate.class.getClassLoader(); - boolean jackson2XmlPresent = ClassUtils.isPresent("com.fasterxml.jackson.dataformat.xml.XmlMapper", classLoader); - Assertions.assertFalse(jackson2XmlPresent, "XmlMapper must not be present in classpath, please, exclude \"jackson-dataformat-xml\" dependency!"); - //If this xml mapper will be present in classpath then we will get "Unsupported Media Type" in RestTemplate - } - -} diff --git a/pom.xml b/pom.xml index 9b8943cff3..833028b1a8 100755 --- a/pom.xml +++ b/pom.xml @@ -2092,10 +2092,6 @@ io.jsonwebtoken jjwt-impl - - com.fasterxml.jackson.dataformat - jackson-dataformat-xml - From e5484dd94177197ac13ee20088573e04c7c611d2 Mon Sep 17 00:00:00 2001 From: IrynaMatveieva Date: Thu, 1 Aug 2024 18:21:52 +0300 Subject: [PATCH 3/5] added test to verify usage of correct converter --- ...appingJackson2XmlHttpMessageConverter.java | 4 ++ .../system/RestTemplateConvertersTest.java | 61 +++++++++++++++++++ 2 files changed, 65 insertions(+) create mode 100644 application/src/test/java/org/thingsboard/server/system/RestTemplateConvertersTest.java diff --git a/application/src/main/java/org/springframework/http/converter/xml/MappingJackson2XmlHttpMessageConverter.java b/application/src/main/java/org/springframework/http/converter/xml/MappingJackson2XmlHttpMessageConverter.java index 0074526ebc..158f29a03a 100644 --- a/application/src/main/java/org/springframework/http/converter/xml/MappingJackson2XmlHttpMessageConverter.java +++ b/application/src/main/java/org/springframework/http/converter/xml/MappingJackson2XmlHttpMessageConverter.java @@ -27,6 +27,10 @@ import java.nio.charset.StandardCharsets; import java.util.Collections; import java.util.List; +/** + * RestTemplate firstly uses MappingJackson2XmlHttpMessageConverter converter instead of MappingJackson2HttpMessageConverter. + * It produces error UnsupportedMediaType, so this converter had to be shadowed for read and write operations to use the correct converter + */ public class MappingJackson2XmlHttpMessageConverter extends AbstractJackson2HttpMessageConverter { private static final List problemDetailMediaTypes; diff --git a/application/src/test/java/org/thingsboard/server/system/RestTemplateConvertersTest.java b/application/src/test/java/org/thingsboard/server/system/RestTemplateConvertersTest.java new file mode 100644 index 0000000000..5fde907bf1 --- /dev/null +++ b/application/src/test/java/org/thingsboard/server/system/RestTemplateConvertersTest.java @@ -0,0 +1,61 @@ +/** + * Copyright © 2016-2024 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.system; + +import lombok.extern.slf4j.Slf4j; +import org.junit.Test; +import org.springframework.http.MediaType; +import org.springframework.mock.http.client.MockClientHttpRequest; +import org.springframework.test.web.client.MockRestServiceServer; +import org.springframework.util.ClassUtils; +import org.springframework.web.client.RestTemplate; + +import java.nio.charset.StandardCharsets; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.springframework.test.web.client.match.MockRestRequestMatchers.requestTo; +import static org.springframework.test.web.client.response.MockRestResponseCreators.withSuccess; + + +@Slf4j +public class RestTemplateConvertersTest { + + @Test + public void testMappingJackson2HttpMessageConverterIsUsedInsteadOfMappingJackson2XmlHttpMessageConverter() { + ClassLoader classLoader = RestTemplate.class.getClassLoader(); + boolean jackson2XmlPresent = ClassUtils.isPresent("com.fasterxml.jackson.dataformat.xml.XmlMapper", classLoader); + assertThat(jackson2XmlPresent).isTrue(); + + RestTemplate restTemplate = new RestTemplate(); + MockRestServiceServer mockServer = MockRestServiceServer.createServer(restTemplate); + mockServer.expect(requestTo("/test")) + .andExpect(request -> { + MockClientHttpRequest mockRequest = (MockClientHttpRequest) request; + byte[] body = mockRequest.getBodyAsBytes(); + String requestBody = new String(body, StandardCharsets.UTF_8); + assertThat(requestBody).contains("{\"name\":\"test\",\"value\":1}"); + }) + .andRespond(withSuccess("{\"name\":\"test\",\"value\":1}", MediaType.APPLICATION_JSON)); + + TestObject requestObject = new TestObject("test", 1); + TestObject actualObject = restTemplate.postForObject("/test", requestObject, TestObject.class); + assertThat(actualObject).isEqualTo(requestObject); + mockServer.verify(); + } + + record TestObject(String name, int value) {} + +} From 5759610340a724cb09e76b03c927f58c73b3c28d Mon Sep 17 00:00:00 2001 From: ViacheslavKlimov Date: Mon, 5 Aug 2024 17:22:58 +0300 Subject: [PATCH 4/5] Rate limit for WS subscriptions --- ...efaultTbEntityDataSubscriptionService.java | 8 +++- .../DefaultTbLocalSubscriptionService.java | 41 ++++++++++++++--- .../subscription/TbAbstractDataSubCtx.java | 5 ++- .../subscription/TbAbstractSubCtx.java | 2 +- .../subscription/TbAlarmDataSubCtx.java | 5 ++- .../subscription/TbEntityDataSubCtx.java | 3 +- .../TbLocalSubscriptionService.java | 3 +- .../service/ws/DefaultWebSocketService.java | 21 ++++----- .../DefaultNotificationCommandsHandler.java | 4 +- .../src/main/resources/thingsboard.yml | 5 +++ .../server/common/data/limit/LimitedApi.java | 3 +- .../msg/tools/TbRateLimitsException.java | 6 +++ .../common/util/DeduplicationUtil.java | 44 +++++++++++++++++++ 13 files changed, 123 insertions(+), 27 deletions(-) create mode 100644 common/util/src/main/java/org/thingsboard/common/util/DeduplicationUtil.java diff --git a/application/src/main/java/org/thingsboard/server/service/subscription/DefaultTbEntityDataSubscriptionService.java b/application/src/main/java/org/thingsboard/server/service/subscription/DefaultTbEntityDataSubscriptionService.java index 984af019b4..632ea44940 100644 --- a/application/src/main/java/org/thingsboard/server/service/subscription/DefaultTbEntityDataSubscriptionService.java +++ b/application/src/main/java/org/thingsboard/server/service/subscription/DefaultTbEntityDataSubscriptionService.java @@ -19,6 +19,8 @@ import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.MoreExecutors; +import jakarta.annotation.PostConstruct; +import jakarta.annotation.PreDestroy; import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.ArrayUtils; @@ -42,6 +44,7 @@ import org.thingsboard.server.common.data.query.EntityDataQuery; import org.thingsboard.server.common.data.query.EntityKey; import org.thingsboard.server.common.data.query.EntityKeyType; import org.thingsboard.server.common.data.query.TsValue; +import org.thingsboard.server.common.msg.tools.TbRateLimitsException; import org.thingsboard.server.dao.alarm.AlarmService; import org.thingsboard.server.dao.attributes.AttributesService; import org.thingsboard.server.dao.entity.EntityService; @@ -66,8 +69,6 @@ import org.thingsboard.server.service.ws.telemetry.cmd.v2.LatestValueCmd; import org.thingsboard.server.service.ws.telemetry.cmd.v2.TimeSeriesCmd; import org.thingsboard.server.service.ws.telemetry.cmd.v2.UnsubscribeCmd; -import jakarta.annotation.PostConstruct; -import jakarta.annotation.PreDestroy; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; @@ -355,6 +356,9 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc private void handleWsCmdRuntimeException(String sessionId, RuntimeException e, EntityDataCmd cmd) { log.debug("[{}] Failed to process ws cmd: {}", sessionId, cmd, e); + if (e instanceof TbRateLimitsException) { + return; + } wsService.close(sessionId, CloseStatus.SERVICE_RESTARTED); } diff --git a/application/src/main/java/org/thingsboard/server/service/subscription/DefaultTbLocalSubscriptionService.java b/application/src/main/java/org/thingsboard/server/service/subscription/DefaultTbLocalSubscriptionService.java index 8abb78d9c1..6e99731305 100644 --- a/application/src/main/java/org/thingsboard/server/service/subscription/DefaultTbLocalSubscriptionService.java +++ b/application/src/main/java/org/thingsboard/server/service/subscription/DefaultTbLocalSubscriptionService.java @@ -15,16 +15,20 @@ */ package org.thingsboard.server.service.subscription; +import jakarta.annotation.PostConstruct; +import jakarta.annotation.PreDestroy; import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Lazy; import org.springframework.context.event.EventListener; import org.springframework.stereotype.Service; +import org.thingsboard.common.util.DeduplicationUtil; import org.thingsboard.common.util.DonAsynchron; import org.thingsboard.common.util.ThingsBoardExecutors; import org.thingsboard.common.util.ThingsBoardThreadFactory; +import org.thingsboard.server.cache.limits.RateLimitService; import org.thingsboard.server.cluster.TbClusterService; import org.thingsboard.server.common.data.AttributeScope; -import org.thingsboard.server.common.data.DataConstants; import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.alarm.AlarmInfo; import org.thingsboard.server.common.data.id.EntityId; @@ -34,10 +38,12 @@ import org.thingsboard.server.common.data.kv.BaseReadTsKvQuery; import org.thingsboard.server.common.data.kv.BasicTsKvEntry; import org.thingsboard.server.common.data.kv.ReadTsKvQuery; import org.thingsboard.server.common.data.kv.TsKvEntry; +import org.thingsboard.server.common.data.limit.LimitedApi; import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent; import org.thingsboard.server.common.msg.queue.ServiceType; import org.thingsboard.server.common.msg.queue.TbCallback; import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; +import org.thingsboard.server.common.msg.tools.TbRateLimitsException; import org.thingsboard.server.dao.attributes.AttributesService; import org.thingsboard.server.dao.timeseries.TimeseriesService; import org.thingsboard.server.gen.transport.TransportProtos; @@ -46,13 +52,12 @@ import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; import org.thingsboard.server.queue.discovery.event.ClusterTopologyChangeEvent; import org.thingsboard.server.queue.util.TbCoreComponent; import org.thingsboard.server.service.ws.WebSocketService; +import org.thingsboard.server.service.ws.WebSocketSessionRef; import org.thingsboard.server.service.ws.notification.sub.NotificationRequestUpdate; import org.thingsboard.server.service.ws.notification.sub.NotificationsSubscriptionUpdate; import org.thingsboard.server.service.ws.telemetry.sub.AlarmSubscriptionUpdate; import org.thingsboard.server.service.ws.telemetry.sub.TelemetrySubscriptionUpdate; -import jakarta.annotation.PostConstruct; -import jakarta.annotation.PreDestroy; import java.util.ArrayList; import java.util.HashSet; import java.util.List; @@ -88,13 +93,20 @@ public class DefaultTbLocalSubscriptionService implements TbLocalSubscriptionSer private final TbClusterService clusterService; private final SubscriptionManagerService subscriptionManagerService; private final WebSocketService webSocketService; + private final RateLimitService rateLimitService; private ExecutorService tsCallBackExecutor; private ScheduledExecutorService staleSessionCleanupExecutor; + @Value("${server.ws.rate_limits.subscriptions_per_tenant:2000:60}") + private String subscriptionsPerTenantRateLimit; + @Value("${server.ws.rate_limits.subscriptions_per_user:500:60}") + private String subscriptionsPerUserRateLimit; + public DefaultTbLocalSubscriptionService(AttributesService attrService, TimeseriesService tsService, TbServiceInfoProvider serviceInfoProvider, PartitionService partitionService, TbClusterService clusterService, - @Lazy SubscriptionManagerService subscriptionManagerService, @Lazy WebSocketService webSocketService) { + @Lazy SubscriptionManagerService subscriptionManagerService, @Lazy WebSocketService webSocketService, + RateLimitService rateLimitService) { this.attrService = attrService; this.tsService = tsService; this.serviceInfoProvider = serviceInfoProvider; @@ -102,6 +114,7 @@ public class DefaultTbLocalSubscriptionService implements TbLocalSubscriptionSer this.clusterService = clusterService; this.subscriptionManagerService = subscriptionManagerService; this.webSocketService = webSocketService; + this.rateLimitService = rateLimitService; } private String serviceId; @@ -164,9 +177,18 @@ public class DefaultTbLocalSubscriptionService implements TbLocalSubscriptionSer } @Override - public void addSubscription(TbSubscription subscription) { + public void addSubscription(TbSubscription subscription, WebSocketSessionRef sessionRef) { TenantId tenantId = subscription.getTenantId(); EntityId entityId = subscription.getEntityId(); + if (!rateLimitService.checkRateLimit(LimitedApi.WS_SUBSCRIPTIONS, (Object) tenantId, subscriptionsPerTenantRateLimit)) { + handleRateLimitError(subscription, sessionRef, "Exceeded rate limit for WS subscriptions per tenant"); + return; + } + if (sessionRef.getSecurityCtx() != null && !rateLimitService.checkRateLimit(LimitedApi.WS_SUBSCRIPTIONS, sessionRef.getSecurityCtx().getId(), subscriptionsPerUserRateLimit)) { + handleRateLimitError(subscription, sessionRef, "Exceeded rate limit for WS subscriptions per user"); + return; + } + log.debug("[{}][{}] Register subscription: {}", tenantId, entityId, subscription); SubscriptionModificationResult result; subsLock.lock(); @@ -563,4 +585,13 @@ public class DefaultTbLocalSubscriptionService implements TbLocalSubscriptionSer subscriptionsBySessionId.keySet().forEach(webSocketService::cleanupIfStale); } + private void handleRateLimitError(TbSubscription subscription, WebSocketSessionRef sessionRef, String message) { + String deduplicationKey = sessionRef.getSessionId() + message; + if (!DeduplicationUtil.alreadyProcessed(deduplicationKey, TimeUnit.SECONDS.toMillis(15))) { + log.info("{} {}", sessionRef, message); + webSocketService.sendError(sessionRef, subscription.getSubscriptionId(), SubscriptionErrorCode.BAD_REQUEST, message); + } + throw new TbRateLimitsException(message); + } + } diff --git a/application/src/main/java/org/thingsboard/server/service/subscription/TbAbstractDataSubCtx.java b/application/src/main/java/org/thingsboard/server/service/subscription/TbAbstractDataSubCtx.java index f65eec8453..77ef960d72 100644 --- a/application/src/main/java/org/thingsboard/server/service/subscription/TbAbstractDataSubCtx.java +++ b/application/src/main/java/org/thingsboard/server/service/subscription/TbAbstractDataSubCtx.java @@ -132,7 +132,7 @@ public abstract class TbAbstractDataSubCtx> keysByType = getEntityKeyByTypeMap(keys); for (EntityData entityData : data.getData()) { List entitySubscriptions = addSubscriptions(entityData, keysByType, latestValues, startTs, endTs); - entitySubscriptions.forEach(localSubscriptionService::addSubscription); + entitySubscriptions.forEach(subscription -> localSubscriptionService.addSubscription(subscription, sessionRef)); } } @@ -254,4 +254,5 @@ public abstract class TbAbstractDataSubCtx { .scope(TbAttributeSubscriptionScope.SERVER_SCOPE) .build(); subToDynamicValueKeySet.add(subIdx); - localSubscriptionService.addSubscription(sub); + localSubscriptionService.addSubscription(sub, sessionRef); } } catch (InterruptedException | ExecutionException e) { log.info("[{}][{}][{}] Failed to resolve dynamic values: {}", tenantId, customerId, userId, dynamicValues.keySet()); diff --git a/application/src/main/java/org/thingsboard/server/service/subscription/TbAlarmDataSubCtx.java b/application/src/main/java/org/thingsboard/server/service/subscription/TbAlarmDataSubCtx.java index 0d1f2f94b2..828da388fb 100644 --- a/application/src/main/java/org/thingsboard/server/service/subscription/TbAlarmDataSubCtx.java +++ b/application/src/main/java/org/thingsboard/server/service/subscription/TbAlarmDataSubCtx.java @@ -177,7 +177,7 @@ public class TbAlarmDataSubCtx extends TbAbstractDataSubCtx { .updateProcessor((sub, update) -> sendWsMsg(sub.getSessionId(), update)) .ts(startTs) .build(); - localSubscriptionService.addSubscription(subscription); + localSubscriptionService.addSubscription(subscription, sessionRef); } @Override @@ -342,7 +342,7 @@ public class TbAlarmDataSubCtx extends TbAbstractDataSubCtx { newSubsList.forEach(entity -> createAlarmSubscriptionForEntity(query.getPageLink(), startTs, entity)); } subIdsToCancel.forEach(subId -> localSubscriptionService.cancelSubscription(getSessionId(), subId)); - subsToAdd.forEach(localSubscriptionService::addSubscription); + subsToAdd.forEach(subscription -> localSubscriptionService.addSubscription(subscription, sessionRef)); } private void resetInvocationCounter() { @@ -361,4 +361,5 @@ public class TbAlarmDataSubCtx extends TbAbstractDataSubCtx { EntityDataPageLink edpl = new EntityDataPageLink(maxEntitiesPerAlarmSubscription, 0, null, entitiesSortOrder); return new EntityDataQuery(query.getEntityFilter(), edpl, query.getEntityFields(), query.getLatestValues(), query.getKeyFilters()); } + } diff --git a/application/src/main/java/org/thingsboard/server/service/subscription/TbEntityDataSubCtx.java b/application/src/main/java/org/thingsboard/server/service/subscription/TbEntityDataSubCtx.java index 98ec81b798..48458ca6fd 100644 --- a/application/src/main/java/org/thingsboard/server/service/subscription/TbEntityDataSubCtx.java +++ b/application/src/main/java/org/thingsboard/server/service/subscription/TbEntityDataSubCtx.java @@ -226,7 +226,7 @@ public class TbEntityDataSubCtx extends TbAbstractDataSubCtx { } } subIdsToCancel.forEach(subId -> localSubscriptionService.cancelSubscription(getSessionId(), subId)); - subsToAdd.forEach(localSubscriptionService::addSubscription); + subsToAdd.forEach(subscription -> localSubscriptionService.addSubscription(subscription, sessionRef)); sendWsMsg(new EntityDataUpdate(cmdId, data, null, maxEntitiesPerDataSubscription)); } @@ -239,4 +239,5 @@ public class TbEntityDataSubCtx extends TbAbstractDataSubCtx { protected EntityDataQuery buildEntityDataQuery() { return query; } + } diff --git a/application/src/main/java/org/thingsboard/server/service/subscription/TbLocalSubscriptionService.java b/application/src/main/java/org/thingsboard/server/service/subscription/TbLocalSubscriptionService.java index ddb2a1b590..59e7ad532d 100644 --- a/application/src/main/java/org/thingsboard/server/service/subscription/TbLocalSubscriptionService.java +++ b/application/src/main/java/org/thingsboard/server/service/subscription/TbLocalSubscriptionService.java @@ -22,6 +22,7 @@ import org.thingsboard.server.common.data.kv.TsKvEntry; import org.thingsboard.server.common.msg.queue.TbCallback; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.queue.discovery.event.ClusterTopologyChangeEvent; +import org.thingsboard.server.service.ws.WebSocketSessionRef; import org.thingsboard.server.service.ws.notification.sub.NotificationRequestUpdate; import org.thingsboard.server.service.ws.notification.sub.NotificationsSubscriptionUpdate; @@ -29,7 +30,7 @@ import java.util.List; public interface TbLocalSubscriptionService { - void addSubscription(TbSubscription subscription); + void addSubscription(TbSubscription subscription, WebSocketSessionRef sessionRef); void onSubEventCallback(TransportProtos.TbEntitySubEventCallbackProto subEventCallback, TbCallback callback); diff --git a/application/src/main/java/org/thingsboard/server/service/ws/DefaultWebSocketService.java b/application/src/main/java/org/thingsboard/server/service/ws/DefaultWebSocketService.java index d7091bbad2..33b011cc0a 100644 --- a/application/src/main/java/org/thingsboard/server/service/ws/DefaultWebSocketService.java +++ b/application/src/main/java/org/thingsboard/server/service/ws/DefaultWebSocketService.java @@ -21,6 +21,9 @@ import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.MoreExecutors; +import jakarta.annotation.Nullable; +import jakarta.annotation.PostConstruct; +import jakarta.annotation.PreDestroy; import lombok.Getter; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -46,6 +49,7 @@ import org.thingsboard.server.common.data.kv.BasicTsKvEntry; import org.thingsboard.server.common.data.kv.ReadTsKvQuery; import org.thingsboard.server.common.data.kv.TsKvEntry; import org.thingsboard.server.common.data.tenant.profile.DefaultTenantProfileConfiguration; +import org.thingsboard.server.common.msg.tools.TbRateLimitsException; import org.thingsboard.server.dao.attributes.AttributesService; import org.thingsboard.server.dao.tenant.TbTenantProfileCache; import org.thingsboard.server.dao.timeseries.TimeseriesService; @@ -80,10 +84,6 @@ import org.thingsboard.server.service.ws.telemetry.cmd.v2.EntityDataUpdate; import org.thingsboard.server.service.ws.telemetry.cmd.v2.UnsubscribeCmd; import org.thingsboard.server.service.ws.telemetry.sub.TelemetrySubscriptionUpdate; -import jakarta.annotation.Nullable; -import jakarta.annotation.PostConstruct; -import jakarta.annotation.PreDestroy; - import java.io.IOException; import java.util.ArrayList; import java.util.Collections; @@ -224,9 +224,10 @@ public class DefaultWebSocketService implements WebSocketService { try { Optional.ofNullable(cmdsHandlers.get(cmd.getType())) .ifPresent(cmdHandler -> cmdHandler.handle(sessionRef, cmd)); + } catch (TbRateLimitsException e) { + log.debug("{} Failed to handle WS cmd: {}", sessionRef, cmd, e); } catch (Exception e) { - log.error("[sessionId: {}, tenantId: {}, userId: {}] Failed to handle WS cmd: {}", sessionId, - sessionRef.getSecurityCtx().getTenantId(), sessionRef.getSecurityCtx().getId(), cmd, e); + log.error("{} Failed to handle WS cmd: {}", sessionRef, cmd, e); } } } @@ -468,7 +469,7 @@ public class DefaultWebSocketService implements WebSocketService { subLock.lock(); try { - oldSubService.addSubscription(sub); + oldSubService.addSubscription(sub, sessionRef); sendUpdate(sessionRef, new TelemetrySubscriptionUpdate(cmd.getCmdId(), attributesData)); } finally { subLock.unlock(); @@ -581,7 +582,7 @@ public class DefaultWebSocketService implements WebSocketService { subLock.lock(); try { - oldSubService.addSubscription(sub); + oldSubService.addSubscription(sub, sessionRef); sendUpdate(sessionRef, new TelemetrySubscriptionUpdate(cmd.getCmdId(), attributesData)); } finally { subLock.unlock(); @@ -678,7 +679,7 @@ public class DefaultWebSocketService implements WebSocketService { subLock.lock(); try { - oldSubService.addSubscription(sub); + oldSubService.addSubscription(sub, sessionRef); sendUpdate(sessionRef, new TelemetrySubscriptionUpdate(cmd.getCmdId(), data)); } finally { subLock.unlock(); @@ -733,7 +734,7 @@ public class DefaultWebSocketService implements WebSocketService { subLock.lock(); try { - oldSubService.addSubscription(sub); + oldSubService.addSubscription(sub, sessionRef); sendUpdate(sessionRef, new TelemetrySubscriptionUpdate(cmd.getCmdId(), data)); } finally { subLock.unlock(); diff --git a/application/src/main/java/org/thingsboard/server/service/ws/notification/DefaultNotificationCommandsHandler.java b/application/src/main/java/org/thingsboard/server/service/ws/notification/DefaultNotificationCommandsHandler.java index 6fa9111c3d..5bfe976194 100644 --- a/application/src/main/java/org/thingsboard/server/service/ws/notification/DefaultNotificationCommandsHandler.java +++ b/application/src/main/java/org/thingsboard/server/service/ws/notification/DefaultNotificationCommandsHandler.java @@ -79,7 +79,7 @@ public class DefaultNotificationCommandsHandler implements NotificationCommandsH .updateProcessor(this::handleNotificationsSubscriptionUpdate) .limit(cmd.getLimit()) .build(); - localSubscriptionService.addSubscription(subscription); + localSubscriptionService.addSubscription(subscription, sessionRef); fetchUnreadNotifications(subscription); sendUpdate(sessionRef.getSessionId(), subscription.createFullUpdate()); @@ -97,7 +97,7 @@ public class DefaultNotificationCommandsHandler implements NotificationCommandsH .entityId(securityCtx.getId()) .updateProcessor(this::handleNotificationsCountSubscriptionUpdate) .build(); - localSubscriptionService.addSubscription(subscription); + localSubscriptionService.addSubscription(subscription, sessionRef); fetchUnreadNotificationsCount(subscription); sendUpdate(sessionRef.getSessionId(), subscription.createUpdate()); diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index 3db77d50c5..a9dc32963d 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -78,6 +78,11 @@ server: max_queue_messages_per_session: "${TB_SERVER_WS_DEFAULT_QUEUE_MESSAGES_PER_SESSION:1000}" # Maximum time between WS session opening and sending auth command auth_timeout_ms: "${TB_SERVER_WS_AUTH_TIMEOUT_MS:10000}" + rate_limits: + # Per-tenant rate limit for WS subscriptions + subscriptions_per_tenant: "${TB_SERVER_WS_SUBSCRIPTIONS_PER_TENANT_RATE_LIMIT:2000:60}" + # Per-user rate limit for WS subscriptions + subscriptions_per_user: "${TB_SERVER_WS_SUBSCRIPTIONS_PER_USER_RATE_LIMIT:500:60}" rest: server_side_rpc: # Minimum value of the server-side RPC timeout. May override value provided in the REST API call. diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/limit/LimitedApi.java b/common/data/src/main/java/org/thingsboard/server/common/data/limit/LimitedApi.java index 7aa472bea1..6532a4fe0d 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/limit/LimitedApi.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/limit/LimitedApi.java @@ -42,7 +42,8 @@ public enum LimitedApi { TRANSPORT_MESSAGES_PER_DEVICE("transport messages per device", false), TRANSPORT_MESSAGES_PER_GATEWAY("transport messages per gateway", false), TRANSPORT_MESSAGES_PER_GATEWAY_DEVICE("transport messages per gateway device", false), - EMAILS("emails sending", true); + EMAILS("emails sending", true), + WS_SUBSCRIPTIONS("WS subscriptions", false); private Function configExtractor; @Getter diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/tools/TbRateLimitsException.java b/common/message/src/main/java/org/thingsboard/server/common/msg/tools/TbRateLimitsException.java index b39477fa38..32d55048ae 100644 --- a/common/message/src/main/java/org/thingsboard/server/common/msg/tools/TbRateLimitsException.java +++ b/common/message/src/main/java/org/thingsboard/server/common/msg/tools/TbRateLimitsException.java @@ -30,4 +30,10 @@ public class TbRateLimitsException extends AbstractRateLimitException { super(entityType.name() + " rate limits reached!"); this.entityType = entityType; } + + public TbRateLimitsException(String message) { + super(message); + this.entityType = null; + } + } diff --git a/common/util/src/main/java/org/thingsboard/common/util/DeduplicationUtil.java b/common/util/src/main/java/org/thingsboard/common/util/DeduplicationUtil.java new file mode 100644 index 0000000000..fd25e8b924 --- /dev/null +++ b/common/util/src/main/java/org/thingsboard/common/util/DeduplicationUtil.java @@ -0,0 +1,44 @@ +/** + * Copyright © 2016-2024 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.common.util; + +import org.springframework.util.ConcurrentReferenceHashMap; + +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.springframework.util.ConcurrentReferenceHashMap.ReferenceType.SOFT; + +public class DeduplicationUtil { + + private static final ConcurrentMap cache = new ConcurrentReferenceHashMap<>(16, SOFT); + + public static boolean alreadyProcessed(Object deduplicationKey, long deduplicationDuration) { + AtomicBoolean alreadyProcessed = new AtomicBoolean(false); + cache.compute(deduplicationKey, (key, lastProcessedTs) -> { + if (lastProcessedTs != null) { + long passed = System.currentTimeMillis() - lastProcessedTs; + if (passed <= deduplicationDuration) { + alreadyProcessed.set(true); + return lastProcessedTs; + } + } + return System.currentTimeMillis(); + }); + return alreadyProcessed.get(); + } + +} From 73214e9afe2269ca3d02f0d03c84a69f5027b478 Mon Sep 17 00:00:00 2001 From: ViacheslavKlimov Date: Tue, 6 Aug 2024 19:24:34 +0300 Subject: [PATCH 5/5] Fix KeyDictionaryDao.getOrSaveKeyId returning zero --- .../dao/sqlts/dictionary/JpaKeyDictionaryDao.java | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/dao/src/main/java/org/thingsboard/server/dao/sqlts/dictionary/JpaKeyDictionaryDao.java b/dao/src/main/java/org/thingsboard/server/dao/sqlts/dictionary/JpaKeyDictionaryDao.java index 870c0bc505..b01d1c4ea0 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sqlts/dictionary/JpaKeyDictionaryDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sqlts/dictionary/JpaKeyDictionaryDao.java @@ -15,11 +15,13 @@ */ package org.thingsboard.server.dao.sqlts.dictionary; +import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.hibernate.exception.ConstraintViolationException; -import org.springframework.beans.factory.annotation.Autowired; import org.springframework.dao.DataIntegrityViolationException; import org.springframework.stereotype.Component; +import org.springframework.transaction.annotation.Propagation; +import org.springframework.transaction.annotation.Transactional; import org.thingsboard.server.dao.dictionary.KeyDictionaryDao; import org.thingsboard.server.dao.model.sqlts.dictionary.KeyDictionaryCompositeKey; import org.thingsboard.server.dao.model.sqlts.dictionary.KeyDictionaryEntry; @@ -34,14 +36,15 @@ import java.util.concurrent.locks.ReentrantLock; @Component @Slf4j @SqlDao +@RequiredArgsConstructor public class JpaKeyDictionaryDao extends JpaAbstractDaoListeningExecutorService implements KeyDictionaryDao { + private final KeyDictionaryRepository keyDictionaryRepository; + private final ConcurrentMap keyDictionaryMap = new ConcurrentHashMap<>(); - protected static final ReentrantLock creationLock = new ReentrantLock(); - - @Autowired - private KeyDictionaryRepository keyDictionaryRepository; + private static final ReentrantLock creationLock = new ReentrantLock(); + @Transactional(propagation = Propagation.NOT_SUPPORTED) @Override public Integer getOrSaveKeyId(String strKey) { Integer keyId = keyDictionaryMap.get(strKey);