Minor renaming due to code review

This commit is contained in:
Andrii Shvaika 2025-02-19 18:23:30 +02:00
parent a3bd78c37f
commit 535730b61a

View File

@ -22,8 +22,8 @@ import org.thingsboard.server.queue.TbQueueMsg;
import java.util.Collections; import java.util.Collections;
import java.util.HashSet; import java.util.HashSet;
import java.util.Set; import java.util.Set;
import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors; import java.util.stream.Collectors;
public class QueueStateService<E extends TbQueueMsg, S extends TbQueueMsg> { public class QueueStateService<E extends TbQueueMsg, S extends TbQueueMsg> {
@ -33,7 +33,7 @@ public class QueueStateService<E extends TbQueueMsg, S extends TbQueueMsg> {
@Getter @Getter
private Set<TopicPartitionInfo> partitions; private Set<TopicPartitionInfo> partitions;
private final Lock lock = new ReentrantLock(); private final ReadWriteLock partitionsLock = new ReentrantReadWriteLock();
public void init(PartitionedQueueConsumerManager<S> stateConsumer, PartitionedQueueConsumerManager<E> eventConsumer) { public void init(PartitionedQueueConsumerManager<S> stateConsumer, PartitionedQueueConsumerManager<E> eventConsumer) {
this.stateConsumer = stateConsumer; this.stateConsumer = stateConsumer;
@ -42,7 +42,7 @@ public class QueueStateService<E extends TbQueueMsg, S extends TbQueueMsg> {
public void update(Set<TopicPartitionInfo> newPartitions) { public void update(Set<TopicPartitionInfo> newPartitions) {
newPartitions = withTopic(newPartitions, stateConsumer.getTopic()); newPartitions = withTopic(newPartitions, stateConsumer.getTopic());
lock.lock(); var writeLock = partitionsLock.writeLock();
Set<TopicPartitionInfo> oldPartitions = this.partitions != null ? this.partitions : Collections.emptySet(); Set<TopicPartitionInfo> oldPartitions = this.partitions != null ? this.partitions : Collections.emptySet();
Set<TopicPartitionInfo> addedPartitions; Set<TopicPartitionInfo> addedPartitions;
Set<TopicPartitionInfo> removedPartitions; Set<TopicPartitionInfo> removedPartitions;
@ -53,7 +53,7 @@ public class QueueStateService<E extends TbQueueMsg, S extends TbQueueMsg> {
removedPartitions.removeAll(newPartitions); removedPartitions.removeAll(newPartitions);
this.partitions = newPartitions; this.partitions = newPartitions;
} finally { } finally {
lock.unlock(); writeLock.unlock();
} }
if (!removedPartitions.isEmpty()) { if (!removedPartitions.isEmpty()) {
@ -63,13 +63,13 @@ public class QueueStateService<E extends TbQueueMsg, S extends TbQueueMsg> {
if (!addedPartitions.isEmpty()) { if (!addedPartitions.isEmpty()) {
stateConsumer.addPartitions(addedPartitions, partition -> { stateConsumer.addPartitions(addedPartitions, partition -> {
lock.lock(); var readLock = partitionsLock.readLock();
try { try {
if (this.partitions.contains(partition)) { if (this.partitions.contains(partition)) {
eventConsumer.addPartitions(Set.of(partition.newByTopic(eventConsumer.getTopic()))); eventConsumer.addPartitions(Set.of(partition.newByTopic(eventConsumer.getTopic())));
} }
} finally { } finally {
lock.unlock(); readLock.unlock();
} }
}); });
} }