Fix NONE partitioning strategy

This commit is contained in:
ViacheslavKlimov 2025-03-20 17:17:08 +02:00
parent 0a14ce3f12
commit b7604a8d0a
6 changed files with 37 additions and 11 deletions

View File

@ -76,8 +76,9 @@ public class DefaultEdqsApiService implements EdqsApiService {
requestMsg.setCustomerIdLSB(customerId.getId().getLeastSignificantBits());
}
Integer partition = edqsPartitionService.resolvePartition(tenantId);
ListenableFuture<TbProtoQueueMsg<FromEdqsMsg>> resultFuture = requestTemplate.send(new TbProtoQueueMsg<>(UUID.randomUUID(), requestMsg.build()), partition);
UUID key = UUID.randomUUID();
Integer partition = edqsPartitionService.resolvePartition(tenantId, key);
ListenableFuture<TbProtoQueueMsg<FromEdqsMsg>> resultFuture = requestTemplate.send(new TbProtoQueueMsg<>(key, requestMsg.build()), partition);
return Futures.transform(resultFuture, msg -> {
TransportProtos.EdqsResponseMsg responseMsg = msg.getValue().getResponseMsg();
return JacksonUtil.fromString(responseMsg.getValue(), EdqsResponse.class);

View File

@ -173,13 +173,18 @@ public class EdqsProcessor implements TbQueueHandler<TbProtoQueueMsg<ToEdqsMsg>,
if (CollectionsUtil.isNotEmpty(oldPartitions)) {
Set<Integer> removedPartitions = Sets.difference(oldPartitions, newPartitions).stream()
.map(tpi -> tpi.getPartition().orElse(-1)).collect(Collectors.toSet());
if (config.getPartitioningStrategy() != EdqsPartitioningStrategy.TENANT && !removedPartitions.isEmpty()) {
if (removedPartitions.isEmpty()) {
return;
}
if (config.getPartitioningStrategy() == EdqsPartitioningStrategy.TENANT) {
repository.clearIf(tenantId -> {
Integer partition = partitionService.resolvePartition(tenantId, null);
return removedPartitions.contains(partition);
});
} else {
log.warn("Partitions {} were removed but shouldn't be (due to NONE partitioning strategy)", removedPartitions);
}
repository.clearIf(tenantId -> {
Integer partition = partitionService.resolvePartition(tenantId);
return partition != null && removedPartitions.contains(partition);
});
}
} catch (Throwable t) {
log.error("Failed to handle partition change event {}", event, t);

View File

@ -72,7 +72,7 @@ public class EdqsProducer {
};
TopicPartitionInfo tpi = TopicPartitionInfo.builder()
.topic(topic)
.partition(partitionService.resolvePartition(tenantId))
.partition(partitionService.resolvePartition(tenantId, key))
.build();
if (producer instanceof TbKafkaProducerTemplate<TbProtoQueueMsg<ToEdqsMsg>> kafkaProducer) {
kafkaProducer.send(tpi, key, new TbProtoQueueMsg<>(null, msg), callback); // specifying custom key for compaction

View File

@ -29,11 +29,14 @@ public class EdqsPartitionService {
private final HashPartitionService hashPartitionService;
private final EdqsConfig edqsConfig;
public Integer resolvePartition(TenantId tenantId) {
public Integer resolvePartition(TenantId tenantId, Object key) {
if (edqsConfig.getPartitioningStrategy() == EdqsPartitioningStrategy.TENANT) {
return hashPartitionService.resolvePartitionIndex(tenantId.getId(), edqsConfig.getPartitions());
} else {
return null;
if (key == null) {
throw new IllegalArgumentException("Partitioning key is missing but partitioning strategy is not TENANT");
}
return hashPartitionService.resolvePartitionIndex(key.toString(), edqsConfig.getPartitions());
}
}

View File

@ -39,6 +39,7 @@ import org.thingsboard.server.queue.discovery.event.PartitionChangeEvent;
import org.thingsboard.server.queue.discovery.event.ServiceListChangedEvent;
import org.thingsboard.server.queue.util.AfterStartUp;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@ -559,7 +560,15 @@ public class HashPartitionService implements PartitionService {
@Override
public int resolvePartitionIndex(UUID entityId, int partitions) {
int hash = hash(entityId);
return resolvePartitionIndex(hash(entityId), partitions);
}
@Override
public int resolvePartitionIndex(String key, int partitions) {
return resolvePartitionIndex(hash(key), partitions);
}
private int resolvePartitionIndex(int hash, int partitions) {
return Math.abs(hash % partitions);
}
@ -725,6 +734,12 @@ public class HashPartitionService implements PartitionService {
.hash().asInt();
}
private int hash(String key) {
return hashFunction.newHasher()
.putString(key, StandardCharsets.UTF_8)
.hash().asInt();
}
public static HashFunction forName(String name) {
return switch (name) {
case "murmur3_32" -> Hashing.murmur3_32();

View File

@ -79,4 +79,6 @@ public interface PartitionService {
int resolvePartitionIndex(UUID entityId, int partitions);
int resolvePartitionIndex(String key, int partitions);
}