diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java index cc0c9da8f2..830714b431 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java @@ -35,6 +35,7 @@ import org.thingsboard.server.common.data.rule.RuleChain; import org.thingsboard.server.common.data.rule.RuleNode; import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg; +import org.thingsboard.server.common.msg.plugin.RuleNodeUpdatedMsg; import org.thingsboard.server.common.msg.queue.PartitionChangeMsg; import org.thingsboard.server.common.msg.queue.QueueToRuleEngineMsg; import org.thingsboard.server.common.msg.queue.RuleEngineException; @@ -131,7 +132,7 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor 0 && attemptIdx > settings.getMaxActorInitAttempts())) { log.info("[{}] Failed to init actor, attempt {}, going to stop attempts.", selfId, attempt, t); stopReason = TbActorStopReason.INIT_FAILED; - system.stop(selfId); + destroy(); } else if (strategy.getRetryDelay() > 0) { log.info("[{}] Failed to init actor, attempt {}, going to retry in attempts in {}ms", selfId, attempt, strategy.getRetryDelay()); log.debug("[{}] Error", selfId, t); @@ -95,7 +96,19 @@ public final class TbActorMailbox implements TbActorCtx { } tryProcessQueue(true); } else { - msg.onTbActorStopped(stopReason); + if (highPriority && msg.getMsgType().equals(MsgType.RULE_NODE_UPDATED_MSG)) { + synchronized (this) { + if (stopReason == TbActorStopReason.INIT_FAILED) { + destroyInProgress.set(false); + stopReason = null; + initActor(); + } else { + msg.onTbActorStopped(stopReason); + } + } + } else { + msg.onTbActorStopped(stopReason); + } } } @@ -126,6 +139,9 @@ public final class TbActorMailbox implements TbActorCtx { try { log.debug("[{}] Going to process message: {}", selfId, msg); actor.process(msg); + } catch (TbRuleNodeUpdateException updateException){ + stopReason = TbActorStopReason.INIT_FAILED; + destroy(); } catch (Throwable t) { log.debug("[{}] Failed to process message: {}", selfId, msg, t); ProcessFailureStrategy strategy = actor.onProcessFailure(t); diff --git a/common/actor/src/main/java/org/thingsboard/server/actors/TbRuleNodeUpdateException.java b/common/actor/src/main/java/org/thingsboard/server/actors/TbRuleNodeUpdateException.java new file mode 100644 index 0000000000..7e3cca863d --- /dev/null +++ b/common/actor/src/main/java/org/thingsboard/server/actors/TbRuleNodeUpdateException.java @@ -0,0 +1,26 @@ +/** + * Copyright © 2016-2021 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.actors; + +public class TbRuleNodeUpdateException extends RuntimeException { + + private static final long serialVersionUID = 8209771144711980882L; + + public TbRuleNodeUpdateException(String message, Throwable cause) { + super(message, cause); + } +} + diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/MsgType.java b/common/message/src/main/java/org/thingsboard/server/common/msg/MsgType.java index ffc574d5c2..13d0a6e9eb 100644 --- a/common/message/src/main/java/org/thingsboard/server/common/msg/MsgType.java +++ b/common/message/src/main/java/org/thingsboard/server/common/msg/MsgType.java @@ -39,6 +39,11 @@ public enum MsgType { */ COMPONENT_LIFE_CYCLE_MSG, + /** + * Special message to indicate rule node update request + */ + RULE_NODE_UPDATED_MSG, + /** * Misc messages consumed from the Queue and forwarded to Rule Engine Actor. * diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/plugin/RuleNodeUpdatedMsg.java b/common/message/src/main/java/org/thingsboard/server/common/msg/plugin/RuleNodeUpdatedMsg.java new file mode 100644 index 0000000000..ca4ab1c949 --- /dev/null +++ b/common/message/src/main/java/org/thingsboard/server/common/msg/plugin/RuleNodeUpdatedMsg.java @@ -0,0 +1,40 @@ +/** + * Copyright © 2016-2021 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.common.msg.plugin; + +import lombok.ToString; +import org.thingsboard.server.common.data.id.EntityId; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent; +import org.thingsboard.server.common.msg.MsgType; + +import java.util.Optional; + +/** + * @author Andrew Shvayka + */ +@ToString +public class RuleNodeUpdatedMsg extends ComponentLifecycleMsg { + + public RuleNodeUpdatedMsg(TenantId tenantId, EntityId entityId) { + super(tenantId, entityId, ComponentLifecycleEvent.UPDATED); + } + + @Override + public MsgType getMsgType() { + return MsgType.RULE_NODE_UPDATED_MSG; + } +} \ No newline at end of file diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/CalculateDeltaNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/CalculateDeltaNode.java index 0cd8a8b08a..1d5ae744c3 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/CalculateDeltaNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/CalculateDeltaNode.java @@ -33,6 +33,7 @@ import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.kv.TsKvEntry; import org.thingsboard.server.common.data.plugin.ComponentType; import org.thingsboard.server.common.msg.TbMsg; +import org.thingsboard.server.common.msg.session.SessionMsgType; import org.thingsboard.server.dao.timeseries.TimeseriesService; import org.thingsboard.server.dao.util.mapping.JacksonUtil; @@ -72,41 +73,45 @@ public class CalculateDeltaNode implements TbNode { @Override public void onMsg(TbContext ctx, TbMsg msg) { - JsonNode json = JacksonUtil.toJsonNode(msg.getData()); - String inputKey = config.getInputValueKey(); - if (json.has(inputKey)) { - DonAsynchron.withCallback(getLastValue(msg.getOriginator()), - previousData -> { - double currentValue = json.get(inputKey).asDouble(); - long currentTs = TbMsgTimeseriesNode.getTs(msg); + if (msg.getType().equals(SessionMsgType.POST_TELEMETRY_REQUEST.name())) { + JsonNode json = JacksonUtil.toJsonNode(msg.getData()); + String inputKey = config.getInputValueKey(); + if (json.has(inputKey)) { + DonAsynchron.withCallback(getLastValue(msg.getOriginator()), + previousData -> { + double currentValue = json.get(inputKey).asDouble(); + long currentTs = TbMsgTimeseriesNode.getTs(msg); - if (useCache) { - cache.put(msg.getOriginator(), new ValueWithTs(currentTs, currentValue)); - } + if (useCache) { + cache.put(msg.getOriginator(), new ValueWithTs(currentTs, currentValue)); + } - BigDecimal delta = BigDecimal.valueOf(previousData != null ? currentValue - previousData.value : 0.0); + BigDecimal delta = BigDecimal.valueOf(previousData != null ? currentValue - previousData.value : 0.0); - if (config.isTellFailureIfDeltaIsNegative() && delta.doubleValue() < 0) { - ctx.tellNext(msg, TbRelationTypes.FAILURE); - return; - } + if (config.isTellFailureIfDeltaIsNegative() && delta.doubleValue() < 0) { + ctx.tellNext(msg, TbRelationTypes.FAILURE); + return; + } - if (config.getRound() != null) { - delta = delta.setScale(config.getRound(), RoundingMode.HALF_UP); - } + if (config.getRound() != null) { + delta = delta.setScale(config.getRound(), RoundingMode.HALF_UP); + } - ObjectNode result = (ObjectNode) json; - result.put(config.getOutputValueKey(), delta); + ObjectNode result = (ObjectNode) json; + result.put(config.getOutputValueKey(), delta); - if (config.isAddPeriodBetweenMsgs()) { - long period = previousData != null ? currentTs - previousData.ts : 0; - result.put(config.getPeriodValueKey(), period); - } - ctx.tellSuccess(TbMsg.transformMsg(msg, msg.getType(), msg.getOriginator(), msg.getMetaData(), JacksonUtil.toString(result))); - }, - t -> ctx.tellFailure(msg, t), ctx.getDbCallbackExecutor()); - } else if (config.isTellFailureIfInputValueKeyIsAbsent()) { - ctx.tellNext(msg, TbRelationTypes.FAILURE); + if (config.isAddPeriodBetweenMsgs()) { + long period = previousData != null ? currentTs - previousData.ts : 0; + result.put(config.getPeriodValueKey(), period); + } + ctx.tellSuccess(TbMsg.transformMsg(msg, msg.getType(), msg.getOriginator(), msg.getMetaData(), JacksonUtil.toString(result))); + }, + t -> ctx.tellFailure(msg, t), ctx.getDbCallbackExecutor()); + } else if (config.isTellFailureIfInputValueKeyIsAbsent()) { + ctx.tellNext(msg, TbRelationTypes.FAILURE); + } else { + ctx.tellSuccess(msg); + } } else { ctx.tellSuccess(msg); }