From 8059274002d401cf9335752dc24454177b4e050b Mon Sep 17 00:00:00 2001 From: Bohdan Smetaniuk Date: Tue, 29 Sep 2020 15:24:18 +0300 Subject: [PATCH 1/4] fixed attributes saving --- .../service/edge/rpc/processor/TelemetryProcessor.java | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/TelemetryProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/TelemetryProcessor.java index d5b31cb18c..fe3fd208f6 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/TelemetryProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/TelemetryProcessor.java @@ -19,6 +19,7 @@ import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; import com.google.gson.Gson; import com.google.gson.JsonObject; +import com.google.gson.JsonParser; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; import org.thingsboard.rule.engine.api.msg.DeviceAttributesEventNotificationMsg; @@ -36,9 +37,11 @@ import org.thingsboard.server.common.data.id.EntityViewId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.id.UserId; import org.thingsboard.server.common.data.kv.AttributeKey; +import org.thingsboard.server.common.data.kv.AttributeKvEntry; import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.common.msg.TbMsgMetaData; import org.thingsboard.server.common.msg.session.SessionMsgType; +import org.thingsboard.server.common.transport.adaptor.JsonConverter; import org.thingsboard.server.common.transport.util.JsonUtils; import org.thingsboard.server.gen.edge.AttributeDeleteMsg; import org.thingsboard.server.gen.edge.EntityDataProto; @@ -136,7 +139,9 @@ public class TelemetryProcessor extends BaseProcessor { private ListenableFuture processPostAttributes(TenantId tenantId, EntityId entityId, TransportProtos.PostAttributeMsg msg, TbMsgMetaData metaData) { SettableFuture futureToSet = SettableFuture.create(); JsonObject json = JsonUtils.getJsonObject(msg.getKvList()); - TbMsg tbMsg = TbMsg.newMsg(SessionMsgType.POST_ATTRIBUTES_REQUEST.name(), entityId, metaData, gson.toJson(json)); + Set attributes = JsonConverter.convertToAttributes(json); + attributesService.save(tenantId, entityId, metaData.getValue("scope"), new ArrayList<>(attributes)); + TbMsg tbMsg = TbMsg.newMsg(DataConstants.ATTRIBUTES_UPDATED, entityId, metaData, gson.toJson(json)); tbClusterService.pushMsgToRuleEngine(tenantId, tbMsg.getOriginator(), tbMsg, new TbQueueCallback() { @Override public void onSuccess(TbQueueMsgMetadata metadata) { From 64e60de9bc8ce8376ed99c702e97314d2e9248c7 Mon Sep 17 00:00:00 2001 From: Bohdan Smetaniuk Date: Tue, 29 Sep 2020 17:10:09 +0300 Subject: [PATCH 2/4] processing future + attributes node refactoring --- .../rpc/processor/TelemetryProcessor.java | 27 ++++++++++++++----- .../engine/telemetry/TbMsgAttributesNode.java | 4 +-- 2 files changed, 21 insertions(+), 10 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/TelemetryProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/TelemetryProcessor.java index fe3fd208f6..cc2cb15d4b 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/TelemetryProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/TelemetryProcessor.java @@ -15,12 +15,14 @@ */ package org.thingsboard.server.service.edge.rpc.processor; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; import com.google.gson.Gson; import com.google.gson.JsonObject; -import com.google.gson.JsonParser; import lombok.extern.slf4j.Slf4j; +import org.checkerframework.checker.nullness.qual.Nullable; import org.springframework.stereotype.Component; import org.thingsboard.rule.engine.api.msg.DeviceAttributesEventNotificationMsg; import org.thingsboard.server.common.data.DataConstants; @@ -140,12 +142,23 @@ public class TelemetryProcessor extends BaseProcessor { SettableFuture futureToSet = SettableFuture.create(); JsonObject json = JsonUtils.getJsonObject(msg.getKvList()); Set attributes = JsonConverter.convertToAttributes(json); - attributesService.save(tenantId, entityId, metaData.getValue("scope"), new ArrayList<>(attributes)); - TbMsg tbMsg = TbMsg.newMsg(DataConstants.ATTRIBUTES_UPDATED, entityId, metaData, gson.toJson(json)); - tbClusterService.pushMsgToRuleEngine(tenantId, tbMsg.getOriginator(), tbMsg, new TbQueueCallback() { + ListenableFuture> future = attributesService.save(tenantId, entityId, metaData.getValue("scope"), new ArrayList<>(attributes)); + Futures.addCallback(future, new FutureCallback>() { @Override - public void onSuccess(TbQueueMsgMetadata metadata) { - futureToSet.set(null); + public void onSuccess(@Nullable List voids) { + TbMsg tbMsg = TbMsg.newMsg(DataConstants.ATTRIBUTES_UPDATED, entityId, metaData, gson.toJson(json)); + tbClusterService.pushMsgToRuleEngine(tenantId, tbMsg.getOriginator(), tbMsg, new TbQueueCallback() { + @Override + public void onSuccess(TbQueueMsgMetadata metadata) { + futureToSet.set(null); + } + + @Override + public void onFailure(Throwable t) { + log.error("Can't process post attributes [{}]", msg, t); + futureToSet.setException(t); + } + }); } @Override @@ -153,7 +166,7 @@ public class TelemetryProcessor extends BaseProcessor { log.error("Can't process post attributes [{}]", msg, t); futureToSet.setException(t); } - }); + }, dbCallbackExecutorService); return futureToSet; } diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgAttributesNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgAttributesNode.java index f0f5d2418f..5fd06108e4 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgAttributesNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgAttributesNode.java @@ -63,9 +63,7 @@ public class TbMsgAttributesNode implements TbNode { } String src = msg.getData(); Set attributes = JsonConverter.convertToAttributes(new JsonParser().parse(src)); - if (StringUtils.isEmpty(msg.getMetaData().getValue(SCOPE))) { - msg.getMetaData().putValue(SCOPE, config.getScope()); - } + msg.getMetaData().putValue(SCOPE, config.getScope()); ctx.getTelemetryService().saveAndNotify(ctx.getTenantId(), msg.getOriginator(), config.getScope(), new ArrayList<>(attributes), new TelemetryNodeCallback(ctx, msg)); } From 20c794884a5d1c0a2fb8290610e192e84994fc63 Mon Sep 17 00:00:00 2001 From: Bohdan Smetaniuk Date: Thu, 1 Oct 2020 13:55:30 +0300 Subject: [PATCH 3/4] attributes updated/post attributes separation --- .../constructor/EntityDataMsgConstructor.java | 9 +++- .../rpc/processor/TelemetryProcessor.java | 30 +++++++++-- common/edge-api/src/main/proto/edge.proto | 5 +- .../rule/engine/edge/TbMsgPushToEdgeNode.java | 50 +++++++++---------- 4 files changed, 61 insertions(+), 33 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/constructor/EntityDataMsgConstructor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/constructor/EntityDataMsgConstructor.java index 0f7fb0523d..5bbea5351f 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/constructor/EntityDataMsgConstructor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/constructor/EntityDataMsgConstructor.java @@ -18,7 +18,6 @@ package org.thingsboard.server.service.edge.rpc.constructor; import com.google.gson.Gson; import com.google.gson.JsonArray; import com.google.gson.JsonElement; -import com.google.gson.JsonNull; import com.google.gson.JsonObject; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; @@ -27,6 +26,7 @@ import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.transport.adaptor.JsonConverter; import org.thingsboard.server.gen.edge.AttributeDeleteMsg; import org.thingsboard.server.gen.edge.EntityDataProto; +import org.thingsboard.server.gen.transport.TransportProtos; import java.util.List; @@ -57,7 +57,12 @@ public class EntityDataMsgConstructor { case ATTRIBUTES_UPDATED: try { JsonObject data = entityData.getAsJsonObject(); - builder.setPostAttributesMsg(JsonConverter.convertToAttributesProto(data.getAsJsonObject("kv"))); + TransportProtos.PostAttributeMsg postAttributeMsg = JsonConverter.convertToAttributesProto(data.getAsJsonObject("kv")); + if (data.has("isPostAttributes") && data.getAsJsonPrimitive("isPostAttributes").getAsBoolean()) { + builder.setPostAttributesMsg(postAttributeMsg); + } else { + builder.setAttributesUpdateMsg(postAttributeMsg); + } builder.setPostAttributeScope(data.getAsJsonPrimitive("scope").getAsString()); } catch (Exception e) { log.warn("Can't convert to attributes proto, entityData [{}]", entityData, e); diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/TelemetryProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/TelemetryProcessor.java index cc2cb15d4b..da5dc3746f 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/TelemetryProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/TelemetryProcessor.java @@ -68,13 +68,16 @@ public class TelemetryProcessor extends BaseProcessor { public List> onTelemetryUpdate(TenantId tenantId, EntityDataProto entityData) { List> result = new ArrayList<>(); EntityId entityId = constructEntityId(entityData); - if ((entityData.hasPostAttributesMsg() || entityData.hasPostTelemetryMsg()) && entityId != null) { + if ((entityData.hasPostAttributesMsg() || entityData.hasPostTelemetryMsg() || entityData.hasAttributesUpdateMsg()) && entityId != null) { TbMsgMetaData metaData = constructBaseMsgMetadata(tenantId, entityId); metaData.putValue(DataConstants.MSG_SOURCE_KEY, DataConstants.EDGE_MSG_SOURCE); if (entityData.hasPostAttributesMsg()) { - metaData.putValue("scope", entityData.getPostAttributeScope()); result.add(processPostAttributes(tenantId, entityId, entityData.getPostAttributesMsg(), metaData)); } + if (entityData.hasAttributesUpdateMsg()) { + metaData.putValue("scope", entityData.getPostAttributeScope()); + result.add(processAttributesUpdate(tenantId, entityId, entityData.getAttributesUpdateMsg(), metaData)); + } if (entityData.hasPostTelemetryMsg()) { result.add(processPostTelemetry(tenantId, entityId, entityData.getPostTelemetryMsg(), metaData)); } @@ -139,6 +142,25 @@ public class TelemetryProcessor extends BaseProcessor { } private ListenableFuture processPostAttributes(TenantId tenantId, EntityId entityId, TransportProtos.PostAttributeMsg msg, TbMsgMetaData metaData) { + SettableFuture futureToSet = SettableFuture.create(); + JsonObject json = JsonUtils.getJsonObject(msg.getKvList()); + TbMsg tbMsg = TbMsg.newMsg(SessionMsgType.POST_ATTRIBUTES_REQUEST.name(), entityId, metaData, gson.toJson(json)); + tbClusterService.pushMsgToRuleEngine(tenantId, tbMsg.getOriginator(), tbMsg, new TbQueueCallback() { + @Override + public void onSuccess(TbQueueMsgMetadata metadata) { + futureToSet.set(null); + } + + @Override + public void onFailure(Throwable t) { + log.error("Can't process post attributes [{}]", msg, t); + futureToSet.setException(t); + } + }); + return futureToSet; + } + + private ListenableFuture processAttributesUpdate(TenantId tenantId, EntityId entityId, TransportProtos.PostAttributeMsg msg, TbMsgMetaData metaData) { SettableFuture futureToSet = SettableFuture.create(); JsonObject json = JsonUtils.getJsonObject(msg.getKvList()); Set attributes = JsonConverter.convertToAttributes(json); @@ -155,7 +177,7 @@ public class TelemetryProcessor extends BaseProcessor { @Override public void onFailure(Throwable t) { - log.error("Can't process post attributes [{}]", msg, t); + log.error("Can't process attributes update [{}]", msg, t); futureToSet.setException(t); } }); @@ -163,7 +185,7 @@ public class TelemetryProcessor extends BaseProcessor { @Override public void onFailure(Throwable t) { - log.error("Can't process post attributes [{}]", msg, t); + log.error("Can't process attributes update [{}]", msg, t); futureToSet.setException(t); } }, dbCallbackExecutorService); diff --git a/common/edge-api/src/main/proto/edge.proto b/common/edge-api/src/main/proto/edge.proto index c2ce170401..e5aa03cfa2 100644 --- a/common/edge-api/src/main/proto/edge.proto +++ b/common/edge-api/src/main/proto/edge.proto @@ -102,8 +102,9 @@ message EntityDataProto { string entityType = 3; transport.PostTelemetryMsg postTelemetryMsg = 4; transport.PostAttributeMsg postAttributesMsg = 5; - string postAttributeScope = 6; - AttributeDeleteMsg attributeDeleteMsg = 7; + transport.PostAttributeMsg attributesUpdateMsg = 6; + string postAttributeScope = 7; + AttributeDeleteMsg attributeDeleteMsg = 8; // transport.ToDeviceRpcRequestMsg ??? } diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/edge/TbMsgPushToEdgeNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/edge/TbMsgPushToEdgeNode.java index 8d00507a7d..b510af10b0 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/edge/TbMsgPushToEdgeNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/edge/TbMsgPushToEdgeNode.java @@ -138,16 +138,37 @@ public class TbMsgPushToEdgeNode implements TbNode { } private EdgeEvent buildEdgeEvent(TbMsg msg, TbContext ctx) throws JsonProcessingException { - if (DataConstants.ALARM.equals(msg.getType())) { + String msgType = msg.getType(); + if (DataConstants.ALARM.equals(msgType)) { return buildEdgeEvent(ctx.getTenantId(), ActionType.ADDED, getUUIDFromMsgData(msg), EdgeEventType.ALARM, null); } else { EdgeEventType edgeEventTypeByEntityType = EdgeUtils.getEdgeEventTypeByEntityType(msg.getOriginator().getEntityType()); if (edgeEventTypeByEntityType == null) { return null; } - ActionType actionType = getActionTypeByMsgType(msg.getType()); - JsonNode entityBody = getEntityBody(actionType, msg.getData(), msg.getMetaData().getData()); - return buildEdgeEvent(ctx.getTenantId(), actionType, msg.getOriginator().getId(), edgeEventTypeByEntityType, entityBody); + ActionType actionType = getActionTypeByMsgType(msgType); + Map entityBody = new HashMap<>(); + Map metadata = msg.getMetaData().getData(); + JsonNode dataJson = json.readTree(msg.getData()); + switch (actionType) { + case ATTRIBUTES_UPDATED: + entityBody.put("kv", dataJson); + entityBody.put("scope", metadata.get("scope")); + if (SessionMsgType.POST_ATTRIBUTES_REQUEST.name().equals(msgType)) { + entityBody.put("isPostAttributes", true); + } + break; + case ATTRIBUTES_DELETED: + List keys = json.treeToValue(dataJson.get("attributes"), List.class); + entityBody.put("keys", keys); + entityBody.put("scope", metadata.get("scope")); + break; + case TIMESERIES_UPDATED: + entityBody.put("data", dataJson); + entityBody.put("ts", metadata.get("ts")); + break; + } + return buildEdgeEvent(ctx.getTenantId(), actionType, msg.getOriginator().getId(), edgeEventTypeByEntityType, json.valueToTree(entityBody)); } } @@ -161,27 +182,6 @@ public class TbMsgPushToEdgeNode implements TbNode { return edgeEvent; } - private JsonNode getEntityBody(ActionType actionType, String data, Map metadata) throws JsonProcessingException { - Map entityBody = new HashMap<>(); - JsonNode dataJson = json.readTree(data); - switch (actionType) { - case ATTRIBUTES_UPDATED: - entityBody.put("kv", dataJson); - entityBody.put("scope", metadata.get("scope")); - break; - case ATTRIBUTES_DELETED: - List keys = json.treeToValue(dataJson.get("attributes"), List.class); - entityBody.put("keys", keys); - entityBody.put("scope", metadata.get("scope")); - break; - case TIMESERIES_UPDATED: - entityBody.put("data", dataJson); - entityBody.put("ts", metadata.get("ts")); - break; - } - return json.valueToTree(entityBody); - } - private UUID getUUIDFromMsgData(TbMsg msg) throws JsonProcessingException { JsonNode data = json.readTree(msg.getData()).get("id"); String id = json.treeToValue(data.get("id"), String.class); From ed4d5a69ddd7be07199a79511c0e4415232437bf Mon Sep 17 00:00:00 2001 From: Bohdan Smetaniuk Date: Thu, 1 Oct 2020 16:00:06 +0300 Subject: [PATCH 4/4] fixes --- .../edge/rpc/constructor/EntityDataMsgConstructor.java | 2 +- .../service/edge/rpc/processor/TelemetryProcessor.java | 8 ++++---- common/edge-api/src/main/proto/edge.proto | 2 +- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/constructor/EntityDataMsgConstructor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/constructor/EntityDataMsgConstructor.java index 5bbea5351f..ffa2ada070 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/constructor/EntityDataMsgConstructor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/constructor/EntityDataMsgConstructor.java @@ -61,7 +61,7 @@ public class EntityDataMsgConstructor { if (data.has("isPostAttributes") && data.getAsJsonPrimitive("isPostAttributes").getAsBoolean()) { builder.setPostAttributesMsg(postAttributeMsg); } else { - builder.setAttributesUpdateMsg(postAttributeMsg); + builder.setAttributesUpdatedMsg(postAttributeMsg); } builder.setPostAttributeScope(data.getAsJsonPrimitive("scope").getAsString()); } catch (Exception e) { diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/TelemetryProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/TelemetryProcessor.java index da5dc3746f..2fdd5614aa 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/TelemetryProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/TelemetryProcessor.java @@ -22,7 +22,6 @@ import com.google.common.util.concurrent.SettableFuture; import com.google.gson.Gson; import com.google.gson.JsonObject; import lombok.extern.slf4j.Slf4j; -import org.checkerframework.checker.nullness.qual.Nullable; import org.springframework.stereotype.Component; import org.thingsboard.rule.engine.api.msg.DeviceAttributesEventNotificationMsg; import org.thingsboard.server.common.data.DataConstants; @@ -52,6 +51,7 @@ import org.thingsboard.server.queue.TbQueueCallback; import org.thingsboard.server.queue.TbQueueMsgMetadata; import org.thingsboard.server.queue.util.TbCoreComponent; +import javax.annotation.Nullable; import java.util.ArrayList; import java.util.HashSet; import java.util.List; @@ -68,15 +68,15 @@ public class TelemetryProcessor extends BaseProcessor { public List> onTelemetryUpdate(TenantId tenantId, EntityDataProto entityData) { List> result = new ArrayList<>(); EntityId entityId = constructEntityId(entityData); - if ((entityData.hasPostAttributesMsg() || entityData.hasPostTelemetryMsg() || entityData.hasAttributesUpdateMsg()) && entityId != null) { + if ((entityData.hasPostAttributesMsg() || entityData.hasPostTelemetryMsg() || entityData.hasAttributesUpdatedMsg()) && entityId != null) { TbMsgMetaData metaData = constructBaseMsgMetadata(tenantId, entityId); metaData.putValue(DataConstants.MSG_SOURCE_KEY, DataConstants.EDGE_MSG_SOURCE); if (entityData.hasPostAttributesMsg()) { result.add(processPostAttributes(tenantId, entityId, entityData.getPostAttributesMsg(), metaData)); } - if (entityData.hasAttributesUpdateMsg()) { + if (entityData.hasAttributesUpdatedMsg()) { metaData.putValue("scope", entityData.getPostAttributeScope()); - result.add(processAttributesUpdate(tenantId, entityId, entityData.getAttributesUpdateMsg(), metaData)); + result.add(processAttributesUpdate(tenantId, entityId, entityData.getAttributesUpdatedMsg(), metaData)); } if (entityData.hasPostTelemetryMsg()) { result.add(processPostTelemetry(tenantId, entityId, entityData.getPostTelemetryMsg(), metaData)); diff --git a/common/edge-api/src/main/proto/edge.proto b/common/edge-api/src/main/proto/edge.proto index e5aa03cfa2..fd26e1fcd4 100644 --- a/common/edge-api/src/main/proto/edge.proto +++ b/common/edge-api/src/main/proto/edge.proto @@ -102,7 +102,7 @@ message EntityDataProto { string entityType = 3; transport.PostTelemetryMsg postTelemetryMsg = 4; transport.PostAttributeMsg postAttributesMsg = 5; - transport.PostAttributeMsg attributesUpdateMsg = 6; + transport.PostAttributeMsg attributesUpdatedMsg = 6; string postAttributeScope = 7; AttributeDeleteMsg attributeDeleteMsg = 8; // transport.ToDeviceRpcRequestMsg ???