Improvement to the restarts of the rule nodes
This commit is contained in:
		
							parent
							
								
									008c2c5d5f
								
							
						
					
					
						commit
						58e31ceb78
					
				@ -35,6 +35,7 @@ import org.thingsboard.server.common.data.rule.RuleChain;
 | 
			
		||||
import org.thingsboard.server.common.data.rule.RuleNode;
 | 
			
		||||
import org.thingsboard.server.common.msg.TbMsg;
 | 
			
		||||
import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg;
 | 
			
		||||
import org.thingsboard.server.common.msg.plugin.RuleNodeUpdatedMsg;
 | 
			
		||||
import org.thingsboard.server.common.msg.queue.PartitionChangeMsg;
 | 
			
		||||
import org.thingsboard.server.common.msg.queue.QueueToRuleEngineMsg;
 | 
			
		||||
import org.thingsboard.server.common.msg.queue.RuleEngineException;
 | 
			
		||||
@ -131,7 +132,7 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
 | 
			
		||||
                } else {
 | 
			
		||||
                    log.trace("[{}][{}] Updating rule node [{}]: {}", entityId, ruleNode.getId(), ruleNode.getName(), ruleNode);
 | 
			
		||||
                    existing.setSelf(ruleNode);
 | 
			
		||||
                    existing.getSelfActor().tellWithHighPriority(new ComponentLifecycleMsg(tenantId, existing.getSelf().getId(), ComponentLifecycleEvent.UPDATED));
 | 
			
		||||
                    existing.getSelfActor().tellWithHighPriority(new RuleNodeUpdatedMsg(tenantId, existing.getSelf().getId()));
 | 
			
		||||
                }
 | 
			
		||||
            }
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -26,7 +26,6 @@ import org.thingsboard.server.actors.service.ContextBasedCreator;
 | 
			
		||||
import org.thingsboard.server.common.data.id.RuleChainId;
 | 
			
		||||
import org.thingsboard.server.common.data.id.RuleNodeId;
 | 
			
		||||
import org.thingsboard.server.common.data.id.TenantId;
 | 
			
		||||
import org.thingsboard.server.common.data.rule.RuleChain;
 | 
			
		||||
import org.thingsboard.server.common.msg.TbActorMsg;
 | 
			
		||||
import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg;
 | 
			
		||||
import org.thingsboard.server.common.msg.queue.PartitionChangeMsg;
 | 
			
		||||
