Rate limit for WS subscriptions

This commit is contained in:
ViacheslavKlimov 2024-08-05 17:22:58 +03:00
parent fc36fcaacf
commit 5759610340
13 changed files with 123 additions and 27 deletions

View File

@ -19,6 +19,8 @@ import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.MoreExecutors;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import lombok.Getter; import lombok.Getter;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.ArrayUtils; import org.apache.commons.lang3.ArrayUtils;
@ -42,6 +44,7 @@ import org.thingsboard.server.common.data.query.EntityDataQuery;
import org.thingsboard.server.common.data.query.EntityKey; import org.thingsboard.server.common.data.query.EntityKey;
import org.thingsboard.server.common.data.query.EntityKeyType; import org.thingsboard.server.common.data.query.EntityKeyType;
import org.thingsboard.server.common.data.query.TsValue; import org.thingsboard.server.common.data.query.TsValue;
import org.thingsboard.server.common.msg.tools.TbRateLimitsException;
import org.thingsboard.server.dao.alarm.AlarmService; import org.thingsboard.server.dao.alarm.AlarmService;
import org.thingsboard.server.dao.attributes.AttributesService; import org.thingsboard.server.dao.attributes.AttributesService;
import org.thingsboard.server.dao.entity.EntityService; import org.thingsboard.server.dao.entity.EntityService;
@ -66,8 +69,6 @@ import org.thingsboard.server.service.ws.telemetry.cmd.v2.LatestValueCmd;
import org.thingsboard.server.service.ws.telemetry.cmd.v2.TimeSeriesCmd; import org.thingsboard.server.service.ws.telemetry.cmd.v2.TimeSeriesCmd;
import org.thingsboard.server.service.ws.telemetry.cmd.v2.UnsubscribeCmd; import org.thingsboard.server.service.ws.telemetry.cmd.v2.UnsubscribeCmd;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.HashMap; import java.util.HashMap;
@ -355,6 +356,9 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc
private void handleWsCmdRuntimeException(String sessionId, RuntimeException e, EntityDataCmd cmd) { private void handleWsCmdRuntimeException(String sessionId, RuntimeException e, EntityDataCmd cmd) {
log.debug("[{}] Failed to process ws cmd: {}", sessionId, cmd, e); log.debug("[{}] Failed to process ws cmd: {}", sessionId, cmd, e);
if (e instanceof TbRateLimitsException) {
return;
}
wsService.close(sessionId, CloseStatus.SERVICE_RESTARTED); wsService.close(sessionId, CloseStatus.SERVICE_RESTARTED);
} }

View File

