Fix cleanup of websocket session in some corner cases

This commit is contained in:
Andrii Shvaika 2022-08-02 10:07:02 +03:00
parent 348dbf648f
commit bcc922e07e
3 changed files with 54 additions and 13 deletions

View File

@ -143,7 +143,7 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements Telemetr
externalSessionMap.put(externalSessionId, internalSessionId);
processInWebSocketService(sessionRef, SessionEvent.onEstablished());
log.info("[{}][{}][{}] Session is opened", sessionRef.getSecurityCtx().getTenantId(), externalSessionId, session.getId());
log.info("[{}][{}][{}] Session is opened from address: {}", sessionRef.getSecurityCtx().getTenantId(), externalSessionId, session.getId(), session.getRemoteAddress());
} catch (InvalidParameterException e) {
log.warn("[{}] Failed to start session", session.getId(), e);
session.close(CloseStatus.BAD_DATA.withReason(e.getMessage()));
@ -173,8 +173,10 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements Telemetr
cleanupLimits(session, sessionMd.sessionRef);
externalSessionMap.remove(sessionMd.sessionRef.getSessionId());
processInWebSocketService(sessionMd.sessionRef, SessionEvent.onClosed());
log.info("[{}][{}][{}] Session is closed", sessionMd.sessionRef.getSecurityCtx().getTenantId(), sessionMd.sessionRef.getSessionId(), session.getId());
} else {
log.info("[{}] Session is closed", session.getId());
}
log.info("[{}] Session is closed", session.getId());
}
private void processInWebSocketService(TelemetryWebSocketSessionRef sessionRef, SessionEvent event) {

View File

@ -22,6 +22,7 @@ import com.google.common.util.concurrent.MoreExecutors;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.jetbrains.annotations.NotNull;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Lazy;
@ -294,25 +295,53 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc
if (adq.getPageLink().getTimeWindow() > 0) {
TbAlarmDataSubCtx finalCtx = ctx;
ScheduledFuture<?> task = scheduler.scheduleWithFixedDelay(
finalCtx::checkAndResetInvocationCounter, dynamicPageLinkRefreshInterval, dynamicPageLinkRefreshInterval, TimeUnit.SECONDS);
() -> refreshAlarmQuery(finalCtx), dynamicPageLinkRefreshInterval, dynamicPageLinkRefreshInterval, TimeUnit.SECONDS);
finalCtx.setRefreshTask(task);
}
}
}
private void refreshDynamicQuery(TbAbstractSubCtx finalCtx) {
private boolean validate(TbAbstractSubCtx<?> finalCtx) {
if (finalCtx.isStopped()) {
log.warn("[{}][{}][{}] Received validation task for already stopped context.", finalCtx.getTenantId(), finalCtx.getSessionId(), finalCtx.getCmdId());
return false;
}
var cmdMap = subscriptionsBySessionId.get(finalCtx.getSessionId());
if (cmdMap == null) {
log.warn("[{}][{}][{}] Received validation task for already removed session.", finalCtx.getTenantId(), finalCtx.getSessionId(), finalCtx.getCmdId());
return false;
} else if (!cmdMap.containsKey(finalCtx.getCmdId())) {
log.warn("[{}][{}][{}] Received validation task for unregistered cmdId.", finalCtx.getTenantId(), finalCtx.getSessionId(), finalCtx.getCmdId());
return false;
}
return true;
}
private void refreshDynamicQuery(TbAbstractSubCtx<?> finalCtx) {
try {
long start = System.currentTimeMillis();
finalCtx.update();
long end = System.currentTimeMillis();
log.trace("[{}][{}] Executing query: {}", finalCtx.getSessionId(), finalCtx.getCmdId(), finalCtx.getQuery());
stats.getDynamicQueryInvocationCnt().incrementAndGet();
stats.getDynamicQueryTimeSpent().addAndGet(end - start);
if (validate(finalCtx)) {
long start = System.currentTimeMillis();
finalCtx.update();
long end = System.currentTimeMillis();
log.trace("[{}][{}] Executing query: {}", finalCtx.getSessionId(), finalCtx.getCmdId(), finalCtx.getQuery());
stats.getDynamicQueryInvocationCnt().incrementAndGet();
stats.getDynamicQueryTimeSpent().addAndGet(end - start);
} else {
finalCtx.stop();
}
} catch (Exception e) {
log.warn("[{}][{}] Failed to refresh query", finalCtx.getSessionId(), finalCtx.getCmdId(), e);
}
}
private void refreshAlarmQuery(TbAlarmDataSubCtx finalCtx) {
if (validate(finalCtx)) {
finalCtx.checkAndResetInvocationCounter();
} else {
finalCtx.stop();
}
}
@Scheduled(fixedDelayString = "${server.ws.dynamic_page_link.stats:10000}")
public void printStats() {
int alarmQueryInvocationCntValue = stats.getAlarmQueryInvocationCnt().getAndSet(0);
@ -526,8 +555,7 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc
private void cleanupAndCancel(TbAbstractSubCtx ctx) {
if (ctx != null) {
ctx.cancelTasks();
ctx.clearSubscriptions();
ctx.stop();
if (ctx.getSessionId() != null) {
Map<Integer, TbAbstractSubCtx> sessionSubs = subscriptionsBySessionId.get(ctx.getSessionId());
if (sessionSubs != null) {

View File

@ -78,6 +78,7 @@ public abstract class TbAbstractSubCtx<T extends EntityCountQuery> {
protected T query;
@Setter
protected volatile ScheduledFuture<?> refreshTask;
protected volatile boolean stopped;
public TbAbstractSubCtx(String serviceId, TelemetryWebSocketService wsService,
EntityService entityService, TbLocalSubscriptionService localSubscriptionService,
@ -189,6 +190,12 @@ public abstract class TbAbstractSubCtx<T extends EntityCountQuery> {
clearDynamicValueSubscriptions();
}
public void stop() {
stopped = true;
cancelTasks();
clearSubscriptions();
}
@Data
private static class DynamicValueKeySub {
private final DynamicValueKey key;
@ -299,7 +306,11 @@ public abstract class TbAbstractSubCtx<T extends EntityCountQuery> {
}
public void setRefreshTask(ScheduledFuture<?> task) {
this.refreshTask = task;
if (!stopped) {
this.refreshTask = task;
} else {
task.cancel(true);
}
}
public void cancelTasks() {