diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java index 8bd3bcc81d..66afd2671c 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java +++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java @@ -124,6 +124,7 @@ import java.util.concurrent.TimeUnit; import java.util.function.BiConsumer; import java.util.function.Consumer; +import static org.thingsboard.server.common.data.DataConstants.MAIN_QUEUE_NAME; import static org.thingsboard.server.common.data.msg.TbMsgType.ATTRIBUTES_DELETED; import static org.thingsboard.server.common.data.msg.TbMsgType.ATTRIBUTES_UPDATED; import static org.thingsboard.server.common.data.msg.TbMsgType.ENTITY_CREATED; @@ -178,7 +179,7 @@ public class DefaultTbContext implements TbContext { .resetRuleNodeId() .build(); tbMsg.pushToStack(nodeCtx.getSelf().getRuleChainId(), nodeCtx.getSelf().getId()); - TopicPartitionInfo tpi = mainCtx.resolve(ServiceType.TB_RULE_ENGINE, getQueueName(), getTenantId(), tbMsg.getOriginator()); + TopicPartitionInfo tpi = resolvePartition(msg); doEnqueue(tpi, tbMsg, new SimpleTbQueueCallback(md -> ack(msg), t -> tellFailure(msg, t))); } @@ -195,7 +196,7 @@ public class DefaultTbContext implements TbContext { @Override public void enqueue(TbMsg tbMsg, Runnable onSuccess, Consumer onFailure) { - TopicPartitionInfo tpi = mainCtx.resolve(ServiceType.TB_RULE_ENGINE, getQueueName(), getTenantId(), tbMsg.getOriginator()); + TopicPartitionInfo tpi = resolvePartition(tbMsg, MAIN_QUEUE_NAME); enqueue(tpi, tbMsg, onFailure, onSuccess); } diff --git a/application/src/test/java/org/thingsboard/server/actors/rule/DefaultTbContextTest.java b/application/src/test/java/org/thingsboard/server/actors/rule/DefaultTbContextTest.java index af2f9ce8ec..ea900afd4e 100644 --- a/application/src/test/java/org/thingsboard/server/actors/rule/DefaultTbContextTest.java +++ b/application/src/test/java/org/thingsboard/server/actors/rule/DefaultTbContextTest.java @@ -98,6 +98,56 @@ class DefaultTbContextTest { defaultTbContext = new DefaultTbContext(mainCtxMock, "Test rule chain name", nodeCtxMock); } + @MethodSource + @ParameterizedTest + public void givenMsgWithQueueName_whenInput_thenVerifyEnqueueWithCorrectTpi(String queueName) { + // GIVEN + var tpi = resolve(queueName); + + given(mainCtxMock.resolve(eq(ServiceType.TB_RULE_ENGINE), eq(queueName), eq(TENANT_ID), eq(TENANT_ID))).willReturn(tpi); + var clusterService = mock(TbClusterService.class); + given(mainCtxMock.getClusterService()).willReturn(clusterService); + var callbackMock = mock(TbMsgCallback.class); + given(callbackMock.isMsgValid()).willReturn(true); + var ruleNode = new RuleNode(RULE_NODE_ID); + + var msg = TbMsg.newMsg() + .type(TbMsgType.POST_TELEMETRY_REQUEST) + .originator(TENANT_ID) + .queueName(queueName) + .copyMetaData(TbMsgMetaData.EMPTY) + .data(TbMsg.EMPTY_STRING) + .callback(callbackMock) + .build(); + + var ruleChainId = new RuleChainId(UUID.randomUUID()); + + ruleNode.setRuleChainId(RULE_CHAIN_ID); + ruleNode.setDebugSettings(DebugSettings.failures()); + given(nodeCtxMock.getTenantId()).willReturn(TENANT_ID); + given(nodeCtxMock.getSelf()).willReturn(ruleNode); + + // WHEN + defaultTbContext.input(msg, ruleChainId); + + // THEN + + then(clusterService).should().pushMsgToRuleEngine(eq(tpi), eq(msg.getId()), any(), any()); + } + + private static Stream givenMsgWithQueueName_whenInput_thenVerifyEnqueueWithCorrectTpi() { + return Stream.of("Main", "Test", null); + } + + private TopicPartitionInfo resolve(String queueName) { + var tpiBuilder = TopicPartitionInfo.builder() + .topic(queueName == null ? "MainQueueTopic" : queueName + "QueueTopic") + .partition(1) + .myPartition(true); + + return tpiBuilder.build(); + } + @Test public void givenDebugFailuresEvents_whenTellSuccess_thenVerifyDebugOutputNotPersisted() { // GIVEN @@ -810,10 +860,10 @@ class DefaultTbContextTest { @MethodSource @ParameterizedTest void givenDebugFailuresAndDebugAllAndConnectionAndPersistedResultOptions_whenTellNext_thenVerifyDebugOutputPersistence(boolean debugFailures, - long debugAllUntil, - String connection, - boolean shouldPersist, - boolean shouldPersistAfterDurationTime) { + long debugAllUntil, + String connection, + boolean shouldPersist, + boolean shouldPersistAfterDurationTime) { // GIVEN var callbackMock = mock(TbMsgCallback.class); var msg = getTbMsgWithCallback(callbackMock);