@ -15,16 +15,20 @@
*/ */
package org.thingsboard.server.service.subscription; package org.thingsboard.server.service.subscription;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Lazy; import org.springframework.context.annotation.Lazy;
import org.springframework.context.event.EventListener; import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.thingsboard.common.util.DeduplicationUtil;
import org.thingsboard.common.util.DonAsynchron; import org.thingsboard.common.util.DonAsynchron;
import org.thingsboard.common.util.ThingsBoardExecutors; import org.thingsboard.common.util.ThingsBoardExecutors;
import org.thingsboard.common.util.ThingsBoardThreadFactory; import org.thingsboard.common.util.ThingsBoardThreadFactory;
import org.thingsboard.server.cache.limits.RateLimitService;
import org.thingsboard.server.cluster.TbClusterService; import org.thingsboard.server.cluster.TbClusterService;
import org.thingsboard.server.common.data.AttributeScope; import org.thingsboard.server.common.data.AttributeScope;
import org.thingsboard.server.common.data.DataConstants;
import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.alarm.AlarmInfo; import org.thingsboard.server.common.data.alarm.AlarmInfo;
import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.EntityId;
@ -34,10 +38,12 @@ import org.thingsboard.server.common.data.kv.BaseReadTsKvQuery;
import org.thingsboard.server.common.data.kv.BasicTsKvEntry; import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
import org.thingsboard.server.common.data.kv.ReadTsKvQuery; import org.thingsboard.server.common.data.kv.ReadTsKvQuery;
import org.thingsboard.server.common.data.kv.TsKvEntry; import org.thingsboard.server.common.data.kv.TsKvEntry;
import org.thingsboard.server.common.data.limit.LimitedApi;
import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent; import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent;
import org.thingsboard.server.common.msg.queue.ServiceType; import org.thingsboard.server.common.msg.queue.ServiceType;
import org.thingsboard.server.common.msg.queue.TbCallback; import org.thingsboard.server.common.msg.queue.TbCallback;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
import org.thingsboard.server.common.msg.tools.TbRateLimitsException;
import org.thingsboard.server.dao.attributes.AttributesService; import org.thingsboard.server.dao.attributes.AttributesService;
import org.thingsboard.server.dao.timeseries.TimeseriesService; import org.thingsboard.server.dao.timeseries.TimeseriesService;
import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.gen.transport.TransportProtos;
@ -46,13 +52,12 @@ import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
import org.thingsboard.server.queue.discovery.event.ClusterTopologyChangeEvent; import org.thingsboard.server.queue.discovery.event.ClusterTopologyChangeEvent;
import org.thingsboard.server.queue.util.TbCoreComponent; import org.thingsboard.server.queue.util.TbCoreComponent;
import org.thingsboard.server.service.ws.WebSocketService; import org.thingsboard.server.service.ws.WebSocketService;
import org.thingsboard.server.service.ws.WebSocketSessionRef;
import org.thingsboard.server.service.ws.notification.sub.NotificationRequestUpdate; import org.thingsboard.server.service.ws.notification.sub.NotificationRequestUpdate;
import org.thingsboard.server.service.ws.notification.sub.NotificationsSubscriptionUpdate; import org.thingsboard.server.service.ws.notification.sub.NotificationsSubscriptionUpdate;
import org.thingsboard.server.service.ws.telemetry.sub.AlarmSubscriptionUpdate; import org.thingsboard.server.service.ws.telemetry.sub.AlarmSubscriptionUpdate;
import org.thingsboard.server.service.ws.telemetry.sub.TelemetrySubscriptionUpdate; import org.thingsboard.server.service.ws.telemetry.sub.TelemetrySubscriptionUpdate;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
@ -88,13 +93,20 @@ public class DefaultTbLocalSubscriptionService implements TbLocalSubscriptionSer
private final TbClusterService clusterService; private final TbClusterService clusterService;
private final SubscriptionManagerService subscriptionManagerService; private final SubscriptionManagerService subscriptionManagerService;
private final WebSocketService webSocketService; private final WebSocketService webSocketService;
private final RateLimitService rateLimitService;
private ExecutorService tsCallBackExecutor; private ExecutorService tsCallBackExecutor;
private ScheduledExecutorService staleSessionCleanupExecutor; private ScheduledExecutorService staleSessionCleanupExecutor;
@Value("${server.ws.rate_limits.subscriptions_per_tenant:2000:60}")
private String subscriptionsPerTenantRateLimit;
@Value("${server.ws.rate_limits.subscriptions_per_user:500:60}")
private String subscriptionsPerUserRateLimit;
public DefaultTbLocalSubscriptionService(AttributesService attrService, TimeseriesService tsService, TbServiceInfoProvider serviceInfoProvider, public DefaultTbLocalSubscriptionService(AttributesService attrService, TimeseriesService tsService, TbServiceInfoProvider serviceInfoProvider,
PartitionService partitionService, TbClusterService clusterService, PartitionService partitionService, TbClusterService clusterService,
@Lazy SubscriptionManagerService subscriptionManagerService, @Lazy WebSocketService webSocketService) { @Lazy SubscriptionManagerService subscriptionManagerService, @Lazy WebSocketService webSocketService,
RateLimitService rateLimitService) {
this.attrService = attrService; this.attrService = attrService;
this.tsService = tsService; this.tsService = tsService;
this.serviceInfoProvider = serviceInfoProvider; this.serviceInfoProvider = serviceInfoProvider;
@ -102,6 +114,7 @@ public class DefaultTbLocalSubscriptionService implements TbLocalSubscriptionSer
this.clusterService = clusterService; this.clusterService = clusterService;
this.subscriptionManagerService = subscriptionManagerService; this.subscriptionManagerService = subscriptionManagerService;
this.webSocketService = webSocketService; this.webSocketService = webSocketService;
this.rateLimitService = rateLimitService;
} }
private String serviceId; private String serviceId;
@ -164,9 +177,18 @@ public class DefaultTbLocalSubscriptionService implements TbLocalSubscriptionSer
} }
@Override @Override
public void addSubscription(TbSubscription<?> subscription) { public void addSubscription(TbSubscription<?> subscription, WebSocketSessionRef sessionRef) {
TenantId tenantId = subscription.getTenantId(); TenantId tenantId = subscription.getTenantId();
EntityId entityId = subscription.getEntityId(); EntityId entityId = subscription.getEntityId();
if (!rateLimitService.checkRateLimit(LimitedApi.WS_SUBSCRIPTIONS, (Object) tenantId, subscriptionsPerTenantRateLimit)) {
handleRateLimitError(subscription, sessionRef, "Exceeded rate limit for WS subscriptions per tenant");
return;
}
if (sessionRef.getSecurityCtx() != null && !rateLimitService.checkRateLimit(LimitedApi.WS_SUBSCRIPTIONS, sessionRef.getSecurityCtx().getId(), subscriptionsPerUserRateLimit)) {
handleRateLimitError(subscription, sessionRef, "Exceeded rate limit for WS subscriptions per user");
return;
}
log.debug("[{}][{}] Register subscription: {}", tenantId, entityId, subscription); log.debug("[{}][{}] Register subscription: {}", tenantId, entityId, subscription);
SubscriptionModificationResult result; SubscriptionModificationResult result;
subsLock.lock(); subsLock.lock();
@ -563,4 +585,13 @@ public class DefaultTbLocalSubscriptionService implements TbLocalSubscriptionSer
subscriptionsBySessionId.keySet().forEach(webSocketService::cleanupIfStale); subscriptionsBySessionId.keySet().forEach(webSocketService::cleanupIfStale);
} }
private void handleRateLimitError(TbSubscription<?> subscription, WebSocketSessionRef sessionRef, String message) {
String deduplicationKey = sessionRef.getSessionId() + message;
if (!DeduplicationUtil.alreadyProcessed(deduplicationKey, TimeUnit.SECONDS.toMillis(15))) {
log.info("{} {}", sessionRef, message);
webSocketService.sendError(sessionRef, subscription.getSubscriptionId(), SubscriptionErrorCode.BAD_REQUEST, message);
}
throw new TbRateLimitsException(message);
}
} }

