diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java index ec10402821..5b4902afa5 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java +++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java @@ -109,6 +109,7 @@ import org.thingsboard.server.dao.user.UserService; import org.thingsboard.server.dao.widget.WidgetTypeService; import org.thingsboard.server.dao.widget.WidgetsBundleService; import org.thingsboard.server.gen.transport.TransportProtos; +import org.thingsboard.server.queue.TbQueueCallback; import org.thingsboard.server.queue.common.SimpleTbQueueCallback; import org.thingsboard.server.service.executors.PubSubRuleNodeExecutorProvider; import org.thingsboard.server.service.script.RuleNodeJsScriptEngine; @@ -172,8 +173,13 @@ class DefaultTbContext implements TbContext { @Override public void input(TbMsg msg, RuleChainId ruleChainId) { - msg.pushToStack(nodeCtx.getSelf().getRuleChainId(), nodeCtx.getSelf().getId()); - nodeCtx.getChainActor().tell(new RuleChainInputMsg(ruleChainId, msg)); + if (!msg.isValid()) { + return; + } + TbMsg tbMsg = msg.copyWithRuleChainId(ruleChainId); + tbMsg.pushToStack(nodeCtx.getSelf().getRuleChainId(), nodeCtx.getSelf().getId()); + TopicPartitionInfo tpi = mainCtx.resolve(ServiceType.TB_RULE_ENGINE, getQueueName(), getTenantId(), tbMsg.getOriginator()); + doEnqueue(tpi, tbMsg, new SimpleTbQueueCallback(md -> ack(msg), t -> tellFailure(msg, t))); } @Override @@ -209,14 +215,10 @@ class DefaultTbContext implements TbContext { } return; } - TransportProtos.ToRuleEngineMsg msg = TransportProtos.ToRuleEngineMsg.newBuilder() - .setTenantIdMSB(getTenantId().getId().getMostSignificantBits()) - .setTenantIdLSB(getTenantId().getId().getLeastSignificantBits()) - .setTbMsg(TbMsg.toByteString(tbMsg)).build(); if (nodeCtx.getSelf().isDebugMode()) { mainCtx.persistDebugOutput(nodeCtx.getTenantId(), nodeCtx.getSelf().getId(), tbMsg, "To Root Rule Chain"); } - mainCtx.getClusterService().pushMsgToRuleEngine(tpi, tbMsg.getId(), msg, new SimpleTbQueueCallback( + doEnqueue(tpi, tbMsg, new SimpleTbQueueCallback( metadata -> { if (onSuccess != null) { onSuccess.run(); @@ -231,6 +233,14 @@ class DefaultTbContext implements TbContext { })); } + private void doEnqueue(TopicPartitionInfo tpi, TbMsg tbMsg, TbQueueCallback callback) { + TransportProtos.ToRuleEngineMsg msg = TransportProtos.ToRuleEngineMsg.newBuilder() + .setTenantIdMSB(getTenantId().getId().getMostSignificantBits()) + .setTenantIdLSB(getTenantId().getId().getLeastSignificantBits()) + .setTbMsg(TbMsg.toByteString(tbMsg)).build(); + mainCtx.getClusterService().pushMsgToRuleEngine(tpi, tbMsg.getId(), msg, callback); + } + @Override public void enqueueForTellFailure(TbMsg tbMsg, String failureMessage) { TopicPartitionInfo tpi = resolvePartition(tbMsg); diff --git a/application/src/test/java/org/thingsboard/server/rules/flow/AbstractRuleEngineFlowIntegrationTest.java b/application/src/test/java/org/thingsboard/server/rules/flow/AbstractRuleEngineFlowIntegrationTest.java index 9e36fed677..143c6c72a1 100644 --- a/application/src/test/java/org/thingsboard/server/rules/flow/AbstractRuleEngineFlowIntegrationTest.java +++ b/application/src/test/java/org/thingsboard/server/rules/flow/AbstractRuleEngineFlowIntegrationTest.java @@ -17,6 +17,7 @@ package org.thingsboard.server.rules.flow; import com.google.common.util.concurrent.ListenableFuture; import lombok.extern.slf4j.Slf4j; +import org.awaitility.Awaitility; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -59,6 +60,7 @@ import org.thingsboard.server.dao.event.EventService; import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import static org.mockito.Mockito.spy; @@ -331,6 +333,15 @@ public abstract class AbstractRuleEngineFlowIntegrationTest extends AbstractRule RuleChain finalRuleChain = rootRuleChain; RuleNode lastRuleNode = secondaryMetaData.getNodes().stream().filter(node -> !node.getId().equals(finalRuleChain.getFirstRuleNodeId())).findFirst().get(); + Awaitility.await().atMost(TIMEOUT, TimeUnit.SECONDS) + .until(() -> + getDebugEvents(savedTenant.getId(), lastRuleNode.getId(), 1000) + .getData() + .stream() + .filter(filterByPostTelemetryEventType()) + .count() == 2 + ); + eventsPage = getDebugEvents(savedTenant.getId(), lastRuleNode.getId(), 1000); events = eventsPage.getData().stream().filter(filterByPostTelemetryEventType()).collect(Collectors.toList());