Fixed dynamic query count statistics. Updated default ping timeout to 30 seconds

This commit is contained in:
Volodymyr Babak 2021-12-29 13:32:00 +02:00
parent 7570ab5703
commit 791b0594fd
11 changed files with 30 additions and 8 deletions

View File

@ -37,6 +37,7 @@ import org.thingsboard.server.config.WebSocketConfiguration;
import org.thingsboard.server.queue.util.TbCoreComponent; import org.thingsboard.server.queue.util.TbCoreComponent;
import org.thingsboard.server.service.security.model.SecurityUser; import org.thingsboard.server.service.security.model.SecurityUser;
import org.thingsboard.server.service.security.model.UserPrincipal; import org.thingsboard.server.service.security.model.UserPrincipal;
import org.thingsboard.server.service.telemetry.DefaultTelemetryWebSocketService;
import org.thingsboard.server.service.telemetry.SessionEvent; import org.thingsboard.server.service.telemetry.SessionEvent;
import org.thingsboard.server.service.telemetry.TelemetryWebSocketMsgEndpoint; import org.thingsboard.server.service.telemetry.TelemetryWebSocketMsgEndpoint;
import org.thingsboard.server.service.telemetry.TelemetryWebSocketService; import org.thingsboard.server.service.telemetry.TelemetryWebSocketService;
@ -57,6 +58,8 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
import static org.thingsboard.server.service.telemetry.DefaultTelemetryWebSocketService.NUMBER_OF_PING_ATTEMPTS;
@Service @Service
@TbCoreComponent @TbCoreComponent
@Slf4j @Slf4j
@ -229,10 +232,11 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements Telemetr
synchronized void sendPing(long currentTime) { synchronized void sendPing(long currentTime) {
try { try {
if (currentTime - lastActivityTime >= pingTimeout) { long timeSinceLastActivity = currentTime - lastActivityTime;
if (timeSinceLastActivity >= pingTimeout) {
log.warn("[{}] Closing session due to ping timeout", session.getId()); log.warn("[{}] Closing session due to ping timeout", session.getId());
closeSession(CloseStatus.SESSION_NOT_RELIABLE); closeSession(CloseStatus.SESSION_NOT_RELIABLE);
} else { } else if (timeSinceLastActivity >= pingTimeout / NUMBER_OF_PING_ATTEMPTS) {
this.asyncRemote.sendPing(PING_MSG); this.asyncRemote.sendPing(PING_MSG);
} }
} catch (Exception e) { } catch (Exception e) {

View File

@ -289,6 +289,7 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc
long start = System.currentTimeMillis(); long start = System.currentTimeMillis();
finalCtx.update(); finalCtx.update();
long end = System.currentTimeMillis(); long end = System.currentTimeMillis();
log.trace("[{}][{}] Executing query: {}", finalCtx.getSessionId(), finalCtx.getCmdId(), finalCtx.getQuery());
stats.getDynamicQueryInvocationCnt().incrementAndGet(); stats.getDynamicQueryInvocationCnt().incrementAndGet();
stats.getDynamicQueryTimeSpent().addAndGet(end - start); stats.getDynamicQueryTimeSpent().addAndGet(end - start);
} catch (Exception e) { } catch (Exception e) {
@ -304,7 +305,7 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc
long regularQueryInvocationTimeValue = stats.getRegularQueryTimeSpent().getAndSet(0); long regularQueryInvocationTimeValue = stats.getRegularQueryTimeSpent().getAndSet(0);
int dynamicQueryInvocationCntValue = stats.getDynamicQueryInvocationCnt().getAndSet(0); int dynamicQueryInvocationCntValue = stats.getDynamicQueryInvocationCnt().getAndSet(0);
long dynamicQueryInvocationTimeValue = stats.getDynamicQueryTimeSpent().getAndSet(0); long dynamicQueryInvocationTimeValue = stats.getDynamicQueryTimeSpent().getAndSet(0);
long dynamicQueryCnt = subscriptionsBySessionId.values().stream().map(Map::values).count(); long dynamicQueryCnt = subscriptionsBySessionId.values().stream().mapToLong(m -> m.values().stream().filter(TbAbstractSubCtx::isDynamic).count()).sum();
if (regularQueryInvocationCntValue > 0 || dynamicQueryInvocationCntValue > 0 || dynamicQueryCnt > 0 || alarmQueryInvocationCntValue > 0) { if (regularQueryInvocationCntValue > 0 || dynamicQueryInvocationCntValue > 0 || dynamicQueryCnt > 0 || alarmQueryInvocationCntValue > 0) {
log.info("Stats: regularQueryInvocationCnt = [{}], regularQueryInvocationTime = [{}], " + log.info("Stats: regularQueryInvocationCnt = [{}], regularQueryInvocationTime = [{}], " +
"dynamicQueryCnt = [{}] dynamicQueryInvocationCnt = [{}], dynamicQueryInvocationTime = [{}], " + "dynamicQueryCnt = [{}] dynamicQueryInvocationCnt = [{}], dynamicQueryInvocationTime = [{}], " +

View File

@ -72,6 +72,11 @@ public abstract class TbAbstractDataSubCtx<T extends AbstractDataQuery<? extends
return result; return result;
} }
@Override
public boolean isDynamic() {
return query.getPageLink().isDynamic();
}
@Override @Override
protected synchronized void update() { protected synchronized void update() {
PageData<EntityData> newData = findEntityData(); PageData<EntityData> newData = findEntityData();

View File

@ -174,6 +174,8 @@ public abstract class TbAbstractSubCtx<T extends EntityCountQuery> {
} }
} }
public abstract boolean isDynamic();
public abstract void fetchData(); public abstract void fetchData();
protected abstract void update(); protected abstract void update();

View File

@ -17,6 +17,7 @@ package org.thingsboard.server.service.subscription;
import lombok.Getter; import lombok.Getter;
import lombok.Setter; import lombok.Setter;
import lombok.ToString;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.thingsboard.server.common.data.alarm.Alarm; import org.thingsboard.server.common.data.alarm.Alarm;
import org.thingsboard.server.common.data.alarm.AlarmSearchStatus; import org.thingsboard.server.common.data.alarm.AlarmSearchStatus;
@ -56,6 +57,7 @@ import java.util.function.Function;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@Slf4j @Slf4j
@ToString(callSuper = true)
public class TbAlarmDataSubCtx extends TbAbstractDataSubCtx<AlarmDataQuery> { public class TbAlarmDataSubCtx extends TbAbstractDataSubCtx<AlarmDataQuery> {
private final AlarmService alarmService; private final AlarmService alarmService;

View File

@ -52,4 +52,8 @@ public class TbEntityCountSubCtx extends TbAbstractSubCtx<EntityCountQuery> {
} }
} }
@Override
public boolean isDynamic() {
return true;
}
} }

View File

@ -103,6 +103,8 @@ import java.util.stream.Collectors;
@Slf4j @Slf4j
public class DefaultTelemetryWebSocketService implements TelemetryWebSocketService { public class DefaultTelemetryWebSocketService implements TelemetryWebSocketService {
public static final int NUMBER_OF_PING_ATTEMPTS = 3;
private static final int DEFAULT_LIMIT = 100; private static final int DEFAULT_LIMIT = 100;
private static final Aggregation DEFAULT_AGGREGATION = Aggregation.NONE; private static final Aggregation DEFAULT_AGGREGATION = Aggregation.NONE;
private static final int UNKNOWN_SUBSCRIPTION_ID = 0; private static final int UNKNOWN_SUBSCRIPTION_ID = 0;
@ -136,7 +138,6 @@ public class DefaultTelemetryWebSocketService implements TelemetryWebSocketServi
@Autowired @Autowired
private TbServiceInfoProvider serviceInfoProvider; private TbServiceInfoProvider serviceInfoProvider;
@Value("${server.ws.limits.max_subscriptions_per_tenant:0}") @Value("${server.ws.limits.max_subscriptions_per_tenant:0}")
private int maxSubscriptionsPerTenant; private int maxSubscriptionsPerTenant;
@Value("${server.ws.limits.max_subscriptions_per_customer:0}") @Value("${server.ws.limits.max_subscriptions_per_customer:0}")
@ -165,7 +166,7 @@ public class DefaultTelemetryWebSocketService implements TelemetryWebSocketServi
executor = ThingsBoardExecutors.newWorkStealingPool(50, getClass()); executor = ThingsBoardExecutors.newWorkStealingPool(50, getClass());
pingExecutor = Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName("telemetry-web-socket-ping")); pingExecutor = Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName("telemetry-web-socket-ping"));
pingExecutor.scheduleWithFixedDelay(this::sendPing, pingTimeout/3, pingTimeout/3, TimeUnit.MILLISECONDS); pingExecutor.scheduleWithFixedDelay(this::sendPing, pingTimeout / NUMBER_OF_PING_ATTEMPTS, pingTimeout / NUMBER_OF_PING_ATTEMPTS, TimeUnit.MILLISECONDS);
} }
@PreDestroy @PreDestroy

