diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java index f8ac564e42..7178b94dfa 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java +++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java @@ -5,7 +5,7 @@ * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -40,6 +40,7 @@ import org.thingsboard.server.common.data.SearchTextBasedWithAdditionalInfo; import org.thingsboard.server.common.data.alarm.Alarm; import org.thingsboard.server.common.data.asset.Asset; import org.thingsboard.server.common.data.id.EntityId; +import org.thingsboard.server.common.data.id.RuleChainId; import org.thingsboard.server.common.data.id.RuleNodeId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.rule.RuleNode; @@ -175,6 +176,9 @@ class DefaultTbContext implements TbContext { } private void enqueueForTellNext(TopicPartitionInfo tpi, TbMsg tbMsg, Set relationTypes, Runnable onSuccess, Consumer onFailure) { + RuleChainId ruleChainId = nodeCtx.getSelf().getRuleChainId(); + RuleNodeId ruleNodeId = nodeCtx.getSelf().getId(); + tbMsg = TbMsg.newMsg(tbMsg, ruleChainId, ruleNodeId); TransportProtos.ToRuleEngineMsg msg = TransportProtos.ToRuleEngineMsg.newBuilder() .setTenantIdMSB(getTenantId().getId().getMostSignificantBits()) .setTenantIdLSB(getTenantId().getId().getLeastSignificantBits()) diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java index 00d9d85b1c..3ab67bf81f 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java @@ -5,7 +5,7 @@ * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -34,18 +34,18 @@ import org.thingsboard.server.common.data.relation.EntityRelation; import org.thingsboard.server.common.data.rule.RuleChain; import org.thingsboard.server.common.data.rule.RuleNode; import org.thingsboard.server.common.msg.TbMsg; -import org.thingsboard.server.common.msg.queue.PartitionChangeMsg; import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg; +import org.thingsboard.server.common.msg.queue.PartitionChangeMsg; import org.thingsboard.server.common.msg.queue.QueueToRuleEngineMsg; -import org.thingsboard.server.dao.rule.RuleChainService; -import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg; -import org.thingsboard.server.queue.common.MultipleTbQueueTbMsgCallbackWrapper; -import org.thingsboard.server.queue.TbQueueCallback; -import org.thingsboard.server.queue.TbQueueProducer; -import org.thingsboard.server.queue.common.TbQueueTbMsgCallbackWrapper; -import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.common.msg.queue.ServiceType; import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; +import org.thingsboard.server.dao.rule.RuleChainService; +import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg; +import org.thingsboard.server.queue.TbQueueCallback; +import org.thingsboard.server.queue.TbQueueProducer; +import org.thingsboard.server.queue.common.MultipleTbQueueTbMsgCallbackWrapper; +import org.thingsboard.server.queue.common.TbProtoQueueMsg; +import org.thingsboard.server.queue.common.TbQueueTbMsgCallbackWrapper; import java.util.ArrayList; import java.util.Collections; @@ -194,25 +194,29 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor relationTypes) { try { checkActive(); EntityId entityId = msg.getOriginator(); TopicPartitionInfo tpi = systemContext.resolve(ServiceType.TB_RULE_ENGINE, tenantId, entityId); - RuleNodeId originatorNodeId = envelope.getOriginator(); List relations = nodeRoutes.get(originatorNodeId).stream() - .filter(r -> contains(envelope.getRelationTypes(), r.getType())) + .filter(r -> contains(relationTypes, r.getType())) .collect(Collectors.toList()); int relationsCount = relations.size(); if (relationsCount == 0) { log.trace("[{}][{}][{}] No outbound relations to process", tenantId, entityId, msg.getId()); - if (envelope.getRelationTypes().contains(TbRelationTypes.FAILURE)) { - log.debug("[{}] Failure during message processing by Rule Node [{}]. Enable and see debug events for more info", entityId, envelope.getOriginator().getId()); + if (relationTypes.contains(TbRelationTypes.FAILURE)) { + log.debug("[{}] Failure during message processing by Rule Node [{}]. Enable and see debug events for more info", entityId, originatorNodeId.getId()); //TODO 2.5: Introduce our own RuleEngineFailureException to track what is wrong - msg.getCallback().onFailure(new RuntimeException("Failure during message processing by Rule Node [" + envelope.getOriginator().getId().toString() + "]")); + msg.getCallback().onFailure(new RuntimeException("Failure during message processing by Rule Node [" + originatorNodeId.getId().toString() + "]")); } else { msg.getCallback().onSuccess(); } diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsg.java b/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsg.java index 7d567a31f1..7230804773 100644 --- a/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsg.java +++ b/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsg.java @@ -5,7 +5,7 @@ * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -73,6 +73,10 @@ public final class TbMsg implements Serializable { data, origMsg.getTransactionData(), origMsg.getRuleChainId(), origMsg.getRuleNodeId(), origMsg.getCallback()); } + public static TbMsg newMsg(TbMsg tbMsg, RuleChainId ruleChainId, RuleNodeId ruleNodeId) { + return new TbMsg(UUID.randomUUID(), tbMsg.getType(), tbMsg.getOriginator(), tbMsg.getMetaData().copy(), tbMsg.getDataType(), tbMsg.getData(), ruleChainId, ruleNodeId, TbMsgCallback.EMPTY); + } + private TbMsg(UUID id, String type, EntityId originator, TbMsgMetaData metaData, TbMsgDataType dataType, String data, RuleChainId ruleChainId, RuleNodeId ruleNodeId, TbMsgCallback callback) { this(id, type, originator, metaData, dataType, data, new TbMsgTransactionData(id, originator), ruleChainId, ruleNodeId, callback);