used correct queueName for TPI in input node
This commit is contained in:
		
							parent
							
								
									fbb6700a6c
								
							
						
					
					
						commit
						347e18ec4b
					
				@ -124,6 +124,7 @@ import java.util.concurrent.TimeUnit;
 | 
			
		||||
import java.util.function.BiConsumer;
 | 
			
		||||
import java.util.function.Consumer;
 | 
			
		||||
 | 
			
		||||
import static org.thingsboard.server.common.data.DataConstants.MAIN_QUEUE_NAME;
 | 
			
		||||
import static org.thingsboard.server.common.data.msg.TbMsgType.ATTRIBUTES_DELETED;
 | 
			
		||||
import static org.thingsboard.server.common.data.msg.TbMsgType.ATTRIBUTES_UPDATED;
 | 
			
		||||
import static org.thingsboard.server.common.data.msg.TbMsgType.ENTITY_CREATED;
 | 
			
		||||
@ -178,7 +179,7 @@ public class DefaultTbContext implements TbContext {
 | 
			
		||||
                .resetRuleNodeId()
 | 
			
		||||
                .build();
 | 
			
		||||
        tbMsg.pushToStack(nodeCtx.getSelf().getRuleChainId(), nodeCtx.getSelf().getId());
 | 
			
		||||
        TopicPartitionInfo tpi = mainCtx.resolve(ServiceType.TB_RULE_ENGINE, getQueueName(), getTenantId(), tbMsg.getOriginator());
 | 
			
		||||
        TopicPartitionInfo tpi = resolvePartition(msg);
 | 
			
		||||
        doEnqueue(tpi, tbMsg, new SimpleTbQueueCallback(md -> ack(msg), t -> tellFailure(msg, t)));
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
@ -195,7 +196,7 @@ public class DefaultTbContext implements TbContext {
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public void enqueue(TbMsg tbMsg, Runnable onSuccess, Consumer<Throwable> onFailure) {
 | 
			
		||||
        TopicPartitionInfo tpi = mainCtx.resolve(ServiceType.TB_RULE_ENGINE, getQueueName(), getTenantId(), tbMsg.getOriginator());
 | 
			
		||||
        TopicPartitionInfo tpi = resolvePartition(tbMsg, MAIN_QUEUE_NAME);
 | 
			
		||||
        enqueue(tpi, tbMsg, onFailure, onSuccess);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -98,6 +98,56 @@ class DefaultTbContextTest {
 | 
			
		||||
        defaultTbContext = new DefaultTbContext(mainCtxMock, "Test rule chain name", nodeCtxMock);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @MethodSource
 | 
			
		||||
    @ParameterizedTest
 | 
			
		||||
    public void givenMsgWithQueueName_whenInput_thenVerifyEnqueueWithCorrectTpi(String queueName) {
 | 
			
		||||
        // GIVEN
 | 
			
		||||
        var tpi = resolve(queueName);
 | 
			
		||||
 | 
			
		||||
        given(mainCtxMock.resolve(eq(ServiceType.TB_RULE_ENGINE), eq(queueName), eq(TENANT_ID), eq(TENANT_ID))).willReturn(tpi);
 | 
			
		||||
        var clusterService = mock(TbClusterService.class);
 | 
			
		||||
        given(mainCtxMock.getClusterService()).willReturn(clusterService);
 | 
			
		||||
        var callbackMock = mock(TbMsgCallback.class);
 | 
			
		||||
        given(callbackMock.isMsgValid()).willReturn(true);
 | 
			
		||||
        var ruleNode = new RuleNode(RULE_NODE_ID);
 | 
			
		||||
 | 
			
		||||
        var msg = TbMsg.newMsg()
 | 
			
		||||
                .type(TbMsgType.POST_TELEMETRY_REQUEST)
 | 
			
		||||
                .originator(TENANT_ID)
 | 
			
		||||
                .queueName(queueName)
 | 
			
		||||
                .copyMetaData(TbMsgMetaData.EMPTY)
 | 
			
		||||
                .data(TbMsg.EMPTY_STRING)
 | 
			
		||||
                .callback(callbackMock)
 | 
			
		||||
                .build();
 | 
			
		||||
 | 
			
		||||
        var ruleChainId = new RuleChainId(UUID.randomUUID());
 | 
			
		||||
 | 
			
		||||
        ruleNode.setRuleChainId(RULE_CHAIN_ID);
 | 
			
		||||
        ruleNode.setDebugSettings(DebugSettings.failures());
 | 
			
		||||
        given(nodeCtxMock.getTenantId()).willReturn(TENANT_ID);
 | 
			
		||||
        given(nodeCtxMock.getSelf()).willReturn(ruleNode);
 | 
			
		||||
 | 
			
		||||
        // WHEN
 | 
			
		||||
        defaultTbContext.input(msg, ruleChainId);
 | 
			
		||||
 | 
			
		||||
        // THEN
 | 
			
		||||
 | 
			
		||||
        then(clusterService).should().pushMsgToRuleEngine(eq(tpi), eq(msg.getId()), any(), any());
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private static Stream<String> givenMsgWithQueueName_whenInput_thenVerifyEnqueueWithCorrectTpi() {
 | 
			
		||||
        return Stream.of("Main", "Test", null);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private TopicPartitionInfo resolve(String queueName) {
 | 
			
		||||
        var tpiBuilder = TopicPartitionInfo.builder()
 | 
			
		||||
                .topic(queueName == null ? "MainQueueTopic" : queueName + "QueueTopic")
 | 
			
		||||
                .partition(1)
 | 
			
		||||
                .myPartition(true);
 | 
			
		||||
 | 
			
		||||
        return tpiBuilder.build();
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Test
 | 
			
		||||
    public void givenDebugFailuresEvents_whenTellSuccess_thenVerifyDebugOutputNotPersisted() {
 | 
			
		||||
        // GIVEN
 | 
			
		||||
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user