diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java index 6beae6fac6..5ca394be35 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java @@ -161,7 +161,7 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor DefaultActorService.RULE_DISPATCHER_NAME, - () -> new RuleNodeActor.ActorCreator(systemContext, tenantId, entityId, ruleNode.getName(), ruleNode.getId())); + () -> new RuleNodeActor.ActorCreator(systemContext, tenantId, entityId, ruleChainName, ruleNode.getId())); } private void initRoutes(RuleChain ruleChain, List ruleNodeList) { diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeActorMessageProcessor.java index 497a6148d4..2716e36a46 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeActorMessageProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeActorMessageProcessor.java @@ -27,6 +27,7 @@ import org.thingsboard.server.common.data.plugin.ComponentLifecycleState; import org.thingsboard.server.common.data.rule.RuleNode; import org.thingsboard.server.common.msg.queue.PartitionChangeMsg; import org.thingsboard.server.common.msg.queue.RuleNodeException; +import org.thingsboard.server.common.msg.queue.RuleNodeInfo; /** * @author Andrew Shvayka @@ -38,6 +39,7 @@ public class RuleNodeActorMessageProcessor extends ComponentMsgProcessor> map, String prefix) { + boolean printAll = log.isTraceEnabled(); + log.info("{} to process [{}] messages", prefix, map.size()); + for (Map.Entry> pending : map.entrySet()) { + ToRuleEngineMsg tmp = pending.getValue().getValue(); + TbMsg tmpMsg = TbMsg.fromBytes(configuration.getName(), tmp.getTbMsg().toByteArray(), TbMsgCallback.EMPTY); + RuleNodeInfo ruleNodeInfo = ctx.getLastVisitedRuleNode(pending.getKey()); + if (printAll) { + log.trace("[{}] {} to process message: {}, Last Rule Node: {}", new TenantId(new UUID(tmp.getTenantIdMSB(), tmp.getTenantIdLSB())), prefix, tmpMsg, ruleNodeInfo); + } else { + log.info("[{}] {} to process message: {}, Last Rule Node: {}", new TenantId(new UUID(tmp.getTenantIdMSB(), tmp.getTenantIdLSB())), prefix, tmpMsg, ruleNodeInfo); + break; + } + } + } + @Override protected ServiceType getServiceType() { return ServiceType.TB_RULE_ENGINE; diff --git a/application/src/main/java/org/thingsboard/server/service/queue/TbMsgPackCallback.java b/application/src/main/java/org/thingsboard/server/service/queue/TbMsgPackCallback.java index f093cc885a..c7925fd759 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/TbMsgPackCallback.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/TbMsgPackCallback.java @@ -16,8 +16,10 @@ package org.thingsboard.server.service.queue; import lombok.extern.slf4j.Slf4j; +import org.thingsboard.server.common.data.id.RuleNodeId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.msg.queue.RuleEngineException; +import org.thingsboard.server.common.msg.queue.RuleNodeInfo; import org.thingsboard.server.common.msg.queue.TbMsgCallback; import java.util.UUID; @@ -45,4 +47,10 @@ public class TbMsgPackCallback implements TbMsgCallback { log.trace("[{}] ON FAILURE", id, e); ctx.onFailure(tenantId, id, e); } + + @Override + public void visit(RuleNodeInfo ruleNodeInfo) { + log.trace("[{}] ON PROCESS: {}", id, ruleNodeInfo); + ctx.visit(id, ruleNodeInfo); + } } diff --git a/application/src/main/java/org/thingsboard/server/service/queue/TbMsgPackProcessingContext.java b/application/src/main/java/org/thingsboard/server/service/queue/TbMsgPackProcessingContext.java index 7e78ac6f5f..1b7ffd9c30 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/TbMsgPackProcessingContext.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/TbMsgPackProcessingContext.java @@ -16,8 +16,10 @@ package org.thingsboard.server.service.queue; import lombok.Getter; +import org.thingsboard.server.common.data.id.RuleNodeId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.msg.queue.RuleEngineException; +import org.thingsboard.server.common.msg.queue.RuleNodeInfo; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.service.queue.processing.TbRuleEngineSubmitStrategy; @@ -32,7 +34,6 @@ import java.util.concurrent.atomic.AtomicInteger; public class TbMsgPackProcessingContext { private final TbRuleEngineSubmitStrategy submitStrategy; - private final AtomicInteger pendingCount; private final CountDownLatch processingTimeoutLatch = new CountDownLatch(1); @Getter @@ -44,6 +45,8 @@ public class TbMsgPackProcessingContext { @Getter private final ConcurrentMap exceptionsMap = new ConcurrentHashMap<>(); + private final ConcurrentMap lastRuleNodeMap = new ConcurrentHashMap<>(); + public TbMsgPackProcessingContext(TbRuleEngineSubmitStrategy submitStrategy) { this.submitStrategy = submitStrategy; this.pendingMap = submitStrategy.getPendingMap(); @@ -81,4 +84,13 @@ public class TbMsgPackProcessingContext { processingTimeoutLatch.countDown(); } } + + public void visit(UUID id, RuleNodeInfo ruleNodeInfo) { + lastRuleNodeMap.put(id, ruleNodeInfo); + } + + public RuleNodeInfo getLastVisitedRuleNode(UUID id) { + return lastRuleNodeMap.get(id); + } + } diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsg.java b/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsg.java index 1604875c7b..43c3294e5c 100644 --- a/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsg.java +++ b/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsg.java @@ -106,7 +106,6 @@ public final class TbMsg implements Serializable { if (callback != null) { this.callback = callback; } else { - log.warn("[{}] Created message with empty callback: {}", originator, type); this.callback = TbMsgCallback.EMPTY; } } diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/queue/RuleNodeInfo.java b/common/message/src/main/java/org/thingsboard/server/common/msg/queue/RuleNodeInfo.java new file mode 100644 index 0000000000..0c2d38b3b2 --- /dev/null +++ b/common/message/src/main/java/org/thingsboard/server/common/msg/queue/RuleNodeInfo.java @@ -0,0 +1,31 @@ +/** + * Copyright © 2016-2020 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.common.msg.queue; + +import org.thingsboard.server.common.data.id.RuleNodeId; + +public class RuleNodeInfo { + private final String label; + + public RuleNodeInfo(RuleNodeId id, String ruleChainName, String ruleNodeName) { + this.label = "[RuleChain: " + ruleChainName + "|RuleNode: " + ruleNodeName + "(" + id + ")]"; + } + + @Override + public String toString() { + return label; + } +} \ No newline at end of file diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/queue/TbMsgCallback.java b/common/message/src/main/java/org/thingsboard/server/common/msg/queue/TbMsgCallback.java index 9e8d5ae6b8..4103a4efc1 100644 --- a/common/message/src/main/java/org/thingsboard/server/common/msg/queue/TbMsgCallback.java +++ b/common/message/src/main/java/org/thingsboard/server/common/msg/queue/TbMsgCallback.java @@ -34,4 +34,7 @@ public interface TbMsgCallback { void onFailure(RuleEngineException e); + default void visit(RuleNodeInfo ruleNodeInfo) { + } + }