From 031706e1cf4033ad33efeed417eb412a01f46d6b Mon Sep 17 00:00:00 2001 From: Volodymyr Babak Date: Tue, 27 Dec 2022 16:53:26 +0200 Subject: [PATCH] Added functionality to push latest timeseries values to edge on assing entities to edge --- .../rpc/sync/DefaultEdgeRequestsService.java | 161 +++++++++--------- .../server/edge/BaseDeviceEdgeTest.java | 52 ++++++ 2 files changed, 133 insertions(+), 80 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/sync/DefaultEdgeRequestsService.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/sync/DefaultEdgeRequestsService.java index 4be7ddb1c8..2f72d96770 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/sync/DefaultEdgeRequestsService.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/sync/DefaultEdgeRequestsService.java @@ -45,6 +45,7 @@ import org.thingsboard.server.common.data.id.UserId; import org.thingsboard.server.common.data.id.WidgetsBundleId; import org.thingsboard.server.common.data.kv.AttributeKvEntry; import org.thingsboard.server.common.data.kv.DataType; +import org.thingsboard.server.common.data.kv.TsKvEntry; import org.thingsboard.server.common.data.relation.EntityRelation; import org.thingsboard.server.common.data.relation.EntityRelationsQuery; import org.thingsboard.server.common.data.relation.EntitySearchDirection; @@ -52,13 +53,10 @@ import org.thingsboard.server.common.data.relation.RelationTypeGroup; import org.thingsboard.server.common.data.relation.RelationsSearchParameters; import org.thingsboard.server.common.data.widget.WidgetType; import org.thingsboard.server.common.data.widget.WidgetsBundle; -import org.thingsboard.server.dao.asset.AssetProfileService; -import org.thingsboard.server.dao.asset.AssetService; import org.thingsboard.server.dao.attributes.AttributesService; -import org.thingsboard.server.dao.device.DeviceProfileService; -import org.thingsboard.server.dao.device.DeviceService; import org.thingsboard.server.dao.edge.EdgeEventService; import org.thingsboard.server.dao.relation.RelationService; +import org.thingsboard.server.dao.timeseries.TimeseriesService; import org.thingsboard.server.dao.widget.WidgetTypeService; import org.thingsboard.server.dao.widget.WidgetsBundleService; import org.thingsboard.server.gen.edge.v1.AttributesRequestMsg; @@ -92,25 +90,16 @@ public class DefaultEdgeRequestsService implements EdgeRequestsService { @Autowired private AttributesService attributesService; + @Autowired + private TimeseriesService timeseriesService; + @Autowired private RelationService relationService; - @Autowired - private DeviceService deviceService; - - @Autowired - private AssetService assetService; - @Lazy @Autowired private TbEntityViewService entityViewService; - @Autowired - private DeviceProfileService deviceProfileService; - - @Autowired - private AssetProfileService assetProfileService; - @Autowired private WidgetsBundleService widgetsBundleService; @@ -141,77 +130,89 @@ public class DefaultEdgeRequestsService implements EdgeRequestsService { EntityId entityId = EntityIdFactory.getByTypeAndUuid( EntityType.valueOf(attributesRequestMsg.getEntityType()), new UUID(attributesRequestMsg.getEntityIdMSB(), attributesRequestMsg.getEntityIdLSB())); - final EdgeEventType type = EdgeUtils.getEdgeEventTypeByEntityType(entityId.getEntityType()); - if (type == null) { + final EdgeEventType entityType = EdgeUtils.getEdgeEventTypeByEntityType(entityId.getEntityType()); + if (entityType == null) { log.warn("[{}] Type doesn't supported {}", tenantId, entityId.getEntityType()); return Futures.immediateFuture(null); } - SettableFuture futureToSet = SettableFuture.create(); String scope = attributesRequestMsg.getScope(); ListenableFuture> findAttrFuture = attributesService.findAll(tenantId, entityId, scope); - Futures.addCallback(findAttrFuture, new FutureCallback<>() { - @Override - public void onSuccess(@Nullable List ssAttributes) { - if (ssAttributes == null || ssAttributes.isEmpty()) { - log.trace("[{}][{}] No attributes found for entity {} [{}]", tenantId, - edge.getName(), - entityId.getEntityType(), - entityId.getId()); - futureToSet.set(null); - return; - } - - try { - Map entityData = new HashMap<>(); - ObjectNode attributes = JacksonUtil.OBJECT_MAPPER.createObjectNode(); - for (AttributeKvEntry attr : ssAttributes) { - if (DefaultDeviceStateService.PERSISTENT_ATTRIBUTES.contains(attr.getKey()) - && !DefaultDeviceStateService.INACTIVITY_TIMEOUT.equals(attr.getKey())) { - continue; - } - if (attr.getDataType() == DataType.BOOLEAN && attr.getBooleanValue().isPresent()) { - attributes.put(attr.getKey(), attr.getBooleanValue().get()); - } else if (attr.getDataType() == DataType.DOUBLE && attr.getDoubleValue().isPresent()) { - attributes.put(attr.getKey(), attr.getDoubleValue().get()); - } else if (attr.getDataType() == DataType.LONG && attr.getLongValue().isPresent()) { - attributes.put(attr.getKey(), attr.getLongValue().get()); - } else { - attributes.put(attr.getKey(), attr.getValueAsString()); - } - } - entityData.put("kv", attributes); - entityData.put("scope", scope); - JsonNode body = JacksonUtil.OBJECT_MAPPER.valueToTree(entityData); - log.debug("Sending attributes data msg, entityId [{}], attributes [{}]", entityId, body); - ListenableFuture future = saveEdgeEvent(tenantId, edge.getId(), type, EdgeEventActionType.ATTRIBUTES_UPDATED, entityId, body); - Futures.addCallback(future, new FutureCallback<>() { - @Override - public void onSuccess(@Nullable Void unused) { - futureToSet.set(null); - } - - @Override - public void onFailure(Throwable throwable) { - String errMsg = String.format("[%s] Failed to save edge event [%s]", edge.getId(), attributesRequestMsg); - log.error(errMsg, throwable); - futureToSet.setException(new RuntimeException(errMsg, throwable)); - } - }, dbCallbackExecutorService); - } catch (Exception e) { - String errMsg = String.format("[%s] Failed to save attribute updates to the edge [%s]", edge.getId(), attributesRequestMsg); - log.error(errMsg, e); - futureToSet.setException(new RuntimeException(errMsg, e)); - } - } - - @Override - public void onFailure(Throwable t) { - String errMsg = String.format("[%s] Can't find attributes [%s]", edge.getId(), attributesRequestMsg); - log.error(errMsg, t); - futureToSet.setException(new RuntimeException(errMsg, t)); + return Futures.transformAsync(findAttrFuture, ssAttributes -> { + if (ssAttributes == null || ssAttributes.isEmpty()) { + log.trace("[{}][{}] No attributes found for entity {} [{}]", tenantId, + edge.getName(), + entityId.getEntityType(), + entityId.getId()); + return Futures.immediateFuture(null); } + return processEntityAttributesAndAddToEdgeQueue(tenantId, entityId, edge, entityType, scope, ssAttributes, attributesRequestMsg); + }, dbCallbackExecutorService); + } + + private ListenableFuture processEntityAttributesAndAddToEdgeQueue(TenantId tenantId, EntityId entityId, Edge edge, + EdgeEventType entityType, String scope, List ssAttributes, + AttributesRequestMsg attributesRequestMsg) { + try { + Map entityData = new HashMap<>(); + ObjectNode attributes = JacksonUtil.OBJECT_MAPPER.createObjectNode(); + for (AttributeKvEntry attr : ssAttributes) { + if (DefaultDeviceStateService.PERSISTENT_ATTRIBUTES.contains(attr.getKey()) + && !DefaultDeviceStateService.INACTIVITY_TIMEOUT.equals(attr.getKey())) { + continue; + } + if (attr.getDataType() == DataType.BOOLEAN && attr.getBooleanValue().isPresent()) { + attributes.put(attr.getKey(), attr.getBooleanValue().get()); + } else if (attr.getDataType() == DataType.DOUBLE && attr.getDoubleValue().isPresent()) { + attributes.put(attr.getKey(), attr.getDoubleValue().get()); + } else if (attr.getDataType() == DataType.LONG && attr.getLongValue().isPresent()) { + attributes.put(attr.getKey(), attr.getLongValue().get()); + } else { + attributes.put(attr.getKey(), attr.getValueAsString()); + } + } + ListenableFuture future; + if (attributes.size() > 0) { + entityData.put("kv", attributes); + entityData.put("scope", scope); + JsonNode body = JacksonUtil.OBJECT_MAPPER.valueToTree(entityData); + log.debug("Sending attributes data msg, entityId [{}], attributes [{}]", entityId, body); + future = saveEdgeEvent(tenantId, edge.getId(), entityType, EdgeEventActionType.ATTRIBUTES_UPDATED, entityId, body); + } else { + future = Futures.immediateFuture(null); + } + return Futures.transformAsync(future, v -> processLatestTimeseriesAndAddToEdgeQueue(tenantId, entityId, edge, entityType), dbCallbackExecutorService); + } catch (Exception e) { + String errMsg = String.format("[%s] Failed to save attribute updates to the edge [%s]", edge.getId(), attributesRequestMsg); + log.error(errMsg, e); + return Futures.immediateFailedFuture(new RuntimeException(errMsg, e)); + } + } + + private ListenableFuture processLatestTimeseriesAndAddToEdgeQueue(TenantId tenantId, EntityId entityId, Edge edge, + EdgeEventType entityType) { + ListenableFuture> getAllLatestFuture = timeseriesService.findAllLatest(tenantId, entityId); + return Futures.transformAsync(getAllLatestFuture, tsKvEntries -> { + if (tsKvEntries == null || tsKvEntries.isEmpty()) { + log.trace("[{}][{}] No timeseries found for entity {} [{}]", tenantId, + edge.getName(), + entityId.getEntityType(), + entityId.getId()); + return Futures.immediateFuture(null); + } + List> futures = new ArrayList<>(); + for (TsKvEntry tsKvEntry : tsKvEntries) { + if (DefaultDeviceStateService.PERSISTENT_ATTRIBUTES.contains(tsKvEntry.getKey())) { + continue; + } + ObjectNode entityBody = JacksonUtil.OBJECT_MAPPER.createObjectNode(); + ObjectNode ts = JacksonUtil.OBJECT_MAPPER.createObjectNode(); + ts.put(tsKvEntry.getKey(), tsKvEntry.getValueAsString()); + entityBody.set("data", ts); + entityBody.put("ts", tsKvEntry.getTs()); + futures.add(saveEdgeEvent(tenantId, edge.getId(), entityType, EdgeEventActionType.TIMESERIES_UPDATED, entityId, JacksonUtil.valueToTree(entityBody))); + } + return Futures.transform(Futures.allAsList(futures), v -> null, dbCallbackExecutorService); }, dbCallbackExecutorService); - return futureToSet; } @Override diff --git a/application/src/test/java/org/thingsboard/server/edge/BaseDeviceEdgeTest.java b/application/src/test/java/org/thingsboard/server/edge/BaseDeviceEdgeTest.java index 649bf26210..1b19f481f2 100644 --- a/application/src/test/java/org/thingsboard/server/edge/BaseDeviceEdgeTest.java +++ b/application/src/test/java/org/thingsboard/server/edge/BaseDeviceEdgeTest.java @@ -323,6 +323,9 @@ abstract public class BaseDeviceEdgeTest extends AbstractEdgeTest { "inactivityTimeout", "3600000"); sendAttributesRequestAndVerify(device, DataConstants.SHARED_SCOPE, "{\"key2\":\"value2\"}", "key2", "value2"); + + doDelete("/api/plugins/telemetry/DEVICE/" + device.getUuidId() + "/" + DataConstants.SERVER_SCOPE, "keys","key1, inactivityTimeout"); + doDelete("/api/plugins/telemetry/DEVICE/" + device.getUuidId() + "/" + DataConstants.SHARED_SCOPE, "keys", "key2"); } @Test @@ -640,4 +643,53 @@ abstract public class BaseDeviceEdgeTest extends AbstractEdgeTest { client.disconnect(); } + + @Test + public void testVerifyDeliveryOfLatestTimeseriesOnAttributesRequest() throws Exception { + Device device = findDeviceByName("Edge Device 1"); + + JsonNode timeseriesData = mapper.readTree("{\"temperature\":25}"); + + doPost("/api/plugins/telemetry/DEVICE/" + device.getUuidId() + "/timeseries/" + DataConstants.SERVER_SCOPE, + timeseriesData); + + // Wait before device timeseries saved to database before requesting them from edge + Awaitility.await() + .atMost(10, TimeUnit.SECONDS) + .until(() -> { + String urlTemplate = "/api/plugins/telemetry/DEVICE/" + device.getId() + "/keys/timeseries"; + List actualKeys = doGetAsyncTyped(urlTemplate, new TypeReference<>() {}); + return actualKeys != null && !actualKeys.isEmpty() && actualKeys.contains("temperature"); + }); + + UplinkMsg.Builder uplinkMsgBuilder = UplinkMsg.newBuilder(); + AttributesRequestMsg.Builder attributesRequestMsgBuilder = AttributesRequestMsg.newBuilder(); + attributesRequestMsgBuilder.setEntityIdMSB(device.getUuidId().getMostSignificantBits()); + attributesRequestMsgBuilder.setEntityIdLSB(device.getUuidId().getLeastSignificantBits()); + attributesRequestMsgBuilder.setEntityType(EntityType.DEVICE.name()); + attributesRequestMsgBuilder.setScope(DataConstants.SERVER_SCOPE); + uplinkMsgBuilder.addAttributesRequestMsg(attributesRequestMsgBuilder.build()); + + edgeImitator.expectResponsesAmount(1); + edgeImitator.expectMessageAmount(1); + edgeImitator.sendUplinkMsg(uplinkMsgBuilder.build()); + Assert.assertTrue(edgeImitator.waitForResponses()); + Assert.assertTrue(edgeImitator.waitForMessages()); + + AbstractMessage latestMessage = edgeImitator.getLatestMessage(); + Assert.assertTrue(latestMessage instanceof EntityDataProto); + EntityDataProto latestEntityDataMsg = (EntityDataProto) latestMessage; + Assert.assertEquals(device.getUuidId().getMostSignificantBits(), latestEntityDataMsg.getEntityIdMSB()); + Assert.assertEquals(device.getUuidId().getLeastSignificantBits(), latestEntityDataMsg.getEntityIdLSB()); + Assert.assertEquals(device.getId().getEntityType().name(), latestEntityDataMsg.getEntityType()); + Assert.assertTrue(latestEntityDataMsg.hasPostTelemetryMsg()); + + TransportProtos.PostTelemetryMsg timeseriesUpdatedMsg = latestEntityDataMsg.getPostTelemetryMsg(); + Assert.assertEquals(1, timeseriesUpdatedMsg.getTsKvListList().size()); + TransportProtos.TsKvListProto tsKvListProto = timeseriesUpdatedMsg.getTsKvListList().get(0); + Assert.assertEquals(1, tsKvListProto.getKvList().size()); + TransportProtos.KeyValueProto keyValueProto = tsKvListProto.getKvList().get(0); + Assert.assertEquals(25, keyValueProto.getLongV()); + Assert.assertEquals("temperature", keyValueProto.getKey()); + } }