TbAwsSqsProducerTemplate: native thread allocation error fix (provided one executor for message sending with limited thread pool size)
This commit is contained in:
parent
5cc0dc6d94
commit
3ac0cc241a
@ -20,15 +20,11 @@ import com.amazonaws.auth.AWSCredentialsProvider;
|
||||
import com.amazonaws.auth.AWSStaticCredentialsProvider;
|
||||
import com.amazonaws.auth.BasicAWSCredentials;
|
||||
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
|
||||
import com.amazonaws.services.sqs.AmazonSQS;
|
||||
import com.amazonaws.services.sqs.AmazonSQSClientBuilder;
|
||||
import com.amazonaws.handlers.AsyncHandler;
|
||||
import com.amazonaws.services.sqs.AmazonSQSAsync;
|
||||
import com.amazonaws.services.sqs.AmazonSQSAsyncClientBuilder;
|
||||
import com.amazonaws.services.sqs.model.SendMessageRequest;
|
||||
import com.amazonaws.services.sqs.model.SendMessageResult;
|
||||
import com.google.common.util.concurrent.FutureCallback;
|
||||
import com.google.common.util.concurrent.Futures;
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
import com.google.common.util.concurrent.ListeningExecutorService;
|
||||
import com.google.common.util.concurrent.MoreExecutors;
|
||||
import com.google.gson.Gson;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
|
||||
@ -41,16 +37,14 @@ import org.thingsboard.server.queue.common.DefaultTbQueueMsg;
|
||||
import java.util.Map;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.Executors;
|
||||
|
||||
@Slf4j
|
||||
public class TbAwsSqsProducerTemplate<T extends TbQueueMsg> implements TbQueueProducer<T> {
|
||||
private final String defaultTopic;
|
||||
private final AmazonSQS sqsClient;
|
||||
private final AmazonSQSAsync sqsClient;
|
||||
private final Gson gson = new Gson();
|
||||
private final Map<String, String> queueUrlMap = new ConcurrentHashMap<>();
|
||||
private final TbQueueAdmin admin;
|
||||
private ListeningExecutorService producerExecutor;
|
||||
|
||||
public TbAwsSqsProducerTemplate(TbQueueAdmin admin, TbAwsSqsSettings sqsSettings, String defaultTopic) {
|
||||
this.admin = admin;
|
||||
@ -64,11 +58,11 @@ public class TbAwsSqsProducerTemplate<T extends TbQueueMsg> implements TbQueuePr
|
||||
credentialsProvider = new AWSStaticCredentialsProvider(awsCredentials);
|
||||
}
|
||||
|
||||
sqsClient = AmazonSQSClientBuilder.standard()
|
||||
sqsClient = AmazonSQSAsyncClientBuilder.standard()
|
||||
.withCredentials(credentialsProvider)
|
||||
.withRegion(sqsSettings.getRegion())
|
||||
.withExecutorFactory(sqsSettings::getProducerExecutor)
|
||||
.build();
|
||||
producerExecutor = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -91,30 +85,24 @@ public class TbAwsSqsProducerTemplate<T extends TbQueueMsg> implements TbQueuePr
|
||||
sendMsgRequest.withMessageGroupId(sqsMsgId);
|
||||
sendMsgRequest.withMessageDeduplicationId(sqsMsgId);
|
||||
|
||||
ListenableFuture<SendMessageResult> future = producerExecutor.submit(() -> sqsClient.sendMessage(sendMsgRequest));
|
||||
|
||||
Futures.addCallback(future, new FutureCallback<SendMessageResult>() {
|
||||
@Override
|
||||
public void onSuccess(SendMessageResult result) {
|
||||
sqsClient.sendMessageAsync(sendMsgRequest, new AsyncHandler<SendMessageRequest, SendMessageResult>() {
|
||||
@Override public void onError(Exception e) {
|
||||
if (callback != null) {
|
||||
callback.onSuccess(new AwsSqsTbQueueMsgMetadata(result.getSdkHttpMetadata()));
|
||||
callback.onFailure(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Throwable t) {
|
||||
@Override public void onSuccess(SendMessageRequest request,
|
||||
SendMessageResult sendMessageResult) {
|
||||
if (callback != null) {
|
||||
callback.onFailure(t);
|
||||
callback.onSuccess(new AwsSqsTbQueueMsgMetadata(sendMessageResult.getSdkHttpMetadata()));
|
||||
}
|
||||
}
|
||||
}, producerExecutor);
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public void stop() {
|
||||
if (producerExecutor != null) {
|
||||
producerExecutor.shutdownNow();
|
||||
}
|
||||
if (sqsClient != null) {
|
||||
sqsClient.shutdown();
|
||||
}
|
||||
|
||||
@ -20,6 +20,11 @@ import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.thingsboard.common.util.ThingsBoardThreadFactory;
|
||||
|
||||
import javax.annotation.PostConstruct;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
|
||||
@Slf4j
|
||||
@ConditionalOnExpression("'${queue.type:null}'=='aws-sqs'")
|
||||
@ -42,4 +47,17 @@ public class TbAwsSqsSettings {
|
||||
@Value("${queue.aws_sqs.threads_per_topic}")
|
||||
private int threadsPerTopic;
|
||||
|
||||
@Value("${queue.aws_sqs.producer_thread_pool_size:0}")
|
||||
private int threadPoolSize;
|
||||
|
||||
private ExecutorService producerExecutor;
|
||||
|
||||
@PostConstruct
|
||||
private void init() {
|
||||
if (threadPoolSize == 0) {
|
||||
threadPoolSize = 50; //AmazonSQSAsyncClient.DEFAULT_THREAD_POOL_SIZE = 50;
|
||||
}
|
||||
producerExecutor = Executors.newFixedThreadPool(threadPoolSize, ThingsBoardThreadFactory.forName("aws-sqs-queue-executor"));
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user