diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/HashPartitionService.java b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/HashPartitionService.java index f0635250be..ad55334030 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/HashPartitionService.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/HashPartitionService.java @@ -103,11 +103,11 @@ public class HashPartitionService implements PartitionService { this.hashFunction = forName(hashFunctionName); QueueKey coreKey = new QueueKey(ServiceType.TB_CORE); partitionSizesMap.put(coreKey, corePartitions); - partitionTopicsMap.put(coreKey, topicService.buildTopicName(coreTopic)); + partitionTopicsMap.put(coreKey, coreTopic); QueueKey vcKey = new QueueKey(ServiceType.TB_VC_EXECUTOR); partitionSizesMap.put(vcKey, vcPartitions); - partitionTopicsMap.put(vcKey, topicService.buildTopicName(vcTopic)); + partitionTopicsMap.put(vcKey, vcTopic); if (!isTransport(serviceInfoProvider.getServiceType())) { doInitRuleEnginePartitions(); @@ -125,7 +125,7 @@ public class HashPartitionService implements PartitionService { List queueRoutingInfoList = getQueueRoutingInfos(); queueRoutingInfoList.forEach(queue -> { QueueKey queueKey = new QueueKey(ServiceType.TB_RULE_ENGINE, queue); - partitionTopicsMap.put(queueKey, topicService.buildTopicName(queue.getQueueTopic())); + partitionTopicsMap.put(queueKey, queue.getQueueTopic()); partitionSizesMap.put(queueKey, queue.getPartitions()); }); } @@ -423,7 +423,7 @@ public class HashPartitionService implements PartitionService { private TopicPartitionInfo buildTopicPartitionInfo(QueueKey queueKey, int partition) { TopicPartitionInfo.TopicPartitionInfoBuilder tpi = TopicPartitionInfo.builder(); - tpi.topic(partitionTopicsMap.get(queueKey)); + tpi.topic(topicService.buildTopicName(partitionTopicsMap.get(queueKey))); tpi.partition(partition); tpi.tenantId(queueKey.getTenantId()); diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/TopicService.java b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/TopicService.java index bab0afbb23..40e19519b0 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/TopicService.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/TopicService.java @@ -27,7 +27,7 @@ import java.util.Map; @Service public class TopicService { - @Value("${queue.prefix}") + @Value("${queue.prefix:}") private String prefix; private Map tbCoreNotificationTopics = new HashMap<>(); @@ -57,8 +57,8 @@ public class TopicService { return buildTopicPartitionInfo(serviceType.name().toLowerCase() + ".notifications." + serviceId, null, null, false); } - public TopicPartitionInfo buildTopicPartitionInfo(String topic, TenantId tenantId, Integer partition, boolean myPartiotion) { - return new TopicPartitionInfo(buildTopicName(topic), tenantId, partition, myPartiotion); + public TopicPartitionInfo buildTopicPartitionInfo(String topic, TenantId tenantId, Integer partition, boolean myPartition) { + return new TopicPartitionInfo(buildTopicName(topic), tenantId, partition, myPartition); } public String buildTopicName(String topic) {