Merge pull request #12751 from YevhenBondarenko/fix/input-node-enqueue
[Input node] Use correct queue name from msg
This commit is contained in:
commit
8dcb7fda55
@ -124,6 +124,7 @@ import java.util.concurrent.TimeUnit;
|
||||
import java.util.function.BiConsumer;
|
||||
import java.util.function.Consumer;
|
||||
|
||||
import static org.thingsboard.server.common.data.DataConstants.MAIN_QUEUE_NAME;
|
||||
import static org.thingsboard.server.common.data.msg.TbMsgType.ATTRIBUTES_DELETED;
|
||||
import static org.thingsboard.server.common.data.msg.TbMsgType.ATTRIBUTES_UPDATED;
|
||||
import static org.thingsboard.server.common.data.msg.TbMsgType.ENTITY_CREATED;
|
||||
@ -178,7 +179,7 @@ public class DefaultTbContext implements TbContext {
|
||||
.resetRuleNodeId()
|
||||
.build();
|
||||
tbMsg.pushToStack(nodeCtx.getSelf().getRuleChainId(), nodeCtx.getSelf().getId());
|
||||
TopicPartitionInfo tpi = mainCtx.resolve(ServiceType.TB_RULE_ENGINE, getQueueName(), getTenantId(), tbMsg.getOriginator());
|
||||
TopicPartitionInfo tpi = resolvePartition(msg);
|
||||
doEnqueue(tpi, tbMsg, new SimpleTbQueueCallback(md -> ack(msg), t -> tellFailure(msg, t)));
|
||||
}
|
||||
|
||||
@ -195,7 +196,7 @@ public class DefaultTbContext implements TbContext {
|
||||
|
||||
@Override
|
||||
public void enqueue(TbMsg tbMsg, Runnable onSuccess, Consumer<Throwable> onFailure) {
|
||||
TopicPartitionInfo tpi = mainCtx.resolve(ServiceType.TB_RULE_ENGINE, getQueueName(), getTenantId(), tbMsg.getOriginator());
|
||||
TopicPartitionInfo tpi = resolvePartition(tbMsg, MAIN_QUEUE_NAME);
|
||||
enqueue(tpi, tbMsg, onFailure, onSuccess);
|
||||
}
|
||||
|
||||
|
||||
@ -98,6 +98,56 @@ class DefaultTbContextTest {
|
||||
defaultTbContext = new DefaultTbContext(mainCtxMock, "Test rule chain name", nodeCtxMock);
|
||||
}
|
||||
|
||||
@MethodSource
|
||||
@ParameterizedTest
|
||||
public void givenMsgWithQueueName_whenInput_thenVerifyEnqueueWithCorrectTpi(String queueName) {
|
||||
// GIVEN
|
||||
var tpi = resolve(queueName);
|
||||
|
||||
given(mainCtxMock.resolve(eq(ServiceType.TB_RULE_ENGINE), eq(queueName), eq(TENANT_ID), eq(TENANT_ID))).willReturn(tpi);
|
||||
var clusterService = mock(TbClusterService.class);
|
||||
given(mainCtxMock.getClusterService()).willReturn(clusterService);
|
||||
var callbackMock = mock(TbMsgCallback.class);
|
||||
given(callbackMock.isMsgValid()).willReturn(true);
|
||||
var ruleNode = new RuleNode(RULE_NODE_ID);
|
||||
|
||||
var msg = TbMsg.newMsg()
|
||||
.type(TbMsgType.POST_TELEMETRY_REQUEST)
|
||||
.originator(TENANT_ID)
|
||||
.queueName(queueName)
|
||||
.copyMetaData(TbMsgMetaData.EMPTY)
|
||||
.data(TbMsg.EMPTY_STRING)
|
||||
.callback(callbackMock)
|
||||
.build();
|
||||
|
||||
var ruleChainId = new RuleChainId(UUID.randomUUID());
|
||||
|
||||
ruleNode.setRuleChainId(RULE_CHAIN_ID);
|
||||
ruleNode.setDebugSettings(DebugSettings.failures());
|
||||
given(nodeCtxMock.getTenantId()).willReturn(TENANT_ID);
|
||||
given(nodeCtxMock.getSelf()).willReturn(ruleNode);
|
||||
|
||||
// WHEN
|
||||
defaultTbContext.input(msg, ruleChainId);
|
||||
|
||||
// THEN
|
||||
|
||||
then(clusterService).should().pushMsgToRuleEngine(eq(tpi), eq(msg.getId()), any(), any());
|
||||
}
|
||||
|
||||
private static Stream<String> givenMsgWithQueueName_whenInput_thenVerifyEnqueueWithCorrectTpi() {
|
||||
return Stream.of("Main", "Test", null);
|
||||
}
|
||||
|
||||
private TopicPartitionInfo resolve(String queueName) {
|
||||
var tpiBuilder = TopicPartitionInfo.builder()
|
||||
.topic(queueName == null ? "MainQueueTopic" : queueName + "QueueTopic")
|
||||
.partition(1)
|
||||
.myPartition(true);
|
||||
|
||||
return tpiBuilder.build();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void givenDebugFailuresEvents_whenTellSuccess_thenVerifyDebugOutputNotPersisted() {
|
||||
// GIVEN
|
||||
@ -810,10 +860,10 @@ class DefaultTbContextTest {
|
||||
@MethodSource
|
||||
@ParameterizedTest
|
||||
void givenDebugFailuresAndDebugAllAndConnectionAndPersistedResultOptions_whenTellNext_thenVerifyDebugOutputPersistence(boolean debugFailures,
|
||||
long debugAllUntil,
|
||||
String connection,
|
||||
boolean shouldPersist,
|
||||
boolean shouldPersistAfterDurationTime) {
|
||||
long debugAllUntil,
|
||||
String connection,
|
||||
boolean shouldPersist,
|
||||
boolean shouldPersistAfterDurationTime) {
|
||||
// GIVEN
|
||||
var callbackMock = mock(TbMsgCallback.class);
|
||||
var msg = getTbMsgWithCallback(callbackMock);
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user