diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java index eaef1f7c7d..7340696788 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java @@ -59,8 +59,7 @@ import org.thingsboard.server.gen.edge.v1.RequestMsg; import org.thingsboard.server.gen.edge.v1.ResponseMsg; import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; import org.thingsboard.server.queue.discovery.TopicService; -import org.thingsboard.server.queue.kafka.TbKafkaSettings; -import org.thingsboard.server.queue.kafka.TbKafkaTopicConfigs; +import org.thingsboard.server.queue.kafka.KafkaAdmin; import org.thingsboard.server.queue.provider.TbCoreQueueFactory; import org.thingsboard.server.queue.util.AfterStartUp; import org.thingsboard.server.queue.util.TbCoreComponent; @@ -153,10 +152,7 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i private TbCoreQueueFactory tbCoreQueueFactory; @Autowired - private Optional kafkaSettings; - - @Autowired - private Optional kafkaTopicConfigs; + private Optional kafkaAdmin; private Server server; @@ -232,8 +228,8 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i } private EdgeGrpcSession createEdgeGrpcSession(StreamObserver outputStream) { - return kafkaSettings.isPresent() && kafkaTopicConfigs.isPresent() - ? new KafkaEdgeGrpcSession(ctx, topicService, tbCoreQueueFactory, kafkaSettings.get(), kafkaTopicConfigs.get(), outputStream, this::onEdgeConnect, this::onEdgeDisconnect, + return kafkaAdmin.isPresent() + ? new KafkaEdgeGrpcSession(ctx, topicService, tbCoreQueueFactory, kafkaAdmin.get(), outputStream, this::onEdgeConnect, this::onEdgeDisconnect, sendDownlinkExecutorService, maxInboundMessageSize, maxHighPriorityQueueSizePerSession) : new PostgresEdgeGrpcSession(ctx, outputStream, this::onEdgeConnect, this::onEdgeDisconnect, sendDownlinkExecutorService, maxInboundMessageSize, maxHighPriorityQueueSizePerSession); @@ -643,10 +639,10 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i List toRemove = new ArrayList<>(); for (EdgeGrpcSession session : sessions.values()) { if (session instanceof KafkaEdgeGrpcSession kafkaSession && - !kafkaSession.isConnected() && - kafkaSession.getConsumer() != null && - kafkaSession.getConsumer().getConsumer() != null && - !kafkaSession.getConsumer().getConsumer().isStopped()) { + !kafkaSession.isConnected() && + kafkaSession.getConsumer() != null && + kafkaSession.getConsumer().getConsumer() != null && + !kafkaSession.getConsumer().getConsumer().isStopped()) { toRemove.add(kafkaSession.getEdge().getId()); } } @@ -663,4 +659,5 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i log.warn("Failed to cleanup kafka sessions", e); } } + } diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/KafkaEdgeGrpcSession.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/KafkaEdgeGrpcSession.java index ab0b42abb4..d165be33d4 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/KafkaEdgeGrpcSession.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/KafkaEdgeGrpcSession.java @@ -32,9 +32,7 @@ import org.thingsboard.server.queue.TbQueueConsumer; import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.queue.common.consumer.QueueConsumerManager; import org.thingsboard.server.queue.discovery.TopicService; -import org.thingsboard.server.queue.kafka.TbKafkaAdmin; -import org.thingsboard.server.queue.kafka.TbKafkaSettings; -import org.thingsboard.server.queue.kafka.TbKafkaTopicConfigs; +import org.thingsboard.server.queue.kafka.KafkaAdmin; import org.thingsboard.server.queue.provider.TbCoreQueueFactory; import org.thingsboard.server.service.edge.EdgeContextComponent; @@ -51,9 +49,7 @@ public class KafkaEdgeGrpcSession extends EdgeGrpcSession { private final TopicService topicService; private final TbCoreQueueFactory tbCoreQueueFactory; - - private final TbKafkaSettings kafkaSettings; - private final TbKafkaTopicConfigs kafkaTopicConfigs; + private final KafkaAdmin kafkaAdmin; private volatile boolean isHighPriorityProcessing; @@ -63,21 +59,20 @@ public class KafkaEdgeGrpcSession extends EdgeGrpcSession { private ExecutorService consumerExecutor; public KafkaEdgeGrpcSession(EdgeContextComponent ctx, TopicService topicService, TbCoreQueueFactory tbCoreQueueFactory, - TbKafkaSettings kafkaSettings, TbKafkaTopicConfigs kafkaTopicConfigs, StreamObserver outputStream, + KafkaAdmin kafkaAdmin, StreamObserver outputStream, BiConsumer sessionOpenListener, BiConsumer sessionCloseListener, ScheduledExecutorService sendDownlinkExecutorService, int maxInboundMessageSize, int maxHighPriorityQueueSizePerSession) { super(ctx, outputStream, sessionOpenListener, sessionCloseListener, sendDownlinkExecutorService, maxInboundMessageSize, maxHighPriorityQueueSizePerSession); this.topicService = topicService; this.tbCoreQueueFactory = tbCoreQueueFactory; - this.kafkaSettings = kafkaSettings; - this.kafkaTopicConfigs = kafkaTopicConfigs; + this.kafkaAdmin = kafkaAdmin; } private void processMsgs(List> msgs, TbQueueConsumer> consumer) { log.trace("[{}][{}] starting processing edge events", tenantId, edge.getId()); if (!isConnected() || isSyncInProgress() || isHighPriorityProcessing) { log.debug("[{}][{}] edge not connected, edge sync is not completed or high priority processing in progress, " + - "connected = {}, sync in progress = {}, high priority in progress = {}. Skipping iteration", + "connected = {}, sync in progress = {}, high priority in progress = {}. Skipping iteration", tenantId, edge.getId(), isConnected(), isSyncInProgress(), isHighPriorityProcessing); return; } @@ -159,7 +154,6 @@ public class KafkaEdgeGrpcSession extends EdgeGrpcSession { @Override public void cleanUp() { String topic = topicService.buildEdgeEventNotificationsTopicPartitionInfo(tenantId, edge.getId()).getTopic(); - TbKafkaAdmin kafkaAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getEdgeEventConfigs()); kafkaAdmin.deleteTopic(topic); kafkaAdmin.deleteConsumerGroup(topic); } diff --git a/application/src/main/java/org/thingsboard/server/service/edge/stats/EdgeStatsService.java b/application/src/main/java/org/thingsboard/server/service/edge/stats/EdgeStatsService.java index 3fc391ec72..48b2a47cfb 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/stats/EdgeStatsService.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/stats/EdgeStatsService.java @@ -35,7 +35,7 @@ import org.thingsboard.server.dao.edge.stats.EdgeStatsCounterService; import org.thingsboard.server.dao.edge.stats.MsgCounters; import org.thingsboard.server.dao.timeseries.TimeseriesService; import org.thingsboard.server.queue.discovery.TopicService; -import org.thingsboard.server.queue.kafka.TbKafkaAdmin; +import org.thingsboard.server.queue.kafka.KafkaAdmin; import org.thingsboard.server.queue.util.TbCoreComponent; import java.util.Collections; @@ -63,7 +63,7 @@ public class EdgeStatsService { private final TimeseriesService tsService; private final EdgeStatsCounterService statsCounterService; private final TopicService topicService; - private final Optional tbKafkaAdmin; + private final Optional kafkaAdmin; @Value("${edges.stats.ttl:30}") private int edgesStatsTtlDays; @@ -81,12 +81,12 @@ public class EdgeStatsService { long ts = now - (now % reportIntervalMillis); Map countersByEdge = statsCounterService.getCounterByEdge(); - Map lagByEdgeId = tbKafkaAdmin.isPresent() ? getEdgeLagByEdgeId(countersByEdge) : Collections.emptyMap(); + Map lagByEdgeId = kafkaAdmin.isPresent() ? getEdgeLagByEdgeId(countersByEdge) : Collections.emptyMap(); Map countersByEdgeSnapshot = new HashMap<>(statsCounterService.getCounterByEdge()); countersByEdgeSnapshot.forEach((edgeId, counters) -> { TenantId tenantId = counters.getTenantId(); - if (tbKafkaAdmin.isPresent()) { + if (kafkaAdmin.isPresent()) { counters.getMsgsLag().set(lagByEdgeId.getOrDefault(edgeId, 0L)); } List statsEntries = List.of( @@ -109,7 +109,7 @@ public class EdgeStatsService { e -> topicService.buildEdgeEventNotificationsTopicPartitionInfo(e.getValue().getTenantId(), e.getKey()).getTopic() )); - Map lagByTopic = tbKafkaAdmin.get().getTotalLagForGroupsBulk(new HashSet<>(edgeToTopicMap.values())); + Map lagByTopic = kafkaAdmin.get().getTotalLagForGroupsBulk(new HashSet<>(edgeToTopicMap.values())); return edgeToTopicMap.entrySet().stream() .collect(Collectors.toMap( diff --git a/application/src/main/java/org/thingsboard/server/service/edqs/KafkaEdqsSyncService.java b/application/src/main/java/org/thingsboard/server/service/edqs/KafkaEdqsSyncService.java index 43b0c575a0..5ded4f66c6 100644 --- a/application/src/main/java/org/thingsboard/server/service/edqs/KafkaEdqsSyncService.java +++ b/application/src/main/java/org/thingsboard/server/service/edqs/KafkaEdqsSyncService.java @@ -20,10 +20,8 @@ import org.springframework.stereotype.Service; import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; import org.thingsboard.server.queue.discovery.TopicService; import org.thingsboard.server.queue.edqs.EdqsConfig; -import org.thingsboard.server.queue.kafka.TbKafkaAdmin; -import org.thingsboard.server.queue.kafka.TbKafkaSettings; +import org.thingsboard.server.queue.kafka.KafkaAdmin; -import java.util.Collections; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -33,8 +31,7 @@ public class KafkaEdqsSyncService extends EdqsSyncService { private final boolean syncNeeded; - public KafkaEdqsSyncService(TbKafkaSettings kafkaSettings, TopicService topicService, EdqsConfig edqsConfig) { - TbKafkaAdmin kafkaAdmin = new TbKafkaAdmin(kafkaSettings, Collections.emptyMap()); + public KafkaEdqsSyncService(KafkaAdmin kafkaAdmin, TopicService topicService, EdqsConfig edqsConfig) { this.syncNeeded = kafkaAdmin.areAllTopicsEmpty(IntStream.range(0, edqsConfig.getPartitions()) .mapToObj(partition -> TopicPartitionInfo.builder() .topic(topicService.buildTopicName(edqsConfig.getEventsTopic())) diff --git a/application/src/main/java/org/thingsboard/server/service/entitiy/queue/DefaultTbQueueService.java b/application/src/main/java/org/thingsboard/server/service/entitiy/queue/DefaultTbQueueService.java index 0d4cc26ce7..26f0d81182 100644 --- a/application/src/main/java/org/thingsboard/server/service/entitiy/queue/DefaultTbQueueService.java +++ b/application/src/main/java/org/thingsboard/server/service/entitiy/queue/DefaultTbQueueService.java @@ -176,8 +176,8 @@ public class DefaultTbQueueService extends AbstractTbEntityService implements Tb for (int i = oldPartitions; i < newPartitions; i++) { tbQueueAdmin.createTopicIfNotExists( new TopicPartitionInfo(queue.getTopic(), queue.getTenantId(), i, false).getFullTopicName(), - queue.getCustomProperties() - ); + queue.getCustomProperties(), + true); // forcing topic creation because the topic may still be cached on some nodes } } diff --git a/application/src/main/java/org/thingsboard/server/service/ttl/KafkaEdgeTopicsCleanUpService.java b/application/src/main/java/org/thingsboard/server/service/ttl/KafkaEdgeTopicsCleanUpService.java index 6d06c85585..3354c63f9f 100644 --- a/application/src/main/java/org/thingsboard/server/service/ttl/KafkaEdgeTopicsCleanUpService.java +++ b/application/src/main/java/org/thingsboard/server/service/ttl/KafkaEdgeTopicsCleanUpService.java @@ -30,9 +30,7 @@ import org.thingsboard.server.dao.edge.EdgeService; import org.thingsboard.server.dao.tenant.TenantService; import org.thingsboard.server.queue.discovery.PartitionService; import org.thingsboard.server.queue.discovery.TopicService; -import org.thingsboard.server.queue.kafka.TbKafkaAdmin; -import org.thingsboard.server.queue.kafka.TbKafkaSettings; -import org.thingsboard.server.queue.kafka.TbKafkaTopicConfigs; +import org.thingsboard.server.queue.kafka.KafkaAdmin; import org.thingsboard.server.queue.util.TbCoreComponent; import java.time.Instant; @@ -57,7 +55,7 @@ public class KafkaEdgeTopicsCleanUpService extends AbstractCleanUpService { private final TenantService tenantService; private final EdgeService edgeService; private final AttributesService attributesService; - private final TbKafkaAdmin kafkaAdmin; + private final KafkaAdmin kafkaAdmin; @Value("${sql.ttl.edge_events.edge_events_ttl:2628000}") private long ttlSeconds; @@ -67,13 +65,13 @@ public class KafkaEdgeTopicsCleanUpService extends AbstractCleanUpService { public KafkaEdgeTopicsCleanUpService(PartitionService partitionService, EdgeService edgeService, TenantService tenantService, AttributesService attributesService, - TopicService topicService, TbKafkaSettings kafkaSettings, TbKafkaTopicConfigs kafkaTopicConfigs) { + TopicService topicService, KafkaAdmin kafkaAdmin) { super(partitionService); this.topicService = topicService; this.tenantService = tenantService; this.edgeService = edgeService; this.attributesService = attributesService; - this.kafkaAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getEdgeEventConfigs()); + this.kafkaAdmin = kafkaAdmin; } @Scheduled(initialDelayString = "#{T(org.apache.commons.lang3.RandomUtils).nextLong(0, ${sql.ttl.edge_events.execution_interval_ms})}", fixedDelayString = "${sql.ttl.edge_events.execution_interval_ms}") @@ -82,8 +80,8 @@ public class KafkaEdgeTopicsCleanUpService extends AbstractCleanUpService { return; } - Set topics = kafkaAdmin.getAllTopics(); - if (topics == null || topics.isEmpty()) { + Set topics = kafkaAdmin.listTopics(); + if (topics.isEmpty()) { return; } diff --git a/application/src/test/java/org/thingsboard/server/service/edge/EdgeStatsTest.java b/application/src/test/java/org/thingsboard/server/service/edge/EdgeStatsTest.java index 933318ae59..25ff0f1b5d 100644 --- a/application/src/test/java/org/thingsboard/server/service/edge/EdgeStatsTest.java +++ b/application/src/test/java/org/thingsboard/server/service/edge/EdgeStatsTest.java @@ -33,7 +33,7 @@ import org.thingsboard.server.dao.edge.stats.EdgeStatsCounterService; import org.thingsboard.server.dao.edge.stats.MsgCounters; import org.thingsboard.server.dao.timeseries.TimeseriesService; import org.thingsboard.server.queue.discovery.TopicService; -import org.thingsboard.server.queue.kafka.TbKafkaAdmin; +import org.thingsboard.server.queue.kafka.KafkaAdmin; import org.thingsboard.server.service.edge.stats.EdgeStatsService; import java.util.List; @@ -141,7 +141,7 @@ public class EdgeStatsTest { TopicPartitionInfo partitionInfo = new TopicPartitionInfo(topic, tenantId, 0, false); when(topicService.buildEdgeEventNotificationsTopicPartitionInfo(tenantId, edgeId)).thenReturn(partitionInfo); - TbKafkaAdmin kafkaAdmin = mock(TbKafkaAdmin.class); + KafkaAdmin kafkaAdmin = mock(KafkaAdmin.class); when(kafkaAdmin.getTotalLagForGroupsBulk(Set.of(topic))) .thenReturn(Map.of(topic, 15L)); diff --git a/application/src/test/java/org/thingsboard/server/service/queue/ruleengine/TbRuleEngineQueueConsumerManagerTest.java b/application/src/test/java/org/thingsboard/server/service/queue/ruleengine/TbRuleEngineQueueConsumerManagerTest.java index 09bd02e5a4..bcbe52b5c9 100644 --- a/application/src/test/java/org/thingsboard/server/service/queue/ruleengine/TbRuleEngineQueueConsumerManagerTest.java +++ b/application/src/test/java/org/thingsboard/server/service/queue/ruleengine/TbRuleEngineQueueConsumerManagerTest.java @@ -507,7 +507,7 @@ public class TbRuleEngineQueueConsumerManagerTest { consumerManager.delete(true); - await().atMost(2, TimeUnit.SECONDS) + await().atMost(5, TimeUnit.SECONDS) .untilAsserted(() -> { verify(ruleEngineMsgProducer).send(any(), any(), any()); }); diff --git a/common/cluster-api/src/main/java/org/thingsboard/server/queue/TbEdgeQueueAdmin.java b/common/cluster-api/src/main/java/org/thingsboard/server/queue/TbEdgeQueueAdmin.java index 9be50bb145..9210d4c1a8 100644 --- a/common/cluster-api/src/main/java/org/thingsboard/server/queue/TbEdgeQueueAdmin.java +++ b/common/cluster-api/src/main/java/org/thingsboard/server/queue/TbEdgeQueueAdmin.java @@ -16,7 +16,7 @@ package org.thingsboard.server.queue; public interface TbEdgeQueueAdmin extends TbQueueAdmin { + void syncEdgeNotificationsOffsets(String fatGroupId, String newGroupId); - void deleteConsumerGroup(String consumerGroupId); } diff --git a/common/cluster-api/src/main/java/org/thingsboard/server/queue/TbQueueAdmin.java b/common/cluster-api/src/main/java/org/thingsboard/server/queue/TbQueueAdmin.java index 48d9b3c34f..0b9925765c 100644 --- a/common/cluster-api/src/main/java/org/thingsboard/server/queue/TbQueueAdmin.java +++ b/common/cluster-api/src/main/java/org/thingsboard/server/queue/TbQueueAdmin.java @@ -18,12 +18,13 @@ package org.thingsboard.server.queue; public interface TbQueueAdmin { default void createTopicIfNotExists(String topic) { - createTopicIfNotExists(topic, null); + createTopicIfNotExists(topic, null, false); } - void createTopicIfNotExists(String topic, String properties); + void createTopicIfNotExists(String topic, String properties, boolean force); void destroy(); void deleteTopic(String topic); + } diff --git a/common/edqs/src/main/java/org/thingsboard/server/edqs/state/KafkaEdqsStateService.java b/common/edqs/src/main/java/org/thingsboard/server/edqs/state/KafkaEdqsStateService.java index 7e2e99e662..b34abe5363 100644 --- a/common/edqs/src/main/java/org/thingsboard/server/edqs/state/KafkaEdqsStateService.java +++ b/common/edqs/src/main/java/org/thingsboard/server/edqs/state/KafkaEdqsStateService.java @@ -43,7 +43,7 @@ import org.thingsboard.server.queue.edqs.EdqsConfig; import org.thingsboard.server.queue.edqs.EdqsExecutors; import org.thingsboard.server.queue.edqs.KafkaEdqsComponent; import org.thingsboard.server.queue.edqs.KafkaEdqsQueueFactory; -import org.thingsboard.server.queue.kafka.TbKafkaAdmin; +import org.thingsboard.server.queue.kafka.KafkaAdmin; import org.thingsboard.server.queue.kafka.TbKafkaConsumerTemplate; import java.util.HashMap; @@ -68,6 +68,7 @@ public class KafkaEdqsStateService implements EdqsStateService { private final EdqsExecutors edqsExecutors; private final EdqsMapper mapper; private final TopicService topicService; + private final KafkaAdmin kafkaAdmin; @Autowired @Lazy private EdqsProcessor edqsProcessor; @@ -86,7 +87,6 @@ public class KafkaEdqsStateService implements EdqsStateService { @Override public void init(PartitionedQueueConsumerManager> eventConsumer, List> otherConsumers) { versionsStore = new VersionsStore(config.getVersionsCacheTtl()); - TbKafkaAdmin queueAdmin = queueFactory.getEdqsQueueAdmin(); stateConsumer = PartitionedQueueConsumerManager.>create() .queueKey(new QueueKey(ServiceType.EDQS, config.getStateTopic())) .topic(topicService.buildTopicName(config.getStateTopic())) @@ -106,7 +106,7 @@ public class KafkaEdqsStateService implements EdqsStateService { consumer.commit(); }) .consumerCreator((config, tpi) -> queueFactory.createEdqsStateConsumer()) - .queueAdmin(queueAdmin) + .queueAdmin(queueFactory.getEdqsQueueAdmin()) .consumerExecutor(edqsExecutors.getConsumersExecutor()) .taskExecutor(edqsExecutors.getConsumerTaskExecutor()) .scheduler(edqsExecutors.getScheduler()) @@ -174,7 +174,7 @@ public class KafkaEdqsStateService implements EdqsStateService { // (because we need to be able to consume the same topic-partition by multiple instances) Map offsets = new HashMap<>(); try { - queueAdmin.getConsumerGroupOffsets(eventsToBackupKafkaConsumer.getGroupId()) + kafkaAdmin.getConsumerGroupOffsets(eventsToBackupKafkaConsumer.getGroupId()) .forEach((topicPartition, offsetAndMetadata) -> { offsets.put(topicPartition.topic(), offsetAndMetadata.offset()); }); diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/RuleEngineTbQueueAdminFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/RuleEngineTbQueueAdminFactory.java index 6d78fa8b4f..ecb2a2b771 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/RuleEngineTbQueueAdminFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/RuleEngineTbQueueAdminFactory.java @@ -43,7 +43,7 @@ public class RuleEngineTbQueueAdminFactory { return new TbQueueAdmin() { @Override - public void createTopicIfNotExists(String topic, String properties) { + public void createTopicIfNotExists(String topic, String properties, boolean force) { } @Override diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/kafka/KafkaAdmin.java b/common/queue/src/main/java/org/thingsboard/server/queue/kafka/KafkaAdmin.java new file mode 100644 index 0000000000..6261e81497 --- /dev/null +++ b/common/queue/src/main/java/org/thingsboard/server/queue/kafka/KafkaAdmin.java @@ -0,0 +1,285 @@ +/** + * Copyright © 2016-2025 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.queue.kafka; + +import jakarta.annotation.PreDestroy; +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.concurrent.ConcurrentException; +import org.apache.commons.lang3.concurrent.LazyInitializer; +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.ListOffsetsResult; +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.admin.OffsetSpec; +import org.apache.kafka.clients.admin.TopicDescription; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.TopicExistsException; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Lazy; +import org.springframework.stereotype.Component; +import org.thingsboard.common.util.CachedValue; +import org.thingsboard.server.queue.util.TbKafkaComponent; + +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.stream.Collectors; + +@TbKafkaComponent +@Component +@Slf4j +public class KafkaAdmin { + + /* + * TODO: Get rid of per consumer/producer TbKafkaAdmin, + * use single KafkaAdmin instance that accepts topicConfigs. + * */ + + private final TbKafkaSettings settings; + + @Value("${queue.kafka.request.timeout.ms:30000}") + private int requestTimeoutMs; + @Value("${queue.kafka.topics_cache_ttl_ms:300000}") // 5 minutes by default + private int topicsCacheTtlMs; + + private final LazyInitializer adminClient; + private final CachedValue> topics; + + public KafkaAdmin(@Lazy TbKafkaSettings settings) { + this.settings = settings; + this.adminClient = LazyInitializer.builder() + .setInitializer(() -> AdminClient.create(settings.toAdminProps())) + .get(); + this.topics = new CachedValue<>(() -> { + Set topics = ConcurrentHashMap.newKeySet(); + topics.addAll(listTopics()); + return topics; + }, topicsCacheTtlMs); + } + + public void createTopicIfNotExists(String topic, Map properties, boolean force) { + Set topics = getTopics(); + if (!force && topics.contains(topic)) { + log.trace("Topic {} already present in cache", topic); + return; + } + + log.debug("Creating topic {} with properties {}", topic, properties); + String numPartitionsStr = properties.remove(TbKafkaTopicConfigs.NUM_PARTITIONS_SETTING); + int partitions = numPartitionsStr != null ? Integer.parseInt(numPartitionsStr) : 1; + NewTopic newTopic = new NewTopic(topic, partitions, settings.getReplicationFactor()).configs(properties); + + try { + getClient().createTopics(List.of(newTopic)).all().get(requestTimeoutMs, TimeUnit.MILLISECONDS); + topics.add(topic); + } catch (ExecutionException ee) { + log.trace("Failed to create topic {} with properties {}", topic, properties, ee); + if (ee.getCause() instanceof TopicExistsException) { + //do nothing + } else { + log.warn("[{}] Failed to create topic", topic, ee); + throw new RuntimeException(ee); + } + } catch (Exception e) { + log.warn("[{}] Failed to create topic", topic, e); + throw new RuntimeException(e); + } + } + + public void deleteTopic(String topic) { + log.debug("Deleting topic {}", topic); + try { + getClient().deleteTopics(List.of(topic)).all().get(requestTimeoutMs, TimeUnit.MILLISECONDS); + } catch (Exception e) { + log.error("Failed to delete kafka topic [{}].", topic, e); + } + } + + private Set getTopics() { + return topics.get(); + } + + public Set listTopics() { + try { + Set topics = getClient().listTopics().names().get(requestTimeoutMs, TimeUnit.MILLISECONDS); + log.trace("Listed topics: {}", topics); + return topics; + } catch (Exception e) { + log.error("Failed to get all topics.", e); + return Collections.emptySet(); + } + } + + public Map getTotalLagForGroupsBulk(Set groupIds) { + Map result = new HashMap<>(); + for (String groupId : groupIds) { + result.put(groupId, getTotalConsumerGroupLag(groupId)); + } + return result; + } + + public long getTotalConsumerGroupLag(String groupId) { + try { + Map committedOffsets = getConsumerGroupOffsets(groupId); + if (committedOffsets.isEmpty()) { + return 0L; + } + + Map latestOffsetsSpec = committedOffsets.keySet().stream() + .collect(Collectors.toMap(tp -> tp, tp -> OffsetSpec.latest())); + + Map endOffsets = + getClient().listOffsets(latestOffsetsSpec).all().get(requestTimeoutMs, TimeUnit.MILLISECONDS); + + return committedOffsets.entrySet().stream() + .mapToLong(entry -> { + TopicPartition tp = entry.getKey(); + long committed = entry.getValue().offset(); + long end = endOffsets.getOrDefault(tp, + new ListOffsetsResult.ListOffsetsResultInfo(0L, 0L, Optional.empty())).offset(); + return end - committed; + }).sum(); + + } catch (Exception e) { + log.error("Failed to get total lag for consumer group: {}", groupId, e); + return 0L; + } + } + + @SneakyThrows + public Map getConsumerGroupOffsets(String groupId) { + return getClient().listConsumerGroupOffsets(groupId).partitionsToOffsetAndMetadata().get(requestTimeoutMs, TimeUnit.MILLISECONDS); + } + + /** + * Sync offsets from a fat group to a single-partition group + * Migration back from single-partition consumer to a fat group is not supported + * TODO: The best possible approach to synchronize the offsets is to do the synchronization as a part of the save Queue parameters with stop all consumers + * */ + public void syncOffsets(String fatGroupId, String newGroupId, Integer partitionId) { + try { + log.info("syncOffsets [{}][{}][{}]", fatGroupId, newGroupId, partitionId); + if (partitionId == null) { + return; + } + syncOffsetsUnsafe(fatGroupId, newGroupId, "." + partitionId); + } catch (Exception e) { + log.warn("Failed to syncOffsets from {} to {} partitionId {}", fatGroupId, newGroupId, partitionId, e); + } + } + + public void syncOffsetsUnsafe(String fatGroupId, String newGroupId, String topicSuffix) throws ExecutionException, InterruptedException, TimeoutException { + Map oldOffsets = getConsumerGroupOffsets(fatGroupId); + if (oldOffsets.isEmpty()) { + return; + } + + for (var consumerOffset : oldOffsets.entrySet()) { + var tp = consumerOffset.getKey(); + if (!tp.topic().endsWith(topicSuffix)) { + continue; + } + var om = consumerOffset.getValue(); + Map newOffsets = getConsumerGroupOffsets(newGroupId); + + var existingOffset = newOffsets.get(tp); + if (existingOffset == null) { + log.info("[{}] topic offset does not exists in the new node group {}, all found offsets {}", tp, newGroupId, newOffsets); + } else if (existingOffset.offset() >= om.offset()) { + log.info("[{}] topic offset {} >= than old node group offset {}", tp, existingOffset.offset(), om.offset()); + break; + } else { + log.info("[{}] SHOULD alter topic offset [{}] less than old node group offset [{}]", tp, existingOffset.offset(), om.offset()); + } + getClient().alterConsumerGroupOffsets(newGroupId, Map.of(tp, om)).all().get(requestTimeoutMs, TimeUnit.MILLISECONDS); + log.info("[{}] altered new consumer groupId {}", tp, newGroupId); + break; + } + } + + public boolean isTopicEmpty(String topic) { + return areAllTopicsEmpty(Set.of(topic)); + } + + public boolean areAllTopicsEmpty(Set topics) { + try { + List existingTopics = getTopics().stream().filter(topics::contains).toList(); + if (existingTopics.isEmpty()) { + return true; + } + + List allPartitions = getClient().describeTopics(existingTopics).allTopicNames().get(requestTimeoutMs, TimeUnit.MILLISECONDS) + .entrySet().stream() + .flatMap(entry -> { + String topic = entry.getKey(); + TopicDescription topicDescription = entry.getValue(); + return topicDescription.partitions().stream().map(partitionInfo -> new TopicPartition(topic, partitionInfo.partition())); + }) + .toList(); + + Map beginningOffsets = getClient().listOffsets(allPartitions.stream() + .collect(Collectors.toMap(partition -> partition, partition -> OffsetSpec.earliest()))).all().get(requestTimeoutMs, TimeUnit.MILLISECONDS); + Map endOffsets = getClient().listOffsets(allPartitions.stream() + .collect(Collectors.toMap(partition -> partition, partition -> OffsetSpec.latest()))).all().get(requestTimeoutMs, TimeUnit.MILLISECONDS); + + for (TopicPartition partition : allPartitions) { + long beginningOffset = beginningOffsets.get(partition).offset(); + long endOffset = endOffsets.get(partition).offset(); + + if (beginningOffset != endOffset) { + log.debug("Partition [{}] of topic [{}] is not empty. Returning false.", partition.partition(), partition.topic()); + return false; + } + } + return true; + } catch (Exception e) { + log.error("Failed to check if topics [{}] empty.", topics, e); + return false; + } + } + + public void deleteConsumerGroup(String consumerGroupId) { + try { + getClient().deleteConsumerGroups(List.of(consumerGroupId)).all().get(requestTimeoutMs, TimeUnit.MILLISECONDS); + } catch (Exception e) { + log.warn("Failed to delete consumer group {}", consumerGroupId, e); + } + } + + public AdminClient getClient() { + try { + return adminClient.get(); + } catch (ConcurrentException e) { + throw new RuntimeException("Failed to initialize Kafka admin client", e); + } + } + + @PreDestroy + private void destroy() throws Exception { + if (adminClient.isInitialized()) { + adminClient.get().close(); + } + } + +} diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaAdmin.java b/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaAdmin.java index 47a7ef13e3..65e9d5e4c4 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaAdmin.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaAdmin.java @@ -16,31 +16,12 @@ package org.thingsboard.server.queue.kafka; import lombok.Getter; -import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.admin.CreateTopicsResult; -import org.apache.kafka.clients.admin.ListOffsetsResult; -import org.apache.kafka.clients.admin.NewTopic; -import org.apache.kafka.clients.admin.OffsetSpec; -import org.apache.kafka.clients.admin.TopicDescription; -import org.apache.kafka.clients.consumer.OffsetAndMetadata; -import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.errors.TopicExistsException; import org.thingsboard.server.queue.TbEdgeQueueAdmin; import org.thingsboard.server.queue.TbQueueAdmin; import org.thingsboard.server.queue.util.PropertyUtils; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; import java.util.Map; -import java.util.Optional; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; -import java.util.stream.Collectors; /** * Created by ashvayka on 24.09.18. @@ -52,251 +33,42 @@ public class TbKafkaAdmin implements TbQueueAdmin, TbEdgeQueueAdmin { private final Map topicConfigs; @Getter private final int numPartitions; - private volatile Set topics; - - private final short replicationFactor; public TbKafkaAdmin(TbKafkaSettings settings, Map topicConfigs) { this.settings = settings; this.topicConfigs = topicConfigs; - String numPartitionsStr = topicConfigs.get(TbKafkaTopicConfigs.NUM_PARTITIONS_SETTING); if (numPartitionsStr != null) { numPartitions = Integer.parseInt(numPartitionsStr); } else { numPartitions = 1; } - replicationFactor = settings.getReplicationFactor(); } @Override - public void createTopicIfNotExists(String topic, String properties) { - Set topics = getTopics(); - if (topics.contains(topic)) { - return; - } - try { - Map configs = PropertyUtils.getProps(topicConfigs, properties); - configs.remove(TbKafkaTopicConfigs.NUM_PARTITIONS_SETTING); - NewTopic newTopic = new NewTopic(topic, numPartitions, replicationFactor).configs(configs); - createTopic(newTopic).values().get(topic).get(); - topics.add(topic); - } catch (ExecutionException ee) { - if (ee.getCause() instanceof TopicExistsException) { - //do nothing - } else { - log.warn("[{}] Failed to create topic", topic, ee); - throw new RuntimeException(ee); - } - } catch (Exception e) { - log.warn("[{}] Failed to create topic", topic, e); - throw new RuntimeException(e); - } + public void createTopicIfNotExists(String topic, String properties, boolean force) { + settings.getAdmin().createTopicIfNotExists(topic, PropertyUtils.getProps(topicConfigs, properties), force); } @Override public void deleteTopic(String topic) { - Set topics = getTopics(); - if (topics.remove(topic)) { - settings.getAdminClient().deleteTopics(Collections.singletonList(topic)); - } else { - try { - if (settings.getAdminClient().listTopics().names().get().contains(topic)) { - settings.getAdminClient().deleteTopics(Collections.singletonList(topic)); - } else { - log.warn("Kafka topic [{}] does not exist.", topic); - } - } catch (InterruptedException | ExecutionException e) { - log.error("Failed to delete kafka topic [{}].", topic, e); - } - } - } - - private Set getTopics() { - if (topics == null) { - synchronized (this) { - if (topics == null) { - topics = ConcurrentHashMap.newKeySet(); - try { - topics.addAll(settings.getAdminClient().listTopics().names().get()); - } catch (InterruptedException | ExecutionException e) { - log.error("Failed to get all topics.", e); - } - } - } - } - return topics; - } - - public Set getAllTopics() { - try { - return settings.getAdminClient().listTopics().names().get(); - } catch (InterruptedException | ExecutionException e) { - log.error("Failed to get all topics.", e); - } - return null; - } - - public CreateTopicsResult createTopic(NewTopic topic) { - return settings.getAdminClient().createTopics(Collections.singletonList(topic)); + settings.getAdmin().deleteTopic(topic); } @Override public void destroy() { } - /** - * Sync offsets from a fat group to a single-partition group - * Migration back from single-partition consumer to a fat group is not supported - * TODO: The best possible approach to synchronize the offsets is to do the synchronization as a part of the save Queue parameters with stop all consumers - * */ - public void syncOffsets(String fatGroupId, String newGroupId, Integer partitionId) { - try { - log.info("syncOffsets [{}][{}][{}]", fatGroupId, newGroupId, partitionId); - if (partitionId == null) { - return; - } - syncOffsetsUnsafe(fatGroupId, newGroupId, "." + partitionId); - } catch (Exception e) { - log.warn("Failed to syncOffsets from {} to {} partitionId {}", fatGroupId, newGroupId, partitionId, e); - } - } - /** * Sync edge notifications offsets from a fat group to a single group per edge * */ public void syncEdgeNotificationsOffsets(String fatGroupId, String newGroupId) { try { log.info("syncEdgeNotificationsOffsets [{}][{}]", fatGroupId, newGroupId); - syncOffsetsUnsafe(fatGroupId, newGroupId, newGroupId); + settings.getAdmin().syncOffsetsUnsafe(fatGroupId, newGroupId, newGroupId); } catch (Exception e) { log.warn("Failed to syncEdgeNotificationsOffsets from {} to {}", fatGroupId, newGroupId, e); } } - @Override - public void deleteConsumerGroup(String consumerGroupId) { - try { - settings.getAdminClient().deleteConsumerGroups(Collections.singletonList(consumerGroupId)); - } catch (Exception e) { - log.warn("Failed to delete consumer group {}", consumerGroupId, e); - } - } - - void syncOffsetsUnsafe(String fatGroupId, String newGroupId, String topicSuffix) throws ExecutionException, InterruptedException, TimeoutException { - Map oldOffsets = getConsumerGroupOffsets(fatGroupId); - if (oldOffsets.isEmpty()) { - return; - } - - for (var consumerOffset : oldOffsets.entrySet()) { - var tp = consumerOffset.getKey(); - if (!tp.topic().endsWith(topicSuffix)) { - continue; - } - var om = consumerOffset.getValue(); - Map newOffsets = getConsumerGroupOffsets(newGroupId); - - var existingOffset = newOffsets.get(tp); - if (existingOffset == null) { - log.info("[{}] topic offset does not exists in the new node group {}, all found offsets {}", tp, newGroupId, newOffsets); - } else if (existingOffset.offset() >= om.offset()) { - log.info("[{}] topic offset {} >= than old node group offset {}", tp, existingOffset.offset(), om.offset()); - break; - } else { - log.info("[{}] SHOULD alter topic offset [{}] less than old node group offset [{}]", tp, existingOffset.offset(), om.offset()); - } - settings.getAdminClient().alterConsumerGroupOffsets(newGroupId, Map.of(tp, om)).all().get(10, TimeUnit.SECONDS); - log.info("[{}] altered new consumer groupId {}", tp, newGroupId); - break; - } - } - - @SneakyThrows - public Map getConsumerGroupOffsets(String groupId) { - return settings.getAdminClient().listConsumerGroupOffsets(groupId).partitionsToOffsetAndMetadata().get(10, TimeUnit.SECONDS); - } - - public boolean isTopicEmpty(String topic) { - return areAllTopicsEmpty(Set.of(topic)); - } - - public boolean areAllTopicsEmpty(Set topics) { - try { - List existingTopics = getTopics().stream().filter(topics::contains).toList(); - if (existingTopics.isEmpty()) { - return true; - } - - List allPartitions = settings.getAdminClient().describeTopics(existingTopics).topicNameValues().entrySet().stream() - .flatMap(entry -> { - String topic = entry.getKey(); - TopicDescription topicDescription; - try { - topicDescription = entry.getValue().get(); - } catch (InterruptedException | ExecutionException e) { - throw new RuntimeException(e); - } - return topicDescription.partitions().stream().map(partitionInfo -> new TopicPartition(topic, partitionInfo.partition())); - }) - .toList(); - - Map beginningOffsets = settings.getAdminClient().listOffsets(allPartitions.stream() - .collect(Collectors.toMap(partition -> partition, partition -> OffsetSpec.earliest()))).all().get(); - Map endOffsets = settings.getAdminClient().listOffsets(allPartitions.stream() - .collect(Collectors.toMap(partition -> partition, partition -> OffsetSpec.latest()))).all().get(); - - for (TopicPartition partition : allPartitions) { - long beginningOffset = beginningOffsets.get(partition).offset(); - long endOffset = endOffsets.get(partition).offset(); - - if (beginningOffset != endOffset) { - log.debug("Partition [{}] of topic [{}] is not empty. Returning false.", partition.partition(), partition.topic()); - return false; - } - } - return true; - } catch (InterruptedException | ExecutionException e) { - log.error("Failed to check if topics [{}] empty.", topics, e); - return false; - } - } - - public Map getTotalLagForGroupsBulk(Set groupIds) { - Map result = new HashMap<>(); - for (String groupId : groupIds) { - result.put(groupId, getTotalConsumerGroupLag(groupId)); - } - return result; - } - - public long getTotalConsumerGroupLag(String groupId) { - try { - Map committedOffsets = getConsumerGroupOffsets(groupId); - if (committedOffsets.isEmpty()) { - return 0L; - } - - Map latestOffsetsSpec = committedOffsets.keySet().stream() - .collect(Collectors.toMap(tp -> tp, tp -> OffsetSpec.latest())); - - Map endOffsets = - settings.getAdminClient().listOffsets(latestOffsetsSpec) - .all().get(10, TimeUnit.SECONDS); - - return committedOffsets.entrySet().stream() - .mapToLong(entry -> { - TopicPartition tp = entry.getKey(); - long committed = entry.getValue().offset(); - long end = endOffsets.getOrDefault(tp, - new ListOffsetsResult.ListOffsetsResultInfo(0L, 0L, Optional.empty())).offset(); - return end - committed; - }).sum(); - - } catch (Exception e) { - log.error("Failed to get total lag for consumer group: {}", groupId, e); - return 0L; - } - } - } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaConsumerStatisticConfig.java b/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaConsumerStatisticConfig.java index d44f9ee700..12a3ea8fa6 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaConsumerStatisticConfig.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaConsumerStatisticConfig.java @@ -19,11 +19,11 @@ import lombok.AllArgsConstructor; import lombok.Getter; import lombok.NoArgsConstructor; import org.springframework.beans.factory.annotation.Value; -import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.stereotype.Component; +import org.thingsboard.server.queue.util.TbKafkaComponent; @Component -@ConditionalOnProperty(prefix = "queue", value = "type", havingValue = "kafka") +@TbKafkaComponent @Getter @AllArgsConstructor @NoArgsConstructor diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaConsumerStatsService.java b/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaConsumerStatsService.java index 7a9c01b72f..e1cfb995c8 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaConsumerStatsService.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaConsumerStatsService.java @@ -26,10 +26,10 @@ import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.TopicPartition; -import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.stereotype.Component; import org.thingsboard.common.util.ThingsBoardExecutors; import org.thingsboard.server.common.data.StringUtils; +import org.thingsboard.server.queue.util.TbKafkaComponent; import java.time.Duration; import java.util.ArrayList; @@ -44,11 +44,12 @@ import java.util.concurrent.TimeUnit; @Slf4j @Component @RequiredArgsConstructor -@ConditionalOnProperty(prefix = "queue", value = "type", havingValue = "kafka") +@TbKafkaComponent public class TbKafkaConsumerStatsService { private final Set monitoredGroups = ConcurrentHashMap.newKeySet(); private final TbKafkaSettings kafkaSettings; + private final KafkaAdmin kafkaAdmin; private final TbKafkaConsumerStatisticConfig statsConfig; private Consumer consumer; @@ -77,7 +78,7 @@ public class TbKafkaConsumerStatsService { } for (String groupId : monitoredGroups) { try { - Map groupOffsets = kafkaSettings.getAdminClient().listConsumerGroupOffsets(groupId).partitionsToOffsetAndMetadata() + Map groupOffsets = kafkaSettings.getAdmin().getClient().listConsumerGroupOffsets(groupId).partitionsToOffsetAndMetadata() .get(statsConfig.getKafkaResponseTimeoutMs(), TimeUnit.MILLISECONDS); Map endOffsets = consumer.endOffsets(groupOffsets.keySet(), timeoutDuration); @@ -159,12 +160,14 @@ public class TbKafkaConsumerStatsService { @Override public String toString() { return "[" + - "topic=[" + topic + ']' + - ", partition=[" + partition + "]" + - ", committedOffset=[" + committedOffset + "]" + - ", endOffset=[" + endOffset + "]" + - ", lag=[" + lag + "]" + - "]"; + "topic=[" + topic + ']' + + ", partition=[" + partition + "]" + + ", committedOffset=[" + committedOffset + "]" + + ", endOffset=[" + endOffset + "]" + + ", lag=[" + lag + "]" + + "]"; } + } + } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaSettings.java b/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaSettings.java index 06ccfe3a69..11736f68cf 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaSettings.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaSettings.java @@ -16,12 +16,10 @@ package org.thingsboard.server.queue.kafka; import jakarta.annotation.PostConstruct; -import jakarta.annotation.PreDestroy; import lombok.Getter; import lombok.Setter; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.CommonClientConfigs; -import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.AdminClientConfig; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.producer.ProducerConfig; @@ -30,12 +28,13 @@ import org.apache.kafka.common.serialization.ByteArrayDeserializer; import org.apache.kafka.common.serialization.ByteArraySerializer; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; -import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.stereotype.Component; import org.thingsboard.server.common.data.TbProperty; import org.thingsboard.server.queue.util.PropertyUtils; +import org.thingsboard.server.queue.util.TbKafkaComponent; import java.util.HashMap; import java.util.LinkedHashMap; @@ -47,7 +46,7 @@ import java.util.Properties; * Created by ashvayka on 25.09.18. */ @Slf4j -@ConditionalOnProperty(prefix = "queue", value = "type", havingValue = "kafka") +@TbKafkaComponent @ConfigurationProperties(prefix = "queue.kafka") @Component public class TbKafkaSettings { @@ -143,6 +142,9 @@ public class TbKafkaSettings { @Value("${queue.kafka.consumer-properties-per-topic-inline:}") private String consumerPropertiesPerTopicInline; + @Autowired + private KafkaAdmin kafkaAdmin; + @Deprecated @Setter private List other; @@ -150,8 +152,6 @@ public class TbKafkaSettings { @Setter private Map> consumerPropertiesPerTopic = new HashMap<>(); - private volatile AdminClient adminClient; - @PostConstruct public void initInlineTopicProperties() { Map> inlineProps = parseTopicPropertyList(consumerPropertiesPerTopicInline); @@ -240,15 +240,12 @@ public class TbKafkaSettings { } } - public AdminClient getAdminClient() { - if (adminClient == null) { - synchronized (this) { - if (adminClient == null) { - adminClient = AdminClient.create(toAdminProps()); - } - } - } - return adminClient; + /* + * Temporary solution to avoid major code changes. + * FIXME: use single instance of Kafka queue admin, don't create a separate one for each consumer/producer + * */ + public KafkaAdmin getAdmin() { + return kafkaAdmin; } protected Properties toAdminProps() { @@ -279,11 +276,4 @@ public class TbKafkaSettings { return result; } - @PreDestroy - private void destroy() { - if (adminClient != null) { - adminClient.close(); - } - } - } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaTopicConfigs.java b/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaTopicConfigs.java index 5d5834d20a..c50fd0d720 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaTopicConfigs.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaTopicConfigs.java @@ -18,14 +18,14 @@ package org.thingsboard.server.queue.kafka; import jakarta.annotation.PostConstruct; import lombok.Getter; import org.springframework.beans.factory.annotation.Value; -import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.stereotype.Component; import org.thingsboard.server.queue.util.PropertyUtils; +import org.thingsboard.server.queue.util.TbKafkaComponent; import java.util.Map; @Component -@ConditionalOnProperty(prefix = "queue", value = "type", havingValue = "kafka") +@TbKafkaComponent public class TbKafkaTopicConfigs { public static final String NUM_PARTITIONS_SETTING = "partitions"; diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/InMemoryTbTransportQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/InMemoryTbTransportQueueFactory.java index 75d53d9830..2c932534b1 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/InMemoryTbTransportQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/InMemoryTbTransportQueueFactory.java @@ -78,7 +78,7 @@ public class InMemoryTbTransportQueueFactory implements TbTransportQueueFactory templateBuilder.queueAdmin(new TbQueueAdmin() { @Override - public void createTopicIfNotExists(String topic, String properties) {} + public void createTopicIfNotExists(String topic, String properties, boolean force) {} @Override public void destroy() {} diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java index 866f8d235e..245330cc3e 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java @@ -58,6 +58,7 @@ import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; import org.thingsboard.server.queue.discovery.TopicService; import org.thingsboard.server.queue.edqs.EdqsConfig; +import org.thingsboard.server.queue.kafka.KafkaAdmin; import org.thingsboard.server.queue.kafka.TbKafkaAdmin; import org.thingsboard.server.queue.kafka.TbKafkaConsumerStatsService; import org.thingsboard.server.queue.kafka.TbKafkaConsumerTemplate; @@ -83,6 +84,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi private final TopicService topicService; private final TbKafkaSettings kafkaSettings; + private final KafkaAdmin kafkaAdmin; private final TbServiceInfoProvider serviceInfoProvider; private final TbQueueCoreSettings coreSettings; private final TbQueueRuleEngineSettings ruleEngineSettings; @@ -118,7 +120,9 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi private final AtomicLong consumerCount = new AtomicLong(); private final AtomicLong edgeConsumerCount = new AtomicLong(); - public KafkaMonolithQueueFactory(TopicService topicService, TbKafkaSettings kafkaSettings, + public KafkaMonolithQueueFactory(TopicService topicService, + TbKafkaSettings kafkaSettings, + KafkaAdmin kafkaAdmin, TbServiceInfoProvider serviceInfoProvider, TbQueueCoreSettings coreSettings, TbQueueRuleEngineSettings ruleEngineSettings, @@ -134,6 +138,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi TasksQueueConfig tasksQueueConfig) { this.topicService = topicService; this.kafkaSettings = kafkaSettings; + this.kafkaAdmin = kafkaAdmin; this.serviceInfoProvider = serviceInfoProvider; this.coreSettings = coreSettings; this.ruleEngineSettings = ruleEngineSettings; @@ -240,7 +245,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi String queueName = configuration.getName(); String groupId = topicService.buildConsumerGroupId("re-", configuration.getTenantId(), queueName, partitionId); - ruleEngineAdmin.syncOffsets(topicService.buildConsumerGroupId("re-", configuration.getTenantId(), queueName, null), // the fat groupId + kafkaAdmin.syncOffsets(topicService.buildConsumerGroupId("re-", configuration.getTenantId(), queueName, null), // the fat groupId groupId, partitionId); TbKafkaConsumerTemplate.TbKafkaConsumerTemplateBuilder> consumerBuilder = TbKafkaConsumerTemplate.builder(); diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbRuleEngineQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbRuleEngineQueueFactory.java index 3b67ea4f9f..dbf4ab1aba 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbRuleEngineQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbRuleEngineQueueFactory.java @@ -52,6 +52,7 @@ import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; import org.thingsboard.server.queue.discovery.TopicService; import org.thingsboard.server.queue.edqs.EdqsConfig; +import org.thingsboard.server.queue.kafka.KafkaAdmin; import org.thingsboard.server.queue.kafka.TbKafkaAdmin; import org.thingsboard.server.queue.kafka.TbKafkaConsumerStatsService; import org.thingsboard.server.queue.kafka.TbKafkaConsumerTemplate; @@ -74,6 +75,7 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory { private final TopicService topicService; private final TbKafkaSettings kafkaSettings; + private final KafkaAdmin kafkaAdmin; private final TbServiceInfoProvider serviceInfoProvider; private final TbQueueCoreSettings coreSettings; private final TbQueueRuleEngineSettings ruleEngineSettings; @@ -99,6 +101,7 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory { private final AtomicLong consumerCount = new AtomicLong(); public KafkaTbRuleEngineQueueFactory(TopicService topicService, TbKafkaSettings kafkaSettings, + KafkaAdmin kafkaAdmin, TbServiceInfoProvider serviceInfoProvider, TbQueueCoreSettings coreSettings, TbQueueRuleEngineSettings ruleEngineSettings, @@ -111,6 +114,7 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory { TbKafkaTopicConfigs kafkaTopicConfigs) { this.topicService = topicService; this.kafkaSettings = kafkaSettings; + this.kafkaAdmin = kafkaAdmin; this.serviceInfoProvider = serviceInfoProvider; this.coreSettings = coreSettings; this.ruleEngineSettings = ruleEngineSettings; @@ -234,7 +238,7 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory { String queueName = configuration.getName(); String groupId = topicService.buildConsumerGroupId("re-", configuration.getTenantId(), queueName, partitionId); - ruleEngineAdmin.syncOffsets(topicService.buildConsumerGroupId("re-", configuration.getTenantId(), queueName, null), // the fat groupId + kafkaAdmin.syncOffsets(topicService.buildConsumerGroupId("re-", configuration.getTenantId(), queueName, null), // the fat groupId groupId, partitionId); TbKafkaConsumerTemplate.TbKafkaConsumerTemplateBuilder> consumerBuilder = TbKafkaConsumerTemplate.builder(); diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/util/TbKafkaComponent.java b/common/queue/src/main/java/org/thingsboard/server/queue/util/TbKafkaComponent.java new file mode 100644 index 0000000000..ad4862e36d --- /dev/null +++ b/common/queue/src/main/java/org/thingsboard/server/queue/util/TbKafkaComponent.java @@ -0,0 +1,29 @@ +/** + * Copyright © 2016-2025 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.queue.util; + +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; + +import java.lang.annotation.Inherited; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +@Inherited +@Retention(RetentionPolicy.RUNTIME) +@Target({java.lang.annotation.ElementType.TYPE, java.lang.annotation.ElementType.METHOD}) +@ConditionalOnProperty(prefix = "queue", value = "type", havingValue = "kafka") +public @interface TbKafkaComponent {} diff --git a/common/queue/src/test/java/org/thingsboard/server/queue/kafka/TbKafkaSettingsTest.java b/common/queue/src/test/java/org/thingsboard/server/queue/kafka/TbKafkaSettingsTest.java index ad026c63aa..bc37982245 100644 --- a/common/queue/src/test/java/org/thingsboard/server/queue/kafka/TbKafkaSettingsTest.java +++ b/common/queue/src/test/java/org/thingsboard/server/queue/kafka/TbKafkaSettingsTest.java @@ -28,7 +28,7 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.spy; -@SpringBootTest(classes = TbKafkaSettings.class) +@SpringBootTest(classes = {TbKafkaSettings.class, KafkaAdmin.class}) @TestPropertySource(properties = { "queue.type=kafka", "queue.kafka.bootstrap.servers=localhost:9092", diff --git a/common/util/src/main/java/org/thingsboard/common/util/CachedValue.java b/common/util/src/main/java/org/thingsboard/common/util/CachedValue.java new file mode 100644 index 0000000000..b0a41c2a42 --- /dev/null +++ b/common/util/src/main/java/org/thingsboard/common/util/CachedValue.java @@ -0,0 +1,38 @@ +/** + * Copyright © 2016-2025 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.common.util; + +import com.github.benmanes.caffeine.cache.Caffeine; +import com.github.benmanes.caffeine.cache.LoadingCache; + +import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; + +public class CachedValue { + + private final LoadingCache cache; + + public CachedValue(Supplier supplier, long valueTtlMs) { + this.cache = Caffeine.newBuilder() + .expireAfterWrite(valueTtlMs, TimeUnit.MILLISECONDS) + .build(__ -> supplier.get()); + } + + public V get() { + return cache.get(this); + } + +}