Merge pull request #9915 from dashevchenko/aws_sqs_producer

TbAwsSqsProducerTemplate fix
This commit is contained in:
Andrew Shvayka 2024-01-02 18:17:44 +02:00 committed by GitHub
commit 426b0b83a8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 44 additions and 27 deletions

View File

@ -1398,6 +1398,8 @@ queue:
region: "${TB_QUEUE_AWS_SQS_REGION:YOUR_REGION}" region: "${TB_QUEUE_AWS_SQS_REGION:YOUR_REGION}"
# Number of threads per each AWS SQS queue in consumer # Number of threads per each AWS SQS queue in consumer
threads_per_topic: "${TB_QUEUE_AWS_SQS_THREADS_PER_TOPIC:1}" threads_per_topic: "${TB_QUEUE_AWS_SQS_THREADS_PER_TOPIC:1}"
# Thread pool size for aws_sqs queue producer executor provider. Default value equals to AmazonSQSAsyncClient.DEFAULT_THREAD_POOL_SIZE
producer_thread_pool_size: "${TB_QUEUE_AWS_SQS_EXECUTOR_THREAD_POOL_SIZE:50}"
queue-properties: queue-properties:
# AWS SQS queue properties. VisibilityTimeout in seconds;MaximumMessageSize in bytes;MessageRetentionPeriod in seconds # AWS SQS queue properties. VisibilityTimeout in seconds;MaximumMessageSize in bytes;MessageRetentionPeriod in seconds
rule-engine: "${TB_QUEUE_AWS_SQS_RE_QUEUE_PROPERTIES:VisibilityTimeout:30;MaximumMessageSize:262144;MessageRetentionPeriod:604800}" rule-engine: "${TB_QUEUE_AWS_SQS_RE_QUEUE_PROPERTIES:VisibilityTimeout:30;MaximumMessageSize:262144;MessageRetentionPeriod:604800}"

View File

@ -24,11 +24,17 @@ import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.AmazonSQSClientBuilder; import com.amazonaws.services.sqs.AmazonSQSClientBuilder;
import com.amazonaws.services.sqs.model.CreateQueueRequest; import com.amazonaws.services.sqs.model.CreateQueueRequest;
import com.amazonaws.services.sqs.model.GetQueueUrlResult; import com.amazonaws.services.sqs.model.GetQueueUrlResult;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.thingsboard.common.util.ExecutorProvider;
import org.thingsboard.common.util.ThingsBoardExecutors;
import org.thingsboard.common.util.ThingsBoardThreadFactory;
import org.thingsboard.server.queue.TbQueueAdmin; import org.thingsboard.server.queue.TbQueueAdmin;
import org.thingsboard.server.queue.util.PropertyUtils; import org.thingsboard.server.queue.util.PropertyUtils;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function; import java.util.function.Function;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@ -38,6 +44,8 @@ public class TbAwsSqsAdmin implements TbQueueAdmin {
private final Map<String, String> attributes; private final Map<String, String> attributes;
private final AmazonSQS sqsClient; private final AmazonSQS sqsClient;
private final Map<String, String> queues; private final Map<String, String> queues;
@Getter
private final ExecutorService producerExecutor;
public TbAwsSqsAdmin(TbAwsSqsSettings sqsSettings, Map<String, String> attributes) { public TbAwsSqsAdmin(TbAwsSqsSettings sqsSettings, Map<String, String> attributes) {
this.attributes = attributes; this.attributes = attributes;
@ -49,6 +57,7 @@ public class TbAwsSqsAdmin implements TbQueueAdmin {
AWSCredentials awsCredentials = new BasicAWSCredentials(sqsSettings.getAccessKeyId(), sqsSettings.getSecretAccessKey()); AWSCredentials awsCredentials = new BasicAWSCredentials(sqsSettings.getAccessKeyId(), sqsSettings.getSecretAccessKey());
credentialsProvider = new AWSStaticCredentialsProvider(awsCredentials); credentialsProvider = new AWSStaticCredentialsProvider(awsCredentials);
} }
producerExecutor = ThingsBoardExecutors.newWorkStealingPool(sqsSettings.getThreadPoolSize(), "aws-sqs-queue-executor");
sqsClient = AmazonSQSClientBuilder.standard() sqsClient = AmazonSQSClientBuilder.standard()
.withCredentials(credentialsProvider) .withCredentials(credentialsProvider)
@ -104,5 +113,8 @@ public class TbAwsSqsAdmin implements TbQueueAdmin {
if (sqsClient != null) { if (sqsClient != null) {
sqsClient.shutdown(); sqsClient.shutdown();
} }
if (producerExecutor != null) {
producerExecutor.shutdownNow();
}
} }
} }

View File

