diff --git a/application/src/main/java/org/thingsboard/server/controller/plugin/TbWebSocketHandler.java b/application/src/main/java/org/thingsboard/server/controller/plugin/TbWebSocketHandler.java index 00f3be68c4..87bf110c81 100644 --- a/application/src/main/java/org/thingsboard/server/controller/plugin/TbWebSocketHandler.java +++ b/application/src/main/java/org/thingsboard/server/controller/plugin/TbWebSocketHandler.java @@ -52,13 +52,11 @@ import javax.websocket.Session; import java.io.IOException; import java.net.URI; import java.security.InvalidParameterException; -import java.util.Queue; import java.util.Set; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.Semaphore; import static org.thingsboard.server.service.ws.DefaultWebSocketService.NUMBER_OF_PING_ATTEMPTS; @@ -139,9 +137,7 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements WebSocke if (!checkLimits(session, sessionRef)) { return; } - var tenantProfileConfiguration = tenantProfileCache.get(sessionRef.getSecurityCtx().getTenantId()).getDefaultProfileConfiguration(); - internalSessionMap.put(internalSessionId, new SessionMetaData(session, sessionRef, tenantProfileConfiguration.getWsMsgQueueLimitPerSession() > 0 ? - tenantProfileConfiguration.getWsMsgQueueLimitPerSession() : 500)); + internalSessionMap.put(internalSessionId, new SessionMetaData(session, sessionRef)); externalSessionMap.put(externalSessionId, internalSessionId); processInWebSocketService(sessionRef, SessionEvent.onEstablished()); @@ -216,22 +212,21 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements WebSocke private final RemoteEndpoint.Async asyncRemote; private final WebSocketSessionRef sessionRef; - private final AtomicBoolean isSending = new AtomicBoolean(false); - private final Queue> msgQueue; + // TODO: carefully review ( + discuss removal of the msgQueue) + private final Semaphore sendingSemaphore = new Semaphore(1); private volatile long lastActivityTime; - SessionMetaData(WebSocketSession session, WebSocketSessionRef sessionRef, int maxMsgQueuePerSession) { + SessionMetaData(WebSocketSession session, WebSocketSessionRef sessionRef) { super(); this.session = session; Session nativeSession = ((NativeWebSocketSession) session).getNativeSession(Session.class); this.asyncRemote = nativeSession.getAsyncRemote(); this.sessionRef = sessionRef; - this.msgQueue = new LinkedBlockingQueue<>(maxMsgQueuePerSession); this.lastActivityTime = System.currentTimeMillis(); } - synchronized void sendPing(long currentTime) { + void sendPing(long currentTime) { try { long timeSinceLastActivity = currentTime - lastActivityTime; if (timeSinceLastActivity >= pingTimeout) { @@ -246,40 +241,37 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements WebSocke } } - private void closeSession(CloseStatus reason) { + void closeSession(CloseStatus reason) { try { close(this.sessionRef, reason); } catch (IOException ioe) { log.trace("[{}] Session transport error", session.getId(), ioe); + } finally { + sendingSemaphore.release(); } } - synchronized void processPongMessage(long currentTime) { + void processPongMessage(long currentTime) { lastActivityTime = currentTime; } - synchronized void sendMsg(String msg) { + void sendMsg(String msg) { sendMsg(new TbWebSocketTextMsg(msg)); } - synchronized void sendMsg(TbWebSocketMsg msg) { - if (isSending.compareAndSet(false, true)) { + void sendMsg(TbWebSocketMsg msg) { + try { + sendingSemaphore.acquire(); sendMsgInternal(msg); - } else { - try { - msgQueue.add(msg); - } catch (RuntimeException e) { - if (log.isTraceEnabled()) { - log.trace("[{}][{}] Session closed due to queue error", sessionRef.getSecurityCtx().getTenantId(), session.getId(), e); - } else { - log.info("[{}][{}] Session closed due to queue error", sessionRef.getSecurityCtx().getTenantId(), session.getId()); - } - closeSession(CloseStatus.POLICY_VIOLATION.withReason("Max pending updates limit reached!")); - } + } catch (InterruptedException e) { + throw new RuntimeException(e); + } catch (Exception e) { + sendingSemaphore.release(); + throw e; } } - private void sendMsgInternal(TbWebSocketMsg msg) { + void sendMsgInternal(TbWebSocketMsg msg) { try { if (TbWebSocketMsgType.TEXT.equals(msg.getType())) { TbWebSocketTextMsg textMsg = (TbWebSocketTextMsg) msg; @@ -287,7 +279,6 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements WebSocke } else { TbWebSocketPingMsg pingMsg = (TbWebSocketPingMsg) msg; this.asyncRemote.sendPing(pingMsg.getMsg()); - processNextMsg(); } } catch (Exception e) { log.trace("[{}] Failed to send msg", session.getId(), e); @@ -301,16 +292,7 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements WebSocke log.trace("[{}] Failed to send msg", session.getId(), result.getException()); closeSession(CloseStatus.SESSION_NOT_RELIABLE); } else { - processNextMsg(); - } - } - - private void processNextMsg() { - TbWebSocketMsg msg = msgQueue.poll(); - if (msg != null) { - sendMsgInternal(msg); - } else { - isSending.set(false); + sendingSemaphore.release(); } } }