Kafka groupId syncOffsets from a fat group to a single-partition group

This commit is contained in:
Sergey Matvienko 2024-05-15 16:23:20 +02:00
parent 34cfd588b1
commit 258c26ed04
5 changed files with 68 additions and 29 deletions

View File

@ -65,7 +65,15 @@ public class TopicService {
return prefix.isBlank() ? topic : prefix + "." + topic;
}
public String suffix(Integer partitionId) {
public String buildConsumerGroupId(String servicePrefix, TenantId tenantId, String queueName, Integer partitionId) {
return this.buildTopicName(
servicePrefix + queueName
+ (tenantId.isSysTenantId() ? "" : ("-isolated-" + tenantId))
+ "-consumer"
+ suffix(partitionId));
}
String suffix(Integer partitionId) {
return partitionId == null ? "" : "-" + partitionId;
}

View File

@ -15,6 +15,7 @@
*/
package org.thingsboard.server.queue.kafka;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.CreateTopicsResult;
@ -40,6 +41,7 @@ import java.util.concurrent.TimeoutException;
@Slf4j
public class TbKafkaAdmin implements TbQueueAdmin {
@Getter
private final AdminClient client;
private final Map<String, String> topicConfigs;
private final Set<String> topics = ConcurrentHashMap.newKeySet();
@ -118,27 +120,52 @@ public class TbKafkaAdmin implements TbQueueAdmin {
return client.createTopics(Collections.singletonList(topic));
}
public void syncOffsets(String oldGroupId, String newGroupId) throws ExecutionException, InterruptedException, TimeoutException {
ListConsumerGroupOffsetsResult fatOffsets = client.listConsumerGroupOffsets("id1");
Map<TopicPartition, OffsetAndMetadata> oldOffsets = new ConcurrentHashMap<>();
client.listConsumerGroupOffsets(oldGroupId).partitionsToOffsetAndMetadata().whenComplete((res, err) -> {
if (err != null) {
log.warn("Failed to list consumer group offsets [{}]", oldGroupId, err);
} else {
oldOffsets.putAll(res);
/**
* Sync offsets from a fat group to a single-partition group
* Migration back from single-partition consumer to a fat group is not supported
* TODO: The best possible approach to synchronize the offsets is to do the synchronization as a part of the save Queue parameters with stop all consumers
* */
public void syncOffsets(String fatGroupId, String newGroupId, Integer partitionId) {
try {
syncOffsetsUnsafe(fatGroupId, newGroupId, partitionId);
} catch (Exception e) {
log.warn("Failed to syncOffsets from {} to {} partitionId {}", fatGroupId, newGroupId, partitionId, e);
}
}
void syncOffsetsUnsafe(String fatGroupId, String newGroupId, Integer partitionId) throws ExecutionException, InterruptedException, TimeoutException {
log.info("syncOffsets [{}][{}][{}]", fatGroupId, newGroupId, partitionId);
if (partitionId == null) {
return;
}
Map<TopicPartition, OffsetAndMetadata> oldOffsets =
client.listConsumerGroupOffsets(fatGroupId).partitionsToOffsetAndMetadata().get(10, TimeUnit.SECONDS);
if (oldOffsets.isEmpty()) {
return;
}
for (var consumerOffset : oldOffsets.entrySet()) {
var tp = consumerOffset.getKey();
if (!tp.topic().endsWith("." + partitionId)) {
continue;
}
}).get(10, TimeUnit.SECONDS);
var om = consumerOffset.getValue();
Map<TopicPartition, OffsetAndMetadata> newOffsets =
client.listConsumerGroupOffsets(newGroupId).partitionsToOffsetAndMetadata().get(10, TimeUnit.SECONDS);
Map<TopicPartition, OffsetAndMetadata> newOffsets = new ConcurrentHashMap<>();
client.listConsumerGroupOffsets(newGroupId).partitionsToOffsetAndMetadata().whenComplete((res, err) -> {
if (err != null) {
log.warn("Failed to list consumer group offsets [{}]", newGroupId, err);
var existingOffset = newOffsets.get(tp);
if (existingOffset == null) {
log.info("[{}] topic offset does not exists in the new node group {}, all found offsets {}", tp, newGroupId, newOffsets);
} else if (existingOffset.offset() >= om.offset()) {
log.info("[{}] topic offset {} >= than old node group offset {}", tp, existingOffset.offset(), om.offset());
break;
} else {
newOffsets.putAll(res);
log.info("[{}] SHOULD alter topic offset [{}] less than old node group offset [{}]", tp, existingOffset.offset(), om.offset());
}
}).get(10, TimeUnit.SECONDS);
client.alterConsumerGroupOffsets(newGroupId, Map.of(tp, om)).all().get(10, TimeUnit.SECONDS);
log.info("[{}] altered new consumer groupId {}", tp, newGroupId);
break;
}
}

View File

@ -75,7 +75,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi
private final TbKafkaConsumerStatsService consumerStatsService;
private final TbQueueAdmin coreAdmin;
private final TbQueueAdmin ruleEngineAdmin;
private final TbKafkaAdmin ruleEngineAdmin;
private final TbQueueAdmin jsExecutorRequestAdmin;
private final TbQueueAdmin jsExecutorResponseAdmin;
private final TbQueueAdmin transportApiRequestAdmin;
@ -193,14 +193,16 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi
@Override
public TbQueueConsumer<TbProtoQueueMsg<ToRuleEngineMsg>> createToRuleEngineMsgConsumer(Queue configuration, Integer partitionId) {
String queueName = configuration.getName();
String groupId = topicService.buildConsumerGroupId("re-", configuration.getTenantId(), queueName, partitionId);
ruleEngineAdmin.syncOffsets(topicService.buildConsumerGroupId("re-", configuration.getTenantId(), queueName, null), // the fat groupId
groupId, partitionId);
TbKafkaConsumerTemplate.TbKafkaConsumerTemplateBuilder<TbProtoQueueMsg<ToRuleEngineMsg>> consumerBuilder = TbKafkaConsumerTemplate.builder();
consumerBuilder.settings(kafkaSettings);
consumerBuilder.topic(topicService.buildTopicName(configuration.getTopic()));
consumerBuilder.clientId("re-" + queueName + "-consumer-" + serviceInfoProvider.getServiceId() + "-" + consumerCount.incrementAndGet());
consumerBuilder.groupId(topicService.buildTopicName("re-" + queueName
+ (configuration.getTenantId().isSysTenantId() ? "" : ("-isolated-" + configuration.getTenantId()))
+ "-consumer"
+ topicService.suffix(partitionId)));
consumerBuilder.groupId(groupId);
consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToRuleEngineMsg.parseFrom(msg.getData()), msg.getHeaders()));
consumerBuilder.admin(ruleEngineAdmin);
consumerBuilder.statsService(consumerStatsService);

View File

@ -68,7 +68,7 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory {
private final TbQueueTransportNotificationSettings transportNotificationSettings;
private final TbQueueAdmin coreAdmin;
private final TbQueueAdmin ruleEngineAdmin;
private final TbKafkaAdmin ruleEngineAdmin;
private final TbQueueAdmin jsExecutorRequestAdmin;
private final TbQueueAdmin jsExecutorResponseAdmin;
private final TbQueueAdmin notificationAdmin;
@ -170,14 +170,16 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory {
@Override
public TbQueueConsumer<TbProtoQueueMsg<ToRuleEngineMsg>> createToRuleEngineMsgConsumer(Queue configuration, Integer partitionId) {
String queueName = configuration.getName();
String groupId = topicService.buildConsumerGroupId("re-", configuration.getTenantId(), queueName, partitionId);
ruleEngineAdmin.syncOffsets(topicService.buildConsumerGroupId("re-", configuration.getTenantId(), queueName, null), // the fat groupId
groupId, partitionId);
TbKafkaConsumerTemplate.TbKafkaConsumerTemplateBuilder<TbProtoQueueMsg<ToRuleEngineMsg>> consumerBuilder = TbKafkaConsumerTemplate.builder();
consumerBuilder.settings(kafkaSettings);
consumerBuilder.topic(topicService.buildTopicName(configuration.getTopic()));
consumerBuilder.clientId("re-" + queueName + "-consumer-" + serviceInfoProvider.getServiceId() + "-" + consumerCount.incrementAndGet());
consumerBuilder.groupId(topicService.buildTopicName("re-" + queueName
+ (configuration.getTenantId().isSysTenantId() ? "" : ("-isolated-" + configuration.getTenantId()))
+ "-consumer"
+ topicService.suffix(partitionId)));
consumerBuilder.groupId(groupId);
consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToRuleEngineMsg.parseFrom(msg.getData()), msg.getHeaders()));
consumerBuilder.admin(ruleEngineAdmin);
consumerBuilder.statsService(consumerStatsService);

View File

@ -82,7 +82,7 @@ class TbKafkaAdminTest {
Map<TopicPartition, OffsetAndMetadata> newConsumerOffsets = admin.listConsumerGroupOffsets(newGroupId)
.partitionsToOffsetAndMetadata().get();
if (newConsumerOffsets.isEmpty()) {
if (!newConsumerOffsets.isEmpty()) {
log.info("Found existing new group ConsumerOffsets {}", newConsumerOffsets);
}