From 258c26ed0449601fe21b9179772317f79f62d197 Mon Sep 17 00:00:00 2001 From: Sergey Matvienko Date: Wed, 15 May 2024 16:23:20 +0200 Subject: [PATCH] Kafka groupId syncOffsets from a fat group to a single-partition group --- .../server/queue/discovery/TopicService.java | 10 ++- .../server/queue/kafka/TbKafkaAdmin.java | 61 +++++++++++++------ .../provider/KafkaMonolithQueueFactory.java | 12 ++-- .../KafkaTbRuleEngineQueueFactory.java | 12 ++-- .../server/queue/kafka/TbKafkaAdminTest.java | 2 +- 5 files changed, 68 insertions(+), 29 deletions(-) diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/TopicService.java b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/TopicService.java index b7e000197e..960be231c6 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/TopicService.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/TopicService.java @@ -65,7 +65,15 @@ public class TopicService { return prefix.isBlank() ? topic : prefix + "." + topic; } - public String suffix(Integer partitionId) { + public String buildConsumerGroupId(String servicePrefix, TenantId tenantId, String queueName, Integer partitionId) { + return this.buildTopicName( + servicePrefix + queueName + + (tenantId.isSysTenantId() ? "" : ("-isolated-" + tenantId)) + + "-consumer" + + suffix(partitionId)); + } + + String suffix(Integer partitionId) { return partitionId == null ? 
"" : "-" + partitionId; } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaAdmin.java b/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaAdmin.java index 65cc28405a..b818cd1234 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaAdmin.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaAdmin.java @@ -15,6 +15,7 @@ */ package org.thingsboard.server.queue.kafka; +import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.CreateTopicsResult; @@ -40,6 +41,7 @@ import java.util.concurrent.TimeoutException; @Slf4j public class TbKafkaAdmin implements TbQueueAdmin { + @Getter private final AdminClient client; private final Map topicConfigs; private final Set topics = ConcurrentHashMap.newKeySet(); @@ -118,27 +120,52 @@ public class TbKafkaAdmin implements TbQueueAdmin { return client.createTopics(Collections.singletonList(topic)); } - public void syncOffsets(String oldGroupId, String newGroupId) throws ExecutionException, InterruptedException, TimeoutException { - ListConsumerGroupOffsetsResult fatOffsets = client.listConsumerGroupOffsets("id1"); - Map oldOffsets = new ConcurrentHashMap<>(); - client.listConsumerGroupOffsets(oldGroupId).partitionsToOffsetAndMetadata().whenComplete((res, err) -> { - if (err != null) { - log.warn("Failed to list consumer group offsets [{}]", oldGroupId, err); - } else { - oldOffsets.putAll(res); + /** + * Syncs offsets from a fat consumer group to a single-partition consumer group; best effort: any failure is logged as a warning and not rethrown, and a null partitionId makes the call a no-op. + * Migration back from a single-partition consumer group to a fat group is not supported. + * TODO: the best possible approach is to synchronize the offsets as part of saving the Queue parameters, with all consumers stopped. + */ + public void syncOffsets(String fatGroupId, String newGroupId, Integer partitionId) { + try { + syncOffsetsUnsafe(fatGroupId, newGroupId, 
partitionId); + } catch (Exception e) { + log.warn("Failed to syncOffsets from {} to {} partitionId {}", fatGroupId, newGroupId, partitionId, e); + } + } + + void syncOffsetsUnsafe(String fatGroupId, String newGroupId, Integer partitionId) throws ExecutionException, InterruptedException, TimeoutException { + log.info("syncOffsets [{}][{}][{}]", fatGroupId, newGroupId, partitionId); + if (partitionId == null) { + return; + } + Map oldOffsets = + client.listConsumerGroupOffsets(fatGroupId).partitionsToOffsetAndMetadata().get(10, TimeUnit.SECONDS); + if (oldOffsets.isEmpty()) { + return; + } + + for (var consumerOffset : oldOffsets.entrySet()) { + var tp = consumerOffset.getKey(); + if (!tp.topic().endsWith("." + partitionId)) { + continue; } - }).get(10, TimeUnit.SECONDS); + var om = consumerOffset.getValue(); + Map newOffsets = + client.listConsumerGroupOffsets(newGroupId).partitionsToOffsetAndMetadata().get(10, TimeUnit.SECONDS); - Map newOffsets = new ConcurrentHashMap<>(); - client.listConsumerGroupOffsets(newGroupId).partitionsToOffsetAndMetadata().whenComplete((res, err) -> { - if (err != null) { - log.warn("Failed to list consumer group offsets [{}]", newGroupId, err); + var existingOffset = newOffsets.get(tp); + if (existingOffset == null) { + log.info("[{}] topic offset does not exists in the new node group {}, all found offsets {}", tp, newGroupId, newOffsets); + } else if (existingOffset.offset() >= om.offset()) { + log.info("[{}] topic offset {} >= than old node group offset {}", tp, existingOffset.offset(), om.offset()); + break; } else { - newOffsets.putAll(res); + log.info("[{}] SHOULD alter topic offset [{}] less than old node group offset [{}]", tp, existingOffset.offset(), om.offset()); } - }).get(10, TimeUnit.SECONDS); - - + client.alterConsumerGroupOffsets(newGroupId, Map.of(tp, om)).all().get(10, TimeUnit.SECONDS); + log.info("[{}] altered new consumer groupId {}", tp, newGroupId); + break; + } } diff --git 
a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java index 73c62e33dc..30dac37851 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java @@ -75,7 +75,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi private final TbKafkaConsumerStatsService consumerStatsService; private final TbQueueAdmin coreAdmin; - private final TbQueueAdmin ruleEngineAdmin; + private final TbKafkaAdmin ruleEngineAdmin; private final TbQueueAdmin jsExecutorRequestAdmin; private final TbQueueAdmin jsExecutorResponseAdmin; private final TbQueueAdmin transportApiRequestAdmin; @@ -193,14 +193,16 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi @Override public TbQueueConsumer> createToRuleEngineMsgConsumer(Queue configuration, Integer partitionId) { String queueName = configuration.getName(); + String groupId = topicService.buildConsumerGroupId("re-", configuration.getTenantId(), queueName, partitionId); + + ruleEngineAdmin.syncOffsets(topicService.buildConsumerGroupId("re-", configuration.getTenantId(), queueName, null), // the fat groupId: a null partitionId yields no partition suffix + groupId, partitionId); + TbKafkaConsumerTemplate.TbKafkaConsumerTemplateBuilder> consumerBuilder = TbKafkaConsumerTemplate.builder(); consumerBuilder.settings(kafkaSettings); consumerBuilder.topic(topicService.buildTopicName(configuration.getTopic())); consumerBuilder.clientId("re-" + queueName + "-consumer-" + serviceInfoProvider.getServiceId() + "-" + consumerCount.incrementAndGet()); - consumerBuilder.groupId(topicService.buildTopicName("re-" + queueName - + (configuration.getTenantId().isSysTenantId() ? 
"" : ("-isolated-" + configuration.getTenantId())) - + "-consumer" - + topicService.suffix(partitionId))); + consumerBuilder.groupId(groupId); consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToRuleEngineMsg.parseFrom(msg.getData()), msg.getHeaders())); consumerBuilder.admin(ruleEngineAdmin); consumerBuilder.statsService(consumerStatsService); diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbRuleEngineQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbRuleEngineQueueFactory.java index 36238c206c..31fa7efffb 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbRuleEngineQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbRuleEngineQueueFactory.java @@ -68,7 +68,7 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory { private final TbQueueTransportNotificationSettings transportNotificationSettings; private final TbQueueAdmin coreAdmin; - private final TbQueueAdmin ruleEngineAdmin; + private final TbKafkaAdmin ruleEngineAdmin; private final TbQueueAdmin jsExecutorRequestAdmin; private final TbQueueAdmin jsExecutorResponseAdmin; private final TbQueueAdmin notificationAdmin; @@ -170,14 +170,16 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory { @Override public TbQueueConsumer> createToRuleEngineMsgConsumer(Queue configuration, Integer partitionId) { String queueName = configuration.getName(); + String groupId = topicService.buildConsumerGroupId("re-", configuration.getTenantId(), queueName, partitionId); + + ruleEngineAdmin.syncOffsets(topicService.buildConsumerGroupId("re-", configuration.getTenantId(), queueName, null), // the fat groupId: a null partitionId yields no partition suffix + groupId, partitionId); + TbKafkaConsumerTemplate.TbKafkaConsumerTemplateBuilder> consumerBuilder = TbKafkaConsumerTemplate.builder(); consumerBuilder.settings(kafkaSettings); 
consumerBuilder.topic(topicService.buildTopicName(configuration.getTopic())); consumerBuilder.clientId("re-" + queueName + "-consumer-" + serviceInfoProvider.getServiceId() + "-" + consumerCount.incrementAndGet()); - consumerBuilder.groupId(topicService.buildTopicName("re-" + queueName - + (configuration.getTenantId().isSysTenantId() ? "" : ("-isolated-" + configuration.getTenantId())) - + "-consumer" - + topicService.suffix(partitionId))); + consumerBuilder.groupId(groupId); consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToRuleEngineMsg.parseFrom(msg.getData()), msg.getHeaders())); consumerBuilder.admin(ruleEngineAdmin); consumerBuilder.statsService(consumerStatsService); diff --git a/common/queue/src/test/java/org/thingsboard/server/queue/kafka/TbKafkaAdminTest.java b/common/queue/src/test/java/org/thingsboard/server/queue/kafka/TbKafkaAdminTest.java index 6a0670cd31..2007747dc9 100644 --- a/common/queue/src/test/java/org/thingsboard/server/queue/kafka/TbKafkaAdminTest.java +++ b/common/queue/src/test/java/org/thingsboard/server/queue/kafka/TbKafkaAdminTest.java @@ -82,7 +82,7 @@ class TbKafkaAdminTest { Map newConsumerOffsets = admin.listConsumerGroupOffsets(newGroupId) .partitionsToOffsetAndMetadata().get(); - if (newConsumerOffsets.isEmpty()) { + if (!newConsumerOffsets.isEmpty()) { log.info("Found existing new group ConsumerOffsets {}", newConsumerOffsets); }