From b95adb439a69ca231d57ebfc4fe94afe6aebb9fe Mon Sep 17 00:00:00 2001 From: dashevchenko Date: Wed, 27 Dec 2023 17:06:26 +0200 Subject: [PATCH] added yml parameters for aws-sqs eproducer executor thread pool size --- application/src/main/resources/thingsboard.yml | 2 ++ .../org/thingsboard/server/queue/sqs/TbAwsSqsAdmin.java | 8 ++------ .../thingsboard/server/queue/sqs/TbAwsSqsSettings.java | 7 +------ msa/vc-executor/src/main/resources/tb-vc-executor.yml | 2 ++ transport/coap/src/main/resources/tb-coap-transport.yml | 2 ++ transport/http/src/main/resources/tb-http-transport.yml | 2 ++ transport/lwm2m/src/main/resources/tb-lwm2m-transport.yml | 2 ++ transport/mqtt/src/main/resources/tb-mqtt-transport.yml | 2 ++ transport/snmp/src/main/resources/tb-snmp-transport.yml | 2 ++ 9 files changed, 17 insertions(+), 12 deletions(-) diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index c650d53b8f..0561c0c249 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -1398,6 +1398,8 @@ queue: region: "${TB_QUEUE_AWS_SQS_REGION:YOUR_REGION}" # Number of threads per each AWS SQS queue in consumer threads_per_topic: "${TB_QUEUE_AWS_SQS_THREADS_PER_TOPIC:1}" + # Thread pool size for aws_sqs queue producer executor provider. Default value equals to AmazonSQSAsyncClient.DEFAULT_THREAD_POOL_SIZE + producer_thread_pool_size: "${TB_QUEUE_AWS_SQS_EXECUTOR_THREAD_POOL_SIZE:50}" queue-properties: # AWS SQS queue properties. VisibilityTimeout in seconds;MaximumMessageSize in bytes;MessageRetentionPeriod in seconds rule-engine: "${TB_QUEUE_AWS_SQS_RE_QUEUE_PROPERTIES:VisibilityTimeout:30;MaximumMessageSize:262144;MessageRetentionPeriod:604800}" diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/sqs/TbAwsSqsAdmin.java b/common/queue/src/main/java/org/thingsboard/server/queue/sqs/TbAwsSqsAdmin.java index 7ef3b26b27..1c70302d00 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/sqs/TbAwsSqsAdmin.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/sqs/TbAwsSqsAdmin.java @@ -43,7 +43,7 @@ public class TbAwsSqsAdmin implements TbQueueAdmin { private final AmazonSQS sqsClient; private final Map queues; @Getter - private ExecutorService producerExecutor; + private final ExecutorService producerExecutor; public TbAwsSqsAdmin(TbAwsSqsSettings sqsSettings, Map attributes) { this.attributes = attributes; @@ -55,11 +55,7 @@ public class TbAwsSqsAdmin implements TbQueueAdmin { AWSCredentials awsCredentials = new BasicAWSCredentials(sqsSettings.getAccessKeyId(), sqsSettings.getSecretAccessKey()); credentialsProvider = new AWSStaticCredentialsProvider(awsCredentials); } - int threadPoolSize = sqsSettings.getThreadPoolSize(); - if (threadPoolSize == 0) { - threadPoolSize = 50; //AmazonSQSAsyncClient.DEFAULT_THREAD_POOL_SIZE = 50; - } - producerExecutor = Executors.newFixedThreadPool(threadPoolSize, ThingsBoardThreadFactory.forName("aws-sqs-queue-executor")); + producerExecutor = Executors.newFixedThreadPool(sqsSettings.getThreadPoolSize(), ThingsBoardThreadFactory.forName("aws-sqs-queue-executor")); sqsClient = AmazonSQSClientBuilder.standard() .withCredentials(credentialsProvider) diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/sqs/TbAwsSqsSettings.java b/common/queue/src/main/java/org/thingsboard/server/queue/sqs/TbAwsSqsSettings.java index 1f8e301c87..f2dc1efdfb 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/sqs/TbAwsSqsSettings.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/sqs/TbAwsSqsSettings.java @@ -20,11 +20,6 @@ import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; import org.springframework.stereotype.Component; -import org.thingsboard.common.util.ThingsBoardThreadFactory; - -import javax.annotation.PostConstruct; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; @Slf4j @ConditionalOnExpression("'${queue.type:null}'=='aws-sqs'") @@ -47,7 +42,7 @@ public class TbAwsSqsSettings { @Value("${queue.aws_sqs.threads_per_topic}") private int threadsPerTopic; - @Value("${queue.aws_sqs.producer_thread_pool_size:0}") + @Value("${queue.aws_sqs.producer_thread_pool_size:50}") private int threadPoolSize; } diff --git a/msa/vc-executor/src/main/resources/tb-vc-executor.yml b/msa/vc-executor/src/main/resources/tb-vc-executor.yml index 962c5303c7..e4519529c5 100644 --- a/msa/vc-executor/src/main/resources/tb-vc-executor.yml +++ b/msa/vc-executor/src/main/resources/tb-vc-executor.yml @@ -155,6 +155,8 @@ queue: region: "${TB_QUEUE_AWS_SQS_REGION:YOUR_REGION}" # Number of threads per each AWS SQS queue in consumer threads_per_topic: "${TB_QUEUE_AWS_SQS_THREADS_PER_TOPIC:1}" + # Thread pool size for aws_sqs queue producer executor provider. Default value equals to AmazonSQSAsyncClient.DEFAULT_THREAD_POOL_SIZE + producer_thread_pool_size: "${TB_QUEUE_AWS_SQS_EXECUTOR_THREAD_POOL_SIZE:50}" queue-properties: # AWS SQS queue properties. VisibilityTimeout in seconds;MaximumMessageSize in bytes;MessageRetentionPeriod in seconds core: "${TB_QUEUE_AWS_SQS_CORE_QUEUE_PROPERTIES:VisibilityTimeout:30;MaximumMessageSize:262144;MessageRetentionPeriod:604800}" diff --git a/transport/coap/src/main/resources/tb-coap-transport.yml b/transport/coap/src/main/resources/tb-coap-transport.yml index ab5deff3f7..f11b1b8ce2 100644 --- a/transport/coap/src/main/resources/tb-coap-transport.yml +++ b/transport/coap/src/main/resources/tb-coap-transport.yml @@ -281,6 +281,8 @@ queue: region: "${TB_QUEUE_AWS_SQS_REGION:YOUR_REGION}" # Number of threads per each AWS SQS queue in consumer threads_per_topic: "${TB_QUEUE_AWS_SQS_THREADS_PER_TOPIC:1}" + # Thread pool size for aws_sqs queue producer executor provider. Default value equals to AmazonSQSAsyncClient.DEFAULT_THREAD_POOL_SIZE + producer_thread_pool_size: "${TB_QUEUE_AWS_SQS_EXECUTOR_THREAD_POOL_SIZE:50}" queue-properties: # AWS SQS queue properties. VisibilityTimeout in seconds;MaximumMessageSize in bytes;MessageRetentionPeriod in seconds rule-engine: "${TB_QUEUE_AWS_SQS_RE_QUEUE_PROPERTIES:VisibilityTimeout:30;MaximumMessageSize:262144;MessageRetentionPeriod:604800}" diff --git a/transport/http/src/main/resources/tb-http-transport.yml b/transport/http/src/main/resources/tb-http-transport.yml index 74be81e667..b2fa93c434 100644 --- a/transport/http/src/main/resources/tb-http-transport.yml +++ b/transport/http/src/main/resources/tb-http-transport.yml @@ -265,6 +265,8 @@ queue: region: "${TB_QUEUE_AWS_SQS_REGION:YOUR_REGION}" # Number of threads per each AWS SQS queue in consumer threads_per_topic: "${TB_QUEUE_AWS_SQS_THREADS_PER_TOPIC:1}" + # Thread pool size for aws_sqs queue producer executor provider. Default value equals to AmazonSQSAsyncClient.DEFAULT_THREAD_POOL_SIZE + producer_thread_pool_size: "${TB_QUEUE_AWS_SQS_EXECUTOR_THREAD_POOL_SIZE:50}" queue-properties: # AWS SQS queue properties. VisibilityTimeout in seconds;MaximumMessageSize in bytes;MessageRetentionPeriod in seconds rule-engine: "${TB_QUEUE_AWS_SQS_RE_QUEUE_PROPERTIES:VisibilityTimeout:30;MaximumMessageSize:262144;MessageRetentionPeriod:604800}" diff --git a/transport/lwm2m/src/main/resources/tb-lwm2m-transport.yml b/transport/lwm2m/src/main/resources/tb-lwm2m-transport.yml index 10dbac101c..c2891994b1 100644 --- a/transport/lwm2m/src/main/resources/tb-lwm2m-transport.yml +++ b/transport/lwm2m/src/main/resources/tb-lwm2m-transport.yml @@ -360,6 +360,8 @@ queue: region: "${TB_QUEUE_AWS_SQS_REGION:YOUR_REGION}" # Number of threads per each AWS SQS queue in consumer threads_per_topic: "${TB_QUEUE_AWS_SQS_THREADS_PER_TOPIC:1}" + # Thread pool size for aws_sqs queue producer executor provider. Default value equals to AmazonSQSAsyncClient.DEFAULT_THREAD_POOL_SIZE + producer_thread_pool_size: "${TB_QUEUE_AWS_SQS_EXECUTOR_THREAD_POOL_SIZE:50}" queue-properties: # AWS SQS queue properties. VisibilityTimeout in seconds;MaximumMessageSize in bytes;MessageRetentionPeriod in seconds rule-engine: "${TB_QUEUE_AWS_SQS_RE_QUEUE_PROPERTIES:VisibilityTimeout:30;MaximumMessageSize:262144;MessageRetentionPeriod:604800}" diff --git a/transport/mqtt/src/main/resources/tb-mqtt-transport.yml b/transport/mqtt/src/main/resources/tb-mqtt-transport.yml index 0eb8b4315b..519e7a8b32 100644 --- a/transport/mqtt/src/main/resources/tb-mqtt-transport.yml +++ b/transport/mqtt/src/main/resources/tb-mqtt-transport.yml @@ -297,6 +297,8 @@ queue: region: "${TB_QUEUE_AWS_SQS_REGION:YOUR_REGION}" # Number of threads per each AWS SQS queue in consumer threads_per_topic: "${TB_QUEUE_AWS_SQS_THREADS_PER_TOPIC:1}" + # Thread pool size for aws_sqs queue producer executor provider. Default value equals to AmazonSQSAsyncClient.DEFAULT_THREAD_POOL_SIZE + producer_thread_pool_size: "${TB_QUEUE_AWS_SQS_EXECUTOR_THREAD_POOL_SIZE:50}" queue-properties: # AWS SQS queue properties. VisibilityTimeout in seconds;MaximumMessageSize in bytes;MessageRetentionPeriod in seconds rule-engine: "${TB_QUEUE_AWS_SQS_RE_QUEUE_PROPERTIES:VisibilityTimeout:30;MaximumMessageSize:262144;MessageRetentionPeriod:604800}" diff --git a/transport/snmp/src/main/resources/tb-snmp-transport.yml b/transport/snmp/src/main/resources/tb-snmp-transport.yml index d139b641a6..ab9d6ad584 100644 --- a/transport/snmp/src/main/resources/tb-snmp-transport.yml +++ b/transport/snmp/src/main/resources/tb-snmp-transport.yml @@ -250,6 +250,8 @@ queue: region: "${TB_QUEUE_AWS_SQS_REGION:YOUR_REGION}" # Number of threads per each AWS SQS queue in consumer threads_per_topic: "${TB_QUEUE_AWS_SQS_THREADS_PER_TOPIC:1}" + # Thread pool size for aws_sqs queue producer executor provider. Default value equals to AmazonSQSAsyncClient.DEFAULT_THREAD_POOL_SIZE + producer_thread_pool_size: "${TB_QUEUE_AWS_SQS_EXECUTOR_THREAD_POOL_SIZE:50}" queue-properties: # AWS SQS queue properties. VisibilityTimeout in seconds;MaximumMessageSize in bytes;MessageRetentionPeriod in seconds rule-engine: "${TB_QUEUE_AWS_SQS_RE_QUEUE_PROPERTIES:VisibilityTimeout:30;MaximumMessageSize:262144;MessageRetentionPeriod:604800}"