Refactor HashPartitionService.resolve(..)

This commit is contained in:
ViacheslavKlimov 2023-07-19 17:37:49 +03:00
parent bfd8ff934f
commit 1ee3c24532

View File

@ -48,6 +48,8 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import static org.thingsboard.server.common.data.DataConstants.MAIN_QUEUE_NAME;
@Service @Service
@Slf4j @Slf4j
public class HashPartitionService implements PartitionService { public class HashPartitionService implements PartitionService {
@ -186,8 +188,15 @@ public class HashPartitionService implements PartitionService {
TenantId isolatedOrSystemTenantId = getIsolatedOrSystemTenantId(serviceType, tenantId); TenantId isolatedOrSystemTenantId = getIsolatedOrSystemTenantId(serviceType, tenantId);
QueueKey queueKey = new QueueKey(serviceType, queueName, isolatedOrSystemTenantId); QueueKey queueKey = new QueueKey(serviceType, queueName, isolatedOrSystemTenantId);
if (!partitionSizesMap.containsKey(queueKey)) { if (!partitionSizesMap.containsKey(queueKey)) {
// TODO: fallback to Main in case no system queue if (isolatedOrSystemTenantId.isSysTenantId()) {
queueKey = new QueueKey(serviceType, queueName, TenantId.SYS_TENANT_ID); queueKey = new QueueKey(serviceType, TenantId.SYS_TENANT_ID);
} else {
queueKey = new QueueKey(serviceType, queueName, TenantId.SYS_TENANT_ID);
if (!MAIN_QUEUE_NAME.equals(queueName) && !partitionSizesMap.containsKey(queueKey)) {
queueKey = new QueueKey(serviceType, TenantId.SYS_TENANT_ID);
}
log.warn("Using queue {} instead of isolated {}", queueKey, queueName);
}
} }
return resolve(queueKey, entityId); return resolve(queueKey, entityId);
} }
@ -208,9 +217,9 @@ public class HashPartitionService implements PartitionService {
.putLong(entityId.getId().getLeastSignificantBits()).hash().asInt(); .putLong(entityId.getId().getLeastSignificantBits()).hash().asInt();
Integer partitionSize = partitionSizesMap.get(queueKey); Integer partitionSize = partitionSizesMap.get(queueKey);
// if (partitionSize == null) { if (partitionSize == null) {
// throw new IllegalStateException("Can't get partition ") throw new IllegalStateException("Partitions info for queue " + queueKey + " is missing");
// } }
int partition = Math.abs(hash % partitionSize); int partition = Math.abs(hash % partitionSize);
return buildTopicPartitionInfo(queueKey, partition); return buildTopicPartitionInfo(queueKey, partition);