diff --git a/application/src/main/java/org/thingsboard/server/controller/plugin/TbWebSocketHandler.java b/application/src/main/java/org/thingsboard/server/controller/plugin/TbWebSocketHandler.java index 71350bb1fd..69728b20c3 100644 --- a/application/src/main/java/org/thingsboard/server/controller/plugin/TbWebSocketHandler.java +++ b/application/src/main/java/org/thingsboard/server/controller/plugin/TbWebSocketHandler.java @@ -539,6 +539,18 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements WebSocke } } + @Override + public boolean isOpen(String externalId) { + String internalId = externalSessionMap.get(externalId); + if (internalId != null) { + SessionMetaData sessionMd = getSessionMd(internalId); + if (sessionMd != null) { + return sessionMd.session.isOpen(); + } + } + return false; + } + private boolean checkLimits(WebSocketSession session, WebSocketSessionRef sessionRef) throws IOException { var tenantProfileConfiguration = getTenantProfileConfiguration(sessionRef); if (tenantProfileConfiguration == null) { diff --git a/application/src/main/java/org/thingsboard/server/service/subscription/DefaultTbLocalSubscriptionService.java b/application/src/main/java/org/thingsboard/server/service/subscription/DefaultTbLocalSubscriptionService.java index a280ae6754..d1addf8b0d 100644 --- a/application/src/main/java/org/thingsboard/server/service/subscription/DefaultTbLocalSubscriptionService.java +++ b/application/src/main/java/org/thingsboard/server/service/subscription/DefaultTbLocalSubscriptionService.java @@ -45,6 +45,7 @@ import org.thingsboard.server.queue.discovery.PartitionService; import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; import org.thingsboard.server.queue.discovery.event.ClusterTopologyChangeEvent; import org.thingsboard.server.queue.util.TbCoreComponent; +import org.thingsboard.server.service.ws.WebSocketService; import org.thingsboard.server.service.ws.notification.sub.NotificationRequestUpdate; import org.thingsboard.server.service.ws.notification.sub.NotificationsSubscriptionUpdate; import org.thingsboard.server.service.ws.telemetry.sub.AlarmSubscriptionUpdate; @@ -62,6 +63,8 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -84,18 +87,21 @@ public class DefaultTbLocalSubscriptionService implements TbLocalSubscriptionSer private final PartitionService partitionService; private final TbClusterService clusterService; private final SubscriptionManagerService subscriptionManagerService; + private final WebSocketService webSocketService; private ExecutorService tsCallBackExecutor; + private ScheduledExecutorService staleSessionCleanupExecutor; public DefaultTbLocalSubscriptionService(AttributesService attrService, TimeseriesService tsService, TbServiceInfoProvider serviceInfoProvider, PartitionService partitionService, TbClusterService clusterService, - @Lazy SubscriptionManagerService subscriptionManagerService) { + @Lazy SubscriptionManagerService subscriptionManagerService, @Lazy WebSocketService webSocketService) { this.attrService = attrService; this.tsService = tsService; this.serviceInfoProvider = serviceInfoProvider; this.partitionService = partitionService; this.clusterService = clusterService; this.subscriptionManagerService = subscriptionManagerService; + this.webSocketService = webSocketService; } private String serviceId; @@ -108,6 +114,8 @@ public class DefaultTbLocalSubscriptionService implements TbLocalSubscriptionSer subscriptionUpdateExecutor = ThingsBoardExecutors.newWorkStealingPool(20, getClass()); tsCallBackExecutor = Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("ts-sub-callback")); serviceId = serviceInfoProvider.getServiceId(); + staleSessionCleanupExecutor = Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName("stale-session-cleanup")); + staleSessionCleanupExecutor.scheduleWithFixedDelay(this::cleanupStaleSessions, 60, 60, TimeUnit.SECONDS); } @PreDestroy @@ -118,6 +126,9 @@ public class DefaultTbLocalSubscriptionService implements TbLocalSubscriptionSer if (tsCallBackExecutor != null) { tsCallBackExecutor.shutdownNow(); } + if (staleSessionCleanupExecutor != null) { + staleSessionCleanupExecutor.shutdownNow(); + } } @Override @@ -157,9 +168,18 @@ public class DefaultTbLocalSubscriptionService implements TbLocalSubscriptionSer TenantId tenantId = subscription.getTenantId(); EntityId entityId = subscription.getEntityId(); log.debug("[{}][{}] Register subscription: {}", tenantId, entityId, subscription); - Map> sessionSubscriptions = subscriptionsBySessionId.computeIfAbsent(subscription.getSessionId(), k -> new ConcurrentHashMap<>()); - sessionSubscriptions.put(subscription.getSubscriptionId(), subscription); - modifySubscription(tenantId, entityId, subscription, true); + ModifySubscriptionResult modifySubscriptionResult; + subsLock.lock(); + try { + Map> sessionSubscriptions = subscriptionsBySessionId.computeIfAbsent(subscription.getSessionId(), k -> new ConcurrentHashMap<>()); + sessionSubscriptions.put(subscription.getSubscriptionId(), subscription); + modifySubscriptionResult = modifySubscription(tenantId, entityId, subscription, true); + } finally { + subsLock.unlock(); + } + if (modifySubscriptionResult.hasEvent()) { + pushSubscriptionEvent(modifySubscriptionResult); + } } @Override @@ -195,37 +215,49 @@ public class DefaultTbLocalSubscriptionService implements TbLocalSubscriptionSer @Override public void cancelSubscription(String sessionId, int subscriptionId) { log.debug("[{}][{}] Going to remove subscription.", sessionId, subscriptionId); - Map> sessionSubscriptions = subscriptionsBySessionId.get(sessionId); - if (sessionSubscriptions != null) { - TbSubscription subscription = sessionSubscriptions.remove(subscriptionId); - if (subscription != null) { - if (sessionSubscriptions.isEmpty()) { - subscriptionsBySessionId.remove(sessionId); + ModifySubscriptionResult modifySubscriptionResult = null; + subsLock.lock(); + try { + Map> sessionSubscriptions = subscriptionsBySessionId.get(sessionId); + if (sessionSubscriptions != null) { + TbSubscription subscription = sessionSubscriptions.remove(subscriptionId); + if (subscription != null) { + if (sessionSubscriptions.isEmpty()) { + subscriptionsBySessionId.remove(sessionId); + } + modifySubscriptionResult = modifySubscription(subscription.getTenantId(), subscription.getEntityId(), subscription, false); + } else { + log.debug("[{}][{}] Subscription not found!", sessionId, subscriptionId); } - modifySubscription(subscription.getTenantId(), subscription.getEntityId(), subscription, false); } else { - log.debug("[{}][{}] Subscription not found!", sessionId, subscriptionId); + log.debug("[{}] No session subscriptions found!", sessionId); } - } else { - log.debug("[{}] No session subscriptions found!", sessionId); + } finally { + subsLock.unlock(); + } + if (modifySubscriptionResult != null && modifySubscriptionResult.hasEvent()) { + pushSubscriptionEvent(modifySubscriptionResult); } } @Override public void cancelAllSessionSubscriptions(String sessionId) { log.debug("[{}] Going to remove session subscriptions.", sessionId); - Map> sessionSubscriptions = subscriptionsBySessionId.remove(sessionId); - if (sessionSubscriptions != null) { - for (TbSubscription subscription : sessionSubscriptions.values()) { - try { - modifySubscription(subscription.getTenantId(), subscription.getEntityId(), subscription, false); - } catch (Exception e) { - log.warn("[{}][{}] Failed to remove subscription {} due to ", subscription.getTenantId(), subscription.getEntityId(), subscription, e); + List result = new ArrayList<>(); + subsLock.lock(); + try { + Map> sessionSubscriptions = subscriptionsBySessionId.remove(sessionId); + if (sessionSubscriptions != null) { + for (TbSubscription subscription : sessionSubscriptions.values()) { + result.add(modifySubscription(subscription.getTenantId(), subscription.getEntityId(), subscription, false)); } + } else { + log.debug("[{}] No session subscriptions found!", sessionId); } - } else { - log.debug("[{}] No session subscriptions found!", sessionId); + } finally { + subsLock.unlock(); } + result.stream().filter(ModifySubscriptionResult::hasEvent).forEach(this::pushSubscriptionEvent); } @Override @@ -388,10 +420,9 @@ public class DefaultTbLocalSubscriptionService implements TbLocalSubscriptionSer callback.onSuccess(); } - private void modifySubscription(TenantId tenantId, EntityId entityId, TbSubscription subscription, boolean add) { + private ModifySubscriptionResult modifySubscription(TenantId tenantId, EntityId entityId, TbSubscription subscription, boolean add) { TbSubscription missedUpdatesCandidate = null; - TbEntitySubEvent event; - subsLock.lock(); + TbEntitySubEvent event = null; try { TbEntityLocalSubsInfo entitySubs = subscriptionsByEntityId.computeIfAbsent(entityId.getId(), id -> new TbEntityLocalSubsInfo(tenantId, entityId)); event = add ? entitySubs.add(subscription) : entitySubs.remove(subscription); @@ -401,17 +432,23 @@ public class DefaultTbLocalSubscriptionService implements TbLocalSubscriptionSer } else if (add) { missedUpdatesCandidate = entitySubs.registerPendingSubscription(subscription, event); } - } finally { - subsLock.unlock(); + } catch (Exception e) { + log.warn("[{}][{}] Failed to {} subscription {} due to ", tenantId, entityId, add ? "add" : "remove", subscription, e); } - if (event != null) { - log.trace("[{}][{}][{}] Event: {}", tenantId, entityId, subscription.getSubscriptionId(), event); - pushSubEventToManagerService(tenantId, entityId, event); + return new ModifySubscriptionResult(tenantId, entityId, subscription, missedUpdatesCandidate, event); + } + + private void pushSubscriptionEvent(ModifySubscriptionResult modifySubResult) { + try { + TbEntitySubEvent event = modifySubResult.getEvent(); + log.trace("[{}][{}][{}] Event: {}", modifySubResult.getTenantId(), modifySubResult.getEntityId(), modifySubResult.getSubscription().getSubscriptionId(), event); + pushSubEventToManagerService(modifySubResult.getTenantId(), modifySubResult.getEntityId(), event); + TbSubscription missedUpdatesCandidate = modifySubResult.getMissedUpdatesCandidate(); if (missedUpdatesCandidate != null) { checkMissedUpdates(missedUpdatesCandidate); } - } else { - log.trace("[{}][{}][{}] No changes detected.", tenantId, entityId, subscription.getSubscriptionId()); + } catch (Exception e) { + log.warn("[{}][{}] Failed to push subscription event {} due to ", modifySubResult.getTenantId(), modifySubResult.getEntityId(), modifySubResult.getEvent(), e); } } @@ -522,4 +559,8 @@ public class DefaultTbLocalSubscriptionService implements TbLocalSubscriptionSer } } + private void cleanupStaleSessions() { + subscriptionsBySessionId.keySet().forEach(webSocketService::cleanupIfStale); + } + } diff --git a/application/src/main/java/org/thingsboard/server/service/subscription/ModifySubscriptionResult.java b/application/src/main/java/org/thingsboard/server/service/subscription/ModifySubscriptionResult.java new file mode 100644 index 0000000000..483c250065 --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/subscription/ModifySubscriptionResult.java @@ -0,0 +1,39 @@ +/** + * Copyright © 2016-2024 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.subscription; + +import lombok.Builder; +import lombok.Data; +import org.thingsboard.server.common.data.id.EntityId; +import org.thingsboard.server.common.data.id.TenantId; + +/** + * The modification result of entity subscription + */ +@Builder +@Data +public class ModifySubscriptionResult { + + private TenantId tenantId; + private EntityId entityId; + private TbSubscription subscription; + private TbSubscription missedUpdatesCandidate; + private TbEntitySubEvent event; + + public boolean hasEvent() { + return event != null; + } +} diff --git a/application/src/main/java/org/thingsboard/server/service/ws/DefaultWebSocketService.java b/application/src/main/java/org/thingsboard/server/service/ws/DefaultWebSocketService.java index 78b78d8b8c..2a53e9a772 100644 --- a/application/src/main/java/org/thingsboard/server/service/ws/DefaultWebSocketService.java +++ b/application/src/main/java/org/thingsboard/server/service/ws/DefaultWebSocketService.java @@ -295,6 +295,16 @@ public class DefaultWebSocketService implements WebSocketService { } } + @Override + public void cleanupIfStale(String sessionId) { + if (!msgEndpoint.isOpen(sessionId)) { + log.info("[{}] Cleaning up stale session ", sessionId); + wsSessionsMap.remove(sessionId); + oldSubService.cancelAllSessionSubscriptions(sessionId); + entityDataSubService.cancelAllSessionSubscriptions(sessionId); + } + } + private void processSessionClose(WebSocketSessionRef sessionRef) { var tenantProfileConfiguration = getTenantProfileConfiguration(sessionRef); if (tenantProfileConfiguration != null) { diff --git a/application/src/main/java/org/thingsboard/server/service/ws/WebSocketMsgEndpoint.java b/application/src/main/java/org/thingsboard/server/service/ws/WebSocketMsgEndpoint.java index 41a68c98ca..416af080f8 100644 --- a/application/src/main/java/org/thingsboard/server/service/ws/WebSocketMsgEndpoint.java +++ b/application/src/main/java/org/thingsboard/server/service/ws/WebSocketMsgEndpoint.java @@ -29,4 +29,6 @@ public interface WebSocketMsgEndpoint { void sendPing(WebSocketSessionRef sessionRef, long currentTime) throws IOException; void close(WebSocketSessionRef sessionRef, CloseStatus withReason) throws IOException; + + boolean isOpen(String sessionId); } diff --git a/application/src/main/java/org/thingsboard/server/service/ws/WebSocketService.java b/application/src/main/java/org/thingsboard/server/service/ws/WebSocketService.java index 8f586f264e..284431ff84 100644 --- a/application/src/main/java/org/thingsboard/server/service/ws/WebSocketService.java +++ b/application/src/main/java/org/thingsboard/server/service/ws/WebSocketService.java @@ -36,4 +36,7 @@ public interface WebSocketService { void sendError(WebSocketSessionRef sessionRef, int subId, SubscriptionErrorCode errorCode, String errorMsg); void close(String sessionId, CloseStatus status); + + void cleanupIfStale(String sessionId); + }