From d3d20aa449b0ff7103a55bda971e66e0bc9a7d9f Mon Sep 17 00:00:00 2001 From: YevhenBondarenko Date: Fri, 18 Oct 2024 16:45:11 +0200 Subject: [PATCH 1/4] used enqueue for input node for avoid latency --- .../server/actors/ruleChain/DefaultTbContext.java | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java index ec10402821..b5b389da78 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java +++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java @@ -172,8 +172,18 @@ class DefaultTbContext implements TbContext { @Override public void input(TbMsg msg, RuleChainId ruleChainId) { + ack(msg); + if (!msg.isValid()) { + return; + } msg.pushToStack(nodeCtx.getSelf().getRuleChainId(), nodeCtx.getSelf().getId()); - nodeCtx.getChainActor().tell(new RuleChainInputMsg(ruleChainId, msg)); + TbMsg inputMsg = msg.copyWithRuleChainId(ruleChainId); + TransportProtos.ToRuleEngineMsg toReMsg = TransportProtos.ToRuleEngineMsg.newBuilder() + .setTenantIdMSB(getTenantId().getId().getMostSignificantBits()) + .setTenantIdLSB(getTenantId().getId().getLeastSignificantBits()) + .setTbMsg(TbMsg.toByteString(inputMsg)).build(); + TopicPartitionInfo tpi = mainCtx.resolve(ServiceType.TB_RULE_ENGINE, getQueueName(), getTenantId(), inputMsg.getOriginator()); + mainCtx.getClusterService().pushMsgToRuleEngine(tpi, inputMsg.getId(), toReMsg, null); } @Override From e2a356703f5c4b7e8909d182f210fb102cd1d246 Mon Sep 17 00:00:00 2001 From: YevhenBondarenko Date: Fri, 18 Oct 2024 17:05:22 +0200 Subject: [PATCH 2/4] minor refactoring, ack used if push is success --- .../actors/ruleChain/DefaultTbContext.java | 17 ++++++++++++++--- 1 file changed, 14 insertions(+), 3 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java index b5b389da78..23741ad965 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java +++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java @@ -109,6 +109,8 @@ import org.thingsboard.server.dao.user.UserService; import org.thingsboard.server.dao.widget.WidgetTypeService; import org.thingsboard.server.dao.widget.WidgetsBundleService; import org.thingsboard.server.gen.transport.TransportProtos; +import org.thingsboard.server.queue.TbQueueCallback; +import org.thingsboard.server.queue.TbQueueMsgMetadata; import org.thingsboard.server.queue.common.SimpleTbQueueCallback; import org.thingsboard.server.service.executors.PubSubRuleNodeExecutorProvider; import org.thingsboard.server.service.script.RuleNodeJsScriptEngine; @@ -172,18 +174,27 @@ class DefaultTbContext implements TbContext { @Override public void input(TbMsg msg, RuleChainId ruleChainId) { - ack(msg); if (!msg.isValid()) { return; } - msg.pushToStack(nodeCtx.getSelf().getRuleChainId(), nodeCtx.getSelf().getId()); TbMsg inputMsg = msg.copyWithRuleChainId(ruleChainId); + inputMsg.pushToStack(nodeCtx.getSelf().getRuleChainId(), nodeCtx.getSelf().getId()); TransportProtos.ToRuleEngineMsg toReMsg = TransportProtos.ToRuleEngineMsg.newBuilder() .setTenantIdMSB(getTenantId().getId().getMostSignificantBits()) .setTenantIdLSB(getTenantId().getId().getLeastSignificantBits()) .setTbMsg(TbMsg.toByteString(inputMsg)).build(); TopicPartitionInfo tpi = mainCtx.resolve(ServiceType.TB_RULE_ENGINE, getQueueName(), getTenantId(), inputMsg.getOriginator()); - mainCtx.getClusterService().pushMsgToRuleEngine(tpi, inputMsg.getId(), toReMsg, null); + mainCtx.getClusterService().pushMsgToRuleEngine(tpi, inputMsg.getId(), toReMsg, new TbQueueCallback() { + @Override + public void onSuccess(TbQueueMsgMetadata metadata) { + ack(msg); + } + + @Override + public void onFailure(Throwable error) { + tellFailure(msg, error); + } + }); } @Override From 726c95c661516115171bbf5e1a18e3fc05d60dce Mon Sep 17 00:00:00 2001 From: YevhenBondarenko Date: Sun, 20 Oct 2024 13:41:56 +0200 Subject: [PATCH 3/4] fixed tests --- .../flow/AbstractRuleEngineFlowIntegrationTest.java | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/application/src/test/java/org/thingsboard/server/rules/flow/AbstractRuleEngineFlowIntegrationTest.java b/application/src/test/java/org/thingsboard/server/rules/flow/AbstractRuleEngineFlowIntegrationTest.java index 9e36fed677..920ffdd211 100644 --- a/application/src/test/java/org/thingsboard/server/rules/flow/AbstractRuleEngineFlowIntegrationTest.java +++ b/application/src/test/java/org/thingsboard/server/rules/flow/AbstractRuleEngineFlowIntegrationTest.java @@ -17,6 +17,7 @@ package org.thingsboard.server.rules.flow; import com.google.common.util.concurrent.ListenableFuture; import lombok.extern.slf4j.Slf4j; +import org.awaitility.Awaitility; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -59,6 +60,7 @@ import org.thingsboard.server.dao.event.EventService; import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import static org.mockito.Mockito.spy; @@ -331,6 +333,15 @@ public abstract class AbstractRuleEngineFlowIntegrationTest extends AbstractRule RuleChain finalRuleChain = rootRuleChain; RuleNode lastRuleNode = secondaryMetaData.getNodes().stream().filter(node -> !node.getId().equals(finalRuleChain.getFirstRuleNodeId())).findFirst().get(); + Awaitility.await().atMost(3, TimeUnit.SECONDS) + .until(() -> + getDebugEvents(savedTenant.getId(), lastRuleNode.getId(), 1000) + .getData() + .stream() + .filter(filterByPostTelemetryEventType()) + .count() == 2 + ); + eventsPage = getDebugEvents(savedTenant.getId(), lastRuleNode.getId(), 1000); events = eventsPage.getData().stream().filter(filterByPostTelemetryEventType()).collect(Collectors.toList()); From b0a5a97bab21ebf90ce04c0c9093024a0c15cd5c Mon Sep 17 00:00:00 2001 From: YevhenBondarenko Date: Wed, 6 Nov 2024 15:24:02 +0100 Subject: [PATCH 4/4] minor refactoring due to comments --- .../actors/ruleChain/DefaultTbContext.java | 37 +++++++------------ ...AbstractRuleEngineFlowIntegrationTest.java | 2 +- 2 files changed, 14 insertions(+), 25 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java index 23741ad965..5b4902afa5 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java +++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java @@ -110,7 +110,6 @@ import org.thingsboard.server.dao.widget.WidgetTypeService; import org.thingsboard.server.dao.widget.WidgetsBundleService; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.queue.TbQueueCallback; -import org.thingsboard.server.queue.TbQueueMsgMetadata; import org.thingsboard.server.queue.common.SimpleTbQueueCallback; import org.thingsboard.server.service.executors.PubSubRuleNodeExecutorProvider; import org.thingsboard.server.service.script.RuleNodeJsScriptEngine; @@ -177,24 +176,10 @@ class DefaultTbContext implements TbContext { if (!msg.isValid()) { return; } - TbMsg inputMsg = msg.copyWithRuleChainId(ruleChainId); - inputMsg.pushToStack(nodeCtx.getSelf().getRuleChainId(), nodeCtx.getSelf().getId()); - TransportProtos.ToRuleEngineMsg toReMsg = TransportProtos.ToRuleEngineMsg.newBuilder() - .setTenantIdMSB(getTenantId().getId().getMostSignificantBits()) - .setTenantIdLSB(getTenantId().getId().getLeastSignificantBits()) - .setTbMsg(TbMsg.toByteString(inputMsg)).build(); - TopicPartitionInfo tpi = mainCtx.resolve(ServiceType.TB_RULE_ENGINE, getQueueName(), getTenantId(), inputMsg.getOriginator()); - mainCtx.getClusterService().pushMsgToRuleEngine(tpi, inputMsg.getId(), toReMsg, new TbQueueCallback() { - @Override - public void onSuccess(TbQueueMsgMetadata metadata) { - ack(msg); - } - - @Override - public void onFailure(Throwable error) { - tellFailure(msg, error); - } - }); + TbMsg tbMsg = msg.copyWithRuleChainId(ruleChainId); + tbMsg.pushToStack(nodeCtx.getSelf().getRuleChainId(), nodeCtx.getSelf().getId()); + TopicPartitionInfo tpi = mainCtx.resolve(ServiceType.TB_RULE_ENGINE, getQueueName(), getTenantId(), tbMsg.getOriginator()); + doEnqueue(tpi, tbMsg, new SimpleTbQueueCallback(md -> ack(msg), t -> tellFailure(msg, t))); } @Override @@ -230,14 +215,10 @@ class DefaultTbContext implements TbContext { } return; } - TransportProtos.ToRuleEngineMsg msg = TransportProtos.ToRuleEngineMsg.newBuilder() - .setTenantIdMSB(getTenantId().getId().getMostSignificantBits()) - .setTenantIdLSB(getTenantId().getId().getLeastSignificantBits()) - .setTbMsg(TbMsg.toByteString(tbMsg)).build(); if (nodeCtx.getSelf().isDebugMode()) { mainCtx.persistDebugOutput(nodeCtx.getTenantId(), nodeCtx.getSelf().getId(), tbMsg, "To Root Rule Chain"); } - mainCtx.getClusterService().pushMsgToRuleEngine(tpi, tbMsg.getId(), msg, new SimpleTbQueueCallback( + doEnqueue(tpi, tbMsg, new SimpleTbQueueCallback( metadata -> { if (onSuccess != null) { onSuccess.run(); @@ -252,6 +233,14 @@ class DefaultTbContext implements TbContext { })); } + private void doEnqueue(TopicPartitionInfo tpi, TbMsg tbMsg, TbQueueCallback callback) { + TransportProtos.ToRuleEngineMsg msg = TransportProtos.ToRuleEngineMsg.newBuilder() + .setTenantIdMSB(getTenantId().getId().getMostSignificantBits()) + .setTenantIdLSB(getTenantId().getId().getLeastSignificantBits()) + .setTbMsg(TbMsg.toByteString(tbMsg)).build(); + mainCtx.getClusterService().pushMsgToRuleEngine(tpi, tbMsg.getId(), msg, callback); + } + @Override public void enqueueForTellFailure(TbMsg tbMsg, String failureMessage) { TopicPartitionInfo tpi = resolvePartition(tbMsg); diff --git a/application/src/test/java/org/thingsboard/server/rules/flow/AbstractRuleEngineFlowIntegrationTest.java b/application/src/test/java/org/thingsboard/server/rules/flow/AbstractRuleEngineFlowIntegrationTest.java index 920ffdd211..143c6c72a1 100644 --- a/application/src/test/java/org/thingsboard/server/rules/flow/AbstractRuleEngineFlowIntegrationTest.java +++ b/application/src/test/java/org/thingsboard/server/rules/flow/AbstractRuleEngineFlowIntegrationTest.java @@ -333,7 +333,7 @@ public abstract class AbstractRuleEngineFlowIntegrationTest extends AbstractRule RuleChain finalRuleChain = rootRuleChain; RuleNode lastRuleNode = secondaryMetaData.getNodes().stream().filter(node -> !node.getId().equals(finalRuleChain.getFirstRuleNodeId())).findFirst().get(); - Awaitility.await().atMost(3, TimeUnit.SECONDS) + Awaitility.await().atMost(TIMEOUT, TimeUnit.SECONDS) .until(() -> getDebugEvents(savedTenant.getId(), lastRuleNode.getId(), 1000) .getData()