diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/TelemetryEdgeProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/TelemetryEdgeProcessor.java index 481ee7762e..7e1a459e3a 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/TelemetryEdgeProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/TelemetryEdgeProcessor.java @@ -22,6 +22,7 @@ import com.google.common.util.concurrent.SettableFuture; import com.google.gson.Gson; import com.google.gson.JsonElement; import com.google.gson.JsonObject; +import com.google.gson.JsonParser; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.tuple.ImmutablePair; import org.apache.commons.lang3.tuple.Pair; @@ -334,7 +335,7 @@ public class TelemetryEdgeProcessor extends BaseEdgeProcessor { return null; } return constructEntityDataProtoMsg(entityId, edgeEvent.getAction(), - JsonUtils.parse(JacksonUtil.OBJECT_MAPPER.writeValueAsString(edgeEvent.getBody()))); + JsonParser.parseString(JacksonUtil.OBJECT_MAPPER.writeValueAsString(edgeEvent.getBody()))); } private DownlinkMsg constructEntityDataProtoMsg(EntityId entityId, EdgeEventActionType actionType, JsonElement entityData) { diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/sync/DefaultEdgeRequestsService.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/sync/DefaultEdgeRequestsService.java index 2f72d96770..af299c84c4 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/sync/DefaultEdgeRequestsService.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/sync/DefaultEdgeRequestsService.java @@ -137,48 +137,49 @@ public class DefaultEdgeRequestsService implements EdgeRequestsService { } String scope = attributesRequestMsg.getScope(); ListenableFuture> findAttrFuture = attributesService.findAll(tenantId, entityId, scope); - return Futures.transformAsync(findAttrFuture, ssAttributes -> { - if (ssAttributes == null || ssAttributes.isEmpty()) { - log.trace("[{}][{}] No attributes found for entity {} [{}]", tenantId, - edge.getName(), - entityId.getEntityType(), - entityId.getId()); - return Futures.immediateFuture(null); - } - return processEntityAttributesAndAddToEdgeQueue(tenantId, entityId, edge, entityType, scope, ssAttributes, attributesRequestMsg); - }, dbCallbackExecutorService); + return Futures.transformAsync(findAttrFuture, ssAttributes + -> processEntityAttributesAndAddToEdgeQueue(tenantId, entityId, edge, entityType, scope, ssAttributes, attributesRequestMsg), + dbCallbackExecutorService); } private ListenableFuture processEntityAttributesAndAddToEdgeQueue(TenantId tenantId, EntityId entityId, Edge edge, EdgeEventType entityType, String scope, List ssAttributes, AttributesRequestMsg attributesRequestMsg) { try { - Map entityData = new HashMap<>(); - ObjectNode attributes = JacksonUtil.OBJECT_MAPPER.createObjectNode(); - for (AttributeKvEntry attr : ssAttributes) { - if (DefaultDeviceStateService.PERSISTENT_ATTRIBUTES.contains(attr.getKey()) - && !DefaultDeviceStateService.INACTIVITY_TIMEOUT.equals(attr.getKey())) { - continue; - } - if (attr.getDataType() == DataType.BOOLEAN && attr.getBooleanValue().isPresent()) { - attributes.put(attr.getKey(), attr.getBooleanValue().get()); - } else if (attr.getDataType() == DataType.DOUBLE && attr.getDoubleValue().isPresent()) { - attributes.put(attr.getKey(), attr.getDoubleValue().get()); - } else if (attr.getDataType() == DataType.LONG && attr.getLongValue().isPresent()) { - attributes.put(attr.getKey(), attr.getLongValue().get()); - } else { - attributes.put(attr.getKey(), attr.getValueAsString()); - } - } ListenableFuture future; - if (attributes.size() > 0) { - entityData.put("kv", attributes); - entityData.put("scope", scope); - JsonNode body = JacksonUtil.OBJECT_MAPPER.valueToTree(entityData); - log.debug("Sending attributes data msg, entityId [{}], attributes [{}]", entityId, body); - future = saveEdgeEvent(tenantId, edge.getId(), entityType, EdgeEventActionType.ATTRIBUTES_UPDATED, entityId, body); - } else { + if (ssAttributes == null || ssAttributes.isEmpty()) { + log.trace("[{}][{}] No attributes found for entity {} [{}]", tenantId, + edge.getName(), + entityId.getEntityType(), + entityId.getId()); future = Futures.immediateFuture(null); + } else { + Map entityData = new HashMap<>(); + ObjectNode attributes = JacksonUtil.OBJECT_MAPPER.createObjectNode(); + for (AttributeKvEntry attr : ssAttributes) { + if (DefaultDeviceStateService.PERSISTENT_ATTRIBUTES.contains(attr.getKey()) + && !DefaultDeviceStateService.INACTIVITY_TIMEOUT.equals(attr.getKey())) { + continue; + } + if (attr.getDataType() == DataType.BOOLEAN && attr.getBooleanValue().isPresent()) { + attributes.put(attr.getKey(), attr.getBooleanValue().get()); + } else if (attr.getDataType() == DataType.DOUBLE && attr.getDoubleValue().isPresent()) { + attributes.put(attr.getKey(), attr.getDoubleValue().get()); + } else if (attr.getDataType() == DataType.LONG && attr.getLongValue().isPresent()) { + attributes.put(attr.getKey(), attr.getLongValue().get()); + } else { + attributes.put(attr.getKey(), attr.getValueAsString()); + } + } + if (attributes.size() > 0) { + entityData.put("kv", attributes); + entityData.put("scope", scope); + JsonNode body = JacksonUtil.OBJECT_MAPPER.valueToTree(entityData); + log.debug("Sending attributes data msg, entityId [{}], attributes [{}]", entityId, body); + future = saveEdgeEvent(tenantId, edge.getId(), entityType, EdgeEventActionType.ATTRIBUTES_UPDATED, entityId, body); + } else { + future = Futures.immediateFuture(null); + } } return Futures.transformAsync(future, v -> processLatestTimeseriesAndAddToEdgeQueue(tenantId, entityId, edge, entityType), dbCallbackExecutorService); } catch (Exception e) { @@ -199,16 +200,18 @@ public class DefaultEdgeRequestsService implements EdgeRequestsService { entityId.getId()); return Futures.immediateFuture(null); } - List> futures = new ArrayList<>(); + Map> tsData = new HashMap<>(); for (TsKvEntry tsKvEntry : tsKvEntries) { if (DefaultDeviceStateService.PERSISTENT_ATTRIBUTES.contains(tsKvEntry.getKey())) { continue; } - ObjectNode entityBody = JacksonUtil.OBJECT_MAPPER.createObjectNode(); - ObjectNode ts = JacksonUtil.OBJECT_MAPPER.createObjectNode(); - ts.put(tsKvEntry.getKey(), tsKvEntry.getValueAsString()); - entityBody.set("data", ts); - entityBody.put("ts", tsKvEntry.getTs()); + tsData.computeIfAbsent(tsKvEntry.getTs(), k -> new HashMap<>()).put(tsKvEntry.getKey(), tsKvEntry.getValue()); + } + List> futures = new ArrayList<>(); + for (Map.Entry> entry : tsData.entrySet()) { + Map entityBody = new HashMap<>(); + entityBody.put("data", entry.getValue()); + entityBody.put("ts", entry.getKey()); futures.add(saveEdgeEvent(tenantId, edge.getId(), entityType, EdgeEventActionType.TIMESERIES_UPDATED, entityId, JacksonUtil.valueToTree(entityBody))); } return Futures.transform(Futures.allAsList(futures), v -> null, dbCallbackExecutorService); diff --git a/application/src/test/java/org/thingsboard/server/edge/BaseDeviceEdgeTest.java b/application/src/test/java/org/thingsboard/server/edge/BaseDeviceEdgeTest.java index 6ec9ae0e06..804f014bdc 100644 --- a/application/src/test/java/org/thingsboard/server/edge/BaseDeviceEdgeTest.java +++ b/application/src/test/java/org/thingsboard/server/edge/BaseDeviceEdgeTest.java @@ -701,7 +701,7 @@ abstract public class BaseDeviceEdgeTest extends AbstractEdgeTest { public void testVerifyDeliveryOfLatestTimeseriesOnAttributesRequest() throws Exception { Device device = findDeviceByName("Edge Device 1"); - JsonNode timeseriesData = mapper.readTree("{\"temperature\":25}"); + JsonNode timeseriesData = mapper.readTree("{\"temperature\":25, \"isEnabled\": true}"); doPost("/api/plugins/telemetry/DEVICE/" + device.getUuidId() + "/timeseries/" + DataConstants.SERVER_SCOPE, timeseriesData); @@ -740,9 +740,17 @@ abstract public class BaseDeviceEdgeTest extends AbstractEdgeTest { TransportProtos.PostTelemetryMsg timeseriesUpdatedMsg = latestEntityDataMsg.getPostTelemetryMsg(); Assert.assertEquals(1, timeseriesUpdatedMsg.getTsKvListList().size()); TransportProtos.TsKvListProto tsKvListProto = timeseriesUpdatedMsg.getTsKvListList().get(0); - Assert.assertEquals(1, tsKvListProto.getKvList().size()); - TransportProtos.KeyValueProto keyValueProto = tsKvListProto.getKvList().get(0); - Assert.assertEquals(25, keyValueProto.getLongV()); - Assert.assertEquals("temperature", keyValueProto.getKey()); + Assert.assertEquals(2, tsKvListProto.getKvList().size()); + for (TransportProtos.KeyValueProto keyValueProto : tsKvListProto.getKvList()) { + if ("temperature".equals(keyValueProto.getKey())) { + Assert.assertEquals(TransportProtos.KeyValueType.LONG_V, keyValueProto.getType()); + Assert.assertEquals(25, keyValueProto.getLongV()); + } else if ("isEnabled".equals(keyValueProto.getKey())) { + Assert.assertEquals(TransportProtos.KeyValueType.BOOLEAN_V, keyValueProto.getType()); + Assert.assertTrue(keyValueProto.getBoolV()); + } else { + Assert.fail("Unexpected key: " + keyValueProto.getKey()); + } + } } }