From 89419c6999cc7b960d6f40fbb6e9b785a81c38dd Mon Sep 17 00:00:00 2001 From: vzikratyi-tb <65224127+vzikratyi-tb@users.noreply.github.com> Date: Mon, 6 Jul 2020 14:47:36 +0300 Subject: [PATCH] Prometheus Metrics (#3052) * Moved resetting ruleEngineStats to Consumer * Moved counters to Map in TbCoreConsumerStats * Added actuator and MetricsService * Added metrics to core and rule_engine consumers * Replaced summary with counters * Removed most setters and getters from TbRuleEngine stats * Added stats to Transport consumer * Added JsInvoke actuator stats * Removed MetricsService --- application/pom.xml | 12 ++ .../server/actors/ActorSystemContext.java | 15 ++- .../actors/ruleChain/DefaultTbContext.java | 6 +- .../server/service/metrics/StubCounter.java | 33 +++++ .../queue/DefaultTbCoreConsumerService.java | 18 +-- .../DefaultTbRuleEngineConsumerService.java | 9 +- .../service/queue/TbCoreConsumerStats.java | 114 +++++++++++++----- .../queue/TbRuleEngineConsumerStats.java | 91 ++++++++------ .../service/stats/DefaultJsInvokeStats.java | 81 +++++++++++++ .../service/stats/DefaultQueueStats.java | 45 +++++++ .../DefaultRuleEngineStatisticsService.java | 1 - .../server/service/stats/StatsCounter.java | 54 +++++++++ .../service/stats/StatsCounterFactory.java | 48 ++++++++ .../server/service/stats/StatsType.java | 30 +++++ .../transport/TbCoreTransportApiService.java | 18 ++- .../src/main/resources/thingsboard.yml | 10 ++ .../server/actors/JsInvokeStats.java | 44 +++++++ .../DefaultTbQueueResponseTemplate.java | 10 +- .../server/queue/stats/QueueStats.java | 37 ++++++ pom.xml | 16 +++ 20 files changed, 597 insertions(+), 95 deletions(-) create mode 100644 application/src/main/java/org/thingsboard/server/service/metrics/StubCounter.java create mode 100644 application/src/main/java/org/thingsboard/server/service/stats/DefaultJsInvokeStats.java create mode 100644 application/src/main/java/org/thingsboard/server/service/stats/DefaultQueueStats.java create mode 100644 application/src/main/java/org/thingsboard/server/service/stats/StatsCounter.java create mode 100644 application/src/main/java/org/thingsboard/server/service/stats/StatsCounterFactory.java create mode 100644 application/src/main/java/org/thingsboard/server/service/stats/StatsType.java create mode 100644 common/actor/src/main/java/org/thingsboard/server/actors/JsInvokeStats.java create mode 100644 common/queue/src/main/java/org/thingsboard/server/queue/stats/QueueStats.java diff --git a/application/pom.xml b/application/pom.xml index a20aeaf485..5c61fe0f4a 100644 --- a/application/pom.xml +++ b/application/pom.xml @@ -298,6 +298,18 @@ com.github.ua-parser uap-java + + org.springframework.boot + spring-boot-starter-actuator + + + io.micrometer + micrometer-core + + + io.micrometer + micrometer-registry-prometheus + diff --git a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java index de368fd05a..34a689de58 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java +++ b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java @@ -219,6 +219,10 @@ public class ActorSystemContext { @Getter private ClaimDevicesService claimDevicesService; + @Autowired + @Getter + private JsInvokeStats jsInvokeStats; + //TODO: separate context for TbCore and TbRuleEngine @Autowired(required = false) @Getter @@ -272,19 +276,14 @@ public class ActorSystemContext { @Getter private long statisticsPersistFrequency; - @Getter - private final AtomicInteger jsInvokeRequestsCount = new AtomicInteger(0); - @Getter - private final AtomicInteger jsInvokeResponsesCount = new AtomicInteger(0); - @Getter - private final AtomicInteger jsInvokeFailuresCount = new AtomicInteger(0); @Scheduled(fixedDelayString = "${actors.statistics.js_print_interval_ms}") public void printStats() { if (statisticsEnabled) { - if (jsInvokeRequestsCount.get() > 0 || jsInvokeResponsesCount.get() > 0 || jsInvokeFailuresCount.get() > 0) { + if (jsInvokeStats.getRequests() > 0 || jsInvokeStats.getResponses() > 0 || jsInvokeStats.getFailures() > 0) { log.info("Rule Engine JS Invoke Stats: requests [{}] responses [{}] failures [{}]", - jsInvokeRequestsCount.getAndSet(0), jsInvokeResponsesCount.getAndSet(0), jsInvokeFailuresCount.getAndSet(0)); + jsInvokeStats.getRequests(), jsInvokeStats.getResponses(), jsInvokeStats.getFailures()); + jsInvokeStats.reset(); } } } diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java index 25f2a7f443..a057c54f0a 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java +++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java @@ -292,21 +292,21 @@ class DefaultTbContext implements TbContext { @Override public void logJsEvalRequest() { if (mainCtx.isStatisticsEnabled()) { - mainCtx.getJsInvokeRequestsCount().incrementAndGet(); + mainCtx.getJsInvokeStats().incrementRequests(); } } @Override public void logJsEvalResponse() { if (mainCtx.isStatisticsEnabled()) { - mainCtx.getJsInvokeResponsesCount().incrementAndGet(); + mainCtx.getJsInvokeStats().incrementResponses(); } } @Override public void logJsEvalFailure() { if (mainCtx.isStatisticsEnabled()) { - mainCtx.getJsInvokeFailuresCount().incrementAndGet(); + mainCtx.getJsInvokeStats().incrementFailures(); } } diff --git a/application/src/main/java/org/thingsboard/server/service/metrics/StubCounter.java b/application/src/main/java/org/thingsboard/server/service/metrics/StubCounter.java new file mode 100644 index 0000000000..f7a3117554 --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/metrics/StubCounter.java @@ -0,0 +1,33 @@ +/** + * Copyright © 2016-2020 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.metrics; + +import io.micrometer.core.instrument.Counter; + +public class StubCounter implements Counter { + @Override + public void increment(double amount) {} + + @Override + public double count() { + return 0; + } + + @Override + public Id getId() { + return null; + } +} diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java index ff88f012ee..cc5b100f7b 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java @@ -26,16 +26,7 @@ import org.thingsboard.server.common.msg.MsgType; import org.thingsboard.server.common.msg.TbActorMsg; import org.thingsboard.server.common.msg.queue.ServiceType; import org.thingsboard.server.common.msg.queue.TbCallback; -import org.thingsboard.server.gen.transport.TransportProtos.DeviceStateServiceMsgProto; -import org.thingsboard.server.gen.transport.TransportProtos.FromDeviceRPCResponseProto; -import org.thingsboard.server.gen.transport.TransportProtos.LocalSubscriptionServiceMsgProto; -import org.thingsboard.server.gen.transport.TransportProtos.SubscriptionMgrMsgProto; -import org.thingsboard.server.gen.transport.TransportProtos.TbAttributeUpdateProto; -import org.thingsboard.server.gen.transport.TransportProtos.TbSubscriptionCloseProto; -import org.thingsboard.server.gen.transport.TransportProtos.TbTimeSeriesUpdateProto; -import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg; -import org.thingsboard.server.gen.transport.TransportProtos.ToCoreNotificationMsg; -import org.thingsboard.server.gen.transport.TransportProtos.TransportToDeviceActorMsg; +import org.thingsboard.server.gen.transport.TransportProtos.*; import org.thingsboard.server.queue.TbQueueConsumer; import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.queue.discovery.PartitionChangeEvent; @@ -47,6 +38,7 @@ import org.thingsboard.server.service.rpc.FromDeviceRpcResponse; import org.thingsboard.server.service.rpc.TbCoreDeviceRpcService; import org.thingsboard.server.service.rpc.ToDeviceRpcRequestActorMsg; import org.thingsboard.server.service.state.DeviceStateService; +import org.thingsboard.server.service.stats.StatsCounterFactory; import org.thingsboard.server.service.subscription.SubscriptionManagerService; import org.thingsboard.server.service.subscription.TbLocalSubscriptionService; import org.thingsboard.server.service.subscription.TbSubscriptionUtils; @@ -81,18 +73,19 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService tbRuleEngineQueueFactory.createToRuleEngineMsgConsumer(configuration)); - consumerStats.put(configuration.getName(), new TbRuleEngineConsumerStats(configuration.getName())); + consumerStats.put(configuration.getName(), new TbRuleEngineConsumerStats(configuration.getName(), counterFactory)); } submitExecutor = Executors.newSingleThreadExecutor(); } @@ -269,6 +273,7 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService< consumerStats.forEach((queue, stats) -> { stats.printStats(); statisticsService.reportQueueStats(ts, stats); + stats.reset(); }); } } diff --git a/application/src/main/java/org/thingsboard/server/service/queue/TbCoreConsumerStats.java b/application/src/main/java/org/thingsboard/server/service/queue/TbCoreConsumerStats.java index c362c5e889..728f8b3ea1 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/TbCoreConsumerStats.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/TbCoreConsumerStats.java @@ -17,76 +17,124 @@ package org.thingsboard.server.service.queue; import lombok.extern.slf4j.Slf4j; import org.thingsboard.server.gen.transport.TransportProtos; +import org.thingsboard.server.service.stats.StatsCounter; +import org.thingsboard.server.service.stats.StatsCounterFactory; +import org.thingsboard.server.service.stats.StatsType; +import java.util.*; import java.util.concurrent.atomic.AtomicInteger; @Slf4j public class TbCoreConsumerStats { + public static final String TOTAL_MSGS = "totalMsgs"; + public static final String SESSION_EVENTS = "sessionEvents"; + public static final String GET_ATTRIBUTE = "getAttr"; + public static final String ATTRIBUTE_SUBSCRIBES = "subToAttr"; + public static final String RPC_SUBSCRIBES = "subToRpc"; + public static final String TO_DEVICE_RPC_CALL_RESPONSES = "toDevRpc"; + public static final String SUBSCRIPTION_INFO = "subInfo"; + public static final String DEVICE_CLAIMS = "claimDevice"; + public static final String DEVICE_STATES = "deviceState"; + public static final String SUBSCRIPTION_MSGS = "subMsgs"; + public static final String TO_CORE_NOTIFICATIONS = "coreNfs"; - private final AtomicInteger totalCounter = new AtomicInteger(0); - private final AtomicInteger sessionEventCounter = new AtomicInteger(0); - private final AtomicInteger getAttributesCounter = new AtomicInteger(0); - private final AtomicInteger subscribeToAttributesCounter = new AtomicInteger(0); - private final AtomicInteger subscribeToRPCCounter = new AtomicInteger(0); - private final AtomicInteger toDeviceRPCCallResponseCounter = new AtomicInteger(0); - private final AtomicInteger subscriptionInfoCounter = new AtomicInteger(0); - private final AtomicInteger claimDeviceCounter = new AtomicInteger(0); + private final StatsCounter totalCounter; + private final StatsCounter sessionEventCounter; + private final StatsCounter getAttributesCounter; + private final StatsCounter subscribeToAttributesCounter; + private final StatsCounter subscribeToRPCCounter; + private final StatsCounter toDeviceRPCCallResponseCounter; + private final StatsCounter subscriptionInfoCounter; + private final StatsCounter claimDeviceCounter; - private final AtomicInteger deviceStateCounter = new AtomicInteger(0); - private final AtomicInteger subscriptionMsgCounter = new AtomicInteger(0); - private final AtomicInteger toCoreNotificationsCounter = new AtomicInteger(0); + private final StatsCounter deviceStateCounter; + private final StatsCounter subscriptionMsgCounter; + private final StatsCounter toCoreNotificationsCounter; + + private final List counters = new ArrayList<>(); + + public TbCoreConsumerStats(StatsCounterFactory counterFactory) { + String statsKey = StatsType.CORE.getName(); + + this.totalCounter = counterFactory.createStatsCounter(statsKey, TOTAL_MSGS); + this.sessionEventCounter = counterFactory.createStatsCounter(statsKey, SESSION_EVENTS); + this.getAttributesCounter = counterFactory.createStatsCounter(statsKey, GET_ATTRIBUTE); + this.subscribeToAttributesCounter = counterFactory.createStatsCounter(statsKey, ATTRIBUTE_SUBSCRIBES); + this.subscribeToRPCCounter = counterFactory.createStatsCounter(statsKey, RPC_SUBSCRIBES); + this.toDeviceRPCCallResponseCounter = counterFactory.createStatsCounter(statsKey, TO_DEVICE_RPC_CALL_RESPONSES); + this.subscriptionInfoCounter = counterFactory.createStatsCounter(statsKey, SUBSCRIPTION_INFO); + this.claimDeviceCounter = counterFactory.createStatsCounter(statsKey, DEVICE_CLAIMS); + this.deviceStateCounter = counterFactory.createStatsCounter(statsKey, DEVICE_STATES); + this.subscriptionMsgCounter = counterFactory.createStatsCounter(statsKey, SUBSCRIPTION_MSGS); + this.toCoreNotificationsCounter = counterFactory.createStatsCounter(statsKey, TO_CORE_NOTIFICATIONS); + + + counters.add(totalCounter); + counters.add(sessionEventCounter); + counters.add(getAttributesCounter); + counters.add(subscribeToAttributesCounter); + counters.add(subscribeToRPCCounter); + counters.add(toDeviceRPCCallResponseCounter); + counters.add(subscriptionInfoCounter); + counters.add(claimDeviceCounter); + + counters.add(deviceStateCounter); + counters.add(subscriptionMsgCounter); + counters.add(toCoreNotificationsCounter); + } public void log(TransportProtos.TransportToDeviceActorMsg msg) { - totalCounter.incrementAndGet(); + totalCounter.increment(); if (msg.hasSessionEvent()) { - sessionEventCounter.incrementAndGet(); + sessionEventCounter.increment(); } if (msg.hasGetAttributes()) { - getAttributesCounter.incrementAndGet(); + getAttributesCounter.increment(); } if (msg.hasSubscribeToAttributes()) { - subscribeToAttributesCounter.incrementAndGet(); + subscribeToAttributesCounter.increment(); } if (msg.hasSubscribeToRPC()) { - subscribeToRPCCounter.incrementAndGet(); + subscribeToRPCCounter.increment(); } if (msg.hasToDeviceRPCCallResponse()) { - toDeviceRPCCallResponseCounter.incrementAndGet(); + toDeviceRPCCallResponseCounter.increment(); } if (msg.hasSubscriptionInfo()) { - subscriptionInfoCounter.incrementAndGet(); + subscriptionInfoCounter.increment(); } if (msg.hasClaimDevice()) { - claimDeviceCounter.incrementAndGet(); + claimDeviceCounter.increment(); } } public void log(TransportProtos.DeviceStateServiceMsgProto msg) { - totalCounter.incrementAndGet(); - deviceStateCounter.incrementAndGet(); + totalCounter.increment(); + deviceStateCounter.increment(); } public void log(TransportProtos.SubscriptionMgrMsgProto msg) { - totalCounter.incrementAndGet(); - subscriptionMsgCounter.incrementAndGet(); + totalCounter.increment(); + subscriptionMsgCounter.increment(); } public void log(TransportProtos.ToCoreNotificationMsg msg) { - totalCounter.incrementAndGet(); - toCoreNotificationsCounter.incrementAndGet(); + totalCounter.increment(); + toCoreNotificationsCounter.increment(); } public void printStats() { - int total = totalCounter.getAndSet(0); + int total = totalCounter.get(); if (total > 0) { - log.info("Total [{}] sessionEvents [{}] getAttr [{}] subToAttr [{}] subToRpc [{}] toDevRpc [{}] subInfo [{}] claimDevice [{}]" + - " deviceState [{}] subMgr [{}] coreNfs [{}]", - total, sessionEventCounter.getAndSet(0), - getAttributesCounter.getAndSet(0), subscribeToAttributesCounter.getAndSet(0), - subscribeToRPCCounter.getAndSet(0), toDeviceRPCCallResponseCounter.getAndSet(0), - subscriptionInfoCounter.getAndSet(0), claimDeviceCounter.getAndSet(0) - , deviceStateCounter.getAndSet(0), subscriptionMsgCounter.getAndSet(0), toCoreNotificationsCounter.getAndSet(0)); + StringBuilder stats = new StringBuilder(); + counters.forEach(counter -> { + stats.append(counter.getName()).append(" = [").append(counter.get()).append("] "); + }); + log.info("Core Stats: {}", stats); } } + public void reset() { + counters.forEach(StatsCounter::clear); + } } diff --git a/application/src/main/java/org/thingsboard/server/service/queue/TbRuleEngineConsumerStats.java b/application/src/main/java/org/thingsboard/server/service/queue/TbRuleEngineConsumerStats.java index 40017d2b40..29e4c8c2a0 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/TbRuleEngineConsumerStats.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/TbRuleEngineConsumerStats.java @@ -22,16 +22,16 @@ import org.thingsboard.server.common.msg.queue.RuleEngineException; import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg; import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.service.queue.processing.TbRuleEngineProcessingResult; +import org.thingsboard.server.service.stats.StatsCounter; +import org.thingsboard.server.service.stats.StatsCounterFactory; +import org.thingsboard.server.service.stats.StatsType; -import java.util.HashMap; -import java.util.Map; -import java.util.UUID; +import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicInteger; @Slf4j -@Data public class TbRuleEngineConsumerStats { public static final String TOTAL_MSGS = "totalMsgs"; @@ -43,61 +43,72 @@ public class TbRuleEngineConsumerStats { public static final String SUCCESSFUL_ITERATIONS = "successfulIterations"; public static final String FAILED_ITERATIONS = "failedIterations"; - private final AtomicInteger totalMsgCounter = new AtomicInteger(0); - private final AtomicInteger successMsgCounter = new AtomicInteger(0); - private final AtomicInteger tmpTimeoutMsgCounter = new AtomicInteger(0); - private final AtomicInteger tmpFailedMsgCounter = new AtomicInteger(0); + private final StatsCounter totalMsgCounter; + private final StatsCounter successMsgCounter; + private final StatsCounter tmpTimeoutMsgCounter; + private final StatsCounter tmpFailedMsgCounter; - private final AtomicInteger timeoutMsgCounter = new AtomicInteger(0); - private final AtomicInteger failedMsgCounter = new AtomicInteger(0); + private final StatsCounter timeoutMsgCounter; + private final StatsCounter failedMsgCounter; - private final AtomicInteger successIterationsCounter = new AtomicInteger(0); - private final AtomicInteger failedIterationsCounter = new AtomicInteger(0); + private final StatsCounter successIterationsCounter; + private final StatsCounter failedIterationsCounter; - private final Map counters = new HashMap<>(); + private final List counters = new ArrayList<>(); private final ConcurrentMap tenantStats = new ConcurrentHashMap<>(); private final ConcurrentMap tenantExceptions = new ConcurrentHashMap<>(); private final String queueName; - public TbRuleEngineConsumerStats(String queueName) { + public TbRuleEngineConsumerStats(String queueName, StatsCounterFactory counterFactory) { this.queueName = queueName; - counters.put(TOTAL_MSGS, totalMsgCounter); - counters.put(SUCCESSFUL_MSGS, successMsgCounter); - counters.put(TIMEOUT_MSGS, timeoutMsgCounter); - counters.put(FAILED_MSGS, failedMsgCounter); - counters.put(TMP_TIMEOUT, tmpTimeoutMsgCounter); - counters.put(TMP_FAILED, tmpFailedMsgCounter); - counters.put(SUCCESSFUL_ITERATIONS, successIterationsCounter); - counters.put(FAILED_ITERATIONS, failedIterationsCounter); + String statsKey = StatsType.RULE_ENGINE.getName() + "." + queueName; + this.totalMsgCounter = counterFactory.createStatsCounter(statsKey, TOTAL_MSGS); + this.successMsgCounter = counterFactory.createStatsCounter(statsKey, SUCCESSFUL_MSGS); + this.timeoutMsgCounter = counterFactory.createStatsCounter(statsKey, TIMEOUT_MSGS); + this.failedMsgCounter = counterFactory.createStatsCounter(statsKey, FAILED_MSGS); + this.tmpTimeoutMsgCounter = counterFactory.createStatsCounter(statsKey, TMP_TIMEOUT); + this.tmpFailedMsgCounter = counterFactory.createStatsCounter(statsKey, TMP_FAILED); + this.successIterationsCounter = counterFactory.createStatsCounter(statsKey, SUCCESSFUL_ITERATIONS); + this.failedIterationsCounter = counterFactory.createStatsCounter(statsKey, FAILED_ITERATIONS); + + counters.add(totalMsgCounter); + counters.add(successMsgCounter); + counters.add(timeoutMsgCounter); + counters.add(failedMsgCounter); + + counters.add(tmpTimeoutMsgCounter); + counters.add(tmpFailedMsgCounter); + counters.add(successIterationsCounter); + counters.add(failedIterationsCounter); } public void log(TbRuleEngineProcessingResult msg, boolean finalIterationForPack) { int success = msg.getSuccessMap().size(); int pending = msg.getPendingMap().size(); int failed = msg.getFailedMap().size(); - totalMsgCounter.addAndGet(success + pending + failed); - successMsgCounter.addAndGet(success); + totalMsgCounter.add(success + pending + failed); + successMsgCounter.add(success); msg.getSuccessMap().values().forEach(m -> getTenantStats(m).logSuccess()); if (finalIterationForPack) { if (pending > 0 || failed > 0) { - timeoutMsgCounter.addAndGet(pending); - failedMsgCounter.addAndGet(failed); + timeoutMsgCounter.add(pending); + failedMsgCounter.add(failed); if (pending > 0) { msg.getPendingMap().values().forEach(m -> getTenantStats(m).logTimeout()); } if (failed > 0) { msg.getFailedMap().values().forEach(m -> getTenantStats(m).logFailed()); } - failedIterationsCounter.incrementAndGet(); + failedIterationsCounter.increment(); } else { - successIterationsCounter.incrementAndGet(); + successIterationsCounter.increment(); } } else { - failedIterationsCounter.incrementAndGet(); - tmpTimeoutMsgCounter.addAndGet(pending); - tmpFailedMsgCounter.addAndGet(failed); + failedIterationsCounter.increment(); + tmpTimeoutMsgCounter.add(pending); + tmpFailedMsgCounter.add(failed); if (pending > 0) { msg.getPendingMap().values().forEach(m -> getTenantStats(m).logTmpTimeout()); } @@ -113,19 +124,31 @@ public class TbRuleEngineConsumerStats { return tenantStats.computeIfAbsent(new UUID(reMsg.getTenantIdMSB(), reMsg.getTenantIdLSB()), TbTenantRuleEngineStats::new); } + public ConcurrentMap getTenantStats() { + return tenantStats; + } + + public String getQueueName() { + return queueName; + } + + public ConcurrentMap getTenantExceptions() { + return tenantExceptions; + } + public void printStats() { int total = totalMsgCounter.get(); if (total > 0) { StringBuilder stats = new StringBuilder(); - counters.forEach((label, value) -> { - stats.append(label).append(" = [").append(value.get()).append("] "); + counters.forEach(counter -> { + stats.append(counter.getName()).append(" = [").append(counter.get()).append("] "); }); log.info("[{}] Stats: {}", queueName, stats); } } public void reset() { - counters.values().forEach(counter -> counter.set(0)); + counters.forEach(StatsCounter::clear); tenantStats.clear(); tenantExceptions.clear(); } diff --git a/application/src/main/java/org/thingsboard/server/service/stats/DefaultJsInvokeStats.java b/application/src/main/java/org/thingsboard/server/service/stats/DefaultJsInvokeStats.java new file mode 100644 index 0000000000..24193d8d09 --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/stats/DefaultJsInvokeStats.java @@ -0,0 +1,81 @@ +/** + * Copyright © 2016-2020 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.stats; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; +import org.thingsboard.server.actors.JsInvokeStats; + +import javax.annotation.PostConstruct; + +@Service +public class DefaultJsInvokeStats implements JsInvokeStats { + private static final String REQUESTS = "requests"; + private static final String RESPONSES = "responses"; + private static final String FAILURES = "failures"; + + private StatsCounter requestsCounter; + private StatsCounter responsesCounter; + private StatsCounter failuresCounter; + + @Autowired + private StatsCounterFactory counterFactory; + + @PostConstruct + public void init() { + String key = StatsType.JS_INVOKE.getName(); + this.requestsCounter = counterFactory.createStatsCounter(key, REQUESTS); + this.responsesCounter = counterFactory.createStatsCounter(key, RESPONSES); + this.failuresCounter = counterFactory.createStatsCounter(key, FAILURES); + } + + @Override + public void incrementRequests(int amount) { + requestsCounter.add(amount); + } + + @Override + public void incrementResponses(int amount) { + responsesCounter.add(amount); + } + + @Override + public void incrementFailures(int amount) { + failuresCounter.add(amount); + } + + @Override + public int getRequests() { + return requestsCounter.get(); + } + + @Override + public int getResponses() { + return responsesCounter.get(); + } + + @Override + public int getFailures() { + return failuresCounter.get(); + } + + @Override + public void reset() { + requestsCounter.clear(); + responsesCounter.clear(); + failuresCounter.clear(); + } +} diff --git a/application/src/main/java/org/thingsboard/server/service/stats/DefaultQueueStats.java b/application/src/main/java/org/thingsboard/server/service/stats/DefaultQueueStats.java new file mode 100644 index 0000000000..d6c42a82b5 --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/stats/DefaultQueueStats.java @@ -0,0 +1,45 @@ +/** + * Copyright © 2016-2020 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.stats; + +import org.thingsboard.server.queue.stats.QueueStats; + +public class DefaultQueueStats implements QueueStats { + private final StatsCounter totalCounter; + private final StatsCounter successfulCounter; + private final StatsCounter failedCounter; + + public DefaultQueueStats(StatsCounter totalCounter, StatsCounter successfulCounter, StatsCounter failedCounter) { + this.totalCounter = totalCounter; + this.successfulCounter = successfulCounter; + this.failedCounter = failedCounter; + } + + @Override + public void incrementTotal(int amount) { + totalCounter.add(amount); + } + + @Override + public void incrementSuccessful(int amount) { + successfulCounter.add(amount); + } + + @Override + public void incrementFailed(int amount) { + failedCounter.add(amount); + } +} diff --git a/application/src/main/java/org/thingsboard/server/service/stats/DefaultRuleEngineStatisticsService.java b/application/src/main/java/org/thingsboard/server/service/stats/DefaultRuleEngineStatisticsService.java index a506b87ab0..b46033389b 100644 --- a/application/src/main/java/org/thingsboard/server/service/stats/DefaultRuleEngineStatisticsService.java +++ b/application/src/main/java/org/thingsboard/server/service/stats/DefaultRuleEngineStatisticsService.java @@ -104,7 +104,6 @@ public class DefaultRuleEngineStatisticsService implements RuleEngineStatisticsS } } }); - ruleEngineStats.reset(); } private AssetId getServiceAssetId(TenantId tenantId, String queueName) { diff --git a/application/src/main/java/org/thingsboard/server/service/stats/StatsCounter.java b/application/src/main/java/org/thingsboard/server/service/stats/StatsCounter.java new file mode 100644 index 0000000000..17f27e9d46 --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/stats/StatsCounter.java @@ -0,0 +1,54 @@ +/** + * Copyright © 2016-2020 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.stats; + +import io.micrometer.core.instrument.Counter; + +import java.util.concurrent.atomic.AtomicInteger; + +public class StatsCounter { + private final AtomicInteger aiCounter; + private final Counter micrometerCounter; + private final String name; + + public StatsCounter(AtomicInteger aiCounter, Counter micrometerCounter, String name) { + this.aiCounter = aiCounter; + this.micrometerCounter = micrometerCounter; + this.name = name; + } + + public void increment() { + aiCounter.incrementAndGet(); + micrometerCounter.increment(); + } + + public void clear() { + aiCounter.set(0); + } + + public int get() { + return aiCounter.get(); + } + + public void add(int delta){ + aiCounter.addAndGet(delta); + micrometerCounter.increment(delta); + } + + public String getName() { + return name; + } +} diff --git a/application/src/main/java/org/thingsboard/server/service/stats/StatsCounterFactory.java b/application/src/main/java/org/thingsboard/server/service/stats/StatsCounterFactory.java new file mode 100644 index 0000000000..f8276bef33 --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/stats/StatsCounterFactory.java @@ -0,0 +1,48 @@ +/** + * Copyright © 2016-2020 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.stats; + +import io.micrometer.core.instrument.Counter; +import io.micrometer.core.instrument.MeterRegistry; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Service; +import org.thingsboard.server.service.metrics.StubCounter; + +import java.util.concurrent.atomic.AtomicInteger; + +@Service +public class StatsCounterFactory { + private static final String STATS_NAME_TAG = "statsName"; + + private static final Counter STUB_COUNTER = new StubCounter(); + + @Autowired + private MeterRegistry meterRegistry; + + @Value("${metrics.enabled}") + private Boolean metricsEnabled; + + public StatsCounter createStatsCounter(String key, String statsName) { + return new StatsCounter( + new AtomicInteger(0), + metricsEnabled ? + meterRegistry.counter(key, STATS_NAME_TAG, statsName) + : STUB_COUNTER, + statsName + ); + } +} diff --git a/application/src/main/java/org/thingsboard/server/service/stats/StatsType.java b/application/src/main/java/org/thingsboard/server/service/stats/StatsType.java new file mode 100644 index 0000000000..a831498993 --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/stats/StatsType.java @@ -0,0 +1,30 @@ +/** + * Copyright © 2016-2020 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.stats; + +public enum StatsType { + RULE_ENGINE("ruleEngine"), CORE("core"), TRANSPORT("transport"), JS_INVOKE("jsInvoke"); + + private String name; + + StatsType(String name) { + this.name = name; + } + + public String getName() { + return name; + } +} diff --git a/application/src/main/java/org/thingsboard/server/service/transport/TbCoreTransportApiService.java b/application/src/main/java/org/thingsboard/server/service/transport/TbCoreTransportApiService.java index 566bf99cd3..5b9fc91bdf 100644 --- a/application/src/main/java/org/thingsboard/server/service/transport/TbCoreTransportApiService.java +++ b/application/src/main/java/org/thingsboard/server/service/transport/TbCoreTransportApiService.java @@ -29,6 +29,10 @@ import org.thingsboard.server.gen.transport.TransportProtos.TransportApiRequestM import org.thingsboard.server.gen.transport.TransportProtos.TransportApiResponseMsg; import org.thingsboard.server.queue.provider.TbCoreQueueFactory; import org.thingsboard.server.queue.util.TbCoreComponent; +import org.thingsboard.server.service.stats.DefaultQueueStats; +import org.thingsboard.server.service.stats.StatsCounter; +import org.thingsboard.server.service.stats.StatsCounterFactory; +import org.thingsboard.server.service.stats.StatsType; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; @@ -41,9 +45,13 @@ import java.util.concurrent.*; @Service @TbCoreComponent public class TbCoreTransportApiService { + private static final String TOTAL_MSGS = "totalMsgs"; + private static final String SUCCESSFUL_MSGS = "successfulMsgs"; + private static final String FAILED_MSGS = "failedMsgs"; private final TbCoreQueueFactory tbCoreQueueFactory; private final TransportApiService transportApiService; + private final StatsCounterFactory counterFactory; @Value("${queue.transport_api.max_pending_requests:10000}") private int maxPendingRequests; @@ -58,9 +66,10 @@ public class TbCoreTransportApiService { private TbQueueResponseTemplate, TbProtoQueueMsg> transportApiTemplate; - public TbCoreTransportApiService(TbCoreQueueFactory tbCoreQueueFactory, TransportApiService transportApiService) { + public TbCoreTransportApiService(TbCoreQueueFactory tbCoreQueueFactory, TransportApiService transportApiService, StatsCounterFactory counterFactory) { this.tbCoreQueueFactory = tbCoreQueueFactory; this.transportApiService = transportApiService; + this.counterFactory = counterFactory; } @PostConstruct @@ -69,6 +78,12 @@ public class TbCoreTransportApiService { TbQueueProducer> producer = tbCoreQueueFactory.createTransportApiResponseProducer(); TbQueueConsumer> consumer = tbCoreQueueFactory.createTransportApiRequestConsumer(); + String key = StatsType.TRANSPORT.getName(); + StatsCounter totalCounter = counterFactory.createStatsCounter(key, TOTAL_MSGS); + StatsCounter successfulCounter = counterFactory.createStatsCounter(key, SUCCESSFUL_MSGS); + StatsCounter failedCounter = counterFactory.createStatsCounter(key, FAILED_MSGS); + DefaultQueueStats queueStats = new DefaultQueueStats(totalCounter, successfulCounter, failedCounter); + DefaultTbQueueResponseTemplate.DefaultTbQueueResponseTemplateBuilder , TbProtoQueueMsg> builder = DefaultTbQueueResponseTemplate.builder(); builder.requestTemplate(consumer); @@ -78,6 +93,7 @@ public class TbCoreTransportApiService { builder.pollInterval(responsePollDuration); builder.executor(transportCallbackExecutor); builder.handler(transportApiService); + builder.stats(queueStats); transportApiTemplate = builder.build(); } diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index d25c94e49a..c0aa849ad3 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -754,3 +754,13 @@ service: id: "${TB_SERVICE_ID:}" tenant_id: "${TB_SERVICE_TENANT_ID:}" # empty or specific tenant id. +metrics: + # Enable/disable actuator metrics. + enabled: "${METRICS_ENABLED:false}" + +management: + endpoints: + web: + exposure: + # Expose metrics endpoint (use value 'prometheus' to enable prometheus metrics). + include: '${METRICS_ENDPOINTS_EXPOSE:info}' \ No newline at end of file diff --git a/common/actor/src/main/java/org/thingsboard/server/actors/JsInvokeStats.java b/common/actor/src/main/java/org/thingsboard/server/actors/JsInvokeStats.java new file mode 100644 index 0000000000..4e539e57fc --- /dev/null +++ b/common/actor/src/main/java/org/thingsboard/server/actors/JsInvokeStats.java @@ -0,0 +1,44 @@ +/** + * Copyright © 2016-2020 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.actors; + +public interface JsInvokeStats { + default void incrementRequests() { + incrementRequests(1); + } + + void incrementRequests(int amount); + + default void incrementResponses() { + incrementResponses(1); + } + + void incrementResponses(int amount); + + default void incrementFailures() { + incrementFailures(1); + } + + void incrementFailures(int amount); + + int getRequests(); + + int getResponses(); + + int getFailures(); + + void reset(); +} diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/common/DefaultTbQueueResponseTemplate.java b/common/queue/src/main/java/org/thingsboard/server/queue/common/DefaultTbQueueResponseTemplate.java index 96891ee7d8..602a180170 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/common/DefaultTbQueueResponseTemplate.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/common/DefaultTbQueueResponseTemplate.java @@ -23,6 +23,7 @@ import org.thingsboard.server.queue.TbQueueHandler; import org.thingsboard.server.queue.TbQueueMsg; import org.thingsboard.server.queue.TbQueueProducer; import org.thingsboard.server.queue.TbQueueResponseTemplate; +import org.thingsboard.server.queue.stats.QueueStats; import java.util.List; import java.util.UUID; @@ -44,6 +45,7 @@ public class DefaultTbQueueResponseTemplate(); @@ -66,6 +69,7 @@ public class DefaultTbQueueResponseTemplate { pendingRequestCount.decrementAndGet(); response.getHeaders().put(REQUEST_ID_HEADER, uuidToBytes(requestId)); responseTemplate.send(TopicPartitionInfo.builder().topic(responseTopic).build(), response, null); + stats.incrementSuccessful(); }, e -> { pendingRequestCount.decrementAndGet(); @@ -121,6 +127,7 @@ public class DefaultTbQueueResponseTemplate1.4.3 1.9.4 3.2.2 + 1.5.2 @@ -1371,6 +1372,21 @@ struts-tiles ${struts.version} + + org.springframework.boot + spring-boot-starter-actuator + ${spring-boot.version} + + + io.micrometer + micrometer-core + ${micrometer.version} + + + io.micrometer + micrometer-registry-prometheus + ${micrometer.version} +