Merge pull request #5793 from volodymyr-babak/ping-pong
Added functionality to handle pong responses on web sockets
This commit is contained in:
commit
1d53b326c8
@ -23,6 +23,7 @@ import org.springframework.security.core.Authentication;
|
|||||||
import org.springframework.stereotype.Service;
|
import org.springframework.stereotype.Service;
|
||||||
import org.springframework.util.StringUtils;
|
import org.springframework.util.StringUtils;
|
||||||
import org.springframework.web.socket.CloseStatus;
|
import org.springframework.web.socket.CloseStatus;
|
||||||
|
import org.springframework.web.socket.PongMessage;
|
||||||
import org.springframework.web.socket.TextMessage;
|
import org.springframework.web.socket.TextMessage;
|
||||||
import org.springframework.web.socket.WebSocketSession;
|
import org.springframework.web.socket.WebSocketSession;
|
||||||
import org.springframework.web.socket.adapter.NativeWebSocketSession;
|
import org.springframework.web.socket.adapter.NativeWebSocketSession;
|
||||||
@ -36,6 +37,7 @@ import org.thingsboard.server.config.WebSocketConfiguration;
|
|||||||
import org.thingsboard.server.queue.util.TbCoreComponent;
|
import org.thingsboard.server.queue.util.TbCoreComponent;
|
||||||
import org.thingsboard.server.service.security.model.SecurityUser;
|
import org.thingsboard.server.service.security.model.SecurityUser;
|
||||||
import org.thingsboard.server.service.security.model.UserPrincipal;
|
import org.thingsboard.server.service.security.model.UserPrincipal;
|
||||||
|
import org.thingsboard.server.service.telemetry.DefaultTelemetryWebSocketService;
|
||||||
import org.thingsboard.server.service.telemetry.SessionEvent;
|
import org.thingsboard.server.service.telemetry.SessionEvent;
|
||||||
import org.thingsboard.server.service.telemetry.TelemetryWebSocketMsgEndpoint;
|
import org.thingsboard.server.service.telemetry.TelemetryWebSocketMsgEndpoint;
|
||||||
import org.thingsboard.server.service.telemetry.TelemetryWebSocketService;
|
import org.thingsboard.server.service.telemetry.TelemetryWebSocketService;
|
||||||
@ -56,6 +58,8 @@ import java.util.concurrent.ConcurrentHashMap;
|
|||||||
import java.util.concurrent.ConcurrentMap;
|
import java.util.concurrent.ConcurrentMap;
|
||||||
import java.util.concurrent.LinkedBlockingQueue;
|
import java.util.concurrent.LinkedBlockingQueue;
|
||||||
|
|
||||||
|
import static org.thingsboard.server.service.telemetry.DefaultTelemetryWebSocketService.NUMBER_OF_PING_ATTEMPTS;
|
||||||
|
|
||||||
@Service
|
@Service
|
||||||
@TbCoreComponent
|
@TbCoreComponent
|
||||||
@Slf4j
|
@Slf4j
|
||||||
@ -111,6 +115,22 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements Telemetr
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void handlePongMessage(WebSocketSession session, PongMessage message) throws Exception {
|
||||||
|
try {
|
||||||
|
SessionMetaData sessionMd = internalSessionMap.get(session.getId());
|
||||||
|
if (sessionMd != null) {
|
||||||
|
log.trace("[{}][{}] Processing pong response {}", sessionMd.sessionRef.getSecurityCtx().getTenantId(), session.getId(), message.getPayload());
|
||||||
|
sessionMd.processPongMessage(System.currentTimeMillis());
|
||||||
|
} else {
|
||||||
|
log.trace("[{}] Failed to find session", session.getId());
|
||||||
|
session.close(CloseStatus.SERVER_ERROR.withReason("Session not found!"));
|
||||||
|
}
|
||||||
|
} catch (IOException e) {
|
||||||
|
log.warn("IO error", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void afterConnectionEstablished(WebSocketSession session) throws Exception {
|
public void afterConnectionEstablished(WebSocketSession session) throws Exception {
|
||||||
super.afterConnectionEstablished(session);
|
super.afterConnectionEstablished(session);
|
||||||
@ -212,18 +232,29 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements Telemetr
|
|||||||
|
|
||||||
synchronized void sendPing(long currentTime) {
|
synchronized void sendPing(long currentTime) {
|
||||||
try {
|
try {
|
||||||
if (currentTime - lastActivityTime >= pingTimeout) {
|
long timeSinceLastActivity = currentTime - lastActivityTime;
|
||||||
|
if (timeSinceLastActivity >= pingTimeout) {
|
||||||
|
log.warn("[{}] Closing session due to ping timeout", session.getId());
|
||||||
|
closeSession(CloseStatus.SESSION_NOT_RELIABLE);
|
||||||
|
} else if (timeSinceLastActivity >= pingTimeout / NUMBER_OF_PING_ATTEMPTS) {
|
||||||
this.asyncRemote.sendPing(PING_MSG);
|
this.asyncRemote.sendPing(PING_MSG);
|
||||||
lastActivityTime = currentTime;
|
|
||||||
}
|
}
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
log.trace("[{}] Failed to send ping msg", session.getId(), e);
|
log.trace("[{}] Failed to send ping msg", session.getId(), e);
|
||||||
|
closeSession(CloseStatus.SESSION_NOT_RELIABLE);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void closeSession(CloseStatus reason) {
|
||||||
try {
|
try {
|
||||||
close(this.sessionRef, CloseStatus.SESSION_NOT_RELIABLE);
|
close(this.sessionRef, reason);
|
||||||
} catch (IOException ioe) {
|
} catch (IOException ioe) {
|
||||||
log.trace("[{}] Session transport error", session.getId(), ioe);
|
log.trace("[{}] Session transport error", session.getId(), ioe);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
synchronized void processPongMessage(long currentTime) {
|
||||||
|
lastActivityTime = currentTime;
|
||||||
}
|
}
|
||||||
|
|
||||||
synchronized void sendMsg(String msg) {
|
synchronized void sendMsg(String msg) {
|
||||||
@ -236,11 +267,7 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements Telemetr
|
|||||||
} else {
|
} else {
|
||||||
log.info("[{}][{}] Session closed due to queue error", sessionRef.getSecurityCtx().getTenantId(), session.getId());
|
log.info("[{}][{}] Session closed due to queue error", sessionRef.getSecurityCtx().getTenantId(), session.getId());
|
||||||
}
|
}
|
||||||
try {
|
closeSession(CloseStatus.POLICY_VIOLATION.withReason("Max pending updates limit reached!"));
|
||||||
close(sessionRef, CloseStatus.POLICY_VIOLATION.withReason("Max pending updates limit reached!"));
|
|
||||||
} catch (IOException ioe) {
|
|
||||||
log.trace("[{}] Session transport error", session.getId(), ioe);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
isSending = true;
|
isSending = true;
|
||||||
@ -253,11 +280,7 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements Telemetr
|
|||||||
this.asyncRemote.sendText(msg, this);
|
this.asyncRemote.sendText(msg, this);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
log.trace("[{}] Failed to send msg", session.getId(), e);
|
log.trace("[{}] Failed to send msg", session.getId(), e);
|
||||||
try {
|
closeSession(CloseStatus.SESSION_NOT_RELIABLE);
|
||||||
close(this.sessionRef, CloseStatus.SESSION_NOT_RELIABLE);
|
|
||||||
} catch (IOException ioe) {
|
|
||||||
log.trace("[{}] Session transport error", session.getId(), ioe);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -265,11 +288,7 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements Telemetr
|
|||||||
public void onResult(SendResult result) {
|
public void onResult(SendResult result) {
|
||||||
if (!result.isOK()) {
|
if (!result.isOK()) {
|
||||||
log.trace("[{}] Failed to send msg", session.getId(), result.getException());
|
log.trace("[{}] Failed to send msg", session.getId(), result.getException());
|
||||||
try {
|
closeSession(CloseStatus.SESSION_NOT_RELIABLE);
|
||||||
close(this.sessionRef, CloseStatus.SESSION_NOT_RELIABLE);
|
|
||||||
} catch (IOException ioe) {
|
|
||||||
log.trace("[{}] Session transport error", session.getId(), ioe);
|
|
||||||
}
|
|
||||||
} else {
|
} else {
|
||||||
lastActivityTime = System.currentTimeMillis();
|
lastActivityTime = System.currentTimeMillis();
|
||||||
String msg = msgQueue.poll();
|
String msg = msgQueue.poll();
|
||||||
|
|||||||
@ -289,6 +289,7 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc
|
|||||||
long start = System.currentTimeMillis();
|
long start = System.currentTimeMillis();
|
||||||
finalCtx.update();
|
finalCtx.update();
|
||||||
long end = System.currentTimeMillis();
|
long end = System.currentTimeMillis();
|
||||||
|
log.trace("[{}][{}] Executing query: {}", finalCtx.getSessionId(), finalCtx.getCmdId(), finalCtx.getQuery());
|
||||||
stats.getDynamicQueryInvocationCnt().incrementAndGet();
|
stats.getDynamicQueryInvocationCnt().incrementAndGet();
|
||||||
stats.getDynamicQueryTimeSpent().addAndGet(end - start);
|
stats.getDynamicQueryTimeSpent().addAndGet(end - start);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
@ -304,7 +305,7 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc
|
|||||||
long regularQueryInvocationTimeValue = stats.getRegularQueryTimeSpent().getAndSet(0);
|
long regularQueryInvocationTimeValue = stats.getRegularQueryTimeSpent().getAndSet(0);
|
||||||
int dynamicQueryInvocationCntValue = stats.getDynamicQueryInvocationCnt().getAndSet(0);
|
int dynamicQueryInvocationCntValue = stats.getDynamicQueryInvocationCnt().getAndSet(0);
|
||||||
long dynamicQueryInvocationTimeValue = stats.getDynamicQueryTimeSpent().getAndSet(0);
|
long dynamicQueryInvocationTimeValue = stats.getDynamicQueryTimeSpent().getAndSet(0);
|
||||||
long dynamicQueryCnt = subscriptionsBySessionId.values().stream().map(Map::values).count();
|
long dynamicQueryCnt = subscriptionsBySessionId.values().stream().mapToLong(m -> m.values().stream().filter(TbAbstractSubCtx::isDynamic).count()).sum();
|
||||||
if (regularQueryInvocationCntValue > 0 || dynamicQueryInvocationCntValue > 0 || dynamicQueryCnt > 0 || alarmQueryInvocationCntValue > 0) {
|
if (regularQueryInvocationCntValue > 0 || dynamicQueryInvocationCntValue > 0 || dynamicQueryCnt > 0 || alarmQueryInvocationCntValue > 0) {
|
||||||
log.info("Stats: regularQueryInvocationCnt = [{}], regularQueryInvocationTime = [{}], " +
|
log.info("Stats: regularQueryInvocationCnt = [{}], regularQueryInvocationTime = [{}], " +
|
||||||
"dynamicQueryCnt = [{}] dynamicQueryInvocationCnt = [{}], dynamicQueryInvocationTime = [{}], " +
|
"dynamicQueryCnt = [{}] dynamicQueryInvocationCnt = [{}], dynamicQueryInvocationTime = [{}], " +
|
||||||
|
|||||||
@ -72,6 +72,11 @@ public abstract class TbAbstractDataSubCtx<T extends AbstractDataQuery<? extends
|
|||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean isDynamic() {
|
||||||
|
return query.getPageLink().isDynamic();
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected synchronized void update() {
|
protected synchronized void update() {
|
||||||
PageData<EntityData> newData = findEntityData();
|
PageData<EntityData> newData = findEntityData();
|
||||||
|
|||||||
@ -174,6 +174,8 @@ public abstract class TbAbstractSubCtx<T extends EntityCountQuery> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public abstract boolean isDynamic();
|
||||||
|
|
||||||
public abstract void fetchData();
|
public abstract void fetchData();
|
||||||
|
|
||||||
protected abstract void update();
|
protected abstract void update();
|
||||||
|
|||||||
@ -17,6 +17,7 @@ package org.thingsboard.server.service.subscription;
|
|||||||
|
|
||||||
import lombok.Getter;
|
import lombok.Getter;
|
||||||
import lombok.Setter;
|
import lombok.Setter;
|
||||||
|
import lombok.ToString;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.thingsboard.server.common.data.alarm.Alarm;
|
import org.thingsboard.server.common.data.alarm.Alarm;
|
||||||
import org.thingsboard.server.common.data.alarm.AlarmSearchStatus;
|
import org.thingsboard.server.common.data.alarm.AlarmSearchStatus;
|
||||||
@ -56,6 +57,7 @@ import java.util.function.Function;
|
|||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
@Slf4j
|
@Slf4j
|
||||||
|
@ToString(callSuper = true)
|
||||||
public class TbAlarmDataSubCtx extends TbAbstractDataSubCtx<AlarmDataQuery> {
|
public class TbAlarmDataSubCtx extends TbAbstractDataSubCtx<AlarmDataQuery> {
|
||||||
|
|
||||||
private final AlarmService alarmService;
|
private final AlarmService alarmService;
|
||||||
|
|||||||
@ -52,4 +52,8 @@ public class TbEntityCountSubCtx extends TbAbstractSubCtx<EntityCountQuery> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean isDynamic() {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -103,6 +103,8 @@ import java.util.stream.Collectors;
|
|||||||
@Slf4j
|
@Slf4j
|
||||||
public class DefaultTelemetryWebSocketService implements TelemetryWebSocketService {
|
public class DefaultTelemetryWebSocketService implements TelemetryWebSocketService {
|
||||||
|
|
||||||
|
public static final int NUMBER_OF_PING_ATTEMPTS = 3;
|
||||||
|
|
||||||
private static final int DEFAULT_LIMIT = 100;
|
private static final int DEFAULT_LIMIT = 100;
|
||||||
private static final Aggregation DEFAULT_AGGREGATION = Aggregation.NONE;
|
private static final Aggregation DEFAULT_AGGREGATION = Aggregation.NONE;
|
||||||
private static final int UNKNOWN_SUBSCRIPTION_ID = 0;
|
private static final int UNKNOWN_SUBSCRIPTION_ID = 0;
|
||||||
@ -136,7 +138,6 @@ public class DefaultTelemetryWebSocketService implements TelemetryWebSocketServi
|
|||||||
@Autowired
|
@Autowired
|
||||||
private TbServiceInfoProvider serviceInfoProvider;
|
private TbServiceInfoProvider serviceInfoProvider;
|
||||||
|
|
||||||
|
|
||||||
@Value("${server.ws.limits.max_subscriptions_per_tenant:0}")
|
@Value("${server.ws.limits.max_subscriptions_per_tenant:0}")
|
||||||
private int maxSubscriptionsPerTenant;
|
private int maxSubscriptionsPerTenant;
|
||||||
@Value("${server.ws.limits.max_subscriptions_per_customer:0}")
|
@Value("${server.ws.limits.max_subscriptions_per_customer:0}")
|
||||||
@ -146,6 +147,9 @@ public class DefaultTelemetryWebSocketService implements TelemetryWebSocketServi
|
|||||||
@Value("${server.ws.limits.max_subscriptions_per_public_user:0}")
|
@Value("${server.ws.limits.max_subscriptions_per_public_user:0}")
|
||||||
private int maxSubscriptionsPerPublicUser;
|
private int maxSubscriptionsPerPublicUser;
|
||||||
|
|
||||||
|
@Value("${server.ws.ping_timeout:30000}")
|
||||||
|
private long pingTimeout;
|
||||||
|
|
||||||
private ConcurrentMap<TenantId, Set<String>> tenantSubscriptionsMap = new ConcurrentHashMap<>();
|
private ConcurrentMap<TenantId, Set<String>> tenantSubscriptionsMap = new ConcurrentHashMap<>();
|
||||||
private ConcurrentMap<CustomerId, Set<String>> customerSubscriptionsMap = new ConcurrentHashMap<>();
|
private ConcurrentMap<CustomerId, Set<String>> customerSubscriptionsMap = new ConcurrentHashMap<>();
|
||||||
private ConcurrentMap<UserId, Set<String>> regularUserSubscriptionsMap = new ConcurrentHashMap<>();
|
private ConcurrentMap<UserId, Set<String>> regularUserSubscriptionsMap = new ConcurrentHashMap<>();
|
||||||
@ -162,7 +166,7 @@ public class DefaultTelemetryWebSocketService implements TelemetryWebSocketServi
|
|||||||
executor = ThingsBoardExecutors.newWorkStealingPool(50, getClass());
|
executor = ThingsBoardExecutors.newWorkStealingPool(50, getClass());
|
||||||
|
|
||||||
pingExecutor = Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName("telemetry-web-socket-ping"));
|
pingExecutor = Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName("telemetry-web-socket-ping"));
|
||||||
pingExecutor.scheduleWithFixedDelay(this::sendPing, 10000, 10000, TimeUnit.MILLISECONDS);
|
pingExecutor.scheduleWithFixedDelay(this::sendPing, pingTimeout / NUMBER_OF_PING_ATTEMPTS, pingTimeout / NUMBER_OF_PING_ATTEMPTS, TimeUnit.MILLISECONDS);
|
||||||
}
|
}
|
||||||
|
|
||||||
@PreDestroy
|
@PreDestroy
|
||||||
|
|||||||
@ -54,7 +54,8 @@ server:
|
|||||||
log_controller_error_stack_trace: "${HTTP_LOG_CONTROLLER_ERROR_STACK_TRACE:false}"
|
log_controller_error_stack_trace: "${HTTP_LOG_CONTROLLER_ERROR_STACK_TRACE:false}"
|
||||||
ws:
|
ws:
|
||||||
send_timeout: "${TB_SERVER_WS_SEND_TIMEOUT:5000}"
|
send_timeout: "${TB_SERVER_WS_SEND_TIMEOUT:5000}"
|
||||||
ping_timeout: "${TB_SERVER_WS_PING_TIMEOUT:15000}"
|
# recommended timeout >= 30 seconds. Platform will attempt to send 'ping' request 3 times within the timeout
|
||||||
|
ping_timeout: "${TB_SERVER_WS_PING_TIMEOUT:30000}"
|
||||||
limits:
|
limits:
|
||||||
# Limit the amount of sessions and subscriptions available on each server. Put values to zero to disable particular limitation
|
# Limit the amount of sessions and subscriptions available on each server. Put values to zero to disable particular limitation
|
||||||
max_sessions_per_tenant: "${TB_SERVER_WS_TENANT_RATE_LIMITS_MAX_SESSIONS_PER_TENANT:0}"
|
max_sessions_per_tenant: "${TB_SERVER_WS_TENANT_RATE_LIMITS_MAX_SESSIONS_PER_TENANT:0}"
|
||||||
|
|||||||
@ -20,7 +20,7 @@ import lombok.ToString;
|
|||||||
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
@ToString
|
@ToString(callSuper = true)
|
||||||
public abstract class AbstractDataQuery<T extends EntityDataPageLink> extends EntityCountQuery {
|
public abstract class AbstractDataQuery<T extends EntityDataPageLink> extends EntityCountQuery {
|
||||||
|
|
||||||
@Getter
|
@Getter
|
||||||
|
|||||||
@ -17,11 +17,13 @@ package org.thingsboard.server.common.data.query;
|
|||||||
|
|
||||||
import io.swagger.annotations.ApiModel;
|
import io.swagger.annotations.ApiModel;
|
||||||
import lombok.Getter;
|
import lombok.Getter;
|
||||||
|
import lombok.ToString;
|
||||||
|
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
@ApiModel
|
@ApiModel
|
||||||
|
@ToString
|
||||||
public class EntityCountQuery {
|
public class EntityCountQuery {
|
||||||
|
|
||||||
@Getter
|
@Getter
|
||||||
|
|||||||
@ -21,7 +21,7 @@ import lombok.ToString;
|
|||||||
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
@ToString
|
@ToString(callSuper = true)
|
||||||
public class EntityDataQuery extends AbstractDataQuery<EntityDataPageLink> {
|
public class EntityDataQuery extends AbstractDataQuery<EntityDataPageLink> {
|
||||||
|
|
||||||
public EntityDataQuery() {
|
public EntityDataQuery() {
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user