From ce4c654e21d89784a7b893f1abe9bac56ecb1e21 Mon Sep 17 00:00:00 2001 From: Volodymyr Babak Date: Fri, 31 Jan 2025 16:58:12 +0200 Subject: [PATCH] BaseTelemetryProcessor - use scope from msg, and not hardcoded CLIENT_SCOPE --- .../processor/telemetry/BaseTelemetryProcessor.java | 8 +++++--- .../rule/engine/edge/AbstractTbMsgPushNode.java | 10 ++++------ 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/telemetry/BaseTelemetryProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/telemetry/BaseTelemetryProcessor.java index 5e7731db1d..73713ccf99 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/telemetry/BaseTelemetryProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/telemetry/BaseTelemetryProcessor.java @@ -130,11 +130,12 @@ public abstract class BaseTelemetryProcessor extends BaseEdgeProcessor { CustomerId customerId = pair.getValue(); metaData.putValue(DataConstants.MSG_SOURCE_KEY, getMsgSourceKey()); if (entityData.hasPostAttributesMsg()) { + metaData.putValue(DataConstants.SCOPE, entityData.getPostAttributeScope()); long ts = entityData.hasAttributeTs() ? entityData.getAttributeTs() : System.currentTimeMillis(); result.add(processPostAttributes(tenantId, customerId, entityId, entityData.getPostAttributesMsg(), metaData, ts)); } if (entityData.hasAttributesUpdatedMsg()) { - metaData.putValue("scope", entityData.getPostAttributeScope()); + metaData.putValue(DataConstants.SCOPE, entityData.getPostAttributeScope()); long ts = entityData.hasAttributeTs() ? entityData.getAttributeTs() : System.currentTimeMillis(); result.add(processAttributesUpdate(tenantId, customerId, entityId, entityData.getAttributesUpdatedMsg(), metaData, ts)); } @@ -268,8 +269,9 @@ public abstract class BaseTelemetryProcessor extends BaseEdgeProcessor { TransportProtos.PostAttributeMsg msg, TbMsgMetaData metaData, long ts) throws Exception { SettableFuture futureToSet = SettableFuture.create(); JsonObject json = JsonUtils.getJsonObject(msg.getKvList()); + AttributeScope scope = AttributeScope.valueOf(metaData.getValue(DataConstants.SCOPE)); List attributes = new ArrayList<>(JsonConverter.convertToAttributes(json, ts)); - ListenableFuture> future = filterAttributesByTs(tenantId, entityId, AttributeScope.CLIENT_SCOPE, attributes); + ListenableFuture> future = filterAttributesByTs(tenantId, entityId, scope, attributes); Futures.addCallback(future, new FutureCallback<>() { @Override public void onSuccess(List attributesToSave) { @@ -315,7 +317,7 @@ public abstract class BaseTelemetryProcessor extends BaseEdgeProcessor { long ts) { SettableFuture futureToSet = SettableFuture.create(); JsonObject json = JsonUtils.getJsonObject(msg.getKvList()); - AttributeScope scope = AttributeScope.valueOf(metaData.getValue("scope")); + AttributeScope scope = AttributeScope.valueOf(metaData.getValue(DataConstants.SCOPE)); List attributes = new ArrayList<>(JsonConverter.convertToAttributes(json, ts)); ListenableFuture> future = filterAttributesByTs(tenantId, entityId, scope, attributes); Futures.addCallback(future, new FutureCallback<>() { diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/edge/AbstractTbMsgPushNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/edge/AbstractTbMsgPushNode.java index f1fe1affc6..911bfac43f 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/edge/AbstractTbMsgPushNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/edge/AbstractTbMsgPushNode.java @@ -54,8 +54,6 @@ public abstract class AbstractTbMsgPushNode { entityBody.put("kv", dataJson); entityBody.put("ts", msg.getMetaDataTs()); - entityBody.put(SCOPE, getScope(metadata)); + entityBody.put(DataConstants.SCOPE, getScope(metadata)); if (EdgeEventActionType.POST_ATTRIBUTES.equals(actionType)) { entityBody.put("isPostAttributes", true); } @@ -99,7 +97,7 @@ public abstract class AbstractTbMsgPushNode keys = JacksonUtil.convertValue(dataJson.get("attributes"), new TypeReference<>() { }); entityBody.put("keys", keys); - entityBody.put(SCOPE, getScope(metadata)); + entityBody.put(DataConstants.SCOPE, getScope(metadata)); } case TIMESERIES_UPDATED -> { entityBody.put("data", dataJson); @@ -146,7 +144,7 @@ public abstract class AbstractTbMsgPushNode metadata) { - String scope = metadata.get(SCOPE); + String scope = metadata.get(DataConstants.SCOPE); if (StringUtils.isEmpty(scope)) { scope = config.getScope(); } @@ -164,7 +162,7 @@ public abstract class AbstractTbMsgPushNode