Refactoring for EDQS repartitioning
This commit is contained in:
		
							parent
							
								
									6bf56c8dc2
								
							
						
					
					
						commit
						7519889c1a
					
				@ -33,8 +33,8 @@ import static org.awaitility.Awaitility.await;
 | 
			
		||||
 | 
			
		||||
@DaoSqlTest
 | 
			
		||||
@TestPropertySource(properties = {
 | 
			
		||||
//        "queue.type=kafka", // uncomment to use Kafka
 | 
			
		||||
//        "queue.kafka.bootstrap.servers=10.7.1.254:9092",
 | 
			
		||||
        "queue.type=kafka", // uncomment to use Kafka
 | 
			
		||||
        "queue.kafka.bootstrap.servers=192.168.0.105:9092",
 | 
			
		||||
        "queue.edqs.sync.enabled=true",
 | 
			
		||||
        "queue.edqs.api_enabled=true",
 | 
			
		||||
        "queue.edqs.mode=local"
 | 
			
		||||
 | 
			
		||||
@ -92,12 +92,11 @@ public class EdqsProcessor implements TbQueueHandler<TbProtoQueueMsg<ToEdqsMsg>,
 | 
			
		||||
    @Autowired @Lazy
 | 
			
		||||
    private EdqsStateService stateService;
 | 
			
		||||
 | 
			
		||||
    @Getter
 | 
			
		||||
    private PartitionedQueueConsumerManager<TbProtoQueueMsg<ToEdqsMsg>> eventsConsumer;
 | 
			
		||||
    private PartitionedQueueConsumerManager<TbProtoQueueMsg<ToEdqsMsg>> eventConsumer;
 | 
			
		||||
    private TbQueueResponseTemplate<TbProtoQueueMsg<ToEdqsMsg>, TbProtoQueueMsg<FromEdqsMsg>> responseTemplate;
 | 
			
		||||
 | 
			
		||||
    private ExecutorService consumersExecutor;
 | 
			
		||||
    private ExecutorService mgmtExecutor;
 | 
			
		||||
    private ExecutorService taskExecutor;
 | 
			
		||||
    private ScheduledExecutorService scheduler;
 | 
			
		||||
    private ListeningExecutorService requestExecutor;
 | 
			
		||||
    private ExecutorService repartitionExecutor;
 | 
			
		||||
@ -112,7 +111,7 @@ public class EdqsProcessor implements TbQueueHandler<TbProtoQueueMsg<ToEdqsMsg>,
 | 
			
		||||
    @PostConstruct
 | 
			
		||||
    private void init() {
 | 
			
		||||
        consumersExecutor = Executors.newCachedThreadPool(ThingsBoardThreadFactory.forName("edqs-consumer"));
 | 
			
		||||
        mgmtExecutor = ThingsBoardExecutors.newWorkStealingPool(4, "edqs-consumer-mgmt");
 | 
			
		||||
        taskExecutor = ThingsBoardExecutors.newWorkStealingPool(4, "edqs-consumer-task-executor");
 | 
			
		||||
        scheduler = ThingsBoardExecutors.newSingleThreadScheduledExecutor("edqs-scheduler");
 | 
			
		||||
        requestExecutor = MoreExecutors.listeningDecorator(ThingsBoardExecutors.newWorkStealingPool(12, "edqs-requests"));
 | 
			
		||||
        repartitionExecutor = Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("edqs-repartition"));
 | 
			
		||||
@ -125,7 +124,7 @@ public class EdqsProcessor implements TbQueueHandler<TbProtoQueueMsg<ToEdqsMsg>,
 | 
			
		||||
            }
 | 
			
		||||
        };
 | 
			
		||||
 | 
			
		||||
        eventsConsumer = PartitionedQueueConsumerManager.<TbProtoQueueMsg<ToEdqsMsg>>create()
 | 
			
		||||
        eventConsumer = PartitionedQueueConsumerManager.<TbProtoQueueMsg<ToEdqsMsg>>create()
 | 
			
		||||
                .queueKey(new QueueKey(ServiceType.EDQS, EdqsQueue.EVENTS.getTopic()))
 | 
			
		||||
                .topic(EdqsQueue.EVENTS.getTopic())
 | 
			
		||||
                .pollInterval(config.getPollInterval())
 | 
			
		||||
