From ef65dd90263f982ff032ca835b91bd8d4e48e6a7 Mon Sep 17 00:00:00 2001 From: ViacheslavKlimov Date: Tue, 13 May 2025 16:41:57 +0300 Subject: [PATCH] Fix missing queue prefixes --- .../server/service/edqs/KafkaEdqsSyncService.java | 5 +++-- .../thingsboard/server/edqs/processor/EdqsProcessor.java | 8 +++++--- .../server/edqs/state/KafkaEdqsStateService.java | 6 ++++-- .../server/queue/discovery/HashPartitionService.java | 2 +- .../org/thingsboard/server/queue/kafka/TbKafkaAdmin.java | 3 +++ 5 files changed, 16 insertions(+), 8 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/edqs/KafkaEdqsSyncService.java b/application/src/main/java/org/thingsboard/server/service/edqs/KafkaEdqsSyncService.java index 239fd9dc42..43b0c575a0 100644 --- a/application/src/main/java/org/thingsboard/server/service/edqs/KafkaEdqsSyncService.java +++ b/application/src/main/java/org/thingsboard/server/service/edqs/KafkaEdqsSyncService.java @@ -18,6 +18,7 @@ package org.thingsboard.server.service.edqs; import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; import org.springframework.stereotype.Service; import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; +import org.thingsboard.server.queue.discovery.TopicService; import org.thingsboard.server.queue.edqs.EdqsConfig; import org.thingsboard.server.queue.kafka.TbKafkaAdmin; import org.thingsboard.server.queue.kafka.TbKafkaSettings; @@ -32,11 +33,11 @@ public class KafkaEdqsSyncService extends EdqsSyncService { private final boolean syncNeeded; - public KafkaEdqsSyncService(TbKafkaSettings kafkaSettings, EdqsConfig edqsConfig) { + public KafkaEdqsSyncService(TbKafkaSettings kafkaSettings, TopicService topicService, EdqsConfig edqsConfig) { TbKafkaAdmin kafkaAdmin = new TbKafkaAdmin(kafkaSettings, Collections.emptyMap()); this.syncNeeded = kafkaAdmin.areAllTopicsEmpty(IntStream.range(0, edqsConfig.getPartitions()) .mapToObj(partition -> TopicPartitionInfo.builder() - .topic(edqsConfig.getEventsTopic()) + .topic(topicService.buildTopicName(edqsConfig.getEventsTopic())) .partition(partition) .build().getFullTopicName()) .collect(Collectors.toSet())); diff --git a/common/edqs/src/main/java/org/thingsboard/server/edqs/processor/EdqsProcessor.java b/common/edqs/src/main/java/org/thingsboard/server/edqs/processor/EdqsProcessor.java index 78d17ef368..72919c3fc0 100644 --- a/common/edqs/src/main/java/org/thingsboard/server/edqs/processor/EdqsProcessor.java +++ b/common/edqs/src/main/java/org/thingsboard/server/edqs/processor/EdqsProcessor.java @@ -58,6 +58,7 @@ import org.thingsboard.server.queue.TbQueueResponseTemplate; import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.queue.common.consumer.PartitionedQueueConsumerManager; import org.thingsboard.server.queue.discovery.QueueKey; +import org.thingsboard.server.queue.discovery.TopicService; import org.thingsboard.server.queue.discovery.event.PartitionChangeEvent; import org.thingsboard.server.queue.edqs.EdqsComponent; import org.thingsboard.server.queue.edqs.EdqsConfig; @@ -88,6 +89,7 @@ public class EdqsProcessor implements TbQueueHandler, private final EdqsRepository repository; private final EdqsConfig config; private final EdqsPartitionService partitionService; + private final TopicService topicService; private final ConfigurableApplicationContext applicationContext; private final EdqsStateService stateService; @@ -123,7 +125,7 @@ public class EdqsProcessor implements TbQueueHandler, eventConsumer = PartitionedQueueConsumerManager.>create() .queueKey(new QueueKey(ServiceType.EDQS, config.getEventsTopic())) - .topic(config.getEventsTopic()) + .topic(topicService.buildTopicName(config.getEventsTopic())) .pollInterval(config.getPollInterval()) .msgPackProcessor((msgs, consumer, config) -> { for (TbProtoQueueMsg queueMsg : msgs) { @@ -164,9 +166,9 @@ public class EdqsProcessor implements TbQueueHandler, try { Set newPartitions = event.getNewPartitions().get(new QueueKey(ServiceType.EDQS)); - stateService.process(withTopic(newPartitions, config.getStateTopic())); + stateService.process(withTopic(newPartitions, topicService.buildTopicName(config.getStateTopic()))); // eventsConsumer's partitions are updated by stateService - responseTemplate.subscribe(withTopic(newPartitions, config.getRequestsTopic())); // TODO: we subscribe to partitions before we are ready. implement consumer-per-partition version for request template + responseTemplate.subscribe(withTopic(newPartitions, topicService.buildTopicName(config.getRequestsTopic()))); // TODO: we subscribe to partitions before we are ready. implement consumer-per-partition version for request template Set oldPartitions = event.getOldPartitions().get(new QueueKey(ServiceType.EDQS)); if (CollectionsUtil.isNotEmpty(oldPartitions)) { diff --git a/common/edqs/src/main/java/org/thingsboard/server/edqs/state/KafkaEdqsStateService.java b/common/edqs/src/main/java/org/thingsboard/server/edqs/state/KafkaEdqsStateService.java index 0efe6e7d3b..ddbdc3253a 100644 --- a/common/edqs/src/main/java/org/thingsboard/server/edqs/state/KafkaEdqsStateService.java +++ b/common/edqs/src/main/java/org/thingsboard/server/edqs/state/KafkaEdqsStateService.java @@ -36,6 +36,7 @@ import org.thingsboard.server.queue.common.consumer.QueueConsumerManager; import org.thingsboard.server.queue.common.state.KafkaQueueStateService; import org.thingsboard.server.queue.common.state.QueueStateService; import org.thingsboard.server.queue.discovery.QueueKey; +import org.thingsboard.server.queue.discovery.TopicService; import org.thingsboard.server.queue.edqs.EdqsConfig; import org.thingsboard.server.queue.edqs.KafkaEdqsComponent; import org.thingsboard.server.queue.edqs.KafkaEdqsQueueFactory; @@ -59,6 +60,7 @@ public class KafkaEdqsStateService implements EdqsStateService { private final EdqsConfig config; private final EdqsPartitionService partitionService; private final KafkaEdqsQueueFactory queueFactory; + private final TopicService topicService; @Autowired @Lazy private EdqsProcessor edqsProcessor; @@ -78,7 +80,7 @@ public class KafkaEdqsStateService implements EdqsStateService { TbKafkaAdmin queueAdmin = queueFactory.getEdqsQueueAdmin(); stateConsumer = PartitionedQueueConsumerManager.>create() .queueKey(new QueueKey(ServiceType.EDQS, config.getStateTopic())) - .topic(config.getStateTopic()) + .topic(topicService.buildTopicName(config.getStateTopic())) .pollInterval(config.getPollInterval()) .msgPackProcessor((msgs, consumer, config) -> { for (TbProtoQueueMsg queueMsg : msgs) { @@ -176,7 +178,7 @@ public class KafkaEdqsStateService implements EdqsStateService { if (queueStateService.getPartitions().isEmpty()) { Set allPartitions = IntStream.range(0, config.getPartitions()) .mapToObj(partition -> TopicPartitionInfo.builder() - .topic(config.getEventsTopic()) + .topic(topicService.buildTopicName(config.getEventsTopic())) .partition(partition) .build()) .collect(Collectors.toSet()); diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/HashPartitionService.java b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/HashPartitionService.java index 7186bf7055..ec5c675e32 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/HashPartitionService.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/HashPartitionService.java @@ -156,7 +156,7 @@ public class HashPartitionService implements PartitionService { @Override public String getTopic(QueueKey queueKey) { - return partitionTopicsMap.get(queueKey); + return topicService.buildTopicName(partitionTopicsMap.get(queueKey)); } private void doInitRuleEnginePartitions() { diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaAdmin.java b/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaAdmin.java index 1e0064a5c8..6addce2865 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaAdmin.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaAdmin.java @@ -136,6 +136,9 @@ public class TbKafkaAdmin implements TbQueueAdmin, TbEdgeQueueAdmin { } public CreateTopicsResult createTopic(NewTopic topic) { + if (!topic.name().startsWith("test.")) { // FIXME: remove me + log.error("Creating topic without configured prefix: {}", topic.name(), new RuntimeException("stacktrace")); + } return settings.getAdminClient().createTopics(Collections.singletonList(topic)); }