Update rule node ctx on update
This commit is contained in:
parent
5b03e45fcb
commit
ecf54464a8
@ -23,6 +23,7 @@ import org.thingsboard.rule.engine.api.RuleEngineTelemetryService;
|
|||||||
import org.thingsboard.rule.engine.api.TbContext;
|
import org.thingsboard.rule.engine.api.TbContext;
|
||||||
import org.thingsboard.server.actors.ActorSystemContext;
|
import org.thingsboard.server.actors.ActorSystemContext;
|
||||||
import org.thingsboard.server.common.data.id.RuleNodeId;
|
import org.thingsboard.server.common.data.id.RuleNodeId;
|
||||||
|
import org.thingsboard.server.common.data.rule.RuleNode;
|
||||||
import org.thingsboard.server.common.msg.TbMsg;
|
import org.thingsboard.server.common.msg.TbMsg;
|
||||||
import org.thingsboard.server.common.msg.cluster.ServerAddress;
|
import org.thingsboard.server.common.msg.cluster.ServerAddress;
|
||||||
import org.thingsboard.server.dao.alarm.AlarmService;
|
import org.thingsboard.server.dao.alarm.AlarmService;
|
||||||
@ -106,6 +107,11 @@ class DefaultTbContext implements TbContext {
|
|||||||
nodeCtx.getSelfActor().tell(new RuleNodeToSelfErrorMsg(msg, th), nodeCtx.getSelfActor());
|
nodeCtx.getSelfActor().tell(new RuleNodeToSelfErrorMsg(msg, th), nodeCtx.getSelfActor());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void updateSelf(RuleNode self) {
|
||||||
|
nodeCtx.setSelf(self);
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public RuleNodeId getSelfId() {
|
public RuleNodeId getSelfId() {
|
||||||
return nodeCtx.getSelf().getId();
|
return nodeCtx.getSelf().getId();
|
||||||
|
|||||||
@ -92,6 +92,7 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
|
|||||||
ActorRef ruleNodeActor = createRuleNodeActor(context, ruleNode);
|
ActorRef ruleNodeActor = createRuleNodeActor(context, ruleNode);
|
||||||
nodeActors.put(ruleNode.getId(), new RuleNodeCtx(tenantId, self, ruleNodeActor, ruleNode));
|
nodeActors.put(ruleNode.getId(), new RuleNodeCtx(tenantId, self, ruleNodeActor, ruleNode));
|
||||||
} else {
|
} else {
|
||||||
|
existing.setSelf(ruleNode);
|
||||||
existing.getSelfActor().tell(new ComponentLifecycleMsg(tenantId, existing.getSelf().getId(), ComponentLifecycleEvent.UPDATED), self);
|
existing.getSelfActor().tell(new ComponentLifecycleMsg(tenantId, existing.getSelf().getId(), ComponentLifecycleEvent.UPDATED), self);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -153,7 +154,7 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
|
|||||||
checkActive();
|
checkActive();
|
||||||
TbMsg tbMsg = envelope.getTbMsg();
|
TbMsg tbMsg = envelope.getTbMsg();
|
||||||
//TODO: push to queue and act on ack in async way
|
//TODO: push to queue and act on ack in async way
|
||||||
pushMstToNode(firstNode, tbMsg);
|
pushMsgToNode(firstNode, tbMsg);
|
||||||
}
|
}
|
||||||
|
|
||||||
void onTellNext(RuleNodeToRuleChainTellNextMsg envelope) {
|
void onTellNext(RuleNodeToRuleChainTellNextMsg envelope) {
|
||||||
@ -175,7 +176,7 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
|
|||||||
case RULE_NODE:
|
case RULE_NODE:
|
||||||
RuleNodeId targetRuleNodeId = new RuleNodeId(relation.getOut().getId());
|
RuleNodeId targetRuleNodeId = new RuleNodeId(relation.getOut().getId());
|
||||||
RuleNodeCtx targetRuleNode = nodeActors.get(targetRuleNodeId);
|
RuleNodeCtx targetRuleNode = nodeActors.get(targetRuleNodeId);
|
||||||
pushMstToNode(targetRuleNode, msg);
|
pushMsgToNode(targetRuleNode, msg);
|
||||||
break;
|
break;
|
||||||
case RULE_CHAIN:
|
case RULE_CHAIN:
|
||||||
// TODO: implement
|
// TODO: implement
|
||||||
@ -185,7 +186,7 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void pushMstToNode(RuleNodeCtx nodeCtx, TbMsg msg) {
|
private void pushMsgToNode(RuleNodeCtx nodeCtx, TbMsg msg) {
|
||||||
if (nodeCtx != null) {
|
if (nodeCtx != null) {
|
||||||
nodeCtx.getSelfActor().tell(new RuleChainToRuleNodeMsg(new DefaultTbContext(systemContext, nodeCtx), msg), self);
|
nodeCtx.getSelfActor().tell(new RuleChainToRuleNodeMsg(new DefaultTbContext(systemContext, nodeCtx), msg), self);
|
||||||
}
|
}
|
||||||
|
|||||||
@ -68,6 +68,7 @@ public class RuleNodeActorMessageProcessor extends ComponentMsgProcessor<RuleNod
|
|||||||
boolean restartRequired = !(ruleNode.getType().equals(newRuleNode.getType())
|
boolean restartRequired = !(ruleNode.getType().equals(newRuleNode.getType())
|
||||||
&& ruleNode.getConfiguration().equals(newRuleNode.getConfiguration()));
|
&& ruleNode.getConfiguration().equals(newRuleNode.getConfiguration()));
|
||||||
this.ruleNode = newRuleNode;
|
this.ruleNode = newRuleNode;
|
||||||
|
this.defaultCtx.updateSelf(newRuleNode);
|
||||||
if (restartRequired) {
|
if (restartRequired) {
|
||||||
if (tbNode != null) {
|
if (tbNode != null) {
|
||||||
tbNode.destroy();
|
tbNode.destroy();
|
||||||
|
|||||||
@ -16,6 +16,7 @@
|
|||||||
package org.thingsboard.server.actors.ruleChain;
|
package org.thingsboard.server.actors.ruleChain;
|
||||||
|
|
||||||
import akka.actor.ActorRef;
|
import akka.actor.ActorRef;
|
||||||
|
import lombok.AllArgsConstructor;
|
||||||
import lombok.Data;
|
import lombok.Data;
|
||||||
import org.thingsboard.server.common.data.id.TenantId;
|
import org.thingsboard.server.common.data.id.TenantId;
|
||||||
import org.thingsboard.server.common.data.rule.RuleNode;
|
import org.thingsboard.server.common.data.rule.RuleNode;
|
||||||
@ -24,9 +25,10 @@ import org.thingsboard.server.common.data.rule.RuleNode;
|
|||||||
* Created by ashvayka on 19.03.18.
|
* Created by ashvayka on 19.03.18.
|
||||||
*/
|
*/
|
||||||
@Data
|
@Data
|
||||||
|
@AllArgsConstructor
|
||||||
final class RuleNodeCtx {
|
final class RuleNodeCtx {
|
||||||
private final TenantId tenantId;
|
private final TenantId tenantId;
|
||||||
private final ActorRef chainActor;
|
private final ActorRef chainActor;
|
||||||
private final ActorRef selfActor;
|
private final ActorRef selfActor;
|
||||||
private final RuleNode self;
|
private RuleNode self;
|
||||||
}
|
}
|
||||||
|
|||||||
@ -15,12 +15,8 @@
|
|||||||
*/
|
*/
|
||||||
package org.thingsboard.rule.engine.api;
|
package org.thingsboard.rule.engine.api;
|
||||||
|
|
||||||
import com.google.common.util.concurrent.FutureCallback;
|
|
||||||
import org.thingsboard.server.common.data.id.EntityId;
|
|
||||||
import org.thingsboard.server.common.data.id.RuleNodeId;
|
import org.thingsboard.server.common.data.id.RuleNodeId;
|
||||||
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
|
import org.thingsboard.server.common.data.rule.RuleNode;
|
||||||
import org.thingsboard.server.common.data.kv.KvEntry;
|
|
||||||
import org.thingsboard.server.common.data.kv.TsKvEntry;
|
|
||||||
import org.thingsboard.server.common.msg.TbMsg;
|
import org.thingsboard.server.common.msg.TbMsg;
|
||||||
import org.thingsboard.server.common.msg.cluster.ServerAddress;
|
import org.thingsboard.server.common.msg.cluster.ServerAddress;
|
||||||
import org.thingsboard.server.dao.alarm.AlarmService;
|
import org.thingsboard.server.dao.alarm.AlarmService;
|
||||||
@ -31,14 +27,10 @@ import org.thingsboard.server.dao.device.DeviceService;
|
|||||||
import org.thingsboard.server.dao.plugin.PluginService;
|
import org.thingsboard.server.dao.plugin.PluginService;
|
||||||
import org.thingsboard.server.dao.relation.RelationService;
|
import org.thingsboard.server.dao.relation.RelationService;
|
||||||
import org.thingsboard.server.dao.rule.RuleChainService;
|
import org.thingsboard.server.dao.rule.RuleChainService;
|
||||||
import org.thingsboard.server.dao.rule.RuleService;
|
|
||||||
import org.thingsboard.server.dao.timeseries.TimeseriesService;
|
import org.thingsboard.server.dao.timeseries.TimeseriesService;
|
||||||
import org.thingsboard.server.dao.user.UserService;
|
import org.thingsboard.server.dao.user.UserService;
|
||||||
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.Map;
|
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.UUID;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Created by ashvayka on 13.01.18.
|
* Created by ashvayka on 13.01.18.
|
||||||
@ -63,6 +55,8 @@ public interface TbContext {
|
|||||||
|
|
||||||
void tellError(TbMsg msg, Throwable th);
|
void tellError(TbMsg msg, Throwable th);
|
||||||
|
|
||||||
|
void updateSelf(RuleNode self);
|
||||||
|
|
||||||
RuleNodeId getSelfId();
|
RuleNodeId getSelfId();
|
||||||
|
|
||||||
AttributesService getAttributesService();
|
AttributesService getAttributesService();
|
||||||
|
|||||||
@ -34,8 +34,8 @@ import static org.thingsboard.rule.engine.DonAsynchron.withCallback;
|
|||||||
nodeDescription = "Filter incoming messages using JS script",
|
nodeDescription = "Filter incoming messages using JS script",
|
||||||
nodeDetails = "Evaluate incoming Message with configured JS condition. " +
|
nodeDetails = "Evaluate incoming Message with configured JS condition. " +
|
||||||
"If <b>True</b> - send Message via <b>True</b> chain, otherwise <b>False</b> chain is used." +
|
"If <b>True</b> - send Message via <b>True</b> chain, otherwise <b>False</b> chain is used." +
|
||||||
"Message payload can be accessed via <code>msg</code> property. For example <code>msg.temperature < 10;</code>" +
|
"Message payload can be accessed via <code>msg</code> property. For example <code>msg.temperature < 10;</code><br/>" +
|
||||||
"Message metadata can be accessed via <code>metadata</code> property. For example <code>metadata.customerName === 'John';</code>" +
|
"Message metadata can be accessed via <code>metadata</code> property. For example <code>metadata.customerName === 'John';</code><br/>" +
|
||||||
"Message type can be accessed via <code>msgType</code> property.",
|
"Message type can be accessed via <code>msgType</code> property.",
|
||||||
uiResources = {"static/rulenode/rulenode-core-config.js"},
|
uiResources = {"static/rulenode/rulenode-core-config.js"},
|
||||||
configDirective = "tbFilterNodeScriptConfig")
|
configDirective = "tbFilterNodeScriptConfig")
|
||||||
|
|||||||
@ -35,8 +35,8 @@ import static org.thingsboard.rule.engine.DonAsynchron.withCallback;
|
|||||||
nodeDescription = "Route incoming Message to one or multiple output chains",
|
nodeDescription = "Route incoming Message to one or multiple output chains",
|
||||||
nodeDetails = "Node executes configured JS script. Script should return array of next Chain names where Message should be routed. " +
|
nodeDetails = "Node executes configured JS script. Script should return array of next Chain names where Message should be routed. " +
|
||||||
"If Array is empty - message not routed to next Node. " +
|
"If Array is empty - message not routed to next Node. " +
|
||||||
"Message payload can be accessed via <code>msg</code> property. For example <code>msg.temperature < 10;</code> " +
|
"Message payload can be accessed via <code>msg</code> property. For example <code>msg.temperature < 10;</code><br/>" +
|
||||||
"Message metadata can be accessed via <code>metadata</code> property. For example <code>metadata.customerName === 'John';</code>" +
|
"Message metadata can be accessed via <code>metadata</code> property. For example <code>metadata.customerName === 'John';</code><br/>" +
|
||||||
"Message type can be accessed via <code>msgType</code> property.",
|
"Message type can be accessed via <code>msgType</code> property.",
|
||||||
uiResources = {"static/rulenode/rulenode-core-config.js"},
|
uiResources = {"static/rulenode/rulenode-core-config.js"},
|
||||||
configDirective = "tbFilterNodeSwitchConfig")
|
configDirective = "tbFilterNodeSwitchConfig")
|
||||||
|
|||||||
@ -34,7 +34,7 @@ import javax.script.Bindings;
|
|||||||
"<code>msg</code> - is a Message payload.<br/>" +
|
"<code>msg</code> - is a Message payload.<br/>" +
|
||||||
"<code>msgType</code> - is a Message type.<br/>" +
|
"<code>msgType</code> - is a Message type.<br/>" +
|
||||||
"Should return the following structure:<br/>" +
|
"Should return the following structure:<br/>" +
|
||||||
"<code>{ msg: <new payload>, metadata: <new metadata>, msgType: <new msgType> }</code>" +
|
"<code>{ msg: <i style=\"color: #666;\">new payload</i>,<br/>   metadata: <i style=\"color: #666;\">new metadata</i>,<br/>   msgType: <i style=\"color: #666;\">new msgType</i> }</code><br/>" +
|
||||||
"All fields in resulting object are optional and will be taken from original message if not specified.",
|
"All fields in resulting object are optional and will be taken from original message if not specified.",
|
||||||
uiResources = {"static/rulenode/rulenode-core-config.js", "static/rulenode/rulenode-core-config.css"},
|
uiResources = {"static/rulenode/rulenode-core-config.js", "static/rulenode/rulenode-core-config.css"},
|
||||||
configDirective = "tbTransformationNodeScriptConfig")
|
configDirective = "tbTransformationNodeScriptConfig")
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user