From 91e13a69bc16faa8573b13b416b29fb4e06ffaa6 Mon Sep 17 00:00:00 2001 From: YevhenBondarenko Date: Mon, 2 Nov 2020 10:21:44 +0200 Subject: [PATCH 1/4] removed TB_QUEUE_KAFKA_JE_TOPIC_PROPERTIES from queue-kafka.env --- docker/queue-kafka.env | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/docker/queue-kafka.env b/docker/queue-kafka.env index 0207d64ef5..fec3cc50a6 100644 --- a/docker/queue-kafka.env +++ b/docker/queue-kafka.env @@ -1,3 +1,2 @@ TB_QUEUE_TYPE=kafka -TB_KAFKA_SERVERS=kafka:9092 -TB_QUEUE_KAFKA_JE_TOPIC_PROPERTIES=retention.ms:604800000;segment.bytes:26214400;retention.bytes:104857600;partitions:100 +TB_KAFKA_SERVERS=kafka:9092 \ No newline at end of file From 84ca08d36c64a3d27bd6a395ca01752a622fcf03 Mon Sep 17 00:00:00 2001 From: YevhenBondarenko Date: Mon, 2 Nov 2020 10:27:24 +0200 Subject: [PATCH 2/4] added new line --- docker/queue-kafka.env | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docker/queue-kafka.env b/docker/queue-kafka.env index fec3cc50a6..63107942fb 100644 --- a/docker/queue-kafka.env +++ b/docker/queue-kafka.env @@ -1,2 +1,2 @@ TB_QUEUE_TYPE=kafka -TB_KAFKA_SERVERS=kafka:9092 \ No newline at end of file +TB_KAFKA_SERVERS=kafka:9092 From 938588c5bc0ad5491256d7c63947ed6f826e2c12 Mon Sep 17 00:00:00 2001 From: YevhenBondarenko Date: Fri, 30 Oct 2020 12:23:08 +0200 Subject: [PATCH 3/4] sqs producer improvements --- .../server/queue/sqs/TbAwsSqsProducerTemplate.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/sqs/TbAwsSqsProducerTemplate.java b/common/queue/src/main/java/org/thingsboard/server/queue/sqs/TbAwsSqsProducerTemplate.java index 3e508b09e6..a4e977b6f0 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/sqs/TbAwsSqsProducerTemplate.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/sqs/TbAwsSqsProducerTemplate.java @@ -87,8 +87,9 @@ public class TbAwsSqsProducerTemplate implements TbQueuePr sendMsgRequest.withQueueUrl(getQueueUrl(tpi.getFullTopicName())); sendMsgRequest.withMessageBody(gson.toJson(new DefaultTbQueueMsg(msg))); - sendMsgRequest.withMessageGroupId(tpi.getTopic()); - sendMsgRequest.withMessageDeduplicationId(UUID.randomUUID().toString()); + String sqsMsgId = UUID.randomUUID().toString(); + sendMsgRequest.withMessageGroupId(sqsMsgId); + sendMsgRequest.withMessageDeduplicationId(sqsMsgId); ListenableFuture future = producerExecutor.submit(() -> sqsClient.sendMessage(sendMsgRequest)); From eeec6bac9fe510e0db95d597bf14bc8d836a55eb Mon Sep 17 00:00:00 2001 From: YevhenBondarenko Date: Fri, 30 Oct 2020 13:29:48 +0200 Subject: [PATCH 4/4] sqs producer improvements --- msa/js-executor/queue/awsSqsTemplate.js | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/msa/js-executor/queue/awsSqsTemplate.js b/msa/js-executor/queue/awsSqsTemplate.js index 52d4997f33..24315e5cad 100644 --- a/msa/js-executor/queue/awsSqsTemplate.js +++ b/msa/js-executor/queue/awsSqsTemplate.js @@ -52,11 +52,13 @@ function AwsSqsProducer() { queueUrls.set(responseTopic, responseQueueUrl); } + let msgId = uuid(); + let params = { MessageBody: msgBody, QueueUrl: responseQueueUrl, - MessageGroupId: 'js_eval', - MessageDeduplicationId: uuid() + MessageGroupId: msgId, + MessageDeduplicationId: msgId }; return new Promise((resolve, reject) => {