Fix topics creation for isolated tenants
This commit is contained in:
		
							parent
							
								
									85ae3ed778
								
							
						
					
					
						commit
						01e72f4e30
					
				@ -176,8 +176,8 @@ public class DefaultTbQueueService extends AbstractTbEntityService implements Tb
 | 
				
			|||||||
        for (int i = oldPartitions; i < newPartitions; i++) {
 | 
					        for (int i = oldPartitions; i < newPartitions; i++) {
 | 
				
			||||||
            tbQueueAdmin.createTopicIfNotExists(
 | 
					            tbQueueAdmin.createTopicIfNotExists(
 | 
				
			||||||
                    new TopicPartitionInfo(queue.getTopic(), queue.getTenantId(), i, false).getFullTopicName(),
 | 
					                    new TopicPartitionInfo(queue.getTopic(), queue.getTenantId(), i, false).getFullTopicName(),
 | 
				
			||||||
                    queue.getCustomProperties()
 | 
					                    queue.getCustomProperties(),
 | 
				
			||||||
            );
 | 
					                    true); // forcing topic creation because the topic may still be cached on some nodes
 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
				
			|||||||
@ -18,12 +18,13 @@ package org.thingsboard.server.queue;
 | 
				
			|||||||
public interface TbQueueAdmin {
 | 
					public interface TbQueueAdmin {
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    default void createTopicIfNotExists(String topic) {
 | 
					    default void createTopicIfNotExists(String topic) {
 | 
				
			||||||
        createTopicIfNotExists(topic, null);
 | 
					        createTopicIfNotExists(topic, null, false);
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    void createTopicIfNotExists(String topic, String properties);
 | 
					    void createTopicIfNotExists(String topic, String properties, boolean force);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    void destroy();
 | 
					    void destroy();
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    void deleteTopic(String topic);
 | 
					    void deleteTopic(String topic);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
				
			|||||||
@ -43,7 +43,7 @@ public class RuleEngineTbQueueAdminFactory {
 | 
				
			|||||||
        return new TbQueueAdmin() {
 | 
					        return new TbQueueAdmin() {
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            @Override
 | 
					            @Override
 | 
				
			||||||
            public void createTopicIfNotExists(String topic, String properties) {
 | 
					            public void createTopicIfNotExists(String topic, String properties, boolean force) {
 | 
				
			||||||
            }
 | 
					            }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            @Override
 | 
					            @Override
 | 
				
			||||||
 | 
				
			|||||||
@ -70,11 +70,13 @@ public class TbKafkaAdmin implements TbQueueAdmin, TbEdgeQueueAdmin {
 | 
				
			|||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    @Override
 | 
					    @Override
 | 
				
			||||||
    public void createTopicIfNotExists(String topic, String properties) {
 | 
					    public void createTopicIfNotExists(String topic, String properties, boolean force) {
 | 
				
			||||||
 | 
					        if (!force) {
 | 
				
			||||||
            Set<String> topics = getTopics();
 | 
					            Set<String> topics = getTopics();
 | 
				
			||||||
            if (topics.contains(topic)) {
 | 
					            if (topics.contains(topic)) {
 | 
				
			||||||
                return;
 | 
					                return;
 | 
				
			||||||
            }
 | 
					            }
 | 
				
			||||||
 | 
					        }
 | 
				
			||||||
        try {
 | 
					        try {
 | 
				
			||||||
            Map<String, String> configs = PropertyUtils.getProps(topicConfigs, properties);
 | 
					            Map<String, String> configs = PropertyUtils.getProps(topicConfigs, properties);
 | 
				
			||||||
            configs.remove(TbKafkaTopicConfigs.NUM_PARTITIONS_SETTING);
 | 
					            configs.remove(TbKafkaTopicConfigs.NUM_PARTITIONS_SETTING);
 | 
				
			||||||
 | 
				
			|||||||
@ -78,7 +78,7 @@ public class InMemoryTbTransportQueueFactory implements TbTransportQueueFactory
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
        templateBuilder.queueAdmin(new TbQueueAdmin() {
 | 
					        templateBuilder.queueAdmin(new TbQueueAdmin() {
 | 
				
			||||||
            @Override
 | 
					            @Override
 | 
				
			||||||
            public void createTopicIfNotExists(String topic, String properties) {}
 | 
					            public void createTopicIfNotExists(String topic, String properties, boolean force) {}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            @Override
 | 
					            @Override
 | 
				
			||||||
            public void destroy() {}
 | 
					            public void destroy() {}
 | 
				
			||||||
 | 
				
			|||||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user