From 2ea2b57f3c4debfbb1cff09dec4c3ec25b3b1d39 Mon Sep 17 00:00:00 2001 From: Andrii Shvaika Date: Tue, 8 Sep 2020 11:06:57 +0300 Subject: [PATCH] Top 5 rule nodes statistics (cherry picked from commit 445350b1f800b90193f50ee38556e5187acff9e7) --- .../actors/ruleChain/DefaultTbContext.java | 2 + .../RuleNodeActorMessageProcessor.java | 2 +- .../DefaultTbRuleEngineConsumerService.java | 4 +- .../service/queue/TbMsgPackCallback.java | 12 ++- .../queue/TbMsgPackProcessingContext.java | 60 ++++++++++++- .../service/queue/TbMsgProfilerInfo.java | 85 +++++++++++++++++++ .../service/queue/TbRuleNodeProfilerInfo.java | 75 ++++++++++++++++ .../BatchTbRuleEngineSubmitStrategy.java | 4 +- .../queue/TbMsgPackProcessingContextTest.java | 2 +- .../server/common/msg/queue/RuleNodeInfo.java | 4 + .../common/msg/queue/TbMsgCallback.java | 8 +- 11 files changed, 247 insertions(+), 11 deletions(-) create mode 100644 application/src/main/java/org/thingsboard/server/service/queue/TbMsgProfilerInfo.java create mode 100644 application/src/main/java/org/thingsboard/server/service/queue/TbRuleNodeProfilerInfo.java diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java index e89daf4516..30747e8f1b 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java +++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java @@ -105,6 +105,7 @@ class DefaultTbContext implements TbContext { if (nodeCtx.getSelf().isDebugMode()) { relationTypes.forEach(relationType -> mainCtx.persistDebugOutput(nodeCtx.getTenantId(), nodeCtx.getSelf().getId(), msg, relationType, th)); } + msg.getCallback().onProcessingEnd(nodeCtx.getSelf().getId()); nodeCtx.getChainActor().tell(new RuleNodeToRuleChainTellNextMsg(nodeCtx.getSelf().getId(), relationTypes, msg, th != null ? th.getMessage() : null)); } @@ -214,6 +215,7 @@ class DefaultTbContext implements TbContext { if (nodeCtx.getSelf().isDebugMode()) { mainCtx.persistDebugOutput(nodeCtx.getTenantId(), nodeCtx.getSelf().getId(), tbMsg, "ACK", null); } + tbMsg.getCallback().onProcessingEnd(nodeCtx.getSelf().getId()); tbMsg.getCallback().onSuccess(); } diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeActorMessageProcessor.java index 7583ea553e..db55ff8edf 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeActorMessageProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeActorMessageProcessor.java @@ -103,7 +103,7 @@ public class RuleNodeActorMessageProcessor extends ComponentMsgProcessor submitExecutor.submit(() -> { log.trace("[{}] Creating callback for message: {}", id, msg.getValue()); ToRuleEngineMsg toRuleEngineMsg = msg.getValue(); @@ -194,6 +194,8 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService< if (!ctx.getFailedMap().isEmpty()) { printFirstOrAll(configuration, ctx, ctx.getFailedMap(), "Failed"); } + ctx.printProfilerStats(); + TbRuleEngineProcessingDecision decision = ackStrategy.analyze(result); if (statsEnabled) { stats.log(result, decision.isCommit()); diff --git a/application/src/main/java/org/thingsboard/server/service/queue/TbMsgPackCallback.java b/application/src/main/java/org/thingsboard/server/service/queue/TbMsgPackCallback.java index c7925fd759..5712f29eac 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/TbMsgPackCallback.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/TbMsgPackCallback.java @@ -49,8 +49,14 @@ public class TbMsgPackCallback implements TbMsgCallback { } @Override - public void visit(RuleNodeInfo ruleNodeInfo) { - log.trace("[{}] ON PROCESS: {}", id, ruleNodeInfo); - ctx.visit(id, ruleNodeInfo); + public void onProcessingStart(RuleNodeInfo ruleNodeInfo) { + log.trace("[{}] ON PROCESSING START: {}", id, ruleNodeInfo); + ctx.onProcessingStart(id, ruleNodeInfo); + } + + @Override + public void onProcessingEnd(RuleNodeId ruleNodeId) { + log.trace("[{}] ON PROCESSING END: {}", id, ruleNodeId); + ctx.onProcessingEnd(id, ruleNodeId); } } diff --git a/application/src/main/java/org/thingsboard/server/service/queue/TbMsgPackProcessingContext.java b/application/src/main/java/org/thingsboard/server/service/queue/TbMsgPackProcessingContext.java index 1b7ffd9c30..c85544ce81 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/TbMsgPackProcessingContext.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/TbMsgPackProcessingContext.java @@ -16,6 +16,7 @@ package org.thingsboard.server.service.queue; import lombok.Getter; +import lombok.extern.slf4j.Slf4j; import org.thingsboard.server.common.data.id.RuleNodeId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.msg.queue.RuleEngineException; @@ -24,6 +25,8 @@ import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.service.queue.processing.TbRuleEngineSubmitStrategy; +import java.util.Comparator; +import java.util.Map; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -31,9 +34,13 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +@Slf4j public class TbMsgPackProcessingContext { + private final String queueName; private final TbRuleEngineSubmitStrategy submitStrategy; + @Getter + private final boolean profilerEnabled; private final AtomicInteger pendingCount; private final CountDownLatch processingTimeoutLatch = new CountDownLatch(1); @Getter @@ -47,14 +54,20 @@ public class TbMsgPackProcessingContext { private final ConcurrentMap lastRuleNodeMap = new ConcurrentHashMap<>(); - public TbMsgPackProcessingContext(TbRuleEngineSubmitStrategy submitStrategy) { + public TbMsgPackProcessingContext(String queueName, TbRuleEngineSubmitStrategy submitStrategy) { + this.queueName = queueName; this.submitStrategy = submitStrategy; + this.profilerEnabled = log.isDebugEnabled(); this.pendingMap = submitStrategy.getPendingMap(); this.pendingCount = new AtomicInteger(pendingMap.size()); } public boolean await(long packProcessingTimeout, TimeUnit milliseconds) throws InterruptedException { - return processingTimeoutLatch.await(packProcessingTimeout, milliseconds); + boolean success = processingTimeoutLatch.await(packProcessingTimeout, milliseconds); + if (!success && profilerEnabled) { + msgProfilerMap.values().forEach(TbMsgProfilerInfo::onTimeout); + } + return success; } public void onSuccess(UUID id) { @@ -85,12 +98,53 @@ public class TbMsgPackProcessingContext { } } - public void visit(UUID id, RuleNodeInfo ruleNodeInfo) { + private final ConcurrentHashMap msgProfilerMap = new ConcurrentHashMap<>(); + private final ConcurrentHashMap ruleNodeProfilerMap = new ConcurrentHashMap<>(); + + public void onProcessingStart(UUID id, RuleNodeInfo ruleNodeInfo) { lastRuleNodeMap.put(id, ruleNodeInfo); + if (profilerEnabled) { + msgProfilerMap.computeIfAbsent(id, TbMsgProfilerInfo::new).onStart(ruleNodeInfo.getRuleNodeId()); + ruleNodeProfilerMap.putIfAbsent(ruleNodeInfo.getRuleNodeId().getId(), new TbRuleNodeProfilerInfo(ruleNodeInfo)); + } + } + + public void onProcessingEnd(UUID id, RuleNodeId ruleNodeId) { + if (profilerEnabled) { + long processingTime = msgProfilerMap.computeIfAbsent(id, TbMsgProfilerInfo::new).onEnd(ruleNodeId); + if (processingTime > 0) { + ruleNodeProfilerMap.computeIfAbsent(ruleNodeId.getId(), TbRuleNodeProfilerInfo::new).record(processingTime); + } + } + } + + public void onTimeout(TbMsgProfilerInfo profilerInfo) { + Map.Entry ruleNodeInfo = profilerInfo.onTimeout(); + if (ruleNodeInfo != null) { + ruleNodeProfilerMap.computeIfAbsent(ruleNodeInfo.getKey(), TbRuleNodeProfilerInfo::new).record(ruleNodeInfo.getValue()); + } } public RuleNodeInfo getLastVisitedRuleNode(UUID id) { return lastRuleNodeMap.get(id); } + public void printProfilerStats() { + if (profilerEnabled) { + log.debug("Top Rule Nodes by max execution time:"); + ruleNodeProfilerMap.values().stream() + .sorted(Comparator.comparingLong(TbRuleNodeProfilerInfo::getMaxExecutionTime).reversed()).limit(5) + .forEach(info -> log.debug("[{}][{}] max execution time: {}. {}", queueName, info.getRuleNodeId(), info.getMaxExecutionTime(), info.getLabel())); + + log.info("Top Rule Nodes by avg execution time:"); + ruleNodeProfilerMap.values().stream() + .sorted(Comparator.comparingDouble(TbRuleNodeProfilerInfo::getAvgExecutionTime).reversed()).limit(5) + .forEach(info -> log.info("[{}][{}] avg execution time: {}. {}", queueName, info.getRuleNodeId(), info.getAvgExecutionTime(), info.getLabel())); + + log.info("Top Rule Nodes by execution count:"); + ruleNodeProfilerMap.values().stream() + .sorted(Comparator.comparingInt(TbRuleNodeProfilerInfo::getExecutionCount).reversed()).limit(5) + .forEach(info -> log.info("[{}][{}] execution count: {}. {}", queueName, info.getRuleNodeId(), info.getExecutionCount(), info.getLabel())); + } + } } diff --git a/application/src/main/java/org/thingsboard/server/service/queue/TbMsgProfilerInfo.java b/application/src/main/java/org/thingsboard/server/service/queue/TbMsgProfilerInfo.java new file mode 100644 index 0000000000..f66cd1a50a --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/queue/TbMsgProfilerInfo.java @@ -0,0 +1,85 @@ +/** + * Copyright © 2016-2020 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.queue; + +import lombok.extern.slf4j.Slf4j; +import org.thingsboard.server.common.data.id.RuleNodeId; +import org.thingsboard.server.common.msg.queue.RuleNodeInfo; + +import java.util.AbstractMap; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +@Slf4j +public class TbMsgProfilerInfo { + private final UUID msgId; + private AtomicLong totalProcessingTime = new AtomicLong(); + private Lock stateLock = new ReentrantLock(); + private RuleNodeId currentRuleNodeId; + private long stateChangeTime; + + public TbMsgProfilerInfo(UUID msgId) { + this.msgId = msgId; + } + + public void onStart(RuleNodeId ruleNodeId) { + long currentTime = System.currentTimeMillis(); + stateLock.lock(); + try { + currentRuleNodeId = ruleNodeId; + stateChangeTime = currentTime; + } finally { + stateLock.unlock(); + } + } + + public long onEnd(RuleNodeId ruleNodeId) { + long currentTime = System.currentTimeMillis(); + stateLock.lock(); + try { + if (ruleNodeId.equals(currentRuleNodeId)) { + long processingTime = currentTime - stateChangeTime; + stateChangeTime = currentTime; + totalProcessingTime.addAndGet(processingTime); + currentRuleNodeId = null; + return processingTime; + } else { + log.trace("[{}] Invalid sequence of rule node processing detected. Expected [{}] but was [{}]", msgId, currentRuleNodeId, ruleNodeId); + return 0; + } + } finally { + stateLock.unlock(); + } + } + + public Map.Entry onTimeout() { + long currentTime = System.currentTimeMillis(); + stateLock.lock(); + try { + if (currentRuleNodeId != null && stateChangeTime > 0) { + long timeoutTime = currentTime - stateChangeTime; + totalProcessingTime.addAndGet(timeoutTime); + return new AbstractMap.SimpleEntry<>(currentRuleNodeId.getId(), timeoutTime); + } + } finally { + stateLock.unlock(); + } + return null; + } +} diff --git a/application/src/main/java/org/thingsboard/server/service/queue/TbRuleNodeProfilerInfo.java b/application/src/main/java/org/thingsboard/server/service/queue/TbRuleNodeProfilerInfo.java new file mode 100644 index 0000000000..c88532fbc3 --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/queue/TbRuleNodeProfilerInfo.java @@ -0,0 +1,75 @@ +/** + * Copyright © 2016-2020 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.queue; + +import lombok.Getter; +import org.thingsboard.server.common.msg.queue.RuleNodeInfo; + +import java.util.UUID; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; + +public class TbRuleNodeProfilerInfo { + @Getter + private final UUID ruleNodeId; + @Getter + private final String label; + private AtomicInteger executionCount = new AtomicInteger(0); + private AtomicLong executionTime = new AtomicLong(0); + private AtomicLong maxExecutionTime = new AtomicLong(0); + + public TbRuleNodeProfilerInfo(RuleNodeInfo ruleNodeInfo) { + this.ruleNodeId = ruleNodeInfo.getRuleNodeId().getId(); + this.label = ruleNodeInfo.toString(); + } + + public TbRuleNodeProfilerInfo(UUID ruleNodeId) { + this.ruleNodeId = ruleNodeId; + this.label = ""; + } + + public void record(long processingTime) { + executionCount.incrementAndGet(); + executionTime.addAndGet(processingTime); + while (true) { + long value = maxExecutionTime.get(); + if (value >= processingTime) { + break; + } + if (maxExecutionTime.compareAndSet(value, processingTime)) { + break; + } + } + } + + int getExecutionCount() { + return executionCount.get(); + } + + long getMaxExecutionTime() { + return maxExecutionTime.get(); + } + + double getAvgExecutionTime() { + double executionCnt = (double) executionCount.get(); + if (executionCnt > 0) { + return executionTime.get() / executionCnt; + } else { + return 0.0; + } + } + +} \ No newline at end of file diff --git a/application/src/main/java/org/thingsboard/server/service/queue/processing/BatchTbRuleEngineSubmitStrategy.java b/application/src/main/java/org/thingsboard/server/service/queue/processing/BatchTbRuleEngineSubmitStrategy.java index b9741d2433..be299b39b4 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/processing/BatchTbRuleEngineSubmitStrategy.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/processing/BatchTbRuleEngineSubmitStrategy.java @@ -68,18 +68,20 @@ public class BatchTbRuleEngineSubmitStrategy extends AbstractTbRuleEngineSubmitS int listSize = orderedMsgList.size(); int startIdx = Math.min(packIdx.get() * batchSize, listSize); int endIdx = Math.min(startIdx + batchSize, listSize); + Map> tmpPack; synchronized (pendingPack) { pendingPack.clear(); for (int i = startIdx; i < endIdx; i++) { IdMsgPair pair = orderedMsgList.get(i); pendingPack.put(pair.uuid, pair.msg); } + tmpPack = new LinkedHashMap<>(pendingPack); } int submitSize = pendingPack.size(); if (log.isDebugEnabled() && submitSize > 0) { log.debug("[{}] submitting [{}] messages to rule engine", queueName, submitSize); } - pendingPack.forEach(msgConsumer); + tmpPack.forEach(msgConsumer); } } diff --git a/application/src/test/java/org/thingsboard/server/service/queue/TbMsgPackProcessingContextTest.java b/application/src/test/java/org/thingsboard/server/service/queue/TbMsgPackProcessingContextTest.java index 43cd62ea97..595f64b3a4 100644 --- a/application/src/test/java/org/thingsboard/server/service/queue/TbMsgPackProcessingContextTest.java +++ b/application/src/test/java/org/thingsboard/server/service/queue/TbMsgPackProcessingContextTest.java @@ -51,7 +51,7 @@ public class TbMsgPackProcessingContextTest { messages.put(UUID.randomUUID(), new TbProtoQueueMsg<>(UUID.randomUUID(), null)); } when(strategyMock.getPendingMap()).thenReturn(messages); - TbMsgPackProcessingContext context = new TbMsgPackProcessingContext(strategyMock); + TbMsgPackProcessingContext context = new TbMsgPackProcessingContext("Main", strategyMock); for (UUID uuid : messages.keySet()) { for (int i = 0; i < parallelCount; i++) { executorService.submit(() -> context.onSuccess(uuid)); diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/queue/RuleNodeInfo.java b/common/message/src/main/java/org/thingsboard/server/common/msg/queue/RuleNodeInfo.java index 0c2d38b3b2..9af3f20363 100644 --- a/common/message/src/main/java/org/thingsboard/server/common/msg/queue/RuleNodeInfo.java +++ b/common/message/src/main/java/org/thingsboard/server/common/msg/queue/RuleNodeInfo.java @@ -15,12 +15,16 @@ */ package org.thingsboard.server.common.msg.queue; +import lombok.Getter; import org.thingsboard.server.common.data.id.RuleNodeId; public class RuleNodeInfo { private final String label; + @Getter + private final RuleNodeId ruleNodeId; public RuleNodeInfo(RuleNodeId id, String ruleChainName, String ruleNodeName) { + this.ruleNodeId = id; this.label = "[RuleChain: " + ruleChainName + "|RuleNode: " + ruleNodeName + "(" + id + ")]"; } diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/queue/TbMsgCallback.java b/common/message/src/main/java/org/thingsboard/server/common/msg/queue/TbMsgCallback.java index 4103a4efc1..3f6927adb1 100644 --- a/common/message/src/main/java/org/thingsboard/server/common/msg/queue/TbMsgCallback.java +++ b/common/message/src/main/java/org/thingsboard/server/common/msg/queue/TbMsgCallback.java @@ -15,6 +15,8 @@ */ package org.thingsboard.server.common.msg.queue; +import org.thingsboard.server.common.data.id.RuleNodeId; + public interface TbMsgCallback { TbMsgCallback EMPTY = new TbMsgCallback() { @@ -34,7 +36,11 @@ public interface TbMsgCallback { void onFailure(RuleEngineException e); - default void visit(RuleNodeInfo ruleNodeInfo) { + default void onProcessingStart(RuleNodeInfo ruleNodeInfo) { } + default void onProcessingEnd(RuleNodeId ruleNodeId) { + } + + }