TbKafkaAdmin WIP

This commit is contained in:
Sergey Matvienko 2024-05-08 13:58:21 +02:00
parent e638e60340
commit 07de3b975f
3 changed files with 94 additions and 1 deletions

View File

@ -437,10 +437,12 @@ public class TbRuleEngineQueueConsumerManager {
consumers.remove(tpi).awaitCompletion(); consumers.remove(tpi).awaitCompletion();
}); });
// ctx.getQueueAdmin().createTopicIfNotExists();
addedPartitions.forEach((tpi) -> { addedPartitions.forEach((tpi) -> {
int partitionId = tpi.getPartition().orElse(-999999); int partitionId = tpi.getPartition().orElse(-999999);
String key = queueKey + "-" + partitionId; String key = queueKey + "-" + partitionId;
TbQueueConsumerTask consumer = new TbQueueConsumerTask(key, ctx.getQueueFactory().createToRuleEngineMsgConsumer(queue, partitionId)); TbQueueConsumerTask consumer = new TbQueueConsumerTask(key, ctx.getQueueFactory().createToRuleEngineMsgConsumer(queue, null));
consumers.put(tpi, consumer); consumers.put(tpi, consumer);
consumer.subscribe(Set.of(tpi)); consumer.subscribe(Set.of(tpi));
launchConsumer(consumer); launchConsumer(consumer);

View File

@ -18,7 +18,10 @@ package org.thingsboard.server.queue.kafka;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.CreateTopicsResult; import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsResult;
import org.apache.kafka.clients.admin.NewTopic; import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TopicExistsException; import org.apache.kafka.common.errors.TopicExistsException;
import org.thingsboard.server.queue.TbQueueAdmin; import org.thingsboard.server.queue.TbQueueAdmin;
import org.thingsboard.server.queue.util.PropertyUtils; import org.thingsboard.server.queue.util.PropertyUtils;
@ -28,6 +31,8 @@ import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/** /**
* Created by ashvayka on 24.09.18. * Created by ashvayka on 24.09.18.
@ -109,6 +114,32 @@ public class TbKafkaAdmin implements TbQueueAdmin {
} }
public CreateTopicsResult createTopic(NewTopic topic) { public CreateTopicsResult createTopic(NewTopic topic) {
client.listConsumerGroupOffsets("id1");
return client.createTopics(Collections.singletonList(topic)); return client.createTopics(Collections.singletonList(topic));
} }
public void syncOffsets(String oldGroupId, String newGroupId) throws ExecutionException, InterruptedException, TimeoutException {
ListConsumerGroupOffsetsResult fatOffsets = client.listConsumerGroupOffsets("id1");
Map<TopicPartition, OffsetAndMetadata> oldOffsets = new ConcurrentHashMap<>();
client.listConsumerGroupOffsets(oldGroupId).partitionsToOffsetAndMetadata().whenComplete((res, err) -> {
if (err != null) {
log.warn("Failed to list consumer group offsets [{}]", oldGroupId, err);
} else {
oldOffsets.putAll(res);
}
}).get(10, TimeUnit.SECONDS);
Map<TopicPartition, OffsetAndMetadata> newOffsets = new ConcurrentHashMap<>();
client.listConsumerGroupOffsets(newGroupId).partitionsToOffsetAndMetadata().whenComplete((res, err) -> {
if (err != null) {
log.warn("Failed to list consumer group offsets [{}]", newGroupId, err);
} else {
newOffsets.putAll(res);
}
}).get(10, TimeUnit.SECONDS);
}
} }

View File

@ -0,0 +1,60 @@
package org.thingsboard.server.queue.kafka;
import com.microsoft.aad.adal4j.AsymmetricKeyCredential;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.ConsumerGroupListing;
import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsResult;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.util.Collection;
import java.util.Comparator;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import static org.junit.jupiter.api.Assertions.*;
@Slf4j
class TbKafkaAdminTest {
Properties props;
AdminClient admin;
@BeforeEach
void setUp() {
props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
admin = AdminClient.create(props);
}
@AfterEach
void tearDown() {
admin.close();
}
@Test
void testListOffsets() throws ExecutionException, InterruptedException {
log.info("Getting consumer groups list...");
Collection<ConsumerGroupListing> consumerGroupListings = admin.listConsumerGroups().all().get();
consumerGroupListings = consumerGroupListings.stream().sorted(Comparator.comparing(ConsumerGroupListing::groupId)).toList();
for (ConsumerGroupListing consumerGroup : consumerGroupListings) {
String groupId = consumerGroup.groupId();
log.info("=== consumer group: {}", groupId);
Map<TopicPartition, OffsetAndMetadata> consumerOffsets = admin.listConsumerGroupOffsets(groupId)
.partitionsToOffsetAndMetadata().get();
// Printing the fetched offsets
consumerOffsets.forEach((tp, om) ->log.info(tp.topic() + " partition " + tp.partition() + " offset " + om.offset()));
}
}
}