From 8519387ca7234e29175f7eb8750a7804adb22ff1 Mon Sep 17 00:00:00 2001 From: Sergey Matvienko Date: Tue, 7 May 2024 00:14:15 +0200 Subject: [PATCH 01/11] Rule engine: Kafka consumer group per partition --- .../TbRuleEngineQueueConsumerManager.java | 7 ++++--- .../server/queue/discovery/TopicService.java | 7 ++++++- .../queue/provider/KafkaMonolithQueueFactory.java | 11 ++++++++++- .../provider/KafkaTbRuleEngineQueueFactory.java | 10 +++++++++- .../queue/provider/TbRuleEngineQueueFactory.java | 14 +++++++++++++- 5 files changed, 42 insertions(+), 7 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/queue/ruleengine/TbRuleEngineQueueConsumerManager.java b/application/src/main/java/org/thingsboard/server/service/queue/ruleengine/TbRuleEngineQueueConsumerManager.java index 2da3dbc6dc..38e54e8873 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/ruleengine/TbRuleEngineQueueConsumerManager.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/ruleengine/TbRuleEngineQueueConsumerManager.java @@ -438,8 +438,9 @@ public class TbRuleEngineQueueConsumerManager { }); addedPartitions.forEach((tpi) -> { - String key = queueKey + "-" + tpi.getPartition().orElse(-999999); - TbQueueConsumerTask consumer = new TbQueueConsumerTask(key, ctx.getQueueFactory().createToRuleEngineMsgConsumer(queue)); + int partitionId = tpi.getPartition().orElse(-999999); + String key = queueKey + "-" + partitionId; + TbQueueConsumerTask consumer = new TbQueueConsumerTask(key, ctx.getQueueFactory().createToRuleEngineMsgConsumer(queue, partitionId)); consumers.put(tpi, consumer); consumer.subscribe(Set.of(tpi)); launchConsumer(consumer); @@ -468,7 +469,7 @@ public class TbRuleEngineQueueConsumerManager { } if (consumer == null) { - consumer = new TbQueueConsumerTask(queueKey, ctx.getQueueFactory().createToRuleEngineMsgConsumer(queue)); + consumer = new TbQueueConsumerTask(queueKey, ctx.getQueueFactory().createToRuleEngineMsgConsumer(queue, null)); } consumer.subscribe(partitions); if (!consumer.isRunning()) { diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/TopicService.java b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/TopicService.java index 46084f8201..b7e000197e 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/TopicService.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/TopicService.java @@ -64,4 +64,9 @@ public class TopicService { public String buildTopicName(String topic) { return prefix.isBlank() ? topic : prefix + "." + topic; } -} \ No newline at end of file + + public String suffix(Integer partitionId) { + return partitionId == null ? "" : "-" + partitionId; + } + +} diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java index cd09f682c6..73c62e33dc 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java @@ -187,18 +187,27 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi @Override public TbQueueConsumer> createToRuleEngineMsgConsumer(Queue configuration) { + throw new UnsupportedOperationException("Rule engine consumer should use a partitionId"); + } + + @Override + public TbQueueConsumer> createToRuleEngineMsgConsumer(Queue configuration, Integer partitionId) { String queueName = configuration.getName(); TbKafkaConsumerTemplate.TbKafkaConsumerTemplateBuilder> consumerBuilder = TbKafkaConsumerTemplate.builder(); consumerBuilder.settings(kafkaSettings); consumerBuilder.topic(topicService.buildTopicName(configuration.getTopic())); consumerBuilder.clientId("re-" + queueName + "-consumer-" + serviceInfoProvider.getServiceId() + "-" + consumerCount.incrementAndGet()); - consumerBuilder.groupId(topicService.buildTopicName("re-" + queueName + (configuration.getTenantId().isSysTenantId() ? "" : ("-isolated-" + configuration.getTenantId())) + "-consumer")); + consumerBuilder.groupId(topicService.buildTopicName("re-" + queueName + + (configuration.getTenantId().isSysTenantId() ? "" : ("-isolated-" + configuration.getTenantId())) + + "-consumer" + + topicService.suffix(partitionId))); consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToRuleEngineMsg.parseFrom(msg.getData()), msg.getHeaders())); consumerBuilder.admin(ruleEngineAdmin); consumerBuilder.statsService(consumerStatsService); return consumerBuilder.build(); } + @Override public TbQueueConsumer> createToRuleEngineNotificationsMsgConsumer() { TbKafkaConsumerTemplate.TbKafkaConsumerTemplateBuilder> consumerBuilder = TbKafkaConsumerTemplate.builder(); diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbRuleEngineQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbRuleEngineQueueFactory.java index 568380d7b5..36238c206c 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbRuleEngineQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbRuleEngineQueueFactory.java @@ -164,12 +164,20 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory { @Override public TbQueueConsumer> createToRuleEngineMsgConsumer(Queue configuration) { + throw new UnsupportedOperationException("Rule engine consumer should use a partitionId"); + } + + @Override + public TbQueueConsumer> createToRuleEngineMsgConsumer(Queue configuration, Integer partitionId) { String queueName = configuration.getName(); TbKafkaConsumerTemplate.TbKafkaConsumerTemplateBuilder> consumerBuilder = TbKafkaConsumerTemplate.builder(); consumerBuilder.settings(kafkaSettings); consumerBuilder.topic(topicService.buildTopicName(configuration.getTopic())); consumerBuilder.clientId("re-" + queueName + "-consumer-" + serviceInfoProvider.getServiceId() + "-" + consumerCount.incrementAndGet()); - consumerBuilder.groupId(topicService.buildTopicName("re-" + queueName + (configuration.getTenantId().isSysTenantId() ? "" : ("-isolated-" + configuration.getTenantId())) + "-consumer")); + consumerBuilder.groupId(topicService.buildTopicName("re-" + queueName + + (configuration.getTenantId().isSysTenantId() ? "" : ("-isolated-" + configuration.getTenantId())) + + "-consumer" + + topicService.suffix(partitionId))); consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToRuleEngineMsg.parseFrom(msg.getData()), msg.getHeaders())); consumerBuilder.admin(ruleEngineAdmin); consumerBuilder.statsService(consumerStatsService); diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/TbRuleEngineQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/TbRuleEngineQueueFactory.java index f23c7e47f3..e49074f92d 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/TbRuleEngineQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/TbRuleEngineQueueFactory.java @@ -77,13 +77,25 @@ public interface TbRuleEngineQueueFactory extends TbUsageStatsClientQueueFactory TbQueueProducer> createToOtaPackageStateServiceMsgProducer(); /** - * Used to consume messages by TB Core Service + * Used to consume messages by TB Rule Engine Service * * @return * @param configuration */ TbQueueConsumer> createToRuleEngineMsgConsumer(Queue configuration); + /** + * Used to consume messages by TB Rule Engine Service + * Intended usage for consumer per partition strategy + * + * @return TbQueueConsumer + * @param configuration + * @param partitionId as a suffix for consumer name + */ + default TbQueueConsumer> createToRuleEngineMsgConsumer(Queue configuration, Integer partitionId) { + return createToRuleEngineMsgConsumer(configuration); + } + /** * Used to consume high priority messages by TB Core Service * From e638e60340bc76305420e14b87599ad3278472ee Mon Sep 17 00:00:00 2001 From: Sergey Matvienko Date: Tue, 7 May 2024 12:20:35 +0200 Subject: [PATCH 02/11] TbRuleEngineQueueConsumerManagerTest fixed, bumped to JUnit5, parametrized test added --- .../TbRuleEngineQueueConsumerManagerTest.java | 50 ++++++++++++++----- 1 file changed, 37 insertions(+), 13 deletions(-) diff --git a/application/src/test/java/org/thingsboard/server/service/queue/ruleengine/TbRuleEngineQueueConsumerManagerTest.java b/application/src/test/java/org/thingsboard/server/service/queue/ruleengine/TbRuleEngineQueueConsumerManagerTest.java index 8e0b294241..33b79cc82c 100644 --- a/application/src/test/java/org/thingsboard/server/service/queue/ruleengine/TbRuleEngineQueueConsumerManagerTest.java +++ b/application/src/test/java/org/thingsboard/server/service/queue/ruleengine/TbRuleEngineQueueConsumerManagerTest.java @@ -18,13 +18,15 @@ package org.thingsboard.server.service.queue.ruleengine; import lombok.Getter; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; import org.mockito.Mock; import org.mockito.Mockito; -import org.mockito.junit.MockitoJUnitRunner; +import org.mockito.junit.jupiter.MockitoSettings; +import org.mockito.quality.Strictness; import org.testcontainers.shaded.org.apache.commons.lang3.RandomUtils; import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.server.actors.ActorSystemContext; @@ -51,6 +53,8 @@ import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.queue.discovery.PartitionService; import org.thingsboard.server.queue.discovery.QueueKey; import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; +import org.thingsboard.server.queue.provider.KafkaMonolithQueueFactory; +import org.thingsboard.server.queue.provider.KafkaTbRuleEngineQueueFactory; import org.thingsboard.server.queue.provider.TbQueueProducerProvider; import org.thingsboard.server.queue.provider.TbRuleEngineQueueFactory; import org.thingsboard.server.service.queue.processing.TbRuleEngineProcessingStrategyFactory; @@ -72,10 +76,12 @@ import java.util.stream.Collectors; import java.util.stream.IntStream; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.awaitility.Awaitility.await; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.argThat; import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.BDDMockito.willCallRealMethod; import static org.mockito.Mockito.after; import static org.mockito.Mockito.atLeast; import static org.mockito.Mockito.atLeastOnce; @@ -91,7 +97,7 @@ import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; @Slf4j -@RunWith(MockitoJUnitRunner.class) +@MockitoSettings(strictness = Strictness.LENIENT) public class TbRuleEngineQueueConsumerManagerTest { @Mock @@ -108,6 +114,7 @@ public class TbRuleEngineQueueConsumerManagerTest { private PartitionService partitionService; @Mock private TbQueueProducerProvider producerProvider; + @Mock private TbQueueProducer> ruleEngineMsgProducer; @Mock private TbQueueAdmin queueAdmin; @@ -121,7 +128,7 @@ public class TbRuleEngineQueueConsumerManagerTest { private AtomicInteger totalConsumedMsgs; private AtomicInteger totalProcessedMsgs; - @Before + @BeforeEach public void beforeEach() { ruleEngineConsumerContext = new TbRuleEngineConsumerContext( actorContext, statsFactory, spy(new TbRuleEngineSubmitStrategyFactory()), @@ -139,7 +146,7 @@ public class TbRuleEngineQueueConsumerManagerTest { log.trace("totalProcessedMsgs = {}", totalProcessedMsgs); return null; }).when(actorContext).tell(any()); - ruleEngineMsgProducer = mock(TbQueueProducer.class); + when(producerProvider.getRuleEngineMsgProducer()).thenReturn(ruleEngineMsgProducer); ruleEngineConsumerContext.setMgmtThreadPoolSize(2); ruleEngineConsumerContext.setTopicDeletionDelayInSec(5); @@ -171,13 +178,13 @@ public class TbRuleEngineQueueConsumerManagerTest { } consumers.add(consumer); return consumer; - }).when(queueFactory).createToRuleEngineMsgConsumer(any()); + }).when(queueFactory).createToRuleEngineMsgConsumer(any(), any()); QueueKey queueKey = new QueueKey(ServiceType.TB_RULE_ENGINE, queue); consumerManager = new TbRuleEngineQueueConsumerManager(ruleEngineConsumerContext, queueKey); } - @After + @AfterEach public void afterEach() { consumerManager.stop(); consumerManager.awaitStop(); @@ -192,6 +199,20 @@ public class TbRuleEngineQueueConsumerManagerTest { } } + @ParameterizedTest + @ValueSource(classes = {KafkaMonolithQueueFactory.class, KafkaTbRuleEngineQueueFactory.class}) + public void testUnsupported_createToRuleEngineMsgConsumer_KafkaTbRuleEngineQueueFactory(Class factoryClass) { + // obsolete, but need to pass the afterEach + queue.setConsumerPerPartition(false); + consumerManager.init(queue); + + var factory = mock(factoryClass); + willCallRealMethod().given(factory).createToRuleEngineMsgConsumer(any()); + assertThatThrownBy(() -> factory.createToRuleEngineMsgConsumer(mock(Queue.class))) + .isInstanceOf(UnsupportedOperationException.class); + + } + @Test public void testInit_consumerPerPartition() { queue.setConsumerPerPartition(true); @@ -244,7 +265,8 @@ public class TbRuleEngineQueueConsumerManagerTest { Set partitions = Collections.emptySet(); consumerManager.update(partitions); - verify(queueFactory, after(1000).never()).createToRuleEngineMsgConsumer(any()); + verify(queueFactory, after(1000).never()).createToRuleEngineMsgConsumer(any(), any()); + verify(queueFactory, never()).createToRuleEngineMsgConsumer(any()); partitions = createTpis(1); consumerManager.update(partitions); @@ -276,7 +298,8 @@ public class TbRuleEngineQueueConsumerManagerTest { ruleEngineConsumerContext.setReady(true); consumerManager.update(Collections.emptySet()); - verify(queueFactory, after(1000).never()).createToRuleEngineMsgConsumer(any()); + verify(queueFactory, after(1000).never()).createToRuleEngineMsgConsumer(any(), any()); + verify(queueFactory, never()).createToRuleEngineMsgConsumer(any()); consumerManager.update(createTpis(1)); TestConsumer consumer1 = getConsumer(1); @@ -423,7 +446,8 @@ public class TbRuleEngineQueueConsumerManagerTest { consumerManager.update(createTpis(1)); TestConsumer consumer = getConsumer(1); verifySubscribedAndLaunched(consumer, 1); - verify(queueFactory, times(1)).createToRuleEngineMsgConsumer(any()); + verify(queueFactory, times(1)).createToRuleEngineMsgConsumer(any(), any()); + verify(queueFactory, never()).createToRuleEngineMsgConsumer(any()); consumerManager.stop(); consumerManager.update(createTpis(1, 2, 3, 4)); // to check that no new tasks after stop are processed From 07de3b975f5aa1670e1b0ccdf6dbb79c250ede8d Mon Sep 17 00:00:00 2001 From: Sergey Matvienko Date: Wed, 8 May 2024 13:58:21 +0200 Subject: [PATCH 03/11] TbKafkaAdmin WIP --- .../TbRuleEngineQueueConsumerManager.java | 4 +- .../server/queue/kafka/TbKafkaAdmin.java | 31 ++++++++++ .../server/queue/kafka/TbKafkaAdminTest.java | 60 +++++++++++++++++++ 3 files changed, 94 insertions(+), 1 deletion(-) create mode 100644 common/queue/src/test/java/org/thingsboard/server/queue/kafka/TbKafkaAdminTest.java diff --git a/application/src/main/java/org/thingsboard/server/service/queue/ruleengine/TbRuleEngineQueueConsumerManager.java b/application/src/main/java/org/thingsboard/server/service/queue/ruleengine/TbRuleEngineQueueConsumerManager.java index 38e54e8873..39d91341f0 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/ruleengine/TbRuleEngineQueueConsumerManager.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/ruleengine/TbRuleEngineQueueConsumerManager.java @@ -437,10 +437,12 @@ public class TbRuleEngineQueueConsumerManager { consumers.remove(tpi).awaitCompletion(); }); + +// ctx.getQueueAdmin().createTopicIfNotExists(); addedPartitions.forEach((tpi) -> { int partitionId = tpi.getPartition().orElse(-999999); String key = queueKey + "-" + partitionId; - TbQueueConsumerTask consumer = new TbQueueConsumerTask(key, ctx.getQueueFactory().createToRuleEngineMsgConsumer(queue, partitionId)); + TbQueueConsumerTask consumer = new TbQueueConsumerTask(key, ctx.getQueueFactory().createToRuleEngineMsgConsumer(queue, null)); consumers.put(tpi, consumer); consumer.subscribe(Set.of(tpi)); launchConsumer(consumer); diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaAdmin.java b/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaAdmin.java index bf5555817d..65cc28405a 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaAdmin.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaAdmin.java @@ -18,7 +18,10 @@ package org.thingsboard.server.queue.kafka; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.CreateTopicsResult; +import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsResult; import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.TopicExistsException; import org.thingsboard.server.queue.TbQueueAdmin; import org.thingsboard.server.queue.util.PropertyUtils; @@ -28,6 +31,8 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; /** * Created by ashvayka on 24.09.18. @@ -109,6 +114,32 @@ public class TbKafkaAdmin implements TbQueueAdmin { } public CreateTopicsResult createTopic(NewTopic topic) { + client.listConsumerGroupOffsets("id1"); return client.createTopics(Collections.singletonList(topic)); } + + public void syncOffsets(String oldGroupId, String newGroupId) throws ExecutionException, InterruptedException, TimeoutException { + ListConsumerGroupOffsetsResult fatOffsets = client.listConsumerGroupOffsets("id1"); + Map oldOffsets = new ConcurrentHashMap<>(); + client.listConsumerGroupOffsets(oldGroupId).partitionsToOffsetAndMetadata().whenComplete((res, err) -> { + if (err != null) { + log.warn("Failed to list consumer group offsets [{}]", oldGroupId, err); + } else { + oldOffsets.putAll(res); + } + }).get(10, TimeUnit.SECONDS); + + Map newOffsets = new ConcurrentHashMap<>(); + client.listConsumerGroupOffsets(newGroupId).partitionsToOffsetAndMetadata().whenComplete((res, err) -> { + if (err != null) { + log.warn("Failed to list consumer group offsets [{}]", newGroupId, err); + } else { + newOffsets.putAll(res); + } + }).get(10, TimeUnit.SECONDS); + + + + } + } diff --git a/common/queue/src/test/java/org/thingsboard/server/queue/kafka/TbKafkaAdminTest.java b/common/queue/src/test/java/org/thingsboard/server/queue/kafka/TbKafkaAdminTest.java new file mode 100644 index 0000000000..429419363d --- /dev/null +++ b/common/queue/src/test/java/org/thingsboard/server/queue/kafka/TbKafkaAdminTest.java @@ -0,0 +1,60 @@ +package org.thingsboard.server.queue.kafka; + +import com.microsoft.aad.adal4j.AsymmetricKeyCredential; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.admin.ConsumerGroupListing; +import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsResult; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.TopicPartition; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.Collection; +import java.util.Comparator; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import static org.junit.jupiter.api.Assertions.*; + +@Slf4j +class TbKafkaAdminTest { + + + Properties props; + AdminClient admin; + @BeforeEach + void setUp() { + props = new Properties(); + props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); + admin = AdminClient.create(props); + } + + @AfterEach + void tearDown() { + admin.close(); + } + + @Test + void testListOffsets() throws ExecutionException, InterruptedException { + log.info("Getting consumer groups list..."); + Collection consumerGroupListings = admin.listConsumerGroups().all().get(); + consumerGroupListings = consumerGroupListings.stream().sorted(Comparator.comparing(ConsumerGroupListing::groupId)).toList(); + for (ConsumerGroupListing consumerGroup : consumerGroupListings) { + String groupId = consumerGroup.groupId(); + log.info("=== consumer group: {}", groupId); + Map consumerOffsets = admin.listConsumerGroupOffsets(groupId) + .partitionsToOffsetAndMetadata().get(); + + // Printing the fetched offsets + consumerOffsets.forEach((tp, om) ->log.info(tp.topic() + " partition " + tp.partition() + " offset " + om.offset())); + } + } + +} \ No newline at end of file From 416439fab08bd59997bfa93a6457fe7862575cee Mon Sep 17 00:00:00 2001 From: Sergey Matvienko Date: Mon, 13 May 2024 17:19:58 +0200 Subject: [PATCH 04/11] TbRuleEngineQueueConsumerManager - createToRuleEngineMsgConsumer lazy --- .../queue/ruleengine/TbQueueConsumerTask.java | 27 ++++++++++++++++--- .../TbRuleEngineQueueConsumerManager.java | 7 ++--- 2 files changed, 26 insertions(+), 8 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/queue/ruleengine/TbQueueConsumerTask.java b/application/src/main/java/org/thingsboard/server/service/queue/ruleengine/TbQueueConsumerTask.java index 0e25efd014..a25f74a5b1 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/ruleengine/TbQueueConsumerTask.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/ruleengine/TbQueueConsumerTask.java @@ -24,22 +24,43 @@ import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.queue.TbQueueConsumer; import org.thingsboard.server.queue.common.TbProtoQueueMsg; +import java.util.Objects; import java.util.Set; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; -@RequiredArgsConstructor @Slf4j public class TbQueueConsumerTask { @Getter private final Object key; - @Getter - private final TbQueueConsumer> consumer; + private volatile TbQueueConsumer> consumer; + private volatile Supplier>> consumerSupplier; @Setter private Future task; + public TbQueueConsumerTask(Object key, Supplier>> consumerSupplier) { + this.key = key; + this.consumer = null; + this.consumerSupplier = consumerSupplier; + } + + public TbQueueConsumer> getConsumer() { + if (consumer == null) { + synchronized (this) { + if (consumer == null) { + Objects.requireNonNull(consumerSupplier, "consumerSupplier for key [" + key + "] is null"); + consumer = consumerSupplier.get(); + Objects.requireNonNull(consumer, "consumer for key [" + key + "] is null"); + consumerSupplier = null; + } + } + } + return consumer; + } + public void subscribe(Set partitions) { log.trace("[{}] Subscribing to partitions: {}", key, partitions); consumer.subscribe(partitions); diff --git a/application/src/main/java/org/thingsboard/server/service/queue/ruleengine/TbRuleEngineQueueConsumerManager.java b/application/src/main/java/org/thingsboard/server/service/queue/ruleengine/TbRuleEngineQueueConsumerManager.java index 39d91341f0..4df0a900d2 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/ruleengine/TbRuleEngineQueueConsumerManager.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/ruleengine/TbRuleEngineQueueConsumerManager.java @@ -433,16 +433,13 @@ public class TbRuleEngineQueueConsumerManager { removedPartitions.forEach((tpi) -> { consumers.get(tpi).initiateStop(); }); - removedPartitions.forEach((tpi) -> { - consumers.remove(tpi).awaitCompletion(); - }); // ctx.getQueueAdmin().createTopicIfNotExists(); addedPartitions.forEach((tpi) -> { int partitionId = tpi.getPartition().orElse(-999999); String key = queueKey + "-" + partitionId; - TbQueueConsumerTask consumer = new TbQueueConsumerTask(key, ctx.getQueueFactory().createToRuleEngineMsgConsumer(queue, null)); + TbQueueConsumerTask consumer = new TbQueueConsumerTask(key, () -> ctx.getQueueFactory().createToRuleEngineMsgConsumer(queue, partitionId)); consumers.put(tpi, consumer); consumer.subscribe(Set.of(tpi)); launchConsumer(consumer); @@ -471,7 +468,7 @@ public class TbRuleEngineQueueConsumerManager { } if (consumer == null) { - consumer = new TbQueueConsumerTask(queueKey, ctx.getQueueFactory().createToRuleEngineMsgConsumer(queue, null)); + consumer = new TbQueueConsumerTask(queueKey, () -> ctx.getQueueFactory().createToRuleEngineMsgConsumer(queue, null)); } consumer.subscribe(partitions); if (!consumer.isRunning()) { From 5d7dc19452dc95a1332d7955e369f3b38ed174ec Mon Sep 17 00:00:00 2001 From: Sergey Matvienko Date: Mon, 13 May 2024 17:21:36 +0200 Subject: [PATCH 05/11] TbQueueConsumerTask::awaitCompletion after launchConsumer to provide a fully async repartition experience --- .../queue/ruleengine/TbRuleEngineQueueConsumerManager.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/application/src/main/java/org/thingsboard/server/service/queue/ruleengine/TbRuleEngineQueueConsumerManager.java b/application/src/main/java/org/thingsboard/server/service/queue/ruleengine/TbRuleEngineQueueConsumerManager.java index 4df0a900d2..0fb005b928 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/ruleengine/TbRuleEngineQueueConsumerManager.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/ruleengine/TbRuleEngineQueueConsumerManager.java @@ -434,6 +434,8 @@ public class TbRuleEngineQueueConsumerManager { consumers.get(tpi).initiateStop(); }); + List removedTasks = removedPartitions.stream() + .map(consumers::remove).toList(); // ctx.getQueueAdmin().createTopicIfNotExists(); addedPartitions.forEach((tpi) -> { @@ -444,6 +446,8 @@ public class TbRuleEngineQueueConsumerManager { consumer.subscribe(Set.of(tpi)); launchConsumer(consumer); }); + + removedTasks.forEach(TbQueueConsumerTask::awaitCompletion); } @Override From 34cfd588b1981e923019d4d85bd68c24e1436708 Mon Sep 17 00:00:00 2001 From: Sergey Matvienko Date: Mon, 13 May 2024 17:26:59 +0200 Subject: [PATCH 06/11] mvn license:format --- .../server/queue/kafka/TbKafkaAdminTest.java | 108 ++++++++++++++++-- 1 file changed, 98 insertions(+), 10 deletions(-) diff --git a/common/queue/src/test/java/org/thingsboard/server/queue/kafka/TbKafkaAdminTest.java b/common/queue/src/test/java/org/thingsboard/server/queue/kafka/TbKafkaAdminTest.java index 429419363d..6a0670cd31 100644 --- a/common/queue/src/test/java/org/thingsboard/server/queue/kafka/TbKafkaAdminTest.java +++ b/common/queue/src/test/java/org/thingsboard/server/queue/kafka/TbKafkaAdminTest.java @@ -1,34 +1,45 @@ +/** + * Copyright © 2016-2024 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.thingsboard.server.queue.kafka; -import com.microsoft.aad.adal4j.AsymmetricKeyCredential; +import com.google.common.collect.MapDifference; +import com.google.common.collect.Maps; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.AdminClientConfig; import org.apache.kafka.clients.admin.ConsumerGroupListing; -import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsResult; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.TopicPartition; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import java.util.Collection; import java.util.Comparator; import java.util.Map; import java.util.Properties; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; - -import static org.junit.jupiter.api.Assertions.*; @Slf4j class TbKafkaAdminTest { - Properties props; AdminClient admin; + @BeforeEach void setUp() { props = new Properties(); @@ -41,6 +52,7 @@ class TbKafkaAdminTest { admin.close(); } + @Disabled @Test void testListOffsets() throws ExecutionException, InterruptedException { log.info("Getting consumer groups list..."); @@ -53,8 +65,84 @@ class TbKafkaAdminTest { .partitionsToOffsetAndMetadata().get(); // Printing the fetched offsets - consumerOffsets.forEach((tp, om) ->log.info(tp.topic() + " partition " + tp.partition() + " offset " + om.offset())); + consumerOffsets.forEach((tp, om) -> log.info(tp.topic() + " partition " + tp.partition() + " offset " + om.offset())); + if (groupId.startsWith("re-") && groupId.endsWith("consumer")) { + log.info("****** Migrating groupId [{}] ...", groupId); + + for (var consumerOffset : consumerOffsets.entrySet()) { + var tp = consumerOffset.getKey(); + var om = consumerOffset.getValue(); + final Integer tbPartitionId = parsePartitionIdFromTopicName(tp.topic()); + if (tbPartitionId == null || tbPartitionId < 0) { + continue; + } + + String newGroupId = groupId + "-" + tbPartitionId; + log.info("Getting offsets for consumer groupId [{}]", newGroupId); + Map newConsumerOffsets = admin.listConsumerGroupOffsets(newGroupId) + .partitionsToOffsetAndMetadata().get(); + + if (newConsumerOffsets.isEmpty()) { + log.info("Found existing new group ConsumerOffsets {}", newConsumerOffsets); + } + + var existingOffset = newConsumerOffsets.get(tp); + if (existingOffset == null) { + log.info("topic offset does not exists in the new node group, all found offsets"); + } else if (existingOffset.offset() >= om.offset()) { + log.info("topic offset {} >= than old node group offset {}", existingOffset.offset(), om.offset()); + continue; + } else { + log.info("SHOULD alter topic offset [{}] less than old node group offset [{}]", existingOffset.offset(), om.offset()); + } + + Map newOffsets = Map.of(tp, om); + + log.warn("@@@@@ alterConsumerGroupOffsets [{}] with new offsets [{}]", newGroupId, newOffsets); + admin.alterConsumerGroupOffsets(newGroupId, newOffsets).all().whenComplete((res, err) -> { + if (err != null) { + log.error("Failed to alterConsumerGroupOffsets for groupId [{}], new offsets [{}]", newGroupId, newOffsets, err); + } else { + log.info("Updated new consumer group [{}], offsets [{}]", newGroupId, newOffsets); + } + }).get(); // Handle asynchronously as appropriate + + //Verify + + Map resultedConsumerOffsets = admin.listConsumerGroupOffsets(newGroupId) + .partitionsToOffsetAndMetadata().get(); + + MapDifference diff = Maps.difference(newOffsets, resultedConsumerOffsets); + + if (!diff.areEqual()) { + log.error("Verify failed for groupId [{}], current offset {} is not the same as expected {}", newGroupId, resultedConsumerOffsets, newOffsets); + } else { + log.info("Verify passed for groupId [{}]", newGroupId); + } + + } + + } } + } -} \ No newline at end of file + Integer parsePartitionIdFromTopicName(String topic) { + if (topic == null) { + return null; + } + int dotIndex = topic.lastIndexOf('.'); + if (dotIndex <= 0) { + return null; + } + + String indexStr = topic.substring(dotIndex + 1); + try { + return Integer.parseInt(indexStr); + } catch (Throwable t) { + log.warn("Can't parse partition Id from topic name [{}]", topic, t); + } + return null; + } + +} From 258c26ed0449601fe21b9179772317f79f62d197 Mon Sep 17 00:00:00 2001 From: Sergey Matvienko Date: Wed, 15 May 2024 16:23:20 +0200 Subject: [PATCH 07/11] Kafka groupId syncOffsets from a fat group to a single-partition group --- .../server/queue/discovery/TopicService.java | 10 ++- .../server/queue/kafka/TbKafkaAdmin.java | 61 +++++++++++++------ .../provider/KafkaMonolithQueueFactory.java | 12 ++-- .../KafkaTbRuleEngineQueueFactory.java | 12 ++-- .../server/queue/kafka/TbKafkaAdminTest.java | 2 +- 5 files changed, 68 insertions(+), 29 deletions(-) diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/TopicService.java b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/TopicService.java index b7e000197e..960be231c6 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/TopicService.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/TopicService.java @@ -65,7 +65,15 @@ public class TopicService { return prefix.isBlank() ? topic : prefix + "." + topic; } - public String suffix(Integer partitionId) { + public String buildConsumerGroupId(String servicePrefix, TenantId tenantId, String queueName, Integer partitionId) { + return this.buildTopicName( + servicePrefix + queueName + + (tenantId.isSysTenantId() ? "" : ("-isolated-" + tenantId)) + + "-consumer" + + suffix(partitionId)); + } + + String suffix(Integer partitionId) { return partitionId == null ? "" : "-" + partitionId; } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaAdmin.java b/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaAdmin.java index 65cc28405a..b818cd1234 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaAdmin.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaAdmin.java @@ -15,6 +15,7 @@ */ package org.thingsboard.server.queue.kafka; +import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.CreateTopicsResult; @@ -40,6 +41,7 @@ import java.util.concurrent.TimeoutException; @Slf4j public class TbKafkaAdmin implements TbQueueAdmin { + @Getter private final AdminClient client; private final Map topicConfigs; private final Set topics = ConcurrentHashMap.newKeySet(); @@ -118,27 +120,52 @@ public class TbKafkaAdmin implements TbQueueAdmin { return client.createTopics(Collections.singletonList(topic)); } - public void syncOffsets(String oldGroupId, String newGroupId) throws ExecutionException, InterruptedException, TimeoutException { - ListConsumerGroupOffsetsResult fatOffsets = client.listConsumerGroupOffsets("id1"); - Map oldOffsets = new ConcurrentHashMap<>(); - client.listConsumerGroupOffsets(oldGroupId).partitionsToOffsetAndMetadata().whenComplete((res, err) -> { - if (err != null) { - log.warn("Failed to list consumer group offsets [{}]", oldGroupId, err); - } else { - oldOffsets.putAll(res); + /** + * Sync offsets from a fat group to a single-partition group + * Migration back from single-partition consumer to a fat group is not supported + * TODO: The best possible approach to synchronize the offsets is to do the synchronization as a part of the save Queue parameters with stop all consumers + * */ + public void syncOffsets(String fatGroupId, String newGroupId, Integer partitionId) { + try { + syncOffsetsUnsafe(fatGroupId, newGroupId, partitionId); + } catch (Exception e) { + log.warn("Failed to syncOffsets from {} to {} partitionId {}", fatGroupId, newGroupId, partitionId, e); + } + } + + void syncOffsetsUnsafe(String fatGroupId, String newGroupId, Integer partitionId) throws ExecutionException, InterruptedException, TimeoutException { + log.info("syncOffsets [{}][{}][{}]", fatGroupId, newGroupId, partitionId); + if (partitionId == null) { + return; + } + Map oldOffsets = + client.listConsumerGroupOffsets(fatGroupId).partitionsToOffsetAndMetadata().get(10, TimeUnit.SECONDS); + if (oldOffsets.isEmpty()) { + return; + } + + for (var consumerOffset : oldOffsets.entrySet()) { + var tp = consumerOffset.getKey(); + if (!tp.topic().endsWith("." + partitionId)) { + continue; } - }).get(10, TimeUnit.SECONDS); + var om = consumerOffset.getValue(); + Map newOffsets = + client.listConsumerGroupOffsets(newGroupId).partitionsToOffsetAndMetadata().get(10, TimeUnit.SECONDS); - Map newOffsets = new ConcurrentHashMap<>(); - client.listConsumerGroupOffsets(newGroupId).partitionsToOffsetAndMetadata().whenComplete((res, err) -> { - if (err != null) { - log.warn("Failed to list consumer group offsets [{}]", newGroupId, err); + var existingOffset = newOffsets.get(tp); + if (existingOffset == null) { + log.info("[{}] topic offset does not exists in the new node group {}, all found offsets {}", tp, newGroupId, newOffsets); + } else if (existingOffset.offset() >= om.offset()) { + log.info("[{}] topic offset {} >= than old node group offset {}", tp, existingOffset.offset(), om.offset()); + break; } else { - newOffsets.putAll(res); + log.info("[{}] SHOULD alter topic offset [{}] less than old node group offset [{}]", tp, existingOffset.offset(), om.offset()); } - }).get(10, TimeUnit.SECONDS); - - + client.alterConsumerGroupOffsets(newGroupId, Map.of(tp, om)).all().get(10, TimeUnit.SECONDS); + log.info("[{}] altered new consumer groupId {}", tp, newGroupId); + break; + } } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java index 73c62e33dc..30dac37851 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java @@ -75,7 +75,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi private final TbKafkaConsumerStatsService consumerStatsService; private final TbQueueAdmin coreAdmin; - private final TbQueueAdmin ruleEngineAdmin; + private final TbKafkaAdmin ruleEngineAdmin; private final TbQueueAdmin jsExecutorRequestAdmin; private final TbQueueAdmin jsExecutorResponseAdmin; private final TbQueueAdmin transportApiRequestAdmin; @@ -193,14 +193,16 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi @Override public TbQueueConsumer> createToRuleEngineMsgConsumer(Queue configuration, Integer partitionId) { String queueName = configuration.getName(); + String groupId = topicService.buildConsumerGroupId("re-", configuration.getTenantId(), queueName, partitionId); + + ruleEngineAdmin.syncOffsets(topicService.buildConsumerGroupId("re-", configuration.getTenantId(), queueName, null), // the fat groupId + groupId, partitionId); + TbKafkaConsumerTemplate.TbKafkaConsumerTemplateBuilder> consumerBuilder = TbKafkaConsumerTemplate.builder(); consumerBuilder.settings(kafkaSettings); consumerBuilder.topic(topicService.buildTopicName(configuration.getTopic())); consumerBuilder.clientId("re-" + queueName + "-consumer-" + serviceInfoProvider.getServiceId() + "-" + consumerCount.incrementAndGet()); - consumerBuilder.groupId(topicService.buildTopicName("re-" + queueName - + (configuration.getTenantId().isSysTenantId() ? "" : ("-isolated-" + configuration.getTenantId())) - + "-consumer" - + topicService.suffix(partitionId))); + consumerBuilder.groupId(groupId); consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToRuleEngineMsg.parseFrom(msg.getData()), msg.getHeaders())); consumerBuilder.admin(ruleEngineAdmin); consumerBuilder.statsService(consumerStatsService); diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbRuleEngineQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbRuleEngineQueueFactory.java index 36238c206c..31fa7efffb 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbRuleEngineQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbRuleEngineQueueFactory.java @@ -68,7 +68,7 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory { private final TbQueueTransportNotificationSettings transportNotificationSettings; private final TbQueueAdmin coreAdmin; - private final TbQueueAdmin ruleEngineAdmin; + private final TbKafkaAdmin ruleEngineAdmin; private final TbQueueAdmin jsExecutorRequestAdmin; private final TbQueueAdmin jsExecutorResponseAdmin; private final TbQueueAdmin notificationAdmin; @@ -170,14 +170,16 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory { @Override public TbQueueConsumer> createToRuleEngineMsgConsumer(Queue configuration, Integer partitionId) { String queueName = configuration.getName(); + String groupId = topicService.buildConsumerGroupId("re-", configuration.getTenantId(), queueName, partitionId); + + ruleEngineAdmin.syncOffsets(topicService.buildConsumerGroupId("re-", configuration.getTenantId(), queueName, null), // the fat groupId + groupId, partitionId); + TbKafkaConsumerTemplate.TbKafkaConsumerTemplateBuilder> consumerBuilder = TbKafkaConsumerTemplate.builder(); consumerBuilder.settings(kafkaSettings); consumerBuilder.topic(topicService.buildTopicName(configuration.getTopic())); consumerBuilder.clientId("re-" + queueName + "-consumer-" + serviceInfoProvider.getServiceId() + "-" + consumerCount.incrementAndGet()); - consumerBuilder.groupId(topicService.buildTopicName("re-" + queueName - + (configuration.getTenantId().isSysTenantId() ? "" : ("-isolated-" + configuration.getTenantId())) - + "-consumer" - + topicService.suffix(partitionId))); + consumerBuilder.groupId(groupId); consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToRuleEngineMsg.parseFrom(msg.getData()), msg.getHeaders())); consumerBuilder.admin(ruleEngineAdmin); consumerBuilder.statsService(consumerStatsService); diff --git a/common/queue/src/test/java/org/thingsboard/server/queue/kafka/TbKafkaAdminTest.java b/common/queue/src/test/java/org/thingsboard/server/queue/kafka/TbKafkaAdminTest.java index 6a0670cd31..2007747dc9 100644 --- a/common/queue/src/test/java/org/thingsboard/server/queue/kafka/TbKafkaAdminTest.java +++ b/common/queue/src/test/java/org/thingsboard/server/queue/kafka/TbKafkaAdminTest.java @@ -82,7 +82,7 @@ class TbKafkaAdminTest { Map newConsumerOffsets = admin.listConsumerGroupOffsets(newGroupId) .partitionsToOffsetAndMetadata().get(); - if (newConsumerOffsets.isEmpty()) { + if (!newConsumerOffsets.isEmpty()) { log.info("Found existing new group ConsumerOffsets {}", newConsumerOffsets); } From 68225b438c047e4b7a1bb15740c42258ebef3f01 Mon Sep 17 00:00:00 2001 From: Sergey Matvienko Date: Thu, 16 May 2024 18:27:28 +0200 Subject: [PATCH 08/11] TbQueueConsumerTask access the lazy consumer only by getter --- .../server/service/queue/ruleengine/TbQueueConsumerTask.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/queue/ruleengine/TbQueueConsumerTask.java b/application/src/main/java/org/thingsboard/server/service/queue/ruleengine/TbQueueConsumerTask.java index a25f74a5b1..e696192a43 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/ruleengine/TbQueueConsumerTask.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/ruleengine/TbQueueConsumerTask.java @@ -63,12 +63,12 @@ public class TbQueueConsumerTask { public void subscribe(Set partitions) { log.trace("[{}] Subscribing to partitions: {}", key, partitions); - consumer.subscribe(partitions); + getConsumer().subscribe(partitions); } public void initiateStop() { log.debug("[{}] Initiating stop", key); - consumer.stop(); + getConsumer().stop(); } public void awaitCompletion() { From 5e3ff6fe20d24fbdb4d68d4e0bc452836a771997 Mon Sep 17 00:00:00 2001 From: Sergey Matvienko Date: Wed, 22 May 2024 09:02:01 +0200 Subject: [PATCH 09/11] minor cleanup --- .../java/org/thingsboard/server/queue/kafka/TbKafkaAdmin.java | 1 - 1 file changed, 1 deletion(-) diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaAdmin.java b/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaAdmin.java index b818cd1234..9c8f63594b 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaAdmin.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaAdmin.java @@ -116,7 +116,6 @@ public class TbKafkaAdmin implements TbQueueAdmin { } public CreateTopicsResult createTopic(NewTopic topic) { - client.listConsumerGroupOffsets("id1"); return client.createTopics(Collections.singletonList(topic)); } From 0f1ef6d0dc3ba641afc56099d48f7469e86b2888 Mon Sep 17 00:00:00 2001 From: Sergey Matvienko Date: Wed, 22 May 2024 16:09:32 +0200 Subject: [PATCH 10/11] consumer creation refactored after merge --- .../queue/DefaultTbCoreConsumerService.java | 2 +- .../consumer/MainQueueConsumerManager.java | 13 +-- .../TbRuleEngineQueueConsumerManager.java | 83 ------------------- .../TbRuleEngineQueueConsumerManagerTest.java | 4 +- 4 files changed, 10 insertions(+), 92 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java index fbef0cfd29..ae8ea8f7f4 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java @@ -204,7 +204,7 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService queueFactory.createToCoreMsgConsumer()) + .consumerCreator((config, partitionId) -> queueFactory.createToCoreMsgConsumer()) .consumerExecutor(consumersExecutor) .scheduler(scheduler) .taskExecutor(mgmtExecutor) diff --git a/application/src/main/java/org/thingsboard/server/service/queue/consumer/MainQueueConsumerManager.java b/application/src/main/java/org/thingsboard/server/service/queue/consumer/MainQueueConsumerManager.java index 6792422b57..6eb5c94c9b 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/consumer/MainQueueConsumerManager.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/consumer/MainQueueConsumerManager.java @@ -41,7 +41,7 @@ import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantLock; -import java.util.function.Function; +import java.util.function.BiFunction; import java.util.stream.Collectors; @Slf4j @@ -51,7 +51,7 @@ public class MainQueueConsumerManager msgPackProcessor; - protected final Function> consumerCreator; + protected final BiFunction> consumerCreator; protected final ExecutorService consumerExecutor; protected final ScheduledExecutorService scheduler; protected final ExecutorService taskExecutor; @@ -67,7 +67,7 @@ public class MainQueueConsumerManager msgPackProcessor, - Function> consumerCreator, + BiFunction> consumerCreator, ExecutorService consumerExecutor, ScheduledExecutorService scheduler, ExecutorService taskExecutor) { @@ -273,8 +273,9 @@ public class MainQueueConsumerManager consumers.remove(tpi).awaitCompletion()); addedPartitions.forEach((tpi) -> { - String key = queueKey + "-" + tpi.getPartition().orElse(-1); - TbQueueConsumerTask consumer = new TbQueueConsumerTask<>(key, consumerCreator.apply(config)); + Integer partitionId = tpi.getPartition().orElse(-1); + String key = queueKey + "-" + partitionId; + TbQueueConsumerTask consumer = new TbQueueConsumerTask<>(key, () -> consumerCreator.apply(config, partitionId)); consumers.put(tpi, consumer); consumer.subscribe(Set.of(tpi)); launchConsumer(consumer); @@ -303,7 +304,7 @@ public class MainQueueConsumerManager(queueKey, consumerCreator.apply(config)); + consumer = new TbQueueConsumerTask<>(queueKey, () -> consumerCreator.apply(config, null)); // no partitionId passed } consumer.subscribe(partitions); if (!consumer.isRunning()) { diff --git a/application/src/main/java/org/thingsboard/server/service/queue/ruleengine/TbRuleEngineQueueConsumerManager.java b/application/src/main/java/org/thingsboard/server/service/queue/ruleengine/TbRuleEngineQueueConsumerManager.java index 8a96f2c0e1..c2823d3c00 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/ruleengine/TbRuleEngineQueueConsumerManager.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/ruleengine/TbRuleEngineQueueConsumerManager.java @@ -249,87 +249,4 @@ public class TbRuleEngineQueueConsumerManager extends MainQueueConsumerManager partitions) { - return partitions.stream().map(TopicPartitionInfo::getFullTopicName).collect(Collectors.joining(", ", "[", "]")); - } - - interface ConsumerWrapper { - - void updatePartitions(Set partitions); - - Collection getConsumers(); - - } - - class ConsumerPerPartitionWrapper implements ConsumerWrapper { - private final Map consumers = new HashMap<>(); - - @Override - public void updatePartitions(Set partitions) { - Set addedPartitions = new HashSet<>(partitions); - addedPartitions.removeAll(consumers.keySet()); - - Set removedPartitions = new HashSet<>(consumers.keySet()); - removedPartitions.removeAll(partitions); - log.info("[{}] Added partitions: {}, removed partitions: {}", queueKey, partitionsToString(addedPartitions), partitionsToString(removedPartitions)); - - removedPartitions.forEach((tpi) -> { - consumers.get(tpi).initiateStop(); - }); - - List removedTasks = removedPartitions.stream() - .map(consumers::remove).toList(); - -// ctx.getQueueAdmin().createTopicIfNotExists(); - addedPartitions.forEach((tpi) -> { - int partitionId = tpi.getPartition().orElse(-999999); - String key = queueKey + "-" + partitionId; - TbQueueConsumerTask consumer = new TbQueueConsumerTask(key, () -> ctx.getQueueFactory().createToRuleEngineMsgConsumer(queue, partitionId)); - consumers.put(tpi, consumer); - consumer.subscribe(Set.of(tpi)); - launchConsumer(consumer); - }); - - removedTasks.forEach(TbQueueConsumerTask::awaitCompletion); - } - - @Override - public Collection getConsumers() { - return consumers.values(); - } - } - - class SingleConsumerWrapper implements ConsumerWrapper { - private TbQueueConsumerTask consumer; - - @Override - public void updatePartitions(Set partitions) { - log.info("[{}] New partitions: {}", queueKey, partitionsToString(partitions)); - if (partitions.isEmpty()) { - if (consumer != null && consumer.isRunning()) { - consumer.initiateStop(); - consumer.awaitCompletion(); - } - consumer = null; - return; - } - - if (consumer == null) { - consumer = new TbQueueConsumerTask(queueKey, () -> ctx.getQueueFactory().createToRuleEngineMsgConsumer(queue, null)); - } - consumer.subscribe(partitions); - if (!consumer.isRunning()) { - launchConsumer(consumer); - } - } - - @Override - public Collection getConsumers() { - if (consumer == null) { - return Collections.emptyList(); - } - return List.of(consumer); - } - } - } diff --git a/application/src/test/java/org/thingsboard/server/service/queue/ruleengine/TbRuleEngineQueueConsumerManagerTest.java b/application/src/test/java/org/thingsboard/server/service/queue/ruleengine/TbRuleEngineQueueConsumerManagerTest.java index 35b5eaeb51..fdf2e203ee 100644 --- a/application/src/test/java/org/thingsboard/server/service/queue/ruleengine/TbRuleEngineQueueConsumerManagerTest.java +++ b/application/src/test/java/org/thingsboard/server/service/queue/ruleengine/TbRuleEngineQueueConsumerManagerTest.java @@ -21,10 +21,10 @@ import lombok.extern.slf4j.Slf4j; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; import org.mockito.Mock; import org.mockito.Mockito; -import org.mockito.junit.jupiter.MockitoExtension; import org.mockito.junit.jupiter.MockitoSettings; import org.mockito.quality.Strictness; import org.testcontainers.shaded.org.apache.commons.lang3.RandomUtils; From fdf28010dc2053b447d04f15aad73a079678bf78 Mon Sep 17 00:00:00 2001 From: Sergey Matvienko Date: Wed, 22 May 2024 16:20:07 +0200 Subject: [PATCH 11/11] removed unused import --- .../server/service/queue/ruleengine/TbQueueConsumerTask.java | 1 - 1 file changed, 1 deletion(-) diff --git a/application/src/main/java/org/thingsboard/server/service/queue/ruleengine/TbQueueConsumerTask.java b/application/src/main/java/org/thingsboard/server/service/queue/ruleengine/TbQueueConsumerTask.java index 63be2d89da..5e672eb5c6 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/ruleengine/TbQueueConsumerTask.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/ruleengine/TbQueueConsumerTask.java @@ -16,7 +16,6 @@ package org.thingsboard.server.service.queue.ruleengine; import lombok.Getter; -import lombok.RequiredArgsConstructor; import lombok.Setter; import lombok.extern.slf4j.Slf4j; import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;