From 3ac0cc241a23af1ecc885055f81f2d4bbcff0f2d Mon Sep 17 00:00:00 2001 From: dashevchenko Date: Wed, 27 Dec 2023 13:47:52 +0200 Subject: [PATCH 1/5] TbAwsSqsProducerTemplate: native thread allocation error fix (provided one executor for message sending with limited thread pool size) --- .../queue/sqs/TbAwsSqsProducerTemplate.java | 38 +++++++------------ .../server/queue/sqs/TbAwsSqsSettings.java | 18 +++++++++ 2 files changed, 31 insertions(+), 25 deletions(-) diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/sqs/TbAwsSqsProducerTemplate.java b/common/queue/src/main/java/org/thingsboard/server/queue/sqs/TbAwsSqsProducerTemplate.java index 863e96a714..ab602e1960 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/sqs/TbAwsSqsProducerTemplate.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/sqs/TbAwsSqsProducerTemplate.java @@ -20,15 +20,11 @@ import com.amazonaws.auth.AWSCredentialsProvider; import com.amazonaws.auth.AWSStaticCredentialsProvider; import com.amazonaws.auth.BasicAWSCredentials; import com.amazonaws.auth.DefaultAWSCredentialsProviderChain; -import com.amazonaws.services.sqs.AmazonSQS; -import com.amazonaws.services.sqs.AmazonSQSClientBuilder; +import com.amazonaws.handlers.AsyncHandler; +import com.amazonaws.services.sqs.AmazonSQSAsync; +import com.amazonaws.services.sqs.AmazonSQSAsyncClientBuilder; import com.amazonaws.services.sqs.model.SendMessageRequest; import com.amazonaws.services.sqs.model.SendMessageResult; -import com.google.common.util.concurrent.FutureCallback; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.ListeningExecutorService; -import com.google.common.util.concurrent.MoreExecutors; import com.google.gson.Gson; import lombok.extern.slf4j.Slf4j; import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; @@ -41,16 +37,14 @@ import org.thingsboard.server.queue.common.DefaultTbQueueMsg; import java.util.Map; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.Executors; @Slf4j public class TbAwsSqsProducerTemplate implements TbQueueProducer { private final String defaultTopic; - private final AmazonSQS sqsClient; + private final AmazonSQSAsync sqsClient; private final Gson gson = new Gson(); private final Map queueUrlMap = new ConcurrentHashMap<>(); private final TbQueueAdmin admin; - private ListeningExecutorService producerExecutor; public TbAwsSqsProducerTemplate(TbQueueAdmin admin, TbAwsSqsSettings sqsSettings, String defaultTopic) { this.admin = admin; @@ -64,11 +58,11 @@ public class TbAwsSqsProducerTemplate implements TbQueuePr credentialsProvider = new AWSStaticCredentialsProvider(awsCredentials); } - sqsClient = AmazonSQSClientBuilder.standard() + sqsClient = AmazonSQSAsyncClientBuilder.standard() .withCredentials(credentialsProvider) .withRegion(sqsSettings.getRegion()) + .withExecutorFactory(sqsSettings::getProducerExecutor) .build(); - producerExecutor = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool()); } @Override @@ -91,30 +85,24 @@ public class TbAwsSqsProducerTemplate implements TbQueuePr sendMsgRequest.withMessageGroupId(sqsMsgId); sendMsgRequest.withMessageDeduplicationId(sqsMsgId); - ListenableFuture future = producerExecutor.submit(() -> sqsClient.sendMessage(sendMsgRequest)); - - Futures.addCallback(future, new FutureCallback() { - @Override - public void onSuccess(SendMessageResult result) { + sqsClient.sendMessageAsync(sendMsgRequest, new AsyncHandler() { + @Override public void onError(Exception e) { if (callback != null) { - callback.onSuccess(new AwsSqsTbQueueMsgMetadata(result.getSdkHttpMetadata())); + callback.onFailure(e); } } - @Override - public void onFailure(Throwable t) { + @Override public void onSuccess(SendMessageRequest request, + SendMessageResult sendMessageResult) { if (callback != null) { - callback.onFailure(t); + callback.onSuccess(new AwsSqsTbQueueMsgMetadata(sendMessageResult.getSdkHttpMetadata())); } } - }, producerExecutor); + }); } @Override public void stop() { - if (producerExecutor != null) { - producerExecutor.shutdownNow(); - } if (sqsClient != null) { sqsClient.shutdown(); } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/sqs/TbAwsSqsSettings.java b/common/queue/src/main/java/org/thingsboard/server/queue/sqs/TbAwsSqsSettings.java index 89dc9d7e47..0a3915496b 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/sqs/TbAwsSqsSettings.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/sqs/TbAwsSqsSettings.java @@ -20,6 +20,11 @@ import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; import org.springframework.stereotype.Component; +import org.thingsboard.common.util.ThingsBoardThreadFactory; + +import javax.annotation.PostConstruct; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; @Slf4j @ConditionalOnExpression("'${queue.type:null}'=='aws-sqs'") @@ -42,4 +47,17 @@ public class TbAwsSqsSettings { @Value("${queue.aws_sqs.threads_per_topic}") private int threadsPerTopic; + @Value("${queue.aws_sqs.producer_thread_pool_size:0}") + private int threadPoolSize; + + private ExecutorService producerExecutor; + + @PostConstruct + private void init() { + if (threadPoolSize == 0) { + threadPoolSize = 50; //AmazonSQSAsyncClient.DEFAULT_THREAD_POOL_SIZE = 50; + } + producerExecutor = Executors.newFixedThreadPool(threadPoolSize, ThingsBoardThreadFactory.forName("aws-sqs-queue-executor")); + } + } From 63a85161b2980a71221dae2fcd68986ef3767976 Mon Sep 17 00:00:00 2001 From: dashevchenko Date: Wed, 27 Dec 2023 14:44:49 +0200 Subject: [PATCH 2/5] moved producer executor to TbAwsSqsAdmin --- .../thingsboard/server/queue/sqs/TbAwsSqsAdmin.java | 11 +++++++++++ .../server/queue/sqs/TbAwsSqsProducerTemplate.java | 6 +++--- .../server/queue/sqs/TbAwsSqsSettings.java | 10 ---------- 3 files changed, 14 insertions(+), 13 deletions(-) diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/sqs/TbAwsSqsAdmin.java b/common/queue/src/main/java/org/thingsboard/server/queue/sqs/TbAwsSqsAdmin.java index ba4eeb6ca4..337bb271e5 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/sqs/TbAwsSqsAdmin.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/sqs/TbAwsSqsAdmin.java @@ -24,11 +24,15 @@ import com.amazonaws.services.sqs.AmazonSQS; import com.amazonaws.services.sqs.AmazonSQSClientBuilder; import com.amazonaws.services.sqs.model.CreateQueueRequest; import com.amazonaws.services.sqs.model.GetQueueUrlResult; +import lombok.Getter; import lombok.extern.slf4j.Slf4j; +import org.thingsboard.common.util.ThingsBoardThreadFactory; import org.thingsboard.server.queue.TbQueueAdmin; import org.thingsboard.server.queue.util.PropertyUtils; import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.function.Function; import java.util.stream.Collectors; @@ -38,6 +42,8 @@ public class TbAwsSqsAdmin implements TbQueueAdmin { private final Map attributes; private final AmazonSQS sqsClient; private final Map queues; + @Getter + private ExecutorService producerExecutor; public TbAwsSqsAdmin(TbAwsSqsSettings sqsSettings, Map attributes) { this.attributes = attributes; @@ -49,6 +55,11 @@ public class TbAwsSqsAdmin implements TbQueueAdmin { AWSCredentials awsCredentials = new BasicAWSCredentials(sqsSettings.getAccessKeyId(), sqsSettings.getSecretAccessKey()); credentialsProvider = new AWSStaticCredentialsProvider(awsCredentials); } + int threadPoolSize = sqsSettings.getThreadPoolSize(); + if (threadPoolSize == 0) { + threadPoolSize = 50; //AmazonSQSAsyncClient.DEFAULT_THREAD_POOL_SIZE = 50; + } + producerExecutor = Executors.newFixedThreadPool(threadPoolSize, ThingsBoardThreadFactory.forName("aws-sqs-queue-executor")); sqsClient = AmazonSQSClientBuilder.standard() .withCredentials(credentialsProvider) diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/sqs/TbAwsSqsProducerTemplate.java b/common/queue/src/main/java/org/thingsboard/server/queue/sqs/TbAwsSqsProducerTemplate.java index ab602e1960..f5bca724d0 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/sqs/TbAwsSqsProducerTemplate.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/sqs/TbAwsSqsProducerTemplate.java @@ -44,10 +44,10 @@ public class TbAwsSqsProducerTemplate implements TbQueuePr private final AmazonSQSAsync sqsClient; private final Gson gson = new Gson(); private final Map queueUrlMap = new ConcurrentHashMap<>(); - private final TbQueueAdmin admin; + private final TbAwsSqsAdmin admin; public TbAwsSqsProducerTemplate(TbQueueAdmin admin, TbAwsSqsSettings sqsSettings, String defaultTopic) { - this.admin = admin; + this.admin = (TbAwsSqsAdmin) admin; this.defaultTopic = defaultTopic; AWSCredentialsProvider credentialsProvider; @@ -61,7 +61,7 @@ public class TbAwsSqsProducerTemplate implements TbQueuePr sqsClient = AmazonSQSAsyncClientBuilder.standard() .withCredentials(credentialsProvider) .withRegion(sqsSettings.getRegion()) - .withExecutorFactory(sqsSettings::getProducerExecutor) + .withExecutorFactory(this.admin::getProducerExecutor) .build(); } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/sqs/TbAwsSqsSettings.java b/common/queue/src/main/java/org/thingsboard/server/queue/sqs/TbAwsSqsSettings.java index 0a3915496b..1f8e301c87 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/sqs/TbAwsSqsSettings.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/sqs/TbAwsSqsSettings.java @@ -50,14 +50,4 @@ public class TbAwsSqsSettings { @Value("${queue.aws_sqs.producer_thread_pool_size:0}") private int threadPoolSize; - private ExecutorService producerExecutor; - - @PostConstruct - private void init() { - if (threadPoolSize == 0) { - threadPoolSize = 50; //AmazonSQSAsyncClient.DEFAULT_THREAD_POOL_SIZE = 50; - } - producerExecutor = Executors.newFixedThreadPool(threadPoolSize, ThingsBoardThreadFactory.forName("aws-sqs-queue-executor")); - } - } From f2ea1b87d76328b9bd2b46656f504fe8db75de27 Mon Sep 17 00:00:00 2001 From: dashevchenko Date: Wed, 27 Dec 2023 14:51:20 +0200 Subject: [PATCH 3/5] added shutdown for producerExecutor --- .../java/org/thingsboard/server/queue/sqs/TbAwsSqsAdmin.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/sqs/TbAwsSqsAdmin.java b/common/queue/src/main/java/org/thingsboard/server/queue/sqs/TbAwsSqsAdmin.java index 337bb271e5..7ef3b26b27 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/sqs/TbAwsSqsAdmin.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/sqs/TbAwsSqsAdmin.java @@ -115,5 +115,8 @@ public class TbAwsSqsAdmin implements TbQueueAdmin { if (sqsClient != null) { sqsClient.shutdown(); } + if (producerExecutor != null) { + producerExecutor.shutdownNow(); + } } } From b95adb439a69ca231d57ebfc4fe94afe6aebb9fe Mon Sep 17 00:00:00 2001 From: dashevchenko Date: Wed, 27 Dec 2023 17:06:26 +0200 Subject: [PATCH 4/5] added yml parameters for aws-sqs eproducer executor thread pool size --- application/src/main/resources/thingsboard.yml | 2 ++ .../org/thingsboard/server/queue/sqs/TbAwsSqsAdmin.java | 8 ++------ .../thingsboard/server/queue/sqs/TbAwsSqsSettings.java | 7 +------ msa/vc-executor/src/main/resources/tb-vc-executor.yml | 2 ++ transport/coap/src/main/resources/tb-coap-transport.yml | 2 ++ transport/http/src/main/resources/tb-http-transport.yml | 2 ++ transport/lwm2m/src/main/resources/tb-lwm2m-transport.yml | 2 ++ transport/mqtt/src/main/resources/tb-mqtt-transport.yml | 2 ++ transport/snmp/src/main/resources/tb-snmp-transport.yml | 2 ++ 9 files changed, 17 insertions(+), 12 deletions(-) diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index c650d53b8f..0561c0c249 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -1398,6 +1398,8 @@ queue: region: "${TB_QUEUE_AWS_SQS_REGION:YOUR_REGION}" # Number of threads per each AWS SQS queue in consumer threads_per_topic: "${TB_QUEUE_AWS_SQS_THREADS_PER_TOPIC:1}" + # Thread pool size for aws_sqs queue producer executor provider. Default value equals to AmazonSQSAsyncClient.DEFAULT_THREAD_POOL_SIZE + producer_thread_pool_size: "${TB_QUEUE_AWS_SQS_EXECUTOR_THREAD_POOL_SIZE:50}" queue-properties: # AWS SQS queue properties. VisibilityTimeout in seconds;MaximumMessageSize in bytes;MessageRetentionPeriod in seconds rule-engine: "${TB_QUEUE_AWS_SQS_RE_QUEUE_PROPERTIES:VisibilityTimeout:30;MaximumMessageSize:262144;MessageRetentionPeriod:604800}" diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/sqs/TbAwsSqsAdmin.java b/common/queue/src/main/java/org/thingsboard/server/queue/sqs/TbAwsSqsAdmin.java index 7ef3b26b27..1c70302d00 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/sqs/TbAwsSqsAdmin.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/sqs/TbAwsSqsAdmin.java @@ -43,7 +43,7 @@ public class TbAwsSqsAdmin implements TbQueueAdmin { private final AmazonSQS sqsClient; private final Map queues; @Getter - private ExecutorService producerExecutor; + private final ExecutorService producerExecutor; public TbAwsSqsAdmin(TbAwsSqsSettings sqsSettings, Map attributes) { this.attributes = attributes; @@ -55,11 +55,7 @@ public class TbAwsSqsAdmin implements TbQueueAdmin { AWSCredentials awsCredentials = new BasicAWSCredentials(sqsSettings.getAccessKeyId(), sqsSettings.getSecretAccessKey()); credentialsProvider = new AWSStaticCredentialsProvider(awsCredentials); } - int threadPoolSize = sqsSettings.getThreadPoolSize(); - if (threadPoolSize == 0) { - threadPoolSize = 50; //AmazonSQSAsyncClient.DEFAULT_THREAD_POOL_SIZE = 50; - } - producerExecutor = Executors.newFixedThreadPool(threadPoolSize, ThingsBoardThreadFactory.forName("aws-sqs-queue-executor")); + producerExecutor = Executors.newFixedThreadPool(sqsSettings.getThreadPoolSize(), ThingsBoardThreadFactory.forName("aws-sqs-queue-executor")); sqsClient = AmazonSQSClientBuilder.standard() .withCredentials(credentialsProvider) diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/sqs/TbAwsSqsSettings.java b/common/queue/src/main/java/org/thingsboard/server/queue/sqs/TbAwsSqsSettings.java index 1f8e301c87..f2dc1efdfb 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/sqs/TbAwsSqsSettings.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/sqs/TbAwsSqsSettings.java @@ -20,11 +20,6 @@ import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; import org.springframework.stereotype.Component; -import org.thingsboard.common.util.ThingsBoardThreadFactory; - -import javax.annotation.PostConstruct; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; @Slf4j @ConditionalOnExpression("'${queue.type:null}'=='aws-sqs'") @@ -47,7 +42,7 @@ public class TbAwsSqsSettings { @Value("${queue.aws_sqs.threads_per_topic}") private int threadsPerTopic; - @Value("${queue.aws_sqs.producer_thread_pool_size:0}") + @Value("${queue.aws_sqs.producer_thread_pool_size:50}") private int threadPoolSize; } diff --git a/msa/vc-executor/src/main/resources/tb-vc-executor.yml b/msa/vc-executor/src/main/resources/tb-vc-executor.yml index 962c5303c7..e4519529c5 100644 --- a/msa/vc-executor/src/main/resources/tb-vc-executor.yml +++ b/msa/vc-executor/src/main/resources/tb-vc-executor.yml @@ -155,6 +155,8 @@ queue: region: "${TB_QUEUE_AWS_SQS_REGION:YOUR_REGION}" # Number of threads per each AWS SQS queue in consumer threads_per_topic: "${TB_QUEUE_AWS_SQS_THREADS_PER_TOPIC:1}" + # Thread pool size for aws_sqs queue producer executor provider. Default value equals to AmazonSQSAsyncClient.DEFAULT_THREAD_POOL_SIZE + producer_thread_pool_size: "${TB_QUEUE_AWS_SQS_EXECUTOR_THREAD_POOL_SIZE:50}" queue-properties: # AWS SQS queue properties. VisibilityTimeout in seconds;MaximumMessageSize in bytes;MessageRetentionPeriod in seconds core: "${TB_QUEUE_AWS_SQS_CORE_QUEUE_PROPERTIES:VisibilityTimeout:30;MaximumMessageSize:262144;MessageRetentionPeriod:604800}" diff --git a/transport/coap/src/main/resources/tb-coap-transport.yml b/transport/coap/src/main/resources/tb-coap-transport.yml index ab5deff3f7..f11b1b8ce2 100644 --- a/transport/coap/src/main/resources/tb-coap-transport.yml +++ b/transport/coap/src/main/resources/tb-coap-transport.yml @@ -281,6 +281,8 @@ queue: region: "${TB_QUEUE_AWS_SQS_REGION:YOUR_REGION}" # Number of threads per each AWS SQS queue in consumer threads_per_topic: "${TB_QUEUE_AWS_SQS_THREADS_PER_TOPIC:1}" + # Thread pool size for aws_sqs queue producer executor provider. Default value equals to AmazonSQSAsyncClient.DEFAULT_THREAD_POOL_SIZE + producer_thread_pool_size: "${TB_QUEUE_AWS_SQS_EXECUTOR_THREAD_POOL_SIZE:50}" queue-properties: # AWS SQS queue properties. VisibilityTimeout in seconds;MaximumMessageSize in bytes;MessageRetentionPeriod in seconds rule-engine: "${TB_QUEUE_AWS_SQS_RE_QUEUE_PROPERTIES:VisibilityTimeout:30;MaximumMessageSize:262144;MessageRetentionPeriod:604800}" diff --git a/transport/http/src/main/resources/tb-http-transport.yml b/transport/http/src/main/resources/tb-http-transport.yml index 74be81e667..b2fa93c434 100644 --- a/transport/http/src/main/resources/tb-http-transport.yml +++ b/transport/http/src/main/resources/tb-http-transport.yml @@ -265,6 +265,8 @@ queue: region: "${TB_QUEUE_AWS_SQS_REGION:YOUR_REGION}" # Number of threads per each AWS SQS queue in consumer threads_per_topic: "${TB_QUEUE_AWS_SQS_THREADS_PER_TOPIC:1}" + # Thread pool size for aws_sqs queue producer executor provider. Default value equals to AmazonSQSAsyncClient.DEFAULT_THREAD_POOL_SIZE + producer_thread_pool_size: "${TB_QUEUE_AWS_SQS_EXECUTOR_THREAD_POOL_SIZE:50}" queue-properties: # AWS SQS queue properties. VisibilityTimeout in seconds;MaximumMessageSize in bytes;MessageRetentionPeriod in seconds rule-engine: "${TB_QUEUE_AWS_SQS_RE_QUEUE_PROPERTIES:VisibilityTimeout:30;MaximumMessageSize:262144;MessageRetentionPeriod:604800}" diff --git a/transport/lwm2m/src/main/resources/tb-lwm2m-transport.yml b/transport/lwm2m/src/main/resources/tb-lwm2m-transport.yml index 10dbac101c..c2891994b1 100644 --- a/transport/lwm2m/src/main/resources/tb-lwm2m-transport.yml +++ b/transport/lwm2m/src/main/resources/tb-lwm2m-transport.yml @@ -360,6 +360,8 @@ queue: region: "${TB_QUEUE_AWS_SQS_REGION:YOUR_REGION}" # Number of threads per each AWS SQS queue in consumer threads_per_topic: "${TB_QUEUE_AWS_SQS_THREADS_PER_TOPIC:1}" + # Thread pool size for aws_sqs queue producer executor provider. Default value equals to AmazonSQSAsyncClient.DEFAULT_THREAD_POOL_SIZE + producer_thread_pool_size: "${TB_QUEUE_AWS_SQS_EXECUTOR_THREAD_POOL_SIZE:50}" queue-properties: # AWS SQS queue properties. VisibilityTimeout in seconds;MaximumMessageSize in bytes;MessageRetentionPeriod in seconds rule-engine: "${TB_QUEUE_AWS_SQS_RE_QUEUE_PROPERTIES:VisibilityTimeout:30;MaximumMessageSize:262144;MessageRetentionPeriod:604800}" diff --git a/transport/mqtt/src/main/resources/tb-mqtt-transport.yml b/transport/mqtt/src/main/resources/tb-mqtt-transport.yml index 0eb8b4315b..519e7a8b32 100644 --- a/transport/mqtt/src/main/resources/tb-mqtt-transport.yml +++ b/transport/mqtt/src/main/resources/tb-mqtt-transport.yml @@ -297,6 +297,8 @@ queue: region: "${TB_QUEUE_AWS_SQS_REGION:YOUR_REGION}" # Number of threads per each AWS SQS queue in consumer threads_per_topic: "${TB_QUEUE_AWS_SQS_THREADS_PER_TOPIC:1}" + # Thread pool size for aws_sqs queue producer executor provider. Default value equals to AmazonSQSAsyncClient.DEFAULT_THREAD_POOL_SIZE + producer_thread_pool_size: "${TB_QUEUE_AWS_SQS_EXECUTOR_THREAD_POOL_SIZE:50}" queue-properties: # AWS SQS queue properties. VisibilityTimeout in seconds;MaximumMessageSize in bytes;MessageRetentionPeriod in seconds rule-engine: "${TB_QUEUE_AWS_SQS_RE_QUEUE_PROPERTIES:VisibilityTimeout:30;MaximumMessageSize:262144;MessageRetentionPeriod:604800}" diff --git a/transport/snmp/src/main/resources/tb-snmp-transport.yml b/transport/snmp/src/main/resources/tb-snmp-transport.yml index d139b641a6..ab9d6ad584 100644 --- a/transport/snmp/src/main/resources/tb-snmp-transport.yml +++ b/transport/snmp/src/main/resources/tb-snmp-transport.yml @@ -250,6 +250,8 @@ queue: region: "${TB_QUEUE_AWS_SQS_REGION:YOUR_REGION}" # Number of threads per each AWS SQS queue in consumer threads_per_topic: "${TB_QUEUE_AWS_SQS_THREADS_PER_TOPIC:1}" + # Thread pool size for aws_sqs queue producer executor provider. Default value equals to AmazonSQSAsyncClient.DEFAULT_THREAD_POOL_SIZE + producer_thread_pool_size: "${TB_QUEUE_AWS_SQS_EXECUTOR_THREAD_POOL_SIZE:50}" queue-properties: # AWS SQS queue properties. VisibilityTimeout in seconds;MaximumMessageSize in bytes;MessageRetentionPeriod in seconds rule-engine: "${TB_QUEUE_AWS_SQS_RE_QUEUE_PROPERTIES:VisibilityTimeout:30;MaximumMessageSize:262144;MessageRetentionPeriod:604800}" From 5e471364de2ba0263d62a144376174bcd0176b26 Mon Sep 17 00:00:00 2001 From: dashevchenko Date: Tue, 2 Jan 2024 13:58:16 +0200 Subject: [PATCH 5/5] changed fixedPool to newWorkStealingPool --- .../java/org/thingsboard/server/queue/sqs/TbAwsSqsAdmin.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/sqs/TbAwsSqsAdmin.java b/common/queue/src/main/java/org/thingsboard/server/queue/sqs/TbAwsSqsAdmin.java index 1c70302d00..3ccbc229a0 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/sqs/TbAwsSqsAdmin.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/sqs/TbAwsSqsAdmin.java @@ -26,6 +26,8 @@ import com.amazonaws.services.sqs.model.CreateQueueRequest; import com.amazonaws.services.sqs.model.GetQueueUrlResult; import lombok.Getter; import lombok.extern.slf4j.Slf4j; +import org.thingsboard.common.util.ExecutorProvider; +import org.thingsboard.common.util.ThingsBoardExecutors; import org.thingsboard.common.util.ThingsBoardThreadFactory; import org.thingsboard.server.queue.TbQueueAdmin; import org.thingsboard.server.queue.util.PropertyUtils; @@ -55,7 +57,7 @@ public class TbAwsSqsAdmin implements TbQueueAdmin { AWSCredentials awsCredentials = new BasicAWSCredentials(sqsSettings.getAccessKeyId(), sqsSettings.getSecretAccessKey()); credentialsProvider = new AWSStaticCredentialsProvider(awsCredentials); } - producerExecutor = Executors.newFixedThreadPool(sqsSettings.getThreadPoolSize(), ThingsBoardThreadFactory.forName("aws-sqs-queue-executor")); + producerExecutor = ThingsBoardExecutors.newWorkStealingPool(sqsSettings.getThreadPoolSize(), "aws-sqs-queue-executor"); sqsClient = AmazonSQSClientBuilder.standard() .withCredentials(credentialsProvider)