diff --git a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java index 7c772420ba..6e0657d347 100644 --- a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java +++ b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java @@ -35,18 +35,7 @@ import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.EntityIdFactory; import org.thingsboard.server.common.data.id.EntityViewId; import org.thingsboard.server.common.data.id.TenantId; -import org.thingsboard.server.common.data.kv.AttributeKvEntry; -import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry; -import org.thingsboard.server.common.data.kv.BaseReadTsKvQuery; -import org.thingsboard.server.common.data.kv.BasicTsKvEntry; -import org.thingsboard.server.common.data.kv.BooleanDataEntry; -import org.thingsboard.server.common.data.kv.DataType; -import org.thingsboard.server.common.data.kv.DoubleDataEntry; -import org.thingsboard.server.common.data.kv.KvEntry; -import org.thingsboard.server.common.data.kv.LongDataEntry; -import org.thingsboard.server.common.data.kv.ReadTsKvQuery; -import org.thingsboard.server.common.data.kv.StringDataEntry; -import org.thingsboard.server.common.data.kv.TsKvEntry; +import org.thingsboard.server.common.data.kv.*; import org.thingsboard.server.common.msg.cluster.SendToClusterMsg; import org.thingsboard.server.common.msg.cluster.ServerAddress; import org.thingsboard.server.dao.attributes.AttributesService; @@ -381,7 +370,7 @@ public class DefaultTelemetrySubscriptionService implements TelemetrySubscriptio private void addRemoteWsSubscription(ServerAddress address, String sessionId, Subscription subscription) { EntityId entityId = subscription.getEntityId(); - log.trace("[{}] Registering remote subscription [{}] for device [{}] to [{}]", sessionId, subscription.getSubscriptionId(), entityId, address); + log.trace("[{}] Registering remote subscription [{}] for entity [{}] to [{}]", sessionId, subscription.getSubscriptionId(), entityId, address); registerSubscription(sessionId, entityId, subscription); if (subscription.getType() == TelemetryFeature.ATTRIBUTES) { final Map keyStates = subscription.getKeyStates(); @@ -401,17 +390,22 @@ public class DefaultTelemetrySubscriptionService implements TelemetrySubscriptio long curTs = System.currentTimeMillis(); List queries = new ArrayList<>(); subscription.getKeyStates().entrySet().forEach(e -> { - queries.add(new BaseReadTsKvQuery(e.getKey(), e.getValue() + 1L, curTs)); + if (curTs > e.getValue()) { + queries.add(new BaseReadTsKvQuery(e.getKey(), e.getValue() + 1L, curTs, 0, 1000, Aggregation.NONE)); + } else { + log.debug("[{}] Invalid subscription [{}], entityId [{}] curTs [{}]", sessionId, subscription, entityId, curTs); + } }); - - DonAsynchron.withCallback(tsService.findAll(entityId, queries), - missedUpdates -> { - if (missedUpdates != null && !missedUpdates.isEmpty()) { - tellRemoteSubUpdate(address, sessionId, new SubscriptionUpdate(subscription.getSubscriptionId(), missedUpdates)); - } - }, - e -> log.error("Failed to fetch missed updates.", e), - tsCallBackExecutor); + if (!queries.isEmpty()) { + DonAsynchron.withCallback(tsService.findAll(entityId, queries), + missedUpdates -> { + if (missedUpdates != null && !missedUpdates.isEmpty()) { + tellRemoteSubUpdate(address, sessionId, new SubscriptionUpdate(subscription.getSubscriptionId(), missedUpdates)); + } + }, + e -> log.error("Failed to fetch missed updates.", e), + tsCallBackExecutor); + } } }