From 01e72f4e30485319b25c51c08528006b233443dc Mon Sep 17 00:00:00 2001 From: VIacheslavKlimov Date: Wed, 30 Jul 2025 17:03:49 +0300 Subject: [PATCH] Fix topics creation for isolated tenants --- .../service/entitiy/queue/DefaultTbQueueService.java | 4 ++-- .../org/thingsboard/server/queue/TbQueueAdmin.java | 5 +++-- .../server/queue/RuleEngineTbQueueAdminFactory.java | 2 +- .../thingsboard/server/queue/kafka/TbKafkaAdmin.java | 10 ++++++---- .../provider/InMemoryTbTransportQueueFactory.java | 2 +- 5 files changed, 13 insertions(+), 10 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/entitiy/queue/DefaultTbQueueService.java b/application/src/main/java/org/thingsboard/server/service/entitiy/queue/DefaultTbQueueService.java index 0d4cc26ce7..26f0d81182 100644 --- a/application/src/main/java/org/thingsboard/server/service/entitiy/queue/DefaultTbQueueService.java +++ b/application/src/main/java/org/thingsboard/server/service/entitiy/queue/DefaultTbQueueService.java @@ -176,8 +176,8 @@ public class DefaultTbQueueService extends AbstractTbEntityService implements Tb for (int i = oldPartitions; i < newPartitions; i++) { tbQueueAdmin.createTopicIfNotExists( new TopicPartitionInfo(queue.getTopic(), queue.getTenantId(), i, false).getFullTopicName(), - queue.getCustomProperties() - ); + queue.getCustomProperties(), + true); // forcing topic creation because the topic may still be cached on some nodes } } diff --git a/common/cluster-api/src/main/java/org/thingsboard/server/queue/TbQueueAdmin.java b/common/cluster-api/src/main/java/org/thingsboard/server/queue/TbQueueAdmin.java index 48d9b3c34f..0b9925765c 100644 --- a/common/cluster-api/src/main/java/org/thingsboard/server/queue/TbQueueAdmin.java +++ b/common/cluster-api/src/main/java/org/thingsboard/server/queue/TbQueueAdmin.java @@ -18,12 +18,13 @@ package org.thingsboard.server.queue; public interface TbQueueAdmin { default void createTopicIfNotExists(String topic) { - createTopicIfNotExists(topic, null); + createTopicIfNotExists(topic, null, false); } - void createTopicIfNotExists(String topic, String properties); + void createTopicIfNotExists(String topic, String properties, boolean force); void destroy(); void deleteTopic(String topic); + } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/RuleEngineTbQueueAdminFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/RuleEngineTbQueueAdminFactory.java index 6d78fa8b4f..ecb2a2b771 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/RuleEngineTbQueueAdminFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/RuleEngineTbQueueAdminFactory.java @@ -43,7 +43,7 @@ public class RuleEngineTbQueueAdminFactory { return new TbQueueAdmin() { @Override - public void createTopicIfNotExists(String topic, String properties) { + public void createTopicIfNotExists(String topic, String properties, boolean force) { } @Override diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaAdmin.java b/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaAdmin.java index 47a7ef13e3..1c2abe53f8 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaAdmin.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaAdmin.java @@ -70,10 +70,12 @@ public class TbKafkaAdmin implements TbQueueAdmin, TbEdgeQueueAdmin { } @Override - public void createTopicIfNotExists(String topic, String properties) { - Set topics = getTopics(); - if (topics.contains(topic)) { - return; + public void createTopicIfNotExists(String topic, String properties, boolean force) { + if (!force) { + Set topics = getTopics(); + if (topics.contains(topic)) { + return; + } } try { Map configs = PropertyUtils.getProps(topicConfigs, properties); diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/InMemoryTbTransportQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/InMemoryTbTransportQueueFactory.java index 75d53d9830..2c932534b1 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/InMemoryTbTransportQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/InMemoryTbTransportQueueFactory.java @@ -78,7 +78,7 @@ public class InMemoryTbTransportQueueFactory implements TbTransportQueueFactory templateBuilder.queueAdmin(new TbQueueAdmin() { @Override - public void createTopicIfNotExists(String topic, String properties) {} + public void createTopicIfNotExists(String topic, String properties, boolean force) {} @Override public void destroy() {}