From bff310b6913ab750fb506884bb29515098177bc0 Mon Sep 17 00:00:00 2001 From: Sergey Matvienko Date: Thu, 8 Aug 2024 17:31:05 +0200 Subject: [PATCH 1/6] DefaultTbLocalSubscriptionService WS single lock refactored by tenantId; TbEntityLocalSubsInfo performance optimizations for remove subscription --- .../DefaultTbLocalSubscriptionService.java | 36 ++++++++++++++----- .../subscription/TbAlarmsSubscription.java | 8 ++--- .../subscription/TbAttributeSubscription.java | 8 ++--- .../subscription/TbEntityLocalSubsInfo.java | 32 ++++++++++++++--- .../service/subscription/TbSubscription.java | 24 +++---------- .../subscription/TbSubscriptionUtils.java | 5 +-- .../subscription/TbSubscriptionsInfo.java | 9 +++-- .../TbTimeSeriesSubscription.java | 8 ++--- .../sub/AbstractNotificationSubscription.java | 5 +++ .../sub/NotificationsCountSubscription.java | 6 +++- .../sub/NotificationsSubscription.java | 6 +++- 11 files changed, 87 insertions(+), 60 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/subscription/DefaultTbLocalSubscriptionService.java b/application/src/main/java/org/thingsboard/server/service/subscription/DefaultTbLocalSubscriptionService.java index 6e99731305..0d29d374a3 100644 --- a/application/src/main/java/org/thingsboard/server/service/subscription/DefaultTbLocalSubscriptionService.java +++ b/application/src/main/java/org/thingsboard/server/service/subscription/DefaultTbLocalSubscriptionService.java @@ -22,6 +22,7 @@ import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Lazy; import org.springframework.context.event.EventListener; import org.springframework.stereotype.Service; +import org.springframework.util.ConcurrentReferenceHashMap; import org.thingsboard.common.util.DeduplicationUtil; import org.thingsboard.common.util.DonAsynchron; import org.thingsboard.common.util.ThingsBoardExecutors; @@ -120,12 +121,12 @@ public class DefaultTbLocalSubscriptionService implements TbLocalSubscriptionSer private String serviceId; private ExecutorService subscriptionUpdateExecutor; - private final Lock subsLock = new ReentrantLock(); + private final ConcurrentReferenceHashMap locks = new ConcurrentReferenceHashMap<>(16, ConcurrentReferenceHashMap.ReferenceType.SOFT); @PostConstruct public void initExecutor() { subscriptionUpdateExecutor = ThingsBoardExecutors.newWorkStealingPool(20, getClass()); - tsCallBackExecutor = Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("ts-sub-callback")); + tsCallBackExecutor = Executors.newFixedThreadPool(8, ThingsBoardThreadFactory.forName("ts-sub-callback")); //since we are using locks by TenantId serviceId = serviceInfoProvider.getServiceId(); staleSessionCleanupExecutor = Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName("stale-session-cleanup")); staleSessionCleanupExecutor.scheduleWithFixedDelay(this::cleanupStaleSessions, 60, 60, TimeUnit.SECONDS); @@ -176,6 +177,10 @@ public class DefaultTbLocalSubscriptionService implements TbLocalSubscriptionSer }); } + Lock getSubsLock(TenantId tenantId) { + return locks.computeIfAbsent(tenantId, x -> new ReentrantLock()); + } + @Override public void addSubscription(TbSubscription subscription, WebSocketSessionRef sessionRef) { TenantId tenantId = subscription.getTenantId(); @@ -191,6 +196,7 @@ public class DefaultTbLocalSubscriptionService implements TbLocalSubscriptionSer log.debug("[{}][{}] Register subscription: {}", tenantId, entityId, subscription); SubscriptionModificationResult result; + final Lock subsLock = getSubsLock(tenantId); subsLock.lock(); try { Map> sessionSubscriptions = subscriptionsBySessionId.computeIfAbsent(subscription.getSessionId(), k -> new ConcurrentHashMap<>()); @@ -219,14 +225,18 @@ public class DefaultTbLocalSubscriptionService implements TbLocalSubscriptionSer log.debug("[{}][{}] Processing sub event callback: {}.", entityId, seqNumber, entityUpdatesInfo); entityUpdates.put(entityId, entityUpdatesInfo); Set> pendingSubs = null; - subsLock.lock(); + Lock subsLock = null; try { TbEntityLocalSubsInfo entitySubs = subscriptionsByEntityId.get(entityId); if (entitySubs != null) { + subsLock = getSubsLock(entitySubs.getTenantId()); + subsLock.lock(); pendingSubs = entitySubs.clearPendingSubscriptions(seqNumber); } } finally { - subsLock.unlock(); + if (subsLock != null) { + subsLock.unlock(); + } } if (pendingSubs != null) { pendingSubs.forEach(this::checkMissedUpdates); @@ -238,12 +248,14 @@ public class DefaultTbLocalSubscriptionService implements TbLocalSubscriptionSer public void cancelSubscription(String sessionId, int subscriptionId) { log.debug("[{}][{}] Going to remove subscription.", sessionId, subscriptionId); SubscriptionModificationResult result = null; - subsLock.lock(); + Lock subsLock = null; try { Map> sessionSubscriptions = subscriptionsBySessionId.get(sessionId); if (sessionSubscriptions != null) { TbSubscription subscription = sessionSubscriptions.remove(subscriptionId); if (subscription != null) { + subsLock = getSubsLock(subscription.getTenantId()); + subsLock.lock(); if (sessionSubscriptions.isEmpty()) { subscriptionsBySessionId.remove(sessionId); } @@ -255,7 +267,9 @@ public class DefaultTbLocalSubscriptionService implements TbLocalSubscriptionSer log.debug("[{}] No session subscriptions found!", sessionId); } } finally { - subsLock.unlock(); + if (subsLock != null) { + subsLock.unlock(); + } } if (result != null && result.hasEvent()) { pushSubscriptionEvent(result); @@ -266,18 +280,24 @@ public class DefaultTbLocalSubscriptionService implements TbLocalSubscriptionSer public void cancelAllSessionSubscriptions(String sessionId) { log.debug("[{}] Going to remove session subscriptions.", sessionId); List results = new ArrayList<>(); - subsLock.lock(); + Lock subsLock = null; try { Map> sessionSubscriptions = subscriptionsBySessionId.remove(sessionId); if (sessionSubscriptions != null) { for (TbSubscription subscription : sessionSubscriptions.values()) { + if (subsLock == null) { + subsLock = getSubsLock(subscription.getTenantId()); + subsLock.lock(); + } results.add(modifySubscription(subscription.getTenantId(), subscription.getEntityId(), subscription, false)); } } else { log.debug("[{}] No session subscriptions found!", sessionId); } } finally { - subsLock.unlock(); + if (subsLock != null) { + subsLock.unlock(); + } } results.stream().filter(SubscriptionModificationResult::hasEvent).forEach(this::pushSubscriptionEvent); } diff --git a/application/src/main/java/org/thingsboard/server/service/subscription/TbAlarmsSubscription.java b/application/src/main/java/org/thingsboard/server/service/subscription/TbAlarmsSubscription.java index a1d0aec29f..d59f8d74c3 100644 --- a/application/src/main/java/org/thingsboard/server/service/subscription/TbAlarmsSubscription.java +++ b/application/src/main/java/org/thingsboard/server/service/subscription/TbAlarmsSubscription.java @@ -36,12 +36,8 @@ public class TbAlarmsSubscription extends TbSubscription> subs = ConcurrentHashMap.newKeySet(); private volatile TbSubscriptionsInfo state = new TbSubscriptionsInfo(); @@ -88,6 +84,7 @@ public class TbEntityLocalSubsInfo { if (!newState.attrAllKeys) { if (attrSub.isAllKeys()) { newState.attrAllKeys = true; + newState.attrKeys = null; stateChanged = true; } else { if (newState.attrKeys == null) { @@ -104,6 +101,7 @@ public class TbEntityLocalSubsInfo { if (!newState.tsAllKeys) { if (tsSub.isAllKeys()) { newState.tsAllKeys = true; + newState.tsKeys = null; stateChanged = true; } else { if (newState.tsKeys == null) { @@ -137,8 +135,30 @@ public class TbEntityLocalSubsInfo { return toEvent(ComponentLifecycleEvent.DELETED); } TbSubscriptionsInfo oldState = state.copy(); - TbSubscriptionsInfo newState = new TbSubscriptionsInfo(); + + //copy unchanged state only + TbSubscriptionsInfo newState = state.copy(); + switch (sub.getType()) { + case NOTIFICATIONS: + case NOTIFICATIONS_COUNT: + newState.notifications = false; + break; + case ALARMS: + newState.alarms = false; + break; + case ATTRIBUTES: + newState.attrAllKeys = false; + newState.attrKeys = null; + break; + case TIMESERIES: + newState.tsAllKeys = false; + newState.tsKeys = null; + } + for (TbSubscription subscription : subs) { + if (subscription.getType() != sub.getType()) { + continue; // skip unchanged types + } switch (subscription.getType()) { case NOTIFICATIONS: case NOTIFICATIONS_COUNT: @@ -155,6 +175,7 @@ public class TbEntityLocalSubsInfo { var attrSub = (TbAttributeSubscription) subscription; if (!newState.attrAllKeys && attrSub.isAllKeys()) { newState.attrAllKeys = true; + newState.attrKeys = null; continue; } if (newState.attrKeys == null) { @@ -167,6 +188,7 @@ public class TbEntityLocalSubsInfo { var tsSub = (TbTimeSeriesSubscription) subscription; if (!newState.tsAllKeys && tsSub.isAllKeys()) { newState.tsAllKeys = true; + newState.tsKeys = null; continue; } if (newState.tsKeys == null) { diff --git a/application/src/main/java/org/thingsboard/server/service/subscription/TbSubscription.java b/application/src/main/java/org/thingsboard/server/service/subscription/TbSubscription.java index d1822a6ffe..68c042ea51 100644 --- a/application/src/main/java/org/thingsboard/server/service/subscription/TbSubscription.java +++ b/application/src/main/java/org/thingsboard/server/service/subscription/TbSubscription.java @@ -15,41 +15,25 @@ */ package org.thingsboard.server.service.subscription; -import lombok.AllArgsConstructor; import lombok.Data; +import lombok.EqualsAndHashCode; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.TenantId; -import java.util.Objects; -import java.util.concurrent.atomic.AtomicInteger; import java.util.function.BiConsumer; @Data -@AllArgsConstructor +@EqualsAndHashCode(cacheStrategy = EqualsAndHashCode.CacheStrategy.LAZY, doNotUseGetters = true) public abstract class TbSubscription { + @EqualsAndHashCode.Exclude private final String serviceId; private final String sessionId; private final int subscriptionId; private final TenantId tenantId; private final EntityId entityId; private final TbSubscriptionType type; + @EqualsAndHashCode.Exclude private final BiConsumer, T> updateProcessor; - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - TbSubscription that = (TbSubscription) o; - return subscriptionId == that.subscriptionId && - sessionId.equals(that.sessionId) && - tenantId.equals(that.tenantId) && - entityId.equals(that.entityId) && - type == that.type; - } - - @Override - public int hashCode() { - return Objects.hash(sessionId, subscriptionId, tenantId, entityId, type); - } } diff --git a/application/src/main/java/org/thingsboard/server/service/subscription/TbSubscriptionUtils.java b/application/src/main/java/org/thingsboard/server/service/subscription/TbSubscriptionUtils.java index 2d8440fa20..d691b70c55 100644 --- a/application/src/main/java/org/thingsboard/server/service/subscription/TbSubscriptionUtils.java +++ b/application/src/main/java/org/thingsboard/server/service/subscription/TbSubscriptionUtils.java @@ -107,8 +107,9 @@ public class TbSubscriptionUtils { .type(event); if (!ComponentLifecycleEvent.DELETED.equals(event)) { builder.info(new TbSubscriptionsInfo(proto.getNotifications(), proto.getAlarms(), - proto.getTsAllKeys(), proto.getTsKeysCount() > 0 ? new HashSet<>(proto.getTsKeysList()) : null, - proto.getAttrAllKeys(), proto.getAttrKeysCount() > 0 ? new HashSet<>(proto.getAttrKeysList()) : null, + proto.getTsAllKeys(), proto.getAttrAllKeys(), + proto.getTsKeysCount() > 0 ? new HashSet<>(proto.getTsKeysList()) : null, + proto.getAttrKeysCount() > 0 ? new HashSet<>(proto.getAttrKeysList()) : null, proto.getSeqNumber())); } return builder.build(); diff --git a/application/src/main/java/org/thingsboard/server/service/subscription/TbSubscriptionsInfo.java b/application/src/main/java/org/thingsboard/server/service/subscription/TbSubscriptionsInfo.java index aa8ea70eab..7430d2e386 100644 --- a/application/src/main/java/org/thingsboard/server/service/subscription/TbSubscriptionsInfo.java +++ b/application/src/main/java/org/thingsboard/server/service/subscription/TbSubscriptionsInfo.java @@ -20,25 +20,24 @@ import lombok.EqualsAndHashCode; import lombok.RequiredArgsConstructor; import lombok.ToString; -import java.util.HashSet; import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; /** * Information about the local websocket subscriptions. */ @RequiredArgsConstructor @AllArgsConstructor -@EqualsAndHashCode(exclude = {"seqNumber"}) +@EqualsAndHashCode @ToString public class TbSubscriptionsInfo { protected boolean notifications; protected boolean alarms; protected boolean tsAllKeys; + protected boolean attrAllKeys; // primitives first for equals performance protected Set tsKeys; - protected boolean attrAllKeys; protected Set attrKeys; + @EqualsAndHashCode.Exclude protected int seqNumber; public boolean isEmpty() { @@ -50,7 +49,7 @@ public class TbSubscriptionsInfo { } protected TbSubscriptionsInfo copy(int seqNumber) { - return new TbSubscriptionsInfo(notifications, alarms, tsAllKeys, tsKeys, attrAllKeys, attrKeys, seqNumber); + return new TbSubscriptionsInfo(notifications, alarms, tsAllKeys, attrAllKeys, tsKeys, attrKeys, seqNumber); } } diff --git a/application/src/main/java/org/thingsboard/server/service/subscription/TbTimeSeriesSubscription.java b/application/src/main/java/org/thingsboard/server/service/subscription/TbTimeSeriesSubscription.java index 10a24106ad..0fc9a80122 100644 --- a/application/src/main/java/org/thingsboard/server/service/subscription/TbTimeSeriesSubscription.java +++ b/application/src/main/java/org/thingsboard/server/service/subscription/TbTimeSeriesSubscription.java @@ -53,12 +53,8 @@ public class TbTimeSeriesSubscription extends TbSubscription extends TbSubscription super(serviceId, sessionId, subscriptionId, tenantId, entityId, type, updateProcessor); } + @Override + protected boolean canEqual(final Object other) { + return other instanceof AbstractNotificationSubscription; + } + } diff --git a/application/src/main/java/org/thingsboard/server/service/ws/notification/sub/NotificationsCountSubscription.java b/application/src/main/java/org/thingsboard/server/service/ws/notification/sub/NotificationsCountSubscription.java index 1e812c3b31..491acedb1e 100644 --- a/application/src/main/java/org/thingsboard/server/service/ws/notification/sub/NotificationsCountSubscription.java +++ b/application/src/main/java/org/thingsboard/server/service/ws/notification/sub/NotificationsCountSubscription.java @@ -23,7 +23,6 @@ import org.thingsboard.server.service.subscription.TbSubscription; import org.thingsboard.server.service.subscription.TbSubscriptionType; import org.thingsboard.server.service.ws.notification.cmd.UnreadNotificationsCountUpdate; -import java.util.concurrent.atomic.AtomicInteger; import java.util.function.BiConsumer; @Getter @@ -35,6 +34,11 @@ public class NotificationsCountSubscription extends AbstractNotificationSubscrip super(serviceId, sessionId, subscriptionId, tenantId, entityId, TbSubscriptionType.NOTIFICATIONS_COUNT, updateProcessor); } + @Override + protected boolean canEqual(final Object other) { + return other instanceof NotificationsCountSubscription; + } + public UnreadNotificationsCountUpdate createUpdate() { return UnreadNotificationsCountUpdate.builder() .cmdId(getSubscriptionId()) diff --git a/application/src/main/java/org/thingsboard/server/service/ws/notification/sub/NotificationsSubscription.java b/application/src/main/java/org/thingsboard/server/service/ws/notification/sub/NotificationsSubscription.java index 97dc15f6b0..867f48b72e 100644 --- a/application/src/main/java/org/thingsboard/server/service/ws/notification/sub/NotificationsSubscription.java +++ b/application/src/main/java/org/thingsboard/server/service/ws/notification/sub/NotificationsSubscription.java @@ -30,7 +30,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.UUID; -import java.util.concurrent.atomic.AtomicInteger; import java.util.function.BiConsumer; import java.util.stream.Collectors; @@ -48,6 +47,11 @@ public class NotificationsSubscription extends AbstractNotificationSubscription< this.limit = limit; } + @Override + protected boolean canEqual(final Object other) { + return other instanceof NotificationsSubscription; + } + public UnreadNotificationsUpdate createFullUpdate() { return UnreadNotificationsUpdate.builder() .cmdId(getSubscriptionId()) From bc0022aafda9c8d3ffc7a7efad14f27680a4aeb3 Mon Sep 17 00:00:00 2001 From: YevhenBondarenko Date: Tue, 27 Aug 2024 15:05:39 +0200 Subject: [PATCH 2/6] Added tenant id to the methods with locks and minor refactoring --- .../DefaultSubscriptionManagerService.java | 8 +-- .../DefaultTbLocalSubscriptionService.java | 64 +++++++++---------- .../subscription/TbAbstractDataSubCtx.java | 2 +- .../subscription/TbAbstractSubCtx.java | 2 +- .../subscription/TbAlarmDataSubCtx.java | 2 +- .../subscription/TbAlarmsSubscription.java | 2 +- .../subscription/TbEntityDataSubCtx.java | 2 +- .../TbLocalSubscriptionService.java | 6 +- .../subscription/TbSubscriptionUtils.java | 6 +- .../service/ws/DefaultWebSocketService.java | 22 ++++--- .../server/service/ws/WebSocketService.java | 3 +- .../DefaultNotificationCommandsHandler.java | 2 +- common/proto/src/main/proto/queue.proto | 2 + 13 files changed, 63 insertions(+), 60 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/subscription/DefaultSubscriptionManagerService.java b/application/src/main/java/org/thingsboard/server/service/subscription/DefaultSubscriptionManagerService.java index 66b73781a0..5e1cd06b62 100644 --- a/application/src/main/java/org/thingsboard/server/service/subscription/DefaultSubscriptionManagerService.java +++ b/application/src/main/java/org/thingsboard/server/service/subscription/DefaultSubscriptionManagerService.java @@ -118,7 +118,7 @@ public class DefaultSubscriptionManagerService extends TbApplicationEventListene } callback.onSuccess(); if (event.hasTsOrAttrSub()) { - sendSubEventCallback(serviceId, entityId, event.getSeqNumber()); + sendSubEventCallback(tenantId, serviceId, entityId, event.getSeqNumber()); } } else { log.warn("[{}][{}][{}] Event belongs to external partition. Probably re-balancing is in progress. Topic: {}" @@ -142,12 +142,12 @@ public class DefaultSubscriptionManagerService extends TbApplicationEventListene } } - private void sendSubEventCallback(String targetId, EntityId entityId, int seqNumber) { + private void sendSubEventCallback(TenantId tenantId, String targetId, EntityId entityId, int seqNumber) { var update = getEntityUpdatesInfo(entityId); if (serviceId.equals(targetId)) { - localSubscriptionService.onSubEventCallback(entityId, seqNumber, update, TbCallback.EMPTY); + localSubscriptionService.onSubEventCallback(tenantId, entityId, seqNumber, update, TbCallback.EMPTY); } else { - sendCoreNotification(targetId, entityId, TbSubscriptionUtils.toProto(entityId.getId(), seqNumber, update)); + sendCoreNotification(targetId, entityId, TbSubscriptionUtils.toProto(tenantId, entityId.getId(), seqNumber, update)); } } diff --git a/application/src/main/java/org/thingsboard/server/service/subscription/DefaultTbLocalSubscriptionService.java b/application/src/main/java/org/thingsboard/server/service/subscription/DefaultTbLocalSubscriptionService.java index 0d29d374a3..ceafea6553 100644 --- a/application/src/main/java/org/thingsboard/server/service/subscription/DefaultTbLocalSubscriptionService.java +++ b/application/src/main/java/org/thingsboard/server/service/subscription/DefaultTbLocalSubscriptionService.java @@ -213,30 +213,28 @@ public class DefaultTbLocalSubscriptionService implements TbLocalSubscriptionSer @Override public void onSubEventCallback(TransportProtos.TbEntitySubEventCallbackProto subEventCallback, TbCallback callback) { UUID entityId = new UUID(subEventCallback.getEntityIdMSB(), subEventCallback.getEntityIdLSB()); - onSubEventCallback(entityId, subEventCallback.getSeqNumber(), new TbEntityUpdatesInfo(subEventCallback.getAttributesUpdateTs(), subEventCallback.getTimeSeriesUpdateTs()), callback); + TenantId tenantId = TenantId.fromUUID(new UUID(subEventCallback.getTenantIdMSB(), subEventCallback.getTenantIdLSB())); + onSubEventCallback(tenantId, entityId, subEventCallback.getSeqNumber(), new TbEntityUpdatesInfo(subEventCallback.getAttributesUpdateTs(), subEventCallback.getTimeSeriesUpdateTs()), callback); } @Override - public void onSubEventCallback(EntityId entityId, int seqNumber, TbEntityUpdatesInfo entityUpdatesInfo, TbCallback callback) { - onSubEventCallback(entityId.getId(), seqNumber, entityUpdatesInfo, callback); + public void onSubEventCallback(TenantId tenantId, EntityId entityId, int seqNumber, TbEntityUpdatesInfo entityUpdatesInfo, TbCallback callback) { + onSubEventCallback(tenantId, entityId.getId(), seqNumber, entityUpdatesInfo, callback); } - public void onSubEventCallback(UUID entityId, int seqNumber, TbEntityUpdatesInfo entityUpdatesInfo, TbCallback callback) { - log.debug("[{}][{}] Processing sub event callback: {}.", entityId, seqNumber, entityUpdatesInfo); + public void onSubEventCallback(TenantId tenantId, UUID entityId, int seqNumber, TbEntityUpdatesInfo entityUpdatesInfo, TbCallback callback) { + log.debug("[{}][{}][{}] Processing sub event callback: {}.", tenantId, entityId, seqNumber, entityUpdatesInfo); entityUpdates.put(entityId, entityUpdatesInfo); Set> pendingSubs = null; - Lock subsLock = null; + Lock subsLock = getSubsLock(tenantId); + subsLock.lock(); try { TbEntityLocalSubsInfo entitySubs = subscriptionsByEntityId.get(entityId); if (entitySubs != null) { - subsLock = getSubsLock(entitySubs.getTenantId()); - subsLock.lock(); pendingSubs = entitySubs.clearPendingSubscriptions(seqNumber); } } finally { - if (subsLock != null) { - subsLock.unlock(); - } + subsLock.unlock(); } if (pendingSubs != null) { pendingSubs.forEach(this::checkMissedUpdates); @@ -245,31 +243,29 @@ public class DefaultTbLocalSubscriptionService implements TbLocalSubscriptionSer } @Override - public void cancelSubscription(String sessionId, int subscriptionId) { - log.debug("[{}][{}] Going to remove subscription.", sessionId, subscriptionId); + public void cancelSubscription(TenantId tenantId, String sessionId, int subscriptionId) { + log.debug("[{}][{}][{}] Going to remove subscription.", tenantId, sessionId, subscriptionId); SubscriptionModificationResult result = null; - Lock subsLock = null; + Lock subsLock = getSubsLock(tenantId); + subsLock.lock(); try { Map> sessionSubscriptions = subscriptionsBySessionId.get(sessionId); if (sessionSubscriptions != null) { TbSubscription subscription = sessionSubscriptions.remove(subscriptionId); if (subscription != null) { - subsLock = getSubsLock(subscription.getTenantId()); - subsLock.lock(); + if (sessionSubscriptions.isEmpty()) { subscriptionsBySessionId.remove(sessionId); } result = modifySubscription(subscription.getTenantId(), subscription.getEntityId(), subscription, false); } else { - log.debug("[{}][{}] Subscription not found!", sessionId, subscriptionId); + log.debug("[{}][{}][{}] Subscription not found!", tenantId, sessionId, subscriptionId); } } else { - log.debug("[{}] No session subscriptions found!", sessionId); + log.debug("[{}][{}] No session subscriptions found!", tenantId, sessionId); } } finally { - if (subsLock != null) { - subsLock.unlock(); - } + subsLock.unlock(); } if (result != null && result.hasEvent()) { pushSubscriptionEvent(result); @@ -277,27 +273,22 @@ public class DefaultTbLocalSubscriptionService implements TbLocalSubscriptionSer } @Override - public void cancelAllSessionSubscriptions(String sessionId) { - log.debug("[{}] Going to remove session subscriptions.", sessionId); + public void cancelAllSessionSubscriptions(TenantId tenantId, String sessionId) { + log.debug("[{}][{}] Going to remove session subscriptions.", tenantId, sessionId); List results = new ArrayList<>(); - Lock subsLock = null; + Lock subsLock = getSubsLock(tenantId); + subsLock.lock(); try { Map> sessionSubscriptions = subscriptionsBySessionId.remove(sessionId); if (sessionSubscriptions != null) { for (TbSubscription subscription : sessionSubscriptions.values()) { - if (subsLock == null) { - subsLock = getSubsLock(subscription.getTenantId()); - subsLock.lock(); - } - results.add(modifySubscription(subscription.getTenantId(), subscription.getEntityId(), subscription, false)); + results.add(modifySubscription(tenantId, subscription.getEntityId(), subscription, false)); } } else { - log.debug("[{}] No session subscriptions found!", sessionId); + log.debug("[{}][{}] No session subscriptions found!", tenantId, sessionId); } } finally { - if (subsLock != null) { - subsLock.unlock(); - } + subsLock.unlock(); } results.stream().filter(SubscriptionModificationResult::hasEvent).forEach(this::pushSubscriptionEvent); } @@ -602,7 +593,12 @@ public class DefaultTbLocalSubscriptionService implements TbLocalSubscriptionSer } private void cleanupStaleSessions() { - subscriptionsBySessionId.keySet().forEach(webSocketService::cleanupIfStale); + subscriptionsBySessionId.forEach((sessionId, subscriptions) -> + subscriptions.values() + .stream() + .findAny() + .ifPresent(subscription -> webSocketService.cleanupIfStale(subscription.getTenantId(), sessionId)) + ); } private void handleRateLimitError(TbSubscription subscription, WebSocketSessionRef sessionRef, String message) { diff --git a/application/src/main/java/org/thingsboard/server/service/subscription/TbAbstractDataSubCtx.java b/application/src/main/java/org/thingsboard/server/service/subscription/TbAbstractDataSubCtx.java index 77ef960d72..1ec47beeab 100644 --- a/application/src/main/java/org/thingsboard/server/service/subscription/TbAbstractDataSubCtx.java +++ b/application/src/main/java/org/thingsboard/server/service/subscription/TbAbstractDataSubCtx.java @@ -113,7 +113,7 @@ public abstract class TbAbstractDataSubCtx { protected void clearDynamicValueSubscriptions() { if (subToDynamicValueKeySet != null) { for (Integer subId : subToDynamicValueKeySet) { - localSubscriptionService.cancelSubscription(sessionRef.getSessionId(), subId); + localSubscriptionService.cancelSubscription(getTenantId(), sessionRef.getSessionId(), subId); } subToDynamicValueKeySet.clear(); } diff --git a/application/src/main/java/org/thingsboard/server/service/subscription/TbAlarmDataSubCtx.java b/application/src/main/java/org/thingsboard/server/service/subscription/TbAlarmDataSubCtx.java index 828da388fb..93c5f4323d 100644 --- a/application/src/main/java/org/thingsboard/server/service/subscription/TbAlarmDataSubCtx.java +++ b/application/src/main/java/org/thingsboard/server/service/subscription/TbAlarmDataSubCtx.java @@ -341,7 +341,7 @@ public class TbAlarmDataSubCtx extends TbAbstractDataSubCtx { long startTs = System.currentTimeMillis() - query.getPageLink().getTimeWindow(); newSubsList.forEach(entity -> createAlarmSubscriptionForEntity(query.getPageLink(), startTs, entity)); } - subIdsToCancel.forEach(subId -> localSubscriptionService.cancelSubscription(getSessionId(), subId)); + subIdsToCancel.forEach(subId -> localSubscriptionService.cancelSubscription(getTenantId(), getSessionId(), subId)); subsToAdd.forEach(subscription -> localSubscriptionService.addSubscription(subscription, sessionRef)); } diff --git a/application/src/main/java/org/thingsboard/server/service/subscription/TbAlarmsSubscription.java b/application/src/main/java/org/thingsboard/server/service/subscription/TbAlarmsSubscription.java index d59f8d74c3..a9ab7797b7 100644 --- a/application/src/main/java/org/thingsboard/server/service/subscription/TbAlarmsSubscription.java +++ b/application/src/main/java/org/thingsboard/server/service/subscription/TbAlarmsSubscription.java @@ -36,7 +36,7 @@ public class TbAlarmsSubscription extends TbSubscription { } } } - subIdsToCancel.forEach(subId -> localSubscriptionService.cancelSubscription(getSessionId(), subId)); + subIdsToCancel.forEach(subId -> localSubscriptionService.cancelSubscription(getTenantId(), getSessionId(), subId)); subsToAdd.forEach(subscription -> localSubscriptionService.addSubscription(subscription, sessionRef)); sendWsMsg(new EntityDataUpdate(cmdId, data, null, maxEntitiesPerDataSubscription)); } diff --git a/application/src/main/java/org/thingsboard/server/service/subscription/TbLocalSubscriptionService.java b/application/src/main/java/org/thingsboard/server/service/subscription/TbLocalSubscriptionService.java index 59e7ad532d..6d742e88a6 100644 --- a/application/src/main/java/org/thingsboard/server/service/subscription/TbLocalSubscriptionService.java +++ b/application/src/main/java/org/thingsboard/server/service/subscription/TbLocalSubscriptionService.java @@ -34,11 +34,11 @@ public interface TbLocalSubscriptionService { void onSubEventCallback(TransportProtos.TbEntitySubEventCallbackProto subEventCallback, TbCallback callback); - void onSubEventCallback(EntityId entityId, int seqNumber, TbEntityUpdatesInfo entityUpdatesInfo, TbCallback empty); + void onSubEventCallback(TenantId tenantId, EntityId entityId, int seqNumber, TbEntityUpdatesInfo entityUpdatesInfo, TbCallback empty); - void cancelSubscription(String sessionId, int subscriptionId); + void cancelSubscription(TenantId tenantId, String sessionId, int subscriptionId); - void cancelAllSessionSubscriptions(String sessionId); + void cancelAllSessionSubscriptions(TenantId tenantId, String sessionId); void onTimeSeriesUpdate(TransportProtos.TbSubUpdateProto tsUpdate, TbCallback callback); diff --git a/application/src/main/java/org/thingsboard/server/service/subscription/TbSubscriptionUtils.java b/application/src/main/java/org/thingsboard/server/service/subscription/TbSubscriptionUtils.java index d691b70c55..79e41ecbfe 100644 --- a/application/src/main/java/org/thingsboard/server/service/subscription/TbSubscriptionUtils.java +++ b/application/src/main/java/org/thingsboard/server/service/subscription/TbSubscriptionUtils.java @@ -82,10 +82,12 @@ public class TbSubscriptionUtils { return ToCoreMsg.newBuilder().setToSubscriptionMgrMsg(msgBuilder).build(); } - public static ToCoreNotificationMsg toProto(UUID id, int seqNumber, TbEntityUpdatesInfo update) { + public static ToCoreNotificationMsg toProto(TenantId tenantId, UUID id, int seqNumber, TbEntityUpdatesInfo update) { TransportProtos.TbEntitySubEventCallbackProto.Builder updateProto = TransportProtos.TbEntitySubEventCallbackProto.newBuilder() .setEntityIdMSB(id.getMostSignificantBits()) - .setEntityIdLSB(id.getLeastSignificantBits()) + .setEntityIdLSB(tenantId.getId().getLeastSignificantBits()) + .setTenantIdMSB(tenantId.getId().getMostSignificantBits()) + .setTenantIdLSB(id.getLeastSignificantBits()) .setSeqNumber(seqNumber) .setAttributesUpdateTs(update.attributesUpdateTs) .setTimeSeriesUpdateTs(update.timeSeriesUpdateTs); diff --git a/application/src/main/java/org/thingsboard/server/service/ws/DefaultWebSocketService.java b/application/src/main/java/org/thingsboard/server/service/ws/DefaultWebSocketService.java index 33b011cc0a..8d96d14b49 100644 --- a/application/src/main/java/org/thingsboard/server/service/ws/DefaultWebSocketService.java +++ b/application/src/main/java/org/thingsboard/server/service/ws/DefaultWebSocketService.java @@ -193,17 +193,18 @@ public class DefaultWebSocketService implements WebSocketService { @Override public void handleSessionEvent(WebSocketSessionRef sessionRef, SessionEvent event) { String sessionId = sessionRef.getSessionId(); + TenantId tenantId = sessionRef.getSecurityCtx().getTenantId(); log.debug(PROCESSING_MSG, sessionId, event); switch (event.getEventType()) { case ESTABLISHED: wsSessionsMap.put(sessionId, new WsSessionMetaData(sessionRef)); break; case ERROR: - log.debug("[{}] Unknown websocket session error: ", sessionId, + log.debug("[{}][{}] Unknown websocket session error: ", tenantId, sessionId, event.getError().orElse(new RuntimeException("No error specified"))); break; case CLOSED: - cleanupSessionById(sessionId); + cleanupSessionById(tenantId, sessionId); processSessionClose(sessionRef); break; } @@ -297,10 +298,10 @@ public class DefaultWebSocketService implements WebSocketService { } @Override - public void cleanupIfStale(String sessionId) { + public void cleanupIfStale(TenantId tenantId, String sessionId) { if (!msgEndpoint.isOpen(sessionId)) { log.info("[{}] Cleaning up stale session ", sessionId); - cleanupSessionById(sessionId); + cleanupSessionById(tenantId, sessionId); } } @@ -754,22 +755,23 @@ public class DefaultWebSocketService implements WebSocketService { } private void unsubscribe(WebSocketSessionRef sessionRef, SubscriptionCmd cmd, String sessionId) { + TenantId tenantId = sessionRef.getSecurityCtx().getTenantId(); if (cmd.getEntityId() == null || cmd.getEntityId().isEmpty()) { - log.warn("[{}][{}] Cleanup session due to empty entity id.", sessionId, cmd.getCmdId()); - cleanupSessionById(sessionId); + log.warn("[{}][{}][{}] Cleanup session due to empty entity id.", tenantId, sessionId, cmd.getCmdId()); + cleanupSessionById(tenantId, sessionId); } else { Integer subId = sessionCmdMap.getOrDefault(sessionId, Collections.emptyMap()).remove(cmd.getCmdId()); if (subId == null) { - log.trace("[{}][{}] Failed to lookup subscription id mapping", sessionId, cmd.getCmdId()); + log.trace("[{}][{}][{}] Failed to lookup subscription id mapping", tenantId, sessionId, cmd.getCmdId()); subId = cmd.getCmdId(); } - oldSubService.cancelSubscription(sessionId, subId); + oldSubService.cancelSubscription(tenantId, sessionId, subId); } } - private void cleanupSessionById(String sessionId) { + private void cleanupSessionById(TenantId tenantId, String sessionId) { wsSessionsMap.remove(sessionId); - oldSubService.cancelAllSessionSubscriptions(sessionId); + oldSubService.cancelAllSessionSubscriptions(tenantId, sessionId); sessionCmdMap.remove(sessionId); entityDataSubService.cancelAllSessionSubscriptions(sessionId); } diff --git a/application/src/main/java/org/thingsboard/server/service/ws/WebSocketService.java b/application/src/main/java/org/thingsboard/server/service/ws/WebSocketService.java index 284431ff84..00fd19780d 100644 --- a/application/src/main/java/org/thingsboard/server/service/ws/WebSocketService.java +++ b/application/src/main/java/org/thingsboard/server/service/ws/WebSocketService.java @@ -16,6 +16,7 @@ package org.thingsboard.server.service.ws; import org.springframework.web.socket.CloseStatus; +import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.service.subscription.SubscriptionErrorCode; import org.thingsboard.server.service.ws.telemetry.cmd.v2.CmdUpdate; import org.thingsboard.server.service.ws.telemetry.sub.TelemetrySubscriptionUpdate; @@ -37,6 +38,6 @@ public interface WebSocketService { void close(String sessionId, CloseStatus status); - void cleanupIfStale(String sessionId); + void cleanupIfStale(TenantId tenantId, String sessionId); } diff --git a/application/src/main/java/org/thingsboard/server/service/ws/notification/DefaultNotificationCommandsHandler.java b/application/src/main/java/org/thingsboard/server/service/ws/notification/DefaultNotificationCommandsHandler.java index 5bfe976194..84f3698845 100644 --- a/application/src/main/java/org/thingsboard/server/service/ws/notification/DefaultNotificationCommandsHandler.java +++ b/application/src/main/java/org/thingsboard/server/service/ws/notification/DefaultNotificationCommandsHandler.java @@ -242,7 +242,7 @@ public class DefaultNotificationCommandsHandler implements NotificationCommandsH @Override public void handleUnsubCmd(WebSocketSessionRef sessionRef, UnsubscribeCmd cmd) { - localSubscriptionService.cancelSubscription(sessionRef.getSessionId(), cmd.getCmdId()); + localSubscriptionService.cancelSubscription(sessionRef.getSecurityCtx().getTenantId(), sessionRef.getSessionId(), cmd.getCmdId()); } private void sendUpdate(String sessionId, CmdUpdate update) { diff --git a/common/proto/src/main/proto/queue.proto b/common/proto/src/main/proto/queue.proto index 581b993120..2be1254a1a 100644 --- a/common/proto/src/main/proto/queue.proto +++ b/common/proto/src/main/proto/queue.proto @@ -817,6 +817,8 @@ message TbEntitySubEventCallbackProto { int32 seqNumber = 3; int64 attributesUpdateTs = 4; int64 timeSeriesUpdateTs = 5; + int64 tenantIdMSB = 6; + int64 tenantIdLSB = 7; } message TsValueProto { From b26bf1410ce9e863d91c3a6509231d2024c2eaa8 Mon Sep 17 00:00:00 2001 From: YevhenBondarenko Date: Wed, 28 Aug 2024 10:24:22 +0200 Subject: [PATCH 3/6] fixed TbEntitySubEventCallbackProto build --- .../server/service/subscription/TbSubscriptionUtils.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/subscription/TbSubscriptionUtils.java b/application/src/main/java/org/thingsboard/server/service/subscription/TbSubscriptionUtils.java index 79e41ecbfe..3d0037caba 100644 --- a/application/src/main/java/org/thingsboard/server/service/subscription/TbSubscriptionUtils.java +++ b/application/src/main/java/org/thingsboard/server/service/subscription/TbSubscriptionUtils.java @@ -85,9 +85,9 @@ public class TbSubscriptionUtils { public static ToCoreNotificationMsg toProto(TenantId tenantId, UUID id, int seqNumber, TbEntityUpdatesInfo update) { TransportProtos.TbEntitySubEventCallbackProto.Builder updateProto = TransportProtos.TbEntitySubEventCallbackProto.newBuilder() .setEntityIdMSB(id.getMostSignificantBits()) - .setEntityIdLSB(tenantId.getId().getLeastSignificantBits()) + .setEntityIdLSB(id.getLeastSignificantBits()) .setTenantIdMSB(tenantId.getId().getMostSignificantBits()) - .setTenantIdLSB(id.getLeastSignificantBits()) + .setTenantIdLSB(tenantId.getId().getLeastSignificantBits()) .setSeqNumber(seqNumber) .setAttributesUpdateTs(update.attributesUpdateTs) .setTimeSeriesUpdateTs(update.timeSeriesUpdateTs); From 779acf1891788d4a4bcd5c0e30118c1262ab6c03 Mon Sep 17 00:00:00 2001 From: YevhenBondarenko Date: Wed, 28 Aug 2024 23:00:08 +0200 Subject: [PATCH 4/6] used sys tenant id for old protos --- .../subscription/DefaultTbLocalSubscriptionService.java | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/application/src/main/java/org/thingsboard/server/service/subscription/DefaultTbLocalSubscriptionService.java b/application/src/main/java/org/thingsboard/server/service/subscription/DefaultTbLocalSubscriptionService.java index ceafea6553..bd2ae08474 100644 --- a/application/src/main/java/org/thingsboard/server/service/subscription/DefaultTbLocalSubscriptionService.java +++ b/application/src/main/java/org/thingsboard/server/service/subscription/DefaultTbLocalSubscriptionService.java @@ -212,8 +212,13 @@ public class DefaultTbLocalSubscriptionService implements TbLocalSubscriptionSer @Override public void onSubEventCallback(TransportProtos.TbEntitySubEventCallbackProto subEventCallback, TbCallback callback) { + TenantId tenantId; + if (subEventCallback.getTenantIdMSB() == 0 && subEventCallback.getTenantIdLSB() == 0) { + tenantId = TenantId.SYS_TENANT_ID; //TODO: remove after release + } else { + tenantId = TenantId.fromUUID(new UUID(subEventCallback.getTenantIdMSB(), subEventCallback.getTenantIdLSB())); + } UUID entityId = new UUID(subEventCallback.getEntityIdMSB(), subEventCallback.getEntityIdLSB()); - TenantId tenantId = TenantId.fromUUID(new UUID(subEventCallback.getTenantIdMSB(), subEventCallback.getTenantIdLSB())); onSubEventCallback(tenantId, entityId, subEventCallback.getSeqNumber(), new TbEntityUpdatesInfo(subEventCallback.getAttributesUpdateTs(), subEventCallback.getTimeSeriesUpdateTs()), callback); } From 399c07259503c2fe5616b0c4d9a75e0cbe639941 Mon Sep 17 00:00:00 2001 From: YevhenBondarenko Date: Wed, 28 Aug 2024 23:26:00 +0200 Subject: [PATCH 5/6] revert changes that are not related to this PR --- .../DefaultTbLocalSubscriptionService.java | 2 +- .../subscription/TbAlarmsSubscription.java | 8 ++++-- .../subscription/TbAttributeSubscription.java | 8 ++++-- .../subscription/TbEntityLocalSubsInfo.java | 28 +------------------ .../service/subscription/TbSubscription.java | 23 ++++++++++++--- .../subscription/TbSubscriptionUtils.java | 5 ++-- .../subscription/TbSubscriptionsInfo.java | 7 ++--- .../TbTimeSeriesSubscription.java | 8 ++++-- .../sub/AbstractNotificationSubscription.java | 5 ---- .../sub/NotificationsCountSubscription.java | 5 ---- .../sub/NotificationsSubscription.java | 5 ---- 11 files changed, 44 insertions(+), 60 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/subscription/DefaultTbLocalSubscriptionService.java b/application/src/main/java/org/thingsboard/server/service/subscription/DefaultTbLocalSubscriptionService.java index bd2ae08474..9e08e0359b 100644 --- a/application/src/main/java/org/thingsboard/server/service/subscription/DefaultTbLocalSubscriptionService.java +++ b/application/src/main/java/org/thingsboard/server/service/subscription/DefaultTbLocalSubscriptionService.java @@ -227,7 +227,7 @@ public class DefaultTbLocalSubscriptionService implements TbLocalSubscriptionSer onSubEventCallback(tenantId, entityId.getId(), seqNumber, entityUpdatesInfo, callback); } - public void onSubEventCallback(TenantId tenantId, UUID entityId, int seqNumber, TbEntityUpdatesInfo entityUpdatesInfo, TbCallback callback) { + private void onSubEventCallback(TenantId tenantId, UUID entityId, int seqNumber, TbEntityUpdatesInfo entityUpdatesInfo, TbCallback callback) { log.debug("[{}][{}][{}] Processing sub event callback: {}.", tenantId, entityId, seqNumber, entityUpdatesInfo); entityUpdates.put(entityId, entityUpdatesInfo); Set> pendingSubs = null; diff --git a/application/src/main/java/org/thingsboard/server/service/subscription/TbAlarmsSubscription.java b/application/src/main/java/org/thingsboard/server/service/subscription/TbAlarmsSubscription.java index a9ab7797b7..a1d0aec29f 100644 --- a/application/src/main/java/org/thingsboard/server/service/subscription/TbAlarmsSubscription.java +++ b/application/src/main/java/org/thingsboard/server/service/subscription/TbAlarmsSubscription.java @@ -36,8 +36,12 @@ public class TbAlarmsSubscription extends TbSubscription subscription : subs) { - if (subscription.getType() != sub.getType()) { - continue; // skip unchanged types - } switch (subscription.getType()) { case NOTIFICATIONS: case NOTIFICATIONS_COUNT: @@ -175,7 +151,6 @@ public class TbEntityLocalSubsInfo { var attrSub = (TbAttributeSubscription) subscription; if (!newState.attrAllKeys && attrSub.isAllKeys()) { newState.attrAllKeys = true; - newState.attrKeys = null; continue; } if (newState.attrKeys == null) { @@ -188,7 +163,6 @@ public class TbEntityLocalSubsInfo { var tsSub = (TbTimeSeriesSubscription) subscription; if (!newState.tsAllKeys && tsSub.isAllKeys()) { newState.tsAllKeys = true; - newState.tsKeys = null; continue; } if (newState.tsKeys == null) { diff --git a/application/src/main/java/org/thingsboard/server/service/subscription/TbSubscription.java b/application/src/main/java/org/thingsboard/server/service/subscription/TbSubscription.java index 68c042ea51..6d210befc0 100644 --- a/application/src/main/java/org/thingsboard/server/service/subscription/TbSubscription.java +++ b/application/src/main/java/org/thingsboard/server/service/subscription/TbSubscription.java @@ -15,25 +15,40 @@ */ package org.thingsboard.server.service.subscription; +import lombok.AllArgsConstructor; import lombok.Data; -import lombok.EqualsAndHashCode; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.TenantId; +import java.util.Objects; import java.util.function.BiConsumer; @Data -@EqualsAndHashCode(cacheStrategy = EqualsAndHashCode.CacheStrategy.LAZY, doNotUseGetters = true) +@AllArgsConstructor public abstract class TbSubscription { - @EqualsAndHashCode.Exclude private final String serviceId; private final String sessionId; private final int subscriptionId; private final TenantId tenantId; private final EntityId entityId; private final TbSubscriptionType type; - @EqualsAndHashCode.Exclude private final BiConsumer, T> updateProcessor; + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + TbSubscription that = (TbSubscription) o; + return subscriptionId == that.subscriptionId && + sessionId.equals(that.sessionId) && + tenantId.equals(that.tenantId) && + entityId.equals(that.entityId) && + type == that.type; + } + + @Override + public int hashCode() { + return Objects.hash(sessionId, subscriptionId, tenantId, entityId, type); + } } diff --git a/application/src/main/java/org/thingsboard/server/service/subscription/TbSubscriptionUtils.java b/application/src/main/java/org/thingsboard/server/service/subscription/TbSubscriptionUtils.java index 3d0037caba..f5572a9a5c 100644 --- a/application/src/main/java/org/thingsboard/server/service/subscription/TbSubscriptionUtils.java +++ b/application/src/main/java/org/thingsboard/server/service/subscription/TbSubscriptionUtils.java @@ -109,9 +109,8 @@ public class TbSubscriptionUtils { .type(event); if (!ComponentLifecycleEvent.DELETED.equals(event)) { builder.info(new TbSubscriptionsInfo(proto.getNotifications(), proto.getAlarms(), - proto.getTsAllKeys(), proto.getAttrAllKeys(), - proto.getTsKeysCount() > 0 ? new HashSet<>(proto.getTsKeysList()) : null, - proto.getAttrKeysCount() > 0 ? new HashSet<>(proto.getAttrKeysList()) : null, + proto.getTsAllKeys(), proto.getTsKeysCount() > 0 ? new HashSet<>(proto.getTsKeysList()) : null, + proto.getAttrAllKeys(), proto.getAttrKeysCount() > 0 ? new HashSet<>(proto.getAttrKeysList()) : null, proto.getSeqNumber())); } return builder.build(); diff --git a/application/src/main/java/org/thingsboard/server/service/subscription/TbSubscriptionsInfo.java b/application/src/main/java/org/thingsboard/server/service/subscription/TbSubscriptionsInfo.java index 7430d2e386..48464d5297 100644 --- a/application/src/main/java/org/thingsboard/server/service/subscription/TbSubscriptionsInfo.java +++ b/application/src/main/java/org/thingsboard/server/service/subscription/TbSubscriptionsInfo.java @@ -27,17 +27,16 @@ import java.util.Set; */ @RequiredArgsConstructor @AllArgsConstructor -@EqualsAndHashCode +@EqualsAndHashCode(exclude = {"seqNumber"}) @ToString public class TbSubscriptionsInfo { protected boolean notifications; protected boolean alarms; protected boolean tsAllKeys; - protected boolean attrAllKeys; // primitives first for equals performance protected Set tsKeys; + protected boolean attrAllKeys; protected Set attrKeys; - @EqualsAndHashCode.Exclude protected int seqNumber; public boolean isEmpty() { @@ -49,7 +48,7 @@ public class TbSubscriptionsInfo { } protected TbSubscriptionsInfo copy(int seqNumber) { - return new TbSubscriptionsInfo(notifications, alarms, tsAllKeys, attrAllKeys, tsKeys, attrKeys, seqNumber); + return new TbSubscriptionsInfo(notifications, alarms, tsAllKeys, tsKeys, attrAllKeys, attrKeys, seqNumber); } } diff --git a/application/src/main/java/org/thingsboard/server/service/subscription/TbTimeSeriesSubscription.java b/application/src/main/java/org/thingsboard/server/service/subscription/TbTimeSeriesSubscription.java index 0fc9a80122..10a24106ad 100644 --- a/application/src/main/java/org/thingsboard/server/service/subscription/TbTimeSeriesSubscription.java +++ b/application/src/main/java/org/thingsboard/server/service/subscription/TbTimeSeriesSubscription.java @@ -53,8 +53,12 @@ public class TbTimeSeriesSubscription extends TbSubscription extends TbSubscription super(serviceId, sessionId, subscriptionId, tenantId, entityId, type, updateProcessor); } - @Override - protected boolean canEqual(final Object other) { - return other instanceof AbstractNotificationSubscription; - } - } diff --git a/application/src/main/java/org/thingsboard/server/service/ws/notification/sub/NotificationsCountSubscription.java b/application/src/main/java/org/thingsboard/server/service/ws/notification/sub/NotificationsCountSubscription.java index 491acedb1e..171416a63d 100644 --- a/application/src/main/java/org/thingsboard/server/service/ws/notification/sub/NotificationsCountSubscription.java +++ b/application/src/main/java/org/thingsboard/server/service/ws/notification/sub/NotificationsCountSubscription.java @@ -34,11 +34,6 @@ public class NotificationsCountSubscription extends AbstractNotificationSubscrip super(serviceId, sessionId, subscriptionId, tenantId, entityId, TbSubscriptionType.NOTIFICATIONS_COUNT, updateProcessor); } - @Override - protected boolean canEqual(final Object other) { - return other instanceof NotificationsCountSubscription; - } - public UnreadNotificationsCountUpdate createUpdate() { return UnreadNotificationsCountUpdate.builder() .cmdId(getSubscriptionId()) diff --git a/application/src/main/java/org/thingsboard/server/service/ws/notification/sub/NotificationsSubscription.java b/application/src/main/java/org/thingsboard/server/service/ws/notification/sub/NotificationsSubscription.java index 867f48b72e..8306e7f95c 100644 --- a/application/src/main/java/org/thingsboard/server/service/ws/notification/sub/NotificationsSubscription.java +++ b/application/src/main/java/org/thingsboard/server/service/ws/notification/sub/NotificationsSubscription.java @@ -47,11 +47,6 @@ public class NotificationsSubscription extends AbstractNotificationSubscription< this.limit = limit; } - @Override - protected boolean canEqual(final Object other) { - return other instanceof NotificationsSubscription; - } - public UnreadNotificationsUpdate createFullUpdate() { return UnreadNotificationsUpdate.builder() .cmdId(getSubscriptionId()) From 0af0c3667378be8ba180786ae121e5b11d8ff084 Mon Sep 17 00:00:00 2001 From: YevhenBondarenko Date: Mon, 2 Sep 2024 10:41:17 +0200 Subject: [PATCH 6/6] refactoring --- .../thingsboard/server/service/ws/WebSocketSessionRef.java | 5 +++++ .../ws/notification/DefaultNotificationCommandsHandler.java | 2 +- 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/application/src/main/java/org/thingsboard/server/service/ws/WebSocketSessionRef.java b/application/src/main/java/org/thingsboard/server/service/ws/WebSocketSessionRef.java index 44919b138c..7ce7bd5735 100644 --- a/application/src/main/java/org/thingsboard/server/service/ws/WebSocketSessionRef.java +++ b/application/src/main/java/org/thingsboard/server/service/ws/WebSocketSessionRef.java @@ -17,6 +17,7 @@ package org.thingsboard.server.service.ws; import lombok.Builder; import lombok.Data; +import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.service.security.model.SecurityUser; import java.net.InetSocketAddress; @@ -39,6 +40,10 @@ public class WebSocketSessionRef { private final WebSocketSessionType sessionType; private final AtomicInteger sessionSubIdSeq = new AtomicInteger(); + public TenantId getTenantId() { + return securityCtx != null ? securityCtx.getTenantId() : TenantId.SYS_TENANT_ID; + } + @Override public boolean equals(Object o) { if (this == o) return true; diff --git a/application/src/main/java/org/thingsboard/server/service/ws/notification/DefaultNotificationCommandsHandler.java b/application/src/main/java/org/thingsboard/server/service/ws/notification/DefaultNotificationCommandsHandler.java index 84f3698845..77f000990c 100644 --- a/application/src/main/java/org/thingsboard/server/service/ws/notification/DefaultNotificationCommandsHandler.java +++ b/application/src/main/java/org/thingsboard/server/service/ws/notification/DefaultNotificationCommandsHandler.java @@ -242,7 +242,7 @@ public class DefaultNotificationCommandsHandler implements NotificationCommandsH @Override public void handleUnsubCmd(WebSocketSessionRef sessionRef, UnsubscribeCmd cmd) { - localSubscriptionService.cancelSubscription(sessionRef.getSecurityCtx().getTenantId(), sessionRef.getSessionId(), cmd.getCmdId()); + localSubscriptionService.cancelSubscription(sessionRef.getTenantId(), sessionRef.getSessionId(), cmd.getCmdId()); } private void sendUpdate(String sessionId, CmdUpdate update) {