diff --git a/application/src/main/java/org/thingsboard/server/service/queue/ruleengine/TbRuleEngineQueueConsumerManager.java b/application/src/main/java/org/thingsboard/server/service/queue/ruleengine/TbRuleEngineQueueConsumerManager.java index 38e54e8873..39d91341f0 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/ruleengine/TbRuleEngineQueueConsumerManager.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/ruleengine/TbRuleEngineQueueConsumerManager.java @@ -437,10 +437,12 @@ public class TbRuleEngineQueueConsumerManager { consumers.remove(tpi).awaitCompletion(); }); + +// ctx.getQueueAdmin().createTopicIfNotExists(); addedPartitions.forEach((tpi) -> { int partitionId = tpi.getPartition().orElse(-999999); String key = queueKey + "-" + partitionId; - TbQueueConsumerTask consumer = new TbQueueConsumerTask(key, ctx.getQueueFactory().createToRuleEngineMsgConsumer(queue, partitionId)); + TbQueueConsumerTask consumer = new TbQueueConsumerTask(key, ctx.getQueueFactory().createToRuleEngineMsgConsumer(queue, null)); consumers.put(tpi, consumer); consumer.subscribe(Set.of(tpi)); launchConsumer(consumer); diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaAdmin.java b/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaAdmin.java index bf5555817d..65cc28405a 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaAdmin.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaAdmin.java @@ -18,7 +18,10 @@ package org.thingsboard.server.queue.kafka; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.CreateTopicsResult; +import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsResult; import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.TopicExistsException; import org.thingsboard.server.queue.TbQueueAdmin; import org.thingsboard.server.queue.util.PropertyUtils; @@ -28,6 +31,8 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; /** * Created by ashvayka on 24.09.18. @@ -109,6 +114,32 @@ public class TbKafkaAdmin implements TbQueueAdmin { } public CreateTopicsResult createTopic(NewTopic topic) { + client.listConsumerGroupOffsets("id1"); return client.createTopics(Collections.singletonList(topic)); } + + public void syncOffsets(String oldGroupId, String newGroupId) throws ExecutionException, InterruptedException, TimeoutException { + ListConsumerGroupOffsetsResult fatOffsets = client.listConsumerGroupOffsets("id1"); + Map oldOffsets = new ConcurrentHashMap<>(); + client.listConsumerGroupOffsets(oldGroupId).partitionsToOffsetAndMetadata().whenComplete((res, err) -> { + if (err != null) { + log.warn("Failed to list consumer group offsets [{}]", oldGroupId, err); + } else { + oldOffsets.putAll(res); + } + }).get(10, TimeUnit.SECONDS); + + Map newOffsets = new ConcurrentHashMap<>(); + client.listConsumerGroupOffsets(newGroupId).partitionsToOffsetAndMetadata().whenComplete((res, err) -> { + if (err != null) { + log.warn("Failed to list consumer group offsets [{}]", newGroupId, err); + } else { + newOffsets.putAll(res); + } + }).get(10, TimeUnit.SECONDS); + + + + } + } diff --git a/common/queue/src/test/java/org/thingsboard/server/queue/kafka/TbKafkaAdminTest.java b/common/queue/src/test/java/org/thingsboard/server/queue/kafka/TbKafkaAdminTest.java new file mode 100644 index 0000000000..429419363d --- /dev/null +++ b/common/queue/src/test/java/org/thingsboard/server/queue/kafka/TbKafkaAdminTest.java @@ -0,0 +1,60 @@ +package org.thingsboard.server.queue.kafka; + +import com.microsoft.aad.adal4j.AsymmetricKeyCredential; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.admin.ConsumerGroupListing; +import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsResult; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.TopicPartition; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.Collection; +import java.util.Comparator; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import static org.junit.jupiter.api.Assertions.*; + +@Slf4j +class TbKafkaAdminTest { + + + Properties props; + AdminClient admin; + @BeforeEach + void setUp() { + props = new Properties(); + props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); + admin = AdminClient.create(props); + } + + @AfterEach + void tearDown() { + admin.close(); + } + + @Test + void testListOffsets() throws ExecutionException, InterruptedException { + log.info("Getting consumer groups list..."); + Collection consumerGroupListings = admin.listConsumerGroups().all().get(); + consumerGroupListings = consumerGroupListings.stream().sorted(Comparator.comparing(ConsumerGroupListing::groupId)).toList(); + for (ConsumerGroupListing consumerGroup : consumerGroupListings) { + String groupId = consumerGroup.groupId(); + log.info("=== consumer group: {}", groupId); + Map consumerOffsets = admin.listConsumerGroupOffsets(groupId) + .partitionsToOffsetAndMetadata().get(); + + // Printing the fetched offsets + consumerOffsets.forEach((tp, om) ->log.info(tp.topic() + " partition " + tp.partition() + " offset " + om.offset())); + } + } + +} \ No newline at end of file