From eedb38384536215555c97cb4fcff53ae7ee1bf72 Mon Sep 17 00:00:00 2001 From: YevhenBondarenko Date: Fri, 1 May 2020 14:15:31 +0300 Subject: [PATCH] AWS improvements --- .../server/queue/sqs/TbAwsSqsConsumerTemplate.java | 6 +++++- .../server/queue/sqs/TbAwsSqsProducerTemplate.java | 5 ++++- .../server/queue/sqs/TbAwsSqsQueueAttributes.java | 1 - msa/js-executor/package.json | 1 + msa/js-executor/queue/awsSqsTemplate.js | 5 +++-- msa/js-executor/queue/pubSubTemplate.js | 8 +++++--- 6 files changed, 18 insertions(+), 8 deletions(-) diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/sqs/TbAwsSqsConsumerTemplate.java b/common/queue/src/main/java/org/thingsboard/server/queue/sqs/TbAwsSqsConsumerTemplate.java index 3e71388844..b66cad1504 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/sqs/TbAwsSqsConsumerTemplate.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/sqs/TbAwsSqsConsumerTemplate.java @@ -127,6 +127,11 @@ public class TbAwsSqsConsumerTemplate implements TbQueueCo if (!subscribed) { List topicNames = partitions.stream().map(TopicPartitionInfo::getFullTopicName).collect(Collectors.toList()); queueUrls = topicNames.stream().map(this::getQueueUrl).collect(Collectors.toSet()); + + if (consumerExecutor != null) { + consumerExecutor.shutdown(); + } + consumerExecutor = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(queueUrls.size() * sqsSettings.getThreadsPerTopic() + 1)); subscribed = true; } @@ -172,7 +177,6 @@ public class TbAwsSqsConsumerTemplate implements TbQueueCo ReceiveMessageRequest request = new ReceiveMessageRequest(); request .withWaitTimeSeconds(waitTimeSeconds) - .withMessageAttributeNames("headers") .withQueueUrl(url) .withMaxNumberOfMessages(MAX_NUM_MSGS); return sqsClient.receiveMessage(request).getMessages(); diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/sqs/TbAwsSqsProducerTemplate.java b/common/queue/src/main/java/org/thingsboard/server/queue/sqs/TbAwsSqsProducerTemplate.java index 2d85539184..6110d08c5e 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/sqs/TbAwsSqsProducerTemplate.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/sqs/TbAwsSqsProducerTemplate.java @@ -37,6 +37,7 @@ import org.thingsboard.server.queue.TbQueueProducer; import org.thingsboard.server.queue.common.DefaultTbQueueMsg; import java.util.Map; +import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executors; @@ -80,7 +81,9 @@ public class TbAwsSqsProducerTemplate implements TbQueuePr sendMsgRequest.withQueueUrl(getQueueUrl(tpi.getFullTopicName())); sendMsgRequest.withMessageBody(gson.toJson(new DefaultTbQueueMsg(msg))); - sendMsgRequest.withMessageGroupId(msg.getKey().toString()); + sendMsgRequest.withMessageGroupId(tpi.getTopic()); + sendMsgRequest.withMessageDeduplicationId(UUID.randomUUID().toString()); + ListenableFuture future = producerExecutor.submit(() -> sqsClient.sendMessage(sendMsgRequest)); Futures.addCallback(future, new FutureCallback() { diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/sqs/TbAwsSqsQueueAttributes.java b/common/queue/src/main/java/org/thingsboard/server/queue/sqs/TbAwsSqsQueueAttributes.java index 70a9587bef..c6cbbfd256 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/sqs/TbAwsSqsQueueAttributes.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/sqs/TbAwsSqsQueueAttributes.java @@ -55,7 +55,6 @@ public class TbAwsSqsQueueAttributes { @PostConstruct private void init() { defaultAttributes.put(QueueAttributeName.FifoQueue.toString(), "true"); - defaultAttributes.put(QueueAttributeName.ContentBasedDeduplication.toString(), "true"); coreAttributes = getConfigs(coreProperties); ruleEngineAttributes = getConfigs(ruleEngineProperties); diff --git a/msa/js-executor/package.json b/msa/js-executor/package.json index 3dadac4f84..60a107061a 100644 --- a/msa/js-executor/package.json +++ b/msa/js-executor/package.json @@ -22,6 +22,7 @@ "azure-sb": "^0.11.1", "long": "^4.0.0", "uuid-parse": "^1.0.0", + "uuid-random": "^1.3.0", "winston": "^3.0.0", "winston-daily-rotate-file": "^3.2.1" }, diff --git a/msa/js-executor/queue/awsSqsTemplate.js b/msa/js-executor/queue/awsSqsTemplate.js index e0ffb55c87..5f95de7d32 100644 --- a/msa/js-executor/queue/awsSqsTemplate.js +++ b/msa/js-executor/queue/awsSqsTemplate.js @@ -19,6 +19,7 @@ const config = require('config'), JsInvokeMessageProcessor = require('../api/jsInvokeMessageProcessor'), logger = require('../config/logger')._logger('awsSqsTemplate'); +const uuid = require('uuid-random'); const requestTopic = config.get('request_topic'); @@ -29,7 +30,7 @@ const AWS = require('aws-sdk'); const queueProperties = config.get('aws_sqs.queue_properties'); const poolInterval = config.get('js.response_poll_interval'); -let queueAttributes = {FifoQueue: 'true', ContentBasedDeduplication: 'true'}; +let queueAttributes = {FifoQueue: 'true'}; let sqsClient; let requestQueueURL; const queueUrls = new Map(); @@ -51,7 +52,7 @@ function AwsSqsProducer() { queueUrls.set(responseTopic, responseQueueUrl); } - let params = {MessageBody: msgBody, QueueUrl: responseQueueUrl, MessageGroupId: scriptId}; + let params = {MessageBody: msgBody, QueueUrl: responseQueueUrl, MessageGroupId: 'js_eval', MessageDeduplicationId: uuid()}; return new Promise((resolve, reject) => { sqsClient.sendMessage(params, function (err, data) { diff --git a/msa/js-executor/queue/pubSubTemplate.js b/msa/js-executor/queue/pubSubTemplate.js index 7d0b32ea34..cc5284022d 100644 --- a/msa/js-executor/queue/pubSubTemplate.js +++ b/msa/js-executor/queue/pubSubTemplate.js @@ -60,9 +60,11 @@ function PubSubProducer() { const topicList = await pubSubClient.getTopics(); if (topicList) { - topicList[0].forEach(topic => { - topics.push(getName(topic.name)); - }); + if (topicList) { + topicList[0].forEach(topic => { + topics.push(getName(topic.name)); + }); + } } const subscriptionList = await pubSubClient.getSubscriptions();