used queueName from TbMsg fro enqueue

This commit is contained in:
YevhenBondarenko 2025-02-26 12:56:07 +01:00
parent b9e8fae8ab
commit 45db251817
3 changed files with 78 additions and 4 deletions

View File

@ -124,7 +124,6 @@ import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import static org.thingsboard.server.common.data.DataConstants.MAIN_QUEUE_NAME;
import static org.thingsboard.server.common.data.msg.TbMsgType.ATTRIBUTES_DELETED;
import static org.thingsboard.server.common.data.msg.TbMsgType.ATTRIBUTES_UPDATED;
import static org.thingsboard.server.common.data.msg.TbMsgType.ENTITY_CREATED;
@ -196,7 +195,7 @@ public class DefaultTbContext implements TbContext {
@Override
public void enqueue(TbMsg tbMsg, Runnable onSuccess, Consumer<Throwable> onFailure) {
enqueue(tbMsg, MAIN_QUEUE_NAME, onSuccess, onFailure);
enqueue(tbMsg, tbMsg.getQueueName(), onSuccess, onFailure);
}
@Override

View File

@ -131,11 +131,87 @@ class DefaultTbContextTest {
defaultTbContext.input(msg, ruleChainId);
// THEN
then(clusterService).should().pushMsgToRuleEngine(eq(tpi), eq(msg.getId()), any(), any());
}
@MethodSource
@ParameterizedTest
public void givenMsgWithQueueName_whenEnqueue_thenVerifyEnqueueWithCorrectTpi(String queueName) {
// GIVEN
var tpi = resolve(queueName);
given(mainCtxMock.resolve(eq(ServiceType.TB_RULE_ENGINE), eq(queueName), eq(TENANT_ID), eq(TENANT_ID))).willReturn(tpi);
var clusterService = mock(TbClusterService.class);
given(mainCtxMock.getClusterService()).willReturn(clusterService);
var callbackMock = mock(TbMsgCallback.class);
given(callbackMock.isMsgValid()).willReturn(true);
var ruleNode = new RuleNode(RULE_NODE_ID);
var msg = TbMsg.newMsg()
.type(TbMsgType.POST_TELEMETRY_REQUEST)
.originator(TENANT_ID)
.queueName(queueName)
.copyMetaData(TbMsgMetaData.EMPTY)
.data(TbMsg.EMPTY_STRING)
.callback(callbackMock)
.build();
ruleNode.setRuleChainId(RULE_CHAIN_ID);
ruleNode.setDebugSettings(DebugSettings.failures());
given(nodeCtxMock.getTenantId()).willReturn(TENANT_ID);
// WHEN
defaultTbContext.enqueue(msg, () -> {}, t -> {});
// THEN
then(clusterService).should().pushMsgToRuleEngine(eq(tpi), eq(msg.getId()), any(), any());
}
@MethodSource
@ParameterizedTest
public void givenMsgAndQueueName_whenEnqueue_thenVerifyEnqueueWithCorrectTpi(String queueName) {
// GIVEN
var tpi = resolve(queueName);
given(mainCtxMock.resolve(eq(ServiceType.TB_RULE_ENGINE), eq(queueName), eq(TENANT_ID), eq(TENANT_ID))).willReturn(tpi);
var clusterService = mock(TbClusterService.class);
given(mainCtxMock.getClusterService()).willReturn(clusterService);
var callbackMock = mock(TbMsgCallback.class);
given(callbackMock.isMsgValid()).willReturn(true);
var ruleNode = new RuleNode(RULE_NODE_ID);
var msg = TbMsg.newMsg()
.type(TbMsgType.POST_TELEMETRY_REQUEST)
.originator(TENANT_ID)
.copyMetaData(TbMsgMetaData.EMPTY)
.data(TbMsg.EMPTY_STRING)
.callback(callbackMock)
.build();
ruleNode.setRuleChainId(RULE_CHAIN_ID);
ruleNode.setDebugSettings(DebugSettings.failures());
given(nodeCtxMock.getTenantId()).willReturn(TENANT_ID);
// WHEN
defaultTbContext.enqueue(msg, queueName, () -> {}, t -> {});
// THEN
then(clusterService).should().pushMsgToRuleEngine(eq(tpi), eq(msg.getId()), any(), any());
}
private static Stream<String> givenMsgWithQueueName_whenInput_thenVerifyEnqueueWithCorrectTpi() {
return testQueueNames();
}
private static Stream<String> givenMsgWithQueueName_whenEnqueue_thenVerifyEnqueueWithCorrectTpi() {
return testQueueNames();
}
private static Stream<String> givenMsgAndQueueName_whenEnqueue_thenVerifyEnqueueWithCorrectTpi() {
return testQueueNames();
}
private static Stream<String> testQueueNames() {
return Stream.of("Main", "Test", null);
}

View File

@ -143,8 +143,7 @@ public interface TbContext {
void tellFailure(TbMsg msg, Throwable th);
/**
* Puts new message to queue for processing by the Root Rule Chain
* WARNING: message is put to the Main queue. To specify other queue name - use {@link #enqueue(TbMsg, String, Runnable, Consumer)}
* Puts new message to queue from TbMsg for processing by the Root Rule Chain
*
* @param msg - message
*/