From bcc922e07e50a727f30c5c7ef20fa099c1db70d6 Mon Sep 17 00:00:00 2001 From: Andrii Shvaika Date: Tue, 2 Aug 2022 10:07:02 +0300 Subject: [PATCH] Fix cleanup of websocket session in some corner cases --- .../controller/plugin/TbWebSocketHandler.java | 6 ++- ...efaultTbEntityDataSubscriptionService.java | 48 +++++++++++++++---- .../subscription/TbAbstractSubCtx.java | 13 ++++- 3 files changed, 54 insertions(+), 13 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/controller/plugin/TbWebSocketHandler.java b/application/src/main/java/org/thingsboard/server/controller/plugin/TbWebSocketHandler.java index 578bfa5662..3565af1ab0 100644 --- a/application/src/main/java/org/thingsboard/server/controller/plugin/TbWebSocketHandler.java +++ b/application/src/main/java/org/thingsboard/server/controller/plugin/TbWebSocketHandler.java @@ -143,7 +143,7 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements Telemetr externalSessionMap.put(externalSessionId, internalSessionId); processInWebSocketService(sessionRef, SessionEvent.onEstablished()); - log.info("[{}][{}][{}] Session is opened", sessionRef.getSecurityCtx().getTenantId(), externalSessionId, session.getId()); + log.info("[{}][{}][{}] Session is opened from address: {}", sessionRef.getSecurityCtx().getTenantId(), externalSessionId, session.getId(), session.getRemoteAddress()); } catch (InvalidParameterException e) { log.warn("[{}] Failed to start session", session.getId(), e); session.close(CloseStatus.BAD_DATA.withReason(e.getMessage())); @@ -173,8 +173,10 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements Telemetr cleanupLimits(session, sessionMd.sessionRef); externalSessionMap.remove(sessionMd.sessionRef.getSessionId()); processInWebSocketService(sessionMd.sessionRef, SessionEvent.onClosed()); + log.info("[{}][{}][{}] Session is closed", sessionMd.sessionRef.getSecurityCtx().getTenantId(), sessionMd.sessionRef.getSessionId(), session.getId()); + } else { + log.info("[{}] Session is closed", session.getId()); } - log.info("[{}] Session is closed", session.getId()); } private void processInWebSocketService(TelemetryWebSocketSessionRef sessionRef, SessionEvent event) { diff --git a/application/src/main/java/org/thingsboard/server/service/subscription/DefaultTbEntityDataSubscriptionService.java b/application/src/main/java/org/thingsboard/server/service/subscription/DefaultTbEntityDataSubscriptionService.java index 507ddb3834..adb816a0ba 100644 --- a/application/src/main/java/org/thingsboard/server/service/subscription/DefaultTbEntityDataSubscriptionService.java +++ b/application/src/main/java/org/thingsboard/server/service/subscription/DefaultTbEntityDataSubscriptionService.java @@ -22,6 +22,7 @@ import com.google.common.util.concurrent.MoreExecutors; import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.checkerframework.checker.nullness.qual.Nullable; +import org.jetbrains.annotations.NotNull; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Lazy; @@ -294,25 +295,53 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc if (adq.getPageLink().getTimeWindow() > 0) { TbAlarmDataSubCtx finalCtx = ctx; ScheduledFuture task = scheduler.scheduleWithFixedDelay( - finalCtx::checkAndResetInvocationCounter, dynamicPageLinkRefreshInterval, dynamicPageLinkRefreshInterval, TimeUnit.SECONDS); + () -> refreshAlarmQuery(finalCtx), dynamicPageLinkRefreshInterval, dynamicPageLinkRefreshInterval, TimeUnit.SECONDS); finalCtx.setRefreshTask(task); } } } - private void refreshDynamicQuery(TbAbstractSubCtx finalCtx) { + private boolean validate(TbAbstractSubCtx finalCtx) { + if (finalCtx.isStopped()) { + log.warn("[{}][{}][{}] Received validation task for already stopped context.", finalCtx.getTenantId(), finalCtx.getSessionId(), finalCtx.getCmdId()); + return false; + } + var cmdMap = subscriptionsBySessionId.get(finalCtx.getSessionId()); + if (cmdMap == null) { + log.warn("[{}][{}][{}] Received validation task for already removed session.", finalCtx.getTenantId(), finalCtx.getSessionId(), finalCtx.getCmdId()); + return false; + } else if (!cmdMap.containsKey(finalCtx.getCmdId())) { + log.warn("[{}][{}][{}] Received validation task for unregistered cmdId.", finalCtx.getTenantId(), finalCtx.getSessionId(), finalCtx.getCmdId()); + return false; + } + return true; + } + + private void refreshDynamicQuery(TbAbstractSubCtx finalCtx) { try { - long start = System.currentTimeMillis(); - finalCtx.update(); - long end = System.currentTimeMillis(); - log.trace("[{}][{}] Executing query: {}", finalCtx.getSessionId(), finalCtx.getCmdId(), finalCtx.getQuery()); - stats.getDynamicQueryInvocationCnt().incrementAndGet(); - stats.getDynamicQueryTimeSpent().addAndGet(end - start); + if (validate(finalCtx)) { + long start = System.currentTimeMillis(); + finalCtx.update(); + long end = System.currentTimeMillis(); + log.trace("[{}][{}] Executing query: {}", finalCtx.getSessionId(), finalCtx.getCmdId(), finalCtx.getQuery()); + stats.getDynamicQueryInvocationCnt().incrementAndGet(); + stats.getDynamicQueryTimeSpent().addAndGet(end - start); + } else { + finalCtx.stop(); + } } catch (Exception e) { log.warn("[{}][{}] Failed to refresh query", finalCtx.getSessionId(), finalCtx.getCmdId(), e); } } + private void refreshAlarmQuery(TbAlarmDataSubCtx finalCtx) { + if (validate(finalCtx)) { + finalCtx.checkAndResetInvocationCounter(); + } else { + finalCtx.stop(); + } + } + @Scheduled(fixedDelayString = "${server.ws.dynamic_page_link.stats:10000}") public void printStats() { int alarmQueryInvocationCntValue = stats.getAlarmQueryInvocationCnt().getAndSet(0); @@ -526,8 +555,7 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc private void cleanupAndCancel(TbAbstractSubCtx ctx) { if (ctx != null) { - ctx.cancelTasks(); - ctx.clearSubscriptions(); + ctx.stop(); if (ctx.getSessionId() != null) { Map sessionSubs = subscriptionsBySessionId.get(ctx.getSessionId()); if (sessionSubs != null) { diff --git a/application/src/main/java/org/thingsboard/server/service/subscription/TbAbstractSubCtx.java b/application/src/main/java/org/thingsboard/server/service/subscription/TbAbstractSubCtx.java index 15e7f754c5..20e81f0662 100644 --- a/application/src/main/java/org/thingsboard/server/service/subscription/TbAbstractSubCtx.java +++ b/application/src/main/java/org/thingsboard/server/service/subscription/TbAbstractSubCtx.java @@ -78,6 +78,7 @@ public abstract class TbAbstractSubCtx { protected T query; @Setter protected volatile ScheduledFuture refreshTask; + protected volatile boolean stopped; public TbAbstractSubCtx(String serviceId, TelemetryWebSocketService wsService, EntityService entityService, TbLocalSubscriptionService localSubscriptionService, @@ -189,6 +190,12 @@ public abstract class TbAbstractSubCtx { clearDynamicValueSubscriptions(); } + public void stop() { + stopped = true; + cancelTasks(); + clearSubscriptions(); + } + @Data private static class DynamicValueKeySub { private final DynamicValueKey key; @@ -299,7 +306,11 @@ public abstract class TbAbstractSubCtx { } public void setRefreshTask(ScheduledFuture task) { - this.refreshTask = task; + if (!stopped) { + this.refreshTask = task; + } else { + task.cancel(true); + } } public void cancelTasks() {