diff --git a/application/src/main/java/org/thingsboard/server/actors/rule/RuleActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/rule/RuleActorMessageProcessor.java index 7a2b0e43a4..5704b123cf 100644 --- a/application/src/main/java/org/thingsboard/server/actors/rule/RuleActorMessageProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/rule/RuleActorMessageProcessor.java @@ -18,6 +18,7 @@ package org.thingsboard.server.actors.rule; import java.util.*; import com.fasterxml.jackson.core.JsonProcessingException; +import org.springframework.util.StringUtils; import org.thingsboard.server.actors.ActorSystemContext; import org.thingsboard.server.actors.plugin.RuleToPluginMsgWrapper; import org.thingsboard.server.actors.shared.ComponentMsgProcessor; @@ -113,8 +114,9 @@ class RuleActorMessageProcessor extends ComponentMsgProcessor { } private void initAction() throws Exception { - JsonNode actionMd = ruleMd.getAction(); - action = initComponent(actionMd); + if (ruleMd.getAction() != null && !ruleMd.getAction().isNull()) { + action = initComponent(ruleMd.getAction()); + } } private void initProcessor() throws Exception { @@ -131,9 +133,11 @@ class RuleActorMessageProcessor extends ComponentMsgProcessor { } private void fetchPluginInfo() { - PluginMetaData pluginMd = systemContext.getPluginService().findPluginByApiToken(ruleMd.getPluginToken()); - pluginTenantId = pluginMd.getTenantId(); - pluginId = pluginMd.getId(); + if (!StringUtils.isEmpty(ruleMd.getPluginToken())) { + PluginMetaData pluginMd = systemContext.getPluginService().findPluginByApiToken(ruleMd.getPluginToken()); + pluginTenantId = pluginMd.getTenantId(); + pluginId = pluginMd.getId(); + } } protected void onRuleProcessingMsg(ActorContext context, RuleProcessingMsg msg) throws RuleException { @@ -162,25 +166,27 @@ class RuleActorMessageProcessor extends ComponentMsgProcessor { inMsgMd = new RuleProcessingMetaData(); } logger.debug("[{}] Going to convert in msg: {}", entityId, inMsg); - Optional> ruleToPluginMsgOptional = action.convert(ruleCtx, inMsg, inMsgMd); - if (ruleToPluginMsgOptional.isPresent()) { - RuleToPluginMsg ruleToPluginMsg = ruleToPluginMsgOptional.get(); - logger.debug("[{}] Device msg is converter to: {}", entityId, ruleToPluginMsg); - context.parent().tell(new RuleToPluginMsgWrapper(pluginTenantId, pluginId, tenantId, entityId, ruleToPluginMsg), context.self()); - if (action.isOneWayAction()) { - pushToNextRule(context, msg.getCtx(), RuleEngineError.NO_TWO_WAY_ACTIONS); - } else { - pendingMsgMap.put(ruleToPluginMsg.getUid(), msg); - scheduleMsgWithDelay(context, new RuleToPluginTimeoutMsg(ruleToPluginMsg.getUid()), systemContext.getPluginProcessingTimeout()); + if (action != null) { + Optional> ruleToPluginMsgOptional = action.convert(ruleCtx, inMsg, inMsgMd); + if (ruleToPluginMsgOptional.isPresent()) { + RuleToPluginMsg ruleToPluginMsg = ruleToPluginMsgOptional.get(); + logger.debug("[{}] Device msg is converter to: {}", entityId, ruleToPluginMsg); + context.parent().tell(new RuleToPluginMsgWrapper(pluginTenantId, pluginId, tenantId, entityId, ruleToPluginMsg), context.self()); + if (action.isOneWayAction()) { + pushToNextRule(context, msg.getCtx(), RuleEngineError.NO_TWO_WAY_ACTIONS); + return; + } else { + pendingMsgMap.put(ruleToPluginMsg.getUid(), msg); + scheduleMsgWithDelay(context, new RuleToPluginTimeoutMsg(ruleToPluginMsg.getUid()), systemContext.getPluginProcessingTimeout()); + return; + } } - } else { - logger.debug("[{}] Nothing to send to plugin: {}", entityId, pluginId); - pushToNextRule(context, msg.getCtx(), RuleEngineError.NO_REQUEST_FROM_ACTIONS); - return; } + logger.debug("[{}] Nothing to send to plugin: {}", entityId, pluginId); + pushToNextRule(context, msg.getCtx(), RuleEngineError.NO_TWO_WAY_ACTIONS); } - public void onPluginMsg(ActorContext context, PluginToRuleMsg msg) { + void onPluginMsg(ActorContext context, PluginToRuleMsg msg) { RuleProcessingMsg pendingMsg = pendingMsgMap.remove(msg.getUid()); if (pendingMsg != null) { ChainProcessingContext ctx = pendingMsg.getCtx(); @@ -196,7 +202,7 @@ class RuleActorMessageProcessor extends ComponentMsgProcessor { } } - public void onTimeoutMsg(ActorContext context, RuleToPluginTimeoutMsg msg) { + void onTimeoutMsg(ActorContext context, RuleToPluginTimeoutMsg msg) { RuleProcessingMsg pendingMsg = pendingMsgMap.remove(msg.getMsgId()); if (pendingMsg != null) { logger.debug("[{}] Processing timeout detected [{}]: {}", entityId, msg.getMsgId(), pendingMsg); @@ -210,13 +216,13 @@ class RuleActorMessageProcessor extends ComponentMsgProcessor { ctx = ctx.withError(error); } if (ctx.isFailure()) { - logger.debug("[{}] Forwarding processing chain to device actor due to failure.", ctx.getInMsg().getDeviceId()); + logger.debug("[{}][{}] Forwarding processing chain to device actor due to failure.", ruleMd.getId(), ctx.getInMsg().getDeviceId()); ctx.getDeviceActor().tell(new RulesProcessedMsg(ctx), ActorRef.noSender()); } else if (!ctx.hasNext()) { - logger.debug("[{}] Forwarding processing chain to device actor due to end of chain.", ctx.getInMsg().getDeviceId()); + logger.debug("[{}][{}] Forwarding processing chain to device actor due to end of chain.", ruleMd.getId(), ctx.getInMsg().getDeviceId()); ctx.getDeviceActor().tell(new RulesProcessedMsg(ctx), ActorRef.noSender()); } else { - logger.debug("[{}] Forwarding processing chain to next rule actor.", ctx.getInMsg().getDeviceId()); + logger.debug("[{}][{}] Forwarding processing chain to next rule actor.", ruleMd.getId(), ctx.getInMsg().getDeviceId()); ChainProcessingContext nextTask = ctx.getNext(); nextTask.getCurrentActor().tell(new RuleProcessingMsg(nextTask), context.self()); } @@ -269,18 +275,16 @@ class RuleActorMessageProcessor extends ComponentMsgProcessor { public void onActivate(ActorContext context) throws Exception { logger.info("[{}] Going to process onActivate rule.", entityId); this.state = ComponentLifecycleState.ACTIVE; - if (action != null) { - if (filters != null) { - filters.forEach(f -> f.resume()); - } else { - initFilters(); - } + if (filters != null) { + filters.forEach(RuleLifecycleComponent::resume); if (processor != null) { processor.resume(); } else { initProcessor(); } - action.resume(); + if (action != null) { + action.resume(); + } logger.info("[{}] Rule resumed.", entityId); } else { start(); diff --git a/application/src/main/java/org/thingsboard/server/actors/shared/rule/RuleManager.java b/application/src/main/java/org/thingsboard/server/actors/shared/rule/RuleManager.java index db61b8155e..1e00a6de1b 100644 --- a/application/src/main/java/org/thingsboard/server/actors/shared/rule/RuleManager.java +++ b/application/src/main/java/org/thingsboard/server/actors/shared/rule/RuleManager.java @@ -72,16 +72,19 @@ public abstract class RuleManager { } public Optional update(ActorContext context, RuleId ruleId, ComponentLifecycleEvent event) { - RuleMetaData rule = null; + RuleMetaData rule; if (event != ComponentLifecycleEvent.DELETED) { rule = systemContext.getRuleService().findRuleById(ruleId); - } - if (rule == null) { + } else { rule = ruleMap.keySet().stream() .filter(r -> r.getId().equals(ruleId)) .peek(r -> r.setState(ComponentLifecycleState.SUSPENDED)) .findFirst() .orElse(null); + if (rule != null) { + ruleMap.remove(rule); + ruleActors.remove(ruleId); + } } if (rule != null) { RuleActorMetaData actorMd = ruleMap.get(rule); diff --git a/application/src/main/resources/logback.xml b/application/src/main/resources/logback.xml index 93ed50535c..7169894532 100644 --- a/application/src/main/resources/logback.xml +++ b/application/src/main/resources/logback.xml @@ -25,7 +25,7 @@ - + diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index b133f03439..7e2a7735ff 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -176,7 +176,7 @@ actors: statistics: # Enable/disable actor statistics enabled: "${ACTORS_STATISTICS_ENABLED:true}" - persist_frequency: "${ACTORS_STATISTICS_PERSIST_FREQUENCY:60000}" + persist_frequency: "${ACTORS_STATISTICS_PERSIST_FREQUENCY:3600000}" # Cache parameters cache: diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/rule/RuleMetaData.java b/common/data/src/main/java/org/thingsboard/server/common/data/rule/RuleMetaData.java index 1451d2c1a6..0ed44d94de 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/rule/RuleMetaData.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/rule/RuleMetaData.java @@ -17,6 +17,7 @@ package org.thingsboard.server.common.data.rule; import com.fasterxml.jackson.databind.JsonNode; import lombok.Data; +import lombok.EqualsAndHashCode; import org.thingsboard.server.common.data.HasName; import org.thingsboard.server.common.data.SearchTextBased; import org.thingsboard.server.common.data.id.RuleId; @@ -24,6 +25,7 @@ import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.plugin.ComponentLifecycleState; @Data +@EqualsAndHashCode(callSuper = true) public class RuleMetaData extends SearchTextBased implements HasName { private static final long serialVersionUID = -5656679015122935465L; diff --git a/dao/src/main/java/org/thingsboard/server/dao/rule/BaseRuleService.java b/dao/src/main/java/org/thingsboard/server/dao/rule/BaseRuleService.java index b356c90e24..77d38b9ee1 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/rule/BaseRuleService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/rule/BaseRuleService.java @@ -91,7 +91,9 @@ public class BaseRuleService extends AbstractEntityService implements RuleServic if (rule.getProcessor() != null && !rule.getProcessor().isNull()) { validateComponentJson(rule.getProcessor(), ComponentType.PROCESSOR); } - validateComponentJson(rule.getAction(), ComponentType.ACTION); + if (rule.getAction() != null && !rule.getAction().isNull()) { + validateComponentJson(rule.getAction(), ComponentType.ACTION); + } validateRuleAndPluginState(rule); return ruleDao.save(rule); } @@ -129,6 +131,9 @@ public class BaseRuleService extends AbstractEntityService implements RuleServic } private void validateRuleAndPluginState(RuleMetaData rule) { + if (org.springframework.util.StringUtils.isEmpty(rule.getPluginToken())) { + return; + } PluginMetaData pluginMd = pluginService.findPluginByApiToken(rule.getPluginToken()); if (pluginMd == null) { throw new IncorrectParameterException("Rule points to non-existent plugin!"); diff --git a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/processor/AlarmProcessor.java b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/processor/AlarmProcessor.java index 3dec45e482..e88247a2c7 100644 --- a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/processor/AlarmProcessor.java +++ b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/processor/AlarmProcessor.java @@ -125,10 +125,10 @@ public class AlarmProcessor implements RuleProcessor - + diff --git a/ui/src/app/rule/rule.directive.js b/ui/src/app/rule/rule.directive.js index bfd6b4c695..64502f9345 100644 --- a/ui/src/app/rule/rule.directive.js +++ b/ui/src/app/rule/rule.directive.js @@ -85,10 +85,11 @@ export default function RuleDirective($compile, $templateCache, $mdDialog, $docu if (scope.rule) { var valid = scope.rule.filters && scope.rule.filters.length > 0; scope.theForm.$setValidity('filters', valid); - valid = angular.isDefined(scope.rule.pluginToken) && scope.rule.pluginToken != null; - scope.theForm.$setValidity('plugin', valid); - valid = angular.isDefined(scope.rule.action) && scope.rule.action != null; - scope.theForm.$setValidity('action', valid); + var processorDefined = angular.isDefined(scope.rule.processor) && scope.rule.processor != null; + var pluginDefined = angular.isDefined(scope.rule.pluginToken) && scope.rule.pluginToken != null; + var pluginActionDefined = angular.isDefined(scope.rule.action) && scope.rule.action != null; + valid = processorDefined && !pluginDefined || (pluginDefined && pluginActionDefined); + scope.theForm.$setValidity('processorOrPlugin', valid); } }; @@ -160,6 +161,7 @@ export default function RuleDirective($compile, $templateCache, $mdDialog, $docu scope.$watch('rule.processor', function(newVal, prevVal) { if (scope.rule && scope.isEdit && !angular.equals(newVal, prevVal)) { scope.theForm.$setDirty(); + scope.updateValidity(); } }, true);