Prometheus Metrics (#3052)

* Moved resetting ruleEngineStats to Consumer

* Moved counters to Map in TbCoreConsumerStats

* Added actuator and MetricsService

* Added metrics to core and rule_engine consumers

* Replaced summary with counters

* Removed most setters and getters from TbRuleEngine stats

* Added stats to Transport consumer

* Added JsInvoke actuator stats

* Removed MetricsService
This commit is contained in:
vzikratyi-tb 2020-07-06 14:47:36 +03:00 committed by GitHub
parent 7d739dfaae
commit 89419c6999
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
20 changed files with 597 additions and 95 deletions

View File

@ -298,6 +298,18 @@
<groupId>com.github.ua-parser</groupId> <groupId>com.github.ua-parser</groupId>
<artifactId>uap-java</artifactId> <artifactId>uap-java</artifactId>
</dependency> </dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-core</artifactId>
</dependency>
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-registry-prometheus</artifactId>
</dependency>
</dependencies> </dependencies>
<build> <build>

View File

@ -219,6 +219,10 @@ public class ActorSystemContext {
@Getter @Getter
private ClaimDevicesService claimDevicesService; private ClaimDevicesService claimDevicesService;
@Autowired
@Getter
private JsInvokeStats jsInvokeStats;
//TODO: separate context for TbCore and TbRuleEngine //TODO: separate context for TbCore and TbRuleEngine
@Autowired(required = false) @Autowired(required = false)
@Getter @Getter
@ -272,19 +276,14 @@ public class ActorSystemContext {
@Getter @Getter
private long statisticsPersistFrequency; private long statisticsPersistFrequency;
@Getter
private final AtomicInteger jsInvokeRequestsCount = new AtomicInteger(0);
@Getter
private final AtomicInteger jsInvokeResponsesCount = new AtomicInteger(0);
@Getter
private final AtomicInteger jsInvokeFailuresCount = new AtomicInteger(0);
@Scheduled(fixedDelayString = "${actors.statistics.js_print_interval_ms}") @Scheduled(fixedDelayString = "${actors.statistics.js_print_interval_ms}")
public void printStats() { public void printStats() {
if (statisticsEnabled) { if (statisticsEnabled) {
if (jsInvokeRequestsCount.get() > 0 || jsInvokeResponsesCount.get() > 0 || jsInvokeFailuresCount.get() > 0) { if (jsInvokeStats.getRequests() > 0 || jsInvokeStats.getResponses() > 0 || jsInvokeStats.getFailures() > 0) {
log.info("Rule Engine JS Invoke Stats: requests [{}] responses [{}] failures [{}]", log.info("Rule Engine JS Invoke Stats: requests [{}] responses [{}] failures [{}]",
jsInvokeRequestsCount.getAndSet(0), jsInvokeResponsesCount.getAndSet(0), jsInvokeFailuresCount.getAndSet(0)); jsInvokeStats.getRequests(), jsInvokeStats.getResponses(), jsInvokeStats.getFailures());
jsInvokeStats.reset();
} }
} }
} }

View File

@ -292,21 +292,21 @@ class DefaultTbContext implements TbContext {
@Override @Override
public void logJsEvalRequest() { public void logJsEvalRequest() {
if (mainCtx.isStatisticsEnabled()) { if (mainCtx.isStatisticsEnabled()) {
mainCtx.getJsInvokeRequestsCount().incrementAndGet(); mainCtx.getJsInvokeStats().incrementRequests();
} }
} }
@Override @Override
public void logJsEvalResponse() { public void logJsEvalResponse() {
if (mainCtx.isStatisticsEnabled()) { if (mainCtx.isStatisticsEnabled()) {
mainCtx.getJsInvokeResponsesCount().incrementAndGet(); mainCtx.getJsInvokeStats().incrementResponses();
} }
} }
@Override @Override
public void logJsEvalFailure() { public void logJsEvalFailure() {
if (mainCtx.isStatisticsEnabled()) { if (mainCtx.isStatisticsEnabled()) {
mainCtx.getJsInvokeFailuresCount().incrementAndGet(); mainCtx.getJsInvokeStats().incrementFailures();
} }
} }

View File

@ -0,0 +1,33 @@
/**
* Copyright © 2016-2020 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.service.metrics;
import io.micrometer.core.instrument.Counter;
public class StubCounter implements Counter {
@Override
public void increment(double amount) {}
@Override
public double count() {
return 0;
}
@Override
public Id getId() {
return null;
}
}

View File

@ -26,16 +26,7 @@ import org.thingsboard.server.common.msg.MsgType;
import org.thingsboard.server.common.msg.TbActorMsg; import org.thingsboard.server.common.msg.TbActorMsg;
import org.thingsboard.server.common.msg.queue.ServiceType; import org.thingsboard.server.common.msg.queue.ServiceType;
import org.thingsboard.server.common.msg.queue.TbCallback; import org.thingsboard.server.common.msg.queue.TbCallback;
import org.thingsboard.server.gen.transport.TransportProtos.DeviceStateServiceMsgProto; import org.thingsboard.server.gen.transport.TransportProtos.*;
import org.thingsboard.server.gen.transport.TransportProtos.FromDeviceRPCResponseProto;
import org.thingsboard.server.gen.transport.TransportProtos.LocalSubscriptionServiceMsgProto;
import org.thingsboard.server.gen.transport.TransportProtos.SubscriptionMgrMsgProto;
import org.thingsboard.server.gen.transport.TransportProtos.TbAttributeUpdateProto;
import org.thingsboard.server.gen.transport.TransportProtos.TbSubscriptionCloseProto;
import org.thingsboard.server.gen.transport.TransportProtos.TbTimeSeriesUpdateProto;
import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToCoreNotificationMsg;
import org.thingsboard.server.gen.transport.TransportProtos.TransportToDeviceActorMsg;
import org.thingsboard.server.queue.TbQueueConsumer; import org.thingsboard.server.queue.TbQueueConsumer;
import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import org.thingsboard.server.queue.discovery.PartitionChangeEvent; import org.thingsboard.server.queue.discovery.PartitionChangeEvent;
@ -47,6 +38,7 @@ import org.thingsboard.server.service.rpc.FromDeviceRpcResponse;
import org.thingsboard.server.service.rpc.TbCoreDeviceRpcService; import org.thingsboard.server.service.rpc.TbCoreDeviceRpcService;
import org.thingsboard.server.service.rpc.ToDeviceRpcRequestActorMsg; import org.thingsboard.server.service.rpc.ToDeviceRpcRequestActorMsg;
import org.thingsboard.server.service.state.DeviceStateService; import org.thingsboard.server.service.state.DeviceStateService;
import org.thingsboard.server.service.stats.StatsCounterFactory;
import org.thingsboard.server.service.subscription.SubscriptionManagerService; import org.thingsboard.server.service.subscription.SubscriptionManagerService;
import org.thingsboard.server.service.subscription.TbLocalSubscriptionService; import org.thingsboard.server.service.subscription.TbLocalSubscriptionService;
import org.thingsboard.server.service.subscription.TbSubscriptionUtils; import org.thingsboard.server.service.subscription.TbSubscriptionUtils;
@ -81,18 +73,19 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
private final TbLocalSubscriptionService localSubscriptionService; private final TbLocalSubscriptionService localSubscriptionService;
private final SubscriptionManagerService subscriptionManagerService; private final SubscriptionManagerService subscriptionManagerService;
private final TbCoreDeviceRpcService tbCoreDeviceRpcService; private final TbCoreDeviceRpcService tbCoreDeviceRpcService;
private final TbCoreConsumerStats stats = new TbCoreConsumerStats(); private final TbCoreConsumerStats stats;
public DefaultTbCoreConsumerService(TbCoreQueueFactory tbCoreQueueFactory, ActorSystemContext actorContext, public DefaultTbCoreConsumerService(TbCoreQueueFactory tbCoreQueueFactory, ActorSystemContext actorContext,
DeviceStateService stateService, TbLocalSubscriptionService localSubscriptionService, DeviceStateService stateService, TbLocalSubscriptionService localSubscriptionService,
SubscriptionManagerService subscriptionManagerService, DataDecodingEncodingService encodingService, SubscriptionManagerService subscriptionManagerService, DataDecodingEncodingService encodingService,
TbCoreDeviceRpcService tbCoreDeviceRpcService) { TbCoreDeviceRpcService tbCoreDeviceRpcService, StatsCounterFactory counterFactory) {
super(actorContext, encodingService, tbCoreQueueFactory.createToCoreNotificationsMsgConsumer()); super(actorContext, encodingService, tbCoreQueueFactory.createToCoreNotificationsMsgConsumer());
this.mainConsumer = tbCoreQueueFactory.createToCoreMsgConsumer(); this.mainConsumer = tbCoreQueueFactory.createToCoreMsgConsumer();
this.stateService = stateService; this.stateService = stateService;
this.localSubscriptionService = localSubscriptionService; this.localSubscriptionService = localSubscriptionService;
this.subscriptionManagerService = subscriptionManagerService; this.subscriptionManagerService = subscriptionManagerService;
this.tbCoreDeviceRpcService = tbCoreDeviceRpcService; this.tbCoreDeviceRpcService = tbCoreDeviceRpcService;
this.stats = new TbCoreConsumerStats(counterFactory);
} }
@PostConstruct @PostConstruct
@ -228,6 +221,7 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
public void printStats() { public void printStats() {
if (statsEnabled) { if (statsEnabled) {
stats.printStats(); stats.printStats();
stats.reset();
} }
} }

View File

@ -52,6 +52,7 @@ import org.thingsboard.server.service.queue.processing.TbRuleEngineSubmitStrateg
import org.thingsboard.server.service.rpc.FromDeviceRpcResponse; import org.thingsboard.server.service.rpc.FromDeviceRpcResponse;
import org.thingsboard.server.service.rpc.TbRuleEngineDeviceRpcService; import org.thingsboard.server.service.rpc.TbRuleEngineDeviceRpcService;
import org.thingsboard.server.service.stats.RuleEngineStatisticsService; import org.thingsboard.server.service.stats.RuleEngineStatisticsService;
import org.thingsboard.server.service.stats.StatsCounterFactory;
import javax.annotation.PostConstruct; import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy; import javax.annotation.PreDestroy;
@ -79,6 +80,7 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService<
@Value("${queue.rule-engine.stats.enabled:true}") @Value("${queue.rule-engine.stats.enabled:true}")
private boolean statsEnabled; private boolean statsEnabled;
private final StatsCounterFactory counterFactory;
private final TbRuleEngineSubmitStrategyFactory submitStrategyFactory; private final TbRuleEngineSubmitStrategyFactory submitStrategyFactory;
private final TbRuleEngineProcessingStrategyFactory processingStrategyFactory; private final TbRuleEngineProcessingStrategyFactory processingStrategyFactory;
private final TbRuleEngineQueueFactory tbRuleEngineQueueFactory; private final TbRuleEngineQueueFactory tbRuleEngineQueueFactory;
@ -95,7 +97,8 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService<
TbQueueRuleEngineSettings ruleEngineSettings, TbQueueRuleEngineSettings ruleEngineSettings,
TbRuleEngineQueueFactory tbRuleEngineQueueFactory, RuleEngineStatisticsService statisticsService, TbRuleEngineQueueFactory tbRuleEngineQueueFactory, RuleEngineStatisticsService statisticsService,
ActorSystemContext actorContext, DataDecodingEncodingService encodingService, ActorSystemContext actorContext, DataDecodingEncodingService encodingService,
TbRuleEngineDeviceRpcService tbDeviceRpcService) { TbRuleEngineDeviceRpcService tbDeviceRpcService,
StatsCounterFactory counterFactory) {
super(actorContext, encodingService, tbRuleEngineQueueFactory.createToRuleEngineNotificationsMsgConsumer()); super(actorContext, encodingService, tbRuleEngineQueueFactory.createToRuleEngineNotificationsMsgConsumer());
this.statisticsService = statisticsService; this.statisticsService = statisticsService;
this.ruleEngineSettings = ruleEngineSettings; this.ruleEngineSettings = ruleEngineSettings;
@ -103,6 +106,7 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService<
this.submitStrategyFactory = submitStrategyFactory; this.submitStrategyFactory = submitStrategyFactory;
this.processingStrategyFactory = processingStrategyFactory; this.processingStrategyFactory = processingStrategyFactory;
this.tbDeviceRpcService = tbDeviceRpcService; this.tbDeviceRpcService = tbDeviceRpcService;
this.counterFactory = counterFactory;
} }
@PostConstruct @PostConstruct
@ -111,7 +115,7 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService<
for (TbRuleEngineQueueConfiguration configuration : ruleEngineSettings.getQueues()) { for (TbRuleEngineQueueConfiguration configuration : ruleEngineSettings.getQueues()) {
consumerConfigurations.putIfAbsent(configuration.getName(), configuration); consumerConfigurations.putIfAbsent(configuration.getName(), configuration);
consumers.computeIfAbsent(configuration.getName(), queueName -> tbRuleEngineQueueFactory.createToRuleEngineMsgConsumer(configuration)); consumers.computeIfAbsent(configuration.getName(), queueName -> tbRuleEngineQueueFactory.createToRuleEngineMsgConsumer(configuration));
consumerStats.put(configuration.getName(), new TbRuleEngineConsumerStats(configuration.getName())); consumerStats.put(configuration.getName(), new TbRuleEngineConsumerStats(configuration.getName(), counterFactory));
} }
submitExecutor = Executors.newSingleThreadExecutor(); submitExecutor = Executors.newSingleThreadExecutor();
} }
@ -269,6 +273,7 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService<
consumerStats.forEach((queue, stats) -> { consumerStats.forEach((queue, stats) -> {
stats.printStats(); stats.printStats();
statisticsService.reportQueueStats(ts, stats); statisticsService.reportQueueStats(ts, stats);
stats.reset();
}); });
} }
} }

View File

@ -17,76 +17,124 @@ package org.thingsboard.server.service.queue;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.service.stats.StatsCounter;
import org.thingsboard.server.service.stats.StatsCounterFactory;
import org.thingsboard.server.service.stats.StatsType;
import java.util.*;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
@Slf4j @Slf4j
public class TbCoreConsumerStats { public class TbCoreConsumerStats {
public static final String TOTAL_MSGS = "totalMsgs";
public static final String SESSION_EVENTS = "sessionEvents";
public static final String GET_ATTRIBUTE = "getAttr";
public static final String ATTRIBUTE_SUBSCRIBES = "subToAttr";
public static final String RPC_SUBSCRIBES = "subToRpc";
public static final String TO_DEVICE_RPC_CALL_RESPONSES = "toDevRpc";
public static final String SUBSCRIPTION_INFO = "subInfo";
public static final String DEVICE_CLAIMS = "claimDevice";
public static final String DEVICE_STATES = "deviceState";
public static final String SUBSCRIPTION_MSGS = "subMsgs";
public static final String TO_CORE_NOTIFICATIONS = "coreNfs";
private final AtomicInteger totalCounter = new AtomicInteger(0); private final StatsCounter totalCounter;
private final AtomicInteger sessionEventCounter = new AtomicInteger(0); private final StatsCounter sessionEventCounter;
private final AtomicInteger getAttributesCounter = new AtomicInteger(0); private final StatsCounter getAttributesCounter;
private final AtomicInteger subscribeToAttributesCounter = new AtomicInteger(0); private final StatsCounter subscribeToAttributesCounter;
private final AtomicInteger subscribeToRPCCounter = new AtomicInteger(0); private final StatsCounter subscribeToRPCCounter;
private final AtomicInteger toDeviceRPCCallResponseCounter = new AtomicInteger(0); private final StatsCounter toDeviceRPCCallResponseCounter;
private final AtomicInteger subscriptionInfoCounter = new AtomicInteger(0); private final StatsCounter subscriptionInfoCounter;
private final AtomicInteger claimDeviceCounter = new AtomicInteger(0); private final StatsCounter claimDeviceCounter;
private final AtomicInteger deviceStateCounter = new AtomicInteger(0); private final StatsCounter deviceStateCounter;
private final AtomicInteger subscriptionMsgCounter = new AtomicInteger(0); private final StatsCounter subscriptionMsgCounter;
private final AtomicInteger toCoreNotificationsCounter = new AtomicInteger(0); private final StatsCounter toCoreNotificationsCounter;
private final List<StatsCounter> counters = new ArrayList<>();
public TbCoreConsumerStats(StatsCounterFactory counterFactory) {
String statsKey = StatsType.CORE.getName();
this.totalCounter = counterFactory.createStatsCounter(statsKey, TOTAL_MSGS);
this.sessionEventCounter = counterFactory.createStatsCounter(statsKey, SESSION_EVENTS);
this.getAttributesCounter = counterFactory.createStatsCounter(statsKey, GET_ATTRIBUTE);
this.subscribeToAttributesCounter = counterFactory.createStatsCounter(statsKey, ATTRIBUTE_SUBSCRIBES);
this.subscribeToRPCCounter = counterFactory.createStatsCounter(statsKey, RPC_SUBSCRIBES);
this.toDeviceRPCCallResponseCounter = counterFactory.createStatsCounter(statsKey, TO_DEVICE_RPC_CALL_RESPONSES);
this.subscriptionInfoCounter = counterFactory.createStatsCounter(statsKey, SUBSCRIPTION_INFO);
this.claimDeviceCounter = counterFactory.createStatsCounter(statsKey, DEVICE_CLAIMS);
this.deviceStateCounter = counterFactory.createStatsCounter(statsKey, DEVICE_STATES);
this.subscriptionMsgCounter = counterFactory.createStatsCounter(statsKey, SUBSCRIPTION_MSGS);
this.toCoreNotificationsCounter = counterFactory.createStatsCounter(statsKey, TO_CORE_NOTIFICATIONS);
counters.add(totalCounter);
counters.add(sessionEventCounter);
counters.add(getAttributesCounter);
counters.add(subscribeToAttributesCounter);
counters.add(subscribeToRPCCounter);
counters.add(toDeviceRPCCallResponseCounter);
counters.add(subscriptionInfoCounter);
counters.add(claimDeviceCounter);
counters.add(deviceStateCounter);
counters.add(subscriptionMsgCounter);
counters.add(toCoreNotificationsCounter);
}
public void log(TransportProtos.TransportToDeviceActorMsg msg) { public void log(TransportProtos.TransportToDeviceActorMsg msg) {
totalCounter.incrementAndGet(); totalCounter.increment();
if (msg.hasSessionEvent()) { if (msg.hasSessionEvent()) {
sessionEventCounter.incrementAndGet(); sessionEventCounter.increment();
} }
if (msg.hasGetAttributes()) { if (msg.hasGetAttributes()) {
getAttributesCounter.incrementAndGet(); getAttributesCounter.increment();
} }
if (msg.hasSubscribeToAttributes()) { if (msg.hasSubscribeToAttributes()) {
subscribeToAttributesCounter.incrementAndGet(); subscribeToAttributesCounter.increment();
} }
if (msg.hasSubscribeToRPC()) { if (msg.hasSubscribeToRPC()) {
subscribeToRPCCounter.incrementAndGet(); subscribeToRPCCounter.increment();
} }
if (msg.hasToDeviceRPCCallResponse()) { if (msg.hasToDeviceRPCCallResponse()) {
toDeviceRPCCallResponseCounter.incrementAndGet(); toDeviceRPCCallResponseCounter.increment();
} }
if (msg.hasSubscriptionInfo()) { if (msg.hasSubscriptionInfo()) {
subscriptionInfoCounter.incrementAndGet(); subscriptionInfoCounter.increment();
} }
if (msg.hasClaimDevice()) { if (msg.hasClaimDevice()) {
claimDeviceCounter.incrementAndGet(); claimDeviceCounter.increment();
} }
} }
public void log(TransportProtos.DeviceStateServiceMsgProto msg) { public void log(TransportProtos.DeviceStateServiceMsgProto msg) {
totalCounter.incrementAndGet(); totalCounter.increment();
deviceStateCounter.incrementAndGet(); deviceStateCounter.increment();
} }
public void log(TransportProtos.SubscriptionMgrMsgProto msg) { public void log(TransportProtos.SubscriptionMgrMsgProto msg) {
totalCounter.incrementAndGet(); totalCounter.increment();
subscriptionMsgCounter.incrementAndGet(); subscriptionMsgCounter.increment();
} }
public void log(TransportProtos.ToCoreNotificationMsg msg) { public void log(TransportProtos.ToCoreNotificationMsg msg) {
totalCounter.incrementAndGet(); totalCounter.increment();
toCoreNotificationsCounter.incrementAndGet(); toCoreNotificationsCounter.increment();
} }
public void printStats() { public void printStats() {
int total = totalCounter.getAndSet(0); int total = totalCounter.get();
if (total > 0) { if (total > 0) {
log.info("Total [{}] sessionEvents [{}] getAttr [{}] subToAttr [{}] subToRpc [{}] toDevRpc [{}] subInfo [{}] claimDevice [{}]" + StringBuilder stats = new StringBuilder();
" deviceState [{}] subMgr [{}] coreNfs [{}]", counters.forEach(counter -> {
total, sessionEventCounter.getAndSet(0), stats.append(counter.getName()).append(" = [").append(counter.get()).append("] ");
getAttributesCounter.getAndSet(0), subscribeToAttributesCounter.getAndSet(0), });
subscribeToRPCCounter.getAndSet(0), toDeviceRPCCallResponseCounter.getAndSet(0), log.info("Core Stats: {}", stats);
subscriptionInfoCounter.getAndSet(0), claimDeviceCounter.getAndSet(0)
, deviceStateCounter.getAndSet(0), subscriptionMsgCounter.getAndSet(0), toCoreNotificationsCounter.getAndSet(0));
} }
} }
public void reset() {
counters.forEach(StatsCounter::clear);
}
} }

View File

@ -22,16 +22,16 @@ import org.thingsboard.server.common.msg.queue.RuleEngineException;
import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg;
import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import org.thingsboard.server.service.queue.processing.TbRuleEngineProcessingResult; import org.thingsboard.server.service.queue.processing.TbRuleEngineProcessingResult;
import org.thingsboard.server.service.stats.StatsCounter;
import org.thingsboard.server.service.stats.StatsCounterFactory;
import org.thingsboard.server.service.stats.StatsType;
import java.util.HashMap; import java.util.*;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
@Slf4j @Slf4j
@Data
public class TbRuleEngineConsumerStats { public class TbRuleEngineConsumerStats {
public static final String TOTAL_MSGS = "totalMsgs"; public static final String TOTAL_MSGS = "totalMsgs";
@ -43,61 +43,72 @@ public class TbRuleEngineConsumerStats {
public static final String SUCCESSFUL_ITERATIONS = "successfulIterations"; public static final String SUCCESSFUL_ITERATIONS = "successfulIterations";
public static final String FAILED_ITERATIONS = "failedIterations"; public static final String FAILED_ITERATIONS = "failedIterations";
private final AtomicInteger totalMsgCounter = new AtomicInteger(0); private final StatsCounter totalMsgCounter;
private final AtomicInteger successMsgCounter = new AtomicInteger(0); private final StatsCounter successMsgCounter;
private final AtomicInteger tmpTimeoutMsgCounter = new AtomicInteger(0); private final StatsCounter tmpTimeoutMsgCounter;
private final AtomicInteger tmpFailedMsgCounter = new AtomicInteger(0); private final StatsCounter tmpFailedMsgCounter;
private final AtomicInteger timeoutMsgCounter = new AtomicInteger(0); private final StatsCounter timeoutMsgCounter;
private final AtomicInteger failedMsgCounter = new AtomicInteger(0); private final StatsCounter failedMsgCounter;
private final AtomicInteger successIterationsCounter = new AtomicInteger(0); private final StatsCounter successIterationsCounter;
private final AtomicInteger failedIterationsCounter = new AtomicInteger(0); private final StatsCounter failedIterationsCounter;
private final Map<String, AtomicInteger> counters = new HashMap<>(); private final List<StatsCounter> counters = new ArrayList<>();
private final ConcurrentMap<UUID, TbTenantRuleEngineStats> tenantStats = new ConcurrentHashMap<>(); private final ConcurrentMap<UUID, TbTenantRuleEngineStats> tenantStats = new ConcurrentHashMap<>();
private final ConcurrentMap<TenantId, RuleEngineException> tenantExceptions = new ConcurrentHashMap<>(); private final ConcurrentMap<TenantId, RuleEngineException> tenantExceptions = new ConcurrentHashMap<>();
private final String queueName; private final String queueName;
public TbRuleEngineConsumerStats(String queueName) { public TbRuleEngineConsumerStats(String queueName, StatsCounterFactory counterFactory) {
this.queueName = queueName; this.queueName = queueName;
counters.put(TOTAL_MSGS, totalMsgCounter);
counters.put(SUCCESSFUL_MSGS, successMsgCounter);
counters.put(TIMEOUT_MSGS, timeoutMsgCounter);
counters.put(FAILED_MSGS, failedMsgCounter);
counters.put(TMP_TIMEOUT, tmpTimeoutMsgCounter); String statsKey = StatsType.RULE_ENGINE.getName() + "." + queueName;
counters.put(TMP_FAILED, tmpFailedMsgCounter); this.totalMsgCounter = counterFactory.createStatsCounter(statsKey, TOTAL_MSGS);
counters.put(SUCCESSFUL_ITERATIONS, successIterationsCounter); this.successMsgCounter = counterFactory.createStatsCounter(statsKey, SUCCESSFUL_MSGS);
counters.put(FAILED_ITERATIONS, failedIterationsCounter); this.timeoutMsgCounter = counterFactory.createStatsCounter(statsKey, TIMEOUT_MSGS);
this.failedMsgCounter = counterFactory.createStatsCounter(statsKey, FAILED_MSGS);
this.tmpTimeoutMsgCounter = counterFactory.createStatsCounter(statsKey, TMP_TIMEOUT);
this.tmpFailedMsgCounter = counterFactory.createStatsCounter(statsKey, TMP_FAILED);
this.successIterationsCounter = counterFactory.createStatsCounter(statsKey, SUCCESSFUL_ITERATIONS);
this.failedIterationsCounter = counterFactory.createStatsCounter(statsKey, FAILED_ITERATIONS);
counters.add(totalMsgCounter);
counters.add(successMsgCounter);
counters.add(timeoutMsgCounter);
counters.add(failedMsgCounter);
counters.add(tmpTimeoutMsgCounter);
counters.add(tmpFailedMsgCounter);
counters.add(successIterationsCounter);
counters.add(failedIterationsCounter);
} }
public void log(TbRuleEngineProcessingResult msg, boolean finalIterationForPack) { public void log(TbRuleEngineProcessingResult msg, boolean finalIterationForPack) {
int success = msg.getSuccessMap().size(); int success = msg.getSuccessMap().size();
int pending = msg.getPendingMap().size(); int pending = msg.getPendingMap().size();
int failed = msg.getFailedMap().size(); int failed = msg.getFailedMap().size();
totalMsgCounter.addAndGet(success + pending + failed); totalMsgCounter.add(success + pending + failed);
successMsgCounter.addAndGet(success); successMsgCounter.add(success);
msg.getSuccessMap().values().forEach(m -> getTenantStats(m).logSuccess()); msg.getSuccessMap().values().forEach(m -> getTenantStats(m).logSuccess());
if (finalIterationForPack) { if (finalIterationForPack) {
if (pending > 0 || failed > 0) { if (pending > 0 || failed > 0) {
timeoutMsgCounter.addAndGet(pending); timeoutMsgCounter.add(pending);
failedMsgCounter.addAndGet(failed); failedMsgCounter.add(failed);
if (pending > 0) { if (pending > 0) {
msg.getPendingMap().values().forEach(m -> getTenantStats(m).logTimeout()); msg.getPendingMap().values().forEach(m -> getTenantStats(m).logTimeout());
} }
if (failed > 0) { if (failed > 0) {
msg.getFailedMap().values().forEach(m -> getTenantStats(m).logFailed()); msg.getFailedMap().values().forEach(m -> getTenantStats(m).logFailed());
} }
failedIterationsCounter.incrementAndGet(); failedIterationsCounter.increment();
} else { } else {
successIterationsCounter.incrementAndGet(); successIterationsCounter.increment();
} }
} else { } else {
failedIterationsCounter.incrementAndGet(); failedIterationsCounter.increment();
tmpTimeoutMsgCounter.addAndGet(pending); tmpTimeoutMsgCounter.add(pending);
tmpFailedMsgCounter.addAndGet(failed); tmpFailedMsgCounter.add(failed);
if (pending > 0) { if (pending > 0) {
msg.getPendingMap().values().forEach(m -> getTenantStats(m).logTmpTimeout()); msg.getPendingMap().values().forEach(m -> getTenantStats(m).logTmpTimeout());
} }
@ -113,19 +124,31 @@ public class TbRuleEngineConsumerStats {
return tenantStats.computeIfAbsent(new UUID(reMsg.getTenantIdMSB(), reMsg.getTenantIdLSB()), TbTenantRuleEngineStats::new); return tenantStats.computeIfAbsent(new UUID(reMsg.getTenantIdMSB(), reMsg.getTenantIdLSB()), TbTenantRuleEngineStats::new);
} }
public ConcurrentMap<UUID, TbTenantRuleEngineStats> getTenantStats() {
return tenantStats;
}
public String getQueueName() {
return queueName;
}
public ConcurrentMap<TenantId, RuleEngineException> getTenantExceptions() {
return tenantExceptions;
}
public void printStats() { public void printStats() {
int total = totalMsgCounter.get(); int total = totalMsgCounter.get();
if (total > 0) { if (total > 0) {
StringBuilder stats = new StringBuilder(); StringBuilder stats = new StringBuilder();
counters.forEach((label, value) -> { counters.forEach(counter -> {
stats.append(label).append(" = [").append(value.get()).append("] "); stats.append(counter.getName()).append(" = [").append(counter.get()).append("] ");
}); });
log.info("[{}] Stats: {}", queueName, stats); log.info("[{}] Stats: {}", queueName, stats);
} }
} }
public void reset() { public void reset() {
counters.values().forEach(counter -> counter.set(0)); counters.forEach(StatsCounter::clear);
tenantStats.clear(); tenantStats.clear();
tenantExceptions.clear(); tenantExceptions.clear();
} }

View File

@ -0,0 +1,81 @@
/**
* Copyright © 2016-2020 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.service.stats;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.thingsboard.server.actors.JsInvokeStats;
import javax.annotation.PostConstruct;
@Service
public class DefaultJsInvokeStats implements JsInvokeStats {
private static final String REQUESTS = "requests";
private static final String RESPONSES = "responses";
private static final String FAILURES = "failures";
private StatsCounter requestsCounter;
private StatsCounter responsesCounter;
private StatsCounter failuresCounter;
@Autowired
private StatsCounterFactory counterFactory;
@PostConstruct
public void init() {
String key = StatsType.JS_INVOKE.getName();
this.requestsCounter = counterFactory.createStatsCounter(key, REQUESTS);
this.responsesCounter = counterFactory.createStatsCounter(key, RESPONSES);
this.failuresCounter = counterFactory.createStatsCounter(key, FAILURES);
}
@Override
public void incrementRequests(int amount) {
requestsCounter.add(amount);
}
@Override
public void incrementResponses(int amount) {
responsesCounter.add(amount);
}
@Override
public void incrementFailures(int amount) {
failuresCounter.add(amount);
}
@Override
public int getRequests() {
return requestsCounter.get();
}
@Override
public int getResponses() {
return responsesCounter.get();
}
@Override
public int getFailures() {
return failuresCounter.get();
}
@Override
public void reset() {
requestsCounter.clear();
responsesCounter.clear();
failuresCounter.clear();
}
}

View File

@ -0,0 +1,45 @@
/**
* Copyright © 2016-2020 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.service.stats;
import org.thingsboard.server.queue.stats.QueueStats;
public class DefaultQueueStats implements QueueStats {
private final StatsCounter totalCounter;
private final StatsCounter successfulCounter;
private final StatsCounter failedCounter;
public DefaultQueueStats(StatsCounter totalCounter, StatsCounter successfulCounter, StatsCounter failedCounter) {
this.totalCounter = totalCounter;
this.successfulCounter = successfulCounter;
this.failedCounter = failedCounter;
}
@Override
public void incrementTotal(int amount) {
totalCounter.add(amount);
}
@Override
public void incrementSuccessful(int amount) {
successfulCounter.add(amount);
}
@Override
public void incrementFailed(int amount) {
failedCounter.add(amount);
}
}

View File

@ -104,7 +104,6 @@ public class DefaultRuleEngineStatisticsService implements RuleEngineStatisticsS
} }
} }
}); });
ruleEngineStats.reset();
} }
private AssetId getServiceAssetId(TenantId tenantId, String queueName) { private AssetId getServiceAssetId(TenantId tenantId, String queueName) {

View File

@ -0,0 +1,54 @@
/**
* Copyright © 2016-2020 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.service.stats;
import io.micrometer.core.instrument.Counter;
import java.util.concurrent.atomic.AtomicInteger;
public class StatsCounter {
private final AtomicInteger aiCounter;
private final Counter micrometerCounter;
private final String name;
public StatsCounter(AtomicInteger aiCounter, Counter micrometerCounter, String name) {
this.aiCounter = aiCounter;
this.micrometerCounter = micrometerCounter;
this.name = name;
}
public void increment() {
aiCounter.incrementAndGet();
micrometerCounter.increment();
}
public void clear() {
aiCounter.set(0);
}
public int get() {
return aiCounter.get();
}
public void add(int delta){
aiCounter.addAndGet(delta);
micrometerCounter.increment(delta);
}
public String getName() {
return name;
}
}

View File

@ -0,0 +1,48 @@
/**
* Copyright © 2016-2020 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.service.stats;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.MeterRegistry;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.thingsboard.server.service.metrics.StubCounter;
import java.util.concurrent.atomic.AtomicInteger;
@Service
public class StatsCounterFactory {
private static final String STATS_NAME_TAG = "statsName";
private static final Counter STUB_COUNTER = new StubCounter();
@Autowired
private MeterRegistry meterRegistry;
@Value("${metrics.enabled}")
private Boolean metricsEnabled;
public StatsCounter createStatsCounter(String key, String statsName) {
return new StatsCounter(
new AtomicInteger(0),
metricsEnabled ?
meterRegistry.counter(key, STATS_NAME_TAG, statsName)
: STUB_COUNTER,
statsName
);
}
}

View File

@ -0,0 +1,30 @@
/**
* Copyright © 2016-2020 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.service.stats;
public enum StatsType {
RULE_ENGINE("ruleEngine"), CORE("core"), TRANSPORT("transport"), JS_INVOKE("jsInvoke");
private String name;
StatsType(String name) {
this.name = name;
}
public String getName() {
return name;
}
}

View File

@ -29,6 +29,10 @@ import org.thingsboard.server.gen.transport.TransportProtos.TransportApiRequestM
import org.thingsboard.server.gen.transport.TransportProtos.TransportApiResponseMsg; import org.thingsboard.server.gen.transport.TransportProtos.TransportApiResponseMsg;
import org.thingsboard.server.queue.provider.TbCoreQueueFactory; import org.thingsboard.server.queue.provider.TbCoreQueueFactory;
import org.thingsboard.server.queue.util.TbCoreComponent; import org.thingsboard.server.queue.util.TbCoreComponent;
import org.thingsboard.server.service.stats.DefaultQueueStats;
import org.thingsboard.server.service.stats.StatsCounter;
import org.thingsboard.server.service.stats.StatsCounterFactory;
import org.thingsboard.server.service.stats.StatsType;
import javax.annotation.PostConstruct; import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy; import javax.annotation.PreDestroy;
@ -41,9 +45,13 @@ import java.util.concurrent.*;
@Service @Service
@TbCoreComponent @TbCoreComponent
public class TbCoreTransportApiService { public class TbCoreTransportApiService {
private static final String TOTAL_MSGS = "totalMsgs";
private static final String SUCCESSFUL_MSGS = "successfulMsgs";
private static final String FAILED_MSGS = "failedMsgs";
private final TbCoreQueueFactory tbCoreQueueFactory; private final TbCoreQueueFactory tbCoreQueueFactory;
private final TransportApiService transportApiService; private final TransportApiService transportApiService;
private final StatsCounterFactory counterFactory;
@Value("${queue.transport_api.max_pending_requests:10000}") @Value("${queue.transport_api.max_pending_requests:10000}")
private int maxPendingRequests; private int maxPendingRequests;
@ -58,9 +66,10 @@ public class TbCoreTransportApiService {
private TbQueueResponseTemplate<TbProtoQueueMsg<TransportApiRequestMsg>, private TbQueueResponseTemplate<TbProtoQueueMsg<TransportApiRequestMsg>,
TbProtoQueueMsg<TransportApiResponseMsg>> transportApiTemplate; TbProtoQueueMsg<TransportApiResponseMsg>> transportApiTemplate;
public TbCoreTransportApiService(TbCoreQueueFactory tbCoreQueueFactory, TransportApiService transportApiService) { public TbCoreTransportApiService(TbCoreQueueFactory tbCoreQueueFactory, TransportApiService transportApiService, StatsCounterFactory counterFactory) {
this.tbCoreQueueFactory = tbCoreQueueFactory; this.tbCoreQueueFactory = tbCoreQueueFactory;
this.transportApiService = transportApiService; this.transportApiService = transportApiService;
this.counterFactory = counterFactory;
} }
@PostConstruct @PostConstruct
@ -69,6 +78,12 @@ public class TbCoreTransportApiService {
TbQueueProducer<TbProtoQueueMsg<TransportApiResponseMsg>> producer = tbCoreQueueFactory.createTransportApiResponseProducer(); TbQueueProducer<TbProtoQueueMsg<TransportApiResponseMsg>> producer = tbCoreQueueFactory.createTransportApiResponseProducer();
TbQueueConsumer<TbProtoQueueMsg<TransportApiRequestMsg>> consumer = tbCoreQueueFactory.createTransportApiRequestConsumer(); TbQueueConsumer<TbProtoQueueMsg<TransportApiRequestMsg>> consumer = tbCoreQueueFactory.createTransportApiRequestConsumer();
String key = StatsType.TRANSPORT.getName();
StatsCounter totalCounter = counterFactory.createStatsCounter(key, TOTAL_MSGS);
StatsCounter successfulCounter = counterFactory.createStatsCounter(key, SUCCESSFUL_MSGS);
StatsCounter failedCounter = counterFactory.createStatsCounter(key, FAILED_MSGS);
DefaultQueueStats queueStats = new DefaultQueueStats(totalCounter, successfulCounter, failedCounter);
DefaultTbQueueResponseTemplate.DefaultTbQueueResponseTemplateBuilder DefaultTbQueueResponseTemplate.DefaultTbQueueResponseTemplateBuilder
<TbProtoQueueMsg<TransportApiRequestMsg>, TbProtoQueueMsg<TransportApiResponseMsg>> builder = DefaultTbQueueResponseTemplate.builder(); <TbProtoQueueMsg<TransportApiRequestMsg>, TbProtoQueueMsg<TransportApiResponseMsg>> builder = DefaultTbQueueResponseTemplate.builder();
builder.requestTemplate(consumer); builder.requestTemplate(consumer);
@ -78,6 +93,7 @@ public class TbCoreTransportApiService {
builder.pollInterval(responsePollDuration); builder.pollInterval(responsePollDuration);
builder.executor(transportCallbackExecutor); builder.executor(transportCallbackExecutor);
builder.handler(transportApiService); builder.handler(transportApiService);
builder.stats(queueStats);
transportApiTemplate = builder.build(); transportApiTemplate = builder.build();
} }

View File

@ -754,3 +754,13 @@ service:
id: "${TB_SERVICE_ID:}" id: "${TB_SERVICE_ID:}"
tenant_id: "${TB_SERVICE_TENANT_ID:}" # empty or specific tenant id. tenant_id: "${TB_SERVICE_TENANT_ID:}" # empty or specific tenant id.
metrics:
# Enable/disable actuator metrics.
enabled: "${METRICS_ENABLED:false}"
management:
endpoints:
web:
exposure:
# Expose metrics endpoint (use value 'prometheus' to enable prometheus metrics).
include: '${METRICS_ENDPOINTS_EXPOSE:info}'

View File

@ -0,0 +1,44 @@
/**
* Copyright © 2016-2020 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.actors;
public interface JsInvokeStats {
default void incrementRequests() {
incrementRequests(1);
}
void incrementRequests(int amount);
default void incrementResponses() {
incrementResponses(1);
}
void incrementResponses(int amount);
default void incrementFailures() {
incrementFailures(1);
}
void incrementFailures(int amount);
int getRequests();
int getResponses();
int getFailures();
void reset();
}

View File

@ -23,6 +23,7 @@ import org.thingsboard.server.queue.TbQueueHandler;
import org.thingsboard.server.queue.TbQueueMsg; import org.thingsboard.server.queue.TbQueueMsg;
import org.thingsboard.server.queue.TbQueueProducer; import org.thingsboard.server.queue.TbQueueProducer;
import org.thingsboard.server.queue.TbQueueResponseTemplate; import org.thingsboard.server.queue.TbQueueResponseTemplate;
import org.thingsboard.server.queue.stats.QueueStats;
import java.util.List; import java.util.List;
import java.util.UUID; import java.util.UUID;
@ -44,6 +45,7 @@ public class DefaultTbQueueResponseTemplate<Request extends TbQueueMsg, Response
private final ExecutorService loopExecutor; private final ExecutorService loopExecutor;
private final ScheduledExecutorService timeoutExecutor; private final ScheduledExecutorService timeoutExecutor;
private final ExecutorService callbackExecutor; private final ExecutorService callbackExecutor;
private final QueueStats stats;
private final int maxPendingRequests; private final int maxPendingRequests;
private final long requestTimeout; private final long requestTimeout;
@ -58,7 +60,8 @@ public class DefaultTbQueueResponseTemplate<Request extends TbQueueMsg, Response
long pollInterval, long pollInterval,
long requestTimeout, long requestTimeout,
int maxPendingRequests, int maxPendingRequests,
ExecutorService executor) { ExecutorService executor,
QueueStats stats) {
this.requestTemplate = requestTemplate; this.requestTemplate = requestTemplate;
this.responseTemplate = responseTemplate; this.responseTemplate = responseTemplate;
this.pendingRequests = new ConcurrentHashMap<>(); this.pendingRequests = new ConcurrentHashMap<>();
@ -66,6 +69,7 @@ public class DefaultTbQueueResponseTemplate<Request extends TbQueueMsg, Response
this.pollInterval = pollInterval; this.pollInterval = pollInterval;
this.requestTimeout = requestTimeout; this.requestTimeout = requestTimeout;
this.callbackExecutor = executor; this.callbackExecutor = executor;
this.stats = stats;
this.timeoutExecutor = Executors.newSingleThreadScheduledExecutor(); this.timeoutExecutor = Executors.newSingleThreadScheduledExecutor();
this.loopExecutor = Executors.newSingleThreadExecutor(); this.loopExecutor = Executors.newSingleThreadExecutor();
} }
@ -108,11 +112,13 @@ public class DefaultTbQueueResponseTemplate<Request extends TbQueueMsg, Response
String responseTopic = bytesToString(responseTopicHeader); String responseTopic = bytesToString(responseTopicHeader);
try { try {
pendingRequestCount.getAndIncrement(); pendingRequestCount.getAndIncrement();
stats.incrementTotal();
AsyncCallbackTemplate.withCallbackAndTimeout(handler.handle(request), AsyncCallbackTemplate.withCallbackAndTimeout(handler.handle(request),
response -> { response -> {
pendingRequestCount.decrementAndGet(); pendingRequestCount.decrementAndGet();
response.getHeaders().put(REQUEST_ID_HEADER, uuidToBytes(requestId)); response.getHeaders().put(REQUEST_ID_HEADER, uuidToBytes(requestId));
responseTemplate.send(TopicPartitionInfo.builder().topic(responseTopic).build(), response, null); responseTemplate.send(TopicPartitionInfo.builder().topic(responseTopic).build(), response, null);
stats.incrementSuccessful();
}, },
e -> { e -> {
pendingRequestCount.decrementAndGet(); pendingRequestCount.decrementAndGet();
@ -121,6 +127,7 @@ public class DefaultTbQueueResponseTemplate<Request extends TbQueueMsg, Response
} else { } else {
log.trace("[{}] Failed to process the request: {}", requestId, request, e); log.trace("[{}] Failed to process the request: {}", requestId, request, e);
} }
stats.incrementFailed();
}, },
requestTimeout, requestTimeout,
timeoutExecutor, timeoutExecutor,
@ -128,6 +135,7 @@ public class DefaultTbQueueResponseTemplate<Request extends TbQueueMsg, Response
} catch (Throwable e) { } catch (Throwable e) {
pendingRequestCount.decrementAndGet(); pendingRequestCount.decrementAndGet();
log.warn("[{}] Failed to process the request: {}", requestId, request, e); log.warn("[{}] Failed to process the request: {}", requestId, request, e);
stats.incrementFailed();
} }
} }
}); });

View File

@ -0,0 +1,37 @@
/**
* Copyright © 2016-2020 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.queue.stats;
public interface QueueStats {
default void incrementTotal() {
incrementTotal(1);
}
void incrementTotal(int amount);
default void incrementSuccessful() {
incrementSuccessful(1);
}
void incrementSuccessful(int amount);
default void incrementFailed() {
incrementFailed(1);
}
void incrementFailed(int amount);
}

16
pom.xml
View File

@ -105,6 +105,7 @@
<ua-parser.version>1.4.3</ua-parser.version> <ua-parser.version>1.4.3</ua-parser.version>
<commons-beanutils.version>1.9.4</commons-beanutils.version> <commons-beanutils.version>1.9.4</commons-beanutils.version>
<commons-collections.version>3.2.2</commons-collections.version> <commons-collections.version>3.2.2</commons-collections.version>
<micrometer.version>1.5.2</micrometer.version>
</properties> </properties>
<modules> <modules>
@ -1371,6 +1372,21 @@
<artifactId>struts-tiles</artifactId> <artifactId>struts-tiles</artifactId>
<version>${struts.version}</version> <version>${struts.version}</version>
</dependency> </dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
<version>${spring-boot.version}</version>
</dependency>
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-core</artifactId>
<version>${micrometer.version}</version>
</dependency>
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-registry-prometheus</artifactId>
<version>${micrometer.version}</version>
</dependency>
</dependencies> </dependencies>
</dependencyManagement> </dependencyManagement>