diff --git a/application/src/main/java/org/thingsboard/server/service/queue/ruleengine/TbRuleEngineQueueConsumerManager.java b/application/src/main/java/org/thingsboard/server/service/queue/ruleengine/TbRuleEngineQueueConsumerManager.java index 1f421974be..241aec25bd 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/ruleengine/TbRuleEngineQueueConsumerManager.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/ruleengine/TbRuleEngineQueueConsumerManager.java @@ -33,14 +33,14 @@ import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg; import org.thingsboard.server.queue.TbQueueConsumer; import org.thingsboard.server.queue.common.TbProtoQueueMsg; -import org.thingsboard.server.queue.common.consumer.QueueEvent; +import org.thingsboard.server.queue.common.consumer.MainQueueConsumerManager; import org.thingsboard.server.queue.common.consumer.TbQueueConsumerManagerTask; +import org.thingsboard.server.queue.common.consumer.TbQueueConsumerManagerTask.DeleteQueueTask; import org.thingsboard.server.queue.common.consumer.TbQueueConsumerTask; import org.thingsboard.server.queue.discovery.QueueKey; import org.thingsboard.server.service.queue.TbMsgPackCallback; import org.thingsboard.server.service.queue.TbMsgPackProcessingContext; import org.thingsboard.server.service.queue.TbRuleEngineConsumerStats; -import org.thingsboard.server.queue.common.consumer.MainQueueConsumerManager; import org.thingsboard.server.service.queue.processing.TbRuleEngineProcessingDecision; import org.thingsboard.server.service.queue.processing.TbRuleEngineProcessingResult; import org.thingsboard.server.service.queue.processing.TbRuleEngineProcessingStrategy; @@ -78,13 +78,13 @@ public class TbRuleEngineQueueConsumerManager extends MainQueueConsumerManager i @Override public void commit() { if (consumerLock.isLocked()) { + if (stopped) { + return; + } log.error("commit. consumerLock is locked. will wait with no timeout. it looks like a race conditions or deadlock topic " + topic, new RuntimeException("stacktrace")); } consumerLock.lock(); diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/common/consumer/MainQueueConsumerManager.java b/common/queue/src/main/java/org/thingsboard/server/queue/common/consumer/MainQueueConsumerManager.java index 9d45b168ee..7bd8076ce9 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/common/consumer/MainQueueConsumerManager.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/common/consumer/MainQueueConsumerManager.java @@ -23,6 +23,8 @@ import org.thingsboard.server.common.data.queue.QueueConfig; import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; import org.thingsboard.server.queue.TbQueueConsumer; import org.thingsboard.server.queue.TbQueueMsg; +import org.thingsboard.server.queue.common.consumer.TbQueueConsumerManagerTask.UpdateConfigTask; +import org.thingsboard.server.queue.common.consumer.TbQueueConsumerManagerTask.UpdatePartitionsTask; import org.thingsboard.server.queue.discovery.QueueKey; import java.util.Collection; @@ -31,6 +33,7 @@ import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutorService; @@ -87,20 +90,24 @@ public class MainQueueConsumerManager createConsumerWrapper(C config) { + if (config.isConsumerPerPartition()) { + return new ConsumerPerPartitionWrapper(); + } else { + return new SingleConsumerWrapper(); + } + } + public void update(C config) { - addTask(TbQueueConsumerManagerTask.configUpdate(config)); + addTask(new UpdateConfigTask(config)); } public void update(Set partitions) { - addTask(TbQueueConsumerManagerTask.partitionChange(partitions)); + addTask(new UpdatePartitionsTask(partitions)); } protected void addTask(TbQueueConsumerManagerTask todo) { @@ -125,10 +132,10 @@ public class MainQueueConsumerManager partitions) { + private void doUpdate(Set partitions) { this.partitions = partitions; consumerWrapper.updatePartitions(partitions); } @@ -195,6 +202,15 @@ public class MainQueueConsumerManager consumerTask.awaitCompletion(timeoutSec)); log.debug("[{}] Unsubscribed and stopped consumers", queueKey); } - private static String partitionsToString(Collection partitions) { + static String partitionsToString(Collection partitions) { return partitions.stream().map(tpi -> tpi.getFullTopicName() + (tpi.isUseInternalPartition() ? "[" + tpi.getPartition().orElse(-1) + "]" : "")) .collect(Collectors.joining(", ", "[", "]")); @@ -279,15 +295,24 @@ public class MainQueueConsumerManager removedPartitions = new HashSet<>(consumers.keySet()); removedPartitions.removeAll(partitions); + log.info("[{}] Added partitions: {}, removed partitions: {}", queueKey, partitionsToString(addedPartitions), partitionsToString(removedPartitions)); + removePartitions(removedPartitions); + addPartitions(addedPartitions, null); + } - removedPartitions.forEach((tpi) -> consumers.get(tpi).initiateStop()); - removedPartitions.forEach((tpi) -> consumers.remove(tpi).awaitCompletion()); + protected void removePartitions(Set removedPartitions) { + removedPartitions.forEach((tpi) -> Optional.ofNullable(consumers.get(tpi)).ifPresent(TbQueueConsumerTask::initiateStop)); + removedPartitions.forEach((tpi) -> Optional.ofNullable(consumers.remove(tpi)).ifPresent(TbQueueConsumerTask::awaitCompletion)); + } - addedPartitions.forEach((tpi) -> { + protected void addPartitions(Set partitions, Consumer onStop) { + partitions.forEach(tpi -> { Integer partitionId = tpi.getPartition().orElse(-1); String key = queueKey + "-" + partitionId; - TbQueueConsumerTask consumer = new TbQueueConsumerTask<>(key, () -> consumerCreator.apply(config, partitionId)); + Runnable callback = onStop != null ? () -> onStop.accept(tpi) : null; + + TbQueueConsumerTask consumer = new TbQueueConsumerTask<>(key, () -> consumerCreator.apply(config, partitionId), callback); consumers.put(tpi, consumer); consumer.subscribe(Set.of(tpi)); launchConsumer(consumer); @@ -316,7 +341,7 @@ public class MainQueueConsumerManager(queueKey, () -> consumerCreator.apply(config, null)); // no partitionId passed + consumer = new TbQueueConsumerTask<>(queueKey, () -> consumerCreator.apply(config, null), null); // no partitionId passed } consumer.subscribe(partitions); if (!consumer.isRunning()) { diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/common/consumer/PartitionedQueueConsumerManager.java b/common/queue/src/main/java/org/thingsboard/server/queue/common/consumer/PartitionedQueueConsumerManager.java new file mode 100644 index 0000000000..57f0950cdb --- /dev/null +++ b/common/queue/src/main/java/org/thingsboard/server/queue/common/consumer/PartitionedQueueConsumerManager.java @@ -0,0 +1,80 @@ +/** + * Copyright © 2016-2024 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.queue.common.consumer; + +import lombok.Builder; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; +import org.thingsboard.server.common.data.queue.QueueConfig; +import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; +import org.thingsboard.server.queue.TbQueueConsumer; +import org.thingsboard.server.queue.TbQueueMsg; +import org.thingsboard.server.queue.common.consumer.TbQueueConsumerManagerTask.AddPartitionsTask; +import org.thingsboard.server.queue.common.consumer.TbQueueConsumerManagerTask.RemovePartitionsTask; +import org.thingsboard.server.queue.discovery.QueueKey; + +import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ScheduledExecutorService; +import java.util.function.BiFunction; +import java.util.function.Consumer; + +@Slf4j +public class PartitionedQueueConsumerManager extends MainQueueConsumerManager { + + private final ConsumerPerPartitionWrapper consumerWrapper; + @Getter + private final String topic; + + @Builder(builderMethodName = "create") // not to conflict with super.builder() + public PartitionedQueueConsumerManager(QueueKey queueKey, String topic, long pollInterval, MsgPackProcessor msgPackProcessor, + BiFunction> consumerCreator, + ExecutorService consumerExecutor, ScheduledExecutorService scheduler, + ExecutorService taskExecutor, Consumer uncaughtErrorHandler) { + super(queueKey, QueueConfig.of(true, pollInterval), msgPackProcessor, consumerCreator, consumerExecutor, scheduler, taskExecutor, uncaughtErrorHandler); + this.topic = topic; + this.consumerWrapper = (ConsumerPerPartitionWrapper) super.consumerWrapper; + } + + @Override + public void update(Set partitions) { + throw new UnsupportedOperationException("Use manual addPartitions and removePartitions"); + } + + @Override + protected void processTask(TbQueueConsumerManagerTask task) { + if (task instanceof AddPartitionsTask addPartitionsTask) { + log.info("[{}] Added partitions: {}", queueKey, partitionsToString(addPartitionsTask.partitions())); + consumerWrapper.addPartitions(addPartitionsTask.partitions(), addPartitionsTask.onStop()); + } else if (task instanceof RemovePartitionsTask removePartitionsTask) { + log.info("[{}] Removed partitions: {}", queueKey, partitionsToString(removePartitionsTask.partitions())); + consumerWrapper.removePartitions(removePartitionsTask.partitions()); + } + } + + public void addPartitions(Set partitions) { + addPartitions(partitions, null); + } + + public void addPartitions(Set partitions, Consumer onStop) { + addTask(new AddPartitionsTask(partitions, onStop)); + } + + public void removePartitions(Set partitions) { + addTask(new RemovePartitionsTask(partitions)); + } + +} diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/common/consumer/QueueStateService.java b/common/queue/src/main/java/org/thingsboard/server/queue/common/consumer/QueueStateService.java new file mode 100644 index 0000000000..bffe441f7c --- /dev/null +++ b/common/queue/src/main/java/org/thingsboard/server/queue/common/consumer/QueueStateService.java @@ -0,0 +1,76 @@ +/** + * Copyright © 2016-2024 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.queue.common.consumer; + +import lombok.Getter; +import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; +import org.thingsboard.server.queue.TbQueueMsg; + +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; +import java.util.stream.Collectors; + +public class QueueStateService { + + private PartitionedQueueConsumerManager stateConsumer; + private PartitionedQueueConsumerManager eventConsumer; + + @Getter + private Set partitions; + private final Lock lock = new ReentrantLock(); + + public void init(PartitionedQueueConsumerManager stateConsumer, PartitionedQueueConsumerManager eventConsumer) { + this.stateConsumer = stateConsumer; + this.eventConsumer = eventConsumer; + } + + public void update(Set newPartitions) { + lock.lock(); + Set oldPartitions = this.partitions != null ? this.partitions : Collections.emptySet(); + Set addedPartitions; + Set removedPartitions; + try { + addedPartitions = new HashSet<>(newPartitions); + addedPartitions.removeAll(oldPartitions); + removedPartitions = new HashSet<>(oldPartitions); + removedPartitions.removeAll(newPartitions); + this.partitions = newPartitions; + } finally { + lock.unlock(); + } + if (!removedPartitions.isEmpty()) { + stateConsumer.removePartitions(removedPartitions); + eventConsumer.removePartitions(removedPartitions.stream().map(tpi -> tpi.withTopic(eventConsumer.getTopic())).collect(Collectors.toSet())); + } + + if (!addedPartitions.isEmpty()) { + stateConsumer.addPartitions(addedPartitions, partition -> { + lock.lock(); + try { + if (this.partitions.contains(partition)) { + eventConsumer.addPartitions(Set.of(partition.withTopic(eventConsumer.getTopic()))); + } + } finally { + lock.unlock(); + } + }); + } + } + +} diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/common/consumer/QueueEvent.java b/common/queue/src/main/java/org/thingsboard/server/queue/common/consumer/QueueTaskType.java similarity index 83% rename from common/queue/src/main/java/org/thingsboard/server/queue/common/consumer/QueueEvent.java rename to common/queue/src/main/java/org/thingsboard/server/queue/common/consumer/QueueTaskType.java index 1a78cfc2ba..4e218cd268 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/common/consumer/QueueEvent.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/common/consumer/QueueTaskType.java @@ -17,8 +17,9 @@ package org.thingsboard.server.queue.common.consumer; import java.io.Serializable; -public enum QueueEvent implements Serializable { +public enum QueueTaskType implements Serializable { - PARTITION_CHANGE, CONFIG_UPDATE, DELETE + UPDATE_PARTITIONS, UPDATE_CONFIG, DELETE, + ADD_PARTITIONS, REMOVE_PARTITIONS } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/common/consumer/TbQueueConsumerManagerTask.java b/common/queue/src/main/java/org/thingsboard/server/queue/common/consumer/TbQueueConsumerManagerTask.java index 67bf370db0..7e2c848a66 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/common/consumer/TbQueueConsumerManagerTask.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/common/consumer/TbQueueConsumerManagerTask.java @@ -15,34 +15,49 @@ */ package org.thingsboard.server.queue.common.consumer; -import lombok.AllArgsConstructor; -import lombok.Getter; -import lombok.ToString; import org.thingsboard.server.common.data.queue.QueueConfig; import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; import java.util.Set; +import java.util.function.Consumer; -@Getter -@ToString -@AllArgsConstructor -public class TbQueueConsumerManagerTask { +public interface TbQueueConsumerManagerTask { - private final QueueEvent event; - private QueueConfig config; - private Set partitions; - private boolean drainQueue; + QueueTaskType getType(); - public static TbQueueConsumerManagerTask delete(boolean drainQueue) { - return new TbQueueConsumerManagerTask(QueueEvent.DELETE, null, null, drainQueue); + record DeleteQueueTask(boolean drainQueue) implements TbQueueConsumerManagerTask { + @Override + public QueueTaskType getType() { + return QueueTaskType.DELETE; + } } - public static TbQueueConsumerManagerTask configUpdate(QueueConfig config) { - return new TbQueueConsumerManagerTask(QueueEvent.CONFIG_UPDATE, config, null, false); + record UpdateConfigTask(QueueConfig config) implements TbQueueConsumerManagerTask { + @Override + public QueueTaskType getType() { + return QueueTaskType.UPDATE_CONFIG; + } } - public static TbQueueConsumerManagerTask partitionChange(Set partitions) { - return new TbQueueConsumerManagerTask(QueueEvent.PARTITION_CHANGE, null, partitions, false); + record UpdatePartitionsTask(Set partitions) implements TbQueueConsumerManagerTask { + @Override + public QueueTaskType getType() { + return QueueTaskType.UPDATE_PARTITIONS; + } + } + + record AddPartitionsTask(Set partitions, Consumer onStop) implements TbQueueConsumerManagerTask { + @Override + public QueueTaskType getType() { + return QueueTaskType.ADD_PARTITIONS; + } + } + + record RemovePartitionsTask(Set partitions) implements TbQueueConsumerManagerTask { + @Override + public QueueTaskType getType() { + return QueueTaskType.REMOVE_PARTITIONS; + } } } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/common/consumer/TbQueueConsumerTask.java b/common/queue/src/main/java/org/thingsboard/server/queue/common/consumer/TbQueueConsumerTask.java index 4ed0ffa497..96085bf49a 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/common/consumer/TbQueueConsumerTask.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/common/consumer/TbQueueConsumerTask.java @@ -35,14 +35,17 @@ public class TbQueueConsumerTask { private final Object key; private volatile TbQueueConsumer consumer; private volatile Supplier> consumerSupplier; + @Getter + private final Runnable callback; @Setter private Future task; - public TbQueueConsumerTask(Object key, Supplier> consumerSupplier) { + public TbQueueConsumerTask(Object key, Supplier> consumerSupplier, Runnable callback) { this.key = key; this.consumer = null; this.consumerSupplier = consumerSupplier; + this.callback = callback; } public TbQueueConsumer getConsumer() {