From cd1947663eb76cb8b4bae87505006f2b23d56f17 Mon Sep 17 00:00:00 2001 From: Andrew Shvayka Date: Fri, 23 Nov 2018 13:17:36 +0200 Subject: [PATCH] Added max size of queue per websocket --- .../controller/plugin/TbWebSocketHandler.java | 31 +++++++++++++------ .../src/main/resources/thingsboard.yml | 1 + 2 files changed, 22 insertions(+), 10 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/controller/plugin/TbWebSocketHandler.java b/application/src/main/java/org/thingsboard/server/controller/plugin/TbWebSocketHandler.java index 82892a9d2b..bfa262a853 100644 --- a/application/src/main/java/org/thingsboard/server/controller/plugin/TbWebSocketHandler.java +++ b/application/src/main/java/org/thingsboard/server/controller/plugin/TbWebSocketHandler.java @@ -75,6 +75,8 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements Telemetr private int maxSessionsPerRegularUser; @Value("${server.ws.limits.max_sessions_per_public_user:0}") private int maxSessionsPerPublicUser; + @Value("${server.ws.limits.max_queue_per_ws_session:1000}") + private int maxMsgQueuePerSession; @Value("${server.ws.limits.max_updates_per_session:}") private String perSessionUpdatesConfiguration; @@ -108,7 +110,7 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements Telemetr super.afterConnectionEstablished(session); try { if (session instanceof NativeWebSocketSession) { - Session nativeSession = ((NativeWebSocketSession)session).getNativeSession(Session.class); + Session nativeSession = ((NativeWebSocketSession) session).getNativeSession(Session.class); if (nativeSession != null) { nativeSession.getAsyncRemote().setSendTimeout(sendTimeout); } @@ -119,7 +121,7 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements Telemetr if (!checkLimits(session, sessionRef)) { return; } - internalSessionMap.put(internalSessionId, new SessionMetaData(session, sessionRef)); + internalSessionMap.put(internalSessionId, new SessionMetaData(session, sessionRef, maxMsgQueuePerSession)); externalSessionMap.put(externalSessionId, internalSessionId); processInWebSocketService(sessionRef, SessionEvent.onEstablished()); log.info("[{}][{}][{}] Session is opened", sessionRef.getSecurityCtx().getTenantId(), externalSessionId, session.getId()); @@ -176,31 +178,40 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements Telemetr if (!"telemetry".equalsIgnoreCase(serviceToken)) { throw new InvalidParameterException("Can't find plugin with specified token!"); } else { - SecurityUser currentUser = (SecurityUser) ((Authentication)session.getPrincipal()).getPrincipal(); + SecurityUser currentUser = (SecurityUser) ((Authentication) session.getPrincipal()).getPrincipal(); return new TelemetryWebSocketSessionRef(UUID.randomUUID().toString(), currentUser, session.getLocalAddress(), session.getRemoteAddress()); } } - private static class SessionMetaData implements SendHandler { + private class SessionMetaData implements SendHandler { private final WebSocketSession session; private final RemoteEndpoint.Async asyncRemote; private final TelemetryWebSocketSessionRef sessionRef; private volatile boolean isSending = false; + private final Queue msgQueue; - private Queue msgQueue = new LinkedBlockingQueue<>(); - - SessionMetaData(WebSocketSession session, TelemetryWebSocketSessionRef sessionRef) { + SessionMetaData(WebSocketSession session, TelemetryWebSocketSessionRef sessionRef, int maxMsgQueuePerSession) { super(); this.session = session; - Session nativeSession = ((NativeWebSocketSession)session).getNativeSession(Session.class); + Session nativeSession = ((NativeWebSocketSession) session).getNativeSession(Session.class); this.asyncRemote = nativeSession.getAsyncRemote(); this.sessionRef = sessionRef; + this.msgQueue = new LinkedBlockingQueue<>(maxMsgQueuePerSession); } - public synchronized void sendMsg(String msg) { + synchronized void sendMsg(String msg) { if (isSending) { - msgQueue.add(msg); + try { + msgQueue.add(msg); + } catch (RuntimeException e){ + log.trace("[{}] Session closed due to queue error", session.getId(), e); + try { + close(sessionRef, CloseStatus.POLICY_VIOLATION.withReason("Max pending updates limit reached!")); + } catch (IOException ioe) { + log.trace("[{}] Session transport error", session.getId(), ioe); + } + } } else { isSending = true; sendMsgInternal(msg); diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index 08808f9bdd..0f87429bb2 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -40,6 +40,7 @@ server: max_sessions_per_customer: "${TB_SERVER_WS_TENANT_RATE_LIMITS_MAX_SESSIONS_PER_CUSTOMER:0}" max_sessions_per_regular_user: "${TB_SERVER_WS_TENANT_RATE_LIMITS_MAX_SESSIONS_PER_REGULAR_USER:0}" max_sessions_per_public_user: "${TB_SERVER_WS_TENANT_RATE_LIMITS_MAX_SESSIONS_PER_PUBLIC_USER:0}" + max_queue_per_ws_session: "${TB_SERVER_WS_TENANT_RATE_LIMITS_MAX_QUEUE_PER_WS_SESSION:500}" max_subscriptions_per_tenant: "${TB_SERVER_WS_TENANT_RATE_LIMITS_MAX_SUBSCRIPTIONS_PER_TENANT:0}" max_subscriptions_per_customer: "${TB_SERVER_WS_TENANT_RATE_LIMITS_MAX_SUBSCRIPTIONS_PER_CUSTOMER:0}" max_subscriptions_per_regular_user: "${TB_SERVER_WS_TENANT_RATE_LIMITS_MAX_SUBSCRIPTIONS_PER_REGULAR_USER:0}"