View File

@ -54,7 +54,8 @@ server:
log_controller_error_stack_trace: "${HTTP_LOG_CONTROLLER_ERROR_STACK_TRACE:false}" log_controller_error_stack_trace: "${HTTP_LOG_CONTROLLER_ERROR_STACK_TRACE:false}"
ws: ws:
send_timeout: "${TB_SERVER_WS_SEND_TIMEOUT:5000}" send_timeout: "${TB_SERVER_WS_SEND_TIMEOUT:5000}"
ping_timeout: "${TB_SERVER_WS_PING_TIMEOUT:15000}" # recommended timeout >= 30 seconds. Platform will attempt to send 'ping' request 3 times within the timeout
ping_timeout: "${TB_SERVER_WS_PING_TIMEOUT:30000}"
limits: limits:
# Limit the amount of sessions and subscriptions available on each server. Put values to zero to disable particular limitation # Limit the amount of sessions and subscriptions available on each server. Put values to zero to disable particular limitation
max_sessions_per_tenant: "${TB_SERVER_WS_TENANT_RATE_LIMITS_MAX_SESSIONS_PER_TENANT:0}" max_sessions_per_tenant: "${TB_SERVER_WS_TENANT_RATE_LIMITS_MAX_SESSIONS_PER_TENANT:0}"

View File

@ -20,7 +20,7 @@ import lombok.ToString;
import java.util.List; import java.util.List;
@ToString @ToString(callSuper = true)
public abstract class AbstractDataQuery<T extends EntityDataPageLink> extends EntityCountQuery { public abstract class AbstractDataQuery<T extends EntityDataPageLink> extends EntityCountQuery {
@Getter @Getter

View File

@ -17,11 +17,13 @@ package org.thingsboard.server.common.data.query;
import io.swagger.annotations.ApiModel; import io.swagger.annotations.ApiModel;
import lombok.Getter; import lombok.Getter;
import lombok.ToString;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
@ApiModel @ApiModel
@ToString
public class EntityCountQuery { public class EntityCountQuery {
@Getter @Getter

View File

@ -21,7 +21,7 @@ import lombok.ToString;
import java.util.List; import java.util.List;
@ToString @ToString(callSuper = true)
public class EntityDataQuery extends AbstractDataQuery<EntityDataPageLink> { public class EntityDataQuery extends AbstractDataQuery<EntityDataPageLink> {
public EntityDataQuery() { public EntityDataQuery() {