From 4eedea076633f4e1a6aef0245564bdf1510267ae Mon Sep 17 00:00:00 2001 From: Andrii Landiak Date: Wed, 15 Jan 2025 16:11:54 +0200 Subject: [PATCH 1/7] EntityDataProto: add ts to correctly handle PostAttributeMsg update from/to Edge --- .../telemetry/EntityDataMsgConstructor.java | 6 +++ .../telemetry/BaseTelemetryProcessor.java | 46 ++++++++++++---- .../server/edge/DeviceEdgeTest.java | 53 +++++++++++++++++++ common/edge-api/src/main/proto/edge.proto | 1 + .../server/common/adaptor/JsonConverter.java | 10 ++-- .../common/transport/util/JsonUtils.java | 4 +- .../engine/edge/AbstractTbMsgPushNode.java | 1 + 7 files changed, 107 insertions(+), 14 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/constructor/telemetry/EntityDataMsgConstructor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/constructor/telemetry/EntityDataMsgConstructor.java index 6493f97f57..f7f3cc31dd 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/constructor/telemetry/EntityDataMsgConstructor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/constructor/telemetry/EntityDataMsgConstructor.java @@ -67,6 +67,9 @@ public class EntityDataMsgConstructor { TransportProtos.PostAttributeMsg attributesUpdatedMsg = JsonConverter.convertToAttributesProto(data.getAsJsonObject("kv")); builder.setAttributesUpdatedMsg(attributesUpdatedMsg); builder.setPostAttributeScope(getScopeOfDefault(data)); + if (data.get("ts") != null && !data.get("ts").isJsonNull()) { + builder.setAttributeTs(data.getAsJsonPrimitive("ts").getAsLong()); + } } catch (Exception e) { log.warn("[{}][{}] Can't convert to AttributesUpdatedMsg proto, entityData [{}]", tenantId, entityId, entityData, e); } @@ -77,6 +80,9 @@ public class EntityDataMsgConstructor { TransportProtos.PostAttributeMsg postAttributesMsg = JsonConverter.convertToAttributesProto(data.getAsJsonObject("kv")); builder.setPostAttributesMsg(postAttributesMsg); builder.setPostAttributeScope(getScopeOfDefault(data)); + if (data.get("ts") != null && !data.get("ts").isJsonNull()) { + builder.setAttributeTs(data.getAsJsonPrimitive("ts").getAsLong()); + } } catch (Exception e) { log.warn("[{}][{}] Can't convert to PostAttributesMsg, entityData [{}]", tenantId, entityId, entityData, e); } diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/telemetry/BaseTelemetryProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/telemetry/BaseTelemetryProcessor.java index 488fc3d8b9..520c7ef5ff 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/telemetry/BaseTelemetryProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/telemetry/BaseTelemetryProcessor.java @@ -54,6 +54,7 @@ import org.thingsboard.server.common.data.id.RuleChainId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.id.UserId; import org.thingsboard.server.common.data.kv.AttributeKvEntry; +import org.thingsboard.server.common.data.kv.KvEntry; import org.thingsboard.server.common.data.msg.TbMsgType; import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.common.msg.TbMsgMetaData; @@ -79,7 +80,9 @@ import org.thingsboard.server.service.telemetry.TelemetrySubscriptionService; import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.UUID; +import java.util.stream.Collectors; @Slf4j public abstract class BaseTelemetryProcessor extends BaseEdgeProcessor { @@ -114,7 +117,7 @@ public abstract class BaseTelemetryProcessor extends BaseEdgeProcessor { abstract protected String getMsgSourceKey(); - public List> processTelemetryMsg(TenantId tenantId, EntityDataProto entityData) { + public List> processTelemetryMsg(TenantId tenantId, EntityDataProto entityData) throws Exception { log.trace("[{}] processTelemetryMsg [{}]", tenantId, entityData); List> result = new ArrayList<>(); EntityId entityId = constructEntityId(entityData.getEntityType(), entityData.getEntityIdMSB(), entityData.getEntityIdLSB()); @@ -125,11 +128,13 @@ public abstract class BaseTelemetryProcessor extends BaseEdgeProcessor { CustomerId customerId = pair.getValue(); metaData.putValue(DataConstants.MSG_SOURCE_KEY, getMsgSourceKey()); if (entityData.hasPostAttributesMsg()) { - result.add(processPostAttributes(tenantId, customerId, entityId, entityData.getPostAttributesMsg(), metaData)); + long ts = entityData.hasAttributeTs() ? entityData.getAttributeTs() : System.currentTimeMillis(); + result.add(processPostAttributes(tenantId, customerId, entityId, entityData.getPostAttributesMsg(), metaData, ts)); } if (entityData.hasAttributesUpdatedMsg()) { metaData.putValue("scope", entityData.getPostAttributeScope()); - result.add(processAttributesUpdate(tenantId, customerId, entityId, entityData.getAttributesUpdatedMsg(), metaData)); + long ts = entityData.hasAttributeTs() ? entityData.getAttributeTs() : System.currentTimeMillis(); + result.add(processAttributesUpdate(tenantId, customerId, entityId, entityData.getAttributesUpdatedMsg(), metaData, ts)); } if (entityData.hasPostTelemetryMsg()) { result.add(processPostTelemetry(tenantId, customerId, entityId, entityData.getPostTelemetryMsg(), metaData)); @@ -257,9 +262,13 @@ public abstract class BaseTelemetryProcessor extends BaseEdgeProcessor { return new ImmutablePair<>(queueName, ruleChainId); } - private ListenableFuture processPostAttributes(TenantId tenantId, CustomerId customerId, EntityId entityId, TransportProtos.PostAttributeMsg msg, TbMsgMetaData metaData) { + private ListenableFuture processPostAttributes(TenantId tenantId, CustomerId customerId, EntityId entityId, TransportProtos.PostAttributeMsg msg, TbMsgMetaData metaData, long ts) throws Exception { SettableFuture futureToSet = SettableFuture.create(); JsonObject json = JsonUtils.getJsonObject(msg.getKvList()); + + List attributes = new ArrayList<>(JsonConverter.convertToAttributes(json, ts)); + filterAttributesByTs(tenantId, entityId, AttributeScope.CLIENT_SCOPE, attributes, json); + var defaultQueueAndRuleChain = getDefaultQueueNameAndRuleChainId(tenantId, entityId); TbMsg tbMsg = TbMsg.newMsg() .queueName(defaultQueueAndRuleChain.getKey()) @@ -289,16 +298,18 @@ public abstract class BaseTelemetryProcessor extends BaseEdgeProcessor { CustomerId customerId, EntityId entityId, TransportProtos.PostAttributeMsg msg, - TbMsgMetaData metaData) { + TbMsgMetaData metaData, + long ts) throws Exception { SettableFuture futureToSet = SettableFuture.create(); JsonObject json = JsonUtils.getJsonObject(msg.getKvList()); - List attributes = new ArrayList<>(JsonConverter.convertToAttributes(json)); - String scope = metaData.getValue("scope"); + AttributeScope scope = AttributeScope.valueOf(metaData.getValue("scope")); + List attributes = new ArrayList<>(JsonConverter.convertToAttributes(json, ts)); + List attributesToSave = filterAttributesByTs(tenantId, entityId, scope, attributes, json); tsSubService.saveAttributes(AttributesSaveRequest.builder() .tenantId(tenantId) .entityId(entityId) - .scope(AttributeScope.valueOf(scope)) - .entries(attributes) + .scope(scope) + .entries(attributesToSave) .callback(new FutureCallback<>() { @Override public void onSuccess(@Nullable Void tmp) { @@ -390,4 +401,21 @@ public abstract class BaseTelemetryProcessor extends BaseEdgeProcessor { entityDataMsgConstructor.constructEntityDataMsg(tenantId, entityId, actionType, JsonParser.parseString(bodyJackson)); } + private List filterAttributesByTs(TenantId tenantId, EntityId entityId, AttributeScope scope, List attributes, JsonObject jsonObject) throws Exception { + List keys = attributes.stream().map(KvEntry::getKey).toList(); + Map existingAttributesTs = edgeCtx.getAttributesService().find(tenantId, entityId, scope, keys).get() + .stream().collect(Collectors.toMap(KvEntry::getKey, AttributeKvEntry::getLastUpdateTs)); + return attributes.stream() + .filter(attribute -> { + String key = attribute.getKey(); + long incomingTs = attribute.getLastUpdateTs(); + if (incomingTs > existingAttributesTs.getOrDefault(key, 0L)) { + return true; + } else { + jsonObject.remove(key); + return false; + } + }).toList(); + } + } diff --git a/application/src/test/java/org/thingsboard/server/edge/DeviceEdgeTest.java b/application/src/test/java/org/thingsboard/server/edge/DeviceEdgeTest.java index 387011717a..11bc053bb5 100644 --- a/application/src/test/java/org/thingsboard/server/edge/DeviceEdgeTest.java +++ b/application/src/test/java/org/thingsboard/server/edge/DeviceEdgeTest.java @@ -488,6 +488,59 @@ public class DeviceEdgeTest extends AbstractEdgeTest { doDelete("/api/plugins/telemetry/DEVICE/" + device.getId().getId() + "/SERVER_SCOPE?keys=" + attributesKey, String.class); } + @Test + public void testSendOutdatedAttributeToCloud() throws Exception { + long ts = System.currentTimeMillis(); + Device device = saveDeviceOnCloudAndVerifyDeliveryToEdge(); + + edgeImitator.expectResponsesAmount(1); + + ObjectNode attributesNode = JacksonUtil.newObjectNode(); + String originalValue = "original_value"; + attributesNode.put("test_attr", originalValue); + doPost("/api/plugins/telemetry/DEVICE/" + device.getId() + "/attributes/SERVER_SCOPE", attributesNode); + + JsonObject attributesData = new JsonObject(); + // incorrect msg, will not be saved, because of ts is lower than for already existing + String attributesKey = "test_attr"; + String attributeValueIncorrect = "test_value"; + // correct msg, will be saved, no ts issue + String attributeKey2 = "test_attr2"; + String attributeValue2Correct = "test_value2"; + attributesData.addProperty(attributesKey, attributeValueIncorrect); + attributesData.addProperty(attributeKey2, attributeValue2Correct); + UplinkMsg.Builder uplinkMsgBuilder = UplinkMsg.newBuilder(); + EntityDataProto.Builder entityDataBuilder = EntityDataProto.newBuilder(); + entityDataBuilder.setEntityType(device.getId().getEntityType().name()); + entityDataBuilder.setEntityIdMSB(device.getId().getId().getMostSignificantBits()); + entityDataBuilder.setEntityIdLSB(device.getId().getId().getLeastSignificantBits()); + entityDataBuilder.setAttributesUpdatedMsg(JsonConverter.convertToAttributesProto(attributesData)); + entityDataBuilder.setPostAttributeScope(DataConstants.SERVER_SCOPE); + entityDataBuilder.setAttributeTs(ts); + + uplinkMsgBuilder.addEntityData(entityDataBuilder.build()); + + edgeImitator.sendUplinkMsg(uplinkMsgBuilder.build()); + Assert.assertTrue(edgeImitator.waitForResponses()); + + String attributeValuesUrl = "/api/plugins/telemetry/DEVICE/" + device.getId() + "/values/attributes/" + DataConstants.SERVER_SCOPE; + List> attributes = doGetAsyncTyped(attributeValuesUrl, new TypeReference<>() { + }); + + Optional> customAttributeOpt = getAttributeByKey(attributesKey, attributes); + Assert.assertTrue(customAttributeOpt.isPresent()); + Map customAttribute = customAttributeOpt.get(); + Assert.assertNotEquals(attributeValueIncorrect, customAttribute.get("value")); + Assert.assertEquals(originalValue, customAttribute.get("value")); + + customAttributeOpt = getAttributeByKey(attributeKey2, attributes); + Assert.assertTrue(customAttributeOpt.isPresent()); + customAttribute = customAttributeOpt.get(); + Assert.assertEquals(attributeValue2Correct, customAttribute.get("value")); + + doDelete("/api/plugins/telemetry/DEVICE/" + device.getId().getId() + "/SERVER_SCOPE?keys=" + attributesKey, String.class); + } + @Test public void testSendDeviceToCloudWithNameThatAlreadyExistsOnCloud() throws Exception { String deviceOnCloudName = StringUtils.randomAlphanumeric(15); diff --git a/common/edge-api/src/main/proto/edge.proto b/common/edge-api/src/main/proto/edge.proto index 4dc762e14c..0557c5f5d4 100644 --- a/common/edge-api/src/main/proto/edge.proto +++ b/common/edge-api/src/main/proto/edge.proto @@ -132,6 +132,7 @@ message EntityDataProto { transport.PostAttributeMsg attributesUpdatedMsg = 6; string postAttributeScope = 7; AttributeDeleteMsg attributeDeleteMsg = 8; + optional int64 attributeTs = 9; } message AttributeDeleteMsg { diff --git a/common/proto/src/main/java/org/thingsboard/server/common/adaptor/JsonConverter.java b/common/proto/src/main/java/org/thingsboard/server/common/adaptor/JsonConverter.java index f4abf5f3a0..9226395289 100644 --- a/common/proto/src/main/java/org/thingsboard/server/common/adaptor/JsonConverter.java +++ b/common/proto/src/main/java/org/thingsboard/server/common/adaptor/JsonConverter.java @@ -63,7 +63,6 @@ import java.util.Map.Entry; import java.util.Set; import java.util.TreeMap; import java.util.function.Consumer; -import java.util.stream.Collectors; public class JsonConverter { @@ -540,10 +539,12 @@ public class JsonConverter { } public static Set convertToAttributes(JsonElement element) { - Set result = new HashSet<>(); long ts = System.currentTimeMillis(); - result.addAll(parseValues(element.getAsJsonObject()).stream().map(kv -> new BaseAttributeKvEntry(kv, ts)).collect(Collectors.toList())); - return result; + return new HashSet<>(parseValues(element.getAsJsonObject()).stream().map(kv -> new BaseAttributeKvEntry(kv, ts)).toList()); + } + + public static Set convertToAttributes(JsonElement element, long ts) { + return new HashSet<>(parseValues(element.getAsJsonObject()).stream().map(kv -> new BaseAttributeKvEntry(kv, ts)).toList()); } private static List parseValues(JsonObject valuesObject) { @@ -702,4 +703,5 @@ public class JsonConverter { return ""; } } + } diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/util/JsonUtils.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/util/JsonUtils.java index ed0b425c76..a97dc411b4 100644 --- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/util/JsonUtils.java +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/util/JsonUtils.java @@ -49,6 +49,7 @@ public class JsonUtils { } return json; } + public static JsonElement parse(Object value) { if (value instanceof Integer) { return new JsonPrimitive((Integer) value); @@ -67,7 +68,7 @@ public class JsonUtils { } } - public static JsonObject convertToJsonObject(Map map) { + public static JsonObject convertToJsonObject(Map map) { JsonObject jsonObject = new JsonObject(); for (Map.Entry entry : map.entrySet()) { jsonObject.add(entry.getKey(), parse(entry.getValue())); @@ -75,4 +76,5 @@ public class JsonUtils { return jsonObject; } + } diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/edge/AbstractTbMsgPushNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/edge/AbstractTbMsgPushNode.java index 73368d40f5..f1fe1affc6 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/edge/AbstractTbMsgPushNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/edge/AbstractTbMsgPushNode.java @@ -89,6 +89,7 @@ public abstract class AbstractTbMsgPushNode { entityBody.put("kv", dataJson); + entityBody.put("ts", msg.getMetaDataTs()); entityBody.put(SCOPE, getScope(metadata)); if (EdgeEventActionType.POST_ATTRIBUTES.equals(actionType)) { entityBody.put("isPostAttributes", true); From e4b5490921d91215984edb7d3c32952238114aeb Mon Sep 17 00:00:00 2001 From: Andrii Landiak Date: Wed, 15 Jan 2025 17:22:14 +0200 Subject: [PATCH 2/7] Change ts in test to be for sure condifent --- .../test/java/org/thingsboard/server/edge/DeviceEdgeTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/application/src/test/java/org/thingsboard/server/edge/DeviceEdgeTest.java b/application/src/test/java/org/thingsboard/server/edge/DeviceEdgeTest.java index 11bc053bb5..e4f9080686 100644 --- a/application/src/test/java/org/thingsboard/server/edge/DeviceEdgeTest.java +++ b/application/src/test/java/org/thingsboard/server/edge/DeviceEdgeTest.java @@ -490,7 +490,7 @@ public class DeviceEdgeTest extends AbstractEdgeTest { @Test public void testSendOutdatedAttributeToCloud() throws Exception { - long ts = System.currentTimeMillis(); + long ts = System.currentTimeMillis() - 5; Device device = saveDeviceOnCloudAndVerifyDeliveryToEdge(); edgeImitator.expectResponsesAmount(1); From 12233475875c6e83f1bae1c6ab7c186a1115a27e Mon Sep 17 00:00:00 2001 From: Andrii Landiak Date: Mon, 20 Jan 2025 16:38:03 +0200 Subject: [PATCH 3/7] Fix testSendOutdatedAttributeToCloud --- .../test/java/org/thingsboard/server/edge/DeviceEdgeTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/application/src/test/java/org/thingsboard/server/edge/DeviceEdgeTest.java b/application/src/test/java/org/thingsboard/server/edge/DeviceEdgeTest.java index e4f9080686..060e641b93 100644 --- a/application/src/test/java/org/thingsboard/server/edge/DeviceEdgeTest.java +++ b/application/src/test/java/org/thingsboard/server/edge/DeviceEdgeTest.java @@ -490,7 +490,7 @@ public class DeviceEdgeTest extends AbstractEdgeTest { @Test public void testSendOutdatedAttributeToCloud() throws Exception { - long ts = System.currentTimeMillis() - 5; + long ts = System.currentTimeMillis() - TimeUnit.DAYS.toMillis(1); Device device = saveDeviceOnCloudAndVerifyDeliveryToEdge(); edgeImitator.expectResponsesAmount(1); From 5f793b11f91d097187565810a63ac1856672e0b4 Mon Sep 17 00:00:00 2001 From: Volodymyr Babak Date: Fri, 31 Jan 2025 16:06:52 +0200 Subject: [PATCH 4/7] BaseTelemetryProcessor - use async instead of .get() --- .../telemetry/BaseTelemetryProcessor.java | 161 +++++++++++------- 1 file changed, 95 insertions(+), 66 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/telemetry/BaseTelemetryProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/telemetry/BaseTelemetryProcessor.java index 520c7ef5ff..5e7731db1d 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/telemetry/BaseTelemetryProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/telemetry/BaseTelemetryProcessor.java @@ -19,6 +19,7 @@ import com.fasterxml.jackson.databind.JsonNode; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.SettableFuture; import com.google.gson.Gson; import com.google.gson.JsonObject; @@ -81,6 +82,7 @@ import org.thingsboard.server.service.telemetry.TelemetrySubscriptionService; import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.UUID; import java.util.stream.Collectors; @@ -262,27 +264,38 @@ public abstract class BaseTelemetryProcessor extends BaseEdgeProcessor { return new ImmutablePair<>(queueName, ruleChainId); } - private ListenableFuture processPostAttributes(TenantId tenantId, CustomerId customerId, EntityId entityId, TransportProtos.PostAttributeMsg msg, TbMsgMetaData metaData, long ts) throws Exception { + private ListenableFuture processPostAttributes(TenantId tenantId, CustomerId customerId, EntityId entityId, + TransportProtos.PostAttributeMsg msg, TbMsgMetaData metaData, long ts) throws Exception { SettableFuture futureToSet = SettableFuture.create(); JsonObject json = JsonUtils.getJsonObject(msg.getKvList()); - List attributes = new ArrayList<>(JsonConverter.convertToAttributes(json, ts)); - filterAttributesByTs(tenantId, entityId, AttributeScope.CLIENT_SCOPE, attributes, json); - - var defaultQueueAndRuleChain = getDefaultQueueNameAndRuleChainId(tenantId, entityId); - TbMsg tbMsg = TbMsg.newMsg() - .queueName(defaultQueueAndRuleChain.getKey()) - .type(TbMsgType.POST_ATTRIBUTES_REQUEST) - .originator(entityId) - .customerId(customerId) - .copyMetaData(metaData) - .data(gson.toJson(json)) - .ruleChainId(defaultQueueAndRuleChain.getValue()) - .build(); - edgeCtx.getClusterService().pushMsgToRuleEngine(tenantId, tbMsg.getOriginator(), tbMsg, new TbQueueCallback() { + ListenableFuture> future = filterAttributesByTs(tenantId, entityId, AttributeScope.CLIENT_SCOPE, attributes); + Futures.addCallback(future, new FutureCallback<>() { @Override - public void onSuccess(TbQueueMsgMetadata metadata) { - futureToSet.set(null); + public void onSuccess(List attributesToSave) { + JsonObject jsonToSave = filterAttributesFromJson(json, attributesToSave); + var defaultQueueAndRuleChain = getDefaultQueueNameAndRuleChainId(tenantId, entityId); + TbMsg tbMsg = TbMsg.newMsg() + .queueName(defaultQueueAndRuleChain.getKey()) + .type(TbMsgType.POST_ATTRIBUTES_REQUEST) + .originator(entityId) + .customerId(customerId) + .copyMetaData(metaData) + .data(gson.toJson(jsonToSave)) + .ruleChainId(defaultQueueAndRuleChain.getValue()) + .build(); + edgeCtx.getClusterService().pushMsgToRuleEngine(tenantId, tbMsg.getOriginator(), tbMsg, new TbQueueCallback() { + @Override + public void onSuccess(TbQueueMsgMetadata metadata) { + futureToSet.set(null); + } + + @Override + public void onFailure(Throwable t) { + log.error("[{}] Can't process post attributes [{}]", tenantId, msg, t); + futureToSet.setException(t); + } + }); } @Override @@ -290,7 +303,7 @@ public abstract class BaseTelemetryProcessor extends BaseEdgeProcessor { log.error("[{}] Can't process post attributes [{}]", tenantId, msg, t); futureToSet.setException(t); } - }); + }, dbCallbackExecutorService); return futureToSet; } @@ -299,34 +312,46 @@ public abstract class BaseTelemetryProcessor extends BaseEdgeProcessor { EntityId entityId, TransportProtos.PostAttributeMsg msg, TbMsgMetaData metaData, - long ts) throws Exception { + long ts) { SettableFuture futureToSet = SettableFuture.create(); JsonObject json = JsonUtils.getJsonObject(msg.getKvList()); AttributeScope scope = AttributeScope.valueOf(metaData.getValue("scope")); List attributes = new ArrayList<>(JsonConverter.convertToAttributes(json, ts)); - List attributesToSave = filterAttributesByTs(tenantId, entityId, scope, attributes, json); - tsSubService.saveAttributes(AttributesSaveRequest.builder() - .tenantId(tenantId) - .entityId(entityId) - .scope(scope) - .entries(attributesToSave) - .callback(new FutureCallback<>() { - @Override - public void onSuccess(@Nullable Void tmp) { - var defaultQueueAndRuleChain = getDefaultQueueNameAndRuleChainId(tenantId, entityId); - TbMsg tbMsg = TbMsg.newMsg() - .queueName(defaultQueueAndRuleChain.getKey()) - .type(TbMsgType.ATTRIBUTES_UPDATED) - .originator(entityId) - .customerId(customerId) - .copyMetaData(metaData) - .data(gson.toJson(json)) - .ruleChainId(defaultQueueAndRuleChain.getValue()) - .build(); - edgeCtx.getClusterService().pushMsgToRuleEngine(tenantId, tbMsg.getOriginator(), tbMsg, new TbQueueCallback() { + ListenableFuture> future = filterAttributesByTs(tenantId, entityId, scope, attributes); + Futures.addCallback(future, new FutureCallback<>() { + @Override + public void onSuccess(List attributesToSave) { + JsonObject jsonToSave = filterAttributesFromJson(json, attributesToSave); + tsSubService.saveAttributes(AttributesSaveRequest.builder() + .tenantId(tenantId) + .entityId(entityId) + .scope(scope) + .entries(attributesToSave) + .callback(new FutureCallback<>() { @Override - public void onSuccess(TbQueueMsgMetadata metadata) { - futureToSet.set(null); + public void onSuccess(@Nullable Void tmp) { + var defaultQueueAndRuleChain = getDefaultQueueNameAndRuleChainId(tenantId, entityId); + TbMsg tbMsg = TbMsg.newMsg() + .queueName(defaultQueueAndRuleChain.getKey()) + .type(TbMsgType.ATTRIBUTES_UPDATED) + .originator(entityId) + .customerId(customerId) + .copyMetaData(metaData) + .data(gson.toJson(jsonToSave)) + .ruleChainId(defaultQueueAndRuleChain.getValue()) + .build(); + edgeCtx.getClusterService().pushMsgToRuleEngine(tenantId, tbMsg.getOriginator(), tbMsg, new TbQueueCallback() { + @Override + public void onSuccess(TbQueueMsgMetadata metadata) { + futureToSet.set(null); + } + + @Override + public void onFailure(Throwable t) { + log.error("[{}] Can't process attributes update [{}]", tenantId, msg, t); + futureToSet.setException(t); + } + }); } @Override @@ -334,22 +359,28 @@ public abstract class BaseTelemetryProcessor extends BaseEdgeProcessor { log.error("[{}] Can't process attributes update [{}]", tenantId, msg, t); futureToSet.setException(t); } - }); - } + }) + .build()); + } - @Override - public void onFailure(Throwable t) { - log.error("[{}] Can't process attributes update [{}]", tenantId, msg, t); - futureToSet.setException(t); - } - }) - .build()); + @Override + public void onFailure(Throwable t) { + log.error("[{}] Can't process attributes update [{}]", tenantId, msg, t); + futureToSet.setException(t); + } + }, dbCallbackExecutorService); return futureToSet; } - private ListenableFuture processAttributeDeleteMsg(TenantId tenantId, EntityId entityId, AttributeDeleteMsg attributeDeleteMsg, - String entityType) { + private JsonObject filterAttributesFromJson(JsonObject json, List attributesToSave) { + Set keysToSave = attributesToSave.stream() + .map(KvEntry::getKey) + .collect(Collectors.toSet()); + json.keySet().removeIf(key -> !keysToSave.contains(key)); + return json; + } + private ListenableFuture processAttributeDeleteMsg(TenantId tenantId, EntityId entityId, AttributeDeleteMsg attributeDeleteMsg, String entityType) { String scope = attributeDeleteMsg.getScope(); List attributeKeys = attributeDeleteMsg.getAttributeNamesList(); ListenableFuture> removeAllFuture = edgeCtx.getAttributesService().removeAll(tenantId, entityId, AttributeScope.valueOf(scope), attributeKeys); @@ -401,21 +432,19 @@ public abstract class BaseTelemetryProcessor extends BaseEdgeProcessor { entityDataMsgConstructor.constructEntityDataMsg(tenantId, entityId, actionType, JsonParser.parseString(bodyJackson)); } - private List filterAttributesByTs(TenantId tenantId, EntityId entityId, AttributeScope scope, List attributes, JsonObject jsonObject) throws Exception { + private ListenableFuture> filterAttributesByTs(TenantId tenantId, EntityId entityId, AttributeScope scope, + List attributes) { List keys = attributes.stream().map(KvEntry::getKey).toList(); - Map existingAttributesTs = edgeCtx.getAttributesService().find(tenantId, entityId, scope, keys).get() - .stream().collect(Collectors.toMap(KvEntry::getKey, AttributeKvEntry::getLastUpdateTs)); - return attributes.stream() - .filter(attribute -> { - String key = attribute.getKey(); - long incomingTs = attribute.getLastUpdateTs(); - if (incomingTs > existingAttributesTs.getOrDefault(key, 0L)) { - return true; - } else { - jsonObject.remove(key); - return false; - } - }).toList(); + ListenableFuture> future = edgeCtx.getAttributesService().find(tenantId, entityId, scope, keys); + return Futures.transform(future, input -> { + Map existingAttributesTs = input.stream().collect(Collectors.toMap(KvEntry::getKey, AttributeKvEntry::getLastUpdateTs)); + return attributes.stream() + .filter(attribute -> { + String key = attribute.getKey(); + long incomingTs = attribute.getLastUpdateTs(); + return incomingTs > existingAttributesTs.getOrDefault(key, 0L); + }).toList(); + }, dbCallbackExecutorService); } } From ce4c654e21d89784a7b893f1abe9bac56ecb1e21 Mon Sep 17 00:00:00 2001 From: Volodymyr Babak Date: Fri, 31 Jan 2025 16:58:12 +0200 Subject: [PATCH 5/7] BaseTelemetryProcessor - use scope from msg, and not hardcoded CLIENT_SCOPE --- .../processor/telemetry/BaseTelemetryProcessor.java | 8 +++++--- .../rule/engine/edge/AbstractTbMsgPushNode.java | 10 ++++------ 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/telemetry/BaseTelemetryProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/telemetry/BaseTelemetryProcessor.java index 5e7731db1d..73713ccf99 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/telemetry/BaseTelemetryProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/telemetry/BaseTelemetryProcessor.java @@ -130,11 +130,12 @@ public abstract class BaseTelemetryProcessor extends BaseEdgeProcessor { CustomerId customerId = pair.getValue(); metaData.putValue(DataConstants.MSG_SOURCE_KEY, getMsgSourceKey()); if (entityData.hasPostAttributesMsg()) { + metaData.putValue(DataConstants.SCOPE, entityData.getPostAttributeScope()); long ts = entityData.hasAttributeTs() ? entityData.getAttributeTs() : System.currentTimeMillis(); result.add(processPostAttributes(tenantId, customerId, entityId, entityData.getPostAttributesMsg(), metaData, ts)); } if (entityData.hasAttributesUpdatedMsg()) { - metaData.putValue("scope", entityData.getPostAttributeScope()); + metaData.putValue(DataConstants.SCOPE, entityData.getPostAttributeScope()); long ts = entityData.hasAttributeTs() ? entityData.getAttributeTs() : System.currentTimeMillis(); result.add(processAttributesUpdate(tenantId, customerId, entityId, entityData.getAttributesUpdatedMsg(), metaData, ts)); } @@ -268,8 +269,9 @@ public abstract class BaseTelemetryProcessor extends BaseEdgeProcessor { TransportProtos.PostAttributeMsg msg, TbMsgMetaData metaData, long ts) throws Exception { SettableFuture futureToSet = SettableFuture.create(); JsonObject json = JsonUtils.getJsonObject(msg.getKvList()); + AttributeScope scope = AttributeScope.valueOf(metaData.getValue(DataConstants.SCOPE)); List attributes = new ArrayList<>(JsonConverter.convertToAttributes(json, ts)); - ListenableFuture> future = filterAttributesByTs(tenantId, entityId, AttributeScope.CLIENT_SCOPE, attributes); + ListenableFuture> future = filterAttributesByTs(tenantId, entityId, scope, attributes); Futures.addCallback(future, new FutureCallback<>() { @Override public void onSuccess(List attributesToSave) { @@ -315,7 +317,7 @@ public abstract class BaseTelemetryProcessor extends BaseEdgeProcessor { long ts) { SettableFuture futureToSet = SettableFuture.create(); JsonObject json = JsonUtils.getJsonObject(msg.getKvList()); - AttributeScope scope = AttributeScope.valueOf(metaData.getValue("scope")); + AttributeScope scope = AttributeScope.valueOf(metaData.getValue(DataConstants.SCOPE)); List attributes = new ArrayList<>(JsonConverter.convertToAttributes(json, ts)); ListenableFuture> future = filterAttributesByTs(tenantId, entityId, scope, attributes); Futures.addCallback(future, new FutureCallback<>() { diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/edge/AbstractTbMsgPushNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/edge/AbstractTbMsgPushNode.java index f1fe1affc6..911bfac43f 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/edge/AbstractTbMsgPushNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/edge/AbstractTbMsgPushNode.java @@ -54,8 +54,6 @@ public abstract class AbstractTbMsgPushNode { entityBody.put("kv", dataJson); entityBody.put("ts", msg.getMetaDataTs()); - entityBody.put(SCOPE, getScope(metadata)); + entityBody.put(DataConstants.SCOPE, getScope(metadata)); if (EdgeEventActionType.POST_ATTRIBUTES.equals(actionType)) { entityBody.put("isPostAttributes", true); } @@ -99,7 +97,7 @@ public abstract class AbstractTbMsgPushNode keys = JacksonUtil.convertValue(dataJson.get("attributes"), new TypeReference<>() { }); entityBody.put("keys", keys); - entityBody.put(SCOPE, getScope(metadata)); + entityBody.put(DataConstants.SCOPE, getScope(metadata)); } case TIMESERIES_UPDATED -> { entityBody.put("data", dataJson); @@ -146,7 +144,7 @@ public abstract class AbstractTbMsgPushNode metadata) { - String scope = metadata.get(SCOPE); + String scope = metadata.get(DataConstants.SCOPE); if (StringUtils.isEmpty(scope)) { scope = config.getScope(); } @@ -164,7 +162,7 @@ public abstract class AbstractTbMsgPushNode Date: Fri, 31 Jan 2025 17:02:07 +0200 Subject: [PATCH 6/7] BaseTelemetryProcessor - Fixed import --- .../edge/rpc/processor/telemetry/BaseTelemetryProcessor.java | 1 - 1 file changed, 1 deletion(-) diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/telemetry/BaseTelemetryProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/telemetry/BaseTelemetryProcessor.java index 73713ccf99..4251ee8a79 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/telemetry/BaseTelemetryProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/telemetry/BaseTelemetryProcessor.java @@ -19,7 +19,6 @@ import com.fasterxml.jackson.databind.JsonNode; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.SettableFuture; import com.google.gson.Gson; import com.google.gson.JsonObject; From 3b8be75a14fe87968a5823346db4f1f8afcadf1c Mon Sep 17 00:00:00 2001 From: Volodymyr Babak Date: Fri, 31 Jan 2025 18:24:50 +0200 Subject: [PATCH 7/7] DeviceEdgeTest.testSendOutdatedAttributeToCloud - improved stability for concurrent issue --- .../java/org/thingsboard/server/edge/DeviceEdgeTest.java | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/application/src/test/java/org/thingsboard/server/edge/DeviceEdgeTest.java b/application/src/test/java/org/thingsboard/server/edge/DeviceEdgeTest.java index 060e641b93..80e7d2aed4 100644 --- a/application/src/test/java/org/thingsboard/server/edge/DeviceEdgeTest.java +++ b/application/src/test/java/org/thingsboard/server/edge/DeviceEdgeTest.java @@ -500,6 +500,15 @@ public class DeviceEdgeTest extends AbstractEdgeTest { attributesNode.put("test_attr", originalValue); doPost("/api/plugins/telemetry/DEVICE/" + device.getId() + "/attributes/SERVER_SCOPE", attributesNode); + // Wait before device attributes saved to database + Awaitility.await() + .atMost(10, TimeUnit.SECONDS) + .until(() -> { + String urlTemplate = "/api/plugins/telemetry/DEVICE/" + device.getId() + "/keys/attributes/" + DataConstants.SERVER_SCOPE; + List actualKeys = doGetAsyncTyped(urlTemplate, new TypeReference<>() {}); + return actualKeys != null && !actualKeys.isEmpty() && actualKeys.contains("test_attr"); + }); + JsonObject attributesData = new JsonObject(); // incorrect msg, will not be saved, because of ts is lower than for already existing String attributesKey = "test_attr";