From 0f324d113297ea7ffb56df3ab59172af423868ed Mon Sep 17 00:00:00 2001 From: Sergey Matvienko Date: Fri, 24 Feb 2023 11:02:29 +0100 Subject: [PATCH] HashPartitionService: set myPartitions atomically --- .../server/queue/discovery/HashPartitionService.java | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/HashPartitionService.java b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/HashPartitionService.java index 31f81d5510..3fc1626f52 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/HashPartitionService.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/HashPartitionService.java @@ -68,7 +68,7 @@ public class HashPartitionService implements PartitionService { private final TenantRoutingInfoService tenantRoutingInfoService; private final QueueRoutingInfoService queueRoutingInfoService; - private ConcurrentMap> myPartitions = new ConcurrentHashMap<>(); + private volatile ConcurrentMap> myPartitions = new ConcurrentHashMap<>(); private final ConcurrentMap partitionTopicsMap = new ConcurrentHashMap<>(); private final ConcurrentMap partitionSizesMap = new ConcurrentHashMap<>(); @@ -217,17 +217,19 @@ public class HashPartitionService implements PartitionService { } queueServicesMap.values().forEach(list -> list.sort(Comparator.comparing(ServiceInfo::getServiceId))); - ConcurrentMap> oldPartitions = myPartitions; - myPartitions = new ConcurrentHashMap<>(); + final ConcurrentMap> newPartitions = new ConcurrentHashMap<>(); partitionSizesMap.forEach((queueKey, size) -> { for (int i = 0; i < size; i++) { ServiceInfo serviceInfo = resolveByPartitionIdx(queueServicesMap.get(queueKey), queueKey, i); if (currentService.equals(serviceInfo)) { - myPartitions.computeIfAbsent(queueKey, key -> new ArrayList<>()).add(i); + newPartitions.computeIfAbsent(queueKey, key -> new ArrayList<>()).add(i); } } }); + final ConcurrentMap> oldPartitions = myPartitions; + myPartitions = newPartitions; + oldPartitions.forEach((queueKey, partitions) -> { if (!myPartitions.containsKey(queueKey)) { log.info("[{}] NO MORE PARTITIONS FOR CURRENT KEY", queueKey);