@ -146,10 +145,12 @@ public class EdqsProcessor implements TbQueueHandler<TbProtoQueueMsg<ToEdqsMsg>,
 | 
			
		||||
                })
 | 
			
		||||
                .consumerCreator((config, partitionId) -> queueFactory.createEdqsMsgConsumer(EdqsQueue.EVENTS))
 | 
			
		||||
                .consumerExecutor(consumersExecutor)
 | 
			
		||||
                .taskExecutor(mgmtExecutor)
 | 
			
		||||
                .taskExecutor(taskExecutor)
 | 
			
		||||
                .scheduler(scheduler)
 | 
			
		||||
                .uncaughtErrorHandler(errorHandler)
 | 
			
		||||
                .build();
 | 
			
		||||
        stateService.init(eventConsumer);
 | 
			
		||||
 | 
			
		||||
        responseTemplate = queueFactory.createEdqsResponseTemplate();
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
@ -171,7 +172,7 @@ public class EdqsProcessor implements TbQueueHandler<TbProtoQueueMsg<ToEdqsMsg>,
 | 
			
		||||
 | 
			
		||||
            stateService.process(withTopic(partitions, EdqsQueue.STATE.getTopic()));
 | 
			
		||||
            // eventsConsumer's partitions are updated by stateService
 | 
			
		||||
            responseTemplate.subscribe(withTopic(partitions, config.getRequestsTopic()));
 | 
			
		||||
            responseTemplate.subscribe(withTopic(partitions, config.getRequestsTopic())); // FIXME: we subscribe to partitions before we are ready. implement consumer-per-partition version for request template
 | 
			
		||||
 | 
			
		||||
            Set<TopicPartitionInfo> oldPartitions = event.getOldPartitions().get(new QueueKey(ServiceType.EDQS));
 | 
			
		||||
            if (CollectionsUtil.isNotEmpty(oldPartitions)) {
 | 
			
		||||
@ -280,12 +281,13 @@ public class EdqsProcessor implements TbQueueHandler<TbProtoQueueMsg<ToEdqsMsg>,
 | 
			
		||||
 | 
			
		||||
    @PreDestroy
 | 
			
		||||
    public void destroy() throws InterruptedException {
 | 
			
		||||
        eventsConsumer.stop();
 | 
			
		||||
        eventsConsumer.awaitStop();
 | 
			
		||||
        eventConsumer.stop();
 | 
			
		||||
        eventConsumer.awaitStop();
 | 
			
		||||
        responseTemplate.stop();
 | 
			
		||||
        stateService.stop();
 | 
			
		||||
 | 
			
		||||
        consumersExecutor.shutdownNow();
 | 
			
		||||
        mgmtExecutor.shutdownNow();
 | 
			
		||||
        taskExecutor.shutdownNow();
 | 
			
		||||
        scheduler.shutdownNow();
 | 
			
		||||
        requestExecutor.shutdownNow();
 | 
			
		||||
        repartitionExecutor.shutdownNow();
 | 
			
		||||
 | 
			
		||||
@ -1,42 +0,0 @@
 | 
			
		||||
/**
 | 
			
		||||
 * Copyright © 2016-2024 The Thingsboard Authors
 | 
			
		||||
 *
 | 
			
		||||
 * Licensed under the Apache License, Version 2.0 (the "License");
 | 
			
		||||
 * you may not use this file except in compliance with the License.
 | 
			
		||||
 * You may obtain a copy of the License at
 | 
			
		||||
 *
 | 
			
		||||
 *     http://www.apache.org/licenses/LICENSE-2.0
 | 
			
		||||
 *
 | 
			
		||||
 * Unless required by applicable law or agreed to in writing, software
 | 
			
		||||
 * distributed under the License is distributed on an "AS IS" BASIS,
 | 
			
		||||
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
			
		||||
 * See the License for the specific language governing permissions and
 | 
			
		||||
 * limitations under the License.
 | 
			
		||||
 */
 | 
			
		||||
package org.thingsboard.server.edqs.state;
 | 
			
		||||
 | 
			
		||||
import lombok.RequiredArgsConstructor;
 | 
			
		||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
 | 
			
		||||
import org.springframework.http.ResponseEntity;
 | 
			
		||||
import org.springframework.web.bind.annotation.GetMapping;
 | 
			
		||||
import org.springframework.web.bind.annotation.RequestMapping;
 | 
			
		||||
import org.springframework.web.bind.annotation.RestController;
 | 
			
		||||
 | 
			
		||||
@RestController
 | 
			
		||||
@RequiredArgsConstructor
 | 
			
		||||
@ConditionalOnExpression("'${service.type:null}'=='edqs'")
 | 
			
		||||
@RequestMapping("/api/edqs")
 | 
			
		||||
public class EdqsController {
 | 
			
		||||
 | 
			
		||||
    private final EdqsStateService edqsStateService;
 | 
			
		||||
 | 
			
		||||
    @GetMapping("/ready")
 | 
			
		||||
    public ResponseEntity<Void> isReady() {
 | 
			
		||||
        if (edqsStateService.isReady()) {
 | 
			
		||||
            return ResponseEntity.ok().build();
 | 
			
		||||
        } else {
 | 
			
		||||
            return ResponseEntity.badRequest().build();
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
@ -20,15 +20,19 @@ import org.thingsboard.server.common.data.edqs.EdqsEventType;
 | 
			
		||||
import org.thingsboard.server.common.data.id.TenantId;
 | 
			
		||||
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
 | 
			
		||||
import org.thingsboard.server.gen.transport.TransportProtos.ToEdqsMsg;
 | 
			
		||||
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
 | 
			
		||||
import org.thingsboard.server.queue.common.consumer.PartitionedQueueConsumerManager;
 | 
			
		||||
 | 
			
		||||
import java.util.Set;
 | 
			
		||||
 | 
			
		||||
public interface EdqsStateService {
 | 
			
		||||
 | 
			
		||||
    void init(PartitionedQueueConsumerManager<TbProtoQueueMsg<ToEdqsMsg>> eventConsumer);
 | 
			
		||||
 | 
			
		||||
    void process(Set<TopicPartitionInfo> partitions);
 | 
			
		||||
 | 
			
		||||
    void save(TenantId tenantId, ObjectType type, String key, EdqsEventType eventType, ToEdqsMsg msg);
 | 
			
		||||
 | 
			
		||||
    boolean isReady();
 | 
			
		||||
    void stop();
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -15,13 +15,9 @@
 | 
			
		||||
 */
 | 
			
		||||
package org.thingsboard.server.edqs.state;
 | 
			
		||||
 | 
			
		||||
import jakarta.annotation.PostConstruct;
 | 
			
		||||
import jakarta.annotation.PreDestroy;
 | 
			
		||||
import lombok.RequiredArgsConstructor;
 | 
			
		||||
import lombok.extern.slf4j.Slf4j;
 | 
			
		||||
import org.springframework.stereotype.Service;
 | 
			
		||||
import org.thingsboard.common.util.ThingsBoardExecutors;
 | 
			
		||||
import org.thingsboard.common.util.ThingsBoardThreadFactory;
 | 
			
		||||
import org.thingsboard.server.common.data.ObjectType;
 | 
			
		||||
import org.thingsboard.server.common.data.edqs.EdqsEventType;
 | 
			
		||||
import org.thingsboard.server.common.data.id.TenantId;
 | 
			
		||||
@ -44,16 +40,13 @@ import org.thingsboard.server.queue.edqs.KafkaEdqsComponent;
 | 
			
		||||
 | 
			
		||||
import java.util.Set;
 | 
			
		||||
import java.util.UUID;
 | 
			
		||||
import java.util.concurrent.ExecutorService;
 | 
			
		||||
import java.util.concurrent.Executors;
 | 
			
		||||
import java.util.concurrent.ScheduledExecutorService;
 | 
			
		||||
import java.util.concurrent.atomic.AtomicInteger;
 | 
			
		||||
 | 
			
		||||
@Service
 | 
			
		||||
@RequiredArgsConstructor
 | 
			
		||||
@KafkaEdqsComponent
 | 
			
		||||
@Slf4j
 | 
			
		||||
public class KafkaEdqsStateService extends QueueStateService<TbProtoQueueMsg<ToEdqsMsg>, TbProtoQueueMsg<ToEdqsMsg>> implements EdqsStateService {
 | 
			
		||||
public class KafkaEdqsStateService implements EdqsStateService {
 | 
			
		||||
 | 
			
		||||
    private final EdqsConfig config;
 | 
			
		||||
    private final EdqsPartitionService partitionService;
 | 
			
		||||
@ -61,25 +54,19 @@ public class KafkaEdqsStateService extends QueueStateService<TbProtoQueueMsg<ToE
 | 
			
		||||
    private final EdqsProcessor edqsProcessor;
 | 
			
		||||
 | 
			
		||||
    private PartitionedQueueConsumerManager<TbProtoQueueMsg<ToEdqsMsg>> stateConsumer;
 | 
			
		||||
    private QueueStateService<TbProtoQueueMsg<ToEdqsMsg>, TbProtoQueueMsg<ToEdqsMsg>> queueStateService;
 | 
			
		||||
    private QueueConsumerManager<TbProtoQueueMsg<ToEdqsMsg>> eventsToBackupConsumer;
 | 
			
		||||
    private EdqsProducer stateProducer;
 | 
			
		||||
 | 
			
		||||
    private ExecutorService consumersExecutor;
 | 
			
		||||
    private ExecutorService mgmtExecutor;
 | 
			
		||||
    private ScheduledExecutorService scheduler;
 | 
			
		||||
 | 
			
		||||
    private final VersionsStore versionsStore = new VersionsStore();
 | 
			
		||||
    private final AtomicInteger stateReadCount = new AtomicInteger();
 | 
			
		||||
    private final AtomicInteger eventsReadCount = new AtomicInteger();
 | 
			
		||||
 | 
			
		||||
    @PostConstruct
 | 
			
		||||
    private void init() {
 | 
			
		||||
        consumersExecutor = Executors.newCachedThreadPool(ThingsBoardThreadFactory.forName("edqs-consumer"));
 | 
			
		||||
        mgmtExecutor = ThingsBoardExecutors.newWorkStealingPool(4, "edqs-backup-consumer-mgmt");
 | 
			
		||||
        scheduler = ThingsBoardExecutors.newSingleThreadScheduledExecutor("edqs-backup-scheduler");
 | 
			
		||||
 | 
			
		||||
        stateConsumer = PartitionedQueueConsumerManager.<TbProtoQueueMsg<ToEdqsMsg>>create() // FIXME Slavik: if topic is empty
 | 
			
		||||
    @Override
 | 
			
		||||
    public void init(PartitionedQueueConsumerManager<TbProtoQueueMsg<ToEdqsMsg>> eventConsumer) {
 | 
			
		||||
        stateConsumer = PartitionedQueueConsumerManager.<TbProtoQueueMsg<ToEdqsMsg>>create()
 | 
			
		||||
                .queueKey(new QueueKey(ServiceType.EDQS, EdqsQueue.STATE.getTopic()))
 | 
			
		||||
                .topic(EdqsQueue.STATE.getTopic())
 | 
			
		||||
                .pollInterval(config.getPollInterval())
 | 
			
		||||
                .msgPackProcessor((msgs, consumer, config) -> {
 | 
			
		||||
                    for (TbProtoQueueMsg<ToEdqsMsg> queueMsg : msgs) {
 | 
			
		||||
@ -100,12 +87,13 @@ public class KafkaEdqsStateService extends QueueStateService<TbProtoQueueMsg<ToE
 | 
			
		||||
                    consumer.commit();
 | 
			
		||||
                })
 | 
			
		||||
                .consumerCreator((config, partitionId) -> queueFactory.createEdqsMsgConsumer(EdqsQueue.STATE))
 | 
			
		||||
                .consumerExecutor(consumersExecutor)
 | 
			
		||||
                .taskExecutor(mgmtExecutor)
 | 
			
		||||
                .scheduler(scheduler)
 | 
			
		||||
                .consumerExecutor(eventConsumer.getConsumerExecutor())
 | 
			
		||||
                .taskExecutor(eventConsumer.getTaskExecutor())
 | 
			
		||||
                .scheduler(eventConsumer.getScheduler())
 | 
			
		||||
                .uncaughtErrorHandler(edqsProcessor.getErrorHandler())
 | 
			
		||||
                .build();
 | 
			
		||||
        super.init(stateConsumer, edqsProcessor.getEventsConsumer());
 | 
			
		||||
        queueStateService = new QueueStateService<>();
 | 
			
		||||
        queueStateService.init(stateConsumer, eventConsumer);
 | 
			
		||||
 | 
			
		||||
        eventsToBackupConsumer = QueueConsumerManager.<TbProtoQueueMsg<ToEdqsMsg>>builder()
 | 
			
		||||
                .name("edqs-events-to-backup-consumer")
 | 
			
		||||
@ -145,7 +133,7 @@ public class KafkaEdqsStateService extends QueueStateService<TbProtoQueueMsg<ToE
 | 
			
		||||
                    consumer.commit();
 | 
			
		||||
                })
 | 
			
		||||
                .consumerCreator(() -> queueFactory.createEdqsMsgConsumer(EdqsQueue.EVENTS, "events-to-backup-consumer-group")) // shared by all instances consumer group
 | 
			
		||||
                .consumerExecutor(consumersExecutor)
 | 
			
		||||
                .consumerExecutor(eventConsumer.getConsumerExecutor())
 | 
			
		||||
                .threadPrefix("edqs-events-to-backup")
 | 
			
		||||
                .build();
 | 
			
		||||
 | 
			
		||||
@ -158,11 +146,11 @@ public class KafkaEdqsStateService extends QueueStateService<TbProtoQueueMsg<ToE
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public void process(Set<TopicPartitionInfo> partitions) {
 | 
			
		||||
        if (getPartitions() == null) {
 | 
			
		||||
        if (queueStateService.getPartitions() == null) {
 | 
			
		||||
            eventsToBackupConsumer.subscribe();
 | 
			
		||||
            eventsToBackupConsumer.launch();
 | 
			
		||||
        }
 | 
			
		||||
        super.update(partitions);
 | 
			
		||||
        queueStateService.update(partitions);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
@ -170,25 +158,16 @@ public class KafkaEdqsStateService extends QueueStateService<TbProtoQueueMsg<ToE
 | 
			
		||||
        // do nothing here, backup is done by events consumer
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public boolean isReady() {
 | 
			
		||||
        return initialRestoreDone;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private TenantId getTenantId(ToEdqsMsg edqsMsg) {
 | 
			
		||||
        return TenantId.fromUUID(new UUID(edqsMsg.getTenantIdMSB(), edqsMsg.getTenantIdLSB()));
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @PreDestroy
 | 
			
		||||
    private void preDestroy() {
 | 
			
		||||
    @Override
 | 
			
		||||
    public void stop() {
 | 
			
		||||
        stateConsumer.stop();
 | 
			
		||||
        stateConsumer.awaitStop();
 | 
			
		||||
        eventsToBackupConsumer.stop();
 | 
			
		||||
        stateProducer.stop();
 | 
			
		||||
 | 
			
		||||
        consumersExecutor.shutdownNow();
 | 
			
		||||
        mgmtExecutor.shutdownNow();
 | 
			
		||||
        scheduler.shutdownNow();
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -25,6 +25,8 @@ import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
 | 
			
		||||
import org.thingsboard.server.edqs.processor.EdqsProcessor;
 | 
			
		||||
import org.thingsboard.server.edqs.util.EdqsRocksDb;
 | 
			
		||||
import org.thingsboard.server.gen.transport.TransportProtos.ToEdqsMsg;
 | 
			
		||||
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
 | 
			
		||||
import org.thingsboard.server.queue.common.consumer.PartitionedQueueConsumerManager;
 | 
			
		||||
import org.thingsboard.server.queue.edqs.EdqsQueue;
 | 
			
		||||
import org.thingsboard.server.queue.edqs.InMemoryEdqsComponent;
 | 
			
		||||
 | 
			
		||||
@ -39,11 +41,17 @@ public class LocalEdqsStateService implements EdqsStateService {
 | 
			
		||||
    private final EdqsProcessor processor;
 | 
			
		||||
    private final EdqsRocksDb db;
 | 
			
		||||
 | 
			
		||||
    private PartitionedQueueConsumerManager<TbProtoQueueMsg<ToEdqsMsg>> eventConsumer;
 | 
			
		||||
    private Set<TopicPartitionInfo> partitions;
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public void init(PartitionedQueueConsumerManager<TbProtoQueueMsg<ToEdqsMsg>> eventConsumer) {
 | 
			
		||||
        this.eventConsumer = eventConsumer;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public void process(Set<TopicPartitionInfo> partitions) {
 | 
			
		||||
        if (this.partitions != null) {
 | 
			
		||||
        if (this.partitions == null) {
 | 
			
		||||
            db.forEach((key, value) -> {
 | 
			
		||||
                try {
 | 
			
		||||
                    ToEdqsMsg edqsMsg = ToEdqsMsg.parseFrom(value);
 | 
			
		||||
@ -54,7 +62,7 @@ public class LocalEdqsStateService implements EdqsStateService {
 | 
			
		||||
                }
 | 
			
		||||
            });
 | 
			
		||||
        }
 | 
			
		||||
        processor.getEventsConsumer().update(partitions);
 | 
			
		||||
        eventConsumer.update(partitions);
 | 
			
		||||
        this.partitions = partitions;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
@ -73,8 +81,7 @@ public class LocalEdqsStateService implements EdqsStateService {
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public boolean isReady() {
 | 
			
		||||
        return partitions != null;
 | 
			
		||||
    public void stop() {
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -58,7 +58,7 @@ public class TopicPartitionInfo {
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    public TopicPartitionInfo newByTopic(String topic) {
 | 
			
		||||
        return new TopicPartitionInfo(topic, this.tenantId, this.partition, this.myPartition);
 | 
			
		||||
        return new TopicPartitionInfo(topic, this.tenantId, this.partition, this.useInternalPartition, this.myPartition);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    public String getTopic() {
 | 
			
		||||
 | 
			
		||||
@ -54,8 +54,11 @@ public class MainQueueConsumerManager<M extends TbQueueMsg, C extends QueueConfi
 | 
			
		||||
    protected C config;
 | 
			
		||||
    protected final MsgPackProcessor<M, C> msgPackProcessor;
 | 
			
		||||
    protected final BiFunction<C, Integer, TbQueueConsumer<M>> consumerCreator;
 | 
			
		||||
    @Getter
 | 
			
		||||
    protected final ExecutorService consumerExecutor;
 | 
			
		||||
    @Getter
 | 
			
		||||
    protected final ScheduledExecutorService scheduler;
 | 
			
		||||
    @Getter
 | 
			
		||||
    protected final ExecutorService taskExecutor;
 | 
			
		||||
    protected final Consumer<Throwable> uncaughtErrorHandler;
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -49,11 +49,6 @@ public class PartitionedQueueConsumerManager<M extends TbQueueMsg> extends MainQ
 | 
			
		||||
        this.consumerWrapper = (ConsumerPerPartitionWrapper) super.consumerWrapper;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public void update(Set<TopicPartitionInfo> partitions) {
 | 
			
		||||
        throw new UnsupportedOperationException("Use manual addPartitions and removePartitions");
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    protected void processTask(TbQueueConsumerManagerTask task) {
 | 
			
		||||
        if (task instanceof AddPartitionsTask addPartitionsTask) {
 | 
			
		||||
 | 
			
		||||
@ -41,6 +41,7 @@ public class QueueStateService<E extends TbQueueMsg, S extends TbQueueMsg> {
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    public void update(Set<TopicPartitionInfo> newPartitions) {
 | 
			
		||||
        newPartitions = withTopic(newPartitions, stateConsumer.getTopic());
 | 
			
		||||
        lock.lock();
 | 
			
		||||
        Set<TopicPartitionInfo> oldPartitions = this.partitions != null ? this.partitions : Collections.emptySet();
 | 
			
		||||
        Set<TopicPartitionInfo> addedPartitions;
 | 
			
		||||
@ -54,9 +55,10 @@ public class QueueStateService<E extends TbQueueMsg, S extends TbQueueMsg> {
 | 
			
		||||
        } finally {
 | 
			
		||||
            lock.unlock();
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        if (!removedPartitions.isEmpty()) {
 | 
			
		||||
            stateConsumer.removePartitions(removedPartitions);
 | 
			
		||||
            eventConsumer.removePartitions(removedPartitions.stream().map(tpi -> tpi.withTopic(eventConsumer.getTopic())).collect(Collectors.toSet()));
 | 
			
		||||
            eventConsumer.removePartitions(withTopic(removedPartitions, eventConsumer.getTopic()));
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        if (!addedPartitions.isEmpty()) {
 | 
			
		||||
@ -73,4 +75,8 @@ public class QueueStateService<E extends TbQueueMsg, S extends TbQueueMsg> {
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private Set<TopicPartitionInfo> withTopic(Set<TopicPartitionInfo> partitions, String topic) {
 | 
			
		||||
        return partitions.stream().map(tpi -> tpi.withTopic(topic)).collect(Collectors.toSet());
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user