From 702a8b6139e2a2ea16c42f4301b9936fabc82e5b Mon Sep 17 00:00:00 2001 From: Andrii Shvaika Date: Thu, 30 Apr 2020 18:10:05 +0300 Subject: [PATCH] Force unsubscribe from Kafka topics --- .../thingsboard/server/queue/kafka/TbKafkaConsumerTemplate.java | 1 + docker/tb-node.env | 2 ++ 2 files changed, 3 insertions(+) diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaConsumerTemplate.java b/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaConsumerTemplate.java index 49d1d74102..541337579d 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaConsumerTemplate.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaConsumerTemplate.java @@ -106,6 +106,7 @@ public class TbKafkaConsumerTemplate implements TbQueueCon if (!subscribed) { List topicNames = partitions.stream().map(TopicPartitionInfo::getFullTopicName).collect(Collectors.toList()); topicNames.forEach(admin::createTopicIfNotExists); + consumer.unsubscribe(); consumer.subscribe(topicNames); subscribed = true; } diff --git a/docker/tb-node.env b/docker/tb-node.env index 542296babe..12cdc7d035 100644 --- a/docker/tb-node.env +++ b/docker/tb-node.env @@ -10,3 +10,5 @@ CACHE_TYPE=redis REDIS_HOST=redis HTTP_LOG_CONTROLLER_ERROR_STACK_TRACE=false + +TB_QUEUE_PARTITIONS_VIRTUAL_NODES_SIZE=64