Kafka states restore improvements

This commit is contained in:
ViacheslavKlimov 2025-02-19 15:44:05 +02:00
parent 2a003709e0
commit ded6daf2b3
8 changed files with 247 additions and 44 deletions

View File

@ -33,14 +33,14 @@ import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg;
import org.thingsboard.server.queue.TbQueueConsumer;
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import org.thingsboard.server.queue.common.consumer.QueueEvent;
import org.thingsboard.server.queue.common.consumer.MainQueueConsumerManager;
import org.thingsboard.server.queue.common.consumer.TbQueueConsumerManagerTask;
import org.thingsboard.server.queue.common.consumer.TbQueueConsumerManagerTask.DeleteQueueTask;
import org.thingsboard.server.queue.common.consumer.TbQueueConsumerTask;
import org.thingsboard.server.queue.discovery.QueueKey;
import org.thingsboard.server.service.queue.TbMsgPackCallback;
import org.thingsboard.server.service.queue.TbMsgPackProcessingContext;
import org.thingsboard.server.service.queue.TbRuleEngineConsumerStats;
import org.thingsboard.server.queue.common.consumer.MainQueueConsumerManager;
import org.thingsboard.server.service.queue.processing.TbRuleEngineProcessingDecision;
import org.thingsboard.server.service.queue.processing.TbRuleEngineProcessingResult;
import org.thingsboard.server.service.queue.processing.TbRuleEngineProcessingStrategy;
@ -78,13 +78,13 @@ public class TbRuleEngineQueueConsumerManager extends MainQueueConsumerManager<T
}
public void delete(boolean drainQueue) {
addTask(TbQueueConsumerManagerTask.delete(drainQueue));
addTask(new DeleteQueueTask(drainQueue));
}
@Override
protected void processTask(TbQueueConsumerManagerTask task) {
if (task.getEvent() == QueueEvent.DELETE) {
doDelete(task.isDrainQueue());
if (task instanceof DeleteQueueTask deleteQueueTask) {
doDelete(deleteQueueTask.drainQueue());
}
}

View File

@ -148,6 +148,9 @@ public abstract class AbstractTbQueueConsumerTemplate<R, T extends TbQueueMsg> i
@Override
public void commit() {
if (consumerLock.isLocked()) {
if (stopped) {
return;
}
log.error("commit. consumerLock is locked. will wait with no timeout. it looks like a race conditions or deadlock topic " + topic, new RuntimeException("stacktrace"));
}
consumerLock.lock();

View File

@ -23,6 +23,8 @@ import org.thingsboard.server.common.data.queue.QueueConfig;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
import org.thingsboard.server.queue.TbQueueConsumer;
import org.thingsboard.server.queue.TbQueueMsg;
import org.thingsboard.server.queue.common.consumer.TbQueueConsumerManagerTask.UpdateConfigTask;
import org.thingsboard.server.queue.common.consumer.TbQueueConsumerManagerTask.UpdatePartitionsTask;
import org.thingsboard.server.queue.discovery.QueueKey;
import java.util.Collection;
@ -31,6 +33,7 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
@ -87,20 +90,24 @@ public class MainQueueConsumerManager<M extends TbQueueMsg, C extends QueueConfi
public void init(C config) {
this.config = config;
if (config.isConsumerPerPartition()) {
this.consumerWrapper = new ConsumerPerPartitionWrapper();
} else {
this.consumerWrapper = new SingleConsumerWrapper();
}
this.consumerWrapper = createConsumerWrapper(config);
log.debug("[{}] Initialized consumer for queue: {}", queueKey, config);
}
protected ConsumerWrapper<M> createConsumerWrapper(C config) {
if (config.isConsumerPerPartition()) {
return new ConsumerPerPartitionWrapper();
} else {
return new SingleConsumerWrapper();
}
}
public void update(C config) {
addTask(TbQueueConsumerManagerTask.configUpdate(config));
addTask(new UpdateConfigTask(config));
}
public void update(Set<TopicPartitionInfo> partitions) {
addTask(TbQueueConsumerManagerTask.partitionChange(partitions));
addTask(new UpdatePartitionsTask(partitions));
}
protected void addTask(TbQueueConsumerManagerTask todo) {
@ -125,10 +132,10 @@ public class MainQueueConsumerManager<M extends TbQueueMsg, C extends QueueConfi
}
log.trace("[{}] Processing task: {}", queueKey, task);
if (task.getEvent() == QueueEvent.PARTITION_CHANGE) {
newPartitions = task.getPartitions();
} else if (task.getEvent() == QueueEvent.CONFIG_UPDATE) {
newConfig = (C) task.getConfig();
if (task instanceof UpdatePartitionsTask updatePartitionsTask) {
newPartitions = updatePartitionsTask.partitions();
} else if (task instanceof UpdateConfigTask updateConfigTask) {
newConfig = (C) updateConfigTask.config();
} else {
processTask(task);
}
@ -184,7 +191,7 @@ public class MainQueueConsumerManager<M extends TbQueueMsg, C extends QueueConfi
}
}
public void doUpdate(Set<TopicPartitionInfo> partitions) {
private void doUpdate(Set<TopicPartitionInfo> partitions) {
this.partitions = partitions;
consumerWrapper.updatePartitions(partitions);
}
@ -195,6 +202,15 @@ public class MainQueueConsumerManager<M extends TbQueueMsg, C extends QueueConfi
ThingsBoardThreadFactory.updateCurrentThreadName(consumerTask.getKey().toString());
consumerLoop(consumerTask.getConsumer());
log.info("[{}] Consumer stopped", consumerTask.getKey());
try {
Runnable callback = consumerTask.getCallback();
if (callback != null) {
callback.run();
}
} catch (Throwable t) {
log.error("Failed to execute finish callback", t);
}
});
consumerTask.setTask(consumerLoop);
}
@ -245,13 +261,13 @@ public class MainQueueConsumerManager<M extends TbQueueMsg, C extends QueueConfi
awaitStop(30);
}
public void awaitStop(int timeoutSec) {
private void awaitStop(int timeoutSec) {
log.debug("[{}] Waiting for consumers to stop", queueKey);
consumerWrapper.getConsumers().forEach(consumerTask -> consumerTask.awaitCompletion(timeoutSec));
log.debug("[{}] Unsubscribed and stopped consumers", queueKey);
}
private static String partitionsToString(Collection<TopicPartitionInfo> partitions) {
static String partitionsToString(Collection<TopicPartitionInfo> partitions) {
return partitions.stream().map(tpi -> tpi.getFullTopicName() + (tpi.isUseInternalPartition() ?
"[" + tpi.getPartition().orElse(-1) + "]" : ""))
.collect(Collectors.joining(", ", "[", "]"));
@ -279,15 +295,24 @@ public class MainQueueConsumerManager<M extends TbQueueMsg, C extends QueueConfi
Set<TopicPartitionInfo> removedPartitions = new HashSet<>(consumers.keySet());
removedPartitions.removeAll(partitions);
log.info("[{}] Added partitions: {}, removed partitions: {}", queueKey, partitionsToString(addedPartitions), partitionsToString(removedPartitions));
removePartitions(removedPartitions);
addPartitions(addedPartitions, null);
}
removedPartitions.forEach((tpi) -> consumers.get(tpi).initiateStop());
removedPartitions.forEach((tpi) -> consumers.remove(tpi).awaitCompletion());
protected void removePartitions(Set<TopicPartitionInfo> removedPartitions) {
removedPartitions.forEach((tpi) -> Optional.ofNullable(consumers.get(tpi)).ifPresent(TbQueueConsumerTask::initiateStop));
removedPartitions.forEach((tpi) -> Optional.ofNullable(consumers.remove(tpi)).ifPresent(TbQueueConsumerTask::awaitCompletion));
}
addedPartitions.forEach((tpi) -> {
protected void addPartitions(Set<TopicPartitionInfo> partitions, Consumer<TopicPartitionInfo> onStop) {
partitions.forEach(tpi -> {
Integer partitionId = tpi.getPartition().orElse(-1);
String key = queueKey + "-" + partitionId;
TbQueueConsumerTask<M> consumer = new TbQueueConsumerTask<>(key, () -> consumerCreator.apply(config, partitionId));
Runnable callback = onStop != null ? () -> onStop.accept(tpi) : null;
TbQueueConsumerTask<M> consumer = new TbQueueConsumerTask<>(key, () -> consumerCreator.apply(config, partitionId), callback);
consumers.put(tpi, consumer);
consumer.subscribe(Set.of(tpi));
launchConsumer(consumer);
@ -316,7 +341,7 @@ public class MainQueueConsumerManager<M extends TbQueueMsg, C extends QueueConfi
}
if (consumer == null) {
consumer = new TbQueueConsumerTask<>(queueKey, () -> consumerCreator.apply(config, null)); // no partitionId passed
consumer = new TbQueueConsumerTask<>(queueKey, () -> consumerCreator.apply(config, null), null); // no partitionId passed
}
consumer.subscribe(partitions);
if (!consumer.isRunning()) {

View File

@ -0,0 +1,80 @@
/**
* Copyright © 2016-2024 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.queue.common.consumer;
import lombok.Builder;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.server.common.data.queue.QueueConfig;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
import org.thingsboard.server.queue.TbQueueConsumer;
import org.thingsboard.server.queue.TbQueueMsg;
import org.thingsboard.server.queue.common.consumer.TbQueueConsumerManagerTask.AddPartitionsTask;
import org.thingsboard.server.queue.common.consumer.TbQueueConsumerManagerTask.RemovePartitionsTask;
import org.thingsboard.server.queue.discovery.QueueKey;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.BiFunction;
import java.util.function.Consumer;
@Slf4j
public class PartitionedQueueConsumerManager<M extends TbQueueMsg> extends MainQueueConsumerManager<M, QueueConfig> {
private final ConsumerPerPartitionWrapper consumerWrapper;
@Getter
private final String topic;
@Builder(builderMethodName = "create") // not to conflict with super.builder()
public PartitionedQueueConsumerManager(QueueKey queueKey, String topic, long pollInterval, MsgPackProcessor<M, QueueConfig> msgPackProcessor,
BiFunction<QueueConfig, Integer, TbQueueConsumer<M>> consumerCreator,
ExecutorService consumerExecutor, ScheduledExecutorService scheduler,
ExecutorService taskExecutor, Consumer<Throwable> uncaughtErrorHandler) {
super(queueKey, QueueConfig.of(true, pollInterval), msgPackProcessor, consumerCreator, consumerExecutor, scheduler, taskExecutor, uncaughtErrorHandler);
this.topic = topic;
this.consumerWrapper = (ConsumerPerPartitionWrapper) super.consumerWrapper;
}
@Override
public void update(Set<TopicPartitionInfo> partitions) {
throw new UnsupportedOperationException("Use manual addPartitions and removePartitions");
}
@Override
protected void processTask(TbQueueConsumerManagerTask task) {
if (task instanceof AddPartitionsTask addPartitionsTask) {
log.info("[{}] Added partitions: {}", queueKey, partitionsToString(addPartitionsTask.partitions()));
consumerWrapper.addPartitions(addPartitionsTask.partitions(), addPartitionsTask.onStop());
} else if (task instanceof RemovePartitionsTask removePartitionsTask) {
log.info("[{}] Removed partitions: {}", queueKey, partitionsToString(removePartitionsTask.partitions()));
consumerWrapper.removePartitions(removePartitionsTask.partitions());
}
}
public void addPartitions(Set<TopicPartitionInfo> partitions) {
addPartitions(partitions, null);
}
public void addPartitions(Set<TopicPartitionInfo> partitions, Consumer<TopicPartitionInfo> onStop) {
addTask(new AddPartitionsTask(partitions, onStop));
}
public void removePartitions(Set<TopicPartitionInfo> partitions) {
addTask(new RemovePartitionsTask(partitions));
}
}

View File

@ -0,0 +1,76 @@
/**
* Copyright © 2016-2024 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.queue.common.consumer;
import lombok.Getter;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
import org.thingsboard.server.queue.TbQueueMsg;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
public class QueueStateService<E extends TbQueueMsg, S extends TbQueueMsg> {
private PartitionedQueueConsumerManager<S> stateConsumer;
private PartitionedQueueConsumerManager<E> eventConsumer;
@Getter
private Set<TopicPartitionInfo> partitions;
private final Lock lock = new ReentrantLock();
public void init(PartitionedQueueConsumerManager<S> stateConsumer, PartitionedQueueConsumerManager<E> eventConsumer) {
this.stateConsumer = stateConsumer;
this.eventConsumer = eventConsumer;
}
public void update(Set<TopicPartitionInfo> newPartitions) {
lock.lock();
Set<TopicPartitionInfo> oldPartitions = this.partitions != null ? this.partitions : Collections.emptySet();
Set<TopicPartitionInfo> addedPartitions;
Set<TopicPartitionInfo> removedPartitions;
try {
addedPartitions = new HashSet<>(newPartitions);
addedPartitions.removeAll(oldPartitions);
removedPartitions = new HashSet<>(oldPartitions);
removedPartitions.removeAll(newPartitions);
this.partitions = newPartitions;
} finally {
lock.unlock();
}
if (!removedPartitions.isEmpty()) {
stateConsumer.removePartitions(removedPartitions);
eventConsumer.removePartitions(removedPartitions.stream().map(tpi -> tpi.withTopic(eventConsumer.getTopic())).collect(Collectors.toSet()));
}
if (!addedPartitions.isEmpty()) {
stateConsumer.addPartitions(addedPartitions, partition -> {
lock.lock();
try {
if (this.partitions.contains(partition)) {
eventConsumer.addPartitions(Set.of(partition.withTopic(eventConsumer.getTopic())));
}
} finally {
lock.unlock();
}
});
}
}
}

View File

@ -17,8 +17,9 @@ package org.thingsboard.server.queue.common.consumer;
import java.io.Serializable;
public enum QueueEvent implements Serializable {
public enum QueueTaskType implements Serializable {
PARTITION_CHANGE, CONFIG_UPDATE, DELETE
UPDATE_PARTITIONS, UPDATE_CONFIG, DELETE,
ADD_PARTITIONS, REMOVE_PARTITIONS
}

View File

@ -15,34 +15,49 @@
*/
package org.thingsboard.server.queue.common.consumer;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.ToString;
import org.thingsboard.server.common.data.queue.QueueConfig;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
import java.util.Set;
import java.util.function.Consumer;
@Getter
@ToString
@AllArgsConstructor
public class TbQueueConsumerManagerTask {
public interface TbQueueConsumerManagerTask {
private final QueueEvent event;
private QueueConfig config;
private Set<TopicPartitionInfo> partitions;
private boolean drainQueue;
QueueTaskType getType();
public static TbQueueConsumerManagerTask delete(boolean drainQueue) {
return new TbQueueConsumerManagerTask(QueueEvent.DELETE, null, null, drainQueue);
record DeleteQueueTask(boolean drainQueue) implements TbQueueConsumerManagerTask {
@Override
public QueueTaskType getType() {
return QueueTaskType.DELETE;
}
}
public static TbQueueConsumerManagerTask configUpdate(QueueConfig config) {
return new TbQueueConsumerManagerTask(QueueEvent.CONFIG_UPDATE, config, null, false);
record UpdateConfigTask(QueueConfig config) implements TbQueueConsumerManagerTask {
@Override
public QueueTaskType getType() {
return QueueTaskType.UPDATE_CONFIG;
}
}
public static TbQueueConsumerManagerTask partitionChange(Set<TopicPartitionInfo> partitions) {
return new TbQueueConsumerManagerTask(QueueEvent.PARTITION_CHANGE, null, partitions, false);
record UpdatePartitionsTask(Set<TopicPartitionInfo> partitions) implements TbQueueConsumerManagerTask {
@Override
public QueueTaskType getType() {
return QueueTaskType.UPDATE_PARTITIONS;
}
}
record AddPartitionsTask(Set<TopicPartitionInfo> partitions, Consumer<TopicPartitionInfo> onStop) implements TbQueueConsumerManagerTask {
@Override
public QueueTaskType getType() {
return QueueTaskType.ADD_PARTITIONS;
}
}
record RemovePartitionsTask(Set<TopicPartitionInfo> partitions) implements TbQueueConsumerManagerTask {
@Override
public QueueTaskType getType() {
return QueueTaskType.REMOVE_PARTITIONS;
}
}
}

View File

@ -35,14 +35,17 @@ public class TbQueueConsumerTask<M extends TbQueueMsg> {
private final Object key;
private volatile TbQueueConsumer<M> consumer;
private volatile Supplier<TbQueueConsumer<M>> consumerSupplier;
@Getter
private final Runnable callback;
@Setter
private Future<?> task;
public TbQueueConsumerTask(Object key, Supplier<TbQueueConsumer<M>> consumerSupplier) {
public TbQueueConsumerTask(Object key, Supplier<TbQueueConsumer<M>> consumerSupplier, Runnable callback) {
this.key = key;
this.consumer = null;
this.consumerSupplier = consumerSupplier;
this.callback = callback;
}
public TbQueueConsumer<M> getConsumer() {