Merge pull request #11564 from YevhenBondarenko/merge-with-hotfix-3.7

merge with hotfix/3.7
This commit is contained in:
Viacheslav Klimov 2024-09-03 15:00:48 +03:00 committed by GitHub
commit 2b163e86c8
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
14 changed files with 80 additions and 49 deletions

View File

@ -117,7 +117,7 @@ public class DefaultSubscriptionManagerService extends TbApplicationEventListene
} }
callback.onSuccess(); callback.onSuccess();
if (event.hasTsOrAttrSub()) { if (event.hasTsOrAttrSub()) {
sendSubEventCallback(serviceId, entityId, event.getSeqNumber()); sendSubEventCallback(tenantId, serviceId, entityId, event.getSeqNumber());
} }
} else { } else {
log.warn("[{}][{}][{}] Event belongs to external partition. Probably re-balancing is in progress. Topic: {}" log.warn("[{}][{}][{}] Event belongs to external partition. Probably re-balancing is in progress. Topic: {}"
@ -141,12 +141,12 @@ public class DefaultSubscriptionManagerService extends TbApplicationEventListene
} }
} }
private void sendSubEventCallback(String targetId, EntityId entityId, int seqNumber) { private void sendSubEventCallback(TenantId tenantId, String targetId, EntityId entityId, int seqNumber) {
var update = getEntityUpdatesInfo(entityId); var update = getEntityUpdatesInfo(entityId);
if (serviceId.equals(targetId)) { if (serviceId.equals(targetId)) {
localSubscriptionService.onSubEventCallback(entityId, seqNumber, update, TbCallback.EMPTY); localSubscriptionService.onSubEventCallback(tenantId, entityId, seqNumber, update, TbCallback.EMPTY);
} else { } else {
sendCoreNotification(targetId, entityId, TbSubscriptionUtils.toProto(entityId.getId(), seqNumber, update)); sendCoreNotification(targetId, entityId, TbSubscriptionUtils.toProto(tenantId, entityId.getId(), seqNumber, update));
} }
} }

View File

