Improved logging of failed and timeout messages
This commit is contained in:
		
							parent
							
								
									8808c268ae
								
							
						
					
					
						commit
						af1f6229bb
					
				@ -161,7 +161,7 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
 | 
			
		||||
    private TbActorRef createRuleNodeActor(TbActorCtx ctx, RuleNode ruleNode) {
 | 
			
		||||
        return ctx.getOrCreateChildActor(new TbEntityActorId(ruleNode.getId()),
 | 
			
		||||
                () -> DefaultActorService.RULE_DISPATCHER_NAME,
 | 
			
		||||
                () -> new RuleNodeActor.ActorCreator(systemContext, tenantId, entityId, ruleNode.getName(), ruleNode.getId()));
 | 
			
		||||
                () -> new RuleNodeActor.ActorCreator(systemContext, tenantId, entityId, ruleChainName, ruleNode.getId()));
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private void initRoutes(RuleChain ruleChain, List<RuleNode> ruleNodeList) {
 | 
			
		||||
 | 
			
		||||
@ -27,6 +27,7 @@ import org.thingsboard.server.common.data.plugin.ComponentLifecycleState;
 | 
			
		||||
import org.thingsboard.server.common.data.rule.RuleNode;
 | 
			
		||||
import org.thingsboard.server.common.msg.queue.PartitionChangeMsg;
 | 
			
		||||
import org.thingsboard.server.common.msg.queue.RuleNodeException;
 | 
			
		||||
import org.thingsboard.server.common.msg.queue.RuleNodeInfo;
 | 
			
		||||
 | 
			
		||||
/**
 | 
			
		||||
 * @author Andrew Shvayka
 | 
			
		||||
@ -38,6 +39,7 @@ public class RuleNodeActorMessageProcessor extends ComponentMsgProcessor<RuleNod
 | 
			
		||||
    private RuleNode ruleNode;
 | 
			
		||||
    private TbNode tbNode;
 | 
			
		||||
    private DefaultTbContext defaultCtx;
 | 
			
		||||
    private RuleNodeInfo info;
 | 
			
		||||
 | 
			
		||||
    RuleNodeActorMessageProcessor(TenantId tenantId, String ruleChainName, RuleNodeId ruleNodeId, ActorSystemContext systemContext
 | 
			
		||||
            , TbActorRef parent, TbActorRef self) {
 | 
			
		||||
@ -46,6 +48,7 @@ public class RuleNodeActorMessageProcessor extends ComponentMsgProcessor<RuleNod
 | 
			
		||||
        this.self = self;
 | 
			
		||||
        this.ruleNode = systemContext.getRuleChainService().findRuleNodeById(tenantId, entityId);
 | 
			
		||||
        this.defaultCtx = new DefaultTbContext(systemContext, new RuleNodeCtx(tenantId, parent, self, ruleNode));
 | 
			
		||||
        this.info = new RuleNodeInfo(ruleNodeId, ruleChainName, ruleNode != null ? ruleNode.getName() : "Unknown");
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
@ -99,6 +102,7 @@ public class RuleNodeActorMessageProcessor extends ComponentMsgProcessor<RuleNod
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    void onRuleChainToRuleNodeMsg(RuleChainToRuleNodeMsg msg) throws Exception {
 | 
			
		||||
        msg.getMsg().getCallback().visit(info);
 | 
			
		||||
        checkActive(msg.getMsg());
 | 
			
		||||
        if (ruleNode.isDebugMode()) {
 | 
			
		||||
            systemContext.persistDebugInput(tenantId, entityId, msg.getMsg(), msg.getFromRelationType());
 | 
			
		||||
 | 
			
		||||
@ -22,11 +22,13 @@ import org.springframework.scheduling.annotation.Scheduled;
 | 
			
		||||
import org.springframework.stereotype.Service;
 | 
			
		||||
import org.thingsboard.rule.engine.api.RpcError;
 | 
			
		||||
import org.thingsboard.server.actors.ActorSystemContext;
 | 
			
		||||
import org.thingsboard.server.common.data.id.RuleNodeId;
 | 
			
		||||
import org.thingsboard.server.common.data.id.TenantId;
 | 
			
		||||
import org.thingsboard.server.common.msg.TbActorMsg;
 | 
			
		||||
import org.thingsboard.server.common.msg.TbMsg;
 | 
			
		||||
import org.thingsboard.server.common.msg.queue.QueueToRuleEngineMsg;
 | 
			
		||||
import org.thingsboard.server.common.msg.queue.RuleEngineException;
 | 
			
		||||
import org.thingsboard.server.common.msg.queue.RuleNodeInfo;
 | 
			
		||||
import org.thingsboard.server.common.msg.queue.ServiceQueue;
 | 
			
		||||
import org.thingsboard.server.common.msg.queue.ServiceType;
 | 
			
		||||
import org.thingsboard.server.common.msg.queue.TbCallback;
 | 
			
		||||
@ -59,6 +61,7 @@ import javax.annotation.PreDestroy;
 | 
			
		||||
import java.util.Collections;
 | 
			
		||||
import java.util.HashSet;
 | 
			
		||||
import java.util.List;
 | 
			
		||||
import java.util.Map;
 | 
			
		||||
import java.util.Optional;
 | 
			
		||||
import java.util.Set;
 | 
			
		||||
import java.util.UUID;
 | 
			
		||||
@ -185,6 +188,12 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService<
 | 
			
		||||
                        }
 | 
			
		||||
 | 
			
		||||
                        TbRuleEngineProcessingResult result = new TbRuleEngineProcessingResult(configuration.getName(), timeout, ctx);
 | 
			
		||||
                        if (timeout) {
 | 
			
		||||
                            printFirstOrAll(configuration, ctx, ctx.getPendingMap(), "Timeout");
 | 
			
		||||
                        }
 | 
			
		||||
                        if (!ctx.getFailedMap().isEmpty()) {
 | 
			
		||||
                            printFirstOrAll(configuration, ctx, ctx.getFailedMap(), "Failed");
 | 
			
		||||
                        }
 | 
			
		||||
                        TbRuleEngineProcessingDecision decision = ackStrategy.analyze(result);
 | 
			
		||||
                        if (statsEnabled) {
 | 
			
		||||
                            stats.log(result, decision.isCommit());
 | 
			
		||||
@ -212,6 +221,22 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService<
 | 
			
		||||
        });
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private void printFirstOrAll(TbRuleEngineQueueConfiguration configuration, TbMsgPackProcessingContext ctx, Map<UUID, TbProtoQueueMsg<ToRuleEngineMsg>> map, String prefix) {
 | 
			
		||||
        boolean printAll = log.isTraceEnabled();
 | 
			
		||||
        log.info("{} to process [{}] messages", prefix, map.size());
 | 
			
		||||
        for (Map.Entry<UUID, TbProtoQueueMsg<ToRuleEngineMsg>> pending : map.entrySet()) {
 | 
			
		||||
            ToRuleEngineMsg tmp = pending.getValue().getValue();
 | 
			
		||||
            TbMsg tmpMsg = TbMsg.fromBytes(configuration.getName(), tmp.getTbMsg().toByteArray(), TbMsgCallback.EMPTY);
 | 
			
		||||
            RuleNodeInfo ruleNodeInfo = ctx.getLastVisitedRuleNode(pending.getKey());
 | 
			
		||||
            if (printAll) {
 | 
			
		||||
                log.trace("[{}] {} to process message: {}, Last Rule Node: {}", new TenantId(new UUID(tmp.getTenantIdMSB(), tmp.getTenantIdLSB())), prefix, tmpMsg, ruleNodeInfo);
 | 
			
		||||
            } else {
 | 
			
		||||
                log.info("[{}] {} to process message: {}, Last Rule Node: {}", new TenantId(new UUID(tmp.getTenantIdMSB(), tmp.getTenantIdLSB())), prefix, tmpMsg, ruleNodeInfo);
 | 
			
		||||
                break;
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    protected ServiceType getServiceType() {
 | 
			
		||||
        return ServiceType.TB_RULE_ENGINE;
 | 
			
		||||
 | 
			
		||||
@ -16,8 +16,10 @@
 | 
			
		||||
package org.thingsboard.server.service.queue;
 | 
			
		||||
 | 
			
		||||
import lombok.extern.slf4j.Slf4j;
 | 
			
		||||
import org.thingsboard.server.common.data.id.RuleNodeId;
 | 
			
		||||
import org.thingsboard.server.common.data.id.TenantId;
 | 
			
		||||
import org.thingsboard.server.common.msg.queue.RuleEngineException;
 | 
			
		||||
import org.thingsboard.server.common.msg.queue.RuleNodeInfo;
 | 
			
		||||
import org.thingsboard.server.common.msg.queue.TbMsgCallback;
 | 
			
		||||
 | 
			
		||||
import java.util.UUID;
 | 
			
		||||
@ -45,4 +47,10 @@ public class TbMsgPackCallback implements TbMsgCallback {
 | 
			
		||||
        log.trace("[{}] ON FAILURE", id, e);
 | 
			
		||||
        ctx.onFailure(tenantId, id, e);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public void visit(RuleNodeInfo ruleNodeInfo) {
 | 
			
		||||
        log.trace("[{}] ON PROCESS: {}", id, ruleNodeInfo);
 | 
			
		||||
        ctx.visit(id, ruleNodeInfo);
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -16,8 +16,10 @@
 | 
			
		||||
package org.thingsboard.server.service.queue;
 | 
			
		||||
 | 
			
		||||
import lombok.Getter;
 | 
			
		||||
import org.thingsboard.server.common.data.id.RuleNodeId;
 | 
			
		||||
import org.thingsboard.server.common.data.id.TenantId;
 | 
			
		||||
import org.thingsboard.server.common.msg.queue.RuleEngineException;
 | 
			
		||||
import org.thingsboard.server.common.msg.queue.RuleNodeInfo;
 | 
			
		||||
import org.thingsboard.server.gen.transport.TransportProtos;
 | 
			
		||||
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
 | 
			
		||||
import org.thingsboard.server.service.queue.processing.TbRuleEngineSubmitStrategy;
 | 
			
		||||
@ -32,7 +34,6 @@ import java.util.concurrent.atomic.AtomicInteger;
 | 
			
		||||
public class TbMsgPackProcessingContext {
 | 
			
		||||
 | 
			
		||||
    private final TbRuleEngineSubmitStrategy submitStrategy;
 | 
			
		||||
 | 
			
		||||
    private final AtomicInteger pendingCount;
 | 
			
		||||
    private final CountDownLatch processingTimeoutLatch = new CountDownLatch(1);
 | 
			
		||||
    @Getter
 | 
			
		||||
@ -44,6 +45,8 @@ public class TbMsgPackProcessingContext {
 | 
			
		||||
    @Getter
 | 
			
		||||
    private final ConcurrentMap<TenantId, RuleEngineException> exceptionsMap = new ConcurrentHashMap<>();
 | 
			
		||||
 | 
			
		||||
    private final ConcurrentMap<UUID, RuleNodeInfo> lastRuleNodeMap = new ConcurrentHashMap<>();
 | 
			
		||||
 | 
			
		||||
    public TbMsgPackProcessingContext(TbRuleEngineSubmitStrategy submitStrategy) {
 | 
			
		||||
        this.submitStrategy = submitStrategy;
 | 
			
		||||
        this.pendingMap = submitStrategy.getPendingMap();
 | 
			
		||||
@ -81,4 +84,13 @@ public class TbMsgPackProcessingContext {
 | 
			
		||||
            processingTimeoutLatch.countDown();
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    public void visit(UUID id, RuleNodeInfo ruleNodeInfo) {
 | 
			
		||||
        lastRuleNodeMap.put(id, ruleNodeInfo);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    public RuleNodeInfo getLastVisitedRuleNode(UUID id) {
 | 
			
		||||
        return lastRuleNodeMap.get(id);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -106,7 +106,6 @@ public final class TbMsg implements Serializable {
 | 
			
		||||
        if (callback != null) {
 | 
			
		||||
            this.callback = callback;
 | 
			
		||||
        } else {
 | 
			
		||||
            log.warn("[{}] Created message with empty callback: {}", originator, type);
 | 
			
		||||
            this.callback = TbMsgCallback.EMPTY;
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
@ -0,0 +1,31 @@
 | 
			
		||||
/**
 | 
			
		||||
 * Copyright © 2016-2020 The Thingsboard Authors
 | 
			
		||||
 *
 | 
			
		||||
 * Licensed under the Apache License, Version 2.0 (the "License");
 | 
			
		||||
 * you may not use this file except in compliance with the License.
 | 
			
		||||
 * You may obtain a copy of the License at
 | 
			
		||||
 *
 | 
			
		||||
 *     http://www.apache.org/licenses/LICENSE-2.0
 | 
			
		||||
 *
 | 
			
		||||
 * Unless required by applicable law or agreed to in writing, software
 | 
			
		||||
 * distributed under the License is distributed on an "AS IS" BASIS,
 | 
			
		||||
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
			
		||||
 * See the License for the specific language governing permissions and
 | 
			
		||||
 * limitations under the License.
 | 
			
		||||
 */
 | 
			
		||||
package org.thingsboard.server.common.msg.queue;
 | 
			
		||||
 | 
			
		||||
import org.thingsboard.server.common.data.id.RuleNodeId;
 | 
			
		||||
 | 
			
		||||
public class RuleNodeInfo {
 | 
			
		||||
    private final String label;
 | 
			
		||||
 | 
			
		||||
    public RuleNodeInfo(RuleNodeId id, String ruleChainName, String ruleNodeName) {
 | 
			
		||||
        this.label = "[RuleChain: " + ruleChainName + "|RuleNode: " + ruleNodeName + "(" + id + ")]";
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public String toString() {
 | 
			
		||||
        return label;
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
@ -34,4 +34,7 @@ public interface TbMsgCallback {
 | 
			
		||||
 | 
			
		||||
    void onFailure(RuleEngineException e);
 | 
			
		||||
 | 
			
		||||
    default void visit(RuleNodeInfo ruleNodeInfo) {
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user