diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/AbstractTransportService.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/AbstractTransportService.java index d8912c18fb..b09e94202d 100644 --- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/AbstractTransportService.java +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/AbstractTransportService.java @@ -27,6 +27,7 @@ import org.thingsboard.server.common.transport.TransportService; import org.thingsboard.server.common.transport.TransportServiceCallback; import org.thingsboard.server.gen.transport.TransportProtos; +import java.util.Random; import java.util.UUID; import java.util.concurrent.*; @@ -176,10 +177,23 @@ public abstract class AbstractTransportService implements TransportService { sessions.remove(uuid); sessionMD.getListener().onRemoteSessionCloseCommand(TransportProtos.SessionCloseNotificationProto.getDefaultInstance()); } else { - process(sessionMD.getSessionInfo(), TransportProtos.SubscriptionInfoProto.newBuilder() - .setAttributeSubscription(sessionMD.isSubscribedToAttributes()) - .setRpcSubscription(sessionMD.isSubscribedToRPC()) - .setLastActivityTime(sessionMD.getLastActivityTime()).build(), null); + if (sessionMD.getLastActivityTime() > sessionMD.getLastReportedActivityTime()) { + final long lastActivityTime = sessionMD.getLastActivityTime(); + process(sessionMD.getSessionInfo(), TransportProtos.SubscriptionInfoProto.newBuilder() + .setAttributeSubscription(sessionMD.isSubscribedToAttributes()) + .setRpcSubscription(sessionMD.isSubscribedToRPC()) + .setLastActivityTime(sessionMD.getLastActivityTime()).build(), new TransportServiceCallback() { + @Override + public void onSuccess(Void msg) { + sessionMD.setLastReportedActivityTime(lastActivityTime); + } + + @Override + public void onError(Throwable e) { + log.warn("[{}] Failed to report last activity time", uuid, e); + } + }); + } } }); } @@ -288,7 +302,7 @@ public abstract class AbstractTransportService implements TransportService { } this.schedulerExecutor = Executors.newSingleThreadScheduledExecutor(); this.transportCallbackExecutor = Executors.newWorkStealingPool(20); - this.schedulerExecutor.scheduleAtFixedRate(this::checkInactivityAndReportActivity, sessionReportTimeout, sessionReportTimeout, TimeUnit.MILLISECONDS); + this.schedulerExecutor.scheduleAtFixedRate(this::checkInactivityAndReportActivity, new Random().nextInt((int) sessionReportTimeout), sessionReportTimeout, TimeUnit.MILLISECONDS); } public void destroy() { diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/SessionMetaData.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/SessionMetaData.java index 411bbd5b72..d60d8fe624 100644 --- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/SessionMetaData.java +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/SessionMetaData.java @@ -34,6 +34,7 @@ class SessionMetaData { private ScheduledFuture scheduledFuture; private volatile long lastActivityTime; + private volatile long lastReportedActivityTime; private volatile boolean subscribedToAttributes; private volatile boolean subscribedToRPC;