@ -22,6 +22,7 @@ import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Lazy; import org.springframework.context.annotation.Lazy;
import org.springframework.context.event.EventListener; import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.springframework.util.ConcurrentReferenceHashMap;
import org.thingsboard.common.util.DeduplicationUtil; import org.thingsboard.common.util.DeduplicationUtil;
import org.thingsboard.common.util.DonAsynchron; import org.thingsboard.common.util.DonAsynchron;
import org.thingsboard.common.util.ThingsBoardExecutors; import org.thingsboard.common.util.ThingsBoardExecutors;
@ -60,6 +61,7 @@ import org.thingsboard.server.service.ws.telemetry.sub.AlarmSubscriptionUpdate;
import org.thingsboard.server.service.ws.telemetry.sub.TelemetrySubscriptionUpdate; import org.thingsboard.server.service.ws.telemetry.sub.TelemetrySubscriptionUpdate;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -121,12 +123,12 @@ public class DefaultTbLocalSubscriptionService implements TbLocalSubscriptionSer
private String serviceId; private String serviceId;
private ExecutorService subscriptionUpdateExecutor; private ExecutorService subscriptionUpdateExecutor;
private final Lock subsLock = new ReentrantLock(); private final ConcurrentReferenceHashMap<TenantId, Lock> locks = new ConcurrentReferenceHashMap<>(16, ConcurrentReferenceHashMap.ReferenceType.SOFT);
@PostConstruct @PostConstruct
public void initExecutor() { public void initExecutor() {
subscriptionUpdateExecutor = ThingsBoardExecutors.newWorkStealingPool(20, getClass()); subscriptionUpdateExecutor = ThingsBoardExecutors.newWorkStealingPool(20, getClass());
tsCallBackExecutor = Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("ts-sub-callback")); tsCallBackExecutor = Executors.newFixedThreadPool(8, ThingsBoardThreadFactory.forName("ts-sub-callback")); //since we are using locks by TenantId
serviceId = serviceInfoProvider.getServiceId(); serviceId = serviceInfoProvider.getServiceId();
staleSessionCleanupExecutor = Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName("stale-session-cleanup")); staleSessionCleanupExecutor = Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName("stale-session-cleanup"));
staleSessionCleanupExecutor.scheduleWithFixedDelay(this::cleanupStaleSessions, 60, 60, TimeUnit.SECONDS); staleSessionCleanupExecutor.scheduleWithFixedDelay(this::cleanupStaleSessions, 60, 60, TimeUnit.SECONDS);
@ -157,28 +159,29 @@ public class DefaultTbLocalSubscriptionService implements TbLocalSubscriptionSer
* Even if we cache locally the list of active subscriptions by entity id, it is still time-consuming operation to get them from cache * Even if we cache locally the list of active subscriptions by entity id, it is still time-consuming operation to get them from cache
* Since number of subscriptions is usually much less than number of devices that are pushing data. * Since number of subscriptions is usually much less than number of devices that are pushing data.
*/ */
Set<UUID> staleSubs = new HashSet<>(); Map<TenantId, Set<UUID>> staleSubs = new HashMap<>();
subscriptionsByEntityId.forEach((id, sub) -> { subscriptionsByEntityId.forEach((id, sub) -> {
try { try {
pushSubEventToManagerService(sub.getTenantId(), sub.getEntityId(), sub.toEvent(ComponentLifecycleEvent.UPDATED)); pushSubEventToManagerService(sub.getTenantId(), sub.getEntityId(), sub.toEvent(ComponentLifecycleEvent.UPDATED));
} catch (TenantNotFoundException e) { } catch (TenantNotFoundException e) {
staleSubs.add(id); staleSubs.computeIfAbsent(sub.getTenantId(), key -> new HashSet<>()).add(id);
log.warn("Cleaning up stale subscription {} for tenant {} due to TenantNotFoundException", id, sub.getTenantId()); log.warn("Cleaning up stale subscription {} for tenant {} due to TenantNotFoundException", id, sub.getTenantId());
} catch (Exception e) { } catch (Exception e) {
log.error("Failed to push subscription {} to manager service", sub, e); log.error("Failed to push subscription {} to manager service", sub, e);
} }
}); });
if (!staleSubs.isEmpty()) { staleSubs.forEach((tenantId, subs) -> {
var subsLock = getSubsLock(tenantId);
subsLock.lock(); subsLock.lock();
try { try {
staleSubs.forEach(entityId -> { subs.forEach(entityId -> {
subscriptionsByEntityId.remove(entityId); subscriptionsByEntityId.remove(entityId);
entityUpdates.remove(entityId); entityUpdates.remove(entityId);
}); });
} finally { } finally {
subsLock.unlock(); subsLock.unlock();
} }
} });
} }
} }
@ -198,6 +201,10 @@ public class DefaultTbLocalSubscriptionService implements TbLocalSubscriptionSer
}); });
} }
Lock getSubsLock(TenantId tenantId) {
return locks.computeIfAbsent(tenantId, x -> new ReentrantLock());
}
@Override @Override
public void addSubscription(TbSubscription<?> subscription, WebSocketSessionRef sessionRef) { public void addSubscription(TbSubscription<?> subscription, WebSocketSessionRef sessionRef) {
TenantId tenantId = subscription.getTenantId(); TenantId tenantId = subscription.getTenantId();
@ -213,6 +220,7 @@ public class DefaultTbLocalSubscriptionService implements TbLocalSubscriptionSer
log.debug("[{}][{}] Register subscription: {}", tenantId, entityId, subscription); log.debug("[{}][{}] Register subscription: {}", tenantId, entityId, subscription);
SubscriptionModificationResult result; SubscriptionModificationResult result;
final Lock subsLock = getSubsLock(tenantId);
subsLock.lock(); subsLock.lock();
try { try {
Map<Integer, TbSubscription<?>> sessionSubscriptions = subscriptionsBySessionId.computeIfAbsent(subscription.getSessionId(), k -> new ConcurrentHashMap<>()); Map<Integer, TbSubscription<?>> sessionSubscriptions = subscriptionsBySessionId.computeIfAbsent(subscription.getSessionId(), k -> new ConcurrentHashMap<>());
@ -228,19 +236,26 @@ public class DefaultTbLocalSubscriptionService implements TbLocalSubscriptionSer
@Override @Override
public void onSubEventCallback(TransportProtos.TbEntitySubEventCallbackProto subEventCallback, TbCallback callback) { public void onSubEventCallback(TransportProtos.TbEntitySubEventCallbackProto subEventCallback, TbCallback callback) {
TenantId tenantId;
if (subEventCallback.getTenantIdMSB() == 0 && subEventCallback.getTenantIdLSB() == 0) {
tenantId = TenantId.SYS_TENANT_ID; //TODO: remove after release
} else {
tenantId = TenantId.fromUUID(new UUID(subEventCallback.getTenantIdMSB(), subEventCallback.getTenantIdLSB()));
}
UUID entityId = new UUID(subEventCallback.getEntityIdMSB(), subEventCallback.getEntityIdLSB()); UUID entityId = new UUID(subEventCallback.getEntityIdMSB(), subEventCallback.getEntityIdLSB());
onSubEventCallback(entityId, subEventCallback.getSeqNumber(), new TbEntityUpdatesInfo(subEventCallback.getAttributesUpdateTs(), subEventCallback.getTimeSeriesUpdateTs()), callback); onSubEventCallback(tenantId, entityId, subEventCallback.getSeqNumber(), new TbEntityUpdatesInfo(subEventCallback.getAttributesUpdateTs(), subEventCallback.getTimeSeriesUpdateTs()), callback);
} }
@Override @Override
public void onSubEventCallback(EntityId entityId, int seqNumber, TbEntityUpdatesInfo entityUpdatesInfo, TbCallback callback) { public void onSubEventCallback(TenantId tenantId, EntityId entityId, int seqNumber, TbEntityUpdatesInfo entityUpdatesInfo, TbCallback callback) {
onSubEventCallback(entityId.getId(), seqNumber, entityUpdatesInfo, callback); onSubEventCallback(tenantId, entityId.getId(), seqNumber, entityUpdatesInfo, callback);
} }
public void onSubEventCallback(UUID entityId, int seqNumber, TbEntityUpdatesInfo entityUpdatesInfo, TbCallback callback) { private void onSubEventCallback(TenantId tenantId, UUID entityId, int seqNumber, TbEntityUpdatesInfo entityUpdatesInfo, TbCallback callback) {
log.debug("[{}][{}] Processing sub event callback: {}.", entityId, seqNumber, entityUpdatesInfo); log.debug("[{}][{}][{}] Processing sub event callback: {}.", tenantId, entityId, seqNumber, entityUpdatesInfo);
entityUpdates.put(entityId, entityUpdatesInfo); entityUpdates.put(entityId, entityUpdatesInfo);
Set<TbSubscription<?>> pendingSubs = null; Set<TbSubscription<?>> pendingSubs = null;
Lock subsLock = getSubsLock(tenantId);
subsLock.lock(); subsLock.lock();
try { try {
TbEntityLocalSubsInfo entitySubs = subscriptionsByEntityId.get(entityId); TbEntityLocalSubsInfo entitySubs = subscriptionsByEntityId.get(entityId);
@ -257,24 +272,26 @@ public class DefaultTbLocalSubscriptionService implements TbLocalSubscriptionSer
} }
@Override @Override
public void cancelSubscription(String sessionId, int subscriptionId) { public void cancelSubscription(TenantId tenantId, String sessionId, int subscriptionId) {
log.debug("[{}][{}] Going to remove subscription.", sessionId, subscriptionId); log.debug("[{}][{}][{}] Going to remove subscription.", tenantId, sessionId, subscriptionId);
SubscriptionModificationResult result = null; SubscriptionModificationResult result = null;
Lock subsLock = getSubsLock(tenantId);
subsLock.lock(); subsLock.lock();
try { try {
Map<Integer, TbSubscription<?>> sessionSubscriptions = subscriptionsBySessionId.get(sessionId); Map<Integer, TbSubscription<?>> sessionSubscriptions = subscriptionsBySessionId.get(sessionId);
if (sessionSubscriptions != null) { if (sessionSubscriptions != null) {
TbSubscription<?> subscription = sessionSubscriptions.remove(subscriptionId); TbSubscription<?> subscription = sessionSubscriptions.remove(subscriptionId);
if (subscription != null) { if (subscription != null) {
if (sessionSubscriptions.isEmpty()) { if (sessionSubscriptions.isEmpty()) {
subscriptionsBySessionId.remove(sessionId); subscriptionsBySessionId.remove(sessionId);
} }
result = modifySubscription(subscription.getTenantId(), subscription.getEntityId(), subscription, false); result = modifySubscription(subscription.getTenantId(), subscription.getEntityId(), subscription, false);
} else { } else {
log.debug("[{}][{}] Subscription not found!", sessionId, subscriptionId); log.debug("[{}][{}][{}] Subscription not found!", tenantId, sessionId, subscriptionId);
} }
} else { } else {
log.debug("[{}] No session subscriptions found!", sessionId); log.debug("[{}][{}] No session subscriptions found!", tenantId, sessionId);
} }
} finally { } finally {
subsLock.unlock(); subsLock.unlock();
@ -285,18 +302,19 @@ public class DefaultTbLocalSubscriptionService implements TbLocalSubscriptionSer
} }
@Override @Override
public void cancelAllSessionSubscriptions(String sessionId) { public void cancelAllSessionSubscriptions(TenantId tenantId, String sessionId) {
log.debug("[{}] Going to remove session subscriptions.", sessionId); log.debug("[{}][{}] Going to remove session subscriptions.", tenantId, sessionId);
List<SubscriptionModificationResult> results = new ArrayList<>(); List<SubscriptionModificationResult> results = new ArrayList<>();
Lock subsLock = getSubsLock(tenantId);
subsLock.lock(); subsLock.lock();
try { try {
Map<Integer, TbSubscription<?>> sessionSubscriptions = subscriptionsBySessionId.remove(sessionId); Map<Integer, TbSubscription<?>> sessionSubscriptions = subscriptionsBySessionId.remove(sessionId);
if (sessionSubscriptions != null) { if (sessionSubscriptions != null) {
for (TbSubscription<?> subscription : sessionSubscriptions.values()) { for (TbSubscription<?> subscription : sessionSubscriptions.values()) {
results.add(modifySubscription(subscription.getTenantId(), subscription.getEntityId(), subscription, false)); results.add(modifySubscription(tenantId, subscription.getEntityId(), subscription, false));
} }
} else { } else {
log.debug("[{}] No session subscriptions found!", sessionId); log.debug("[{}][{}] No session subscriptions found!", tenantId, sessionId);
} }
} finally { } finally {
subsLock.unlock(); subsLock.unlock();
@ -604,7 +622,12 @@ public class DefaultTbLocalSubscriptionService implements TbLocalSubscriptionSer
} }
private void cleanupStaleSessions() { private void cleanupStaleSessions() {
subscriptionsBySessionId.keySet().forEach(webSocketService::cleanupIfStale); subscriptionsBySessionId.forEach((sessionId, subscriptions) ->
subscriptions.values()
.stream()
.findAny()
.ifPresent(subscription -> webSocketService.cleanupIfStale(subscription.getTenantId(), sessionId))
);
} }
private void handleRateLimitError(TbSubscription<?> subscription, WebSocketSessionRef sessionRef, String message) { private void handleRateLimitError(TbSubscription<?> subscription, WebSocketSessionRef sessionRef, String message) {

View File

@ -113,7 +113,7 @@ public abstract class TbAbstractDataSubCtx<T extends AbstractDataQuery<? extends
public void clearEntitySubscriptions() { public void clearEntitySubscriptions() {
if (subToEntityIdMap != null) { if (subToEntityIdMap != null) {
for (Integer subId : subToEntityIdMap.keySet()) { for (Integer subId : subToEntityIdMap.keySet()) {
localSubscriptionService.cancelSubscription(sessionRef.getSessionId(), subId); localSubscriptionService.cancelSubscription(getTenantId(), getSessionId(), subId);
} }
subToEntityIdMap.clear(); subToEntityIdMap.clear();
} }

View File

@ -303,7 +303,7 @@ public abstract class TbAbstractSubCtx<T extends EntityCountQuery> {
protected void clearDynamicValueSubscriptions() { protected void clearDynamicValueSubscriptions() {
if (subToDynamicValueKeySet != null) { if (subToDynamicValueKeySet != null) {
for (Integer subId : subToDynamicValueKeySet) { for (Integer subId : subToDynamicValueKeySet) {
localSubscriptionService.cancelSubscription(sessionRef.getSessionId(), subId); localSubscriptionService.cancelSubscription(getTenantId(), sessionRef.getSessionId(), subId);
} }
subToDynamicValueKeySet.clear(); subToDynamicValueKeySet.clear();
} }

View File

@ -341,7 +341,7 @@ public class TbAlarmDataSubCtx extends TbAbstractDataSubCtx<AlarmDataQuery> {
long startTs = System.currentTimeMillis() - query.getPageLink().getTimeWindow(); long startTs = System.currentTimeMillis() - query.getPageLink().getTimeWindow();
newSubsList.forEach(entity -> createAlarmSubscriptionForEntity(query.getPageLink(), startTs, entity)); newSubsList.forEach(entity -> createAlarmSubscriptionForEntity(query.getPageLink(), startTs, entity));
} }
subIdsToCancel.forEach(subId -> localSubscriptionService.cancelSubscription(getSessionId(), subId)); subIdsToCancel.forEach(subId -> localSubscriptionService.cancelSubscription(getTenantId(), getSessionId(), subId));
subsToAdd.forEach(subscription -> localSubscriptionService.addSubscription(subscription, sessionRef)); subsToAdd.forEach(subscription -> localSubscriptionService.addSubscription(subscription, sessionRef));
} }

View File

@ -225,7 +225,7 @@ public class TbEntityDataSubCtx extends TbAbstractDataSubCtx<EntityDataQuery> {
} }
} }
} }
subIdsToCancel.forEach(subId -> localSubscriptionService.cancelSubscription(getSessionId(), subId)); subIdsToCancel.forEach(subId -> localSubscriptionService.cancelSubscription(getTenantId(), getSessionId(), subId));
subsToAdd.forEach(subscription -> localSubscriptionService.addSubscription(subscription, sessionRef)); subsToAdd.forEach(subscription -> localSubscriptionService.addSubscription(subscription, sessionRef));
sendWsMsg(new EntityDataUpdate(cmdId, data, null, maxEntitiesPerDataSubscription)); sendWsMsg(new EntityDataUpdate(cmdId, data, null, maxEntitiesPerDataSubscription));
} }

View File

@ -27,8 +27,6 @@ import java.util.HashSet;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/** /**
* Information about the local websocket subscriptions. * Information about the local websocket subscriptions.
@ -42,8 +40,6 @@ public class TbEntityLocalSubsInfo {
@Getter @Getter
private final EntityId entityId; private final EntityId entityId;
@Getter @Getter
private final Lock lock = new ReentrantLock();
@Getter
private final Set<TbSubscription<?>> subs = ConcurrentHashMap.newKeySet(); private final Set<TbSubscription<?>> subs = ConcurrentHashMap.newKeySet();
private volatile TbSubscriptionsInfo state = new TbSubscriptionsInfo(); private volatile TbSubscriptionsInfo state = new TbSubscriptionsInfo();

View File

@ -34,11 +34,11 @@ public interface TbLocalSubscriptionService {
void onSubEventCallback(TransportProtos.TbEntitySubEventCallbackProto subEventCallback, TbCallback callback); void onSubEventCallback(TransportProtos.TbEntitySubEventCallbackProto subEventCallback, TbCallback callback);
void onSubEventCallback(EntityId entityId, int seqNumber, TbEntityUpdatesInfo entityUpdatesInfo, TbCallback empty); void onSubEventCallback(TenantId tenantId, EntityId entityId, int seqNumber, TbEntityUpdatesInfo entityUpdatesInfo, TbCallback empty);
void cancelSubscription(String sessionId, int subscriptionId); void cancelSubscription(TenantId tenantId, String sessionId, int subscriptionId);
void cancelAllSessionSubscriptions(String sessionId); void cancelAllSessionSubscriptions(TenantId tenantId, String sessionId);
void onTimeSeriesUpdate(TransportProtos.TbSubUpdateProto tsUpdate, TbCallback callback); void onTimeSeriesUpdate(TransportProtos.TbSubUpdateProto tsUpdate, TbCallback callback);

View File

@ -82,10 +82,12 @@ public class TbSubscriptionUtils {
return ToCoreMsg.newBuilder().setToSubscriptionMgrMsg(msgBuilder).build(); return ToCoreMsg.newBuilder().setToSubscriptionMgrMsg(msgBuilder).build();
} }
public static ToCoreNotificationMsg toProto(UUID id, int seqNumber, TbEntityUpdatesInfo update) { public static ToCoreNotificationMsg toProto(TenantId tenantId, UUID id, int seqNumber, TbEntityUpdatesInfo update) {
TransportProtos.TbEntitySubEventCallbackProto.Builder updateProto = TransportProtos.TbEntitySubEventCallbackProto.newBuilder() TransportProtos.TbEntitySubEventCallbackProto.Builder updateProto = TransportProtos.TbEntitySubEventCallbackProto.newBuilder()
.setEntityIdMSB(id.getMostSignificantBits()) .setEntityIdMSB(id.getMostSignificantBits())
.setEntityIdLSB(id.getLeastSignificantBits()) .setEntityIdLSB(id.getLeastSignificantBits())
.setTenantIdMSB(tenantId.getId().getMostSignificantBits())
.setTenantIdLSB(tenantId.getId().getLeastSignificantBits())
.setSeqNumber(seqNumber) .setSeqNumber(seqNumber)
.setAttributesUpdateTs(update.attributesUpdateTs) .setAttributesUpdateTs(update.attributesUpdateTs)
.setTimeSeriesUpdateTs(update.timeSeriesUpdateTs); .setTimeSeriesUpdateTs(update.timeSeriesUpdateTs);

View File

@ -193,17 +193,18 @@ public class DefaultWebSocketService implements WebSocketService {
@Override @Override
public void handleSessionEvent(WebSocketSessionRef sessionRef, SessionEvent event) { public void handleSessionEvent(WebSocketSessionRef sessionRef, SessionEvent event) {
String sessionId = sessionRef.getSessionId(); String sessionId = sessionRef.getSessionId();
TenantId tenantId = sessionRef.getSecurityCtx().getTenantId();
log.debug(PROCESSING_MSG, sessionId, event); log.debug(PROCESSING_MSG, sessionId, event);
switch (event.getEventType()) { switch (event.getEventType()) {
case ESTABLISHED: case ESTABLISHED:
wsSessionsMap.put(sessionId, new WsSessionMetaData(sessionRef)); wsSessionsMap.put(sessionId, new WsSessionMetaData(sessionRef));
break; break;
case ERROR: case ERROR:
log.debug("[{}] Unknown websocket session error: ", sessionId, log.debug("[{}][{}] Unknown websocket session error: ", tenantId, sessionId,
event.getError().orElse(new RuntimeException("No error specified"))); event.getError().orElse(new RuntimeException("No error specified")));
break; break;
case CLOSED: case CLOSED:
cleanupSessionById(sessionId); cleanupSessionById(tenantId, sessionId);
processSessionClose(sessionRef); processSessionClose(sessionRef);
break; break;
} }
@ -297,10 +298,10 @@ public class DefaultWebSocketService implements WebSocketService {
} }
@Override @Override
public void cleanupIfStale(String sessionId) { public void cleanupIfStale(TenantId tenantId, String sessionId) {
if (!msgEndpoint.isOpen(sessionId)) { if (!msgEndpoint.isOpen(sessionId)) {
log.info("[{}] Cleaning up stale session ", sessionId); log.info("[{}] Cleaning up stale session ", sessionId);
cleanupSessionById(sessionId); cleanupSessionById(tenantId, sessionId);
} }
} }
@ -754,22 +755,23 @@ public class DefaultWebSocketService implements WebSocketService {
} }
private void unsubscribe(WebSocketSessionRef sessionRef, SubscriptionCmd cmd, String sessionId) { private void unsubscribe(WebSocketSessionRef sessionRef, SubscriptionCmd cmd, String sessionId) {
TenantId tenantId = sessionRef.getSecurityCtx().getTenantId();
if (cmd.getEntityId() == null || cmd.getEntityId().isEmpty()) { if (cmd.getEntityId() == null || cmd.getEntityId().isEmpty()) {
log.warn("[{}][{}] Cleanup session due to empty entity id.", sessionId, cmd.getCmdId()); log.warn("[{}][{}][{}] Cleanup session due to empty entity id.", tenantId, sessionId, cmd.getCmdId());
cleanupSessionById(sessionId); cleanupSessionById(tenantId, sessionId);
} else { } else {
Integer subId = sessionCmdMap.getOrDefault(sessionId, Collections.emptyMap()).remove(cmd.getCmdId()); Integer subId = sessionCmdMap.getOrDefault(sessionId, Collections.emptyMap()).remove(cmd.getCmdId());
if (subId == null) { if (subId == null) {
log.trace("[{}][{}] Failed to lookup subscription id mapping", sessionId, cmd.getCmdId()); log.trace("[{}][{}][{}] Failed to lookup subscription id mapping", tenantId, sessionId, cmd.getCmdId());
subId = cmd.getCmdId(); subId = cmd.getCmdId();
} }
oldSubService.cancelSubscription(sessionId, subId); oldSubService.cancelSubscription(tenantId, sessionId, subId);
} }
} }
private void cleanupSessionById(String sessionId) { private void cleanupSessionById(TenantId tenantId, String sessionId) {
wsSessionsMap.remove(sessionId); wsSessionsMap.remove(sessionId);
oldSubService.cancelAllSessionSubscriptions(sessionId); oldSubService.cancelAllSessionSubscriptions(tenantId, sessionId);
sessionCmdMap.remove(sessionId); sessionCmdMap.remove(sessionId);
entityDataSubService.cancelAllSessionSubscriptions(sessionId); entityDataSubService.cancelAllSessionSubscriptions(sessionId);
} }

View File

@ -16,6 +16,7 @@
package org.thingsboard.server.service.ws; package org.thingsboard.server.service.ws;
import org.springframework.web.socket.CloseStatus; import org.springframework.web.socket.CloseStatus;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.service.subscription.SubscriptionErrorCode; import org.thingsboard.server.service.subscription.SubscriptionErrorCode;
import org.thingsboard.server.service.ws.telemetry.cmd.v2.CmdUpdate; import org.thingsboard.server.service.ws.telemetry.cmd.v2.CmdUpdate;
import org.thingsboard.server.service.ws.telemetry.sub.TelemetrySubscriptionUpdate; import org.thingsboard.server.service.ws.telemetry.sub.TelemetrySubscriptionUpdate;
@ -37,6 +38,6 @@ public interface WebSocketService {
void close(String sessionId, CloseStatus status); void close(String sessionId, CloseStatus status);
void cleanupIfStale(String sessionId); void cleanupIfStale(TenantId tenantId, String sessionId);
} }

View File

@ -17,6 +17,7 @@ package org.thingsboard.server.service.ws;
import lombok.Builder; import lombok.Builder;
import lombok.Data; import lombok.Data;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.service.security.model.SecurityUser; import org.thingsboard.server.service.security.model.SecurityUser;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
@ -39,6 +40,10 @@ public class WebSocketSessionRef {
private final WebSocketSessionType sessionType; private final WebSocketSessionType sessionType;
private final AtomicInteger sessionSubIdSeq = new AtomicInteger(); private final AtomicInteger sessionSubIdSeq = new AtomicInteger();
public TenantId getTenantId() {
return securityCtx != null ? securityCtx.getTenantId() : TenantId.SYS_TENANT_ID;
}
@Override @Override
public boolean equals(Object o) { public boolean equals(Object o) {
if (this == o) return true; if (this == o) return true;

View File

@ -249,7 +249,7 @@ public class DefaultNotificationCommandsHandler implements NotificationCommandsH
@Override @Override
public void handleUnsubCmd(WebSocketSessionRef sessionRef, UnsubscribeCmd cmd) { public void handleUnsubCmd(WebSocketSessionRef sessionRef, UnsubscribeCmd cmd) {
localSubscriptionService.cancelSubscription(sessionRef.getSessionId(), cmd.getCmdId()); localSubscriptionService.cancelSubscription(sessionRef.getTenantId(), sessionRef.getSessionId(), cmd.getCmdId());
} }
private void sendUpdate(String sessionId, CmdUpdate update) { private void sendUpdate(String sessionId, CmdUpdate update) {

View File

@ -834,6 +834,8 @@ message TbEntitySubEventCallbackProto {
int32 seqNumber = 3; int32 seqNumber = 3;
int64 attributesUpdateTs = 4; int64 attributesUpdateTs = 4;
int64 timeSeriesUpdateTs = 5; int64 timeSeriesUpdateTs = 5;
int64 tenantIdMSB = 6;
int64 tenantIdLSB = 7;
} }
message TsValueProto { message TsValueProto {