WS logging improvements

This commit is contained in:
ViacheslavKlimov 2023-12-11 15:54:59 +02:00
parent 6d255ebcfa
commit 131f6bf721
2 changed files with 31 additions and 34 deletions

View File

@ -142,8 +142,7 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements WebSocke
session.close(CloseStatus.SERVER_ERROR.withReason("Session not found!")); session.close(CloseStatus.SERVER_ERROR.withReason("Session not found!"));
return; return;
} }
String msg = message.getPayload(); sessionMd.onMsg(message.getPayload());
sessionMd.onMsg(msg);
} catch (IOException e) { } catch (IOException e) {
log.warn("IO error", e); log.warn("IO error", e);
} }
@ -167,7 +166,7 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements WebSocke
return; return;
} }
} catch (Exception e) { } catch (Exception e) {
log.warn("Failed to decode subscription cmd: {}", e.getMessage(), e); log.debug("{} Failed to decode subscription cmd: {}", sessionRef.toString(), e.getMessage(), e);
if (sessionRef.getSecurityCtx() != null) { if (sessionRef.getSecurityCtx() != null) {
webSocketService.sendError(sessionRef, 1, SubscriptionErrorCode.BAD_REQUEST, "Failed to parse the payload"); webSocketService.sendError(sessionRef, 1, SubscriptionErrorCode.BAD_REQUEST, "Failed to parse the payload");
} else { } else {
@ -177,7 +176,7 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements WebSocke
} }
if (sessionRef.getSecurityCtx() != null) { if (sessionRef.getSecurityCtx() != null) {
log.trace("[{}][{}] Processing {}", sessionRef.getSecurityCtx().getTenantId(), sessionMd.session.getId(), msg); log.trace("{} Processing {}", sessionRef.toString(), msg);
webSocketService.handleCommands(sessionRef, cmdsWrapper); webSocketService.handleCommands(sessionRef, cmdsWrapper);
} else { } else {
AuthCmd authCmd = cmdsWrapper.getAuthCmd(); AuthCmd authCmd = cmdsWrapper.getAuthCmd();
@ -185,7 +184,7 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements WebSocke
close(sessionRef, CloseStatus.POLICY_VIOLATION.withReason("Auth cmd is missing")); close(sessionRef, CloseStatus.POLICY_VIOLATION.withReason("Auth cmd is missing"));
return; return;
} }
log.trace("[{}] Authenticating session", sessionMd.session.getId()); log.trace("{} Authenticating session", sessionRef.toString());
SecurityUser securityCtx; SecurityUser securityCtx;
try { try {
securityCtx = authenticationProvider.authenticate(authCmd.getToken()); securityCtx = authenticationProvider.authenticate(authCmd.getToken());
@ -205,7 +204,7 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements WebSocke
try { try {
SessionMetaData sessionMd = getSessionMd(session.getId()); SessionMetaData sessionMd = getSessionMd(session.getId());
if (sessionMd != null) { if (sessionMd != null) {
log.trace("[{}][{}] Processing pong response {}", sessionMd.sessionRef.getSecurityCtx().getTenantId(), session.getId(), message.getPayload()); log.trace("{} Processing pong response {}", sessionMd.sessionRef.toString(), message.getPayload());
sessionMd.processPongMessage(System.currentTimeMillis()); sessionMd.processPongMessage(System.currentTimeMillis());
} else { } else {
log.trace("[{}] Failed to find session", session.getId()); log.trace("[{}] Failed to find session", session.getId());
@ -255,7 +254,8 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements WebSocke
internalSessionMap.put(session.getId(), sessionMd); internalSessionMap.put(session.getId(), sessionMd);
externalSessionMap.put(sessionRef.getSessionId(), session.getId()); externalSessionMap.put(sessionRef.getSessionId(), session.getId());
processInWebSocketService(sessionRef, SessionEvent.onEstablished()); processInWebSocketService(sessionRef, SessionEvent.onEstablished());
log.info("[{}][{}][{}] Session established from address: {}", sessionRef.getSecurityCtx().getTenantId(), sessionRef.getSessionId(), session.getId(), session.getRemoteAddress()); log.info("[{}][{}][{}][{}] Session established from address: {}", sessionRef.getSecurityCtx().getTenantId(),
sessionRef.getSecurityCtx().getId(), sessionRef.getSessionId(), session.getId(), session.getRemoteAddress());
} else { } else {
sessionMd = new SessionMetaData(session, sessionRef); sessionMd = new SessionMetaData(session, sessionRef);
pendingSessions.put(session.getId(), sessionMd); pendingSessions.put(session.getId(), sessionMd);
@ -288,7 +288,7 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements WebSocke
cleanupLimits(session, sessionMd.sessionRef); cleanupLimits(session, sessionMd.sessionRef);
processInWebSocketService(sessionMd.sessionRef, SessionEvent.onClosed()); processInWebSocketService(sessionMd.sessionRef, SessionEvent.onClosed());
} }
log.info("[{}][{}][{}] Session is closed", sessionMd.sessionRef.getSecurityCtx().getTenantId(), sessionMd.sessionRef.getSessionId(), session.getId()); log.info("{} Session is closed", sessionMd.sessionRef.toString());
} else { } else {
log.info("[{}] Session is closed", session.getId()); log.info("[{}] Session is closed", session.getId());
} }
@ -301,7 +301,7 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements WebSocke
try { try {
webSocketService.handleSessionEvent(sessionRef, event); webSocketService.handleSessionEvent(sessionRef, event);
} catch (BeanCreationNotAllowedException e) { } catch (BeanCreationNotAllowedException e) {
log.warn("[{}] Failed to close session due to possible shutdown state", sessionRef.getSessionId()); log.warn("{} Failed to close session due to possible shutdown state", sessionRef.toString());
} }
} }
@ -367,13 +367,13 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements WebSocke
try { try {
long timeSinceLastActivity = currentTime - lastActivityTime; long timeSinceLastActivity = currentTime - lastActivityTime;
if (timeSinceLastActivity >= pingTimeout) { if (timeSinceLastActivity >= pingTimeout) {
log.warn("[{}] Closing session due to ping timeout", session.getId()); log.warn("{} Closing session due to ping timeout", sessionRef.toString());
closeSession(CloseStatus.SESSION_NOT_RELIABLE); closeSession(CloseStatus.SESSION_NOT_RELIABLE);
} else if (timeSinceLastActivity >= pingTimeout / NUMBER_OF_PING_ATTEMPTS) { } else if (timeSinceLastActivity >= pingTimeout / NUMBER_OF_PING_ATTEMPTS) {
sendMsg(TbWebSocketPingMsg.INSTANCE); sendMsg(TbWebSocketPingMsg.INSTANCE);
} }
} catch (Exception e) { } catch (Exception e) {
log.trace("[{}] Failed to send ping msg", session.getId(), e); log.trace("{} Failed to send ping msg", sessionRef.toString(), e);
closeSession(CloseStatus.SESSION_NOT_RELIABLE); closeSession(CloseStatus.SESSION_NOT_RELIABLE);
} }
} }
@ -382,7 +382,7 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements WebSocke
try { try {
close(this.sessionRef, reason); close(this.sessionRef, reason);
} catch (IOException ioe) { } catch (IOException ioe) {
log.trace("[{}] Session transport error", session.getId(), ioe); log.trace("{} Session transport error", sessionRef.toString(), ioe);
} finally { } finally {
outboundMsgQueue.clear(); outboundMsgQueue.clear();
} }
@ -402,7 +402,7 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements WebSocke
outboundMsgQueueSize.incrementAndGet(); outboundMsgQueueSize.incrementAndGet();
processNextMsg(); processNextMsg();
} else { } else {
log.info("[{}][{}] Session closed due to updates queue size exceeded", sessionRef.getSecurityCtx().getTenantId(), session.getId()); log.info("{} Session closed due to updates queue size exceeded", sessionRef.toString());
closeSession(CloseStatus.POLICY_VIOLATION.withReason("Max pending updates limit reached!")); closeSession(CloseStatus.POLICY_VIOLATION.withReason("Max pending updates limit reached!"));
} }
} }
@ -420,7 +420,7 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements WebSocke
processNextMsg(); processNextMsg();
} }
} catch (Exception e) { } catch (Exception e) {
log.trace("[{}] Failed to send msg", session.getId(), e); log.trace("{} Failed to send msg", sessionRef.toString(), e);
closeSession(CloseStatus.SESSION_NOT_RELIABLE); closeSession(CloseStatus.SESSION_NOT_RELIABLE);
} }
} }
@ -428,7 +428,7 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements WebSocke
@Override @Override
public void onResult(SendResult result) { public void onResult(SendResult result) {
if (!result.isOK()) { if (!result.isOK()) {
log.trace("[{}] Failed to send msg", session.getId(), result.getException()); log.trace("{} Failed to send msg", sessionRef.toString(), result.getException());
closeSession(CloseStatus.SESSION_NOT_RELIABLE); closeSession(CloseStatus.SESSION_NOT_RELIABLE);
return; return;
} }
@ -475,8 +475,8 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements WebSocke
@Override @Override
public void send(WebSocketSessionRef sessionRef, int subscriptionId, String msg) throws IOException { public void send(WebSocketSessionRef sessionRef, int subscriptionId, String msg) throws IOException {
log.debug("{} Sending {}", sessionRef.toString(), msg);
String externalId = sessionRef.getSessionId(); String externalId = sessionRef.getSessionId();
log.debug("[{}] Sending {}", externalId, msg);
String internalId = externalSessionMap.get(externalId); String internalId = externalSessionMap.get(externalId);
if (internalId != null) { if (internalId != null) {
SessionMetaData sessionMd = internalSessionMap.get(internalId); SessionMetaData sessionMd = internalSessionMap.get(internalId);
@ -484,13 +484,12 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements WebSocke
TenantId tenantId = sessionRef.getSecurityCtx().getTenantId(); TenantId tenantId = sessionRef.getSecurityCtx().getTenantId();
if (!rateLimitService.checkRateLimit(LimitedApi.WS_UPDATES_PER_SESSION, tenantId, (Object) sessionRef.getSessionId())) { if (!rateLimitService.checkRateLimit(LimitedApi.WS_UPDATES_PER_SESSION, tenantId, (Object) sessionRef.getSessionId())) {
if (blacklistedSessions.putIfAbsent(externalId, sessionRef) == null) { if (blacklistedSessions.putIfAbsent(externalId, sessionRef) == null) {
log.info("[{}][{}][{}] Failed to process session update. Max session updates limit reached" log.info("{} Failed to process session update. Max session updates limit reached", sessionRef.toString());
, tenantId, sessionRef.getSecurityCtx().getId(), externalId);
sessionMd.sendMsg("{\"subscriptionId\":" + subscriptionId + ", \"errorCode\":" + ThingsboardErrorCode.TOO_MANY_UPDATES.getErrorCode() + ", \"errorMsg\":\"Too many updates!\"}"); sessionMd.sendMsg("{\"subscriptionId\":" + subscriptionId + ", \"errorCode\":" + ThingsboardErrorCode.TOO_MANY_UPDATES.getErrorCode() + ", \"errorMsg\":\"Too many updates!\"}");
} }
return; return;
} else { } else {
log.debug("[{}][{}][{}] Session is no longer blacklisted.", tenantId, sessionRef.getSecurityCtx().getId(), externalId); log.debug("{} Session is no longer blacklisted.", sessionRef.toString());
blacklistedSessions.remove(externalId); blacklistedSessions.remove(externalId);
} }
sessionMd.sendMsg(msg); sessionMd.sendMsg(msg);
@ -521,7 +520,7 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements WebSocke
@Override @Override
public void close(WebSocketSessionRef sessionRef, CloseStatus reason) throws IOException { public void close(WebSocketSessionRef sessionRef, CloseStatus reason) throws IOException {
String externalId = sessionRef.getSessionId(); String externalId = sessionRef.getSessionId();
log.debug("[{}] Processing close request", externalId); log.debug("{} Processing close request", sessionRef.toString());
String internalId = externalSessionMap.get(externalId); String internalId = externalSessionMap.get(externalId);
if (internalId != null) { if (internalId != null) {
SessionMetaData sessionMd = getSessionMd(internalId); SessionMetaData sessionMd = getSessionMd(internalId);
@ -551,8 +550,7 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements WebSocke
} }
} }
if (!limitAllowed) { if (!limitAllowed) {
log.info("[{}][{}][{}] Failed to start session. Max tenant sessions limit reached" log.info("{} Failed to start session. Max tenant sessions limit reached", sessionRef.toString());
, sessionRef.getSecurityCtx().getTenantId(), sessionRef.getSecurityCtx().getId(), sessionId);
session.close(CloseStatus.POLICY_VIOLATION.withReason("Max tenant sessions limit reached!")); session.close(CloseStatus.POLICY_VIOLATION.withReason("Max tenant sessions limit reached!"));
return false; return false;
} }
@ -568,8 +566,7 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements WebSocke
} }
} }
if (!limitAllowed) { if (!limitAllowed) {
log.info("[{}][{}][{}] Failed to start session. Max customer sessions limit reached" log.info("{} Failed to start session. Max customer sessions limit reached", sessionRef.toString());
, sessionRef.getSecurityCtx().getTenantId(), sessionRef.getSecurityCtx().getId(), sessionId);
session.close(CloseStatus.POLICY_VIOLATION.withReason("Max customer sessions limit reached")); session.close(CloseStatus.POLICY_VIOLATION.withReason("Max customer sessions limit reached"));
return false; return false;
} }
@ -584,8 +581,7 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements WebSocke
} }
} }
if (!limitAllowed) { if (!limitAllowed) {
log.info("[{}][{}][{}] Failed to start session. Max regular user sessions limit reached" log.info("{} Failed to start session. Max regular user sessions limit reached", sessionRef.toString());
, sessionRef.getSecurityCtx().getTenantId(), sessionRef.getSecurityCtx().getId(), sessionId);
session.close(CloseStatus.POLICY_VIOLATION.withReason("Max regular user sessions limit reached")); session.close(CloseStatus.POLICY_VIOLATION.withReason("Max regular user sessions limit reached"));
return false; return false;
} }
@ -600,8 +596,7 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements WebSocke
} }
} }
if (!limitAllowed) { if (!limitAllowed) {
log.info("[{}][{}][{}] Failed to start session. Max public user sessions limit reached" log.info("{} Failed to start session. Max public user sessions limit reached", sessionRef.toString());
, sessionRef.getSecurityCtx().getTenantId(), sessionRef.getSecurityCtx().getId(), sessionId);
session.close(CloseStatus.POLICY_VIOLATION.withReason("Max public user sessions limit reached")); session.close(CloseStatus.POLICY_VIOLATION.withReason("Max public user sessions limit reached"));
return false; return false;
} }

View File

@ -54,11 +54,13 @@ public class WebSocketSessionRef {
@Override @Override
public String toString() { public String toString() {
return "WebSocketSessionRef{" + String info = "";
"sessionId='" + sessionId + '\'' + if (securityCtx != null) {
", localAddress=" + localAddress + info += "[" + securityCtx.getTenantId() + "]";
", remoteAddress=" + remoteAddress + info += "[" + securityCtx.getId() + "]";
", sessionType=" + sessionType + }
'}'; info += "[" + sessionId + "]";
return info;
} }
} }