CF states restore improvements
This commit is contained in:
parent
5dc6991907
commit
2b3af16530
@ -21,6 +21,9 @@ import org.thingsboard.server.actors.calculatedField.CalculatedFieldStateRestore
|
||||
import org.thingsboard.server.common.msg.queue.TbCallback;
|
||||
import org.thingsboard.server.exception.CalculatedFieldStateException;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldStateProto;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.ToCalculatedFieldMsg;
|
||||
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
|
||||
import org.thingsboard.server.queue.common.consumer.PartitionedQueueConsumerManager;
|
||||
import org.thingsboard.server.service.cf.ctx.CalculatedFieldEntityCtxId;
|
||||
import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldState;
|
||||
|
||||
@ -32,6 +35,13 @@ public abstract class AbstractCalculatedFieldStateService implements CalculatedF
|
||||
@Autowired
|
||||
private ActorSystemContext actorSystemContext;
|
||||
|
||||
protected PartitionedQueueConsumerManager<TbProtoQueueMsg<ToCalculatedFieldMsg>> eventConsumer;
|
||||
|
||||
@Override
|
||||
public void init(PartitionedQueueConsumerManager<TbProtoQueueMsg<ToCalculatedFieldMsg>> eventConsumer) {
|
||||
this.eventConsumer = eventConsumer;
|
||||
}
|
||||
|
||||
@Override
|
||||
public final void persistState(CalculatedFieldEntityCtxId stateId, CalculatedFieldState state, TbCallback callback) {
|
||||
if (state.isStateTooLarge()) {
|
||||
|
||||
@ -18,6 +18,9 @@ package org.thingsboard.server.service.cf;
|
||||
import org.thingsboard.server.common.msg.queue.TbCallback;
|
||||
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
|
||||
import org.thingsboard.server.exception.CalculatedFieldStateException;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.ToCalculatedFieldMsg;
|
||||
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
|
||||
import org.thingsboard.server.queue.common.consumer.PartitionedQueueConsumerManager;
|
||||
import org.thingsboard.server.service.cf.ctx.CalculatedFieldEntityCtxId;
|
||||
import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldState;
|
||||
|
||||
@ -25,10 +28,14 @@ import java.util.Set;
|
||||
|
||||
public interface CalculatedFieldStateService {
|
||||
|
||||
void init(PartitionedQueueConsumerManager<TbProtoQueueMsg<ToCalculatedFieldMsg>> eventConsumer);
|
||||
|
||||
void persistState(CalculatedFieldEntityCtxId stateId, CalculatedFieldState state, TbCallback callback) throws CalculatedFieldStateException;
|
||||
|
||||
void removeState(CalculatedFieldEntityCtxId stateId, TbCallback callback);
|
||||
|
||||
void restore(Set<TopicPartitionInfo> partitions);
|
||||
|
||||
void stop();
|
||||
|
||||
}
|
||||
|
||||
@ -15,15 +15,11 @@
|
||||
*/
|
||||
package org.thingsboard.server.service.cf.ctx.state;
|
||||
|
||||
import jakarta.annotation.PostConstruct;
|
||||
import jakarta.annotation.PreDestroy;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.thingsboard.common.util.ThingsBoardExecutors;
|
||||
import org.thingsboard.common.util.ThingsBoardThreadFactory;
|
||||
import org.thingsboard.server.common.data.id.CalculatedFieldId;
|
||||
import org.thingsboard.server.common.data.id.EntityId;
|
||||
import org.thingsboard.server.common.data.id.EntityIdFactory;
|
||||
@ -31,25 +27,22 @@ import org.thingsboard.server.common.data.id.TenantId;
|
||||
import org.thingsboard.server.common.msg.queue.TbCallback;
|
||||
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldStateProto;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.ToCalculatedFieldMsg;
|
||||
import org.thingsboard.server.queue.TbQueueCallback;
|
||||
import org.thingsboard.server.queue.TbQueueMsgHeaders;
|
||||
import org.thingsboard.server.queue.TbQueueMsgMetadata;
|
||||
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
|
||||
import org.thingsboard.server.queue.common.consumer.PartitionedQueueConsumerManager;
|
||||
import org.thingsboard.server.queue.common.consumer.QueueStateService;
|
||||
import org.thingsboard.server.queue.discovery.PartitionService;
|
||||
import org.thingsboard.server.queue.discovery.QueueKey;
|
||||
import org.thingsboard.server.queue.kafka.TbKafkaProducerTemplate;
|
||||
import org.thingsboard.server.queue.provider.TbRuleEngineQueueFactory;
|
||||
import org.thingsboard.server.service.cf.AbstractCalculatedFieldStateService;
|
||||
import org.thingsboard.server.service.cf.ctx.CalculatedFieldEntityCtxId;
|
||||
import org.thingsboard.server.service.queue.DefaultTbCalculatedFieldConsumerService.CalculatedFieldQueueConfig;
|
||||
import org.thingsboard.server.queue.common.consumer.MainQueueConsumerManager;
|
||||
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static org.thingsboard.server.queue.common.AbstractTbQueueTemplate.bytesToString;
|
||||
import static org.thingsboard.server.queue.common.AbstractTbQueueTemplate.bytesToUuid;
|
||||
@ -67,27 +60,20 @@ public class KafkaCalculatedFieldStateService extends AbstractCalculatedFieldSta
|
||||
|
||||
@Value("${queue.calculated_fields.poll_interval:25}")
|
||||
private long pollInterval;
|
||||
@Value("${queue.calculated_fields.consumer_per_partition:true}")
|
||||
private boolean consumerPerPartition;
|
||||
|
||||
private MainQueueConsumerManager<TbProtoQueueMsg<CalculatedFieldStateProto>, CalculatedFieldQueueConfig> stateConsumer;
|
||||
private PartitionedQueueConsumerManager<TbProtoQueueMsg<CalculatedFieldStateProto>> stateConsumer;
|
||||
private TbKafkaProducerTemplate<TbProtoQueueMsg<CalculatedFieldStateProto>> stateProducer;
|
||||
|
||||
protected ExecutorService consumersExecutor;
|
||||
protected ExecutorService mgmtExecutor;
|
||||
protected ScheduledExecutorService scheduler;
|
||||
private QueueStateService<TbProtoQueueMsg<ToCalculatedFieldMsg>, TbProtoQueueMsg<CalculatedFieldStateProto>> queueStateService;
|
||||
|
||||
private final AtomicInteger counter = new AtomicInteger();
|
||||
|
||||
@PostConstruct
|
||||
private void init() {
|
||||
this.consumersExecutor = Executors.newCachedThreadPool(ThingsBoardThreadFactory.forName("cf-state-consumer"));
|
||||
this.mgmtExecutor = ThingsBoardExecutors.newWorkStealingPool(Math.max(Runtime.getRuntime().availableProcessors(), 4), "cf-state-mgmt");
|
||||
this.scheduler = ThingsBoardExecutors.newSingleThreadScheduledExecutor("cf-state-consumer-scheduler");
|
||||
|
||||
this.stateConsumer = MainQueueConsumerManager.<TbProtoQueueMsg<CalculatedFieldStateProto>, CalculatedFieldQueueConfig>builder()
|
||||
@Override
|
||||
public void init(PartitionedQueueConsumerManager<TbProtoQueueMsg<ToCalculatedFieldMsg>> eventConsumer) {
|
||||
super.init(eventConsumer);
|
||||
this.stateConsumer = PartitionedQueueConsumerManager.<TbProtoQueueMsg<CalculatedFieldStateProto>>create()
|
||||
.queueKey(QueueKey.CF_STATES)
|
||||
.config(CalculatedFieldQueueConfig.of(consumerPerPartition, (int) pollInterval))
|
||||
.topic(partitionService.getTopic(QueueKey.CF_STATES))
|
||||
.pollInterval(pollInterval)
|
||||
.msgPackProcessor((msgs, consumer, config) -> {
|
||||
for (TbProtoQueueMsg<CalculatedFieldStateProto> msg : msgs) {
|
||||
try {
|
||||
@ -107,11 +93,13 @@ public class KafkaCalculatedFieldStateService extends AbstractCalculatedFieldSta
|
||||
}
|
||||
})
|
||||
.consumerCreator((config, partitionId) -> queueFactory.createCalculatedFieldStateConsumer())
|
||||
.consumerExecutor(consumersExecutor)
|
||||
.scheduler(scheduler)
|
||||
.taskExecutor(mgmtExecutor)
|
||||
.consumerExecutor(eventConsumer.getConsumerExecutor())
|
||||
.scheduler(eventConsumer.getScheduler())
|
||||
.taskExecutor(eventConsumer.getTaskExecutor())
|
||||
.build();
|
||||
this.stateProducer = (TbKafkaProducerTemplate<TbProtoQueueMsg<CalculatedFieldStateProto>>) queueFactory.createCalculatedFieldStateProducer();
|
||||
this.queueStateService = new QueueStateService<>();
|
||||
this.queueStateService.init(stateConsumer, super.eventConsumer);
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -145,15 +133,7 @@ public class KafkaCalculatedFieldStateService extends AbstractCalculatedFieldSta
|
||||
|
||||
@Override
|
||||
public void restore(Set<TopicPartitionInfo> partitions) {
|
||||
partitions = partitions.stream().map(tpi -> tpi.newByTopic(partitionService.getTopic(QueueKey.CF_STATES))).collect(Collectors.toSet());
|
||||
log.info("Restoring calculated field states for partitions: {}", partitions.stream().map(TopicPartitionInfo::getFullTopicName).toList());
|
||||
long startTs = System.currentTimeMillis();
|
||||
counter.set(0);
|
||||
|
||||
stateConsumer.doUpdate(partitions); // calling blocking doUpdate instead of update
|
||||
stateConsumer.awaitStop(0);// consumers should stop on their own because stopWhenRead is true, we just need to wait
|
||||
|
||||
log.info("Restored {} calculated field states in {} ms", counter.get(), System.currentTimeMillis() - startTs);
|
||||
queueStateService.update(partitions);
|
||||
}
|
||||
|
||||
private void putStateId(TbQueueMsgHeaders headers, CalculatedFieldEntityCtxId stateId) {
|
||||
@ -170,15 +150,11 @@ public class KafkaCalculatedFieldStateService extends AbstractCalculatedFieldSta
|
||||
return new CalculatedFieldEntityCtxId(tenantId, cfId, entityId);
|
||||
}
|
||||
|
||||
@PreDestroy
|
||||
private void preDestroy() {
|
||||
@Override
|
||||
public void stop() {
|
||||
stateConsumer.stop();
|
||||
stateConsumer.awaitStop();
|
||||
stateProducer.stop();
|
||||
|
||||
consumersExecutor.shutdownNow();
|
||||
mgmtExecutor.shutdownNow();
|
||||
scheduler.shutdownNow();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@ -54,18 +54,21 @@ public class RocksDBCalculatedFieldStateService extends AbstractCalculatedFieldS
|
||||
@Override
|
||||
public void restore(Set<TopicPartitionInfo> partitions) {
|
||||
if (this.partitions == null) {
|
||||
this.partitions = partitions;
|
||||
} else {
|
||||
return;
|
||||
cfRocksDb.forEach((key, value) -> {
|
||||
try {
|
||||
processRestoredState(CalculatedFieldStateProto.parseFrom(value));
|
||||
} catch (InvalidProtocolBufferException e) {
|
||||
log.error("[{}] Failed to process restored state", key, e);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
cfRocksDb.forEach((key, value) -> {
|
||||
try {
|
||||
processRestoredState(CalculatedFieldStateProto.parseFrom(value));
|
||||
} catch (InvalidProtocolBufferException e) {
|
||||
log.error("[{}] Failed to process restored state", key, e);
|
||||
}
|
||||
});
|
||||
eventConsumer.update(partitions);
|
||||
this.partitions = partitions;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void stop() {
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@ -19,13 +19,11 @@ import com.google.common.util.concurrent.ListeningExecutorService;
|
||||
import com.google.common.util.concurrent.MoreExecutors;
|
||||
import jakarta.annotation.PostConstruct;
|
||||
import jakarta.annotation.PreDestroy;
|
||||
import lombok.Data;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.context.ApplicationEventPublisher;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.thingsboard.common.util.ThingsBoardExecutors;
|
||||
import org.thingsboard.common.util.ThingsBoardThreadFactory;
|
||||
import org.thingsboard.server.actors.ActorSystemContext;
|
||||
import org.thingsboard.server.actors.calculatedField.CalculatedFieldLinkedTelemetryMsg;
|
||||
import org.thingsboard.server.actors.calculatedField.CalculatedFieldTelemetryMsg;
|
||||
@ -45,6 +43,7 @@ import org.thingsboard.server.gen.transport.TransportProtos.ToCalculatedFieldMsg
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.ToCalculatedFieldNotificationMsg;
|
||||
import org.thingsboard.server.queue.TbQueueConsumer;
|
||||
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
|
||||
import org.thingsboard.server.queue.common.consumer.PartitionedQueueConsumerManager;
|
||||
import org.thingsboard.server.queue.discovery.PartitionService;
|
||||
import org.thingsboard.server.queue.discovery.QueueKey;
|
||||
import org.thingsboard.server.queue.discovery.event.PartitionChangeEvent;
|
||||
@ -55,7 +54,6 @@ import org.thingsboard.server.service.cf.CalculatedFieldCache;
|
||||
import org.thingsboard.server.service.cf.CalculatedFieldStateService;
|
||||
import org.thingsboard.server.service.profile.TbAssetProfileCache;
|
||||
import org.thingsboard.server.service.profile.TbDeviceProfileCache;
|
||||
import org.thingsboard.server.queue.common.consumer.MainQueueConsumerManager;
|
||||
import org.thingsboard.server.service.queue.processing.AbstractConsumerService;
|
||||
import org.thingsboard.server.service.queue.processing.IdMsgPair;
|
||||
import org.thingsboard.server.service.security.auth.jwt.settings.JwtSettingsService;
|
||||
@ -66,8 +64,6 @@ import java.util.UUID;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.stream.Collectors;
|
||||
@ -83,18 +79,15 @@ public class DefaultTbCalculatedFieldConsumerService extends AbstractConsumerSer
|
||||
private long pollInterval;
|
||||
@Value("${queue.calculated_fields.pack_processing_timeout:60000}")
|
||||
private long packProcessingTimeout;
|
||||
@Value("${queue.calculated_fields.consumer_per_partition:true}")
|
||||
private boolean consumerPerPartition;
|
||||
@Value("${queue.calculated_fields.pool_size:8}")
|
||||
private int poolSize;
|
||||
|
||||
private final TbRuleEngineQueueFactory queueFactory;
|
||||
private final CalculatedFieldStateService stateService;
|
||||
|
||||
private MainQueueConsumerManager<TbProtoQueueMsg<ToCalculatedFieldMsg>, CalculatedFieldQueueConfig> mainConsumer;
|
||||
private PartitionedQueueConsumerManager<TbProtoQueueMsg<ToCalculatedFieldMsg>> eventConsumer;
|
||||
|
||||
private ListeningExecutorService calculatedFieldsExecutor;
|
||||
private ExecutorService repartitionExecutor;
|
||||
|
||||
public DefaultTbCalculatedFieldConsumerService(TbRuleEngineQueueFactory tbQueueFactory,
|
||||
ActorSystemContext actorContext,
|
||||
@ -117,17 +110,18 @@ public class DefaultTbCalculatedFieldConsumerService extends AbstractConsumerSer
|
||||
public void init() {
|
||||
super.init("tb-cf");
|
||||
this.calculatedFieldsExecutor = MoreExecutors.listeningDecorator(ThingsBoardExecutors.newWorkStealingPool(poolSize, "tb-cf-executor")); // TODO: multiple threads.
|
||||
this.repartitionExecutor = Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("tb-cf-repartition"));
|
||||
|
||||
this.mainConsumer = MainQueueConsumerManager.<TbProtoQueueMsg<ToCalculatedFieldMsg>, CalculatedFieldQueueConfig>builder()
|
||||
this.eventConsumer = PartitionedQueueConsumerManager.<TbProtoQueueMsg<ToCalculatedFieldMsg>>create()
|
||||
.queueKey(QueueKey.CF)
|
||||
.config(CalculatedFieldQueueConfig.of(consumerPerPartition, (int) pollInterval))
|
||||
.topic(partitionService.getTopic(QueueKey.CF))
|
||||
.pollInterval(pollInterval)
|
||||
.msgPackProcessor(this::processMsgs)
|
||||
.consumerCreator((config, partitionId) -> queueFactory.createToCalculatedFieldMsgConsumer())
|
||||
.consumerExecutor(consumersExecutor)
|
||||
.scheduler(scheduler)
|
||||
.taskExecutor(mgmtExecutor)
|
||||
.build();
|
||||
stateService.init(eventConsumer);
|
||||
}
|
||||
|
||||
@PreDestroy
|
||||
@ -146,17 +140,16 @@ public class DefaultTbCalculatedFieldConsumerService extends AbstractConsumerSer
|
||||
@Override
|
||||
protected void onTbApplicationEvent(PartitionChangeEvent event) {
|
||||
var partitions = event.getCfPartitions();
|
||||
repartitionExecutor.submit(() -> {
|
||||
try {
|
||||
stateService.restore(partitions);
|
||||
mainConsumer.update(partitions);
|
||||
// Cleanup old entities after corresponding consumers are stopped.
|
||||
// Any periodic tasks need to check that the entity is still managed by the current server before processing.
|
||||
actorContext.tell(new CalculatedFieldPartitionChangeMsg(partitionsToBooleanIndexArray(partitions)));
|
||||
} catch (Throwable t) {
|
||||
log.error("Failed to process partition change event: {}", event, t);
|
||||
}
|
||||
});
|
||||
try {
|
||||
stateService.restore(partitions);
|
||||
// eventConsumer's partitions will be updated by stateService
|
||||
|
||||
// Cleanup old entities after corresponding consumers are stopped.
|
||||
// Any periodic tasks need to check that the entity is still managed by the current server before processing.
|
||||
actorContext.tell(new CalculatedFieldPartitionChangeMsg(partitionsToBooleanIndexArray(partitions)));
|
||||
} catch (Throwable t) {
|
||||
log.error("Failed to process partition change event: {}", event, t);
|
||||
}
|
||||
}
|
||||
|
||||
private boolean[] partitionsToBooleanIndexArray(Set<TopicPartitionInfo> partitions) {
|
||||
@ -167,7 +160,7 @@ public class DefaultTbCalculatedFieldConsumerService extends AbstractConsumerSer
|
||||
return myPartitions;
|
||||
}
|
||||
|
||||
private void processMsgs(List<TbProtoQueueMsg<ToCalculatedFieldMsg>> msgs, TbQueueConsumer<TbProtoQueueMsg<ToCalculatedFieldMsg>> consumer, CalculatedFieldQueueConfig config) throws Exception {
|
||||
private void processMsgs(List<TbProtoQueueMsg<ToCalculatedFieldMsg>> msgs, TbQueueConsumer<TbProtoQueueMsg<ToCalculatedFieldMsg>> consumer, QueueConfig config) throws Exception {
|
||||
List<IdMsgPair<ToCalculatedFieldMsg>> orderedMsgList = msgs.stream().map(msg -> new IdMsgPair<>(UUID.randomUUID(), msg)).toList();
|
||||
ConcurrentMap<UUID, TbProtoQueueMsg<ToCalculatedFieldMsg>> pendingMap = orderedMsgList.stream().collect(
|
||||
Collectors.toConcurrentMap(IdMsgPair::getUuid, IdMsgPair::getMsg));
|
||||
@ -275,14 +268,9 @@ public class DefaultTbCalculatedFieldConsumerService extends AbstractConsumerSer
|
||||
@Override
|
||||
protected void stopConsumers() {
|
||||
super.stopConsumers();
|
||||
mainConsumer.stop();
|
||||
mainConsumer.awaitStop();
|
||||
}
|
||||
|
||||
@Data(staticConstructor = "of")
|
||||
public static class CalculatedFieldQueueConfig implements QueueConfig {
|
||||
private final boolean consumerPerPartition;
|
||||
private final int pollInterval;
|
||||
eventConsumer.stop();
|
||||
eventConsumer.awaitStop();
|
||||
stateService.stop();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@ -1757,8 +1757,6 @@ queue:
|
||||
partitions: "${TB_QUEUE_CF_PARTITIONS:10}"
|
||||
# Timeout for processing a message pack by CF microservices
|
||||
pack_processing_timeout: "${TB_QUEUE_CF_PACK_PROCESSING_TIMEOUT_MS:60000}"
|
||||
# Enable/disable a separate consumer per partition for CF queue
|
||||
consumer_per_partition: "${TB_QUEUE_CF_CONSUMER_PER_PARTITION:true}"
|
||||
# Thread pool size for processing of the incoming messages
|
||||
pool_size: "${TB_QUEUE_CF_POOL_SIZE:8}"
|
||||
# RocksDB path for storing CF states
|
||||
|
||||
@ -53,8 +53,11 @@ public class MainQueueConsumerManager<M extends TbQueueMsg, C extends QueueConfi
|
||||
protected C config;
|
||||
protected final MsgPackProcessor<M, C> msgPackProcessor;
|
||||
protected final BiFunction<C, Integer, TbQueueConsumer<M>> consumerCreator;
|
||||
@Getter
|
||||
protected final ExecutorService consumerExecutor;
|
||||
@Getter
|
||||
protected final ScheduledExecutorService scheduler;
|
||||
@Getter
|
||||
protected final ExecutorService taskExecutor;
|
||||
|
||||
private final java.util.Queue<TbQueueConsumerManagerTask> tasks = new ConcurrentLinkedQueue<>();
|
||||
|
||||
@ -49,11 +49,6 @@ public class PartitionedQueueConsumerManager<M extends TbQueueMsg> extends MainQ
|
||||
this.consumerWrapper = (ConsumerPerPartitionWrapper) super.consumerWrapper;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void update(Set<TopicPartitionInfo> partitions) {
|
||||
throw new UnsupportedOperationException("Use manual addPartitions and removePartitions");
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void processTask(TbQueueConsumerManagerTask task) {
|
||||
if (task instanceof AddPartitionsTask addPartitionsTask) {
|
||||
|
||||
@ -41,6 +41,7 @@ public class QueueStateService<E extends TbQueueMsg, S extends TbQueueMsg> {
|
||||
}
|
||||
|
||||
public void update(Set<TopicPartitionInfo> newPartitions) {
|
||||
newPartitions = withTopic(newPartitions, stateConsumer.getTopic());
|
||||
lock.lock();
|
||||
Set<TopicPartitionInfo> oldPartitions = this.partitions != null ? this.partitions : Collections.emptySet();
|
||||
Set<TopicPartitionInfo> addedPartitions;
|
||||
@ -54,9 +55,10 @@ public class QueueStateService<E extends TbQueueMsg, S extends TbQueueMsg> {
|
||||
} finally {
|
||||
lock.unlock();
|
||||
}
|
||||
|
||||
if (!removedPartitions.isEmpty()) {
|
||||
stateConsumer.removePartitions(removedPartitions);
|
||||
eventConsumer.removePartitions(removedPartitions.stream().map(tpi -> tpi.newByTopic(eventConsumer.getTopic())).collect(Collectors.toSet()));
|
||||
eventConsumer.removePartitions(withTopic(removedPartitions, eventConsumer.getTopic()));
|
||||
}
|
||||
|
||||
if (!addedPartitions.isEmpty()) {
|
||||
@ -73,4 +75,8 @@ public class QueueStateService<E extends TbQueueMsg, S extends TbQueueMsg> {
|
||||
}
|
||||
}
|
||||
|
||||
private Set<TopicPartitionInfo> withTopic(Set<TopicPartitionInfo> partitions, String topic) {
|
||||
return partitions.stream().map(tpi -> tpi.newByTopic(topic)).collect(Collectors.toSet());
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user