Configurable consumer per partition for tb-core queue

This commit is contained in:
ViacheslavKlimov 2024-03-19 13:49:46 +02:00
parent 8636ed3b0d
commit 134db42494
3 changed files with 7 additions and 3 deletions

View File

@ -126,6 +126,8 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
private long pollInterval; private long pollInterval;
@Value("${queue.core.pack-processing-timeout}") @Value("${queue.core.pack-processing-timeout}")
private long packProcessingTimeout; private long packProcessingTimeout;
@Value("${queue.core.consumer-per-partition-enabled:true}")
private boolean consumerPerPartitionEnabled;
@Value("${queue.core.stats.enabled:false}") @Value("${queue.core.stats.enabled:false}")
private boolean statsEnabled; private boolean statsEnabled;
@ -199,7 +201,7 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
this.mainConsumer = QueueConsumerManager.<TbProtoQueueMsg<ToCoreMsg>, CoreQueueConfig>builder() this.mainConsumer = QueueConsumerManager.<TbProtoQueueMsg<ToCoreMsg>, CoreQueueConfig>builder()
.queueKey(new QueueKey(ServiceType.TB_CORE)) .queueKey(new QueueKey(ServiceType.TB_CORE))
.config(CoreQueueConfig.of(true, (int) pollInterval)) .config(CoreQueueConfig.of(consumerPerPartitionEnabled, (int) pollInterval))
.msgPackProcessor(this::processMsgs) .msgPackProcessor(this::processMsgs)
.consumerCreator(config -> queueFactory.createToCoreMsgConsumer()) .consumerCreator(config -> queueFactory.createToCoreMsgConsumer())
.consumerExecutor(consumersExecutor) .consumerExecutor(consumersExecutor)
@ -764,6 +766,7 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
@Override @Override
protected void stopConsumers() { protected void stopConsumers() {
super.stopConsumers();
mainConsumer.stop(); mainConsumer.stop();
mainConsumer.awaitStop(); mainConsumer.awaitStop();
usageStatsConsumer.stop(); usageStatsConsumer.stop();

View File

@ -16,7 +16,6 @@
package org.thingsboard.server.service.queue.processing; package org.thingsboard.server.service.queue.processing;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.ApplicationEventPublisher; import org.springframework.context.ApplicationEventPublisher;
import org.thingsboard.common.util.ThingsBoardExecutors; import org.thingsboard.common.util.ThingsBoardExecutors;
import org.thingsboard.common.util.ThingsBoardThreadFactory; import org.thingsboard.common.util.ThingsBoardThreadFactory;
@ -112,7 +111,7 @@ public abstract class AbstractConsumerService<N extends com.google.protobuf.Gene
} }
@AfterStartUp(order = AfterStartUp.REGULAR_SERVICE) @AfterStartUp(order = AfterStartUp.REGULAR_SERVICE)
public void onApplicationEvent(ApplicationReadyEvent event) { public void afterStartUp() {
startConsumers(); startConsumers();
} }

View File

@ -1562,6 +1562,8 @@ queue:
partitions: "${TB_QUEUE_CORE_PARTITIONS:10}" partitions: "${TB_QUEUE_CORE_PARTITIONS:10}"
# Timeout for processing a message pack by Core microservices # Timeout for processing a message pack by Core microservices
pack-processing-timeout: "${TB_QUEUE_CORE_PACK_PROCESSING_TIMEOUT_MS:2000}" pack-processing-timeout: "${TB_QUEUE_CORE_PACK_PROCESSING_TIMEOUT_MS:2000}"
# Enable/disable a separate consumer per partition for Core queue
consumer-per-partition-enabled: "${TB_QUEUE_CORE_CONSUMER_PER_PARTITION_ENABLED:true}"
ota: ota:
# Default topic name for OTA updates # Default topic name for OTA updates
topic: "${TB_QUEUE_CORE_OTA_TOPIC:tb_ota_package}" topic: "${TB_QUEUE_CORE_OTA_TOPIC:tb_ota_package}"