Merge pull request #8131 from smatvienko-tb/feature/hash-partition-service-improvement

[3.5] Hash partition service improvement
This commit is contained in:
Andrew Shvayka 2023-02-27 12:44:24 +02:00 committed by GitHub
commit 9944bcc92e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -68,7 +68,7 @@ public class HashPartitionService implements PartitionService {
private final TenantRoutingInfoService tenantRoutingInfoService;
private final QueueRoutingInfoService queueRoutingInfoService;
private ConcurrentMap<QueueKey, List<Integer>> myPartitions = new ConcurrentHashMap<>();
private volatile ConcurrentMap<QueueKey, List<Integer>> myPartitions = new ConcurrentHashMap<>();
private final ConcurrentMap<QueueKey, String> partitionTopicsMap = new ConcurrentHashMap<>();
private final ConcurrentMap<QueueKey, Integer> partitionSizesMap = new ConcurrentHashMap<>();
@ -217,17 +217,19 @@ public class HashPartitionService implements PartitionService {
}
queueServicesMap.values().forEach(list -> list.sort(Comparator.comparing(ServiceInfo::getServiceId)));
ConcurrentMap<QueueKey, List<Integer>> oldPartitions = myPartitions;
myPartitions = new ConcurrentHashMap<>();
final ConcurrentMap<QueueKey, List<Integer>> newPartitions = new ConcurrentHashMap<>();
partitionSizesMap.forEach((queueKey, size) -> {
for (int i = 0; i < size; i++) {
ServiceInfo serviceInfo = resolveByPartitionIdx(queueServicesMap.get(queueKey), queueKey, i);
if (currentService.equals(serviceInfo)) {
myPartitions.computeIfAbsent(queueKey, key -> new ArrayList<>()).add(i);
newPartitions.computeIfAbsent(queueKey, key -> new ArrayList<>()).add(i);
}
}
});
final ConcurrentMap<QueueKey, List<Integer>> oldPartitions = myPartitions;
myPartitions = newPartitions;
oldPartitions.forEach((queueKey, partitions) -> {
if (!myPartitions.containsKey(queueKey)) {
log.info("[{}] NO MORE PARTITIONS FOR CURRENT KEY", queueKey);