diff --git a/application/src/main/java/org/thingsboard/server/service/subscription/DefaultTbLocalSubscriptionService.java b/application/src/main/java/org/thingsboard/server/service/subscription/DefaultTbLocalSubscriptionService.java index 59681d3daa..5716976b6f 100644 --- a/application/src/main/java/org/thingsboard/server/service/subscription/DefaultTbLocalSubscriptionService.java +++ b/application/src/main/java/org/thingsboard/server/service/subscription/DefaultTbLocalSubscriptionService.java @@ -343,15 +343,31 @@ public class DefaultTbLocalSubscriptionService implements TbLocalSubscriptionSer s -> { TbTimeSeriesSubscription sub = (TbTimeSeriesSubscription) s; List updateData = null; + Map keyStates = sub.getKeyStates(); if (sub.isAllKeys()) { - updateData = data; + if (sub.isLatestValues()) { + for (TsKvEntry kv : data) { + Long stateTs = keyStates.get(kv.getKey()); + if (stateTs == null || kv.getTs() > stateTs) { + if (updateData == null) { + updateData = new ArrayList<>(); + } + updateData.add(kv); + } + } + } else { + updateData = data; + } } else { for (TsKvEntry kv : data) { - if (sub.getKeyStates().containsKey((kv.getKey()))) { - if (updateData == null) { - updateData = new ArrayList<>(); + Long stateTs = keyStates.get(kv.getKey()); + if (stateTs != null) { + if (!sub.isLatestValues() || kv.getTs() > stateTs) { + if (updateData == null) { + updateData = new ArrayList<>(); + } + updateData.add(kv); } - updateData.add(kv); } } } diff --git a/application/src/main/java/org/thingsboard/server/service/ws/DefaultWebSocketService.java b/application/src/main/java/org/thingsboard/server/service/ws/DefaultWebSocketService.java index 7288a8bac9..2e4fe9730a 100644 --- a/application/src/main/java/org/thingsboard/server/service/ws/DefaultWebSocketService.java +++ b/application/src/main/java/org/thingsboard/server/service/ws/DefaultWebSocketService.java @@ -105,6 +105,8 @@ import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.stream.Collectors; +import static org.thingsboard.server.common.data.DataConstants.LATEST_TELEMETRY_SCOPE; + /** * Created by ashvayka on 27.03.18. */ @@ -667,24 +669,7 @@ public class DefaultWebSocketService implements WebSocketService { data.forEach(v -> subState.put(v.getKey(), v.getTs())); Lock subLock = new ReentrantLock(); - TbTimeSeriesSubscription sub = TbTimeSeriesSubscription.builder() - .serviceId(serviceId) - .sessionId(sessionId) - .subscriptionId(registerNewSessionSubId(sessionId, sessionRef, cmd.getCmdId())) - .tenantId(sessionRef.getSecurityCtx().getTenantId()) - .entityId(entityId) - .updateProcessor((subscription, update) -> { - subLock.lock(); - try { - sendUpdate(subscription.getSessionId(), cmd.getCmdId(), update); - } finally { - subLock.unlock(); - } - }) - .queryTs(queryTs) - .allKeys(true) - .keyStates(subState) - .build(); + TbTimeSeriesSubscription sub = getTsSubscription(subState, subLock, sessionId, sessionRef, cmd, entityId, queryTs, true); subLock.lock(); try { @@ -712,6 +697,28 @@ public class DefaultWebSocketService implements WebSocketService { on(r -> Futures.addCallback(tsService.findAllLatest(sessionRef.getSecurityCtx().getTenantId(), entityId), callback, executor), callback::onFailure)); } + private TbTimeSeriesSubscription getTsSubscription(Map subState, Lock subLock, String sessionId, WebSocketSessionRef sessionRef, TimeseriesSubscriptionCmd cmd, EntityId entityId, long queryTs, boolean allKeys) { + return TbTimeSeriesSubscription.builder() + .serviceId(serviceId) + .sessionId(sessionId) + .subscriptionId(registerNewSessionSubId(sessionId, sessionRef, cmd.getCmdId())) + .tenantId(sessionRef.getSecurityCtx().getTenantId()) + .entityId(entityId) + .updateProcessor((subscription, update) -> { + subLock.lock(); + try { + sendUpdate(subscription.getSessionId(), cmd.getCmdId(), update); + } finally { + subLock.unlock(); + } + }) + .queryTs(queryTs) + .allKeys(allKeys) + .keyStates(subState) + .latestValues(LATEST_TELEMETRY_SCOPE.equals(cmd.getScope())) + .build(); + } + private FutureCallback> getSubscriptionCallback(final WebSocketSessionRef sessionRef, final TimeseriesSubscriptionCmd cmd, final String sessionId, final EntityId entityId, final long queryTs, final long startTs, final List keys) { return new FutureCallback<>() { @@ -722,24 +729,7 @@ public class DefaultWebSocketService implements WebSocketService { data.forEach(v -> subState.put(v.getKey(), v.getTs())); Lock subLock = new ReentrantLock(); - TbTimeSeriesSubscription sub = TbTimeSeriesSubscription.builder() - .serviceId(serviceId) - .sessionId(sessionId) - .subscriptionId(registerNewSessionSubId(sessionId, sessionRef, cmd.getCmdId())) - .tenantId(sessionRef.getSecurityCtx().getTenantId()) - .entityId(entityId) - .updateProcessor((subscription, update) -> { - subLock.lock(); - try { - sendUpdate(subscription.getSessionId(), cmd.getCmdId(), update); - } finally { - subLock.unlock(); - } - }) - .queryTs(queryTs) - .allKeys(false) - .keyStates(subState) - .build(); + TbTimeSeriesSubscription sub = getTsSubscription(subState, subLock, sessionId, sessionRef, cmd, entityId, queryTs, false); subLock.lock(); try { diff --git a/application/src/test/java/org/thingsboard/server/controller/TbTestWebSocketClient.java b/application/src/test/java/org/thingsboard/server/controller/TbTestWebSocketClient.java index 959db43125..1c5b1dd154 100644 --- a/application/src/test/java/org/thingsboard/server/controller/TbTestWebSocketClient.java +++ b/application/src/test/java/org/thingsboard/server/controller/TbTestWebSocketClient.java @@ -31,6 +31,7 @@ import org.thingsboard.server.service.ws.AuthCmd; import org.thingsboard.server.service.ws.WsCmd; import org.thingsboard.server.service.ws.WsCommandsWrapper; import org.thingsboard.server.service.ws.telemetry.cmd.v1.AttributesSubscriptionCmd; +import org.thingsboard.server.service.ws.telemetry.cmd.v1.TimeseriesSubscriptionCmd; import org.thingsboard.server.service.ws.telemetry.cmd.v2.AlarmCountUpdate; import org.thingsboard.server.service.ws.telemetry.cmd.v2.EntityCountUpdate; import org.thingsboard.server.service.ws.telemetry.cmd.v2.EntityDataCmd; @@ -38,6 +39,7 @@ import org.thingsboard.server.service.ws.telemetry.cmd.v2.EntityDataUpdate; import org.thingsboard.server.service.ws.telemetry.cmd.v2.EntityHistoryCmd; import org.thingsboard.server.service.ws.telemetry.cmd.v2.LatestValueCmd; import org.thingsboard.server.service.ws.telemetry.cmd.v2.TimeSeriesCmd; +import org.thingsboard.server.service.ws.telemetry.sub.TelemetrySubscriptionUpdate; import java.net.URI; import java.nio.channels.NotYetConnectedException; @@ -271,6 +273,18 @@ public class TbTestWebSocketClient extends WebSocketClient { return sendEntityDataQuery(edq); } + public JsonNode sendTimeseriesCmd(EntityId entityId, String scope) { + log.warn("sendTimeseriesCmd entityId: {}, scope: {}", entityId, scope); + TimeseriesSubscriptionCmd cmd = new TimeseriesSubscriptionCmd(0, 0, 0, 10, null); + cmd.setEntityId(entityId.getId().toString()); + cmd.setEntityType(entityId.getEntityType().toString()); + cmd.setCmdId(1); + cmd.setScope(scope); + send(cmd); + String msg = this.waitForReply(); + return JacksonUtil.fromString(msg, JsonNode.class); + } + public void send(WsCmd... cmds) { WsCommandsWrapper cmdsWrapper = new WsCommandsWrapper(); cmdsWrapper.setCmds(List.of(cmds)); diff --git a/application/src/test/java/org/thingsboard/server/controller/WebsocketApiTest.java b/application/src/test/java/org/thingsboard/server/controller/WebsocketApiTest.java index 9801907d3b..3842cf917b 100644 --- a/application/src/test/java/org/thingsboard/server/controller/WebsocketApiTest.java +++ b/application/src/test/java/org/thingsboard/server/controller/WebsocketApiTest.java @@ -68,11 +68,13 @@ import org.thingsboard.server.service.ws.telemetry.cmd.v2.AlarmStatusUpdate; import org.thingsboard.server.service.ws.telemetry.cmd.v2.EntityCountCmd; import org.thingsboard.server.service.ws.telemetry.cmd.v2.EntityCountUpdate; import org.thingsboard.server.service.ws.telemetry.cmd.v2.EntityDataUpdate; +import org.thingsboard.server.service.ws.telemetry.sub.TelemetrySubscriptionUpdate; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -584,6 +586,33 @@ public class WebsocketApiTest extends AbstractControllerTest { Assert.assertNull(msg); } + @Test + public void testTimeseriesSubscriptionCmd() throws Exception { + long now = System.currentTimeMillis() - 100; + + long lastTs = now - TimeUnit.MINUTES.toMillis(1); + TsKvEntry dataPoint1 = new BasicTsKvEntry(lastTs, new LongDataEntry("temperature", 42L)); + sendTelemetry(device, List.of(dataPoint1)); + + JsonNode update = getWsClient().sendTimeseriesCmd(device.getId(), "LATEST_TELEMETRY"); + JsonNode data = update.get("data"); + Assert.assertEquals(1, data.size()); + Assert.assertEquals(JacksonUtil.newArrayNode().add(lastTs).add("42"), data.get("temperature").get(0)); + + //Sending update from the past, while latest value has new timestamp; + TsKvEntry dataPoint4 = new BasicTsKvEntry(now - TimeUnit.MINUTES.toMillis(5), new LongDataEntry("temperature", 45L)); + getWsClient().registerWaitForUpdate(); + sendTelemetry(device, List.of(dataPoint4)); + String msg = getWsClient().waitForUpdate(TimeUnit.SECONDS.toMillis(1)); + Assert.assertNull(msg); + + //Sending duplicate update again + getWsClient().registerWaitForUpdate(); + sendTelemetry(device, List.of(dataPoint4)); + msg = getWsClient().waitForUpdate(TimeUnit.SECONDS.toMillis(1)); + Assert.assertNull(msg); + } + @Test public void testEntityDataLatestTsWsCmd() throws Exception { long now = System.currentTimeMillis(); diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/DataConstants.java b/common/data/src/main/java/org/thingsboard/server/common/data/DataConstants.java index b53d6daec2..b2d9d59cca 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/DataConstants.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/DataConstants.java @@ -148,4 +148,6 @@ public class DataConstants { public static final String CF_QUEUE_NAME = "CalculatedFields"; public static final String CF_STATES_QUEUE_NAME = "CalculatedFieldStates"; + public static final String LATEST_TELEMETRY_SCOPE = "LATEST_TELEMETRY"; + }