diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/HashPartitionService.java b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/HashPartitionService.java index 8a56a86bcc..3317e75c91 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/HashPartitionService.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/HashPartitionService.java @@ -90,20 +90,37 @@ public class HashPartitionService implements PartitionService { @PostConstruct public void init() { this.hashFunction = forName(hashFunctionName); - } - - @AfterStartUp(order = AfterStartUp.QUEUE_INFO_INITIALIZATION) - public void partitionsInit() { QueueKey coreKey = new QueueKey(ServiceType.TB_CORE); partitionSizesMap.put(coreKey, corePartitions); partitionTopicsMap.put(coreKey, coreTopic); - List queueRoutingInfoList; + if (!isTransport(serviceInfoProvider.getServiceType())) { + doInitRuleEnginePartitions(); + } + } + @AfterStartUp(order = AfterStartUp.QUEUE_INFO_INITIALIZATION) + public void partitionsInit() { + if (isTransport(serviceInfoProvider.getServiceType())) { + doInitRuleEnginePartitions(); + } + } + + private void doInitRuleEnginePartitions() { + List queueRoutingInfoList = getQueueRoutingInfos(); + queueRoutingInfoList.forEach(queue -> { + QueueKey queueKey = new QueueKey(ServiceType.TB_RULE_ENGINE, queue); + partitionTopicsMap.put(queueKey, queue.getQueueTopic()); + partitionSizesMap.put(queueKey, queue.getPartitions()); + queuesById.put(queue.getQueueId(), queue); + }); + } + + private List getQueueRoutingInfos() { + List queueRoutingInfoList; String serviceType = serviceInfoProvider.getServiceType(); - - if ("tb-transport".equals(serviceType)) { + if (isTransport(serviceType)) { //If transport started earlier than tb-core int getQueuesRetries = 10; while (true) { @@ -128,13 +145,11 @@ public class HashPartitionService implements PartitionService { } else { queueRoutingInfoList = queueRoutingInfoService.getAllQueuesRoutingInfo(); } + return queueRoutingInfoList; + } - queueRoutingInfoList.forEach(queue -> { - QueueKey queueKey = new QueueKey(ServiceType.TB_RULE_ENGINE, queue); - partitionTopicsMap.put(queueKey, queue.getQueueTopic()); - partitionSizesMap.put(queueKey, queue.getPartitions()); - queuesById.put(queue.getQueueId(), queue); - }); + private boolean isTransport(String serviceType) { + return "tb-transport".equals(serviceType); } @Override