From 535730b61aad9b9c20d9bd1f80eb5f0f2b5d33a7 Mon Sep 17 00:00:00 2001 From: Andrii Shvaika Date: Wed, 19 Feb 2025 18:23:30 +0200 Subject: [PATCH] Minor renaming due to code review --- .../queue/common/consumer/QueueStateService.java | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/common/consumer/QueueStateService.java b/common/queue/src/main/java/org/thingsboard/server/queue/common/consumer/QueueStateService.java index 0931b9ed5b..55f248f0d9 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/common/consumer/QueueStateService.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/common/consumer/QueueStateService.java @@ -22,8 +22,8 @@ import org.thingsboard.server.queue.TbQueueMsg; import java.util.Collections; import java.util.HashSet; import java.util.Set; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.stream.Collectors; public class QueueStateService { @@ -33,7 +33,7 @@ public class QueueStateService { @Getter private Set partitions; - private final Lock lock = new ReentrantLock(); + private final ReadWriteLock partitionsLock = new ReentrantReadWriteLock(); public void init(PartitionedQueueConsumerManager stateConsumer, PartitionedQueueConsumerManager eventConsumer) { this.stateConsumer = stateConsumer; @@ -42,7 +42,7 @@ public class QueueStateService { public void update(Set newPartitions) { newPartitions = withTopic(newPartitions, stateConsumer.getTopic()); - lock.lock(); + var writeLock = partitionsLock.writeLock(); Set oldPartitions = this.partitions != null ? this.partitions : Collections.emptySet(); Set addedPartitions; Set removedPartitions; @@ -53,7 +53,7 @@ public class QueueStateService { removedPartitions.removeAll(newPartitions); this.partitions = newPartitions; } finally { - lock.unlock(); + writeLock.unlock(); } if (!removedPartitions.isEmpty()) { @@ -63,13 +63,13 @@ public class QueueStateService { if (!addedPartitions.isEmpty()) { stateConsumer.addPartitions(addedPartitions, partition -> { - lock.lock(); + var readLock = partitionsLock.readLock(); try { if (this.partitions.contains(partition)) { eventConsumer.addPartitions(Set.of(partition.newByTopic(eventConsumer.getTopic()))); } } finally { - lock.unlock(); + readLock.unlock(); } }); }