@ -20,15 +20,11 @@ import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.AWSStaticCredentialsProvider; import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials; import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain; import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
import com.amazonaws.services.sqs.AmazonSQS; import com.amazonaws.handlers.AsyncHandler;
import com.amazonaws.services.sqs.AmazonSQSClientBuilder; import com.amazonaws.services.sqs.AmazonSQSAsync;
import com.amazonaws.services.sqs.AmazonSQSAsyncClientBuilder;
import com.amazonaws.services.sqs.model.SendMessageRequest; import com.amazonaws.services.sqs.model.SendMessageRequest;
import com.amazonaws.services.sqs.model.SendMessageResult; import com.amazonaws.services.sqs.model.SendMessageResult;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.gson.Gson; import com.google.gson.Gson;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
@ -41,19 +37,17 @@ import org.thingsboard.server.queue.common.DefaultTbQueueMsg;
import java.util.Map; import java.util.Map;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
@Slf4j @Slf4j
public class TbAwsSqsProducerTemplate<T extends TbQueueMsg> implements TbQueueProducer<T> { public class TbAwsSqsProducerTemplate<T extends TbQueueMsg> implements TbQueueProducer<T> {
private final String defaultTopic; private final String defaultTopic;
private final AmazonSQS sqsClient; private final AmazonSQSAsync sqsClient;
private final Gson gson = new Gson(); private final Gson gson = new Gson();
private final Map<String, String> queueUrlMap = new ConcurrentHashMap<>(); private final Map<String, String> queueUrlMap = new ConcurrentHashMap<>();
private final TbQueueAdmin admin; private final TbAwsSqsAdmin admin;
private ListeningExecutorService producerExecutor;
public TbAwsSqsProducerTemplate(TbQueueAdmin admin, TbAwsSqsSettings sqsSettings, String defaultTopic) { public TbAwsSqsProducerTemplate(TbQueueAdmin admin, TbAwsSqsSettings sqsSettings, String defaultTopic) {
this.admin = admin; this.admin = (TbAwsSqsAdmin) admin;
this.defaultTopic = defaultTopic; this.defaultTopic = defaultTopic;
AWSCredentialsProvider credentialsProvider; AWSCredentialsProvider credentialsProvider;
@ -64,11 +58,11 @@ public class TbAwsSqsProducerTemplate<T extends TbQueueMsg> implements TbQueuePr
credentialsProvider = new AWSStaticCredentialsProvider(awsCredentials); credentialsProvider = new AWSStaticCredentialsProvider(awsCredentials);
} }
sqsClient = AmazonSQSClientBuilder.standard() sqsClient = AmazonSQSAsyncClientBuilder.standard()
.withCredentials(credentialsProvider) .withCredentials(credentialsProvider)
.withRegion(sqsSettings.getRegion()) .withRegion(sqsSettings.getRegion())
.withExecutorFactory(this.admin::getProducerExecutor)
.build(); .build();
producerExecutor = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());
} }
@Override @Override
@ -91,30 +85,24 @@ public class TbAwsSqsProducerTemplate<T extends TbQueueMsg> implements TbQueuePr
sendMsgRequest.withMessageGroupId(sqsMsgId); sendMsgRequest.withMessageGroupId(sqsMsgId);
sendMsgRequest.withMessageDeduplicationId(sqsMsgId); sendMsgRequest.withMessageDeduplicationId(sqsMsgId);
ListenableFuture<SendMessageResult> future = producerExecutor.submit(() -> sqsClient.sendMessage(sendMsgRequest)); sqsClient.sendMessageAsync(sendMsgRequest, new AsyncHandler<SendMessageRequest, SendMessageResult>() {
@Override public void onError(Exception e) {
Futures.addCallback(future, new FutureCallback<SendMessageResult>() {
@Override
public void onSuccess(SendMessageResult result) {
if (callback != null) { if (callback != null) {
callback.onSuccess(new AwsSqsTbQueueMsgMetadata(result.getSdkHttpMetadata())); callback.onFailure(e);
} }
} }
@Override @Override public void onSuccess(SendMessageRequest request,
public void onFailure(Throwable t) { SendMessageResult sendMessageResult) {
if (callback != null) { if (callback != null) {
callback.onFailure(t); callback.onSuccess(new AwsSqsTbQueueMsgMetadata(sendMessageResult.getSdkHttpMetadata()));
} }
} }
}, producerExecutor); });
} }
@Override @Override
public void stop() { public void stop() {
if (producerExecutor != null) {
producerExecutor.shutdownNow();
}
if (sqsClient != null) { if (sqsClient != null) {
sqsClient.shutdown(); sqsClient.shutdown();
} }

View File

@ -42,4 +42,7 @@ public class TbAwsSqsSettings {
@Value("${queue.aws_sqs.threads_per_topic}") @Value("${queue.aws_sqs.threads_per_topic}")
private int threadsPerTopic; private int threadsPerTopic;
@Value("${queue.aws_sqs.producer_thread_pool_size:50}")
private int threadPoolSize;
} }

View File

@ -155,6 +155,8 @@ queue:
region: "${TB_QUEUE_AWS_SQS_REGION:YOUR_REGION}" region: "${TB_QUEUE_AWS_SQS_REGION:YOUR_REGION}"
# Number of threads per each AWS SQS queue in consumer # Number of threads per each AWS SQS queue in consumer
threads_per_topic: "${TB_QUEUE_AWS_SQS_THREADS_PER_TOPIC:1}" threads_per_topic: "${TB_QUEUE_AWS_SQS_THREADS_PER_TOPIC:1}"
# Thread pool size for aws_sqs queue producer executor provider. Default value equals to AmazonSQSAsyncClient.DEFAULT_THREAD_POOL_SIZE
producer_thread_pool_size: "${TB_QUEUE_AWS_SQS_EXECUTOR_THREAD_POOL_SIZE:50}"
queue-properties: queue-properties:
# AWS SQS queue properties. VisibilityTimeout in seconds;MaximumMessageSize in bytes;MessageRetentionPeriod in seconds # AWS SQS queue properties. VisibilityTimeout in seconds;MaximumMessageSize in bytes;MessageRetentionPeriod in seconds
core: "${TB_QUEUE_AWS_SQS_CORE_QUEUE_PROPERTIES:VisibilityTimeout:30;MaximumMessageSize:262144;MessageRetentionPeriod:604800}" core: "${TB_QUEUE_AWS_SQS_CORE_QUEUE_PROPERTIES:VisibilityTimeout:30;MaximumMessageSize:262144;MessageRetentionPeriod:604800}"

View File

@ -281,6 +281,8 @@ queue:
region: "${TB_QUEUE_AWS_SQS_REGION:YOUR_REGION}" region: "${TB_QUEUE_AWS_SQS_REGION:YOUR_REGION}"
# Number of threads per each AWS SQS queue in consumer # Number of threads per each AWS SQS queue in consumer
threads_per_topic: "${TB_QUEUE_AWS_SQS_THREADS_PER_TOPIC:1}" threads_per_topic: "${TB_QUEUE_AWS_SQS_THREADS_PER_TOPIC:1}"
# Thread pool size for aws_sqs queue producer executor provider. Default value equals to AmazonSQSAsyncClient.DEFAULT_THREAD_POOL_SIZE
producer_thread_pool_size: "${TB_QUEUE_AWS_SQS_EXECUTOR_THREAD_POOL_SIZE:50}"
queue-properties: queue-properties:
# AWS SQS queue properties. VisibilityTimeout in seconds;MaximumMessageSize in bytes;MessageRetentionPeriod in seconds # AWS SQS queue properties. VisibilityTimeout in seconds;MaximumMessageSize in bytes;MessageRetentionPeriod in seconds
rule-engine: "${TB_QUEUE_AWS_SQS_RE_QUEUE_PROPERTIES:VisibilityTimeout:30;MaximumMessageSize:262144;MessageRetentionPeriod:604800}" rule-engine: "${TB_QUEUE_AWS_SQS_RE_QUEUE_PROPERTIES:VisibilityTimeout:30;MaximumMessageSize:262144;MessageRetentionPeriod:604800}"

View File

@ -265,6 +265,8 @@ queue:
region: "${TB_QUEUE_AWS_SQS_REGION:YOUR_REGION}" region: "${TB_QUEUE_AWS_SQS_REGION:YOUR_REGION}"
# Number of threads per each AWS SQS queue in consumer # Number of threads per each AWS SQS queue in consumer
threads_per_topic: "${TB_QUEUE_AWS_SQS_THREADS_PER_TOPIC:1}" threads_per_topic: "${TB_QUEUE_AWS_SQS_THREADS_PER_TOPIC:1}"
# Thread pool size for aws_sqs queue producer executor provider. Default value equals to AmazonSQSAsyncClient.DEFAULT_THREAD_POOL_SIZE
producer_thread_pool_size: "${TB_QUEUE_AWS_SQS_EXECUTOR_THREAD_POOL_SIZE:50}"
queue-properties: queue-properties:
# AWS SQS queue properties. VisibilityTimeout in seconds;MaximumMessageSize in bytes;MessageRetentionPeriod in seconds # AWS SQS queue properties. VisibilityTimeout in seconds;MaximumMessageSize in bytes;MessageRetentionPeriod in seconds
rule-engine: "${TB_QUEUE_AWS_SQS_RE_QUEUE_PROPERTIES:VisibilityTimeout:30;MaximumMessageSize:262144;MessageRetentionPeriod:604800}" rule-engine: "${TB_QUEUE_AWS_SQS_RE_QUEUE_PROPERTIES:VisibilityTimeout:30;MaximumMessageSize:262144;MessageRetentionPeriod:604800}"

View File

@ -360,6 +360,8 @@ queue:
region: "${TB_QUEUE_AWS_SQS_REGION:YOUR_REGION}" region: "${TB_QUEUE_AWS_SQS_REGION:YOUR_REGION}"
# Number of threads per each AWS SQS queue in consumer # Number of threads per each AWS SQS queue in consumer
threads_per_topic: "${TB_QUEUE_AWS_SQS_THREADS_PER_TOPIC:1}" threads_per_topic: "${TB_QUEUE_AWS_SQS_THREADS_PER_TOPIC:1}"
# Thread pool size for aws_sqs queue producer executor provider. Default value equals to AmazonSQSAsyncClient.DEFAULT_THREAD_POOL_SIZE
producer_thread_pool_size: "${TB_QUEUE_AWS_SQS_EXECUTOR_THREAD_POOL_SIZE:50}"
queue-properties: queue-properties:
# AWS SQS queue properties. VisibilityTimeout in seconds;MaximumMessageSize in bytes;MessageRetentionPeriod in seconds # AWS SQS queue properties. VisibilityTimeout in seconds;MaximumMessageSize in bytes;MessageRetentionPeriod in seconds
rule-engine: "${TB_QUEUE_AWS_SQS_RE_QUEUE_PROPERTIES:VisibilityTimeout:30;MaximumMessageSize:262144;MessageRetentionPeriod:604800}" rule-engine: "${TB_QUEUE_AWS_SQS_RE_QUEUE_PROPERTIES:VisibilityTimeout:30;MaximumMessageSize:262144;MessageRetentionPeriod:604800}"

View File

@ -297,6 +297,8 @@ queue:
region: "${TB_QUEUE_AWS_SQS_REGION:YOUR_REGION}" region: "${TB_QUEUE_AWS_SQS_REGION:YOUR_REGION}"
# Number of threads per each AWS SQS queue in consumer # Number of threads per each AWS SQS queue in consumer
threads_per_topic: "${TB_QUEUE_AWS_SQS_THREADS_PER_TOPIC:1}" threads_per_topic: "${TB_QUEUE_AWS_SQS_THREADS_PER_TOPIC:1}"
# Thread pool size for aws_sqs queue producer executor provider. Default value equals to AmazonSQSAsyncClient.DEFAULT_THREAD_POOL_SIZE
producer_thread_pool_size: "${TB_QUEUE_AWS_SQS_EXECUTOR_THREAD_POOL_SIZE:50}"
queue-properties: queue-properties:
# AWS SQS queue properties. VisibilityTimeout in seconds;MaximumMessageSize in bytes;MessageRetentionPeriod in seconds # AWS SQS queue properties. VisibilityTimeout in seconds;MaximumMessageSize in bytes;MessageRetentionPeriod in seconds
rule-engine: "${TB_QUEUE_AWS_SQS_RE_QUEUE_PROPERTIES:VisibilityTimeout:30;MaximumMessageSize:262144;MessageRetentionPeriod:604800}" rule-engine: "${TB_QUEUE_AWS_SQS_RE_QUEUE_PROPERTIES:VisibilityTimeout:30;MaximumMessageSize:262144;MessageRetentionPeriod:604800}"

View File

@ -250,6 +250,8 @@ queue:
region: "${TB_QUEUE_AWS_SQS_REGION:YOUR_REGION}" region: "${TB_QUEUE_AWS_SQS_REGION:YOUR_REGION}"
# Number of threads per each AWS SQS queue in consumer # Number of threads per each AWS SQS queue in consumer
threads_per_topic: "${TB_QUEUE_AWS_SQS_THREADS_PER_TOPIC:1}" threads_per_topic: "${TB_QUEUE_AWS_SQS_THREADS_PER_TOPIC:1}"
# Thread pool size for aws_sqs queue producer executor provider. Default value equals to AmazonSQSAsyncClient.DEFAULT_THREAD_POOL_SIZE
producer_thread_pool_size: "${TB_QUEUE_AWS_SQS_EXECUTOR_THREAD_POOL_SIZE:50}"
queue-properties: queue-properties:
# AWS SQS queue properties. VisibilityTimeout in seconds;MaximumMessageSize in bytes;MessageRetentionPeriod in seconds # AWS SQS queue properties. VisibilityTimeout in seconds;MaximumMessageSize in bytes;MessageRetentionPeriod in seconds
rule-engine: "${TB_QUEUE_AWS_SQS_RE_QUEUE_PROPERTIES:VisibilityTimeout:30;MaximumMessageSize:262144;MessageRetentionPeriod:604800}" rule-engine: "${TB_QUEUE_AWS_SQS_RE_QUEUE_PROPERTIES:VisibilityTimeout:30;MaximumMessageSize:262144;MessageRetentionPeriod:604800}"