@ -54,6 +53,7 @@ public class RuleNodeActor extends ComponentActor<RuleNodeId, RuleNodeActorMessa
 | 
			
		||||
    protected boolean doProcess(TbActorMsg msg) {
 | 
			
		||||
        switch (msg.getMsgType()) {
 | 
			
		||||
            case COMPONENT_LIFE_CYCLE_MSG:
 | 
			
		||||
            case RULE_NODE_UPDATED_MSG:
 | 
			
		||||
                onComponentLifecycleMsg((ComponentLifecycleMsg) msg);
 | 
			
		||||
                break;
 | 
			
		||||
            case RULE_CHAIN_TO_RULE_MSG:
 | 
			
		||||
 | 
			
		||||
@ -20,14 +20,13 @@ import org.thingsboard.rule.engine.api.TbNodeConfiguration;
 | 
			
		||||
import org.thingsboard.server.actors.ActorSystemContext;
 | 
			
		||||
import org.thingsboard.server.actors.TbActorCtx;
 | 
			
		||||
import org.thingsboard.server.actors.TbActorRef;
 | 
			
		||||
import org.thingsboard.server.actors.TbRuleNodeUpdateException;
 | 
			
		||||
import org.thingsboard.server.actors.shared.ComponentMsgProcessor;
 | 
			
		||||
import org.thingsboard.server.common.data.ApiUsageRecordKey;
 | 
			
		||||
import org.thingsboard.server.common.data.TenantProfile;
 | 
			
		||||
import org.thingsboard.server.common.data.id.RuleNodeId;
 | 
			
		||||
import org.thingsboard.server.common.data.id.TenantId;
 | 
			
		||||
import org.thingsboard.server.common.data.plugin.ComponentLifecycleState;
 | 
			
		||||
import org.thingsboard.server.common.data.rule.RuleNode;
 | 
			
		||||
import org.thingsboard.server.common.data.tenant.profile.TenantProfileConfiguration;
 | 
			
		||||
import org.thingsboard.server.common.msg.TbMsg;
 | 
			
		||||
import org.thingsboard.server.common.msg.queue.PartitionChangeMsg;
 | 
			
		||||
import org.thingsboard.server.common.msg.queue.RuleNodeException;
 | 
			
		||||
@ -78,7 +77,11 @@ public class RuleNodeActorMessageProcessor extends ComponentMsgProcessor<RuleNod
 | 
			
		||||
            if (tbNode != null) {
 | 
			
		||||
                tbNode.destroy();
 | 
			
		||||
            }
 | 
			
		||||
            start(context);
 | 
			
		||||
            try {
 | 
			
		||||
                start(context);
 | 
			
		||||
            } catch (Exception e) {
 | 
			
		||||
                throw new TbRuleNodeUpdateException("Failed to update rule node", e);
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -20,6 +20,7 @@ import org.thingsboard.server.actors.ActorSystemContext;
 | 
			
		||||
import org.thingsboard.server.actors.TbActor;
 | 
			
		||||
import org.thingsboard.server.actors.TbActorCtx;
 | 
			
		||||
import org.thingsboard.server.actors.TbActorException;
 | 
			
		||||
import org.thingsboard.server.actors.TbRuleNodeUpdateException;
 | 
			
		||||
import org.thingsboard.server.actors.shared.ComponentMsgProcessor;
 | 
			
		||||
import org.thingsboard.server.actors.stats.StatsPersistMsg;
 | 
			
		||||
import org.thingsboard.server.common.data.id.EntityId;
 | 
			
		||||
@ -123,6 +124,9 @@ public abstract class ComponentActor<T extends EntityId, P extends ComponentMsgP
 | 
			
		||||
        } catch (Exception e) {
 | 
			
		||||
            logAndPersist("onLifecycleMsg", e, true);
 | 
			
		||||
            logLifecycleEvent(msg.getEvent(), e);
 | 
			
		||||
            if (e instanceof TbRuleNodeUpdateException) {
 | 
			
		||||
                throw (TbRuleNodeUpdateException) e;
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -17,6 +17,7 @@ package org.thingsboard.server.actors;
 | 
			
		||||
 | 
			
		||||
import lombok.Data;
 | 
			
		||||
import lombok.extern.slf4j.Slf4j;
 | 
			
		||||
import org.thingsboard.server.common.msg.MsgType;
 | 
			
		||||
import org.thingsboard.server.common.msg.TbActorMsg;
 | 
			
		||||
import org.thingsboard.server.common.msg.TbActorStopReason;
 | 
			
		||||
 | 
			
		||||
@ -73,7 +74,7 @@ public final class TbActorMailbox implements TbActorCtx {
 | 
			
		||||
            if (strategy.isStop() || (settings.getMaxActorInitAttempts() > 0 && attemptIdx > settings.getMaxActorInitAttempts())) {
 | 
			
		||||
                log.info("[{}] Failed to init actor, attempt {}, going to stop attempts.", selfId, attempt, t);
 | 
			
		||||
                stopReason = TbActorStopReason.INIT_FAILED;
 | 
			
		||||
                system.stop(selfId);
 | 
			
		||||
                destroy();
 | 
			
		||||
            } else if (strategy.getRetryDelay() > 0) {
 | 
			
		||||
                log.info("[{}] Failed to init actor, attempt {}, going to retry in attempts in {}ms", selfId, attempt, strategy.getRetryDelay());
 | 
			
		||||
                log.debug("[{}] Error", selfId, t);
 | 
			
		||||
@ -95,7 +96,19 @@ public final class TbActorMailbox implements TbActorCtx {
 | 
			
		||||
            }
 | 
			
		||||
            tryProcessQueue(true);
 | 
			
		||||
        } else {
 | 
			
		||||
            msg.onTbActorStopped(stopReason);
 | 
			
		||||
            if (highPriority && msg.getMsgType().equals(MsgType.RULE_NODE_UPDATED_MSG)) {
 | 
			
		||||
                synchronized (this) {
 | 
			
		||||
                    if (stopReason == TbActorStopReason.INIT_FAILED) {
 | 
			
		||||
                        destroyInProgress.set(false);
 | 
			
		||||
                        stopReason = null;
 | 
			
		||||
                        initActor();
 | 
			
		||||
                    } else {
 | 
			
		||||
                        msg.onTbActorStopped(stopReason);
 | 
			
		||||
                    }
 | 
			
		||||
                }
 | 
			
		||||
            } else {
 | 
			
		||||
                msg.onTbActorStopped(stopReason);
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
@ -126,6 +139,9 @@ public final class TbActorMailbox implements TbActorCtx {
 | 
			
		||||
                try {
 | 
			
		||||
                    log.debug("[{}] Going to process message: {}", selfId, msg);
 | 
			
		||||
                    actor.process(msg);
 | 
			
		||||
                } catch (TbRuleNodeUpdateException updateException){
 | 
			
		||||
                    stopReason = TbActorStopReason.INIT_FAILED;
 | 
			
		||||
                    destroy();
 | 
			
		||||
                } catch (Throwable t) {
 | 
			
		||||
                    log.debug("[{}] Failed to process message: {}", selfId, msg, t);
 | 
			
		||||
                    ProcessFailureStrategy strategy = actor.onProcessFailure(t);
 | 
			
		||||
 | 
			
		||||
@ -0,0 +1,26 @@
 | 
			
		||||
/**
 | 
			
		||||
 * Copyright © 2016-2021 The Thingsboard Authors
 | 
			
		||||
 *
 | 
			
		||||
 * Licensed under the Apache License, Version 2.0 (the "License");
 | 
			
		||||
 * you may not use this file except in compliance with the License.
 | 
			
		||||
 * You may obtain a copy of the License at
 | 
			
		||||
 *
 | 
			
		||||
 *     http://www.apache.org/licenses/LICENSE-2.0
 | 
			
		||||
 *
 | 
			
		||||
 * Unless required by applicable law or agreed to in writing, software
 | 
			
		||||
 * distributed under the License is distributed on an "AS IS" BASIS,
 | 
			
		||||
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
			
		||||
 * See the License for the specific language governing permissions and
 | 
			
		||||
 * limitations under the License.
 | 
			
		||||
 */
 | 
			
		||||
package org.thingsboard.server.actors;
 | 
			
		||||
 | 
			
		||||
public class TbRuleNodeUpdateException extends RuntimeException {
 | 
			
		||||
 | 
			
		||||
    private static final long serialVersionUID = 8209771144711980882L;
 | 
			
		||||
 | 
			
		||||
    public TbRuleNodeUpdateException(String message, Throwable cause) {
 | 
			
		||||
        super(message, cause);
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -39,6 +39,11 @@ public enum MsgType {
 | 
			
		||||
     */
 | 
			
		||||
    COMPONENT_LIFE_CYCLE_MSG,
 | 
			
		||||
 | 
			
		||||
    /**
 | 
			
		||||
     * Special message to indicate rule node update request
 | 
			
		||||
     */
 | 
			
		||||
    RULE_NODE_UPDATED_MSG,
 | 
			
		||||
 | 
			
		||||
    /**
 | 
			
		||||
     * Misc messages consumed from the Queue and forwarded to Rule Engine Actor.
 | 
			
		||||
     *
 | 
			
		||||
 | 
			
		||||
@ -0,0 +1,40 @@
 | 
			
		||||
/**
 | 
			
		||||
 * Copyright © 2016-2021 The Thingsboard Authors
 | 
			
		||||
 *
 | 
			
		||||
 * Licensed under the Apache License, Version 2.0 (the "License");
 | 
			
		||||
 * you may not use this file except in compliance with the License.
 | 
			
		||||
 * You may obtain a copy of the License at
 | 
			
		||||
 *
 | 
			
		||||
 *     http://www.apache.org/licenses/LICENSE-2.0
 | 
			
		||||
 *
 | 
			
		||||
 * Unless required by applicable law or agreed to in writing, software
 | 
			
		||||
 * distributed under the License is distributed on an "AS IS" BASIS,
 | 
			
		||||
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
			
		||||
 * See the License for the specific language governing permissions and
 | 
			
		||||
 * limitations under the License.
 | 
			
		||||
 */
 | 
			
		||||
package org.thingsboard.server.common.msg.plugin;
 | 
			
		||||
 | 
			
		||||
import lombok.ToString;
 | 
			
		||||
import org.thingsboard.server.common.data.id.EntityId;
 | 
			
		||||
import org.thingsboard.server.common.data.id.TenantId;
 | 
			
		||||
import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent;
 | 
			
		||||
import org.thingsboard.server.common.msg.MsgType;
 | 
			
		||||
 | 
			
		||||
import java.util.Optional;
 | 
			
		||||
 | 
			
		||||
/**
 | 
			
		||||
 * @author Andrew Shvayka
 | 
			
		||||
 */
 | 
			
		||||
@ToString
 | 
			
		||||
public class RuleNodeUpdatedMsg extends ComponentLifecycleMsg {
 | 
			
		||||
 | 
			
		||||
    public RuleNodeUpdatedMsg(TenantId tenantId, EntityId entityId) {
 | 
			
		||||
        super(tenantId, entityId, ComponentLifecycleEvent.UPDATED);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public MsgType getMsgType() {
 | 
			
		||||
        return MsgType.RULE_NODE_UPDATED_MSG;
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
@ -33,6 +33,7 @@ import org.thingsboard.server.common.data.id.EntityId;
 | 
			
		||||
import org.thingsboard.server.common.data.kv.TsKvEntry;
 | 
			
		||||
import org.thingsboard.server.common.data.plugin.ComponentType;
 | 
			
		||||
import org.thingsboard.server.common.msg.TbMsg;
 | 
			
		||||
import org.thingsboard.server.common.msg.session.SessionMsgType;
 | 
			
		||||
import org.thingsboard.server.dao.timeseries.TimeseriesService;
 | 
			
		||||
import org.thingsboard.server.dao.util.mapping.JacksonUtil;
 | 
			
		||||
 | 
			
		||||
@ -72,41 +73,45 @@ public class CalculateDeltaNode implements TbNode {
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public void onMsg(TbContext ctx, TbMsg msg) {
 | 
			
		||||
        JsonNode json = JacksonUtil.toJsonNode(msg.getData());
 | 
			
		||||
        String inputKey = config.getInputValueKey();
 | 
			
		||||
        if (json.has(inputKey)) {
 | 
			
		||||
            DonAsynchron.withCallback(getLastValue(msg.getOriginator()),
 | 
			
		||||
                    previousData -> {
 | 
			
		||||
                        double currentValue = json.get(inputKey).asDouble();
 | 
			
		||||
                        long currentTs = TbMsgTimeseriesNode.getTs(msg);
 | 
			
		||||
        if (msg.getType().equals(SessionMsgType.POST_TELEMETRY_REQUEST.name())) {
 | 
			
		||||
            JsonNode json = JacksonUtil.toJsonNode(msg.getData());
 | 
			
		||||
            String inputKey = config.getInputValueKey();
 | 
			
		||||
            if (json.has(inputKey)) {
 | 
			
		||||
                DonAsynchron.withCallback(getLastValue(msg.getOriginator()),
 | 
			
		||||
                        previousData -> {
 | 
			
		||||
                            double currentValue = json.get(inputKey).asDouble();
 | 
			
		||||
                            long currentTs = TbMsgTimeseriesNode.getTs(msg);
 | 
			
		||||
 | 
			
		||||
                        if (useCache) {
 | 
			
		||||
                            cache.put(msg.getOriginator(), new ValueWithTs(currentTs, currentValue));
 | 
			
		||||
                        }
 | 
			
		||||
                            if (useCache) {
 | 
			
		||||
                                cache.put(msg.getOriginator(), new ValueWithTs(currentTs, currentValue));
 | 
			
		||||
                            }
 | 
			
		||||
 | 
			
		||||
                        BigDecimal delta = BigDecimal.valueOf(previousData != null ? currentValue - previousData.value : 0.0);
 | 
			
		||||
                            BigDecimal delta = BigDecimal.valueOf(previousData != null ? currentValue - previousData.value : 0.0);
 | 
			
		||||
 | 
			
		||||
                        if (config.isTellFailureIfDeltaIsNegative() && delta.doubleValue() < 0) {
 | 
			
		||||
                            ctx.tellNext(msg, TbRelationTypes.FAILURE);
 | 
			
		||||
                            return;
 | 
			
		||||
                        }
 | 
			
		||||
                            if (config.isTellFailureIfDeltaIsNegative() && delta.doubleValue() < 0) {
 | 
			
		||||
                                ctx.tellNext(msg, TbRelationTypes.FAILURE);
 | 
			
		||||
                                return;
 | 
			
		||||
                            }
 | 
			
		||||
 | 
			
		||||
                        if (config.getRound() != null) {
 | 
			
		||||
                            delta = delta.setScale(config.getRound(), RoundingMode.HALF_UP);
 | 
			
		||||
                        }
 | 
			
		||||
                            if (config.getRound() != null) {
 | 
			
		||||
                                delta = delta.setScale(config.getRound(), RoundingMode.HALF_UP);
 | 
			
		||||
                            }
 | 
			
		||||
 | 
			
		||||
                        ObjectNode result = (ObjectNode) json;
 | 
			
		||||
                        result.put(config.getOutputValueKey(), delta);
 | 
			
		||||
                            ObjectNode result = (ObjectNode) json;
 | 
			
		||||
                            result.put(config.getOutputValueKey(), delta);
 | 
			
		||||
 | 
			
		||||
                        if (config.isAddPeriodBetweenMsgs()) {
 | 
			
		||||
                            long period = previousData != null ? currentTs - previousData.ts : 0;
 | 
			
		||||
                            result.put(config.getPeriodValueKey(), period);
 | 
			
		||||
                        }
 | 
			
		||||
                        ctx.tellSuccess(TbMsg.transformMsg(msg, msg.getType(), msg.getOriginator(), msg.getMetaData(), JacksonUtil.toString(result)));
 | 
			
		||||
                    },
 | 
			
		||||
                    t -> ctx.tellFailure(msg, t), ctx.getDbCallbackExecutor());
 | 
			
		||||
        } else if (config.isTellFailureIfInputValueKeyIsAbsent()) {
 | 
			
		||||
            ctx.tellNext(msg, TbRelationTypes.FAILURE);
 | 
			
		||||
                            if (config.isAddPeriodBetweenMsgs()) {
 | 
			
		||||
                                long period = previousData != null ? currentTs - previousData.ts : 0;
 | 
			
		||||
                                result.put(config.getPeriodValueKey(), period);
 | 
			
		||||
                            }
 | 
			
		||||
                            ctx.tellSuccess(TbMsg.transformMsg(msg, msg.getType(), msg.getOriginator(), msg.getMetaData(), JacksonUtil.toString(result)));
 | 
			
		||||
                        },
 | 
			
		||||
                        t -> ctx.tellFailure(msg, t), ctx.getDbCallbackExecutor());
 | 
			
		||||
            } else if (config.isTellFailureIfInputValueKeyIsAbsent()) {
 | 
			
		||||
                ctx.tellNext(msg, TbRelationTypes.FAILURE);
 | 
			
		||||
            } else {
 | 
			
		||||
                ctx.tellSuccess(msg);
 | 
			
		||||
            }
 | 
			
		||||
        } else {
 | 
			
		||||
            ctx.tellSuccess(msg);
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user