View File

@ -132,7 +132,7 @@ public abstract class TbAbstractDataSubCtx<T extends AbstractDataQuery<? extends
int subIdx = sessionRef.getSessionSubIdSeq().incrementAndGet(); int subIdx = sessionRef.getSessionSubIdSeq().incrementAndGet();
subToEntityIdMap.put(subIdx, entityData.getEntityId()); subToEntityIdMap.put(subIdx, entityData.getEntityId());
localSubscriptionService.addSubscription( localSubscriptionService.addSubscription(
createTsSub(entityData, subIdx, false, startTs, endTs, keyStates, resultToLatestValues)); createTsSub(entityData, subIdx, false, startTs, endTs, keyStates, resultToLatestValues), sessionRef);
}); });
} }
@ -140,7 +140,7 @@ public abstract class TbAbstractDataSubCtx<T extends AbstractDataQuery<? extends
Map<EntityKeyType, List<EntityKey>> keysByType = getEntityKeyByTypeMap(keys); Map<EntityKeyType, List<EntityKey>> keysByType = getEntityKeyByTypeMap(keys);
for (EntityData entityData : data.getData()) { for (EntityData entityData : data.getData()) {
List<TbSubscription> entitySubscriptions = addSubscriptions(entityData, keysByType, latestValues, startTs, endTs); List<TbSubscription> entitySubscriptions = addSubscriptions(entityData, keysByType, latestValues, startTs, endTs);
entitySubscriptions.forEach(localSubscriptionService::addSubscription); entitySubscriptions.forEach(subscription -> localSubscriptionService.addSubscription(subscription, sessionRef));
} }
} }
@ -254,4 +254,5 @@ public abstract class TbAbstractDataSubCtx<T extends AbstractDataQuery<? extends
abstract void sendWsMsg(String sessionId, TelemetrySubscriptionUpdate subscriptionUpdate, EntityKeyType keyType, boolean resultToLatestValues); abstract void sendWsMsg(String sessionId, TelemetrySubscriptionUpdate subscriptionUpdate, EntityKeyType keyType, boolean resultToLatestValues);
protected abstract Aggregation getCurrentAggregation(); protected abstract Aggregation getCurrentAggregation();
} }

