diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java index bf61cf7a59..0d1a65d899 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java +++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java @@ -185,7 +185,7 @@ class DefaultTbContext implements TbContext { @Override public void enqueue(TbMsg tbMsg, Runnable onSuccess, Consumer onFailure) { - TopicPartitionInfo tpi = mainCtx.resolve(ServiceType.TB_RULE_ENGINE, nodeCtx.getSelf().getQueueName(), getTenantId(), tbMsg.getOriginator()); + TopicPartitionInfo tpi = mainCtx.resolve(ServiceType.TB_RULE_ENGINE, getQueueName(), getTenantId(), tbMsg.getOriginator()); enqueue(tpi, tbMsg, onFailure, onSuccess); } @@ -310,7 +310,7 @@ class DefaultTbContext implements TbContext { @Override public boolean isLocalEntity(EntityId entityId) { - return mainCtx.resolve(ServiceType.TB_RULE_ENGINE, nodeCtx.getSelf().getQueueName(), getTenantId(), entityId).isMyPartition(); + return mainCtx.resolve(ServiceType.TB_RULE_ENGINE, getQueueName(), getTenantId(), entityId).isMyPartition(); } private void scheduleMsgWithDelay(TbActorMsg msg, long delayInMs, TbActorRef target) { @@ -509,6 +509,11 @@ class DefaultTbContext implements TbContext { return ruleChainName; } + @Override + public String getQueueName() { + return getSelf().getQueueName(); + } + @Override public TenantId getTenantId() { return nodeCtx.getTenantId(); diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java index 40ed2c378a..241e20115f 100644 --- a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java +++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java @@ -255,6 +255,8 @@ public interface TbContext { String getRuleChainName(); + String getQueueName(); + TenantId getTenantId(); AttributesService getAttributesService(); diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/debug/TbMsgGeneratorNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/debug/TbMsgGeneratorNode.java index 80170a28f1..cec7aca0a3 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/debug/TbMsgGeneratorNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/debug/TbMsgGeneratorNode.java @@ -83,7 +83,7 @@ public class TbMsgGeneratorNode implements TbNode { this.config = TbNodeUtils.convert(configuration, TbMsgGeneratorNodeConfiguration.class); this.delay = TimeUnit.SECONDS.toMillis(config.getPeriodInSeconds()); this.currentMsgCount = 0; - this.queueName = ctx.getSelf().getQueueName(); + this.queueName = ctx.getQueueName(); if (!StringUtils.isEmpty(config.getOriginatorId())) { originatorId = EntityIdFactory.getByTypeAndUuid(config.getOriginatorType(), config.getOriginatorId()); ctx.checkTenantEntity(originatorId); diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/deduplication/TbMsgDeduplicationNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/deduplication/TbMsgDeduplicationNode.java index 8f818583e8..17b72cd27a 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/deduplication/TbMsgDeduplicationNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/deduplication/TbMsgDeduplicationNode.java @@ -81,7 +81,7 @@ public class TbMsgDeduplicationNode implements TbNode { public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException { this.config = TbNodeUtils.convert(configuration, TbMsgDeduplicationNodeConfiguration.class); this.deduplicationInterval = TimeUnit.SECONDS.toMillis(config.getInterval()); - this.queueName = ctx.getSelf().getQueueName(); + this.queueName = ctx.getQueueName(); } @Override diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/flow/TbCheckpointNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/flow/TbCheckpointNode.java index 8fe2de58b2..9616d8a167 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/flow/TbCheckpointNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/flow/TbCheckpointNode.java @@ -50,7 +50,7 @@ public class TbCheckpointNode implements TbNode { @Override public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException { - this.queueName = ctx.getSelf().getQueueName(); + this.queueName = ctx.getQueueName(); } @Override diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/transform/TbMsgDeduplicationNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/transform/TbMsgDeduplicationNodeTest.java index ac3f2acfd5..8e8340c713 100644 --- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/transform/TbMsgDeduplicationNodeTest.java +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/transform/TbMsgDeduplicationNodeTest.java @@ -94,7 +94,6 @@ public class TbMsgDeduplicationNodeTest { when(ctx.getSelfId()).thenReturn(ruleNodeId); when(ctx.getTenantId()).thenReturn(tenantId); - when(ctx.getSelf()).thenReturn(new RuleNode()); doAnswer((Answer) invocationOnMock -> { TbMsgType type = (TbMsgType) (invocationOnMock.getArguments())[1]; @@ -240,10 +239,10 @@ public class TbMsgDeduplicationNodeTest { awaitTellSelfLatch = new CountDownLatch(wantedNumberOfTellSelfInvocation); invokeTellSelf(wantedNumberOfTellSelfInvocation); + when(ctx.getQueueName()).thenReturn(DataConstants.HP_QUEUE_NAME); config.setInterval(deduplicationInterval); config.setStrategy(DeduplicationStrategy.ALL); config.setOutMsgType(TbMsgType.POST_ATTRIBUTES_REQUEST.name()); - setQueueName(DataConstants.HP_QUEUE_NAME); nodeConfiguration = new TbNodeConfiguration(JacksonUtil.valueToTree(config)); node.init(ctx, nodeConfiguration); @@ -280,10 +279,10 @@ public class TbMsgDeduplicationNodeTest { awaitTellSelfLatch = new CountDownLatch(wantedNumberOfTellSelfInvocation); invokeTellSelf(wantedNumberOfTellSelfInvocation, true, 3); + when(ctx.getQueueName()).thenReturn(DataConstants.HP_QUEUE_NAME); config.setInterval(deduplicationInterval); config.setStrategy(DeduplicationStrategy.ALL); config.setOutMsgType(TbMsgType.POST_ATTRIBUTES_REQUEST.name()); - setQueueName(DataConstants.HP_QUEUE_NAME); nodeConfiguration = new TbNodeConfiguration(JacksonUtil.valueToTree(config)); node.init(ctx, nodeConfiguration); @@ -431,10 +430,4 @@ public class TbMsgDeduplicationNodeTest { return JacksonUtil.toString(mergedData); } - private void setQueueName(String queueName) { - RuleNode ruleNode = new RuleNode(); - ruleNode.setQueueName(queueName); - when(ctx.getSelf()).thenReturn(ruleNode); - } - }