diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java index 90a9d56e0c..3b89c09367 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java +++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java @@ -57,6 +57,7 @@ import org.thingsboard.server.service.script.RuleNodeJsScriptEngine; import scala.concurrent.duration.Duration; import java.util.Collections; +import java.util.Optional; import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; @@ -102,6 +103,12 @@ class DefaultTbContext implements TbContext { scheduleMsgWithDelay(new RuleNodeToSelfMsg(msg), delayMs, nodeCtx.getSelfActor()); } + @Override + public boolean isLocalEntity(EntityId entityId) { + Optional address = mainCtx.getRoutingService().resolveById(entityId); + return !address.isPresent(); + } + private void scheduleMsgWithDelay(Object msg, long delayInMs, ActorRef target) { mainCtx.getScheduler().scheduleOnce(Duration.create(delayInMs, TimeUnit.MILLISECONDS), target, msg, mainCtx.getActorSystem().dispatcher(), nodeCtx.getSelfActor()); } diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeActorMessageProcessor.java index 98f07f8383..fc8ff3dc0a 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeActorMessageProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeActorMessageProcessor.java @@ -83,7 +83,9 @@ public class RuleNodeActorMessageProcessor extends ComponentMsgProcessor> getPartitionsFuture(TenantId tenantId, ReadTsKvQuery query, EntityId entityId, long minPartition, long maxPartition) { diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java index 9ebc610e1b..427244753b 100644 --- a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java +++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java @@ -51,6 +51,8 @@ public interface TbContext { void tellSelf(TbMsg msg, long delayMs); + boolean isLocalEntity(EntityId entityId); + void tellFailure(TbMsg msg, Throwable th); void updateSelf(RuleNode self); diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbNode.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbNode.java index b8f21fbbe9..f086a192b8 100644 --- a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbNode.java +++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbNode.java @@ -16,6 +16,7 @@ package org.thingsboard.rule.engine.api; import org.thingsboard.server.common.msg.TbMsg; +import org.thingsboard.server.common.msg.cluster.ClusterEventMsg; import java.util.concurrent.ExecutionException; @@ -30,4 +31,6 @@ public interface TbNode { void destroy(); + default void onClusterEventMsg(TbContext ctx, ClusterEventMsg msg) {} + } diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/debug/TbMsgGeneratorNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/debug/TbMsgGeneratorNode.java index c15e66f9e5..0f1dd6d953 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/debug/TbMsgGeneratorNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/debug/TbMsgGeneratorNode.java @@ -25,6 +25,7 @@ import org.thingsboard.server.common.data.id.EntityIdFactory; import org.thingsboard.server.common.data.plugin.ComponentType; import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.common.msg.TbMsgMetaData; +import org.thingsboard.server.common.msg.cluster.ClusterEventMsg; import java.util.UUID; import java.util.concurrent.TimeUnit; @@ -56,6 +57,7 @@ public class TbMsgGeneratorNode implements TbNode { private EntityId originatorId; private UUID nextTickId; private TbMsg prevMsg; + private volatile boolean initialized; @Override public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException { @@ -66,21 +68,42 @@ public class TbMsgGeneratorNode implements TbNode { } else { originatorId = ctx.getSelfId(); } - this.jsEngine = ctx.createJsScriptEngine(config.getJsScript(), "prevMsg", "prevMetadata", "prevMsgType"); - scheduleTickMsg(ctx); + updateGeneratorState(ctx); + } + + @Override + public void onClusterEventMsg(TbContext ctx, ClusterEventMsg msg) { + updateGeneratorState(ctx); + } + + private void updateGeneratorState(TbContext ctx) { + if (ctx.isLocalEntity(originatorId)) { + if (!initialized) { + initialized = true; + this.jsEngine = ctx.createJsScriptEngine(config.getJsScript(), "prevMsg", "prevMetadata", "prevMsgType"); + scheduleTickMsg(ctx); + } + } else if (initialized) { + initialized = false; + destroy(); + } } @Override public void onMsg(TbContext ctx, TbMsg msg) { - if (msg.getType().equals(TB_MSG_GENERATOR_NODE_MSG) && msg.getId().equals(nextTickId)) { + if (initialized && msg.getType().equals(TB_MSG_GENERATOR_NODE_MSG) && msg.getId().equals(nextTickId)) { withCallback(generate(ctx), m -> { - ctx.tellNext(m, SUCCESS); - scheduleTickMsg(ctx); + if (initialized) { + ctx.tellNext(m, SUCCESS); + scheduleTickMsg(ctx); + } }, t -> { - ctx.tellFailure(msg, t); - scheduleTickMsg(ctx); + if (initialized) { + ctx.tellFailure(msg, t); + scheduleTickMsg(ctx); + } }); } } @@ -102,8 +125,10 @@ public class TbMsgGeneratorNode implements TbNode { if (prevMsg == null) { prevMsg = ctx.newMsg("", originatorId, new TbMsgMetaData(), "{}"); } - TbMsg generated = jsEngine.executeGenerate(prevMsg); - prevMsg = ctx.newMsg(generated.getType(), originatorId, generated.getMetaData(), generated.getData()); + if (initialized) { + TbMsg generated = jsEngine.executeGenerate(prevMsg); + prevMsg = ctx.newMsg(generated.getType(), originatorId, generated.getMetaData(), generated.getData()); + } return prevMsg; }); } @@ -113,6 +138,7 @@ public class TbMsgGeneratorNode implements TbNode { prevMsg = null; if (jsEngine != null) { jsEngine.destroy(); + jsEngine = null; } } }