diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index c650d53b8f..0561c0c249 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -1398,6 +1398,8 @@ queue: region: "${TB_QUEUE_AWS_SQS_REGION:YOUR_REGION}" # Number of threads per each AWS SQS queue in consumer threads_per_topic: "${TB_QUEUE_AWS_SQS_THREADS_PER_TOPIC:1}" + # Thread pool size for aws_sqs queue producer executor provider. Default value equals to AmazonSQSAsyncClient.DEFAULT_THREAD_POOL_SIZE + producer_thread_pool_size: "${TB_QUEUE_AWS_SQS_EXECUTOR_THREAD_POOL_SIZE:50}" queue-properties: # AWS SQS queue properties. VisibilityTimeout in seconds;MaximumMessageSize in bytes;MessageRetentionPeriod in seconds rule-engine: "${TB_QUEUE_AWS_SQS_RE_QUEUE_PROPERTIES:VisibilityTimeout:30;MaximumMessageSize:262144;MessageRetentionPeriod:604800}" diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/sqs/TbAwsSqsAdmin.java b/common/queue/src/main/java/org/thingsboard/server/queue/sqs/TbAwsSqsAdmin.java index ba4eeb6ca4..3ccbc229a0 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/sqs/TbAwsSqsAdmin.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/sqs/TbAwsSqsAdmin.java @@ -24,11 +24,17 @@ import com.amazonaws.services.sqs.AmazonSQS; import com.amazonaws.services.sqs.AmazonSQSClientBuilder; import com.amazonaws.services.sqs.model.CreateQueueRequest; import com.amazonaws.services.sqs.model.GetQueueUrlResult; +import lombok.Getter; import lombok.extern.slf4j.Slf4j; +import org.thingsboard.common.util.ExecutorProvider; +import org.thingsboard.common.util.ThingsBoardExecutors; +import org.thingsboard.common.util.ThingsBoardThreadFactory; import org.thingsboard.server.queue.TbQueueAdmin; import org.thingsboard.server.queue.util.PropertyUtils; import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.function.Function; import java.util.stream.Collectors; @@ -38,6 +44,8 @@ public class TbAwsSqsAdmin implements TbQueueAdmin { private final Map attributes; private final AmazonSQS sqsClient; private final Map queues; + @Getter + private final ExecutorService producerExecutor; public TbAwsSqsAdmin(TbAwsSqsSettings sqsSettings, Map attributes) { this.attributes = attributes; @@ -49,6 +57,7 @@ public class TbAwsSqsAdmin implements TbQueueAdmin { AWSCredentials awsCredentials = new BasicAWSCredentials(sqsSettings.getAccessKeyId(), sqsSettings.getSecretAccessKey()); credentialsProvider = new AWSStaticCredentialsProvider(awsCredentials); } + producerExecutor = ThingsBoardExecutors.newWorkStealingPool(sqsSettings.getThreadPoolSize(), "aws-sqs-queue-executor"); sqsClient = AmazonSQSClientBuilder.standard() .withCredentials(credentialsProvider) @@ -104,5 +113,8 @@ public class TbAwsSqsAdmin implements TbQueueAdmin { if (sqsClient != null) { sqsClient.shutdown(); } + if (producerExecutor != null) { + producerExecutor.shutdownNow(); + } } } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/sqs/TbAwsSqsProducerTemplate.java b/common/queue/src/main/java/org/thingsboard/server/queue/sqs/TbAwsSqsProducerTemplate.java index 863e96a714..f5bca724d0 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/sqs/TbAwsSqsProducerTemplate.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/sqs/TbAwsSqsProducerTemplate.java @@ -20,15 +20,11 @@ import com.amazonaws.auth.AWSCredentialsProvider; import com.amazonaws.auth.AWSStaticCredentialsProvider; import com.amazonaws.auth.BasicAWSCredentials; import com.amazonaws.auth.DefaultAWSCredentialsProviderChain; -import com.amazonaws.services.sqs.AmazonSQS; -import com.amazonaws.services.sqs.AmazonSQSClientBuilder; +import com.amazonaws.handlers.AsyncHandler; +import com.amazonaws.services.sqs.AmazonSQSAsync; +import com.amazonaws.services.sqs.AmazonSQSAsyncClientBuilder; import com.amazonaws.services.sqs.model.SendMessageRequest; import com.amazonaws.services.sqs.model.SendMessageResult; -import com.google.common.util.concurrent.FutureCallback; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.ListeningExecutorService; -import com.google.common.util.concurrent.MoreExecutors; import com.google.gson.Gson; import lombok.extern.slf4j.Slf4j; import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; @@ -41,19 +37,17 @@ import org.thingsboard.server.queue.common.DefaultTbQueueMsg; import java.util.Map; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.Executors; @Slf4j public class TbAwsSqsProducerTemplate implements TbQueueProducer { private final String defaultTopic; - private final AmazonSQS sqsClient; + private final AmazonSQSAsync sqsClient; private final Gson gson = new Gson(); private final Map queueUrlMap = new ConcurrentHashMap<>(); - private final TbQueueAdmin admin; - private ListeningExecutorService producerExecutor; + private final TbAwsSqsAdmin admin; public TbAwsSqsProducerTemplate(TbQueueAdmin admin, TbAwsSqsSettings sqsSettings, String defaultTopic) { - this.admin = admin; + this.admin = (TbAwsSqsAdmin) admin; this.defaultTopic = defaultTopic; AWSCredentialsProvider credentialsProvider; @@ -64,11 +58,11 @@ public class TbAwsSqsProducerTemplate implements TbQueuePr credentialsProvider = new AWSStaticCredentialsProvider(awsCredentials); } - sqsClient = AmazonSQSClientBuilder.standard() + sqsClient = AmazonSQSAsyncClientBuilder.standard() .withCredentials(credentialsProvider) .withRegion(sqsSettings.getRegion()) + .withExecutorFactory(this.admin::getProducerExecutor) .build(); - producerExecutor = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool()); } @Override @@ -91,30 +85,24 @@ public class TbAwsSqsProducerTemplate implements TbQueuePr sendMsgRequest.withMessageGroupId(sqsMsgId); sendMsgRequest.withMessageDeduplicationId(sqsMsgId); - ListenableFuture future = producerExecutor.submit(() -> sqsClient.sendMessage(sendMsgRequest)); - - Futures.addCallback(future, new FutureCallback() { - @Override - public void onSuccess(SendMessageResult result) { + sqsClient.sendMessageAsync(sendMsgRequest, new AsyncHandler() { + @Override public void onError(Exception e) { if (callback != null) { - callback.onSuccess(new AwsSqsTbQueueMsgMetadata(result.getSdkHttpMetadata())); + callback.onFailure(e); } } - @Override - public void onFailure(Throwable t) { + @Override public void onSuccess(SendMessageRequest request, + SendMessageResult sendMessageResult) { if (callback != null) { - callback.onFailure(t); + callback.onSuccess(new AwsSqsTbQueueMsgMetadata(sendMessageResult.getSdkHttpMetadata())); } } - }, producerExecutor); + }); } @Override public void stop() { - if (producerExecutor != null) { - producerExecutor.shutdownNow(); - } if (sqsClient != null) { sqsClient.shutdown(); } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/sqs/TbAwsSqsSettings.java b/common/queue/src/main/java/org/thingsboard/server/queue/sqs/TbAwsSqsSettings.java index 89dc9d7e47..f2dc1efdfb 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/sqs/TbAwsSqsSettings.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/sqs/TbAwsSqsSettings.java @@ -42,4 +42,7 @@ public class TbAwsSqsSettings { @Value("${queue.aws_sqs.threads_per_topic}") private int threadsPerTopic; + @Value("${queue.aws_sqs.producer_thread_pool_size:50}") + private int threadPoolSize; + } diff --git a/msa/vc-executor/src/main/resources/tb-vc-executor.yml b/msa/vc-executor/src/main/resources/tb-vc-executor.yml index 962c5303c7..e4519529c5 100644 --- a/msa/vc-executor/src/main/resources/tb-vc-executor.yml +++ b/msa/vc-executor/src/main/resources/tb-vc-executor.yml @@ -155,6 +155,8 @@ queue: region: "${TB_QUEUE_AWS_SQS_REGION:YOUR_REGION}" # Number of threads per each AWS SQS queue in consumer threads_per_topic: "${TB_QUEUE_AWS_SQS_THREADS_PER_TOPIC:1}" + # Thread pool size for aws_sqs queue producer executor provider. Default value equals to AmazonSQSAsyncClient.DEFAULT_THREAD_POOL_SIZE + producer_thread_pool_size: "${TB_QUEUE_AWS_SQS_EXECUTOR_THREAD_POOL_SIZE:50}" queue-properties: # AWS SQS queue properties. VisibilityTimeout in seconds;MaximumMessageSize in bytes;MessageRetentionPeriod in seconds core: "${TB_QUEUE_AWS_SQS_CORE_QUEUE_PROPERTIES:VisibilityTimeout:30;MaximumMessageSize:262144;MessageRetentionPeriod:604800}" diff --git a/transport/coap/src/main/resources/tb-coap-transport.yml b/transport/coap/src/main/resources/tb-coap-transport.yml index ab5deff3f7..f11b1b8ce2 100644 --- a/transport/coap/src/main/resources/tb-coap-transport.yml +++ b/transport/coap/src/main/resources/tb-coap-transport.yml @@ -281,6 +281,8 @@ queue: region: "${TB_QUEUE_AWS_SQS_REGION:YOUR_REGION}" # Number of threads per each AWS SQS queue in consumer threads_per_topic: "${TB_QUEUE_AWS_SQS_THREADS_PER_TOPIC:1}" + # Thread pool size for aws_sqs queue producer executor provider. Default value equals to AmazonSQSAsyncClient.DEFAULT_THREAD_POOL_SIZE + producer_thread_pool_size: "${TB_QUEUE_AWS_SQS_EXECUTOR_THREAD_POOL_SIZE:50}" queue-properties: # AWS SQS queue properties. VisibilityTimeout in seconds;MaximumMessageSize in bytes;MessageRetentionPeriod in seconds rule-engine: "${TB_QUEUE_AWS_SQS_RE_QUEUE_PROPERTIES:VisibilityTimeout:30;MaximumMessageSize:262144;MessageRetentionPeriod:604800}" diff --git a/transport/http/src/main/resources/tb-http-transport.yml b/transport/http/src/main/resources/tb-http-transport.yml index 74be81e667..b2fa93c434 100644 --- a/transport/http/src/main/resources/tb-http-transport.yml +++ b/transport/http/src/main/resources/tb-http-transport.yml @@ -265,6 +265,8 @@ queue: region: "${TB_QUEUE_AWS_SQS_REGION:YOUR_REGION}" # Number of threads per each AWS SQS queue in consumer threads_per_topic: "${TB_QUEUE_AWS_SQS_THREADS_PER_TOPIC:1}" + # Thread pool size for aws_sqs queue producer executor provider. Default value equals to AmazonSQSAsyncClient.DEFAULT_THREAD_POOL_SIZE + producer_thread_pool_size: "${TB_QUEUE_AWS_SQS_EXECUTOR_THREAD_POOL_SIZE:50}" queue-properties: # AWS SQS queue properties. VisibilityTimeout in seconds;MaximumMessageSize in bytes;MessageRetentionPeriod in seconds rule-engine: "${TB_QUEUE_AWS_SQS_RE_QUEUE_PROPERTIES:VisibilityTimeout:30;MaximumMessageSize:262144;MessageRetentionPeriod:604800}" diff --git a/transport/lwm2m/src/main/resources/tb-lwm2m-transport.yml b/transport/lwm2m/src/main/resources/tb-lwm2m-transport.yml index 10dbac101c..c2891994b1 100644 --- a/transport/lwm2m/src/main/resources/tb-lwm2m-transport.yml +++ b/transport/lwm2m/src/main/resources/tb-lwm2m-transport.yml @@ -360,6 +360,8 @@ queue: region: "${TB_QUEUE_AWS_SQS_REGION:YOUR_REGION}" # Number of threads per each AWS SQS queue in consumer threads_per_topic: "${TB_QUEUE_AWS_SQS_THREADS_PER_TOPIC:1}" + # Thread pool size for aws_sqs queue producer executor provider. Default value equals to AmazonSQSAsyncClient.DEFAULT_THREAD_POOL_SIZE + producer_thread_pool_size: "${TB_QUEUE_AWS_SQS_EXECUTOR_THREAD_POOL_SIZE:50}" queue-properties: # AWS SQS queue properties. VisibilityTimeout in seconds;MaximumMessageSize in bytes;MessageRetentionPeriod in seconds rule-engine: "${TB_QUEUE_AWS_SQS_RE_QUEUE_PROPERTIES:VisibilityTimeout:30;MaximumMessageSize:262144;MessageRetentionPeriod:604800}" diff --git a/transport/mqtt/src/main/resources/tb-mqtt-transport.yml b/transport/mqtt/src/main/resources/tb-mqtt-transport.yml index 0eb8b4315b..519e7a8b32 100644 --- a/transport/mqtt/src/main/resources/tb-mqtt-transport.yml +++ b/transport/mqtt/src/main/resources/tb-mqtt-transport.yml @@ -297,6 +297,8 @@ queue: region: "${TB_QUEUE_AWS_SQS_REGION:YOUR_REGION}" # Number of threads per each AWS SQS queue in consumer threads_per_topic: "${TB_QUEUE_AWS_SQS_THREADS_PER_TOPIC:1}" + # Thread pool size for aws_sqs queue producer executor provider. Default value equals to AmazonSQSAsyncClient.DEFAULT_THREAD_POOL_SIZE + producer_thread_pool_size: "${TB_QUEUE_AWS_SQS_EXECUTOR_THREAD_POOL_SIZE:50}" queue-properties: # AWS SQS queue properties. VisibilityTimeout in seconds;MaximumMessageSize in bytes;MessageRetentionPeriod in seconds rule-engine: "${TB_QUEUE_AWS_SQS_RE_QUEUE_PROPERTIES:VisibilityTimeout:30;MaximumMessageSize:262144;MessageRetentionPeriod:604800}" diff --git a/transport/snmp/src/main/resources/tb-snmp-transport.yml b/transport/snmp/src/main/resources/tb-snmp-transport.yml index d139b641a6..ab9d6ad584 100644 --- a/transport/snmp/src/main/resources/tb-snmp-transport.yml +++ b/transport/snmp/src/main/resources/tb-snmp-transport.yml @@ -250,6 +250,8 @@ queue: region: "${TB_QUEUE_AWS_SQS_REGION:YOUR_REGION}" # Number of threads per each AWS SQS queue in consumer threads_per_topic: "${TB_QUEUE_AWS_SQS_THREADS_PER_TOPIC:1}" + # Thread pool size for aws_sqs queue producer executor provider. Default value equals to AmazonSQSAsyncClient.DEFAULT_THREAD_POOL_SIZE + producer_thread_pool_size: "${TB_QUEUE_AWS_SQS_EXECUTOR_THREAD_POOL_SIZE:50}" queue-properties: # AWS SQS queue properties. VisibilityTimeout in seconds;MaximumMessageSize in bytes;MessageRetentionPeriod in seconds rule-engine: "${TB_QUEUE_AWS_SQS_RE_QUEUE_PROPERTIES:VisibilityTimeout:30;MaximumMessageSize:262144;MessageRetentionPeriod:604800}"