diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java index fbef0cfd29..ae8ea8f7f4 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java @@ -204,7 +204,7 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService queueFactory.createToCoreMsgConsumer()) + .consumerCreator((config, partitionId) -> queueFactory.createToCoreMsgConsumer()) .consumerExecutor(consumersExecutor) .scheduler(scheduler) .taskExecutor(mgmtExecutor) diff --git a/application/src/main/java/org/thingsboard/server/service/queue/consumer/MainQueueConsumerManager.java b/application/src/main/java/org/thingsboard/server/service/queue/consumer/MainQueueConsumerManager.java index 6792422b57..6eb5c94c9b 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/consumer/MainQueueConsumerManager.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/consumer/MainQueueConsumerManager.java @@ -41,7 +41,7 @@ import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantLock; -import java.util.function.Function; +import java.util.function.BiFunction; import java.util.stream.Collectors; @Slf4j @@ -51,7 +51,7 @@ public class MainQueueConsumerManager msgPackProcessor; - protected final Function> consumerCreator; + protected final BiFunction> consumerCreator; protected final ExecutorService consumerExecutor; protected final ScheduledExecutorService scheduler; protected final ExecutorService taskExecutor; @@ -67,7 +67,7 @@ public class MainQueueConsumerManager msgPackProcessor, - Function> consumerCreator, + BiFunction> consumerCreator, ExecutorService consumerExecutor, ScheduledExecutorService scheduler, ExecutorService taskExecutor) { @@ -273,8 +273,9 @@ public class MainQueueConsumerManager consumers.remove(tpi).awaitCompletion()); addedPartitions.forEach((tpi) -> { - String key = queueKey + "-" + tpi.getPartition().orElse(-1); - TbQueueConsumerTask consumer = new TbQueueConsumerTask<>(key, consumerCreator.apply(config)); + Integer partitionId = tpi.getPartition().orElse(-1); + String key = queueKey + "-" + partitionId; + TbQueueConsumerTask consumer = new TbQueueConsumerTask<>(key, () -> consumerCreator.apply(config, partitionId)); consumers.put(tpi, consumer); consumer.subscribe(Set.of(tpi)); launchConsumer(consumer); @@ -303,7 +304,7 @@ public class MainQueueConsumerManager(queueKey, consumerCreator.apply(config)); + consumer = new TbQueueConsumerTask<>(queueKey, () -> consumerCreator.apply(config, null)); // no partitionId passed } consumer.subscribe(partitions); if (!consumer.isRunning()) { diff --git a/application/src/main/java/org/thingsboard/server/service/queue/ruleengine/TbQueueConsumerTask.java b/application/src/main/java/org/thingsboard/server/service/queue/ruleengine/TbQueueConsumerTask.java index 84539d4d7c..5e672eb5c6 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/ruleengine/TbQueueConsumerTask.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/ruleengine/TbQueueConsumerTask.java @@ -16,37 +16,57 @@ package org.thingsboard.server.service.queue.ruleengine; import lombok.Getter; -import lombok.RequiredArgsConstructor; import lombok.Setter; import lombok.extern.slf4j.Slf4j; import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; import org.thingsboard.server.queue.TbQueueConsumer; import org.thingsboard.server.queue.TbQueueMsg; +import java.util.Objects; import java.util.Set; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; -@RequiredArgsConstructor @Slf4j public class TbQueueConsumerTask { @Getter private final Object key; - @Getter - private final TbQueueConsumer consumer; + private volatile TbQueueConsumer consumer; + private volatile Supplier> consumerSupplier; @Setter private Future task; + public TbQueueConsumerTask(Object key, Supplier> consumerSupplier) { + this.key = key; + this.consumer = null; + this.consumerSupplier = consumerSupplier; + } + + public TbQueueConsumer getConsumer() { + if (consumer == null) { + synchronized (this) { + if (consumer == null) { + Objects.requireNonNull(consumerSupplier, "consumerSupplier for key [" + key + "] is null"); + consumer = consumerSupplier.get(); + Objects.requireNonNull(consumer, "consumer for key [" + key + "] is null"); + consumerSupplier = null; + } + } + } + return consumer; + } + public void subscribe(Set partitions) { log.trace("[{}] Subscribing to partitions: {}", key, partitions); - consumer.subscribe(partitions); + getConsumer().subscribe(partitions); } public void initiateStop() { log.debug("[{}] Initiating stop", key); - consumer.stop(); + getConsumer().stop(); } public void awaitCompletion() { diff --git a/application/src/test/java/org/thingsboard/server/service/queue/ruleengine/TbRuleEngineQueueConsumerManagerTest.java b/application/src/test/java/org/thingsboard/server/service/queue/ruleengine/TbRuleEngineQueueConsumerManagerTest.java index 4d4d59a407..fdf2e203ee 100644 --- a/application/src/test/java/org/thingsboard/server/service/queue/ruleengine/TbRuleEngineQueueConsumerManagerTest.java +++ b/application/src/test/java/org/thingsboard/server/service/queue/ruleengine/TbRuleEngineQueueConsumerManagerTest.java @@ -21,10 +21,10 @@ import lombok.extern.slf4j.Slf4j; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; import org.mockito.Mock; import org.mockito.Mockito; -import org.mockito.junit.jupiter.MockitoExtension; import org.mockito.junit.jupiter.MockitoSettings; import org.mockito.quality.Strictness; import org.testcontainers.shaded.org.apache.commons.lang3.RandomUtils; @@ -55,6 +55,8 @@ import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.queue.discovery.PartitionService; import org.thingsboard.server.queue.discovery.QueueKey; import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; +import org.thingsboard.server.queue.provider.KafkaMonolithQueueFactory; +import org.thingsboard.server.queue.provider.KafkaTbRuleEngineQueueFactory; import org.thingsboard.server.queue.provider.TbQueueProducerProvider; import org.thingsboard.server.queue.provider.TbRuleEngineQueueFactory; import org.thingsboard.server.service.queue.processing.TbRuleEngineProcessingStrategyFactory; @@ -79,10 +81,12 @@ import java.util.stream.Collectors; import java.util.stream.IntStream; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.awaitility.Awaitility.await; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.argThat; import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.BDDMockito.willCallRealMethod; import static org.mockito.Mockito.after; import static org.mockito.Mockito.atLeast; import static org.mockito.Mockito.atLeastOnce; @@ -114,6 +118,7 @@ public class TbRuleEngineQueueConsumerManagerTest { private PartitionService partitionService; @Mock private TbQueueProducerProvider producerProvider; + @Mock private TbQueueProducer> ruleEngineMsgProducer; @Mock private TbQueueAdmin queueAdmin; @@ -148,7 +153,7 @@ public class TbRuleEngineQueueConsumerManagerTest { log.trace("totalProcessedMsgs = {}", totalProcessedMsgs); return null; }).when(actorContext).tell(any()); - ruleEngineMsgProducer = mock(TbQueueProducer.class); + when(producerProvider.getRuleEngineMsgProducer()).thenReturn(ruleEngineMsgProducer); consumersExecutor = Executors.newCachedThreadPool(ThingsBoardThreadFactory.forName("tb-rule-engine-consumer")); mgmtExecutor = ThingsBoardExecutors.newWorkStealingPool(3, "tb-rule-engine-mgmt"); @@ -180,7 +185,7 @@ public class TbRuleEngineQueueConsumerManagerTest { } consumers.add(consumer); return consumer; - }).when(queueFactory).createToRuleEngineMsgConsumer(any()); + }).when(queueFactory).createToRuleEngineMsgConsumer(any(), any()); QueueKey queueKey = new QueueKey(ServiceType.TB_RULE_ENGINE, queue); consumerManager = TbRuleEngineQueueConsumerManager.create() @@ -210,6 +215,20 @@ public class TbRuleEngineQueueConsumerManagerTest { } } + @ParameterizedTest + @ValueSource(classes = {KafkaMonolithQueueFactory.class, KafkaTbRuleEngineQueueFactory.class}) + public void testUnsupported_createToRuleEngineMsgConsumer_KafkaTbRuleEngineQueueFactory(Class factoryClass) { + // obsolete, but need to pass the afterEach + queue.setConsumerPerPartition(false); + consumerManager.init(queue); + + var factory = mock(factoryClass); + willCallRealMethod().given(factory).createToRuleEngineMsgConsumer(any()); + assertThatThrownBy(() -> factory.createToRuleEngineMsgConsumer(mock(Queue.class))) + .isInstanceOf(UnsupportedOperationException.class); + + } + @Test public void testInit_consumerPerPartition() { queue.setConsumerPerPartition(true); @@ -247,7 +266,8 @@ public class TbRuleEngineQueueConsumerManagerTest { Set partitions = Collections.emptySet(); consumerManager.update(partitions); - verify(queueFactory, after(1000).never()).createToRuleEngineMsgConsumer(any()); + verify(queueFactory, after(1000).never()).createToRuleEngineMsgConsumer(any(), any()); + verify(queueFactory, never()).createToRuleEngineMsgConsumer(any()); partitions = createTpis(1); consumerManager.update(partitions); @@ -278,7 +298,8 @@ public class TbRuleEngineQueueConsumerManagerTest { consumerManager.init(queue); consumerManager.update(Collections.emptySet()); - verify(queueFactory, after(1000).never()).createToRuleEngineMsgConsumer(any()); + verify(queueFactory, after(1000).never()).createToRuleEngineMsgConsumer(any(), any()); + verify(queueFactory, never()).createToRuleEngineMsgConsumer(any()); consumerManager.update(createTpis(1)); TestConsumer consumer1 = getConsumer(1); @@ -420,7 +441,8 @@ public class TbRuleEngineQueueConsumerManagerTest { consumerManager.update(createTpis(1)); TestConsumer consumer = getConsumer(1); verifySubscribedAndLaunched(consumer, 1); - verify(queueFactory, times(1)).createToRuleEngineMsgConsumer(any()); + verify(queueFactory, times(1)).createToRuleEngineMsgConsumer(any(), any()); + verify(queueFactory, never()).createToRuleEngineMsgConsumer(any()); consumerManager.stop(); consumerManager.update(createTpis(1, 2, 3, 4)); // to check that no new tasks after stop are processed diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/TopicService.java b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/TopicService.java index 46084f8201..960be231c6 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/TopicService.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/TopicService.java @@ -64,4 +64,17 @@ public class TopicService { public String buildTopicName(String topic) { return prefix.isBlank() ? topic : prefix + "." + topic; } -} \ No newline at end of file + + public String buildConsumerGroupId(String servicePrefix, TenantId tenantId, String queueName, Integer partitionId) { + return this.buildTopicName( + servicePrefix + queueName + + (tenantId.isSysTenantId() ? "" : ("-isolated-" + tenantId)) + + "-consumer" + + suffix(partitionId)); + } + + String suffix(Integer partitionId) { + return partitionId == null ? "" : "-" + partitionId; + } + +} diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaAdmin.java b/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaAdmin.java index a5a0427606..0cb1194167 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaAdmin.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaAdmin.java @@ -15,9 +15,13 @@ */ package org.thingsboard.server.queue.kafka; +import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.admin.CreateTopicsResult; +import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsResult; import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.TopicExistsException; import org.thingsboard.server.queue.TbQueueAdmin; import org.thingsboard.server.queue.util.PropertyUtils; @@ -27,6 +31,8 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; /** * Created by ashvayka on 24.09.18. @@ -120,4 +126,53 @@ public class TbKafkaAdmin implements TbQueueAdmin { public void destroy() { } + /** + * Sync offsets from a fat group to a single-partition group + * Migration back from single-partition consumer to a fat group is not supported + * TODO: The best possible approach to synchronize the offsets is to do the synchronization as a part of the save Queue parameters with stop all consumers + * */ + public void syncOffsets(String fatGroupId, String newGroupId, Integer partitionId) { + try { + syncOffsetsUnsafe(fatGroupId, newGroupId, partitionId); + } catch (Exception e) { + log.warn("Failed to syncOffsets from {} to {} partitionId {}", fatGroupId, newGroupId, partitionId, e); + } + } + + void syncOffsetsUnsafe(String fatGroupId, String newGroupId, Integer partitionId) throws ExecutionException, InterruptedException, TimeoutException { + log.info("syncOffsets [{}][{}][{}]", fatGroupId, newGroupId, partitionId); + if (partitionId == null) { + return; + } + Map oldOffsets = + settings.getAdminClient().listConsumerGroupOffsets(fatGroupId).partitionsToOffsetAndMetadata().get(10, TimeUnit.SECONDS); + if (oldOffsets.isEmpty()) { + return; + } + + for (var consumerOffset : oldOffsets.entrySet()) { + var tp = consumerOffset.getKey(); + if (!tp.topic().endsWith("." + partitionId)) { + continue; + } + var om = consumerOffset.getValue(); + Map newOffsets = + settings.getAdminClient().listConsumerGroupOffsets(newGroupId).partitionsToOffsetAndMetadata().get(10, TimeUnit.SECONDS); + + var existingOffset = newOffsets.get(tp); + if (existingOffset == null) { + log.info("[{}] topic offset does not exists in the new node group {}, all found offsets {}", tp, newGroupId, newOffsets); + } else if (existingOffset.offset() >= om.offset()) { + log.info("[{}] topic offset {} >= than old node group offset {}", tp, existingOffset.offset(), om.offset()); + break; + } else { + log.info("[{}] SHOULD alter topic offset [{}] less than old node group offset [{}]", tp, existingOffset.offset(), om.offset()); + } + settings.getAdminClient().alterConsumerGroupOffsets(newGroupId, Map.of(tp, om)).all().get(10, TimeUnit.SECONDS); + log.info("[{}] altered new consumer groupId {}", tp, newGroupId); + break; + } + + } + } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java index d8efad2182..9728642b8d 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java @@ -75,7 +75,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi private final TbKafkaConsumerStatsService consumerStatsService; private final TbQueueAdmin coreAdmin; - private final TbQueueAdmin ruleEngineAdmin; + private final TbKafkaAdmin ruleEngineAdmin; private final TbQueueAdmin jsExecutorRequestAdmin; private final TbQueueAdmin jsExecutorResponseAdmin; private final TbQueueAdmin transportApiRequestAdmin; @@ -187,18 +187,29 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi @Override public TbQueueConsumer> createToRuleEngineMsgConsumer(Queue configuration) { + throw new UnsupportedOperationException("Rule engine consumer should use a partitionId"); + } + + @Override + public TbQueueConsumer> createToRuleEngineMsgConsumer(Queue configuration, Integer partitionId) { String queueName = configuration.getName(); + String groupId = topicService.buildConsumerGroupId("re-", configuration.getTenantId(), queueName, partitionId); + + ruleEngineAdmin.syncOffsets(topicService.buildConsumerGroupId("re-", configuration.getTenantId(), queueName, null), // the fat groupId + groupId, partitionId); + TbKafkaConsumerTemplate.TbKafkaConsumerTemplateBuilder> consumerBuilder = TbKafkaConsumerTemplate.builder(); consumerBuilder.settings(kafkaSettings); consumerBuilder.topic(topicService.buildTopicName(configuration.getTopic())); consumerBuilder.clientId("re-" + queueName + "-consumer-" + serviceInfoProvider.getServiceId() + "-" + consumerCount.incrementAndGet()); - consumerBuilder.groupId(topicService.buildTopicName("re-" + queueName + (configuration.getTenantId().isSysTenantId() ? "" : ("-isolated-" + configuration.getTenantId())) + "-consumer")); + consumerBuilder.groupId(groupId); consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToRuleEngineMsg.parseFrom(msg.getData()), msg.getHeaders())); consumerBuilder.admin(ruleEngineAdmin); consumerBuilder.statsService(consumerStatsService); return consumerBuilder.build(); } + @Override public TbQueueConsumer> createToRuleEngineNotificationsMsgConsumer() { TbKafkaConsumerTemplate.TbKafkaConsumerTemplateBuilder> consumerBuilder = TbKafkaConsumerTemplate.builder(); diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbRuleEngineQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbRuleEngineQueueFactory.java index 568380d7b5..31fa7efffb 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbRuleEngineQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbRuleEngineQueueFactory.java @@ -68,7 +68,7 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory { private final TbQueueTransportNotificationSettings transportNotificationSettings; private final TbQueueAdmin coreAdmin; - private final TbQueueAdmin ruleEngineAdmin; + private final TbKafkaAdmin ruleEngineAdmin; private final TbQueueAdmin jsExecutorRequestAdmin; private final TbQueueAdmin jsExecutorResponseAdmin; private final TbQueueAdmin notificationAdmin; @@ -164,12 +164,22 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory { @Override public TbQueueConsumer> createToRuleEngineMsgConsumer(Queue configuration) { + throw new UnsupportedOperationException("Rule engine consumer should use a partitionId"); + } + + @Override + public TbQueueConsumer> createToRuleEngineMsgConsumer(Queue configuration, Integer partitionId) { String queueName = configuration.getName(); + String groupId = topicService.buildConsumerGroupId("re-", configuration.getTenantId(), queueName, partitionId); + + ruleEngineAdmin.syncOffsets(topicService.buildConsumerGroupId("re-", configuration.getTenantId(), queueName, null), // the fat groupId + groupId, partitionId); + TbKafkaConsumerTemplate.TbKafkaConsumerTemplateBuilder> consumerBuilder = TbKafkaConsumerTemplate.builder(); consumerBuilder.settings(kafkaSettings); consumerBuilder.topic(topicService.buildTopicName(configuration.getTopic())); consumerBuilder.clientId("re-" + queueName + "-consumer-" + serviceInfoProvider.getServiceId() + "-" + consumerCount.incrementAndGet()); - consumerBuilder.groupId(topicService.buildTopicName("re-" + queueName + (configuration.getTenantId().isSysTenantId() ? "" : ("-isolated-" + configuration.getTenantId())) + "-consumer")); + consumerBuilder.groupId(groupId); consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToRuleEngineMsg.parseFrom(msg.getData()), msg.getHeaders())); consumerBuilder.admin(ruleEngineAdmin); consumerBuilder.statsService(consumerStatsService); diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/TbRuleEngineQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/TbRuleEngineQueueFactory.java index f23c7e47f3..e49074f92d 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/TbRuleEngineQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/TbRuleEngineQueueFactory.java @@ -77,13 +77,25 @@ public interface TbRuleEngineQueueFactory extends TbUsageStatsClientQueueFactory TbQueueProducer> createToOtaPackageStateServiceMsgProducer(); /** - * Used to consume messages by TB Core Service + * Used to consume messages by TB Rule Engine Service * * @return * @param configuration */ TbQueueConsumer> createToRuleEngineMsgConsumer(Queue configuration); + /** + * Used to consume messages by TB Rule Engine Service + * Intended usage for consumer per partition strategy + * + * @return TbQueueConsumer + * @param configuration + * @param partitionId as a suffix for consumer name + */ + default TbQueueConsumer> createToRuleEngineMsgConsumer(Queue configuration, Integer partitionId) { + return createToRuleEngineMsgConsumer(configuration); + } + /** * Used to consume high priority messages by TB Core Service * diff --git a/common/queue/src/test/java/org/thingsboard/server/queue/kafka/TbKafkaAdminTest.java b/common/queue/src/test/java/org/thingsboard/server/queue/kafka/TbKafkaAdminTest.java new file mode 100644 index 0000000000..2007747dc9 --- /dev/null +++ b/common/queue/src/test/java/org/thingsboard/server/queue/kafka/TbKafkaAdminTest.java @@ -0,0 +1,148 @@ +/** + * Copyright © 2016-2024 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.queue.kafka; + +import com.google.common.collect.MapDifference; +import com.google.common.collect.Maps; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.admin.ConsumerGroupListing; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.TopicPartition; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.Test; + +import java.util.Collection; +import java.util.Comparator; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.ExecutionException; + +@Slf4j +class TbKafkaAdminTest { + + Properties props; + AdminClient admin; + + @BeforeEach + void setUp() { + props = new Properties(); + props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); + admin = AdminClient.create(props); + } + + @AfterEach + void tearDown() { + admin.close(); + } + + @Disabled + @Test + void testListOffsets() throws ExecutionException, InterruptedException { + log.info("Getting consumer groups list..."); + Collection consumerGroupListings = admin.listConsumerGroups().all().get(); + consumerGroupListings = consumerGroupListings.stream().sorted(Comparator.comparing(ConsumerGroupListing::groupId)).toList(); + for (ConsumerGroupListing consumerGroup : consumerGroupListings) { + String groupId = consumerGroup.groupId(); + log.info("=== consumer group: {}", groupId); + Map consumerOffsets = admin.listConsumerGroupOffsets(groupId) + .partitionsToOffsetAndMetadata().get(); + + // Printing the fetched offsets + consumerOffsets.forEach((tp, om) -> log.info(tp.topic() + " partition " + tp.partition() + " offset " + om.offset())); + if (groupId.startsWith("re-") && groupId.endsWith("consumer")) { + log.info("****** Migrating groupId [{}] ...", groupId); + + for (var consumerOffset : consumerOffsets.entrySet()) { + var tp = consumerOffset.getKey(); + var om = consumerOffset.getValue(); + final Integer tbPartitionId = parsePartitionIdFromTopicName(tp.topic()); + if (tbPartitionId == null || tbPartitionId < 0) { + continue; + } + + String newGroupId = groupId + "-" + tbPartitionId; + log.info("Getting offsets for consumer groupId [{}]", newGroupId); + Map newConsumerOffsets = admin.listConsumerGroupOffsets(newGroupId) + .partitionsToOffsetAndMetadata().get(); + + if (!newConsumerOffsets.isEmpty()) { + log.info("Found existing new group ConsumerOffsets {}", newConsumerOffsets); + } + + var existingOffset = newConsumerOffsets.get(tp); + if (existingOffset == null) { + log.info("topic offset does not exists in the new node group, all found offsets"); + } else if (existingOffset.offset() >= om.offset()) { + log.info("topic offset {} >= than old node group offset {}", existingOffset.offset(), om.offset()); + continue; + } else { + log.info("SHOULD alter topic offset [{}] less than old node group offset [{}]", existingOffset.offset(), om.offset()); + } + + Map newOffsets = Map.of(tp, om); + + log.warn("@@@@@ alterConsumerGroupOffsets [{}] with new offsets [{}]", newGroupId, newOffsets); + admin.alterConsumerGroupOffsets(newGroupId, newOffsets).all().whenComplete((res, err) -> { + if (err != null) { + log.error("Failed to alterConsumerGroupOffsets for groupId [{}], new offsets [{}]", newGroupId, newOffsets, err); + } else { + log.info("Updated new consumer group [{}], offsets [{}]", newGroupId, newOffsets); + } + }).get(); // Handle asynchronously as appropriate + + //Verify + + Map resultedConsumerOffsets = admin.listConsumerGroupOffsets(newGroupId) + .partitionsToOffsetAndMetadata().get(); + + MapDifference diff = Maps.difference(newOffsets, resultedConsumerOffsets); + + if (!diff.areEqual()) { + log.error("Verify failed for groupId [{}], current offset {} is not the same as expected {}", newGroupId, resultedConsumerOffsets, newOffsets); + } else { + log.info("Verify passed for groupId [{}]", newGroupId); + } + + } + + } + } + + } + + Integer parsePartitionIdFromTopicName(String topic) { + if (topic == null) { + return null; + } + int dotIndex = topic.lastIndexOf('.'); + if (dotIndex <= 0) { + return null; + } + + String indexStr = topic.substring(dotIndex + 1); + try { + return Integer.parseInt(indexStr); + } catch (Throwable t) { + log.warn("Can't parse partition Id from topic name [{}]", topic, t); + } + return null; + } + +}