diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java index cd8521027a..a1e8434b6e 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java +++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java @@ -124,7 +124,6 @@ import java.util.concurrent.TimeUnit; import java.util.function.BiConsumer; import java.util.function.Consumer; -import static org.thingsboard.server.common.data.DataConstants.MAIN_QUEUE_NAME; import static org.thingsboard.server.common.data.msg.TbMsgType.ATTRIBUTES_DELETED; import static org.thingsboard.server.common.data.msg.TbMsgType.ATTRIBUTES_UPDATED; import static org.thingsboard.server.common.data.msg.TbMsgType.ENTITY_CREATED; @@ -196,7 +195,7 @@ public class DefaultTbContext implements TbContext { @Override public void enqueue(TbMsg tbMsg, Runnable onSuccess, Consumer onFailure) { - enqueue(tbMsg, MAIN_QUEUE_NAME, onSuccess, onFailure); + enqueue(tbMsg, tbMsg.getQueueName(), onSuccess, onFailure); } @Override diff --git a/application/src/test/java/org/thingsboard/server/actors/rule/DefaultTbContextTest.java b/application/src/test/java/org/thingsboard/server/actors/rule/DefaultTbContextTest.java index ea900afd4e..54205327ca 100644 --- a/application/src/test/java/org/thingsboard/server/actors/rule/DefaultTbContextTest.java +++ b/application/src/test/java/org/thingsboard/server/actors/rule/DefaultTbContextTest.java @@ -131,11 +131,87 @@ class DefaultTbContextTest { defaultTbContext.input(msg, ruleChainId); // THEN + then(clusterService).should().pushMsgToRuleEngine(eq(tpi), eq(msg.getId()), any(), any()); + } + @MethodSource + @ParameterizedTest + public void givenMsgWithQueueName_whenEnqueue_thenVerifyEnqueueWithCorrectTpi(String queueName) { + // GIVEN + var tpi = resolve(queueName); + + given(mainCtxMock.resolve(eq(ServiceType.TB_RULE_ENGINE), eq(queueName), eq(TENANT_ID), eq(TENANT_ID))).willReturn(tpi); + var clusterService = mock(TbClusterService.class); + given(mainCtxMock.getClusterService()).willReturn(clusterService); + var callbackMock = mock(TbMsgCallback.class); + given(callbackMock.isMsgValid()).willReturn(true); + var ruleNode = new RuleNode(RULE_NODE_ID); + + var msg = TbMsg.newMsg() + .type(TbMsgType.POST_TELEMETRY_REQUEST) + .originator(TENANT_ID) + .queueName(queueName) + .copyMetaData(TbMsgMetaData.EMPTY) + .data(TbMsg.EMPTY_STRING) + .callback(callbackMock) + .build(); + + ruleNode.setRuleChainId(RULE_CHAIN_ID); + ruleNode.setDebugSettings(DebugSettings.failures()); + given(nodeCtxMock.getTenantId()).willReturn(TENANT_ID); + + // WHEN + defaultTbContext.enqueue(msg, () -> {}, t -> {}); + + // THEN + then(clusterService).should().pushMsgToRuleEngine(eq(tpi), eq(msg.getId()), any(), any()); + } + + @MethodSource + @ParameterizedTest + public void givenMsgAndQueueName_whenEnqueue_thenVerifyEnqueueWithCorrectTpi(String queueName) { + // GIVEN + var tpi = resolve(queueName); + + given(mainCtxMock.resolve(eq(ServiceType.TB_RULE_ENGINE), eq(queueName), eq(TENANT_ID), eq(TENANT_ID))).willReturn(tpi); + var clusterService = mock(TbClusterService.class); + given(mainCtxMock.getClusterService()).willReturn(clusterService); + var callbackMock = mock(TbMsgCallback.class); + given(callbackMock.isMsgValid()).willReturn(true); + var ruleNode = new RuleNode(RULE_NODE_ID); + + var msg = TbMsg.newMsg() + .type(TbMsgType.POST_TELEMETRY_REQUEST) + .originator(TENANT_ID) + .copyMetaData(TbMsgMetaData.EMPTY) + .data(TbMsg.EMPTY_STRING) + .callback(callbackMock) + .build(); + + ruleNode.setRuleChainId(RULE_CHAIN_ID); + ruleNode.setDebugSettings(DebugSettings.failures()); + given(nodeCtxMock.getTenantId()).willReturn(TENANT_ID); + + // WHEN + defaultTbContext.enqueue(msg, queueName, () -> {}, t -> {}); + + // THEN then(clusterService).should().pushMsgToRuleEngine(eq(tpi), eq(msg.getId()), any(), any()); } private static Stream givenMsgWithQueueName_whenInput_thenVerifyEnqueueWithCorrectTpi() { + return testQueueNames(); + } + + private static Stream givenMsgWithQueueName_whenEnqueue_thenVerifyEnqueueWithCorrectTpi() { + return testQueueNames(); + } + + private static Stream givenMsgAndQueueName_whenEnqueue_thenVerifyEnqueueWithCorrectTpi() { + return testQueueNames(); + } + + private static Stream testQueueNames() { return Stream.of("Main", "Test", null); } diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java index d743867b84..46b63e0595 100644 --- a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java +++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java @@ -143,8 +143,7 @@ public interface TbContext { void tellFailure(TbMsg msg, Throwable th); /** - * Puts new message to queue for processing by the Root Rule Chain - * WARNING: message is put to the Main queue. To specify other queue name - use {@link #enqueue(TbMsg, String, Runnable, Consumer)} + * Puts new message to queue from TbMsg for processing by the Root Rule Chain * * @param msg - message */