View File

@ -152,7 +152,7 @@ public abstract class TbAbstractSubCtx<T extends EntityCountQuery> {
.scope(TbAttributeSubscriptionScope.SERVER_SCOPE) .scope(TbAttributeSubscriptionScope.SERVER_SCOPE)
.build(); .build();
subToDynamicValueKeySet.add(subIdx); subToDynamicValueKeySet.add(subIdx);
localSubscriptionService.addSubscription(sub); localSubscriptionService.addSubscription(sub, sessionRef);
} }
} catch (InterruptedException | ExecutionException e) { } catch (InterruptedException | ExecutionException e) {
log.info("[{}][{}][{}] Failed to resolve dynamic values: {}", tenantId, customerId, userId, dynamicValues.keySet()); log.info("[{}][{}][{}] Failed to resolve dynamic values: {}", tenantId, customerId, userId, dynamicValues.keySet());

View File

@ -177,7 +177,7 @@ public class TbAlarmDataSubCtx extends TbAbstractDataSubCtx<AlarmDataQuery> {
.updateProcessor((sub, update) -> sendWsMsg(sub.getSessionId(), update)) .updateProcessor((sub, update) -> sendWsMsg(sub.getSessionId(), update))
.ts(startTs) .ts(startTs)
.build(); .build();
localSubscriptionService.addSubscription(subscription); localSubscriptionService.addSubscription(subscription, sessionRef);
} }
@Override @Override
@ -342,7 +342,7 @@ public class TbAlarmDataSubCtx extends TbAbstractDataSubCtx<AlarmDataQuery> {
newSubsList.forEach(entity -> createAlarmSubscriptionForEntity(query.getPageLink(), startTs, entity)); newSubsList.forEach(entity -> createAlarmSubscriptionForEntity(query.getPageLink(), startTs, entity));
} }
subIdsToCancel.forEach(subId -> localSubscriptionService.cancelSubscription(getSessionId(), subId)); subIdsToCancel.forEach(subId -> localSubscriptionService.cancelSubscription(getSessionId(), subId));
subsToAdd.forEach(localSubscriptionService::addSubscription); subsToAdd.forEach(subscription -> localSubscriptionService.addSubscription(subscription, sessionRef));
} }
private void resetInvocationCounter() { private void resetInvocationCounter() {
@ -361,4 +361,5 @@ public class TbAlarmDataSubCtx extends TbAbstractDataSubCtx<AlarmDataQuery> {
EntityDataPageLink edpl = new EntityDataPageLink(maxEntitiesPerAlarmSubscription, 0, null, entitiesSortOrder); EntityDataPageLink edpl = new EntityDataPageLink(maxEntitiesPerAlarmSubscription, 0, null, entitiesSortOrder);
return new EntityDataQuery(query.getEntityFilter(), edpl, query.getEntityFields(), query.getLatestValues(), query.getKeyFilters()); return new EntityDataQuery(query.getEntityFilter(), edpl, query.getEntityFields(), query.getLatestValues(), query.getKeyFilters());
} }
} }

View File

