diff --git a/application/src/main/java/org/thingsboard/server/service/subscription/DefaultSubscriptionManagerService.java b/application/src/main/java/org/thingsboard/server/service/subscription/DefaultSubscriptionManagerService.java index b18cf7992c..37427c2f99 100644 --- a/application/src/main/java/org/thingsboard/server/service/subscription/DefaultSubscriptionManagerService.java +++ b/application/src/main/java/org/thingsboard/server/service/subscription/DefaultSubscriptionManagerService.java @@ -117,7 +117,7 @@ public class DefaultSubscriptionManagerService extends TbApplicationEventListene } callback.onSuccess(); if (event.hasTsOrAttrSub()) { - sendSubEventCallback(serviceId, entityId, event.getSeqNumber()); + sendSubEventCallback(tenantId, serviceId, entityId, event.getSeqNumber()); } } else { log.warn("[{}][{}][{}] Event belongs to external partition. Probably re-balancing is in progress. Topic: {}" @@ -141,12 +141,12 @@ public class DefaultSubscriptionManagerService extends TbApplicationEventListene } } - private void sendSubEventCallback(String targetId, EntityId entityId, int seqNumber) { + private void sendSubEventCallback(TenantId tenantId, String targetId, EntityId entityId, int seqNumber) { var update = getEntityUpdatesInfo(entityId); if (serviceId.equals(targetId)) { - localSubscriptionService.onSubEventCallback(entityId, seqNumber, update, TbCallback.EMPTY); + localSubscriptionService.onSubEventCallback(tenantId, entityId, seqNumber, update, TbCallback.EMPTY); } else { - sendCoreNotification(targetId, entityId, TbSubscriptionUtils.toProto(entityId.getId(), seqNumber, update)); + sendCoreNotification(targetId, entityId, TbSubscriptionUtils.toProto(tenantId, entityId.getId(), seqNumber, update)); } } diff --git a/application/src/main/java/org/thingsboard/server/service/subscription/DefaultTbLocalSubscriptionService.java b/application/src/main/java/org/thingsboard/server/service/subscription/DefaultTbLocalSubscriptionService.java index 8fdb18cd1d..bbfdd2d44f 100644 --- a/application/src/main/java/org/thingsboard/server/service/subscription/DefaultTbLocalSubscriptionService.java +++ b/application/src/main/java/org/thingsboard/server/service/subscription/DefaultTbLocalSubscriptionService.java @@ -22,6 +22,7 @@ import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Lazy; import org.springframework.context.event.EventListener; import org.springframework.stereotype.Service; +import org.springframework.util.ConcurrentReferenceHashMap; import org.thingsboard.common.util.DeduplicationUtil; import org.thingsboard.common.util.DonAsynchron; import org.thingsboard.common.util.ThingsBoardExecutors; @@ -60,6 +61,7 @@ import org.thingsboard.server.service.ws.telemetry.sub.AlarmSubscriptionUpdate; import org.thingsboard.server.service.ws.telemetry.sub.TelemetrySubscriptionUpdate; import java.util.ArrayList; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -121,12 +123,12 @@ public class DefaultTbLocalSubscriptionService implements TbLocalSubscriptionSer private String serviceId; private ExecutorService subscriptionUpdateExecutor; - private final Lock subsLock = new ReentrantLock(); + private final ConcurrentReferenceHashMap locks = new ConcurrentReferenceHashMap<>(16, ConcurrentReferenceHashMap.ReferenceType.SOFT); @PostConstruct public void initExecutor() { subscriptionUpdateExecutor = ThingsBoardExecutors.newWorkStealingPool(20, getClass()); - tsCallBackExecutor = Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("ts-sub-callback")); + tsCallBackExecutor = Executors.newFixedThreadPool(8, ThingsBoardThreadFactory.forName("ts-sub-callback")); //since we are using locks by TenantId serviceId = serviceInfoProvider.getServiceId(); staleSessionCleanupExecutor = Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName("stale-session-cleanup")); staleSessionCleanupExecutor.scheduleWithFixedDelay(this::cleanupStaleSessions, 60, 60, TimeUnit.SECONDS); @@ -157,28 +159,29 @@ public class DefaultTbLocalSubscriptionService implements TbLocalSubscriptionSer * Even if we cache locally the list of active subscriptions by entity id, it is still time-consuming operation to get them from cache * Since number of subscriptions is usually much less than number of devices that are pushing data. */ - Set staleSubs = new HashSet<>(); + Map> staleSubs = new HashMap<>(); subscriptionsByEntityId.forEach((id, sub) -> { try { pushSubEventToManagerService(sub.getTenantId(), sub.getEntityId(), sub.toEvent(ComponentLifecycleEvent.UPDATED)); } catch (TenantNotFoundException e) { - staleSubs.add(id); + staleSubs.computeIfAbsent(sub.getTenantId(), key -> new HashSet<>()).add(id); log.warn("Cleaning up stale subscription {} for tenant {} due to TenantNotFoundException", id, sub.getTenantId()); } catch (Exception e) { log.error("Failed to push subscription {} to manager service", sub, e); } }); - if (!staleSubs.isEmpty()) { + staleSubs.forEach((tenantId, subs) -> { + var subsLock = getSubsLock(tenantId); subsLock.lock(); try { - staleSubs.forEach(entityId -> { + subs.forEach(entityId -> { subscriptionsByEntityId.remove(entityId); entityUpdates.remove(entityId); }); } finally { subsLock.unlock(); } - } + }); } } @@ -198,6 +201,10 @@ public class DefaultTbLocalSubscriptionService implements TbLocalSubscriptionSer }); } + Lock getSubsLock(TenantId tenantId) { + return locks.computeIfAbsent(tenantId, x -> new ReentrantLock()); + } + @Override public void addSubscription(TbSubscription subscription, WebSocketSessionRef sessionRef) { TenantId tenantId = subscription.getTenantId(); @@ -213,6 +220,7 @@ public class DefaultTbLocalSubscriptionService implements TbLocalSubscriptionSer log.debug("[{}][{}] Register subscription: {}", tenantId, entityId, subscription); SubscriptionModificationResult result; + final Lock subsLock = getSubsLock(tenantId); subsLock.lock(); try { Map> sessionSubscriptions = subscriptionsBySessionId.computeIfAbsent(subscription.getSessionId(), k -> new ConcurrentHashMap<>()); @@ -228,19 +236,26 @@ public class DefaultTbLocalSubscriptionService implements TbLocalSubscriptionSer @Override public void onSubEventCallback(TransportProtos.TbEntitySubEventCallbackProto subEventCallback, TbCallback callback) { + TenantId tenantId; + if (subEventCallback.getTenantIdMSB() == 0 && subEventCallback.getTenantIdLSB() == 0) { + tenantId = TenantId.SYS_TENANT_ID; //TODO: remove after release + } else { + tenantId = TenantId.fromUUID(new UUID(subEventCallback.getTenantIdMSB(), subEventCallback.getTenantIdLSB())); + } UUID entityId = new UUID(subEventCallback.getEntityIdMSB(), subEventCallback.getEntityIdLSB()); - onSubEventCallback(entityId, subEventCallback.getSeqNumber(), new TbEntityUpdatesInfo(subEventCallback.getAttributesUpdateTs(), subEventCallback.getTimeSeriesUpdateTs()), callback); + onSubEventCallback(tenantId, entityId, subEventCallback.getSeqNumber(), new TbEntityUpdatesInfo(subEventCallback.getAttributesUpdateTs(), subEventCallback.getTimeSeriesUpdateTs()), callback); } @Override - public void onSubEventCallback(EntityId entityId, int seqNumber, TbEntityUpdatesInfo entityUpdatesInfo, TbCallback callback) { - onSubEventCallback(entityId.getId(), seqNumber, entityUpdatesInfo, callback); + public void onSubEventCallback(TenantId tenantId, EntityId entityId, int seqNumber, TbEntityUpdatesInfo entityUpdatesInfo, TbCallback callback) { + onSubEventCallback(tenantId, entityId.getId(), seqNumber, entityUpdatesInfo, callback); } - public void onSubEventCallback(UUID entityId, int seqNumber, TbEntityUpdatesInfo entityUpdatesInfo, TbCallback callback) { - log.debug("[{}][{}] Processing sub event callback: {}.", entityId, seqNumber, entityUpdatesInfo); + private void onSubEventCallback(TenantId tenantId, UUID entityId, int seqNumber, TbEntityUpdatesInfo entityUpdatesInfo, TbCallback callback) { + log.debug("[{}][{}][{}] Processing sub event callback: {}.", tenantId, entityId, seqNumber, entityUpdatesInfo); entityUpdates.put(entityId, entityUpdatesInfo); Set> pendingSubs = null; + Lock subsLock = getSubsLock(tenantId); subsLock.lock(); try { TbEntityLocalSubsInfo entitySubs = subscriptionsByEntityId.get(entityId); @@ -257,24 +272,26 @@ public class DefaultTbLocalSubscriptionService implements TbLocalSubscriptionSer } @Override - public void cancelSubscription(String sessionId, int subscriptionId) { - log.debug("[{}][{}] Going to remove subscription.", sessionId, subscriptionId); + public void cancelSubscription(TenantId tenantId, String sessionId, int subscriptionId) { + log.debug("[{}][{}][{}] Going to remove subscription.", tenantId, sessionId, subscriptionId); SubscriptionModificationResult result = null; + Lock subsLock = getSubsLock(tenantId); subsLock.lock(); try { Map> sessionSubscriptions = subscriptionsBySessionId.get(sessionId); if (sessionSubscriptions != null) { TbSubscription subscription = sessionSubscriptions.remove(subscriptionId); if (subscription != null) { + if (sessionSubscriptions.isEmpty()) { subscriptionsBySessionId.remove(sessionId); } result = modifySubscription(subscription.getTenantId(), subscription.getEntityId(), subscription, false); } else { - log.debug("[{}][{}] Subscription not found!", sessionId, subscriptionId); + log.debug("[{}][{}][{}] Subscription not found!", tenantId, sessionId, subscriptionId); } } else { - log.debug("[{}] No session subscriptions found!", sessionId); + log.debug("[{}][{}] No session subscriptions found!", tenantId, sessionId); } } finally { subsLock.unlock(); @@ -285,18 +302,19 @@ public class DefaultTbLocalSubscriptionService implements TbLocalSubscriptionSer } @Override - public void cancelAllSessionSubscriptions(String sessionId) { - log.debug("[{}] Going to remove session subscriptions.", sessionId); + public void cancelAllSessionSubscriptions(TenantId tenantId, String sessionId) { + log.debug("[{}][{}] Going to remove session subscriptions.", tenantId, sessionId); List results = new ArrayList<>(); + Lock subsLock = getSubsLock(tenantId); subsLock.lock(); try { Map> sessionSubscriptions = subscriptionsBySessionId.remove(sessionId); if (sessionSubscriptions != null) { for (TbSubscription subscription : sessionSubscriptions.values()) { - results.add(modifySubscription(subscription.getTenantId(), subscription.getEntityId(), subscription, false)); + results.add(modifySubscription(tenantId, subscription.getEntityId(), subscription, false)); } } else { - log.debug("[{}] No session subscriptions found!", sessionId); + log.debug("[{}][{}] No session subscriptions found!", tenantId, sessionId); } } finally { subsLock.unlock(); @@ -604,7 +622,12 @@ public class DefaultTbLocalSubscriptionService implements TbLocalSubscriptionSer } private void cleanupStaleSessions() { - subscriptionsBySessionId.keySet().forEach(webSocketService::cleanupIfStale); + subscriptionsBySessionId.forEach((sessionId, subscriptions) -> + subscriptions.values() + .stream() + .findAny() + .ifPresent(subscription -> webSocketService.cleanupIfStale(subscription.getTenantId(), sessionId)) + ); } private void handleRateLimitError(TbSubscription subscription, WebSocketSessionRef sessionRef, String message) { diff --git a/application/src/main/java/org/thingsboard/server/service/subscription/TbAbstractDataSubCtx.java b/application/src/main/java/org/thingsboard/server/service/subscription/TbAbstractDataSubCtx.java index 77ef960d72..1ec47beeab 100644 --- a/application/src/main/java/org/thingsboard/server/service/subscription/TbAbstractDataSubCtx.java +++ b/application/src/main/java/org/thingsboard/server/service/subscription/TbAbstractDataSubCtx.java @@ -113,7 +113,7 @@ public abstract class TbAbstractDataSubCtx { protected void clearDynamicValueSubscriptions() { if (subToDynamicValueKeySet != null) { for (Integer subId : subToDynamicValueKeySet) { - localSubscriptionService.cancelSubscription(sessionRef.getSessionId(), subId); + localSubscriptionService.cancelSubscription(getTenantId(), sessionRef.getSessionId(), subId); } subToDynamicValueKeySet.clear(); } diff --git a/application/src/main/java/org/thingsboard/server/service/subscription/TbAlarmDataSubCtx.java b/application/src/main/java/org/thingsboard/server/service/subscription/TbAlarmDataSubCtx.java index 828da388fb..93c5f4323d 100644 --- a/application/src/main/java/org/thingsboard/server/service/subscription/TbAlarmDataSubCtx.java +++ b/application/src/main/java/org/thingsboard/server/service/subscription/TbAlarmDataSubCtx.java @@ -341,7 +341,7 @@ public class TbAlarmDataSubCtx extends TbAbstractDataSubCtx { long startTs = System.currentTimeMillis() - query.getPageLink().getTimeWindow(); newSubsList.forEach(entity -> createAlarmSubscriptionForEntity(query.getPageLink(), startTs, entity)); } - subIdsToCancel.forEach(subId -> localSubscriptionService.cancelSubscription(getSessionId(), subId)); + subIdsToCancel.forEach(subId -> localSubscriptionService.cancelSubscription(getTenantId(), getSessionId(), subId)); subsToAdd.forEach(subscription -> localSubscriptionService.addSubscription(subscription, sessionRef)); } diff --git a/application/src/main/java/org/thingsboard/server/service/subscription/TbEntityDataSubCtx.java b/application/src/main/java/org/thingsboard/server/service/subscription/TbEntityDataSubCtx.java index 48458ca6fd..704e4f5c80 100644 --- a/application/src/main/java/org/thingsboard/server/service/subscription/TbEntityDataSubCtx.java +++ b/application/src/main/java/org/thingsboard/server/service/subscription/TbEntityDataSubCtx.java @@ -225,7 +225,7 @@ public class TbEntityDataSubCtx extends TbAbstractDataSubCtx { } } } - subIdsToCancel.forEach(subId -> localSubscriptionService.cancelSubscription(getSessionId(), subId)); + subIdsToCancel.forEach(subId -> localSubscriptionService.cancelSubscription(getTenantId(), getSessionId(), subId)); subsToAdd.forEach(subscription -> localSubscriptionService.addSubscription(subscription, sessionRef)); sendWsMsg(new EntityDataUpdate(cmdId, data, null, maxEntitiesPerDataSubscription)); } diff --git a/application/src/main/java/org/thingsboard/server/service/subscription/TbEntityLocalSubsInfo.java b/application/src/main/java/org/thingsboard/server/service/subscription/TbEntityLocalSubsInfo.java index ade8b8236f..f5a5639ec0 100644 --- a/application/src/main/java/org/thingsboard/server/service/subscription/TbEntityLocalSubsInfo.java +++ b/application/src/main/java/org/thingsboard/server/service/subscription/TbEntityLocalSubsInfo.java @@ -27,8 +27,6 @@ import java.util.HashSet; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; /** * Information about the local websocket subscriptions. @@ -42,8 +40,6 @@ public class TbEntityLocalSubsInfo { @Getter private final EntityId entityId; @Getter - private final Lock lock = new ReentrantLock(); - @Getter private final Set> subs = ConcurrentHashMap.newKeySet(); private volatile TbSubscriptionsInfo state = new TbSubscriptionsInfo(); diff --git a/application/src/main/java/org/thingsboard/server/service/subscription/TbLocalSubscriptionService.java b/application/src/main/java/org/thingsboard/server/service/subscription/TbLocalSubscriptionService.java index 59e7ad532d..6d742e88a6 100644 --- a/application/src/main/java/org/thingsboard/server/service/subscription/TbLocalSubscriptionService.java +++ b/application/src/main/java/org/thingsboard/server/service/subscription/TbLocalSubscriptionService.java @@ -34,11 +34,11 @@ public interface TbLocalSubscriptionService { void onSubEventCallback(TransportProtos.TbEntitySubEventCallbackProto subEventCallback, TbCallback callback); - void onSubEventCallback(EntityId entityId, int seqNumber, TbEntityUpdatesInfo entityUpdatesInfo, TbCallback empty); + void onSubEventCallback(TenantId tenantId, EntityId entityId, int seqNumber, TbEntityUpdatesInfo entityUpdatesInfo, TbCallback empty); - void cancelSubscription(String sessionId, int subscriptionId); + void cancelSubscription(TenantId tenantId, String sessionId, int subscriptionId); - void cancelAllSessionSubscriptions(String sessionId); + void cancelAllSessionSubscriptions(TenantId tenantId, String sessionId); void onTimeSeriesUpdate(TransportProtos.TbSubUpdateProto tsUpdate, TbCallback callback); diff --git a/application/src/main/java/org/thingsboard/server/service/subscription/TbSubscriptionUtils.java b/application/src/main/java/org/thingsboard/server/service/subscription/TbSubscriptionUtils.java index 2d8440fa20..f5572a9a5c 100644 --- a/application/src/main/java/org/thingsboard/server/service/subscription/TbSubscriptionUtils.java +++ b/application/src/main/java/org/thingsboard/server/service/subscription/TbSubscriptionUtils.java @@ -82,10 +82,12 @@ public class TbSubscriptionUtils { return ToCoreMsg.newBuilder().setToSubscriptionMgrMsg(msgBuilder).build(); } - public static ToCoreNotificationMsg toProto(UUID id, int seqNumber, TbEntityUpdatesInfo update) { + public static ToCoreNotificationMsg toProto(TenantId tenantId, UUID id, int seqNumber, TbEntityUpdatesInfo update) { TransportProtos.TbEntitySubEventCallbackProto.Builder updateProto = TransportProtos.TbEntitySubEventCallbackProto.newBuilder() .setEntityIdMSB(id.getMostSignificantBits()) .setEntityIdLSB(id.getLeastSignificantBits()) + .setTenantIdMSB(tenantId.getId().getMostSignificantBits()) + .setTenantIdLSB(tenantId.getId().getLeastSignificantBits()) .setSeqNumber(seqNumber) .setAttributesUpdateTs(update.attributesUpdateTs) .setTimeSeriesUpdateTs(update.timeSeriesUpdateTs); diff --git a/application/src/main/java/org/thingsboard/server/service/ws/DefaultWebSocketService.java b/application/src/main/java/org/thingsboard/server/service/ws/DefaultWebSocketService.java index 33b011cc0a..8d96d14b49 100644 --- a/application/src/main/java/org/thingsboard/server/service/ws/DefaultWebSocketService.java +++ b/application/src/main/java/org/thingsboard/server/service/ws/DefaultWebSocketService.java @@ -193,17 +193,18 @@ public class DefaultWebSocketService implements WebSocketService { @Override public void handleSessionEvent(WebSocketSessionRef sessionRef, SessionEvent event) { String sessionId = sessionRef.getSessionId(); + TenantId tenantId = sessionRef.getSecurityCtx().getTenantId(); log.debug(PROCESSING_MSG, sessionId, event); switch (event.getEventType()) { case ESTABLISHED: wsSessionsMap.put(sessionId, new WsSessionMetaData(sessionRef)); break; case ERROR: - log.debug("[{}] Unknown websocket session error: ", sessionId, + log.debug("[{}][{}] Unknown websocket session error: ", tenantId, sessionId, event.getError().orElse(new RuntimeException("No error specified"))); break; case CLOSED: - cleanupSessionById(sessionId); + cleanupSessionById(tenantId, sessionId); processSessionClose(sessionRef); break; } @@ -297,10 +298,10 @@ public class DefaultWebSocketService implements WebSocketService { } @Override - public void cleanupIfStale(String sessionId) { + public void cleanupIfStale(TenantId tenantId, String sessionId) { if (!msgEndpoint.isOpen(sessionId)) { log.info("[{}] Cleaning up stale session ", sessionId); - cleanupSessionById(sessionId); + cleanupSessionById(tenantId, sessionId); } } @@ -754,22 +755,23 @@ public class DefaultWebSocketService implements WebSocketService { } private void unsubscribe(WebSocketSessionRef sessionRef, SubscriptionCmd cmd, String sessionId) { + TenantId tenantId = sessionRef.getSecurityCtx().getTenantId(); if (cmd.getEntityId() == null || cmd.getEntityId().isEmpty()) { - log.warn("[{}][{}] Cleanup session due to empty entity id.", sessionId, cmd.getCmdId()); - cleanupSessionById(sessionId); + log.warn("[{}][{}][{}] Cleanup session due to empty entity id.", tenantId, sessionId, cmd.getCmdId()); + cleanupSessionById(tenantId, sessionId); } else { Integer subId = sessionCmdMap.getOrDefault(sessionId, Collections.emptyMap()).remove(cmd.getCmdId()); if (subId == null) { - log.trace("[{}][{}] Failed to lookup subscription id mapping", sessionId, cmd.getCmdId()); + log.trace("[{}][{}][{}] Failed to lookup subscription id mapping", tenantId, sessionId, cmd.getCmdId()); subId = cmd.getCmdId(); } - oldSubService.cancelSubscription(sessionId, subId); + oldSubService.cancelSubscription(tenantId, sessionId, subId); } } - private void cleanupSessionById(String sessionId) { + private void cleanupSessionById(TenantId tenantId, String sessionId) { wsSessionsMap.remove(sessionId); - oldSubService.cancelAllSessionSubscriptions(sessionId); + oldSubService.cancelAllSessionSubscriptions(tenantId, sessionId); sessionCmdMap.remove(sessionId); entityDataSubService.cancelAllSessionSubscriptions(sessionId); } diff --git a/application/src/main/java/org/thingsboard/server/service/ws/WebSocketService.java b/application/src/main/java/org/thingsboard/server/service/ws/WebSocketService.java index 284431ff84..00fd19780d 100644 --- a/application/src/main/java/org/thingsboard/server/service/ws/WebSocketService.java +++ b/application/src/main/java/org/thingsboard/server/service/ws/WebSocketService.java @@ -16,6 +16,7 @@ package org.thingsboard.server.service.ws; import org.springframework.web.socket.CloseStatus; +import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.service.subscription.SubscriptionErrorCode; import org.thingsboard.server.service.ws.telemetry.cmd.v2.CmdUpdate; import org.thingsboard.server.service.ws.telemetry.sub.TelemetrySubscriptionUpdate; @@ -37,6 +38,6 @@ public interface WebSocketService { void close(String sessionId, CloseStatus status); - void cleanupIfStale(String sessionId); + void cleanupIfStale(TenantId tenantId, String sessionId); } diff --git a/application/src/main/java/org/thingsboard/server/service/ws/WebSocketSessionRef.java b/application/src/main/java/org/thingsboard/server/service/ws/WebSocketSessionRef.java index 44919b138c..7ce7bd5735 100644 --- a/application/src/main/java/org/thingsboard/server/service/ws/WebSocketSessionRef.java +++ b/application/src/main/java/org/thingsboard/server/service/ws/WebSocketSessionRef.java @@ -17,6 +17,7 @@ package org.thingsboard.server.service.ws; import lombok.Builder; import lombok.Data; +import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.service.security.model.SecurityUser; import java.net.InetSocketAddress; @@ -39,6 +40,10 @@ public class WebSocketSessionRef { private final WebSocketSessionType sessionType; private final AtomicInteger sessionSubIdSeq = new AtomicInteger(); + public TenantId getTenantId() { + return securityCtx != null ? securityCtx.getTenantId() : TenantId.SYS_TENANT_ID; + } + @Override public boolean equals(Object o) { if (this == o) return true; diff --git a/application/src/main/java/org/thingsboard/server/service/ws/notification/DefaultNotificationCommandsHandler.java b/application/src/main/java/org/thingsboard/server/service/ws/notification/DefaultNotificationCommandsHandler.java index 285c58a0f5..1a9ff62924 100644 --- a/application/src/main/java/org/thingsboard/server/service/ws/notification/DefaultNotificationCommandsHandler.java +++ b/application/src/main/java/org/thingsboard/server/service/ws/notification/DefaultNotificationCommandsHandler.java @@ -249,7 +249,7 @@ public class DefaultNotificationCommandsHandler implements NotificationCommandsH @Override public void handleUnsubCmd(WebSocketSessionRef sessionRef, UnsubscribeCmd cmd) { - localSubscriptionService.cancelSubscription(sessionRef.getSessionId(), cmd.getCmdId()); + localSubscriptionService.cancelSubscription(sessionRef.getTenantId(), sessionRef.getSessionId(), cmd.getCmdId()); } private void sendUpdate(String sessionId, CmdUpdate update) { diff --git a/common/proto/src/main/proto/queue.proto b/common/proto/src/main/proto/queue.proto index 74f1d0560f..67d88afa89 100644 --- a/common/proto/src/main/proto/queue.proto +++ b/common/proto/src/main/proto/queue.proto @@ -834,6 +834,8 @@ message TbEntitySubEventCallbackProto { int32 seqNumber = 3; int64 attributesUpdateTs = 4; int64 timeSeriesUpdateTs = 5; + int64 tenantIdMSB = 6; + int64 tenantIdLSB = 7; } message TsValueProto {