Added scheduled session cleanup (#10818)

* added scheduled session cleanup for cases when session was closed before last session command processed

* updated logging to include user info, updated isOpen method to include native check

* updated cleanup to go through subscriptionsByEntityId map values

* made modification of subscriptionsBySessionId and subscriptionsByEntityId maps atomic

* fixed cancelAllSessionSubscriptions

* added try-catch for pushSubscriptionEvent, modifySubscription

* refactored logging

* deleted redundant logging

* refactoring: updated isEmpty to hasEvent
This commit is contained in:
Daria Shevchenko 2024-05-17 17:54:23 +03:00 committed by ViacheslavKlimov
parent 811f7d7206
commit a1cbd8815b
6 changed files with 140 additions and 33 deletions

View File

@ -539,6 +539,18 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements WebSocke
} }
} }
@Override
public boolean isOpen(String externalId) {
String internalId = externalSessionMap.get(externalId);
if (internalId != null) {
SessionMetaData sessionMd = getSessionMd(internalId);
if (sessionMd != null) {
return sessionMd.session.isOpen();
}
}
return false;
}
private boolean checkLimits(WebSocketSession session, WebSocketSessionRef sessionRef) throws IOException { private boolean checkLimits(WebSocketSession session, WebSocketSessionRef sessionRef) throws IOException {
var tenantProfileConfiguration = getTenantProfileConfiguration(sessionRef); var tenantProfileConfiguration = getTenantProfileConfiguration(sessionRef);
if (tenantProfileConfiguration == null) { if (tenantProfileConfiguration == null) {

View File

@ -45,6 +45,7 @@ import org.thingsboard.server.queue.discovery.PartitionService;
import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
import org.thingsboard.server.queue.discovery.event.ClusterTopologyChangeEvent; import org.thingsboard.server.queue.discovery.event.ClusterTopologyChangeEvent;
import org.thingsboard.server.queue.util.TbCoreComponent; import org.thingsboard.server.queue.util.TbCoreComponent;
import org.thingsboard.server.service.ws.WebSocketService;
import org.thingsboard.server.service.ws.notification.sub.NotificationRequestUpdate; import org.thingsboard.server.service.ws.notification.sub.NotificationRequestUpdate;
import org.thingsboard.server.service.ws.notification.sub.NotificationsSubscriptionUpdate; import org.thingsboard.server.service.ws.notification.sub.NotificationsSubscriptionUpdate;
import org.thingsboard.server.service.ws.telemetry.sub.AlarmSubscriptionUpdate; import org.thingsboard.server.service.ws.telemetry.sub.AlarmSubscriptionUpdate;
@ -62,6 +63,8 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantLock;
@ -84,18 +87,21 @@ public class DefaultTbLocalSubscriptionService implements TbLocalSubscriptionSer
private final PartitionService partitionService; private final PartitionService partitionService;
private final TbClusterService clusterService; private final TbClusterService clusterService;
private final SubscriptionManagerService subscriptionManagerService; private final SubscriptionManagerService subscriptionManagerService;
private final WebSocketService webSocketService;
private ExecutorService tsCallBackExecutor; private ExecutorService tsCallBackExecutor;
private ScheduledExecutorService staleSessionCleanupExecutor;
public DefaultTbLocalSubscriptionService(AttributesService attrService, TimeseriesService tsService, TbServiceInfoProvider serviceInfoProvider, public DefaultTbLocalSubscriptionService(AttributesService attrService, TimeseriesService tsService, TbServiceInfoProvider serviceInfoProvider,
PartitionService partitionService, TbClusterService clusterService, PartitionService partitionService, TbClusterService clusterService,
@Lazy SubscriptionManagerService subscriptionManagerService) { @Lazy SubscriptionManagerService subscriptionManagerService, @Lazy WebSocketService webSocketService) {
this.attrService = attrService; this.attrService = attrService;
this.tsService = tsService; this.tsService = tsService;
this.serviceInfoProvider = serviceInfoProvider; this.serviceInfoProvider = serviceInfoProvider;
this.partitionService = partitionService; this.partitionService = partitionService;
this.clusterService = clusterService; this.clusterService = clusterService;
this.subscriptionManagerService = subscriptionManagerService; this.subscriptionManagerService = subscriptionManagerService;
this.webSocketService = webSocketService;
} }
private String serviceId; private String serviceId;
@ -108,6 +114,8 @@ public class DefaultTbLocalSubscriptionService implements TbLocalSubscriptionSer
subscriptionUpdateExecutor = ThingsBoardExecutors.newWorkStealingPool(20, getClass()); subscriptionUpdateExecutor = ThingsBoardExecutors.newWorkStealingPool(20, getClass());
tsCallBackExecutor = Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("ts-sub-callback")); tsCallBackExecutor = Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("ts-sub-callback"));
serviceId = serviceInfoProvider.getServiceId(); serviceId = serviceInfoProvider.getServiceId();
staleSessionCleanupExecutor = Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName("stale-session-cleanup"));
staleSessionCleanupExecutor.scheduleWithFixedDelay(this::cleanupStaleSessions, 60, 60, TimeUnit.SECONDS);
} }
@PreDestroy @PreDestroy
@ -118,6 +126,9 @@ public class DefaultTbLocalSubscriptionService implements TbLocalSubscriptionSer
if (tsCallBackExecutor != null) { if (tsCallBackExecutor != null) {
tsCallBackExecutor.shutdownNow(); tsCallBackExecutor.shutdownNow();
} }
if (staleSessionCleanupExecutor != null) {
staleSessionCleanupExecutor.shutdownNow();
}
} }
@Override @Override
@ -157,9 +168,18 @@ public class DefaultTbLocalSubscriptionService implements TbLocalSubscriptionSer
TenantId tenantId = subscription.getTenantId(); TenantId tenantId = subscription.getTenantId();
EntityId entityId = subscription.getEntityId(); EntityId entityId = subscription.getEntityId();
log.debug("[{}][{}] Register subscription: {}", tenantId, entityId, subscription); log.debug("[{}][{}] Register subscription: {}", tenantId, entityId, subscription);
ModifySubscriptionResult modifySubscriptionResult;
subsLock.lock();
try {
Map<Integer, TbSubscription<?>> sessionSubscriptions = subscriptionsBySessionId.computeIfAbsent(subscription.getSessionId(), k -> new ConcurrentHashMap<>()); Map<Integer, TbSubscription<?>> sessionSubscriptions = subscriptionsBySessionId.computeIfAbsent(subscription.getSessionId(), k -> new ConcurrentHashMap<>());
sessionSubscriptions.put(subscription.getSubscriptionId(), subscription); sessionSubscriptions.put(subscription.getSubscriptionId(), subscription);
modifySubscription(tenantId, entityId, subscription, true); modifySubscriptionResult = modifySubscription(tenantId, entityId, subscription, true);
} finally {
subsLock.unlock();
}
if (modifySubscriptionResult.hasEvent()) {
pushSubscriptionEvent(modifySubscriptionResult);
}
} }
@Override @Override
@ -195,6 +215,9 @@ public class DefaultTbLocalSubscriptionService implements TbLocalSubscriptionSer
@Override @Override
public void cancelSubscription(String sessionId, int subscriptionId) { public void cancelSubscription(String sessionId, int subscriptionId) {
log.debug("[{}][{}] Going to remove subscription.", sessionId, subscriptionId); log.debug("[{}][{}] Going to remove subscription.", sessionId, subscriptionId);
ModifySubscriptionResult modifySubscriptionResult = null;
subsLock.lock();
try {
Map<Integer, TbSubscription<?>> sessionSubscriptions = subscriptionsBySessionId.get(sessionId); Map<Integer, TbSubscription<?>> sessionSubscriptions = subscriptionsBySessionId.get(sessionId);
if (sessionSubscriptions != null) { if (sessionSubscriptions != null) {
TbSubscription<?> subscription = sessionSubscriptions.remove(subscriptionId); TbSubscription<?> subscription = sessionSubscriptions.remove(subscriptionId);
@ -202,30 +225,39 @@ public class DefaultTbLocalSubscriptionService implements TbLocalSubscriptionSer
if (sessionSubscriptions.isEmpty()) { if (sessionSubscriptions.isEmpty()) {
subscriptionsBySessionId.remove(sessionId); subscriptionsBySessionId.remove(sessionId);
} }
modifySubscription(subscription.getTenantId(), subscription.getEntityId(), subscription, false); modifySubscriptionResult = modifySubscription(subscription.getTenantId(), subscription.getEntityId(), subscription, false);
} else { } else {
log.debug("[{}][{}] Subscription not found!", sessionId, subscriptionId); log.debug("[{}][{}] Subscription not found!", sessionId, subscriptionId);
} }
} else { } else {
log.debug("[{}] No session subscriptions found!", sessionId); log.debug("[{}] No session subscriptions found!", sessionId);
} }
} finally {
subsLock.unlock();
}
if (modifySubscriptionResult != null && modifySubscriptionResult.hasEvent()) {
pushSubscriptionEvent(modifySubscriptionResult);
}
} }
@Override @Override
public void cancelAllSessionSubscriptions(String sessionId) { public void cancelAllSessionSubscriptions(String sessionId) {
log.debug("[{}] Going to remove session subscriptions.", sessionId); log.debug("[{}] Going to remove session subscriptions.", sessionId);
List<ModifySubscriptionResult> result = new ArrayList<>();
subsLock.lock();
try {
Map<Integer, TbSubscription<?>> sessionSubscriptions = subscriptionsBySessionId.remove(sessionId); Map<Integer, TbSubscription<?>> sessionSubscriptions = subscriptionsBySessionId.remove(sessionId);
if (sessionSubscriptions != null) { if (sessionSubscriptions != null) {
for (TbSubscription<?> subscription : sessionSubscriptions.values()) { for (TbSubscription<?> subscription : sessionSubscriptions.values()) {
try { result.add(modifySubscription(subscription.getTenantId(), subscription.getEntityId(), subscription, false));
modifySubscription(subscription.getTenantId(), subscription.getEntityId(), subscription, false);
} catch (Exception e) {
log.warn("[{}][{}] Failed to remove subscription {} due to ", subscription.getTenantId(), subscription.getEntityId(), subscription, e);
}
} }
} else { } else {
log.debug("[{}] No session subscriptions found!", sessionId); log.debug("[{}] No session subscriptions found!", sessionId);
} }
} finally {
subsLock.unlock();
}
result.stream().filter(ModifySubscriptionResult::hasEvent).forEach(this::pushSubscriptionEvent);
} }
@Override @Override
@ -388,10 +420,9 @@ public class DefaultTbLocalSubscriptionService implements TbLocalSubscriptionSer
callback.onSuccess(); callback.onSuccess();
} }
private void modifySubscription(TenantId tenantId, EntityId entityId, TbSubscription<?> subscription, boolean add) { private ModifySubscriptionResult modifySubscription(TenantId tenantId, EntityId entityId, TbSubscription<?> subscription, boolean add) {
TbSubscription<?> missedUpdatesCandidate = null; TbSubscription<?> missedUpdatesCandidate = null;
TbEntitySubEvent event; TbEntitySubEvent event = null;
subsLock.lock();
try { try {
TbEntityLocalSubsInfo entitySubs = subscriptionsByEntityId.computeIfAbsent(entityId.getId(), id -> new TbEntityLocalSubsInfo(tenantId, entityId)); TbEntityLocalSubsInfo entitySubs = subscriptionsByEntityId.computeIfAbsent(entityId.getId(), id -> new TbEntityLocalSubsInfo(tenantId, entityId));
event = add ? entitySubs.add(subscription) : entitySubs.remove(subscription); event = add ? entitySubs.add(subscription) : entitySubs.remove(subscription);
@ -401,17 +432,23 @@ public class DefaultTbLocalSubscriptionService implements TbLocalSubscriptionSer
} else if (add) { } else if (add) {
missedUpdatesCandidate = entitySubs.registerPendingSubscription(subscription, event); missedUpdatesCandidate = entitySubs.registerPendingSubscription(subscription, event);
} }
} finally { } catch (Exception e) {
subsLock.unlock(); log.warn("[{}][{}] Failed to {} subscription {} due to ", tenantId, entityId, add ? "add" : "remove", subscription, e);
} }
if (event != null) { return new ModifySubscriptionResult(tenantId, entityId, subscription, missedUpdatesCandidate, event);
log.trace("[{}][{}][{}] Event: {}", tenantId, entityId, subscription.getSubscriptionId(), event); }
pushSubEventToManagerService(tenantId, entityId, event);
private void pushSubscriptionEvent(ModifySubscriptionResult modifySubResult) {
try {
TbEntitySubEvent event = modifySubResult.getEvent();
log.trace("[{}][{}][{}] Event: {}", modifySubResult.getTenantId(), modifySubResult.getEntityId(), modifySubResult.getSubscription().getSubscriptionId(), event);
pushSubEventToManagerService(modifySubResult.getTenantId(), modifySubResult.getEntityId(), event);
TbSubscription<?> missedUpdatesCandidate = modifySubResult.getMissedUpdatesCandidate();
if (missedUpdatesCandidate != null) { if (missedUpdatesCandidate != null) {
checkMissedUpdates(missedUpdatesCandidate); checkMissedUpdates(missedUpdatesCandidate);
} }
} else { } catch (Exception e) {
log.trace("[{}][{}][{}] No changes detected.", tenantId, entityId, subscription.getSubscriptionId()); log.warn("[{}][{}] Failed to push subscription event {} due to ", modifySubResult.getTenantId(), modifySubResult.getEntityId(), modifySubResult.getEvent(), e);
} }
} }
@ -522,4 +559,8 @@ public class DefaultTbLocalSubscriptionService implements TbLocalSubscriptionSer
} }
} }
private void cleanupStaleSessions() {
subscriptionsBySessionId.keySet().forEach(webSocketService::cleanupIfStale);
}
} }

