diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index bdf0ad8252..e292237da5 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -1411,6 +1411,8 @@ queue: max_msg_size: "${TB_QUEUE_PUBSUB_MAX_MSG_SIZE:1048576}" # Number of messages per consumer max_messages: "${TB_QUEUE_PUBSUB_MAX_MESSAGES:1000}" + # Number of threads of pubsub executor provider + executor_thread_pool_size: "${TB_QUEUE_PUBSUB_EXECUTOR_THREAD_SIZE:50}" queue-properties: # Pub/Sub properties for Rule Engine subscribers, messages which will commit after ackDeadlineInSec period can be consumed again rule-engine: "${TB_QUEUE_PUBSUB_RE_QUEUE_PROPERTIES:ackDeadlineInSec:30;messageRetentionInSec:604800}" diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/pubsub/TbPubSubProducerTemplate.java b/common/queue/src/main/java/org/thingsboard/server/queue/pubsub/TbPubSubProducerTemplate.java index 0d84a8f839..56e21da3db 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/pubsub/TbPubSubProducerTemplate.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/pubsub/TbPubSubProducerTemplate.java @@ -18,6 +18,7 @@ package org.thingsboard.server.queue.pubsub; import com.google.api.core.ApiFuture; import com.google.api.core.ApiFutureCallback; import com.google.api.core.ApiFutures; +import com.google.api.gax.core.InstantiatingExecutorProvider; import com.google.cloud.pubsub.v1.Publisher; import com.google.gson.Gson; import com.google.protobuf.ByteString; @@ -120,7 +121,14 @@ public class TbPubSubProducerTemplate implements TbQueuePr try { admin.createTopicIfNotExists(topic); ProjectTopicName topicName = ProjectTopicName.of(pubSubSettings.getProjectId(), topic); - Publisher publisher = Publisher.newBuilder(topicName).setCredentialsProvider(pubSubSettings.getCredentialsProvider()).build(); + InstantiatingExecutorProvider executorProvider = + InstantiatingExecutorProvider.newBuilder() + .setExecutorThreadCount(pubSubSettings.getExecutorThreadPoolSize()) + .build(); + Publisher publisher = Publisher.newBuilder(topicName) + .setCredentialsProvider(pubSubSettings.getCredentialsProvider()) + .setExecutorProvider(executorProvider) + .build(); publisherMap.put(topic, publisher); return publisher; } catch (IOException e) { diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/pubsub/TbPubSubSettings.java b/common/queue/src/main/java/org/thingsboard/server/queue/pubsub/TbPubSubSettings.java index fe9d85c306..b1b095f23c 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/pubsub/TbPubSubSettings.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/pubsub/TbPubSubSettings.java @@ -46,6 +46,9 @@ public class TbPubSubSettings { @Value("${queue.pubsub.max_messages}") private int maxMessages; + @Value("${queue.pubsub.executor_thread_pool_size}") + private int executorThreadPoolSize; + private CredentialsProvider credentialsProvider; @PostConstruct diff --git a/msa/vc-executor/src/main/resources/tb-vc-executor.yml b/msa/vc-executor/src/main/resources/tb-vc-executor.yml index 4ffa1ab702..cfbd61eda5 100644 --- a/msa/vc-executor/src/main/resources/tb-vc-executor.yml +++ b/msa/vc-executor/src/main/resources/tb-vc-executor.yml @@ -172,6 +172,8 @@ queue: max_msg_size: "${TB_QUEUE_PUBSUB_MAX_MSG_SIZE:1048576}" #in bytes # Number of messages per consumer max_messages: "${TB_QUEUE_PUBSUB_MAX_MESSAGES:1000}" + # Number of threads of pubsub executor provider + executor_thread_pool_size: "${TB_QUEUE_PUBSUB_EXECUTOR_THREAD_SIZE:50}" queue-properties: # Pub/Sub properties for Core subscribers, messages which will commit after ackDeadlineInSec period can be consumed again core: "${TB_QUEUE_PUBSUB_CORE_QUEUE_PROPERTIES:ackDeadlineInSec:30;messageRetentionInSec:604800}" diff --git a/transport/coap/src/main/resources/tb-coap-transport.yml b/transport/coap/src/main/resources/tb-coap-transport.yml index 437f551690..be54ed34cf 100644 --- a/transport/coap/src/main/resources/tb-coap-transport.yml +++ b/transport/coap/src/main/resources/tb-coap-transport.yml @@ -295,6 +295,8 @@ queue: max_msg_size: "${TB_QUEUE_PUBSUB_MAX_MSG_SIZE:1048576}" # Number of messages per consumer max_messages: "${TB_QUEUE_PUBSUB_MAX_MESSAGES:1000}" + # Number of threads of pubsub executor provider + executor_thread_pool_size: "${TB_QUEUE_PUBSUB_EXECUTOR_THREAD_SIZE:50}" queue-properties: # Pub/Sub properties for Rule Engine subscribers, messages which will commit after ackDeadlineInSec period can be consumed again rule-engine: "${TB_QUEUE_PUBSUB_RE_QUEUE_PROPERTIES:ackDeadlineInSec:30;messageRetentionInSec:604800}" diff --git a/transport/http/src/main/resources/tb-http-transport.yml b/transport/http/src/main/resources/tb-http-transport.yml index 8d4d6fadf0..cf2c49b846 100644 --- a/transport/http/src/main/resources/tb-http-transport.yml +++ b/transport/http/src/main/resources/tb-http-transport.yml @@ -278,6 +278,8 @@ queue: max_msg_size: "${TB_QUEUE_PUBSUB_MAX_MSG_SIZE:1048576}" # Number of messages per a consumer max_messages: "${TB_QUEUE_PUBSUB_MAX_MESSAGES:1000}" + # Number of threads of pubsub executor provider + executor_thread_pool_size: "${TB_QUEUE_PUBSUB_EXECUTOR_THREAD_SIZE:50}" queue-properties: # Pub/Sub properties for Rule Engine subscribers, messages which will commit after ackDeadlineInSec period can be consume again rule-engine: "${TB_QUEUE_PUBSUB_RE_QUEUE_PROPERTIES:ackDeadlineInSec:30;messageRetentionInSec:604800}" diff --git a/transport/lwm2m/src/main/resources/tb-lwm2m-transport.yml b/transport/lwm2m/src/main/resources/tb-lwm2m-transport.yml index 8fd1c11e6e..9884b33390 100644 --- a/transport/lwm2m/src/main/resources/tb-lwm2m-transport.yml +++ b/transport/lwm2m/src/main/resources/tb-lwm2m-transport.yml @@ -374,6 +374,8 @@ queue: max_msg_size: "${TB_QUEUE_PUBSUB_MAX_MSG_SIZE:1048576}" # Number of messages per consumer max_messages: "${TB_QUEUE_PUBSUB_MAX_MESSAGES:1000}" + # Number of threads of pubsub executor provider + executor_thread_pool_size: "${TB_QUEUE_PUBSUB_EXECUTOR_THREAD_SIZE:50}" queue-properties: # Pub/Sub properties for Rule Engine subscribers, messages which will commit after ackDeadlineInSec period can be consumed again rule-engine: "${TB_QUEUE_PUBSUB_RE_QUEUE_PROPERTIES:ackDeadlineInSec:30;messageRetentionInSec:604800}" diff --git a/transport/mqtt/src/main/resources/tb-mqtt-transport.yml b/transport/mqtt/src/main/resources/tb-mqtt-transport.yml index 6c51d18ab7..0c26caa97e 100644 --- a/transport/mqtt/src/main/resources/tb-mqtt-transport.yml +++ b/transport/mqtt/src/main/resources/tb-mqtt-transport.yml @@ -311,6 +311,8 @@ queue: max_msg_size: "${TB_QUEUE_PUBSUB_MAX_MSG_SIZE:1048576}" # Number of messages per consumer max_messages: "${TB_QUEUE_PUBSUB_MAX_MESSAGES:1000}" + # Number of threads of pubsub executor provider + executor_thread_pool_size: "${TB_QUEUE_PUBSUB_EXECUTOR_THREAD_SIZE:50}" queue-properties: # Pub/Sub properties for Rule Engine subscribers, messages which will commit after ackDeadlineInSec period can be consumed again rule-engine: "${TB_QUEUE_PUBSUB_RE_QUEUE_PROPERTIES:ackDeadlineInSec:30;messageRetentionInSec:604800}" diff --git a/transport/snmp/src/main/resources/tb-snmp-transport.yml b/transport/snmp/src/main/resources/tb-snmp-transport.yml index eeb6807714..5e4dc06275 100644 --- a/transport/snmp/src/main/resources/tb-snmp-transport.yml +++ b/transport/snmp/src/main/resources/tb-snmp-transport.yml @@ -264,6 +264,8 @@ queue: max_msg_size: "${TB_QUEUE_PUBSUB_MAX_MSG_SIZE:1048576}" # Number of messages per consumer max_messages: "${TB_QUEUE_PUBSUB_MAX_MESSAGES:1000}" + # Number of threads of pubsub executor provider + executor_thread_pool_size: "${TB_QUEUE_PUBSUB_EXECUTOR_THREAD_SIZE:50}" queue-properties: # Pub/Sub properties for Rule Engine subscribers, messages which will commit after ackDeadlineInSec period can be consumed again rule-engine: "${TB_QUEUE_PUBSUB_RE_QUEUE_PROPERTIES:ackDeadlineInSec:30;messageRetentionInSec:604800}"