From 9ddd22edcf9e5e73cc7a527c0fbdfe6e367291b1 Mon Sep 17 00:00:00 2001 From: ViacheslavKlimov Date: Thu, 20 Mar 2025 16:12:42 +0200 Subject: [PATCH] EDQS: don't use Kafka manual partitions assignment --- .../server/service/edqs/EdqsSyncService.java | 3 ++ .../service/edqs/KafkaEdqsSyncService.java | 10 +++++- .../src/main/resources/thingsboard.yml | 12 +++---- .../server/edqs/processor/EdqsProcessor.java | 7 ++-- .../server/edqs/processor/EdqsProducer.java | 12 +++---- .../edqs/state/KafkaEdqsStateService.java | 10 +++++- .../common/DefaultTbQueueRequestTemplate.java | 1 - .../server/queue/kafka/TbKafkaAdmin.java | 33 ++++++++++++++----- edqs/src/main/resources/edqs.yml | 12 +++---- 9 files changed, 63 insertions(+), 37 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/edqs/EdqsSyncService.java b/application/src/main/java/org/thingsboard/server/service/edqs/EdqsSyncService.java index 79e0e60983..6ea2f959b7 100644 --- a/application/src/main/java/org/thingsboard/server/service/edqs/EdqsSyncService.java +++ b/application/src/main/java/org/thingsboard/server/service/edqs/EdqsSyncService.java @@ -43,6 +43,7 @@ import org.thingsboard.server.dao.model.sqlts.dictionary.KeyDictionaryEntry; import org.thingsboard.server.dao.model.sqlts.latest.TsKvLatestEntity; import org.thingsboard.server.dao.sql.relation.RelationRepository; import org.thingsboard.server.dao.sqlts.latest.TsKvLatestRepository; +import org.thingsboard.server.queue.edqs.EdqsConfig; import java.util.List; import java.util.Map; @@ -75,6 +76,8 @@ public abstract class EdqsSyncService { @Autowired @Lazy private DefaultEdqsService edqsService; + @Autowired + protected EdqsConfig edqsConfig; private final ConcurrentHashMap entityInfoMap = new ConcurrentHashMap<>(); private final ConcurrentHashMap keys = new ConcurrentHashMap<>(); diff --git a/application/src/main/java/org/thingsboard/server/service/edqs/KafkaEdqsSyncService.java 
b/application/src/main/java/org/thingsboard/server/service/edqs/KafkaEdqsSyncService.java index 4ef552521b..201964c955 100644 --- a/application/src/main/java/org/thingsboard/server/service/edqs/KafkaEdqsSyncService.java +++ b/application/src/main/java/org/thingsboard/server/service/edqs/KafkaEdqsSyncService.java @@ -17,11 +17,14 @@ package org.thingsboard.server.service.edqs; import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; import org.springframework.stereotype.Service; +import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; import org.thingsboard.server.queue.edqs.EdqsQueue; import org.thingsboard.server.queue.kafka.TbKafkaAdmin; import org.thingsboard.server.queue.kafka.TbKafkaSettings; import java.util.Collections; +import java.util.stream.Collectors; +import java.util.stream.IntStream; @Service @ConditionalOnExpression("'${queue.edqs.sync.enabled:true}' == 'true' && '${queue.type:null}' == 'kafka'") @@ -31,7 +34,13 @@ public class KafkaEdqsSyncService extends EdqsSyncService { - public KafkaEdqsSyncService(TbKafkaSettings kafkaSettings) { + /* EdqsConfig is taken as a constructor argument: the @Autowired edqsConfig field inherited from EdqsSyncService is not injected yet while the constructor runs, so reading it here would throw a NullPointerException */ + public KafkaEdqsSyncService(TbKafkaSettings kafkaSettings, org.thingsboard.server.queue.edqs.EdqsConfig edqsConfig) { TbKafkaAdmin kafkaAdmin = new TbKafkaAdmin(kafkaSettings, Collections.emptyMap()); - this.syncNeeded = kafkaAdmin.isTopicEmpty(EdqsQueue.EVENTS.getTopic()); + this.syncNeeded = kafkaAdmin.areAllTopicsEmpty(IntStream.range(0, edqsConfig.getPartitions()) + .mapToObj(partition -> TopicPartitionInfo.builder() + .topic(EdqsQueue.EVENTS.getTopic()) + .partition(partition) + .build().getFullTopicName()) + .collect(Collectors.toSet())); } @Override diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index c61a993d56..eac787d52b 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -1645,12 +1645,12 @@ queue: calculated-field: "${TB_QUEUE_KAFKA_CF_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:52428800;retention.bytes:1048576000;partitions:1;min.insync.replicas:1}" # 
Kafka properties for Calculated Field State topics calculated-field-state: "${TB_QUEUE_KAFKA_CF_STATE_TOPIC_PROPERTIES:retention.ms:-1;segment.bytes:52428800;retention.bytes:104857600000;partitions:1;min.insync.replicas:1;cleanup.policy:compact}" - # Kafka properties for EDQS events topics. Partitions number must be the same as queue.edqs.partitions - edqs-events: "${TB_QUEUE_KAFKA_EDQS_EVENTS_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:52428800;retention.bytes:-1;partitions:12;min.insync.replicas:1}" - # Kafka properties for EDQS requests topic (default: 3 minutes retention). Partitions number must be the same as queue.edqs.partitions - edqs-requests: "${TB_QUEUE_KAFKA_EDQS_REQUESTS_TOPIC_PROPERTIES:retention.ms:180000;segment.bytes:52428800;retention.bytes:1048576000;partitions:12;min.insync.replicas:1}" - # Kafka properties for EDQS state topic (infinite retention, compaction). Partitions number must be the same as queue.edqs.partitions - edqs-state: "${TB_QUEUE_KAFKA_EDQS_STATE_TOPIC_PROPERTIES:retention.ms:-1;segment.bytes:52428800;retention.bytes:-1;partitions:12;min.insync.replicas:1;cleanup.policy:compact}" + # Kafka properties for EDQS events topics + edqs-events: "${TB_QUEUE_KAFKA_EDQS_EVENTS_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:52428800;retention.bytes:-1;partitions:1;min.insync.replicas:1}" + # Kafka properties for EDQS requests topic (default: 3 minutes retention) + edqs-requests: "${TB_QUEUE_KAFKA_EDQS_REQUESTS_TOPIC_PROPERTIES:retention.ms:180000;segment.bytes:52428800;retention.bytes:1048576000;partitions:1;min.insync.replicas:1}" + # Kafka properties for EDQS state topic (infinite retention, compaction) + edqs-state: "${TB_QUEUE_KAFKA_EDQS_STATE_TOPIC_PROPERTIES:retention.ms:-1;segment.bytes:52428800;retention.bytes:-1;partitions:1;min.insync.replicas:1;cleanup.policy:compact}" consumer-stats: # Prints lag between consumer group offset and last messages offset in Kafka topics enabled: 
"${TB_QUEUE_KAFKA_CONSUMER_STATS_ENABLED:true}" diff --git a/common/edqs/src/main/java/org/thingsboard/server/edqs/processor/EdqsProcessor.java b/common/edqs/src/main/java/org/thingsboard/server/edqs/processor/EdqsProcessor.java index 7ddc9147df..f2360617ee 100644 --- a/common/edqs/src/main/java/org/thingsboard/server/edqs/processor/EdqsProcessor.java +++ b/common/edqs/src/main/java/org/thingsboard/server/edqs/processor/EdqsProcessor.java @@ -164,13 +164,10 @@ public class EdqsProcessor implements TbQueueHandler, } try { Set newPartitions = event.getNewPartitions().get(new QueueKey(ServiceType.EDQS)); - Set partitions = newPartitions.stream() - .map(tpi -> tpi.withUseInternalPartition(true)) - .collect(Collectors.toSet()); - stateService.process(withTopic(partitions, EdqsQueue.STATE.getTopic())); + stateService.process(withTopic(newPartitions, EdqsQueue.STATE.getTopic())); // eventsConsumer's partitions are updated by stateService - responseTemplate.subscribe(withTopic(partitions, config.getRequestsTopic())); // FIXME: we subscribe to partitions before we are ready. implement consumer-per-partition version for request template + responseTemplate.subscribe(withTopic(newPartitions, config.getRequestsTopic())); // TODO: we subscribe to partitions before we are ready. 
implement consumer-per-partition version for request template Set oldPartitions = event.getOldPartitions().get(new QueueKey(ServiceType.EDQS)); if (CollectionsUtil.isNotEmpty(oldPartitions)) { diff --git a/common/edqs/src/main/java/org/thingsboard/server/edqs/processor/EdqsProducer.java b/common/edqs/src/main/java/org/thingsboard/server/edqs/processor/EdqsProducer.java index be1f0481be..970ee76381 100644 --- a/common/edqs/src/main/java/org/thingsboard/server/edqs/processor/EdqsProducer.java +++ b/common/edqs/src/main/java/org/thingsboard/server/edqs/processor/EdqsProducer.java @@ -70,17 +70,13 @@ public class EdqsProducer { log.warn("[{}][{}][{}] Failed to publish msg to {}: {}", tenantId, type, key, topic, msg, t); } }; + TopicPartitionInfo tpi = TopicPartitionInfo.builder() + .topic(topic) + .partition(partitionService.resolvePartition(tenantId)) + .build(); if (producer instanceof TbKafkaProducerTemplate> kafkaProducer) { - TopicPartitionInfo tpi = TopicPartitionInfo.builder() - .topic(topic) - .partition(partitionService.resolvePartition(tenantId)) - .useInternalPartition(true) - .build(); kafkaProducer.send(tpi, key, new TbProtoQueueMsg<>(null, msg), callback); // specifying custom key for compaction } else { - TopicPartitionInfo tpi = TopicPartitionInfo.builder() - .topic(topic) - .build(); producer.send(tpi, new TbProtoQueueMsg<>(null, msg), callback); } } diff --git a/common/edqs/src/main/java/org/thingsboard/server/edqs/state/KafkaEdqsStateService.java b/common/edqs/src/main/java/org/thingsboard/server/edqs/state/KafkaEdqsStateService.java index 85b8e92387..067c730bfe 100644 --- a/common/edqs/src/main/java/org/thingsboard/server/edqs/state/KafkaEdqsStateService.java +++ b/common/edqs/src/main/java/org/thingsboard/server/edqs/state/KafkaEdqsStateService.java @@ -45,6 +45,8 @@ import org.thingsboard.server.queue.edqs.KafkaEdqsComponent; import java.util.Set; import java.util.UUID; import java.util.concurrent.atomic.AtomicInteger; +import 
java.util.stream.Collectors; +import java.util.stream.IntStream; @Service @RequiredArgsConstructor @@ -151,7 +153,13 @@ public class KafkaEdqsStateService implements EdqsStateService { @Override public void process(Set partitions) { if (queueStateService.getPartitions().isEmpty()) { - eventsToBackupConsumer.subscribe(); + Set allPartitions = IntStream.range(0, config.getPartitions()) + .mapToObj(partition -> TopicPartitionInfo.builder() + .topic(EdqsQueue.EVENTS.getTopic()) + .partition(partition) + .build()) + .collect(Collectors.toSet()); + eventsToBackupConsumer.subscribe(allPartitions); eventsToBackupConsumer.launch(); } queueStateService.update(new QueueKey(ServiceType.EDQS), partitions); diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/common/DefaultTbQueueRequestTemplate.java b/common/queue/src/main/java/org/thingsboard/server/queue/common/DefaultTbQueueRequestTemplate.java index 4efb297491..1beb505595 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/common/DefaultTbQueueRequestTemplate.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/common/DefaultTbQueueRequestTemplate.java @@ -263,7 +263,6 @@ public class DefaultTbQueueRequestTemplate topics) { try { - if (!getTopics().contains(topic)) { + List existingTopics = getTopics().stream().filter(topics::contains).toList(); + if (existingTopics.isEmpty()) { return true; } - TopicDescription topicDescription = settings.getAdminClient().describeTopics(Collections.singletonList(topic)).topicNameValues().get(topic).get(); - List partitions = topicDescription.partitions().stream().map(partitionInfo -> new TopicPartition(topic, partitionInfo.partition())).toList(); - Map beginningOffsets = settings.getAdminClient().listOffsets(partitions.stream() + List allPartitions = settings.getAdminClient().describeTopics(existingTopics).topicNameValues().entrySet().stream() + .flatMap(entry -> { + String topic = entry.getKey(); + TopicDescription topicDescription; + try { + 
topicDescription = entry.getValue().get(); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } + return topicDescription.partitions().stream().map(partitionInfo -> new TopicPartition(topic, partitionInfo.partition())); + }) + .toList(); + + Map beginningOffsets = settings.getAdminClient().listOffsets(allPartitions.stream() .collect(Collectors.toMap(partition -> partition, partition -> OffsetSpec.earliest()))).all().get(); - - Map endOffsets = settings.getAdminClient().listOffsets(partitions.stream() + Map endOffsets = settings.getAdminClient().listOffsets(allPartitions.stream() .collect(Collectors.toMap(partition -> partition, partition -> OffsetSpec.latest()))).all().get(); - for (TopicPartition partition : partitions) { + for (TopicPartition partition : allPartitions) { long beginningOffset = beginningOffsets.get(partition).offset(); long endOffset = endOffsets.get(partition).offset(); if (beginningOffset != endOffset) { - log.debug("Partition [{}] of topic [{}] is not empty. Returning false.", partition.partition(), topic); + log.debug("Partition [{}] of topic [{}] is not empty. Returning false.", partition.partition(), partition.topic()); return false; } } return true; } catch (InterruptedException | ExecutionException e) { - log.error("Failed to check if topic [{}] is empty.", topic, e); + log.error("Failed to check if topics [{}] empty.", topics, e); return false; } } diff --git a/edqs/src/main/resources/edqs.yml b/edqs/src/main/resources/edqs.yml index f7d0eda841..c101eff68e 100644 --- a/edqs/src/main/resources/edqs.yml +++ b/edqs/src/main/resources/edqs.yml @@ -148,12 +148,12 @@ queue: # - key: "session.timeout.ms" # refer to https://docs.confluent.io/platform/current/installation/configuration/consumer-configs.html#consumerconfigs_session.timeout.ms # value: "${TB_QUEUE_KAFKA_SESSION_TIMEOUT_MS:10000}" # (10 seconds) topic-properties: - # Kafka properties for EDQS events topics. 
Partitions number must be the same as queue.edqs.partitions - edqs-events: "${TB_QUEUE_KAFKA_EDQS_EVENTS_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:52428800;retention.bytes:-1;partitions:12;min.insync.replicas:1}" - # Kafka properties for EDQS requests topic (default: 3 minutes retention). Partitions number must be the same as queue.edqs.partitions - edqs-requests: "${TB_QUEUE_KAFKA_EDQS_REQUESTS_TOPIC_PROPERTIES:retention.ms:180000;segment.bytes:52428800;retention.bytes:1048576000;partitions:12;min.insync.replicas:1}" - # Kafka properties for EDQS state topic (infinite retention, compaction). Partitions number must be the same as queue.edqs.partitions - edqs-state: "${TB_QUEUE_KAFKA_EDQS_STATE_TOPIC_PROPERTIES:retention.ms:-1;segment.bytes:52428800;retention.bytes:-1;partitions:12;min.insync.replicas:1;cleanup.policy:compact}" + # Kafka properties for EDQS events topics + edqs-events: "${TB_QUEUE_KAFKA_EDQS_EVENTS_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:52428800;retention.bytes:-1;partitions:1;min.insync.replicas:1}" + # Kafka properties for EDQS requests topic (default: 3 minutes retention) + edqs-requests: "${TB_QUEUE_KAFKA_EDQS_REQUESTS_TOPIC_PROPERTIES:retention.ms:180000;segment.bytes:52428800;retention.bytes:1048576000;partitions:1;min.insync.replicas:1}" + # Kafka properties for EDQS state topic (infinite retention, compaction) + edqs-state: "${TB_QUEUE_KAFKA_EDQS_STATE_TOPIC_PROPERTIES:retention.ms:-1;segment.bytes:52428800;retention.bytes:-1;partitions:1;min.insync.replicas:1;cleanup.policy:compact}" consumer-stats: # Prints lag between consumer group offset and last messages offset in Kafka topics enabled: "${TB_QUEUE_KAFKA_CONSUMER_STATS_ENABLED:true}"