View File

@ -0,0 +1,39 @@
/**
* Copyright © 2016-2024 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.service.subscription;
import lombok.Builder;
import lombok.Data;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.TenantId;
/**
* The modification result of entity subscription
*/
@Builder
@Data
public class ModifySubscriptionResult {
private TenantId tenantId;
private EntityId entityId;
private TbSubscription<?> subscription;
private TbSubscription<?> missedUpdatesCandidate;
private TbEntitySubEvent event;
public boolean hasEvent() {
return event != null;
}
}

View File

@ -295,6 +295,16 @@ public class DefaultWebSocketService implements WebSocketService {
} }
} }
@Override
public void cleanupIfStale(String sessionId) {
if (!msgEndpoint.isOpen(sessionId)) {
log.info("[{}] Cleaning up stale session ", sessionId);
wsSessionsMap.remove(sessionId);
oldSubService.cancelAllSessionSubscriptions(sessionId);
entityDataSubService.cancelAllSessionSubscriptions(sessionId);
}
}
private void processSessionClose(WebSocketSessionRef sessionRef) { private void processSessionClose(WebSocketSessionRef sessionRef) {
var tenantProfileConfiguration = getTenantProfileConfiguration(sessionRef); var tenantProfileConfiguration = getTenantProfileConfiguration(sessionRef);
if (tenantProfileConfiguration != null) { if (tenantProfileConfiguration != null) {

View File

@ -29,4 +29,6 @@ public interface WebSocketMsgEndpoint {
void sendPing(WebSocketSessionRef sessionRef, long currentTime) throws IOException; void sendPing(WebSocketSessionRef sessionRef, long currentTime) throws IOException;
void close(WebSocketSessionRef sessionRef, CloseStatus withReason) throws IOException; void close(WebSocketSessionRef sessionRef, CloseStatus withReason) throws IOException;
boolean isOpen(String sessionId);
} }

View File

@ -36,4 +36,7 @@ public interface WebSocketService {
void sendError(WebSocketSessionRef sessionRef, int subId, SubscriptionErrorCode errorCode, String errorMsg); void sendError(WebSocketSessionRef sessionRef, int subId, SubscriptionErrorCode errorCode, String errorMsg);
void close(String sessionId, CloseStatus status); void close(String sessionId, CloseStatus status);
void cleanupIfStale(String sessionId);
} }