added env variable to regulate pubsud queue executor threads pool size

This commit is contained in:
dashevchenko 2023-11-13 13:14:37 +02:00
parent afa54ef277
commit 442fe9029e
9 changed files with 26 additions and 1 deletions

View File

@ -1411,6 +1411,8 @@ queue:
max_msg_size: "${TB_QUEUE_PUBSUB_MAX_MSG_SIZE:1048576}" max_msg_size: "${TB_QUEUE_PUBSUB_MAX_MSG_SIZE:1048576}"
# Number of messages per consumer # Number of messages per consumer
max_messages: "${TB_QUEUE_PUBSUB_MAX_MESSAGES:1000}" max_messages: "${TB_QUEUE_PUBSUB_MAX_MESSAGES:1000}"
# Number of threads of pubsub executor provider
executor_thread_pool_size: "${TB_QUEUE_PUBSUB_EXECUTOR_THREAD_SIZE:50}"
queue-properties: queue-properties:
# Pub/Sub properties for Rule Engine subscribers, messages which will commit after ackDeadlineInSec period can be consumed again # Pub/Sub properties for Rule Engine subscribers, messages which will commit after ackDeadlineInSec period can be consumed again
rule-engine: "${TB_QUEUE_PUBSUB_RE_QUEUE_PROPERTIES:ackDeadlineInSec:30;messageRetentionInSec:604800}" rule-engine: "${TB_QUEUE_PUBSUB_RE_QUEUE_PROPERTIES:ackDeadlineInSec:30;messageRetentionInSec:604800}"

View File