@ -226,7 +226,7 @@ public class TbEntityDataSubCtx extends TbAbstractDataSubCtx<EntityDataQuery> {
} }
} }
subIdsToCancel.forEach(subId -> localSubscriptionService.cancelSubscription(getSessionId(), subId)); subIdsToCancel.forEach(subId -> localSubscriptionService.cancelSubscription(getSessionId(), subId));
subsToAdd.forEach(localSubscriptionService::addSubscription); subsToAdd.forEach(subscription -> localSubscriptionService.addSubscription(subscription, sessionRef));
sendWsMsg(new EntityDataUpdate(cmdId, data, null, maxEntitiesPerDataSubscription)); sendWsMsg(new EntityDataUpdate(cmdId, data, null, maxEntitiesPerDataSubscription));
} }
@ -239,4 +239,5 @@ public class TbEntityDataSubCtx extends TbAbstractDataSubCtx<EntityDataQuery> {
protected EntityDataQuery buildEntityDataQuery() { protected EntityDataQuery buildEntityDataQuery() {
return query; return query;
} }
} }

View File

@ -22,6 +22,7 @@ import org.thingsboard.server.common.data.kv.TsKvEntry;
import org.thingsboard.server.common.msg.queue.TbCallback; import org.thingsboard.server.common.msg.queue.TbCallback;
import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.queue.discovery.event.ClusterTopologyChangeEvent; import org.thingsboard.server.queue.discovery.event.ClusterTopologyChangeEvent;
import org.thingsboard.server.service.ws.WebSocketSessionRef;
import org.thingsboard.server.service.ws.notification.sub.NotificationRequestUpdate; import org.thingsboard.server.service.ws.notification.sub.NotificationRequestUpdate;
import org.thingsboard.server.service.ws.notification.sub.NotificationsSubscriptionUpdate; import org.thingsboard.server.service.ws.notification.sub.NotificationsSubscriptionUpdate;
@ -29,7 +30,7 @@ import java.util.List;
public interface TbLocalSubscriptionService { public interface TbLocalSubscriptionService {
void addSubscription(TbSubscription<?> subscription); void addSubscription(TbSubscription<?> subscription, WebSocketSessionRef sessionRef);
void onSubEventCallback(TransportProtos.TbEntitySubEventCallbackProto subEventCallback, TbCallback callback); void onSubEventCallback(TransportProtos.TbEntitySubEventCallbackProto subEventCallback, TbCallback callback);

View File

@ -21,6 +21,9 @@ import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.MoreExecutors;
import jakarta.annotation.Nullable;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import lombok.Getter; import lombok.Getter;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
@ -46,6 +49,7 @@ import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
import org.thingsboard.server.common.data.kv.ReadTsKvQuery; import org.thingsboard.server.common.data.kv.ReadTsKvQuery;
import org.thingsboard.server.common.data.kv.TsKvEntry; import org.thingsboard.server.common.data.kv.TsKvEntry;
import org.thingsboard.server.common.data.tenant.profile.DefaultTenantProfileConfiguration; import org.thingsboard.server.common.data.tenant.profile.DefaultTenantProfileConfiguration;
import org.thingsboard.server.common.msg.tools.TbRateLimitsException;
import org.thingsboard.server.dao.attributes.AttributesService; import org.thingsboard.server.dao.attributes.AttributesService;
import org.thingsboard.server.dao.tenant.TbTenantProfileCache; import org.thingsboard.server.dao.tenant.TbTenantProfileCache;
import org.thingsboard.server.dao.timeseries.TimeseriesService; import org.thingsboard.server.dao.timeseries.TimeseriesService;
@ -80,10 +84,6 @@ import org.thingsboard.server.service.ws.telemetry.cmd.v2.EntityDataUpdate;
import org.thingsboard.server.service.ws.telemetry.cmd.v2.UnsubscribeCmd; import org.thingsboard.server.service.ws.telemetry.cmd.v2.UnsubscribeCmd;
import org.thingsboard.server.service.ws.telemetry.sub.TelemetrySubscriptionUpdate; import org.thingsboard.server.service.ws.telemetry.sub.TelemetrySubscriptionUpdate;
import jakarta.annotation.Nullable;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
@ -224,9 +224,10 @@ public class DefaultWebSocketService implements WebSocketService {
try { try {
Optional.ofNullable(cmdsHandlers.get(cmd.getType())) Optional.ofNullable(cmdsHandlers.get(cmd.getType()))
.ifPresent(cmdHandler -> cmdHandler.handle(sessionRef, cmd)); .ifPresent(cmdHandler -> cmdHandler.handle(sessionRef, cmd));
} catch (TbRateLimitsException e) {
log.debug("{} Failed to handle WS cmd: {}", sessionRef, cmd, e);
} catch (Exception e) { } catch (Exception e) {
log.error("[sessionId: {}, tenantId: {}, userId: {}] Failed to handle WS cmd: {}", sessionId, log.error("{} Failed to handle WS cmd: {}", sessionRef, cmd, e);
sessionRef.getSecurityCtx().getTenantId(), sessionRef.getSecurityCtx().getId(), cmd, e);
} }
} }
} }
@ -468,7 +469,7 @@ public class DefaultWebSocketService implements WebSocketService {
subLock.lock(); subLock.lock();
try { try {
oldSubService.addSubscription(sub); oldSubService.addSubscription(sub, sessionRef);
sendUpdate(sessionRef, new TelemetrySubscriptionUpdate(cmd.getCmdId(), attributesData)); sendUpdate(sessionRef, new TelemetrySubscriptionUpdate(cmd.getCmdId(), attributesData));
} finally { } finally {
subLock.unlock(); subLock.unlock();
@ -581,7 +582,7 @@ public class DefaultWebSocketService implements WebSocketService {
subLock.lock(); subLock.lock();
try { try {
oldSubService.addSubscription(sub); oldSubService.addSubscription(sub, sessionRef);
sendUpdate(sessionRef, new TelemetrySubscriptionUpdate(cmd.getCmdId(), attributesData)); sendUpdate(sessionRef, new TelemetrySubscriptionUpdate(cmd.getCmdId(), attributesData));
} finally { } finally {
subLock.unlock(); subLock.unlock();
@ -678,7 +679,7 @@ public class DefaultWebSocketService implements WebSocketService {
subLock.lock(); subLock.lock();
try { try {
oldSubService.addSubscription(sub); oldSubService.addSubscription(sub, sessionRef);
sendUpdate(sessionRef, new TelemetrySubscriptionUpdate(cmd.getCmdId(), data)); sendUpdate(sessionRef, new TelemetrySubscriptionUpdate(cmd.getCmdId(), data));
} finally { } finally {
subLock.unlock(); subLock.unlock();
@ -733,7 +734,7 @@ public class DefaultWebSocketService implements WebSocketService {
subLock.lock(); subLock.lock();
try { try {
oldSubService.addSubscription(sub); oldSubService.addSubscription(sub, sessionRef);
sendUpdate(sessionRef, new TelemetrySubscriptionUpdate(cmd.getCmdId(), data)); sendUpdate(sessionRef, new TelemetrySubscriptionUpdate(cmd.getCmdId(), data));
} finally { } finally {
subLock.unlock(); subLock.unlock();

View File

@ -79,7 +79,7 @@ public class DefaultNotificationCommandsHandler implements NotificationCommandsH
.updateProcessor(this::handleNotificationsSubscriptionUpdate) .updateProcessor(this::handleNotificationsSubscriptionUpdate)
.limit(cmd.getLimit()) .limit(cmd.getLimit())
.build(); .build();
localSubscriptionService.addSubscription(subscription); localSubscriptionService.addSubscription(subscription, sessionRef);
fetchUnreadNotifications(subscription); fetchUnreadNotifications(subscription);
sendUpdate(sessionRef.getSessionId(), subscription.createFullUpdate()); sendUpdate(sessionRef.getSessionId(), subscription.createFullUpdate());
@ -97,7 +97,7 @@ public class DefaultNotificationCommandsHandler implements NotificationCommandsH
.entityId(securityCtx.getId()) .entityId(securityCtx.getId())
.updateProcessor(this::handleNotificationsCountSubscriptionUpdate) .updateProcessor(this::handleNotificationsCountSubscriptionUpdate)
.build(); .build();
localSubscriptionService.addSubscription(subscription); localSubscriptionService.addSubscription(subscription, sessionRef);
fetchUnreadNotificationsCount(subscription); fetchUnreadNotificationsCount(subscription);
sendUpdate(sessionRef.getSessionId(), subscription.createUpdate()); sendUpdate(sessionRef.getSessionId(), subscription.createUpdate());

View File

@ -78,6 +78,11 @@ server:
max_queue_messages_per_session: "${TB_SERVER_WS_DEFAULT_QUEUE_MESSAGES_PER_SESSION:1000}" max_queue_messages_per_session: "${TB_SERVER_WS_DEFAULT_QUEUE_MESSAGES_PER_SESSION:1000}"
# Maximum time between WS session opening and sending auth command # Maximum time between WS session opening and sending auth command
auth_timeout_ms: "${TB_SERVER_WS_AUTH_TIMEOUT_MS:10000}" auth_timeout_ms: "${TB_SERVER_WS_AUTH_TIMEOUT_MS:10000}"
rate_limits:
# Per-tenant rate limit for WS subscriptions
subscriptions_per_tenant: "${TB_SERVER_WS_SUBSCRIPTIONS_PER_TENANT_RATE_LIMIT:2000:60}"
# Per-user rate limit for WS subscriptions
subscriptions_per_user: "${TB_SERVER_WS_SUBSCRIPTIONS_PER_USER_RATE_LIMIT:500:60}"
rest: rest:
server_side_rpc: server_side_rpc:
# Minimum value of the server-side RPC timeout. May override value provided in the REST API call. # Minimum value of the server-side RPC timeout. May override value provided in the REST API call.

View File

@ -42,7 +42,8 @@ public enum LimitedApi {
TRANSPORT_MESSAGES_PER_DEVICE("transport messages per device", false), TRANSPORT_MESSAGES_PER_DEVICE("transport messages per device", false),
TRANSPORT_MESSAGES_PER_GATEWAY("transport messages per gateway", false), TRANSPORT_MESSAGES_PER_GATEWAY("transport messages per gateway", false),
TRANSPORT_MESSAGES_PER_GATEWAY_DEVICE("transport messages per gateway device", false), TRANSPORT_MESSAGES_PER_GATEWAY_DEVICE("transport messages per gateway device", false),
EMAILS("emails sending", true); EMAILS("emails sending", true),
WS_SUBSCRIPTIONS("WS subscriptions", false);
private Function<DefaultTenantProfileConfiguration, String> configExtractor; private Function<DefaultTenantProfileConfiguration, String> configExtractor;
@Getter @Getter

View File

@ -30,4 +30,10 @@ public class TbRateLimitsException extends AbstractRateLimitException {
super(entityType.name() + " rate limits reached!"); super(entityType.name() + " rate limits reached!");
this.entityType = entityType; this.entityType = entityType;
} }
public TbRateLimitsException(String message) {
super(message);
this.entityType = null;
}
} }

View File

@ -0,0 +1,44 @@
/**
* Copyright © 2016-2024 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.common.util;
import org.springframework.util.ConcurrentReferenceHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.springframework.util.ConcurrentReferenceHashMap.ReferenceType.SOFT;
public class DeduplicationUtil {
private static final ConcurrentMap<Object, Long> cache = new ConcurrentReferenceHashMap<>(16, SOFT);
public static boolean alreadyProcessed(Object deduplicationKey, long deduplicationDuration) {
AtomicBoolean alreadyProcessed = new AtomicBoolean(false);
cache.compute(deduplicationKey, (key, lastProcessedTs) -> {
if (lastProcessedTs != null) {
long passed = System.currentTimeMillis() - lastProcessedTs;
if (passed <= deduplicationDuration) {
alreadyProcessed.set(true);
return lastProcessedTs;
}
}
return System.currentTimeMillis();
});
return alreadyProcessed.get();
}
}