diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/edge/EdgeEvent.java b/common/data/src/main/java/org/thingsboard/server/common/data/edge/EdgeEvent.java index 1c9ea9343c..f108db0056 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/edge/EdgeEvent.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/edge/EdgeEvent.java @@ -20,7 +20,6 @@ import lombok.Data; import org.thingsboard.server.common.data.BaseData; import org.thingsboard.server.common.data.id.EdgeEventId; import org.thingsboard.server.common.data.id.EdgeId; -import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.TenantId; import java.util.UUID; diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/edge/TbMsgPushToEdgeNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/edge/TbMsgPushToEdgeNode.java index ad757220b3..97b0c1c8a0 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/edge/TbMsgPushToEdgeNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/edge/TbMsgPushToEdgeNode.java @@ -15,6 +15,7 @@ */ package org.thingsboard.rule.engine.edge; +import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; @@ -77,33 +78,35 @@ public class TbMsgPushToEdgeNode implements TbNode { if (isSupportedOriginator(msg.getOriginator().getEntityType())) { if (isSupportedMsgType(msg.getType())) { ListenableFuture getEdgeIdFuture = getEdgeIdByOriginatorId(ctx, ctx.getTenantId(), msg.getOriginator()); - Futures.transform(getEdgeIdFuture, edgeId -> { - EdgeEventType edgeEventTypeByEntityType = ctx.getEdgeEventService().getEdgeEventTypeByEntityType(msg.getOriginator().getEntityType()); - if (edgeEventTypeByEntityType == null) { - log.debug("Edge event type is null. Entity Type {}", msg.getOriginator().getEntityType()); - ctx.tellFailure(msg, new RuntimeException("Edge event type is null. Entity Type '" + msg.getOriginator().getEntityType() + "'")); + Futures.addCallback(getEdgeIdFuture, new FutureCallback() { + @Override + public void onSuccess(@org.checkerframework.checker.nullness.qual.Nullable EdgeId edgeId) { + EdgeEventType edgeEventTypeByEntityType = ctx.getEdgeEventService().getEdgeEventTypeByEntityType(msg.getOriginator().getEntityType()); + if (edgeEventTypeByEntityType == null) { + log.debug("Edge event type is null. Entity Type {}", msg.getOriginator().getEntityType()); + ctx.tellFailure(msg, new RuntimeException("Edge event type is null. Entity Type '" + msg.getOriginator().getEntityType() + "'")); + } + EdgeEvent edgeEvent = null; + try { + edgeEvent = buildEdgeEvent(ctx, msg, edgeId, edgeEventTypeByEntityType); + } catch (JsonProcessingException e) { + log.error("Failed to build edge event", e); + } + ListenableFuture saveFuture = ctx.getEdgeEventService().saveAsync(edgeEvent); + Futures.addCallback(saveFuture, new FutureCallback() { + @Override + public void onSuccess(@Nullable EdgeEvent event) { + ctx.tellNext(msg, SUCCESS); + } @Override + public void onFailure(Throwable th) { + log.error("Could not save edge event", th); + ctx.tellFailure(msg, th); + } + }, ctx.getDbCallbackExecutor()); + } @Override + public void onFailure(Throwable t) { + ctx.tellFailure(msg, t); } - EdgeEvent edgeEvent = new EdgeEvent(); - edgeEvent.setTenantId(ctx.getTenantId()); - edgeEvent.setEdgeId(edgeId); - edgeEvent.setEdgeEventAction(getActionTypeByMsgType(msg.getType()).name()); - edgeEvent.setEntityId(msg.getOriginator().getId()); - edgeEvent.setEdgeEventType(edgeEventTypeByEntityType); - edgeEvent.setEntityBody(json.valueToTree(msg.getData())); - ListenableFuture saveFuture = ctx.getEdgeEventService().saveAsync(edgeEvent); - Futures.addCallback(saveFuture, new FutureCallback() { - @Override - public void onSuccess(@Nullable EdgeEvent event) { - ctx.tellNext(msg, SUCCESS); - } - - @Override - public void onFailure(Throwable th) { - log.error("Could not save edge event", th); - ctx.tellFailure(msg, th); - } - }, ctx.getDbCallbackExecutor()); - return null; }, ctx.getDbCallbackExecutor()); } else { log.debug("Unsupported msg type {}", msg.getType()); @@ -115,6 +118,17 @@ public class TbMsgPushToEdgeNode implements TbNode { } } + private EdgeEvent buildEdgeEvent(TbContext ctx, TbMsg msg, EdgeId edgeId, EdgeEventType edgeEventTypeByEntityType) throws JsonProcessingException { + EdgeEvent edgeEvent = new EdgeEvent(); + edgeEvent.setTenantId(ctx.getTenantId()); + edgeEvent.setEdgeId(edgeId); + edgeEvent.setEdgeEventAction(getActionTypeByMsgType(msg.getType()).name()); + edgeEvent.setEntityId(msg.getOriginator().getId()); + edgeEvent.setEdgeEventType(edgeEventTypeByEntityType); + edgeEvent.setEntityBody(json.readTree(msg.getData())); + return edgeEvent; + } + private ActionType getActionTypeByMsgType(String msgType) { ActionType actionType; if (SessionMsgType.POST_TELEMETRY_REQUEST.name().equals(msgType)) {