diff --git a/application/src/main/java/org/thingsboard/server/service/edge/EdgeMsgConstructorUtils.java b/application/src/main/java/org/thingsboard/server/service/edge/EdgeMsgConstructorUtils.java index 2fd991969d..b59dc096ee 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/EdgeMsgConstructorUtils.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/EdgeMsgConstructorUtils.java @@ -426,16 +426,11 @@ public class EdgeMsgConstructorUtils { .setEntityIdMSB(entityId.getId().getMostSignificantBits()) .setEntityIdLSB(entityId.getId().getLeastSignificantBits()) .setEntityType(entityId.getEntityType().name()); + long ts = getTs(entityData.getAsJsonObject()); switch (actionType) { case TIMESERIES_UPDATED: try { JsonObject data = entityData.getAsJsonObject(); - long ts; - if (data.get("ts") != null && !data.get("ts").isJsonNull()) { - ts = data.getAsJsonPrimitive("ts").getAsLong(); - } else { - ts = System.currentTimeMillis(); - } builder.setPostTelemetryMsg(JsonConverter.convertToTelemetryProto(data.getAsJsonObject("data"), ts)); } catch (Exception e) { log.warn("[{}][{}] Can't convert to telemetry proto, entityData [{}]", tenantId, entityId, entityData, e); @@ -445,8 +440,13 @@ public class EdgeMsgConstructorUtils { try { JsonObject data = entityData.getAsJsonObject(); TransportProtos.PostAttributeMsg attributesUpdatedMsg = JsonConverter.convertToAttributesProto(data.getAsJsonObject("kv")); - builder.setAttributesUpdatedMsg(attributesUpdatedMsg); + if (data.has("isPostAttributes") && data.getAsJsonPrimitive("isPostAttributes").getAsBoolean()) { + builder.setPostAttributesMsg(attributesUpdatedMsg); + } else { + builder.setAttributesUpdatedMsg(attributesUpdatedMsg); + } builder.setPostAttributeScope(getScopeOfDefault(data)); + builder.setAttributeTs(ts); } catch (Exception e) { log.warn("[{}][{}] Can't convert to AttributesUpdatedMsg proto, entityData [{}]", tenantId, entityId, entityData, e); } @@ -457,6 +457,7 @@ public class EdgeMsgConstructorUtils { TransportProtos.PostAttributeMsg postAttributesMsg = JsonConverter.convertToAttributesProto(data.getAsJsonObject("kv")); builder.setPostAttributesMsg(postAttributesMsg); builder.setPostAttributeScope(getScopeOfDefault(data)); + builder.setAttributeTs(ts); } catch (Exception e) { log.warn("[{}][{}] Can't convert to PostAttributesMsg, entityData [{}]", tenantId, entityId, entityData, e); } @@ -478,6 +479,13 @@ public class EdgeMsgConstructorUtils { return builder.build(); } + private static long getTs(JsonObject data) { + if (data.get("ts") != null && !data.get("ts").isJsonNull()) { + return data.getAsJsonPrimitive("ts").getAsLong(); + } + return System.currentTimeMillis(); + } + private static String getScopeOfDefault(JsonObject data) { JsonPrimitive scope = data.getAsJsonPrimitive("scope"); String result = DataConstants.SERVER_SCOPE; diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/telemetry/BaseTelemetryProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/telemetry/BaseTelemetryProcessor.java index 2306b618d4..eeaa80d6a2 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/telemetry/BaseTelemetryProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/telemetry/BaseTelemetryProcessor.java @@ -54,6 +54,7 @@ import org.thingsboard.server.common.data.id.RuleChainId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.id.UserId; import org.thingsboard.server.common.data.kv.AttributeKvEntry; +import org.thingsboard.server.common.data.kv.KvEntry; import org.thingsboard.server.common.data.msg.TbMsgType; import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.common.msg.TbMsgMetaData; @@ -79,7 +80,10 @@ import org.thingsboard.server.service.telemetry.TelemetrySubscriptionService; import java.util.ArrayList; import java.util.List; +import java.util.Map; +import java.util.Set; import java.util.UUID; +import java.util.stream.Collectors; @Slf4j public abstract class BaseTelemetryProcessor extends BaseEdgeProcessor { @@ -111,7 +115,7 @@ public abstract class BaseTelemetryProcessor extends BaseEdgeProcessor { abstract protected String getMsgSourceKey(); - public List> processTelemetryMsg(TenantId tenantId, EntityDataProto entityData) { + public List> processTelemetryMsg(TenantId tenantId, EntityDataProto entityData) throws Exception { log.trace("[{}] processTelemetryMsg [{}]", tenantId, entityData); List> result = new ArrayList<>(); EntityId entityId = constructEntityId(entityData.getEntityType(), entityData.getEntityIdMSB(), entityData.getEntityIdLSB()); @@ -122,11 +126,14 @@ public abstract class BaseTelemetryProcessor extends BaseEdgeProcessor { CustomerId customerId = pair.getValue(); metaData.putValue(DataConstants.MSG_SOURCE_KEY, getMsgSourceKey()); if (entityData.hasPostAttributesMsg()) { - result.add(processPostAttributes(tenantId, customerId, entityId, entityData.getPostAttributesMsg(), metaData)); + metaData.putValue(DataConstants.SCOPE, entityData.getPostAttributeScope()); + long ts = entityData.hasAttributeTs() ? entityData.getAttributeTs() : System.currentTimeMillis(); + result.add(processPostAttributes(tenantId, customerId, entityId, entityData.getPostAttributesMsg(), metaData, ts)); } if (entityData.hasAttributesUpdatedMsg()) { - metaData.putValue("scope", entityData.getPostAttributeScope()); - result.add(processAttributesUpdate(tenantId, customerId, entityId, entityData.getAttributesUpdatedMsg(), metaData)); + metaData.putValue(DataConstants.SCOPE, entityData.getPostAttributeScope()); + long ts = entityData.hasAttributeTs() ? entityData.getAttributeTs() : System.currentTimeMillis(); + result.add(processAttributesUpdate(tenantId, customerId, entityId, entityData.getAttributesUpdatedMsg(), metaData, ts)); } if (entityData.hasPostTelemetryMsg()) { result.add(processPostTelemetry(tenantId, customerId, entityId, entityData.getPostTelemetryMsg(), metaData)); @@ -254,23 +261,39 @@ public abstract class BaseTelemetryProcessor extends BaseEdgeProcessor { return new ImmutablePair<>(queueName, ruleChainId); } - private ListenableFuture processPostAttributes(TenantId tenantId, CustomerId customerId, EntityId entityId, TransportProtos.PostAttributeMsg msg, TbMsgMetaData metaData) { + private ListenableFuture processPostAttributes(TenantId tenantId, CustomerId customerId, EntityId entityId, + TransportProtos.PostAttributeMsg msg, TbMsgMetaData metaData, long ts) throws Exception { SettableFuture futureToSet = SettableFuture.create(); JsonObject json = JsonUtils.getJsonObject(msg.getKvList()); - var defaultQueueAndRuleChain = getDefaultQueueNameAndRuleChainId(tenantId, entityId); - TbMsg tbMsg = TbMsg.newMsg() - .queueName(defaultQueueAndRuleChain.getKey()) - .type(TbMsgType.POST_ATTRIBUTES_REQUEST) - .originator(entityId) - .customerId(customerId) - .copyMetaData(metaData) - .data(gson.toJson(json)) - .ruleChainId(defaultQueueAndRuleChain.getValue()) - .build(); - edgeCtx.getClusterService().pushMsgToRuleEngine(tenantId, tbMsg.getOriginator(), tbMsg, new TbQueueCallback() { + AttributeScope scope = AttributeScope.valueOf(metaData.getValue(DataConstants.SCOPE)); + List attributes = new ArrayList<>(JsonConverter.convertToAttributes(json, ts)); + ListenableFuture> future = filterAttributesByTs(tenantId, entityId, scope, attributes); + Futures.addCallback(future, new FutureCallback<>() { @Override - public void onSuccess(TbQueueMsgMetadata metadata) { - futureToSet.set(null); + public void onSuccess(List attributesToSave) { + JsonObject jsonToSave = filterAttributesFromJson(json, attributesToSave); + var defaultQueueAndRuleChain = getDefaultQueueNameAndRuleChainId(tenantId, entityId); + TbMsg tbMsg = TbMsg.newMsg() + .queueName(defaultQueueAndRuleChain.getKey()) + .type(TbMsgType.POST_ATTRIBUTES_REQUEST) + .originator(entityId) + .customerId(customerId) + .copyMetaData(metaData) + .data(gson.toJson(jsonToSave)) + .ruleChainId(defaultQueueAndRuleChain.getValue()) + .build(); + edgeCtx.getClusterService().pushMsgToRuleEngine(tenantId, tbMsg.getOriginator(), tbMsg, new TbQueueCallback() { + @Override + public void onSuccess(TbQueueMsgMetadata metadata) { + futureToSet.set(null); + } + + @Override + public void onFailure(Throwable t) { + log.error("[{}] Can't process post attributes [{}]", tenantId, msg, t); + futureToSet.setException(t); + } + }); } @Override @@ -278,7 +301,7 @@ public abstract class BaseTelemetryProcessor extends BaseEdgeProcessor { log.error("[{}] Can't process post attributes [{}]", tenantId, msg, t); futureToSet.setException(t); } - }); + }, dbCallbackExecutorService); return futureToSet; } @@ -286,33 +309,47 @@ public abstract class BaseTelemetryProcessor extends BaseEdgeProcessor { CustomerId customerId, EntityId entityId, TransportProtos.PostAttributeMsg msg, - TbMsgMetaData metaData) { + TbMsgMetaData metaData, + long ts) { SettableFuture futureToSet = SettableFuture.create(); JsonObject json = JsonUtils.getJsonObject(msg.getKvList()); - List attributes = new ArrayList<>(JsonConverter.convertToAttributes(json)); - String scope = metaData.getValue("scope"); - tsSubService.saveAttributes(AttributesSaveRequest.builder() - .tenantId(tenantId) - .entityId(entityId) - .scope(AttributeScope.valueOf(scope)) - .entries(attributes) - .callback(new FutureCallback<>() { - @Override - public void onSuccess(@Nullable Void tmp) { - var defaultQueueAndRuleChain = getDefaultQueueNameAndRuleChainId(tenantId, entityId); - TbMsg tbMsg = TbMsg.newMsg() - .queueName(defaultQueueAndRuleChain.getKey()) - .type(TbMsgType.ATTRIBUTES_UPDATED) - .originator(entityId) - .customerId(customerId) - .copyMetaData(metaData) - .data(gson.toJson(json)) - .ruleChainId(defaultQueueAndRuleChain.getValue()) - .build(); - edgeCtx.getClusterService().pushMsgToRuleEngine(tenantId, tbMsg.getOriginator(), tbMsg, new TbQueueCallback() { + AttributeScope scope = AttributeScope.valueOf(metaData.getValue(DataConstants.SCOPE)); + List attributes = new ArrayList<>(JsonConverter.convertToAttributes(json, ts)); + ListenableFuture> future = filterAttributesByTs(tenantId, entityId, scope, attributes); + Futures.addCallback(future, new FutureCallback<>() { + @Override + public void onSuccess(List attributesToSave) { + JsonObject jsonToSave = filterAttributesFromJson(json, attributesToSave); + tsSubService.saveAttributes(AttributesSaveRequest.builder() + .tenantId(tenantId) + .entityId(entityId) + .scope(scope) + .entries(attributesToSave) + .callback(new FutureCallback<>() { @Override - public void onSuccess(TbQueueMsgMetadata metadata) { - futureToSet.set(null); + public void onSuccess(@Nullable Void tmp) { + var defaultQueueAndRuleChain = getDefaultQueueNameAndRuleChainId(tenantId, entityId); + TbMsg tbMsg = TbMsg.newMsg() + .queueName(defaultQueueAndRuleChain.getKey()) + .type(TbMsgType.ATTRIBUTES_UPDATED) + .originator(entityId) + .customerId(customerId) + .copyMetaData(metaData) + .data(gson.toJson(jsonToSave)) + .ruleChainId(defaultQueueAndRuleChain.getValue()) + .build(); + edgeCtx.getClusterService().pushMsgToRuleEngine(tenantId, tbMsg.getOriginator(), tbMsg, new TbQueueCallback() { + @Override + public void onSuccess(TbQueueMsgMetadata metadata) { + futureToSet.set(null); + } + + @Override + public void onFailure(Throwable t) { + log.error("[{}] Can't process attributes update [{}]", tenantId, msg, t); + futureToSet.setException(t); + } + }); } @Override @@ -320,22 +357,28 @@ public abstract class BaseTelemetryProcessor extends BaseEdgeProcessor { log.error("[{}] Can't process attributes update [{}]", tenantId, msg, t); futureToSet.setException(t); } - }); - } + }) + .build()); + } - @Override - public void onFailure(Throwable t) { - log.error("[{}] Can't process attributes update [{}]", tenantId, msg, t); - futureToSet.setException(t); - } - }) - .build()); + @Override + public void onFailure(Throwable t) { + log.error("[{}] Can't process attributes update [{}]", tenantId, msg, t); + futureToSet.setException(t); + } + }, dbCallbackExecutorService); return futureToSet; } - private ListenableFuture processAttributeDeleteMsg(TenantId tenantId, EntityId entityId, AttributeDeleteMsg attributeDeleteMsg, - String entityType) { + private JsonObject filterAttributesFromJson(JsonObject json, List attributesToSave) { + Set keysToSave = attributesToSave.stream() + .map(KvEntry::getKey) + .collect(Collectors.toSet()); + json.keySet().removeIf(key -> !keysToSave.contains(key)); + return json; + } + private ListenableFuture processAttributeDeleteMsg(TenantId tenantId, EntityId entityId, AttributeDeleteMsg attributeDeleteMsg, String entityType) { String scope = attributeDeleteMsg.getScope(); List attributeKeys = attributeDeleteMsg.getAttributeNamesList(); ListenableFuture> removeAllFuture = edgeCtx.getAttributesService().removeAll(tenantId, entityId, AttributeScope.valueOf(scope), attributeKeys); @@ -386,4 +429,19 @@ public abstract class BaseTelemetryProcessor extends BaseEdgeProcessor { return bodyJackson == null ? null : EdgeMsgConstructorUtils.constructEntityDataMsg(tenantId, entityId, actionType, JsonParser.parseString(bodyJackson)); } + private ListenableFuture> filterAttributesByTs(TenantId tenantId, EntityId entityId, AttributeScope scope, + List attributes) { + List keys = attributes.stream().map(KvEntry::getKey).toList(); + ListenableFuture> future = edgeCtx.getAttributesService().find(tenantId, entityId, scope, keys); + return Futures.transform(future, input -> { + Map existingAttributesTs = input.stream().collect(Collectors.toMap(KvEntry::getKey, AttributeKvEntry::getLastUpdateTs)); + return attributes.stream() + .filter(attribute -> { + String key = attribute.getKey(); + long incomingTs = attribute.getLastUpdateTs(); + return incomingTs > existingAttributesTs.getOrDefault(key, 0L); + }).toList(); + }, dbCallbackExecutorService); + } + } diff --git a/application/src/test/java/org/thingsboard/server/edge/DeviceEdgeTest.java b/application/src/test/java/org/thingsboard/server/edge/DeviceEdgeTest.java index 3f85e559c7..a6cd127097 100644 --- a/application/src/test/java/org/thingsboard/server/edge/DeviceEdgeTest.java +++ b/application/src/test/java/org/thingsboard/server/edge/DeviceEdgeTest.java @@ -488,6 +488,68 @@ public class DeviceEdgeTest extends AbstractEdgeTest { doDelete("/api/plugins/telemetry/DEVICE/" + device.getId().getId() + "/SERVER_SCOPE?keys=" + attributesKey, String.class); } + @Test + public void testSendOutdatedAttributeToCloud() throws Exception { + long ts = System.currentTimeMillis() - TimeUnit.DAYS.toMillis(1); + Device device = saveDeviceOnCloudAndVerifyDeliveryToEdge(); + + edgeImitator.expectResponsesAmount(1); + + ObjectNode attributesNode = JacksonUtil.newObjectNode(); + String originalValue = "original_value"; + attributesNode.put("test_attr", originalValue); + doPost("/api/plugins/telemetry/DEVICE/" + device.getId() + "/attributes/SERVER_SCOPE", attributesNode); + + // Wait before device attributes saved to database + Awaitility.await() + .atMost(10, TimeUnit.SECONDS) + .until(() -> { + String urlTemplate = "/api/plugins/telemetry/DEVICE/" + device.getId() + "/keys/attributes/" + DataConstants.SERVER_SCOPE; + List actualKeys = doGetAsyncTyped(urlTemplate, new TypeReference<>() {}); + return actualKeys != null && !actualKeys.isEmpty() && actualKeys.contains("test_attr"); + }); + + JsonObject attributesData = new JsonObject(); + // incorrect msg, will not be saved, because of ts is lower than for already existing + String attributesKey = "test_attr"; + String attributeValueIncorrect = "test_value"; + // correct msg, will be saved, no ts issue + String attributeKey2 = "test_attr2"; + String attributeValue2Correct = "test_value2"; + attributesData.addProperty(attributesKey, attributeValueIncorrect); + attributesData.addProperty(attributeKey2, attributeValue2Correct); + UplinkMsg.Builder uplinkMsgBuilder = UplinkMsg.newBuilder(); + EntityDataProto.Builder entityDataBuilder = EntityDataProto.newBuilder(); + entityDataBuilder.setEntityType(device.getId().getEntityType().name()); + entityDataBuilder.setEntityIdMSB(device.getId().getId().getMostSignificantBits()); + entityDataBuilder.setEntityIdLSB(device.getId().getId().getLeastSignificantBits()); + entityDataBuilder.setAttributesUpdatedMsg(JsonConverter.convertToAttributesProto(attributesData)); + entityDataBuilder.setPostAttributeScope(DataConstants.SERVER_SCOPE); + entityDataBuilder.setAttributeTs(ts); + + uplinkMsgBuilder.addEntityData(entityDataBuilder.build()); + + edgeImitator.sendUplinkMsg(uplinkMsgBuilder.build()); + Assert.assertTrue(edgeImitator.waitForResponses()); + + String attributeValuesUrl = "/api/plugins/telemetry/DEVICE/" + device.getId() + "/values/attributes/" + DataConstants.SERVER_SCOPE; + List> attributes = doGetAsyncTyped(attributeValuesUrl, new TypeReference<>() { + }); + + Optional> customAttributeOpt = getAttributeByKey(attributesKey, attributes); + Assert.assertTrue(customAttributeOpt.isPresent()); + Map customAttribute = customAttributeOpt.get(); + Assert.assertNotEquals(attributeValueIncorrect, customAttribute.get("value")); + Assert.assertEquals(originalValue, customAttribute.get("value")); + + customAttributeOpt = getAttributeByKey(attributeKey2, attributes); + Assert.assertTrue(customAttributeOpt.isPresent()); + customAttribute = customAttributeOpt.get(); + Assert.assertEquals(attributeValue2Correct, customAttribute.get("value")); + + doDelete("/api/plugins/telemetry/DEVICE/" + device.getId().getId() + "/SERVER_SCOPE?keys=" + attributesKey, String.class); + } + @Test public void testSendDeviceToCloudWithNameThatAlreadyExistsOnCloud() throws Exception { String deviceOnCloudName = StringUtils.randomAlphanumeric(15); diff --git a/common/edge-api/src/main/proto/edge.proto b/common/edge-api/src/main/proto/edge.proto index 5b8ab0d4c5..6a7482a3c1 100644 --- a/common/edge-api/src/main/proto/edge.proto +++ b/common/edge-api/src/main/proto/edge.proto @@ -131,6 +131,7 @@ message EntityDataProto { transport.PostAttributeMsg attributesUpdatedMsg = 6; string postAttributeScope = 7; AttributeDeleteMsg attributeDeleteMsg = 8; + optional int64 attributeTs = 9; } message AttributeDeleteMsg { diff --git a/common/proto/src/main/java/org/thingsboard/server/common/adaptor/JsonConverter.java b/common/proto/src/main/java/org/thingsboard/server/common/adaptor/JsonConverter.java index c2d6468e2e..2a208923d9 100644 --- a/common/proto/src/main/java/org/thingsboard/server/common/adaptor/JsonConverter.java +++ b/common/proto/src/main/java/org/thingsboard/server/common/adaptor/JsonConverter.java @@ -63,7 +63,6 @@ import java.util.Map.Entry; import java.util.Set; import java.util.TreeMap; import java.util.function.Consumer; -import java.util.stream.Collectors; public class JsonConverter { @@ -540,10 +539,12 @@ public class JsonConverter { } public static Set convertToAttributes(JsonElement element) { - Set result = new HashSet<>(); long ts = System.currentTimeMillis(); - result.addAll(parseValues(element.getAsJsonObject()).stream().map(kv -> new BaseAttributeKvEntry(kv, ts)).collect(Collectors.toList())); - return result; + return convertToAttributes(element, ts); + } + + public static Set convertToAttributes(JsonElement element, long ts) { + return new HashSet<>(parseValues(element.getAsJsonObject()).stream().map(kv -> new BaseAttributeKvEntry(kv, ts)).toList()); } private static List parseValues(JsonObject valuesObject) { @@ -702,4 +703,5 @@ public class JsonConverter { return ""; } } + } diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/util/JsonUtils.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/util/JsonUtils.java index c2db1f9f20..28a904edb9 100644 --- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/util/JsonUtils.java +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/util/JsonUtils.java @@ -49,6 +49,7 @@ public class JsonUtils { } return json; } + public static JsonElement parse(Object value) { if (value instanceof Integer) { return new JsonPrimitive((Integer) value); @@ -67,7 +68,7 @@ public class JsonUtils { } } - public static JsonObject convertToJsonObject(Map map) { + public static JsonObject convertToJsonObject(Map map) { JsonObject jsonObject = new JsonObject(); for (Map.Entry entry : map.entrySet()) { jsonObject.add(entry.getKey(), parse(entry.getValue())); @@ -75,4 +76,5 @@ public class JsonUtils { return jsonObject; } + } diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/edge/AbstractTbMsgPushNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/edge/AbstractTbMsgPushNode.java index b5270725bf..77f4937296 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/edge/AbstractTbMsgPushNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/edge/AbstractTbMsgPushNode.java @@ -54,8 +54,6 @@ public abstract class AbstractTbMsgPushNode { entityBody.put("kv", dataJson); - entityBody.put(SCOPE, getScope(metadata)); + entityBody.put("ts", msg.getMetaDataTs()); + entityBody.put(DataConstants.SCOPE, getScope(metadata)); if (EdgeEventActionType.POST_ATTRIBUTES.equals(actionType)) { entityBody.put("isPostAttributes", true); } @@ -98,7 +97,7 @@ public abstract class AbstractTbMsgPushNode keys = JacksonUtil.convertValue(dataJson.get("attributes"), new TypeReference<>() { }); entityBody.put("keys", keys); - entityBody.put(SCOPE, getScope(metadata)); + entityBody.put(DataConstants.SCOPE, getScope(metadata)); } case TIMESERIES_UPDATED -> { entityBody.put("data", dataJson); @@ -145,7 +144,7 @@ public abstract class AbstractTbMsgPushNode metadata) { - String scope = metadata.get(SCOPE); + String scope = metadata.get(DataConstants.SCOPE); if (StringUtils.isEmpty(scope)) { scope = config.getScope(); } @@ -163,7 +162,7 @@ public abstract class AbstractTbMsgPushNode