Merge pull request #9815 from thingsboard/websocket-improvements
Improvements for WS services
This commit is contained in:
commit
6be61ed821
@ -59,6 +59,7 @@ import org.thingsboard.server.service.ws.WsCommandsWrapper;
|
|||||||
import org.thingsboard.server.service.ws.notification.cmd.NotificationCmdsWrapper;
|
import org.thingsboard.server.service.ws.notification.cmd.NotificationCmdsWrapper;
|
||||||
import org.thingsboard.server.service.ws.telemetry.cmd.TelemetryCmdsWrapper;
|
import org.thingsboard.server.service.ws.telemetry.cmd.TelemetryCmdsWrapper;
|
||||||
|
|
||||||
|
import javax.annotation.PostConstruct;
|
||||||
import javax.websocket.RemoteEndpoint;
|
import javax.websocket.RemoteEndpoint;
|
||||||
import javax.websocket.SendHandler;
|
import javax.websocket.SendHandler;
|
||||||
import javax.websocket.SendResult;
|
import javax.websocket.SendResult;
|
||||||
@ -104,6 +105,8 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements WebSocke
|
|||||||
private long pingTimeout;
|
private long pingTimeout;
|
||||||
@Value("${server.ws.max_queue_messages_per_session:1000}")
|
@Value("${server.ws.max_queue_messages_per_session:1000}")
|
||||||
private int wsMaxQueueMessagesPerSession;
|
private int wsMaxQueueMessagesPerSession;
|
||||||
|
@Value("${server.ws.auth_timeout_ms:10000}")
|
||||||
|
private int authTimeoutMs;
|
||||||
|
|
||||||
private final ConcurrentMap<String, WebSocketSessionRef> blacklistedSessions = new ConcurrentHashMap<>();
|
private final ConcurrentMap<String, WebSocketSessionRef> blacklistedSessions = new ConcurrentHashMap<>();
|
||||||
|
|
||||||
@ -112,8 +115,12 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements WebSocke
|
|||||||
private final ConcurrentMap<UserId, Set<String>> regularUserSessionsMap = new ConcurrentHashMap<>();
|
private final ConcurrentMap<UserId, Set<String>> regularUserSessionsMap = new ConcurrentHashMap<>();
|
||||||
private final ConcurrentMap<UserId, Set<String>> publicUserSessionsMap = new ConcurrentHashMap<>();
|
private final ConcurrentMap<UserId, Set<String>> publicUserSessionsMap = new ConcurrentHashMap<>();
|
||||||
|
|
||||||
private final Cache<String, SessionMetaData> pendingSessions = Caffeine.newBuilder()
|
private Cache<String, SessionMetaData> pendingSessions;
|
||||||
.expireAfterWrite(10, TimeUnit.SECONDS)
|
|
||||||
|
@PostConstruct
|
||||||
|
private void init() {
|
||||||
|
pendingSessions = Caffeine.newBuilder()
|
||||||
|
.expireAfterWrite(authTimeoutMs, TimeUnit.MILLISECONDS)
|
||||||
.<String, SessionMetaData>removalListener((sessionId, sessionMd, removalCause) -> {
|
.<String, SessionMetaData>removalListener((sessionId, sessionMd, removalCause) -> {
|
||||||
if (removalCause == RemovalCause.EXPIRED && sessionMd != null) {
|
if (removalCause == RemovalCause.EXPIRED && sessionMd != null) {
|
||||||
try {
|
try {
|
||||||
@ -124,6 +131,7 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements WebSocke
|
|||||||
}
|
}
|
||||||
})
|
})
|
||||||
.build();
|
.build();
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void handleTextMessage(WebSocketSession session, TextMessage message) {
|
public void handleTextMessage(WebSocketSession session, TextMessage message) {
|
||||||
@ -134,8 +142,7 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements WebSocke
|
|||||||
session.close(CloseStatus.SERVER_ERROR.withReason("Session not found!"));
|
session.close(CloseStatus.SERVER_ERROR.withReason("Session not found!"));
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
String msg = message.getPayload();
|
sessionMd.onMsg(message.getPayload());
|
||||||
sessionMd.onMsg(msg);
|
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
log.warn("IO error", e);
|
log.warn("IO error", e);
|
||||||
}
|
}
|
||||||
@ -159,7 +166,7 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements WebSocke
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
log.warn("Failed to decode subscription cmd: {}", e.getMessage(), e);
|
log.debug("{} Failed to decode subscription cmd: {}", sessionRef.toString(), e.getMessage(), e);
|
||||||
if (sessionRef.getSecurityCtx() != null) {
|
if (sessionRef.getSecurityCtx() != null) {
|
||||||
webSocketService.sendError(sessionRef, 1, SubscriptionErrorCode.BAD_REQUEST, "Failed to parse the payload");
|
webSocketService.sendError(sessionRef, 1, SubscriptionErrorCode.BAD_REQUEST, "Failed to parse the payload");
|
||||||
} else {
|
} else {
|
||||||
@ -169,7 +176,7 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements WebSocke
|
|||||||
}
|
}
|
||||||
|
|
||||||
if (sessionRef.getSecurityCtx() != null) {
|
if (sessionRef.getSecurityCtx() != null) {
|
||||||
log.trace("[{}][{}] Processing {}", sessionRef.getSecurityCtx().getTenantId(), sessionMd.session.getId(), msg);
|
log.trace("{} Processing {}", sessionRef.toString(), msg);
|
||||||
webSocketService.handleCommands(sessionRef, cmdsWrapper);
|
webSocketService.handleCommands(sessionRef, cmdsWrapper);
|
||||||
} else {
|
} else {
|
||||||
AuthCmd authCmd = cmdsWrapper.getAuthCmd();
|
AuthCmd authCmd = cmdsWrapper.getAuthCmd();
|
||||||
@ -177,7 +184,7 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements WebSocke
|
|||||||
close(sessionRef, CloseStatus.POLICY_VIOLATION.withReason("Auth cmd is missing"));
|
close(sessionRef, CloseStatus.POLICY_VIOLATION.withReason("Auth cmd is missing"));
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
log.trace("[{}] Authenticating session", sessionMd.session.getId());
|
log.trace("{} Authenticating session", sessionRef.toString());
|
||||||
SecurityUser securityCtx;
|
SecurityUser securityCtx;
|
||||||
try {
|
try {
|
||||||
securityCtx = authenticationProvider.authenticate(authCmd.getToken());
|
securityCtx = authenticationProvider.authenticate(authCmd.getToken());
|
||||||
@ -188,6 +195,7 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements WebSocke
|
|||||||
sessionRef.setSecurityCtx(securityCtx);
|
sessionRef.setSecurityCtx(securityCtx);
|
||||||
pendingSessions.invalidate(sessionMd.session.getId());
|
pendingSessions.invalidate(sessionMd.session.getId());
|
||||||
establishSession(sessionMd.session, sessionRef, sessionMd);
|
establishSession(sessionMd.session, sessionRef, sessionMd);
|
||||||
|
|
||||||
webSocketService.handleCommands(sessionRef, cmdsWrapper);
|
webSocketService.handleCommands(sessionRef, cmdsWrapper);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -197,7 +205,7 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements WebSocke
|
|||||||
try {
|
try {
|
||||||
SessionMetaData sessionMd = getSessionMd(session.getId());
|
SessionMetaData sessionMd = getSessionMd(session.getId());
|
||||||
if (sessionMd != null) {
|
if (sessionMd != null) {
|
||||||
log.trace("[{}][{}] Processing pong response {}", sessionMd.sessionRef.getSecurityCtx().getTenantId(), session.getId(), message.getPayload());
|
log.trace("{} Processing pong response {}", sessionMd.sessionRef.toString(), message.getPayload());
|
||||||
sessionMd.processPongMessage(System.currentTimeMillis());
|
sessionMd.processPongMessage(System.currentTimeMillis());
|
||||||
} else {
|
} else {
|
||||||
log.trace("[{}] Failed to find session", session.getId());
|
log.trace("[{}] Failed to find session", session.getId());
|
||||||
@ -247,7 +255,8 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements WebSocke
|
|||||||
internalSessionMap.put(session.getId(), sessionMd);
|
internalSessionMap.put(session.getId(), sessionMd);
|
||||||
externalSessionMap.put(sessionRef.getSessionId(), session.getId());
|
externalSessionMap.put(sessionRef.getSessionId(), session.getId());
|
||||||
processInWebSocketService(sessionRef, SessionEvent.onEstablished());
|
processInWebSocketService(sessionRef, SessionEvent.onEstablished());
|
||||||
log.info("[{}][{}][{}] Session established from address: {}", sessionRef.getSecurityCtx().getTenantId(), sessionRef.getSessionId(), session.getId(), session.getRemoteAddress());
|
log.info("[{}][{}][{}][{}] Session established from address: {}", sessionRef.getSecurityCtx().getTenantId(),
|
||||||
|
sessionRef.getSecurityCtx().getId(), sessionRef.getSessionId(), session.getId(), session.getRemoteAddress());
|
||||||
} else {
|
} else {
|
||||||
sessionMd = new SessionMetaData(session, sessionRef);
|
sessionMd = new SessionMetaData(session, sessionRef);
|
||||||
pendingSessions.put(session.getId(), sessionMd);
|
pendingSessions.put(session.getId(), sessionMd);
|
||||||
@ -280,7 +289,7 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements WebSocke
|
|||||||
cleanupLimits(session, sessionMd.sessionRef);
|
cleanupLimits(session, sessionMd.sessionRef);
|
||||||
processInWebSocketService(sessionMd.sessionRef, SessionEvent.onClosed());
|
processInWebSocketService(sessionMd.sessionRef, SessionEvent.onClosed());
|
||||||
}
|
}
|
||||||
log.info("[{}][{}][{}] Session is closed", sessionMd.sessionRef.getSecurityCtx().getTenantId(), sessionMd.sessionRef.getSessionId(), session.getId());
|
log.info("{} Session is closed", sessionMd.sessionRef.toString());
|
||||||
} else {
|
} else {
|
||||||
log.info("[{}] Session is closed", session.getId());
|
log.info("[{}] Session is closed", session.getId());
|
||||||
}
|
}
|
||||||
@ -293,7 +302,7 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements WebSocke
|
|||||||
try {
|
try {
|
||||||
webSocketService.handleSessionEvent(sessionRef, event);
|
webSocketService.handleSessionEvent(sessionRef, event);
|
||||||
} catch (BeanCreationNotAllowedException e) {
|
} catch (BeanCreationNotAllowedException e) {
|
||||||
log.warn("[{}] Failed to close session due to possible shutdown state", sessionRef.getSessionId());
|
log.warn("{} Failed to close session due to possible shutdown state", sessionRef.toString());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -359,13 +368,13 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements WebSocke
|
|||||||
try {
|
try {
|
||||||
long timeSinceLastActivity = currentTime - lastActivityTime;
|
long timeSinceLastActivity = currentTime - lastActivityTime;
|
||||||
if (timeSinceLastActivity >= pingTimeout) {
|
if (timeSinceLastActivity >= pingTimeout) {
|
||||||
log.warn("[{}] Closing session due to ping timeout", session.getId());
|
log.warn("{} Closing session due to ping timeout", sessionRef.toString());
|
||||||
closeSession(CloseStatus.SESSION_NOT_RELIABLE);
|
closeSession(CloseStatus.SESSION_NOT_RELIABLE);
|
||||||
} else if (timeSinceLastActivity >= pingTimeout / NUMBER_OF_PING_ATTEMPTS) {
|
} else if (timeSinceLastActivity >= pingTimeout / NUMBER_OF_PING_ATTEMPTS) {
|
||||||
sendMsg(TbWebSocketPingMsg.INSTANCE);
|
sendMsg(TbWebSocketPingMsg.INSTANCE);
|
||||||
}
|
}
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
log.trace("[{}] Failed to send ping msg", session.getId(), e);
|
log.trace("{} Failed to send ping msg", sessionRef.toString(), e);
|
||||||
closeSession(CloseStatus.SESSION_NOT_RELIABLE);
|
closeSession(CloseStatus.SESSION_NOT_RELIABLE);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -374,7 +383,7 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements WebSocke
|
|||||||
try {
|
try {
|
||||||
close(this.sessionRef, reason);
|
close(this.sessionRef, reason);
|
||||||
} catch (IOException ioe) {
|
} catch (IOException ioe) {
|
||||||
log.trace("[{}] Session transport error", session.getId(), ioe);
|
log.trace("{} Session transport error", sessionRef.toString(), ioe);
|
||||||
} finally {
|
} finally {
|
||||||
outboundMsgQueue.clear();
|
outboundMsgQueue.clear();
|
||||||
}
|
}
|
||||||
@ -394,7 +403,7 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements WebSocke
|
|||||||
outboundMsgQueueSize.incrementAndGet();
|
outboundMsgQueueSize.incrementAndGet();
|
||||||
processNextMsg();
|
processNextMsg();
|
||||||
} else {
|
} else {
|
||||||
log.info("[{}][{}] Session closed due to updates queue size exceeded", sessionRef.getSecurityCtx().getTenantId(), session.getId());
|
log.info("{} Session closed due to updates queue size exceeded", sessionRef.toString());
|
||||||
closeSession(CloseStatus.POLICY_VIOLATION.withReason("Max pending updates limit reached!"));
|
closeSession(CloseStatus.POLICY_VIOLATION.withReason("Max pending updates limit reached!"));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -412,7 +421,7 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements WebSocke
|
|||||||
processNextMsg();
|
processNextMsg();
|
||||||
}
|
}
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
log.trace("[{}] Failed to send msg", session.getId(), e);
|
log.trace("{} Failed to send msg", sessionRef.toString(), e);
|
||||||
closeSession(CloseStatus.SESSION_NOT_RELIABLE);
|
closeSession(CloseStatus.SESSION_NOT_RELIABLE);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -420,7 +429,7 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements WebSocke
|
|||||||
@Override
|
@Override
|
||||||
public void onResult(SendResult result) {
|
public void onResult(SendResult result) {
|
||||||
if (!result.isOK()) {
|
if (!result.isOK()) {
|
||||||
log.trace("[{}] Failed to send msg", session.getId(), result.getException());
|
log.trace("{} Failed to send msg", sessionRef.toString(), result.getException());
|
||||||
closeSession(CloseStatus.SESSION_NOT_RELIABLE);
|
closeSession(CloseStatus.SESSION_NOT_RELIABLE);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
@ -467,8 +476,8 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements WebSocke
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void send(WebSocketSessionRef sessionRef, int subscriptionId, String msg) throws IOException {
|
public void send(WebSocketSessionRef sessionRef, int subscriptionId, String msg) throws IOException {
|
||||||
|
log.debug("{} Sending {}", sessionRef.toString(), msg);
|
||||||
String externalId = sessionRef.getSessionId();
|
String externalId = sessionRef.getSessionId();
|
||||||
log.debug("[{}] Sending {}", externalId, msg);
|
|
||||||
String internalId = externalSessionMap.get(externalId);
|
String internalId = externalSessionMap.get(externalId);
|
||||||
if (internalId != null) {
|
if (internalId != null) {
|
||||||
SessionMetaData sessionMd = internalSessionMap.get(internalId);
|
SessionMetaData sessionMd = internalSessionMap.get(internalId);
|
||||||
@ -476,13 +485,12 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements WebSocke
|
|||||||
TenantId tenantId = sessionRef.getSecurityCtx().getTenantId();
|
TenantId tenantId = sessionRef.getSecurityCtx().getTenantId();
|
||||||
if (!rateLimitService.checkRateLimit(LimitedApi.WS_UPDATES_PER_SESSION, tenantId, (Object) sessionRef.getSessionId())) {
|
if (!rateLimitService.checkRateLimit(LimitedApi.WS_UPDATES_PER_SESSION, tenantId, (Object) sessionRef.getSessionId())) {
|
||||||
if (blacklistedSessions.putIfAbsent(externalId, sessionRef) == null) {
|
if (blacklistedSessions.putIfAbsent(externalId, sessionRef) == null) {
|
||||||
log.info("[{}][{}][{}] Failed to process session update. Max session updates limit reached"
|
log.info("{} Failed to process session update. Max session updates limit reached", sessionRef.toString());
|
||||||
, tenantId, sessionRef.getSecurityCtx().getId(), externalId);
|
|
||||||
sessionMd.sendMsg("{\"subscriptionId\":" + subscriptionId + ", \"errorCode\":" + ThingsboardErrorCode.TOO_MANY_UPDATES.getErrorCode() + ", \"errorMsg\":\"Too many updates!\"}");
|
sessionMd.sendMsg("{\"subscriptionId\":" + subscriptionId + ", \"errorCode\":" + ThingsboardErrorCode.TOO_MANY_UPDATES.getErrorCode() + ", \"errorMsg\":\"Too many updates!\"}");
|
||||||
}
|
}
|
||||||
return;
|
return;
|
||||||
} else {
|
} else {
|
||||||
log.debug("[{}][{}][{}] Session is no longer blacklisted.", tenantId, sessionRef.getSecurityCtx().getId(), externalId);
|
log.debug("{} Session is no longer blacklisted.", sessionRef.toString());
|
||||||
blacklistedSessions.remove(externalId);
|
blacklistedSessions.remove(externalId);
|
||||||
}
|
}
|
||||||
sessionMd.sendMsg(msg);
|
sessionMd.sendMsg(msg);
|
||||||
@ -513,7 +521,7 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements WebSocke
|
|||||||
@Override
|
@Override
|
||||||
public void close(WebSocketSessionRef sessionRef, CloseStatus reason) throws IOException {
|
public void close(WebSocketSessionRef sessionRef, CloseStatus reason) throws IOException {
|
||||||
String externalId = sessionRef.getSessionId();
|
String externalId = sessionRef.getSessionId();
|
||||||
log.debug("[{}] Processing close request", externalId);
|
log.debug("{} Processing close request", sessionRef.toString());
|
||||||
String internalId = externalSessionMap.get(externalId);
|
String internalId = externalSessionMap.get(externalId);
|
||||||
if (internalId != null) {
|
if (internalId != null) {
|
||||||
SessionMetaData sessionMd = getSessionMd(internalId);
|
SessionMetaData sessionMd = getSessionMd(internalId);
|
||||||
@ -543,8 +551,7 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements WebSocke
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (!limitAllowed) {
|
if (!limitAllowed) {
|
||||||
log.info("[{}][{}][{}] Failed to start session. Max tenant sessions limit reached"
|
log.info("{} Failed to start session. Max tenant sessions limit reached", sessionRef.toString());
|
||||||
, sessionRef.getSecurityCtx().getTenantId(), sessionRef.getSecurityCtx().getId(), sessionId);
|
|
||||||
session.close(CloseStatus.POLICY_VIOLATION.withReason("Max tenant sessions limit reached!"));
|
session.close(CloseStatus.POLICY_VIOLATION.withReason("Max tenant sessions limit reached!"));
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
@ -560,8 +567,7 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements WebSocke
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (!limitAllowed) {
|
if (!limitAllowed) {
|
||||||
log.info("[{}][{}][{}] Failed to start session. Max customer sessions limit reached"
|
log.info("{} Failed to start session. Max customer sessions limit reached", sessionRef.toString());
|
||||||
, sessionRef.getSecurityCtx().getTenantId(), sessionRef.getSecurityCtx().getId(), sessionId);
|
|
||||||
session.close(CloseStatus.POLICY_VIOLATION.withReason("Max customer sessions limit reached"));
|
session.close(CloseStatus.POLICY_VIOLATION.withReason("Max customer sessions limit reached"));
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
@ -576,8 +582,7 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements WebSocke
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (!limitAllowed) {
|
if (!limitAllowed) {
|
||||||
log.info("[{}][{}][{}] Failed to start session. Max regular user sessions limit reached"
|
log.info("{} Failed to start session. Max regular user sessions limit reached", sessionRef.toString());
|
||||||
, sessionRef.getSecurityCtx().getTenantId(), sessionRef.getSecurityCtx().getId(), sessionId);
|
|
||||||
session.close(CloseStatus.POLICY_VIOLATION.withReason("Max regular user sessions limit reached"));
|
session.close(CloseStatus.POLICY_VIOLATION.withReason("Max regular user sessions limit reached"));
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
@ -592,8 +597,7 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements WebSocke
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (!limitAllowed) {
|
if (!limitAllowed) {
|
||||||
log.info("[{}][{}][{}] Failed to start session. Max public user sessions limit reached"
|
log.info("{} Failed to start session. Max public user sessions limit reached", sessionRef.toString());
|
||||||
, sessionRef.getSecurityCtx().getTenantId(), sessionRef.getSecurityCtx().getId(), sessionId);
|
|
||||||
session.close(CloseStatus.POLICY_VIOLATION.withReason("Max public user sessions limit reached"));
|
session.close(CloseStatus.POLICY_VIOLATION.withReason("Max public user sessions limit reached"));
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|||||||
@ -86,6 +86,7 @@ import javax.annotation.PreDestroy;
|
|||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
|
import java.util.EnumMap;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
@ -147,7 +148,7 @@ public class DefaultWebSocketService implements WebSocketService {
|
|||||||
private ScheduledExecutorService pingExecutor;
|
private ScheduledExecutorService pingExecutor;
|
||||||
private String serviceId;
|
private String serviceId;
|
||||||
|
|
||||||
private List<WsCmdHandler<? extends WsCmd>> cmdsHandlers;
|
private Map<WsCmdType, WsCmdHandler<? extends WsCmd>> cmdsHandlers;
|
||||||
|
|
||||||
@PostConstruct
|
@PostConstruct
|
||||||
public void init() {
|
public void init() {
|
||||||
@ -157,24 +158,23 @@ public class DefaultWebSocketService implements WebSocketService {
|
|||||||
pingExecutor = Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName("telemetry-web-socket-ping"));
|
pingExecutor = Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName("telemetry-web-socket-ping"));
|
||||||
pingExecutor.scheduleWithFixedDelay(this::sendPing, pingTimeout / NUMBER_OF_PING_ATTEMPTS, pingTimeout / NUMBER_OF_PING_ATTEMPTS, TimeUnit.MILLISECONDS);
|
pingExecutor.scheduleWithFixedDelay(this::sendPing, pingTimeout / NUMBER_OF_PING_ATTEMPTS, pingTimeout / NUMBER_OF_PING_ATTEMPTS, TimeUnit.MILLISECONDS);
|
||||||
|
|
||||||
cmdsHandlers = List.of(
|
cmdsHandlers = new EnumMap<>(WsCmdType.class);
|
||||||
newCmdHandler(WsCmdType.ATTRIBUTES, this::handleWsAttributesSubscriptionCmd),
|
cmdsHandlers.put(WsCmdType.ATTRIBUTES, newCmdHandler(this::handleWsAttributesSubscriptionCmd));
|
||||||
newCmdHandler(WsCmdType.TIMESERIES, this::handleWsTimeseriesSubscriptionCmd),
|
cmdsHandlers.put(WsCmdType.TIMESERIES, newCmdHandler(this::handleWsTimeseriesSubscriptionCmd));
|
||||||
newCmdHandler(WsCmdType.TIMESERIES_HISTORY, this::handleWsHistoryCmd),
|
cmdsHandlers.put(WsCmdType.TIMESERIES_HISTORY, newCmdHandler(this::handleWsHistoryCmd));
|
||||||
newCmdHandler(WsCmdType.ENTITY_DATA, this::handleWsEntityDataCmd),
|
cmdsHandlers.put(WsCmdType.ENTITY_DATA, newCmdHandler(this::handleWsEntityDataCmd));
|
||||||
newCmdHandler(WsCmdType.ALARM_DATA, this::handleWsAlarmDataCmd),
|
cmdsHandlers.put(WsCmdType.ALARM_DATA, newCmdHandler(this::handleWsAlarmDataCmd));
|
||||||
newCmdHandler(WsCmdType.ENTITY_COUNT, this::handleWsEntityCountCmd),
|
cmdsHandlers.put(WsCmdType.ENTITY_COUNT, newCmdHandler(this::handleWsEntityCountCmd));
|
||||||
newCmdHandler(WsCmdType.ALARM_COUNT, this::handleWsAlarmCountCmd),
|
cmdsHandlers.put(WsCmdType.ALARM_COUNT, newCmdHandler(this::handleWsAlarmCountCmd));
|
||||||
newCmdHandler(WsCmdType.ENTITY_DATA_UNSUBSCRIBE, this::handleWsDataUnsubscribeCmd),
|
cmdsHandlers.put(WsCmdType.ENTITY_DATA_UNSUBSCRIBE, newCmdHandler(this::handleWsDataUnsubscribeCmd));
|
||||||
newCmdHandler(WsCmdType.ALARM_DATA_UNSUBSCRIBE, this::handleWsDataUnsubscribeCmd),
|
cmdsHandlers.put(WsCmdType.ALARM_DATA_UNSUBSCRIBE, newCmdHandler(this::handleWsDataUnsubscribeCmd));
|
||||||
newCmdHandler(WsCmdType.ENTITY_COUNT_UNSUBSCRIBE, this::handleWsDataUnsubscribeCmd),
|
cmdsHandlers.put(WsCmdType.ENTITY_COUNT_UNSUBSCRIBE, newCmdHandler(this::handleWsDataUnsubscribeCmd));
|
||||||
newCmdHandler(WsCmdType.ALARM_COUNT_UNSUBSCRIBE, this::handleWsDataUnsubscribeCmd),
|
cmdsHandlers.put(WsCmdType.ALARM_COUNT_UNSUBSCRIBE, newCmdHandler(this::handleWsDataUnsubscribeCmd));
|
||||||
newCmdHandler(WsCmdType.NOTIFICATIONS, notificationCmdsHandler::handleUnreadNotificationsSubCmd),
|
cmdsHandlers.put(WsCmdType.NOTIFICATIONS, newCmdHandler(notificationCmdsHandler::handleUnreadNotificationsSubCmd));
|
||||||
newCmdHandler(WsCmdType.NOTIFICATIONS_COUNT, notificationCmdsHandler::handleUnreadNotificationsCountSubCmd),
|
cmdsHandlers.put(WsCmdType.NOTIFICATIONS_COUNT, newCmdHandler(notificationCmdsHandler::handleUnreadNotificationsCountSubCmd));
|
||||||
newCmdHandler(WsCmdType.MARK_NOTIFICATIONS_AS_READ, notificationCmdsHandler::handleMarkAsReadCmd),
|
cmdsHandlers.put(WsCmdType.MARK_NOTIFICATIONS_AS_READ, newCmdHandler(notificationCmdsHandler::handleMarkAsReadCmd));
|
||||||
newCmdHandler(WsCmdType.MARK_ALL_NOTIFICATIONS_AS_READ, notificationCmdsHandler::handleMarkAllAsReadCmd),
|
cmdsHandlers.put(WsCmdType.MARK_ALL_NOTIFICATIONS_AS_READ, newCmdHandler(notificationCmdsHandler::handleMarkAllAsReadCmd));
|
||||||
newCmdHandler(WsCmdType.NOTIFICATIONS_UNSUBSCRIBE, notificationCmdsHandler::handleUnsubCmd)
|
cmdsHandlers.put(WsCmdType.NOTIFICATIONS_UNSUBSCRIBE, newCmdHandler(notificationCmdsHandler::handleUnsubCmd));
|
||||||
);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@PreDestroy
|
@PreDestroy
|
||||||
@ -221,7 +221,7 @@ public class DefaultWebSocketService implements WebSocketService {
|
|||||||
for (WsCmd cmd : commandsWrapper.getCmds()) {
|
for (WsCmd cmd : commandsWrapper.getCmds()) {
|
||||||
log.debug("[{}][{}][{}] Processing cmd: {}", sessionId, cmd.getType(), cmd.getCmdId(), cmd);
|
log.debug("[{}][{}][{}] Processing cmd: {}", sessionId, cmd.getType(), cmd.getCmdId(), cmd);
|
||||||
try {
|
try {
|
||||||
Optional.ofNullable(getCmdHandler(cmd.getType()))
|
Optional.ofNullable(cmdsHandlers.get(cmd.getType()))
|
||||||
.ifPresent(cmdHandler -> cmdHandler.handle(sessionRef, cmd));
|
.ifPresent(cmdHandler -> cmdHandler.handle(sessionRef, cmd));
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
log.error("[sessionId: {}, tenantId: {}, userId: {}] Failed to handle WS cmd: {}", sessionId,
|
log.error("[sessionId: {}, tenantId: {}, userId: {}] Failed to handle WS cmd: {}", sessionId,
|
||||||
@ -963,24 +963,14 @@ public class DefaultWebSocketService implements WebSocketService {
|
|||||||
.map(TenantProfile::getDefaultProfileConfiguration).orElse(null);
|
.map(TenantProfile::getDefaultProfileConfiguration).orElse(null);
|
||||||
}
|
}
|
||||||
|
|
||||||
public WsCmdHandler<? extends WsCmd> getCmdHandler(WsCmdType cmdType) {
|
public static <C extends WsCmd> WsCmdHandler<C> newCmdHandler(BiConsumer<WebSocketSessionRef, C> handler) {
|
||||||
for (WsCmdHandler<? extends WsCmd> cmdHandler : cmdsHandlers) {
|
return new WsCmdHandler<>(handler);
|
||||||
if (cmdHandler.getCmdType() == cmdType) {
|
|
||||||
return cmdHandler;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
|
|
||||||
public static <C extends WsCmd> WsCmdHandler<C> newCmdHandler(WsCmdType cmdType, BiConsumer<WebSocketSessionRef, C> handler) {
|
|
||||||
return new WsCmdHandler<>(cmdType, handler);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@RequiredArgsConstructor
|
@RequiredArgsConstructor
|
||||||
@Getter
|
@Getter
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
public static class WsCmdHandler<C extends WsCmd> {
|
public static class WsCmdHandler<C extends WsCmd> {
|
||||||
private final WsCmdType cmdType;
|
|
||||||
protected final BiConsumer<WebSocketSessionRef, C> handler;
|
protected final BiConsumer<WebSocketSessionRef, C> handler;
|
||||||
|
|
||||||
public void handle(WebSocketSessionRef sessionRef, WsCmd cmd) {
|
public void handle(WebSocketSessionRef sessionRef, WsCmd cmd) {
|
||||||
|
|||||||
@ -54,11 +54,13 @@ public class WebSocketSessionRef {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String toString() {
|
public String toString() {
|
||||||
return "WebSocketSessionRef{" +
|
String info = "";
|
||||||
"sessionId='" + sessionId + '\'' +
|
if (securityCtx != null) {
|
||||||
", localAddress=" + localAddress +
|
info += "[" + securityCtx.getTenantId() + "]";
|
||||||
", remoteAddress=" + remoteAddress +
|
info += "[" + securityCtx.getId() + "]";
|
||||||
", sessionType=" + sessionType +
|
|
||||||
'}';
|
|
||||||
}
|
}
|
||||||
|
info += "[" + sessionId + "]";
|
||||||
|
return info;
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@ -76,6 +76,8 @@ server:
|
|||||||
max_entities_per_alarm_subscription: "${TB_SERVER_WS_MAX_ENTITIES_PER_ALARM_SUBSCRIPTION:10000}"
|
max_entities_per_alarm_subscription: "${TB_SERVER_WS_MAX_ENTITIES_PER_ALARM_SUBSCRIPTION:10000}"
|
||||||
# Maximum queue size of the websocket updates per session. This restriction prevents infinite updates of WS
|
# Maximum queue size of the websocket updates per session. This restriction prevents infinite updates of WS
|
||||||
max_queue_messages_per_session: "${TB_SERVER_WS_DEFAULT_QUEUE_MESSAGES_PER_SESSION:1000}"
|
max_queue_messages_per_session: "${TB_SERVER_WS_DEFAULT_QUEUE_MESSAGES_PER_SESSION:1000}"
|
||||||
|
# Maximum time between WS session opening and sending auth command
|
||||||
|
auth_timeout_ms: "${TB_SERVER_WS_AUTH_TIMEOUT_MS:10000}"
|
||||||
rest:
|
rest:
|
||||||
server_side_rpc:
|
server_side_rpc:
|
||||||
# Minimum value of the server-side RPC timeout. May override value provided in the REST API call.
|
# Minimum value of the server-side RPC timeout. May override value provided in the REST API call.
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user