diff --git a/application/src/main/java/org/thingsboard/server/controller/plugin/TbWebSocketHandler.java b/application/src/main/java/org/thingsboard/server/controller/plugin/TbWebSocketHandler.java index 0e59030fe2..9fdda2215d 100644 --- a/application/src/main/java/org/thingsboard/server/controller/plugin/TbWebSocketHandler.java +++ b/application/src/main/java/org/thingsboard/server/controller/plugin/TbWebSocketHandler.java @@ -23,6 +23,7 @@ import org.springframework.security.core.Authentication; import org.springframework.stereotype.Service; import org.springframework.util.StringUtils; import org.springframework.web.socket.CloseStatus; +import org.springframework.web.socket.PongMessage; import org.springframework.web.socket.TextMessage; import org.springframework.web.socket.WebSocketSession; import org.springframework.web.socket.adapter.NativeWebSocketSession; @@ -36,6 +37,7 @@ import org.thingsboard.server.config.WebSocketConfiguration; import org.thingsboard.server.queue.util.TbCoreComponent; import org.thingsboard.server.service.security.model.SecurityUser; import org.thingsboard.server.service.security.model.UserPrincipal; +import org.thingsboard.server.service.telemetry.DefaultTelemetryWebSocketService; import org.thingsboard.server.service.telemetry.SessionEvent; import org.thingsboard.server.service.telemetry.TelemetryWebSocketMsgEndpoint; import org.thingsboard.server.service.telemetry.TelemetryWebSocketService; @@ -56,6 +58,8 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.LinkedBlockingQueue; +import static org.thingsboard.server.service.telemetry.DefaultTelemetryWebSocketService.NUMBER_OF_PING_ATTEMPTS; + @Service @TbCoreComponent @Slf4j @@ -111,6 +115,22 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements Telemetr } } + @Override + protected void handlePongMessage(WebSocketSession session, PongMessage message) throws Exception { + try { + SessionMetaData sessionMd = internalSessionMap.get(session.getId()); + if (sessionMd != null) { + log.trace("[{}][{}] Processing pong response {}", sessionMd.sessionRef.getSecurityCtx().getTenantId(), session.getId(), message.getPayload()); + sessionMd.processPongMessage(System.currentTimeMillis()); + } else { + log.trace("[{}] Failed to find session", session.getId()); + session.close(CloseStatus.SERVER_ERROR.withReason("Session not found!")); + } + } catch (IOException e) { + log.warn("IO error", e); + } + } + @Override public void afterConnectionEstablished(WebSocketSession session) throws Exception { super.afterConnectionEstablished(session); @@ -212,20 +232,31 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements Telemetr synchronized void sendPing(long currentTime) { try { - if (currentTime - lastActivityTime >= pingTimeout) { + long timeSinceLastActivity = currentTime - lastActivityTime; + if (timeSinceLastActivity >= pingTimeout) { + log.warn("[{}] Closing session due to ping timeout", session.getId()); + closeSession(CloseStatus.SESSION_NOT_RELIABLE); + } else if (timeSinceLastActivity >= pingTimeout / NUMBER_OF_PING_ATTEMPTS) { this.asyncRemote.sendPing(PING_MSG); - lastActivityTime = currentTime; } } catch (Exception e) { log.trace("[{}] Failed to send ping msg", session.getId(), e); - try { - close(this.sessionRef, CloseStatus.SESSION_NOT_RELIABLE); - } catch (IOException ioe) { - log.trace("[{}] Session transport error", session.getId(), ioe); - } + closeSession(CloseStatus.SESSION_NOT_RELIABLE); } } + private void closeSession(CloseStatus reason) { + try { + close(this.sessionRef, reason); + } catch (IOException ioe) { + log.trace("[{}] Session transport error", session.getId(), ioe); + } + } + + synchronized void processPongMessage(long currentTime) { + lastActivityTime = currentTime; + } + synchronized void sendMsg(String msg) { if (isSending) { try { @@ -236,11 +267,7 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements Telemetr } else { log.info("[{}][{}] Session closed due to queue error", sessionRef.getSecurityCtx().getTenantId(), session.getId()); } - try { - close(sessionRef, CloseStatus.POLICY_VIOLATION.withReason("Max pending updates limit reached!")); - } catch (IOException ioe) { - log.trace("[{}] Session transport error", session.getId(), ioe); - } + closeSession(CloseStatus.POLICY_VIOLATION.withReason("Max pending updates limit reached!")); } } else { isSending = true; @@ -253,11 +280,7 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements Telemetr this.asyncRemote.sendText(msg, this); } catch (Exception e) { log.trace("[{}] Failed to send msg", session.getId(), e); - try { - close(this.sessionRef, CloseStatus.SESSION_NOT_RELIABLE); - } catch (IOException ioe) { - log.trace("[{}] Session transport error", session.getId(), ioe); - } + closeSession(CloseStatus.SESSION_NOT_RELIABLE); } } @@ -265,11 +288,7 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements Telemetr public void onResult(SendResult result) { if (!result.isOK()) { log.trace("[{}] Failed to send msg", session.getId(), result.getException()); - try { - close(this.sessionRef, CloseStatus.SESSION_NOT_RELIABLE); - } catch (IOException ioe) { - log.trace("[{}] Session transport error", session.getId(), ioe); - } + closeSession(CloseStatus.SESSION_NOT_RELIABLE); } else { lastActivityTime = System.currentTimeMillis(); String msg = msgQueue.poll(); diff --git a/application/src/main/java/org/thingsboard/server/service/subscription/DefaultTbEntityDataSubscriptionService.java b/application/src/main/java/org/thingsboard/server/service/subscription/DefaultTbEntityDataSubscriptionService.java index 4ee8708805..c0f882bfe8 100644 --- a/application/src/main/java/org/thingsboard/server/service/subscription/DefaultTbEntityDataSubscriptionService.java +++ b/application/src/main/java/org/thingsboard/server/service/subscription/DefaultTbEntityDataSubscriptionService.java @@ -289,6 +289,7 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc long start = System.currentTimeMillis(); finalCtx.update(); long end = System.currentTimeMillis(); + log.trace("[{}][{}] Executing query: {}", finalCtx.getSessionId(), finalCtx.getCmdId(), finalCtx.getQuery()); stats.getDynamicQueryInvocationCnt().incrementAndGet(); stats.getDynamicQueryTimeSpent().addAndGet(end - start); } catch (Exception e) { @@ -304,7 +305,7 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc long regularQueryInvocationTimeValue = stats.getRegularQueryTimeSpent().getAndSet(0); int dynamicQueryInvocationCntValue = stats.getDynamicQueryInvocationCnt().getAndSet(0); long dynamicQueryInvocationTimeValue = stats.getDynamicQueryTimeSpent().getAndSet(0); - long dynamicQueryCnt = subscriptionsBySessionId.values().stream().map(Map::values).count(); + long dynamicQueryCnt = subscriptionsBySessionId.values().stream().mapToLong(m -> m.values().stream().filter(TbAbstractSubCtx::isDynamic).count()).sum(); if (regularQueryInvocationCntValue > 0 || dynamicQueryInvocationCntValue > 0 || dynamicQueryCnt > 0 || alarmQueryInvocationCntValue > 0) { log.info("Stats: regularQueryInvocationCnt = [{}], regularQueryInvocationTime = [{}], " + "dynamicQueryCnt = [{}] dynamicQueryInvocationCnt = [{}], dynamicQueryInvocationTime = [{}], " + diff --git a/application/src/main/java/org/thingsboard/server/service/subscription/TbAbstractDataSubCtx.java b/application/src/main/java/org/thingsboard/server/service/subscription/TbAbstractDataSubCtx.java index d7fac40ad5..136c625615 100644 --- a/application/src/main/java/org/thingsboard/server/service/subscription/TbAbstractDataSubCtx.java +++ b/application/src/main/java/org/thingsboard/server/service/subscription/TbAbstractDataSubCtx.java @@ -72,6 +72,11 @@ public abstract class TbAbstractDataSubCtx newData = findEntityData(); diff --git a/application/src/main/java/org/thingsboard/server/service/subscription/TbAbstractSubCtx.java b/application/src/main/java/org/thingsboard/server/service/subscription/TbAbstractSubCtx.java index f9b75ba217..05f0612717 100644 --- a/application/src/main/java/org/thingsboard/server/service/subscription/TbAbstractSubCtx.java +++ b/application/src/main/java/org/thingsboard/server/service/subscription/TbAbstractSubCtx.java @@ -174,6 +174,8 @@ public abstract class TbAbstractSubCtx { } } + public abstract boolean isDynamic(); + public abstract void fetchData(); protected abstract void update(); diff --git a/application/src/main/java/org/thingsboard/server/service/subscription/TbAlarmDataSubCtx.java b/application/src/main/java/org/thingsboard/server/service/subscription/TbAlarmDataSubCtx.java index 1ea6495517..99f7ac561b 100644 --- a/application/src/main/java/org/thingsboard/server/service/subscription/TbAlarmDataSubCtx.java +++ b/application/src/main/java/org/thingsboard/server/service/subscription/TbAlarmDataSubCtx.java @@ -17,6 +17,7 @@ package org.thingsboard.server.service.subscription; import lombok.Getter; import lombok.Setter; +import lombok.ToString; import lombok.extern.slf4j.Slf4j; import org.thingsboard.server.common.data.alarm.Alarm; import org.thingsboard.server.common.data.alarm.AlarmSearchStatus; @@ -56,6 +57,7 @@ import java.util.function.Function; import java.util.stream.Collectors; @Slf4j +@ToString(callSuper = true) public class TbAlarmDataSubCtx extends TbAbstractDataSubCtx { private final AlarmService alarmService; diff --git a/application/src/main/java/org/thingsboard/server/service/subscription/TbEntityCountSubCtx.java b/application/src/main/java/org/thingsboard/server/service/subscription/TbEntityCountSubCtx.java index e97892a37d..a9324d3ecf 100644 --- a/application/src/main/java/org/thingsboard/server/service/subscription/TbEntityCountSubCtx.java +++ b/application/src/main/java/org/thingsboard/server/service/subscription/TbEntityCountSubCtx.java @@ -52,4 +52,8 @@ public class TbEntityCountSubCtx extends TbAbstractSubCtx { } } + @Override + public boolean isDynamic() { + return true; + } } diff --git a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetryWebSocketService.java b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetryWebSocketService.java index 73eed820ab..dfc2185807 100644 --- a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetryWebSocketService.java +++ b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetryWebSocketService.java @@ -103,6 +103,8 @@ import java.util.stream.Collectors; @Slf4j public class DefaultTelemetryWebSocketService implements TelemetryWebSocketService { + public static final int NUMBER_OF_PING_ATTEMPTS = 3; + private static final int DEFAULT_LIMIT = 100; private static final Aggregation DEFAULT_AGGREGATION = Aggregation.NONE; private static final int UNKNOWN_SUBSCRIPTION_ID = 0; @@ -136,7 +138,6 @@ public class DefaultTelemetryWebSocketService implements TelemetryWebSocketServi @Autowired private TbServiceInfoProvider serviceInfoProvider; - @Value("${server.ws.limits.max_subscriptions_per_tenant:0}") private int maxSubscriptionsPerTenant; @Value("${server.ws.limits.max_subscriptions_per_customer:0}") @@ -146,6 +147,9 @@ public class DefaultTelemetryWebSocketService implements TelemetryWebSocketServi @Value("${server.ws.limits.max_subscriptions_per_public_user:0}") private int maxSubscriptionsPerPublicUser; + @Value("${server.ws.ping_timeout:30000}") + private long pingTimeout; + private ConcurrentMap> tenantSubscriptionsMap = new ConcurrentHashMap<>(); private ConcurrentMap> customerSubscriptionsMap = new ConcurrentHashMap<>(); private ConcurrentMap> regularUserSubscriptionsMap = new ConcurrentHashMap<>(); @@ -162,7 +166,7 @@ public class DefaultTelemetryWebSocketService implements TelemetryWebSocketServi executor = ThingsBoardExecutors.newWorkStealingPool(50, getClass()); pingExecutor = Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName("telemetry-web-socket-ping")); - pingExecutor.scheduleWithFixedDelay(this::sendPing, 10000, 10000, TimeUnit.MILLISECONDS); + pingExecutor.scheduleWithFixedDelay(this::sendPing, pingTimeout / NUMBER_OF_PING_ATTEMPTS, pingTimeout / NUMBER_OF_PING_ATTEMPTS, TimeUnit.MILLISECONDS); } @PreDestroy diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index c3ce2e3805..124462cd5e 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -54,7 +54,8 @@ server: log_controller_error_stack_trace: "${HTTP_LOG_CONTROLLER_ERROR_STACK_TRACE:false}" ws: send_timeout: "${TB_SERVER_WS_SEND_TIMEOUT:5000}" - ping_timeout: "${TB_SERVER_WS_PING_TIMEOUT:15000}" + # recommended timeout >= 30 seconds. Platform will attempt to send 'ping' request 3 times within the timeout + ping_timeout: "${TB_SERVER_WS_PING_TIMEOUT:30000}" limits: # Limit the amount of sessions and subscriptions available on each server. Put values to zero to disable particular limitation max_sessions_per_tenant: "${TB_SERVER_WS_TENANT_RATE_LIMITS_MAX_SESSIONS_PER_TENANT:0}" diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/query/AbstractDataQuery.java b/common/data/src/main/java/org/thingsboard/server/common/data/query/AbstractDataQuery.java index 0e807f8c37..adf16effb9 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/query/AbstractDataQuery.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/query/AbstractDataQuery.java @@ -20,7 +20,7 @@ import lombok.ToString; import java.util.List; -@ToString +@ToString(callSuper = true) public abstract class AbstractDataQuery extends EntityCountQuery { @Getter diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/query/EntityCountQuery.java b/common/data/src/main/java/org/thingsboard/server/common/data/query/EntityCountQuery.java index a66a6c0954..13434b358a 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/query/EntityCountQuery.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/query/EntityCountQuery.java @@ -17,11 +17,13 @@ package org.thingsboard.server.common.data.query; import io.swagger.annotations.ApiModel; import lombok.Getter; +import lombok.ToString; import java.util.Collections; import java.util.List; @ApiModel +@ToString public class EntityCountQuery { @Getter diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/query/EntityDataQuery.java b/common/data/src/main/java/org/thingsboard/server/common/data/query/EntityDataQuery.java index aca8437fe8..07b90a9327 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/query/EntityDataQuery.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/query/EntityDataQuery.java @@ -21,7 +21,7 @@ import lombok.ToString; import java.util.List; -@ToString +@ToString(callSuper = true) public class EntityDataQuery extends AbstractDataQuery { public EntityDataQuery() {