@ -18,6 +18,7 @@ package org.thingsboard.server.queue.pubsub;
import com.google.api.core.ApiFuture; import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback; import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures; import com.google.api.core.ApiFutures;
import com.google.api.gax.core.InstantiatingExecutorProvider;
import com.google.cloud.pubsub.v1.Publisher; import com.google.cloud.pubsub.v1.Publisher;
import com.google.gson.Gson; import com.google.gson.Gson;
import com.google.protobuf.ByteString; import com.google.protobuf.ByteString;
@ -120,7 +121,14 @@ public class TbPubSubProducerTemplate<T extends TbQueueMsg> implements TbQueuePr
try { try {
admin.createTopicIfNotExists(topic); admin.createTopicIfNotExists(topic);
ProjectTopicName topicName = ProjectTopicName.of(pubSubSettings.getProjectId(), topic); ProjectTopicName topicName = ProjectTopicName.of(pubSubSettings.getProjectId(), topic);
Publisher publisher = Publisher.newBuilder(topicName).setCredentialsProvider(pubSubSettings.getCredentialsProvider()).build(); InstantiatingExecutorProvider executorProvider =
InstantiatingExecutorProvider.newBuilder()
.setExecutorThreadCount(pubSubSettings.getExecutorThreadPoolSize())
.build();
Publisher publisher = Publisher.newBuilder(topicName)
.setCredentialsProvider(pubSubSettings.getCredentialsProvider())
.setExecutorProvider(executorProvider)
.build();
publisherMap.put(topic, publisher); publisherMap.put(topic, publisher);
return publisher; return publisher;
} catch (IOException e) { } catch (IOException e) {

View File

@ -46,6 +46,9 @@ public class TbPubSubSettings {
@Value("${queue.pubsub.max_messages}") @Value("${queue.pubsub.max_messages}")
private int maxMessages; private int maxMessages;
@Value("${queue.pubsub.executor_thread_pool_size}")
private int executorThreadPoolSize;
private CredentialsProvider credentialsProvider; private CredentialsProvider credentialsProvider;
@PostConstruct @PostConstruct

View File

@ -172,6 +172,8 @@ queue:
max_msg_size: "${TB_QUEUE_PUBSUB_MAX_MSG_SIZE:1048576}" #in bytes max_msg_size: "${TB_QUEUE_PUBSUB_MAX_MSG_SIZE:1048576}" #in bytes
# Number of messages per consumer # Number of messages per consumer
max_messages: "${TB_QUEUE_PUBSUB_MAX_MESSAGES:1000}" max_messages: "${TB_QUEUE_PUBSUB_MAX_MESSAGES:1000}"
# Number of threads of pubsub executor provider
executor_thread_pool_size: "${TB_QUEUE_PUBSUB_EXECUTOR_THREAD_SIZE:50}"
queue-properties: queue-properties:
# Pub/Sub properties for Core subscribers, messages which will commit after ackDeadlineInSec period can be consumed again # Pub/Sub properties for Core subscribers, messages which will commit after ackDeadlineInSec period can be consumed again
core: "${TB_QUEUE_PUBSUB_CORE_QUEUE_PROPERTIES:ackDeadlineInSec:30;messageRetentionInSec:604800}" core: "${TB_QUEUE_PUBSUB_CORE_QUEUE_PROPERTIES:ackDeadlineInSec:30;messageRetentionInSec:604800}"

View File

@ -295,6 +295,8 @@ queue:
max_msg_size: "${TB_QUEUE_PUBSUB_MAX_MSG_SIZE:1048576}" max_msg_size: "${TB_QUEUE_PUBSUB_MAX_MSG_SIZE:1048576}"
# Number of messages per consumer # Number of messages per consumer
max_messages: "${TB_QUEUE_PUBSUB_MAX_MESSAGES:1000}" max_messages: "${TB_QUEUE_PUBSUB_MAX_MESSAGES:1000}"
# Number of threads of pubsub executor provider
executor_thread_pool_size: "${TB_QUEUE_PUBSUB_EXECUTOR_THREAD_SIZE:50}"
queue-properties: queue-properties:
# Pub/Sub properties for Rule Engine subscribers, messages which will commit after ackDeadlineInSec period can be consumed again # Pub/Sub properties for Rule Engine subscribers, messages which will commit after ackDeadlineInSec period can be consumed again
rule-engine: "${TB_QUEUE_PUBSUB_RE_QUEUE_PROPERTIES:ackDeadlineInSec:30;messageRetentionInSec:604800}" rule-engine: "${TB_QUEUE_PUBSUB_RE_QUEUE_PROPERTIES:ackDeadlineInSec:30;messageRetentionInSec:604800}"

View File

@ -278,6 +278,8 @@ queue:
max_msg_size: "${TB_QUEUE_PUBSUB_MAX_MSG_SIZE:1048576}" max_msg_size: "${TB_QUEUE_PUBSUB_MAX_MSG_SIZE:1048576}"
# Number of messages per a consumer # Number of messages per a consumer
max_messages: "${TB_QUEUE_PUBSUB_MAX_MESSAGES:1000}" max_messages: "${TB_QUEUE_PUBSUB_MAX_MESSAGES:1000}"
# Number of threads of pubsub executor provider
executor_thread_pool_size: "${TB_QUEUE_PUBSUB_EXECUTOR_THREAD_SIZE:50}"
queue-properties: queue-properties:
# Pub/Sub properties for Rule Engine subscribers, messages which will commit after ackDeadlineInSec period can be consume again # Pub/Sub properties for Rule Engine subscribers, messages which will commit after ackDeadlineInSec period can be consume again
rule-engine: "${TB_QUEUE_PUBSUB_RE_QUEUE_PROPERTIES:ackDeadlineInSec:30;messageRetentionInSec:604800}" rule-engine: "${TB_QUEUE_PUBSUB_RE_QUEUE_PROPERTIES:ackDeadlineInSec:30;messageRetentionInSec:604800}"

View File

@ -374,6 +374,8 @@ queue:
max_msg_size: "${TB_QUEUE_PUBSUB_MAX_MSG_SIZE:1048576}" max_msg_size: "${TB_QUEUE_PUBSUB_MAX_MSG_SIZE:1048576}"
# Number of messages per consumer # Number of messages per consumer
max_messages: "${TB_QUEUE_PUBSUB_MAX_MESSAGES:1000}" max_messages: "${TB_QUEUE_PUBSUB_MAX_MESSAGES:1000}"
# Number of threads of pubsub executor provider
executor_thread_pool_size: "${TB_QUEUE_PUBSUB_EXECUTOR_THREAD_SIZE:50}"
queue-properties: queue-properties:
# Pub/Sub properties for Rule Engine subscribers, messages which will commit after ackDeadlineInSec period can be consumed again # Pub/Sub properties for Rule Engine subscribers, messages which will commit after ackDeadlineInSec period can be consumed again
rule-engine: "${TB_QUEUE_PUBSUB_RE_QUEUE_PROPERTIES:ackDeadlineInSec:30;messageRetentionInSec:604800}" rule-engine: "${TB_QUEUE_PUBSUB_RE_QUEUE_PROPERTIES:ackDeadlineInSec:30;messageRetentionInSec:604800}"

View File

@ -311,6 +311,8 @@ queue:
max_msg_size: "${TB_QUEUE_PUBSUB_MAX_MSG_SIZE:1048576}" max_msg_size: "${TB_QUEUE_PUBSUB_MAX_MSG_SIZE:1048576}"
# Number of messages per consumer # Number of messages per consumer
max_messages: "${TB_QUEUE_PUBSUB_MAX_MESSAGES:1000}" max_messages: "${TB_QUEUE_PUBSUB_MAX_MESSAGES:1000}"
# Number of threads of pubsub executor provider
executor_thread_pool_size: "${TB_QUEUE_PUBSUB_EXECUTOR_THREAD_SIZE:50}"
queue-properties: queue-properties:
# Pub/Sub properties for Rule Engine subscribers, messages which will commit after ackDeadlineInSec period can be consumed again # Pub/Sub properties for Rule Engine subscribers, messages which will commit after ackDeadlineInSec period can be consumed again
rule-engine: "${TB_QUEUE_PUBSUB_RE_QUEUE_PROPERTIES:ackDeadlineInSec:30;messageRetentionInSec:604800}" rule-engine: "${TB_QUEUE_PUBSUB_RE_QUEUE_PROPERTIES:ackDeadlineInSec:30;messageRetentionInSec:604800}"

View File

@ -264,6 +264,8 @@ queue:
max_msg_size: "${TB_QUEUE_PUBSUB_MAX_MSG_SIZE:1048576}" max_msg_size: "${TB_QUEUE_PUBSUB_MAX_MSG_SIZE:1048576}"
# Number of messages per consumer # Number of messages per consumer
max_messages: "${TB_QUEUE_PUBSUB_MAX_MESSAGES:1000}" max_messages: "${TB_QUEUE_PUBSUB_MAX_MESSAGES:1000}"
# Number of threads of pubsub executor provider
executor_thread_pool_size: "${TB_QUEUE_PUBSUB_EXECUTOR_THREAD_SIZE:50}"
queue-properties: queue-properties:
# Pub/Sub properties for Rule Engine subscribers, messages which will commit after ackDeadlineInSec period can be consumed again # Pub/Sub properties for Rule Engine subscribers, messages which will commit after ackDeadlineInSec period can be consumed again
rule-engine: "${TB_QUEUE_PUBSUB_RE_QUEUE_PROPERTIES:ackDeadlineInSec:30;messageRetentionInSec:604800}" rule-engine: "${TB_QUEUE_PUBSUB_RE_QUEUE_PROPERTIES:ackDeadlineInSec:30;messageRetentionInSec:604800}"