diff --git a/application/src/main/java/org/thingsboard/server/service/edqs/DefaultEdqsApiService.java b/application/src/main/java/org/thingsboard/server/service/edqs/DefaultEdqsApiService.java index 51c963ed2f..c7e17b62ae 100644 --- a/application/src/main/java/org/thingsboard/server/service/edqs/DefaultEdqsApiService.java +++ b/application/src/main/java/org/thingsboard/server/service/edqs/DefaultEdqsApiService.java @@ -76,8 +76,9 @@ public class DefaultEdqsApiService implements EdqsApiService { requestMsg.setCustomerIdLSB(customerId.getId().getLeastSignificantBits()); } - Integer partition = edqsPartitionService.resolvePartition(tenantId); - ListenableFuture> resultFuture = requestTemplate.send(new TbProtoQueueMsg<>(UUID.randomUUID(), requestMsg.build()), partition); + UUID key = UUID.randomUUID(); + Integer partition = edqsPartitionService.resolvePartition(tenantId, key); + ListenableFuture> resultFuture = requestTemplate.send(new TbProtoQueueMsg<>(key, requestMsg.build()), partition); return Futures.transform(resultFuture, msg -> { TransportProtos.EdqsResponseMsg responseMsg = msg.getValue().getResponseMsg(); return JacksonUtil.fromString(responseMsg.getValue(), EdqsResponse.class); diff --git a/common/edqs/src/main/java/org/thingsboard/server/edqs/processor/EdqsProcessor.java b/common/edqs/src/main/java/org/thingsboard/server/edqs/processor/EdqsProcessor.java index f2360617ee..fb15c3fc73 100644 --- a/common/edqs/src/main/java/org/thingsboard/server/edqs/processor/EdqsProcessor.java +++ b/common/edqs/src/main/java/org/thingsboard/server/edqs/processor/EdqsProcessor.java @@ -173,13 +173,18 @@ public class EdqsProcessor implements TbQueueHandler, if (CollectionsUtil.isNotEmpty(oldPartitions)) { Set removedPartitions = Sets.difference(oldPartitions, newPartitions).stream() .map(tpi -> tpi.getPartition().orElse(-1)).collect(Collectors.toSet()); - if (config.getPartitioningStrategy() != EdqsPartitioningStrategy.TENANT && !removedPartitions.isEmpty()) { + if (removedPartitions.isEmpty()) { + return; + } + + if (config.getPartitioningStrategy() == EdqsPartitioningStrategy.TENANT) { + repository.clearIf(tenantId -> { + Integer partition = partitionService.resolvePartition(tenantId, null); + return removedPartitions.contains(partition); + }); + } else { log.warn("Partitions {} were removed but shouldn't be (due to NONE partitioning strategy)", removedPartitions); } - repository.clearIf(tenantId -> { - Integer partition = partitionService.resolvePartition(tenantId); - return partition != null && removedPartitions.contains(partition); - }); } } catch (Throwable t) { log.error("Failed to handle partition change event {}", event, t); diff --git a/common/edqs/src/main/java/org/thingsboard/server/edqs/processor/EdqsProducer.java b/common/edqs/src/main/java/org/thingsboard/server/edqs/processor/EdqsProducer.java index 970ee76381..a06836339f 100644 --- a/common/edqs/src/main/java/org/thingsboard/server/edqs/processor/EdqsProducer.java +++ b/common/edqs/src/main/java/org/thingsboard/server/edqs/processor/EdqsProducer.java @@ -72,7 +72,7 @@ public class EdqsProducer { }; TopicPartitionInfo tpi = TopicPartitionInfo.builder() .topic(topic) - .partition(partitionService.resolvePartition(tenantId)) + .partition(partitionService.resolvePartition(tenantId, key)) .build(); if (producer instanceof TbKafkaProducerTemplate> kafkaProducer) { kafkaProducer.send(tpi, key, new TbProtoQueueMsg<>(null, msg), callback); // specifying custom key for compaction diff --git a/common/edqs/src/main/java/org/thingsboard/server/edqs/state/EdqsPartitionService.java b/common/edqs/src/main/java/org/thingsboard/server/edqs/state/EdqsPartitionService.java index 94e9437650..e2fbf9a981 100644 --- a/common/edqs/src/main/java/org/thingsboard/server/edqs/state/EdqsPartitionService.java +++ b/common/edqs/src/main/java/org/thingsboard/server/edqs/state/EdqsPartitionService.java @@ -29,11 +29,14 @@ public class EdqsPartitionService { private final HashPartitionService hashPartitionService; private final EdqsConfig edqsConfig; - public Integer resolvePartition(TenantId tenantId) { + public Integer resolvePartition(TenantId tenantId, Object key) { if (edqsConfig.getPartitioningStrategy() == EdqsPartitioningStrategy.TENANT) { return hashPartitionService.resolvePartitionIndex(tenantId.getId(), edqsConfig.getPartitions()); } else { - return null; + if (key == null) { + throw new IllegalArgumentException("Partitioning key is missing but partitioning strategy is not TENANT"); + } + return hashPartitionService.resolvePartitionIndex(key.toString(), edqsConfig.getPartitions()); } } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/HashPartitionService.java b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/HashPartitionService.java index 3a76d50825..7186bf7055 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/HashPartitionService.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/HashPartitionService.java @@ -39,6 +39,7 @@ import org.thingsboard.server.queue.discovery.event.PartitionChangeEvent; import org.thingsboard.server.queue.discovery.event.ServiceListChangedEvent; import org.thingsboard.server.queue.util.AfterStartUp; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -559,7 +560,15 @@ public class HashPartitionService implements PartitionService { @Override public int resolvePartitionIndex(UUID entityId, int partitions) { - int hash = hash(entityId); + return resolvePartitionIndex(hash(entityId), partitions); + } + + @Override + public int resolvePartitionIndex(String key, int partitions) { + return resolvePartitionIndex(hash(key), partitions); + } + + private int resolvePartitionIndex(int hash, int partitions) { return Math.abs(hash % partitions); } @@ -725,6 +734,12 @@ public class HashPartitionService implements PartitionService { .hash().asInt(); } + private int hash(String key) { + return hashFunction.newHasher() + .putString(key, StandardCharsets.UTF_8) + .hash().asInt(); + } + public static HashFunction forName(String name) { return switch (name) { case "murmur3_32" -> Hashing.murmur3_32(); diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/PartitionService.java b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/PartitionService.java index 7abd68e25f..5dda413f17 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/PartitionService.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/PartitionService.java @@ -79,4 +79,6 @@ public interface PartitionService { int resolvePartitionIndex(UUID entityId, int partitions); + int resolvePartitionIndex(String key, int partitions); + }