From 20dc82275c407960dd5a6536542e7f5cf53bc3c1 Mon Sep 17 00:00:00 2001 From: Andrii Shvaika Date: Tue, 23 Jun 2020 16:07:38 +0300 Subject: [PATCH] TimeSeries updates --- ...efaultTbEntityDataSubscriptionService.java | 8 +- .../subscription/TbEntityDataSubCtx.java | 20 +++-- .../controller/BaseWebsocketApiTest.java | 82 +++++++++++++++++++ application/src/test/resources/logback.xml | 2 +- 4 files changed, 102 insertions(+), 10 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/subscription/DefaultTbEntityDataSubscriptionService.java b/application/src/main/java/org/thingsboard/server/service/subscription/DefaultTbEntityDataSubscriptionService.java index 0a85e5e993..80c9b80b55 100644 --- a/application/src/main/java/org/thingsboard/server/service/subscription/DefaultTbEntityDataSubscriptionService.java +++ b/application/src/main/java/org/thingsboard/server/service/subscription/DefaultTbEntityDataSubscriptionService.java @@ -282,7 +282,7 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc update = new EntityDataUpdate(ctx.getCmdId(), null, ctx.getData().getData()); } wsService.sendWsMsg(ctx.getSessionId(), update); - createSubscriptions(ctx, keys.stream().map(key -> new EntityKey(EntityKeyType.TIME_SERIES, key)).collect(Collectors.toList())); + createSubscriptions(ctx, keys.stream().map(key -> new EntityKey(EntityKeyType.TIME_SERIES, key)).collect(Collectors.toList()), false); } @Override @@ -357,8 +357,12 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc } private void createSubscriptions(TbEntityDataSubCtx ctx, List keys) { + createSubscriptions(ctx, keys, true); + } + + private void createSubscriptions(TbEntityDataSubCtx ctx, List keys, boolean latest) { //TODO: create context for this (session, cmdId) that contains query, latestCmd and update. Subscribe + periodic updates. - List tbSubs = ctx.createSubscriptions(keys); + List tbSubs = ctx.createSubscriptions(keys, latest); tbSubs.forEach(sub -> localSubscriptionService.addSubscription(sub)); } diff --git a/application/src/main/java/org/thingsboard/server/service/subscription/TbEntityDataSubCtx.java b/application/src/main/java/org/thingsboard/server/service/subscription/TbEntityDataSubCtx.java index b3c1b6ea6e..de3811d16b 100644 --- a/application/src/main/java/org/thingsboard/server/service/subscription/TbEntityDataSubCtx.java +++ b/application/src/main/java/org/thingsboard/server/service/subscription/TbEntityDataSubCtx.java @@ -40,6 +40,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.function.BiConsumer; @Slf4j @Data @@ -81,7 +82,7 @@ public class TbEntityDataSubCtx { this.data = data; } - public List createSubscriptions(List keys) { + public List createSubscriptions(List keys, boolean resultToLatestValues) { this.subToEntityIdMap = new HashMap<>(); tbSubs = new ArrayList<>(); Map> keysByType = new HashMap<>(); @@ -92,7 +93,7 @@ public class TbEntityDataSubCtx { subToEntityIdMap.put(subIdx, entityData.getEntityId()); switch (keysType) { case TIME_SERIES: - tbSubs.add(createTsSub(entityData, subIdx, keysList)); + tbSubs.add(createTsSub(entityData, subIdx, keysList, resultToLatestValues)); break; case CLIENT_ATTRIBUTE: tbSubs.add(createAttrSub(entityData, subIdx, keysType, TbAttributeSubscriptionScope.CLIENT_SCOPE, keysList)); @@ -128,7 +129,7 @@ public class TbEntityDataSubCtx { .build(); } - private TbSubscription createTsSub(EntityData entityData, int subIdx, List subKeys) { + private TbSubscription createTsSub(EntityData entityData, int subIdx, List subKeys, boolean resultToLatestValues) { Map keyStates = buildKeyStats(entityData, EntityKeyType.TIME_SERIES, subKeys); if (entityData.getTimeseries() != null) { entityData.getTimeseries().forEach((k, v) -> { @@ -144,7 +145,12 @@ public class TbEntityDataSubCtx { .subscriptionId(subIdx) .tenantId(sessionRef.getSecurityCtx().getTenantId()) .entityId(entityData.getEntityId()) - .updateConsumer(this::sendTsWsMsg) + .updateConsumer(new BiConsumer() { + @Override + public void accept(String sessionId, SubscriptionUpdate subscriptionUpdate) { + sendWsMsg(sessionId, subscriptionUpdate, EntityKeyType.TIME_SERIES, resultToLatestValues); + } + }) .allKeys(false) .keyStates(keyStates) .build(); @@ -165,11 +171,11 @@ public class TbEntityDataSubCtx { return keyStates; } - private void sendTsWsMsg(String sessionId, SubscriptionUpdate subscriptionUpdate) { - sendWsMsg(sessionId, subscriptionUpdate, EntityKeyType.TIME_SERIES); + private void sendWsMsg(String sessionId, SubscriptionUpdate subscriptionUpdate, EntityKeyType keyType) { + sendWsMsg(sessionId, subscriptionUpdate, keyType, true); } - private void sendWsMsg(String sessionId, SubscriptionUpdate subscriptionUpdate, EntityKeyType keyType) { + private void sendWsMsg(String sessionId, SubscriptionUpdate subscriptionUpdate, EntityKeyType keyType, boolean resultToLatestValues) { EntityId entityId = subToEntityIdMap.get(subscriptionUpdate.getSubscriptionId()); if (entityId != null) { log.trace("[{}][{}][{}][{}] Received subscription update: {}", sessionId, cmdId, subscriptionUpdate.getSubscriptionId(), keyType, subscriptionUpdate); diff --git a/application/src/test/java/org/thingsboard/server/controller/BaseWebsocketApiTest.java b/application/src/test/java/org/thingsboard/server/controller/BaseWebsocketApiTest.java index 203659cd40..ded943b523 100644 --- a/application/src/test/java/org/thingsboard/server/controller/BaseWebsocketApiTest.java +++ b/application/src/test/java/org/thingsboard/server/controller/BaseWebsocketApiTest.java @@ -48,6 +48,7 @@ import org.thingsboard.server.service.telemetry.cmd.v2.EntityDataCmd; import org.thingsboard.server.service.telemetry.cmd.v2.EntityDataUpdate; import org.thingsboard.server.service.telemetry.cmd.v2.EntityHistoryCmd; import org.thingsboard.server.service.telemetry.cmd.v2.LatestValueCmd; +import org.thingsboard.server.service.telemetry.cmd.v2.TimeSeriesCmd; import java.util.ArrayList; import java.util.Arrays; @@ -158,6 +159,87 @@ public class BaseWebsocketApiTest extends AbstractWebsocketTest { Assert.assertEquals(new TsValue(dataPoint3.getTs(), dataPoint3.getValueAsString()), tsArray[2]); } + @Test + public void testEntityDataTimeSeriesWsCmd() throws Exception { + Device device = new Device(); + device.setName("Device"); + device.setType("default"); + device.setLabel("testLabel" + (int) (Math.random() * 1000)); + device = doPost("/api/device", device, Device.class); + + long now = System.currentTimeMillis(); + + DeviceTypeFilter dtf = new DeviceTypeFilter(); + dtf.setDeviceNameFilter("D"); + dtf.setDeviceType("default"); + EntityDataQuery edq = new EntityDataQuery(dtf, new EntityDataPageLink(1, 0, null, null), + Collections.emptyList(), Collections.emptyList(), Collections.emptyList()); + + EntityDataCmd cmd = new EntityDataCmd(1, edq, null, null, null); + + TelemetryPluginCmdsWrapper wrapper = new TelemetryPluginCmdsWrapper(); + wrapper.setEntityDataCmds(Collections.singletonList(cmd)); + + wsClient.send(mapper.writeValueAsString(wrapper)); + String msg = wsClient.waitForReply(); + EntityDataUpdate update = mapper.readValue(msg, EntityDataUpdate.class); + Assert.assertEquals(1, update.getCmdId()); + PageData pageData = update.getData(); + Assert.assertNotNull(pageData); + Assert.assertEquals(1, pageData.getData().size()); + Assert.assertEquals(device.getId(), pageData.getData().get(0).getEntityId()); + + TimeSeriesCmd tsCmd = new TimeSeriesCmd(); + tsCmd.setKeys(Arrays.asList("temperature")); + tsCmd.setAgg(Aggregation.NONE.name()); + tsCmd.setLimit(1000); + tsCmd.setStartTs(now - TimeUnit.HOURS.toMillis(1)); + tsCmd.setTimeWindow(TimeUnit.HOURS.toMillis(1)); + + TsKvEntry dataPoint1 = new BasicTsKvEntry(now - TimeUnit.MINUTES.toMillis(1), new LongDataEntry("temperature", 42L)); + TsKvEntry dataPoint2 = new BasicTsKvEntry(now - TimeUnit.MINUTES.toMillis(2), new LongDataEntry("temperature", 43L)); + TsKvEntry dataPoint3 = new BasicTsKvEntry(now - TimeUnit.MINUTES.toMillis(3), new LongDataEntry("temperature", 44L)); + List tsData = Arrays.asList(dataPoint1, dataPoint2, dataPoint3); + + sendTelemetry(device, tsData); + Thread.sleep(100); + + cmd = new EntityDataCmd(1, null, null, null, tsCmd); + wrapper = new TelemetryPluginCmdsWrapper(); + wrapper.setEntityDataCmds(Collections.singletonList(cmd)); + wsClient.send(mapper.writeValueAsString(wrapper)); + msg = wsClient.waitForReply(); + update = mapper.readValue(msg, EntityDataUpdate.class); + Assert.assertEquals(1, update.getCmdId()); + List listData = update.getUpdate(); + Assert.assertNotNull(listData); + Assert.assertEquals(1, listData.size()); + Assert.assertEquals(device.getId(), listData.get(0).getEntityId()); + TsValue[] tsArray = listData.get(0).getTimeseries().get("temperature"); + Assert.assertEquals(3, tsArray.length); + Assert.assertEquals(new TsValue(dataPoint1.getTs(), dataPoint1.getValueAsString()), tsArray[0]); + Assert.assertEquals(new TsValue(dataPoint2.getTs(), dataPoint2.getValueAsString()), tsArray[1]); + Assert.assertEquals(new TsValue(dataPoint3.getTs(), dataPoint3.getValueAsString()), tsArray[2]); + + now = System.currentTimeMillis(); + TsKvEntry dataPoint4 = new BasicTsKvEntry(now, new LongDataEntry("temperature", 45L)); + + wsClient.registerWaitForUpdate(); + sendTelemetry(device, Arrays.asList(dataPoint4)); + msg = wsClient.waitForUpdate(); + + update = mapper.readValue(msg, EntityDataUpdate.class); + Assert.assertEquals(1, update.getCmdId()); + List eData = update.getUpdate(); + Assert.assertNotNull(eData); + Assert.assertEquals(1, eData.size()); + Assert.assertEquals(device.getId(), eData.get(0).getEntityId()); + Assert.assertNotNull(eData.get(0).getTimeseries()); + TsValue[] tsValues = eData.get(0).getTimeseries().get("temperature"); + Assert.assertNotNull(tsValues); + Assert.assertEquals(new TsValue(dataPoint4.getTs(), dataPoint4.getValueAsString()), tsValues[0]); + } + @Test public void testEntityDataLatestWidgetFlow() throws Exception { Device device = new Device(); diff --git a/application/src/test/resources/logback.xml b/application/src/test/resources/logback.xml index f991a40078..a9e244f1e9 100644 --- a/application/src/test/resources/logback.xml +++ b/application/src/test/resources/logback.xml @@ -7,7 +7,7 @@ - +