From da5679d498108f81a58a74b60ec87e9c0aae04f3 Mon Sep 17 00:00:00 2001 From: Igor Kulikov Date: Thu, 22 Nov 2018 14:41:33 +0200 Subject: [PATCH] Add async websocket send support. --- .../controller/plugin/TbWebSocketHandler.java | 64 ++++++++++++++----- .../src/main/resources/thingsboard.yml | 2 +- 2 files changed, 49 insertions(+), 17 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/controller/plugin/TbWebSocketHandler.java b/application/src/main/java/org/thingsboard/server/controller/plugin/TbWebSocketHandler.java index c8b4553415..82892a9d2b 100644 --- a/application/src/main/java/org/thingsboard/server/controller/plugin/TbWebSocketHandler.java +++ b/application/src/main/java/org/thingsboard/server/controller/plugin/TbWebSocketHandler.java @@ -16,7 +16,6 @@ package org.thingsboard.server.controller.plugin; import lombok.extern.slf4j.Slf4j; -import org.apache.tomcat.websocket.Constants; import org.springframework.beans.factory.BeanCreationNotAllowedException; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; @@ -41,14 +40,19 @@ import org.thingsboard.server.service.telemetry.TelemetryWebSocketMsgEndpoint; import org.thingsboard.server.service.telemetry.TelemetryWebSocketService; import org.thingsboard.server.service.telemetry.TelemetryWebSocketSessionRef; +import javax.websocket.RemoteEndpoint; +import javax.websocket.SendHandler; +import javax.websocket.SendResult; import javax.websocket.Session; import java.io.IOException; import java.net.URI; import java.security.InvalidParameterException; +import java.util.Queue; import java.util.Set; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.LinkedBlockingQueue; @Service @Slf4j @@ -60,8 +64,8 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements Telemetr @Autowired private TelemetryWebSocketService webSocketService; - @Value("${server.ws.blocking_send_timeout:5000}") - private long blockingSendTimeout; + @Value("${server.ws.send_timeout:5000}") + private long sendTimeout; @Value("${server.ws.limits.max_sessions_per_tenant:0}") private int maxSessionsPerTenant; @@ -106,7 +110,7 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements Telemetr if (session instanceof NativeWebSocketSession) { Session nativeSession = ((NativeWebSocketSession)session).getNativeSession(Session.class); if (nativeSession != null) { - nativeSession.getUserProperties().put(Constants.BLOCKING_SEND_TIMEOUT_PROPERTY, new Long(blockingSendTimeout)); + nativeSession.getAsyncRemote().setSendTimeout(sendTimeout); } } String internalSessionId = session.getId(); @@ -177,15 +181,52 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements Telemetr } } - private static class SessionMetaData { + private static class SessionMetaData implements SendHandler { private final WebSocketSession session; + private final RemoteEndpoint.Async asyncRemote; private final TelemetryWebSocketSessionRef sessionRef; + private volatile boolean isSending = false; + + private Queue msgQueue = new LinkedBlockingQueue<>(); + SessionMetaData(WebSocketSession session, TelemetryWebSocketSessionRef sessionRef) { super(); this.session = session; + Session nativeSession = ((NativeWebSocketSession)session).getNativeSession(Session.class); + this.asyncRemote = nativeSession.getAsyncRemote(); this.sessionRef = sessionRef; } + + public synchronized void sendMsg(String msg) { + if (isSending) { + msgQueue.add(msg); + } else { + isSending = true; + sendMsgInternal(msg); + } + } + + private void sendMsgInternal(String msg) { + try { + this.asyncRemote.sendText(msg, this); + } catch (Exception e) { + log.error("[{}] Failed to send msg", session.getId(), e); + } + } + + @Override + public void onResult(SendResult result) { + if (!result.isOK()) { + log.error("[{}] Failed to send msg", session.getId(), result.getException()); + } + String msg = msgQueue.poll(); + if (msg != null) { + sendMsgInternal(msg); + } else { + isSending = false; + } + } } @Override @@ -202,9 +243,7 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements Telemetr if (blacklistedSessions.putIfAbsent(externalId, sessionRef) == null) { log.info("[{}][{}][{}] Failed to process session update. Max session updates limit reached" , sessionRef.getSecurityCtx().getTenantId(), sessionRef.getSecurityCtx().getId(), externalId); - synchronized (sessionMd) { - sessionMd.session.sendMessage(new TextMessage("{\"subscriptionId\":" + subscriptionId + ", \"errorCode\":" + ThingsboardErrorCode.TOO_MANY_UPDATES.getErrorCode() + ", \"errorMsg\":\"Too many updates!\"}")); - } + sessionMd.sendMsg("{\"subscriptionId\":" + subscriptionId + ", \"errorCode\":" + ThingsboardErrorCode.TOO_MANY_UPDATES.getErrorCode() + ", \"errorMsg\":\"Too many updates!\"}"); } return; } else { @@ -212,14 +251,7 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements Telemetr blacklistedSessions.remove(externalId); } } - synchronized (sessionMd) { - long start = System.currentTimeMillis(); - sessionMd.session.sendMessage(new TextMessage(msg)); - long took = System.currentTimeMillis() - start; - if (took >= 1000) { - log.info("[{}][{}] Sending message took more than 1 second [{}ms] {}", sessionRef.getSecurityCtx().getTenantId(), externalId, took, msg); - } - } + sessionMd.sendMsg(msg); } else { log.warn("[{}][{}] Failed to find session by internal id", externalId, internalId); } diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index e98ea17b05..08808f9bdd 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -33,7 +33,7 @@ server: key-alias: "${SSL_KEY_ALIAS:tomcat}" log_controller_error_stack_trace: "${HTTP_LOG_CONTROLLER_ERROR_STACK_TRACE:true}" ws: - blocking_send_timeout: "${TB_SERVER_WS_BLOCKING_SEND_TIMEOUT:5000}" + send_timeout: "${TB_SERVER_WS_SEND_TIMEOUT:5000}" limits: # Limit the amount of sessions and subscriptions available on each server. Put values to zero to disable particular limitation max_sessions_per_tenant: "${TB_SERVER_WS_TENANT_RATE_LIMITS_MAX_SESSIONS_PER_TENANT:0}"