From 1ee3c24532f5ee6b3183ae8c6bc8c69373feaaff Mon Sep 17 00:00:00 2001 From: ViacheslavKlimov Date: Wed, 19 Jul 2023 17:37:49 +0300 Subject: [PATCH] Refactor HashPartitionService.resolve(..) --- .../queue/discovery/HashPartitionService.java | 19 ++++++++++++++----- 1 file changed, 14 insertions(+), 5 deletions(-) diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/HashPartitionService.java b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/HashPartitionService.java index e2312fa7cd..65f750f830 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/HashPartitionService.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/HashPartitionService.java @@ -48,6 +48,8 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.stream.Collectors; +import static org.thingsboard.server.common.data.DataConstants.MAIN_QUEUE_NAME; + @Service @Slf4j public class HashPartitionService implements PartitionService { @@ -186,8 +188,15 @@ public class HashPartitionService implements PartitionService { TenantId isolatedOrSystemTenantId = getIsolatedOrSystemTenantId(serviceType, tenantId); QueueKey queueKey = new QueueKey(serviceType, queueName, isolatedOrSystemTenantId); if (!partitionSizesMap.containsKey(queueKey)) { - // TODO: fallback to Main in case no system queue - queueKey = new QueueKey(serviceType, queueName, TenantId.SYS_TENANT_ID); + if (isolatedOrSystemTenantId.isSysTenantId()) { + queueKey = new QueueKey(serviceType, TenantId.SYS_TENANT_ID); + } else { + queueKey = new QueueKey(serviceType, queueName, TenantId.SYS_TENANT_ID); + if (!MAIN_QUEUE_NAME.equals(queueName) && !partitionSizesMap.containsKey(queueKey)) { + queueKey = new QueueKey(serviceType, TenantId.SYS_TENANT_ID); + } + log.warn("Using queue {} instead of isolated {}", queueKey, queueName); + } } return resolve(queueKey, entityId); } @@ -208,9 +217,9 @@ public class HashPartitionService implements PartitionService { .putLong(entityId.getId().getLeastSignificantBits()).hash().asInt(); Integer partitionSize = partitionSizesMap.get(queueKey); - // if (partitionSize == null) { -// throw new IllegalStateException("Can't get partition ") -// } + if (partitionSize == null) { + throw new IllegalStateException("Partitions info for queue " + queueKey + " is missing"); + } int partition = Math.abs(hash % partitionSize); return buildTopicPartitionInfo(queueKey, partition);