diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index 4e9693448a..933c0d38f4 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -622,6 +622,10 @@ queue: transport-api: "${TB_QUEUE_KAFKA_TA_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1}" notifications: "${TB_QUEUE_KAFKA_NOTIFICATIONS_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1}" js-executor: "${TB_QUEUE_KAFKA_JE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:104857600;partitions:100}" + consumer-stats: + enabled: "${TB_QUEUE_KAFKA_CONSUMER_STATS_ENABLED:true}" + print-interval-ms: "${TB_QUEUE_KAFKA_CONSUMER_STATS_MIN_PRINT_INTERVAL_MS:60000}" + kafka-response-timeout-ms: "${TB_QUEUE_KAFKA_CONSUMER_STATS_RESPONSE_TIMEOUT_MS:1000}" aws_sqs: use_default_credential_provider_chain: "${TB_QUEUE_AWS_SQS_USE_DEFAULT_CREDENTIAL_PROVIDER_CHAIN:false}" access_key_id: "${TB_QUEUE_AWS_SQS_ACCESS_KEY_ID:YOUR_KEY}" diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaConsumerStatisticConfig.java b/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaConsumerStatisticConfig.java new file mode 100644 index 0000000000..bfe7d29bb6 --- /dev/null +++ b/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaConsumerStatisticConfig.java @@ -0,0 +1,37 @@ +/** + * Copyright © 2016-2021 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.queue.kafka; + +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.NoArgsConstructor; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.stereotype.Component; + +@Component +@ConditionalOnProperty(prefix = "queue", value = "type", havingValue = "kafka") +@Getter +@AllArgsConstructor +@NoArgsConstructor +public class TbKafkaConsumerStatisticConfig { + @Value("${queue.kafka.consumer-stats.enabled:true}") + private Boolean enabled; + @Value("${queue.kafka.consumer-stats.print-interval-ms:60000}") + private Long printIntervalMs; + @Value("${queue.kafka.consumer-stats.kafka-response-timeout-ms:1000}") + private Long kafkaResponseTimeoutMs; +} diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaConsumerStatsService.java b/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaConsumerStatsService.java new file mode 100644 index 0000000000..0ccf9ba42c --- /dev/null +++ b/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaConsumerStatsService.java @@ -0,0 +1,184 @@ +/** + * Copyright © 2016-2021 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.queue.kafka; + +import lombok.Builder; +import lombok.Data; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.TopicPartition; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.stereotype.Component; +import org.springframework.util.StringUtils; +import org.thingsboard.common.util.ThingsBoardThreadFactory; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.msg.queue.ServiceType; +import org.thingsboard.server.queue.discovery.PartitionService; + +import javax.annotation.PostConstruct; +import javax.annotation.PreDestroy; +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +@Slf4j +@Component +@RequiredArgsConstructor +@ConditionalOnProperty(prefix = "queue", value = "type", havingValue = "kafka") +public class TbKafkaConsumerStatsService { + private final Set monitoredGroups = ConcurrentHashMap.newKeySet(); + + private final TbKafkaSettings kafkaSettings; + private final TbKafkaConsumerStatisticConfig statsConfig; + private final PartitionService partitionService; + + private AdminClient adminClient; + private Consumer consumer; + private ScheduledExecutorService statsPrintScheduler; + + @PostConstruct + public void init() { + if (!statsConfig.getEnabled()) { + return; + } + this.adminClient = AdminClient.create(kafkaSettings.toAdminProps()); + this.statsPrintScheduler = Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName("kafka-consumer-stats")); + + Properties consumerProps = kafkaSettings.toConsumerProps(); + consumerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, "consumer-stats-loader-client"); + consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer-stats-loader-client-group"); + this.consumer = new KafkaConsumer<>(consumerProps); + + startLogScheduling(); + } + + private void startLogScheduling() { + Duration timeoutDuration = Duration.ofMillis(statsConfig.getKafkaResponseTimeoutMs()); + statsPrintScheduler.scheduleWithFixedDelay(() -> { + if (!isStatsPrintRequired()) { + return; + } + for (String groupId : monitoredGroups) { + try { + Map groupOffsets = adminClient.listConsumerGroupOffsets(groupId).partitionsToOffsetAndMetadata() + .get(statsConfig.getKafkaResponseTimeoutMs(), TimeUnit.MILLISECONDS); + Map endOffsets = consumer.endOffsets(groupOffsets.keySet(), timeoutDuration); + + List lagTopicsStats = getTopicsStatsWithLag(groupOffsets, endOffsets); + if (!lagTopicsStats.isEmpty()) { + StringBuilder builder = new StringBuilder(); + for (int i = 0; i < lagTopicsStats.size(); i++) { + builder.append(lagTopicsStats.get(i).toString()); + if (i != lagTopicsStats.size() - 1) { + builder.append(", "); + } + } + log.info("[{}] Topic partitions with lag: [{}].", groupId, builder.toString()); + } + } catch (Exception e) { + log.warn("[{}] Failed to get consumer group stats. Reason - {}.", groupId, e.getMessage()); + log.trace("Detailed error: ", e); + } + } + + }, statsConfig.getPrintIntervalMs(), statsConfig.getPrintIntervalMs(), TimeUnit.MILLISECONDS); + } + + private boolean isStatsPrintRequired() { + boolean isMyRuleEnginePartition = partitionService.resolve(ServiceType.TB_RULE_ENGINE, TenantId.SYS_TENANT_ID, TenantId.SYS_TENANT_ID).isMyPartition(); + boolean isMyCorePartition = partitionService.resolve(ServiceType.TB_CORE, TenantId.SYS_TENANT_ID, TenantId.SYS_TENANT_ID).isMyPartition(); + return log.isInfoEnabled() && (isMyRuleEnginePartition || isMyCorePartition); + } + + private List getTopicsStatsWithLag(Map groupOffsets, Map endOffsets) { + List consumerGroupStats = new ArrayList<>(); + for (TopicPartition topicPartition : groupOffsets.keySet()) { + long endOffset = endOffsets.get(topicPartition); + long committedOffset = groupOffsets.get(topicPartition).offset(); + long lag = endOffset - committedOffset; + if (lag != 0) { + GroupTopicStats groupTopicStats = GroupTopicStats.builder() + .topic(topicPartition.topic()) + .partition(topicPartition.partition()) + .committedOffset(committedOffset) + .endOffset(endOffset) + .lag(lag) + .build(); + consumerGroupStats.add(groupTopicStats); + } + } + return consumerGroupStats; + } + + public void registerClientGroup(String groupId) { + if (statsConfig.getEnabled() && !StringUtils.isEmpty(groupId)) { + monitoredGroups.add(groupId); + } + } + + public void unregisterClientGroup(String groupId) { + if (statsConfig.getEnabled() && !StringUtils.isEmpty(groupId)) { + monitoredGroups.remove(groupId); + } + } + + @PreDestroy + public void destroy() { + if (statsPrintScheduler != null) { + statsPrintScheduler.shutdownNow(); + } + if (adminClient != null) { + adminClient.close(); + } + if (consumer != null) { + consumer.close(); + } + } + + + @Builder + @Data + private static class GroupTopicStats { + private String topic; + private int partition; + private long committedOffset; + private long endOffset; + private long lag; + + @Override + public String toString() { + return "[" + + "topic=[" + topic + ']' + + ", partition=[" + partition + "]" + + ", committedOffset=[" + committedOffset + "]" + + ", endOffset=[" + endOffset + "]" + + ", lag=[" + lag + "]" + + "]"; + } + } +} diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaConsumerTemplate.java b/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaConsumerTemplate.java index 8743f4d6cc..b1b3d5ce05 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaConsumerTemplate.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaConsumerTemplate.java @@ -42,10 +42,13 @@ public class TbKafkaConsumerTemplate extends AbstractTbQue private final KafkaConsumer consumer; private final TbKafkaDecoder decoder; + private final TbKafkaConsumerStatsService statsService; + private final String groupId; + @Builder private TbKafkaConsumerTemplate(TbKafkaSettings settings, TbKafkaDecoder decoder, String clientId, String groupId, String topic, - TbQueueAdmin admin) { + TbQueueAdmin admin, TbKafkaConsumerStatsService statsService) { super(topic); Properties props = settings.toConsumerProps(); props.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId); @@ -53,6 +56,13 @@ public class TbKafkaConsumerTemplate extends AbstractTbQue props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); } + this.statsService = statsService; + this.groupId = groupId; + + if (statsService != null) { + statsService.registerClientGroup(groupId); + } + this.admin = admin; this.consumer = new KafkaConsumer<>(props); this.decoder = decoder; @@ -96,6 +106,8 @@ public class TbKafkaConsumerTemplate extends AbstractTbQue consumer.unsubscribe(); consumer.close(); } + if (statsService != null) { + statsService.unregisterClientGroup(groupId); + } } - } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java index eebffc37b0..e07f3e43d3 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java @@ -39,6 +39,7 @@ import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.queue.discovery.PartitionService; import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; import org.thingsboard.server.queue.kafka.TbKafkaAdmin; +import org.thingsboard.server.queue.kafka.TbKafkaConsumerStatsService; import org.thingsboard.server.queue.kafka.TbKafkaConsumerTemplate; import org.thingsboard.server.queue.kafka.TbKafkaProducerTemplate; import org.thingsboard.server.queue.kafka.TbKafkaSettings; @@ -65,6 +66,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi private final TbQueueTransportApiSettings transportApiSettings; private final TbQueueTransportNotificationSettings transportNotificationSettings; private final TbQueueRemoteJsInvokeSettings jsInvokeSettings; + private final TbKafkaConsumerStatsService consumerStatsService; private final TbQueueAdmin coreAdmin; private final TbQueueAdmin ruleEngineAdmin; @@ -79,6 +81,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi TbQueueTransportApiSettings transportApiSettings, TbQueueTransportNotificationSettings transportNotificationSettings, TbQueueRemoteJsInvokeSettings jsInvokeSettings, + TbKafkaConsumerStatsService consumerStatsService, TbKafkaTopicConfigs kafkaTopicConfigs) { this.partitionService = partitionService; this.kafkaSettings = kafkaSettings; @@ -88,6 +91,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi this.transportApiSettings = transportApiSettings; this.transportNotificationSettings = transportNotificationSettings; this.jsInvokeSettings = jsInvokeSettings; + this.consumerStatsService = consumerStatsService; this.coreAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getCoreConfigs()); this.ruleEngineAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getRuleEngineConfigs()); @@ -156,6 +160,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi consumerBuilder.groupId("re-" + queueName + "-consumer"); consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToRuleEngineMsg.parseFrom(msg.getData()), msg.getHeaders())); consumerBuilder.admin(ruleEngineAdmin); + consumerBuilder.statsService(consumerStatsService); return consumerBuilder.build(); } @@ -168,6 +173,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi consumerBuilder.groupId("monolith-rule-engine-notifications-consumer-" + serviceInfoProvider.getServiceId()); consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToRuleEngineNotificationMsg.parseFrom(msg.getData()), msg.getHeaders())); consumerBuilder.admin(notificationAdmin); + consumerBuilder.statsService(consumerStatsService); return consumerBuilder.build(); } @@ -180,6 +186,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi consumerBuilder.groupId("monolith-core-consumer"); consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToCoreMsg.parseFrom(msg.getData()), msg.getHeaders())); consumerBuilder.admin(coreAdmin); + consumerBuilder.statsService(consumerStatsService); return consumerBuilder.build(); } @@ -192,6 +199,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi consumerBuilder.groupId("monolith-core-notifications-consumer-" + serviceInfoProvider.getServiceId()); consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToCoreNotificationMsg.parseFrom(msg.getData()), msg.getHeaders())); consumerBuilder.admin(notificationAdmin); + consumerBuilder.statsService(consumerStatsService); return consumerBuilder.build(); } @@ -204,6 +212,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi consumerBuilder.groupId("monolith-transport-api-consumer"); consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportApiRequestMsg.parseFrom(msg.getData()), msg.getHeaders())); consumerBuilder.admin(transportApiAdmin); + consumerBuilder.statsService(consumerStatsService); return consumerBuilder.build(); } @@ -237,6 +246,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi return new TbProtoQueueMsg<>(msg.getKey(), builder.build(), msg.getHeaders()); } ); + responseBuilder.statsService(consumerStatsService); responseBuilder.admin(jsExecutorAdmin); DefaultTbQueueRequestTemplate.DefaultTbQueueRequestTemplateBuilder @@ -259,6 +269,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi consumerBuilder.groupId("monolith-us-consumer"); consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToUsageStatsServiceMsg.parseFrom(msg.getData()), msg.getHeaders())); consumerBuilder.admin(coreAdmin); + consumerBuilder.statsService(consumerStatsService); return consumerBuilder.build(); } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbCoreQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbCoreQueueFactory.java index 15771a3b78..24e80adec5 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbCoreQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbCoreQueueFactory.java @@ -39,6 +39,7 @@ import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.queue.discovery.PartitionService; import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; import org.thingsboard.server.queue.kafka.TbKafkaAdmin; +import org.thingsboard.server.queue.kafka.TbKafkaConsumerStatsService; import org.thingsboard.server.queue.kafka.TbKafkaConsumerTemplate; import org.thingsboard.server.queue.kafka.TbKafkaProducerTemplate; import org.thingsboard.server.queue.kafka.TbKafkaSettings; @@ -62,6 +63,7 @@ public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory { private final TbQueueRuleEngineSettings ruleEngineSettings; private final TbQueueTransportApiSettings transportApiSettings; private final TbQueueRemoteJsInvokeSettings jsInvokeSettings; + private final TbKafkaConsumerStatsService consumerStatsService; private final TbQueueAdmin coreAdmin; private final TbQueueAdmin ruleEngineAdmin; @@ -75,6 +77,7 @@ public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory { TbQueueRuleEngineSettings ruleEngineSettings, TbQueueTransportApiSettings transportApiSettings, TbQueueRemoteJsInvokeSettings jsInvokeSettings, + TbKafkaConsumerStatsService consumerStatsService, TbKafkaTopicConfigs kafkaTopicConfigs) { this.partitionService = partitionService; this.kafkaSettings = kafkaSettings; @@ -83,6 +86,7 @@ public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory { this.ruleEngineSettings = ruleEngineSettings; this.transportApiSettings = transportApiSettings; this.jsInvokeSettings = jsInvokeSettings; + this.consumerStatsService = consumerStatsService; this.coreAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getCoreConfigs()); this.ruleEngineAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getRuleEngineConfigs()); @@ -150,6 +154,7 @@ public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory { consumerBuilder.groupId("tb-core-node"); consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToCoreMsg.parseFrom(msg.getData()), msg.getHeaders())); consumerBuilder.admin(coreAdmin); + consumerBuilder.statsService(consumerStatsService); return consumerBuilder.build(); } @@ -162,6 +167,7 @@ public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory { consumerBuilder.groupId("tb-core-notifications-node-" + serviceInfoProvider.getServiceId()); consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToCoreNotificationMsg.parseFrom(msg.getData()), msg.getHeaders())); consumerBuilder.admin(notificationAdmin); + consumerBuilder.statsService(consumerStatsService); return consumerBuilder.build(); } @@ -174,6 +180,7 @@ public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory { consumerBuilder.groupId("tb-core-transport-api-consumer"); consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportApiRequestMsg.parseFrom(msg.getData()), msg.getHeaders())); consumerBuilder.admin(transportApiAdmin); + consumerBuilder.statsService(consumerStatsService); return consumerBuilder.build(); } @@ -208,6 +215,7 @@ public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory { } ); responseBuilder.admin(jsExecutorAdmin); + responseBuilder.statsService(consumerStatsService); DefaultTbQueueRequestTemplate.DefaultTbQueueRequestTemplateBuilder , TbProtoQueueMsg> builder = DefaultTbQueueRequestTemplate.builder(); @@ -229,6 +237,7 @@ public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory { consumerBuilder.groupId("tb-core-us-consumer"); consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToUsageStatsServiceMsg.parseFrom(msg.getData()), msg.getHeaders())); consumerBuilder.admin(coreAdmin); + consumerBuilder.statsService(consumerStatsService); return consumerBuilder.build(); } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbRuleEngineQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbRuleEngineQueueFactory.java index b45ec62b04..a8247dc11e 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbRuleEngineQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbRuleEngineQueueFactory.java @@ -37,6 +37,7 @@ import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.queue.discovery.PartitionService; import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; import org.thingsboard.server.queue.kafka.TbKafkaAdmin; +import org.thingsboard.server.queue.kafka.TbKafkaConsumerStatsService; import org.thingsboard.server.queue.kafka.TbKafkaConsumerTemplate; import org.thingsboard.server.queue.kafka.TbKafkaProducerTemplate; import org.thingsboard.server.queue.kafka.TbKafkaSettings; @@ -59,6 +60,7 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory { private final TbQueueCoreSettings coreSettings; private final TbQueueRuleEngineSettings ruleEngineSettings; private final TbQueueRemoteJsInvokeSettings jsInvokeSettings; + private final TbKafkaConsumerStatsService consumerStatsService; private final TbQueueAdmin coreAdmin; private final TbQueueAdmin ruleEngineAdmin; @@ -70,6 +72,7 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory { TbQueueCoreSettings coreSettings, TbQueueRuleEngineSettings ruleEngineSettings, TbQueueRemoteJsInvokeSettings jsInvokeSettings, + TbKafkaConsumerStatsService consumerStatsService, TbKafkaTopicConfigs kafkaTopicConfigs) { this.partitionService = partitionService; this.kafkaSettings = kafkaSettings; @@ -77,6 +80,7 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory { this.coreSettings = coreSettings; this.ruleEngineSettings = ruleEngineSettings; this.jsInvokeSettings = jsInvokeSettings; + this.consumerStatsService = consumerStatsService; this.coreAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getCoreConfigs()); this.ruleEngineAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getRuleEngineConfigs()); @@ -145,6 +149,7 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory { consumerBuilder.groupId("re-" + queueName + "-consumer"); consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToRuleEngineMsg.parseFrom(msg.getData()), msg.getHeaders())); consumerBuilder.admin(ruleEngineAdmin); + consumerBuilder.statsService(consumerStatsService); return consumerBuilder.build(); } @@ -157,6 +162,7 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory { consumerBuilder.groupId("tb-rule-engine-notifications-node-" + serviceInfoProvider.getServiceId()); consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToRuleEngineNotificationMsg.parseFrom(msg.getData()), msg.getHeaders())); consumerBuilder.admin(notificationAdmin); + consumerBuilder.statsService(consumerStatsService); return consumerBuilder.build(); } @@ -181,6 +187,7 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory { } ); responseBuilder.admin(jsExecutorAdmin); + responseBuilder.statsService(consumerStatsService); DefaultTbQueueRequestTemplate.DefaultTbQueueRequestTemplateBuilder , TbProtoQueueMsg> builder = DefaultTbQueueRequestTemplate.builder(); diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbTransportQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbTransportQueueFactory.java index 0c64a57304..8c0c6999d5 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbTransportQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbTransportQueueFactory.java @@ -32,6 +32,7 @@ import org.thingsboard.server.queue.common.DefaultTbQueueRequestTemplate; import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; import org.thingsboard.server.queue.kafka.TbKafkaAdmin; +import org.thingsboard.server.queue.kafka.TbKafkaConsumerStatsService; import org.thingsboard.server.queue.kafka.TbKafkaConsumerTemplate; import org.thingsboard.server.queue.kafka.TbKafkaProducerTemplate; import org.thingsboard.server.queue.kafka.TbKafkaSettings; @@ -54,6 +55,7 @@ public class KafkaTbTransportQueueFactory implements TbTransportQueueFactory { private final TbQueueRuleEngineSettings ruleEngineSettings; private final TbQueueTransportApiSettings transportApiSettings; private final TbQueueTransportNotificationSettings transportNotificationSettings; + private final TbKafkaConsumerStatsService consumerStatsService; private final TbQueueAdmin coreAdmin; private final TbQueueAdmin ruleEngineAdmin; @@ -66,6 +68,7 @@ public class KafkaTbTransportQueueFactory implements TbTransportQueueFactory { TbQueueRuleEngineSettings ruleEngineSettings, TbQueueTransportApiSettings transportApiSettings, TbQueueTransportNotificationSettings transportNotificationSettings, + TbKafkaConsumerStatsService consumerStatsService, TbKafkaTopicConfigs kafkaTopicConfigs) { this.kafkaSettings = kafkaSettings; this.serviceInfoProvider = serviceInfoProvider; @@ -73,6 +76,7 @@ public class KafkaTbTransportQueueFactory implements TbTransportQueueFactory { this.ruleEngineSettings = ruleEngineSettings; this.transportApiSettings = transportApiSettings; this.transportNotificationSettings = transportNotificationSettings; + this.consumerStatsService = consumerStatsService; this.coreAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getCoreConfigs()); this.ruleEngineAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getRuleEngineConfigs()); @@ -95,6 +99,7 @@ public class KafkaTbTransportQueueFactory implements TbTransportQueueFactory { responseBuilder.groupId("transport-node-" + serviceInfoProvider.getServiceId()); responseBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportApiResponseMsg.parseFrom(msg.getData()), msg.getHeaders())); responseBuilder.admin(transportApiAdmin); + responseBuilder.statsService(consumerStatsService); DefaultTbQueueRequestTemplate.DefaultTbQueueRequestTemplateBuilder , TbProtoQueueMsg> templateBuilder = DefaultTbQueueRequestTemplate.builder(); @@ -136,6 +141,7 @@ public class KafkaTbTransportQueueFactory implements TbTransportQueueFactory { responseBuilder.groupId("transport-node-" + serviceInfoProvider.getServiceId()); responseBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToTransportMsg.parseFrom(msg.getData()), msg.getHeaders())); responseBuilder.admin(notificationAdmin); + responseBuilder.statsService(consumerStatsService); return responseBuilder.build(); }