Merge pull request #10728 from smatvienko-tb/feature/kafka-consumer-group-per-partition
Rule engine: Kafka consumer group per partition
This commit is contained in:
commit a8dee2643b
@@ -204,7 +204,7 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
                .queueKey(new QueueKey(ServiceType.TB_CORE))
                .config(CoreQueueConfig.of(consumerPerPartition, (int) pollInterval))
                .msgPackProcessor(this::processMsgs)
                .consumerCreator(config -> queueFactory.createToCoreMsgConsumer())
                .consumerCreator((config, partitionId) -> queueFactory.createToCoreMsgConsumer())
                .consumerExecutor(consumersExecutor)
                .scheduler(scheduler)
                .taskExecutor(mgmtExecutor)

@@ -41,7 +41,7 @@ import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;
import java.util.function.BiFunction;
import java.util.stream.Collectors;

@Slf4j
@@ -51,7 +51,7 @@ public class MainQueueConsumerManager<M extends TbQueueMsg, C extends QueueConfi
    @Getter
    protected C config;
    protected final MsgPackProcessor<M, C> msgPackProcessor;
    protected final Function<C, TbQueueConsumer<M>> consumerCreator;
    protected final BiFunction<C, Integer, TbQueueConsumer<M>> consumerCreator;
    protected final ExecutorService consumerExecutor;
    protected final ScheduledExecutorService scheduler;
    protected final ExecutorService taskExecutor;
@@ -67,7 +67,7 @@ public class MainQueueConsumerManager<M extends TbQueueMsg, C extends QueueConfi
    @Builder
    public MainQueueConsumerManager(QueueKey queueKey, C config,
                                    MsgPackProcessor<M, C> msgPackProcessor,
                                    Function<C, TbQueueConsumer<M>> consumerCreator,
                                    BiFunction<C, Integer, TbQueueConsumer<M>> consumerCreator,
                                    ExecutorService consumerExecutor,
                                    ScheduledExecutorService scheduler,
                                    ExecutorService taskExecutor) {
@@ -273,8 +273,9 @@ public class MainQueueConsumerManager<M extends TbQueueMsg, C extends QueueConfi
            removedPartitions.forEach((tpi) -> consumers.remove(tpi).awaitCompletion());

            addedPartitions.forEach((tpi) -> {
                String key = queueKey + "-" + tpi.getPartition().orElse(-1);
                TbQueueConsumerTask<M> consumer = new TbQueueConsumerTask<>(key, consumerCreator.apply(config));
                Integer partitionId = tpi.getPartition().orElse(-1);
                String key = queueKey + "-" + partitionId;
                TbQueueConsumerTask<M> consumer = new TbQueueConsumerTask<>(key, () -> consumerCreator.apply(config, partitionId));
                consumers.put(tpi, consumer);
                consumer.subscribe(Set.of(tpi));
                launchConsumer(consumer);
@@ -303,7 +304,7 @@ public class MainQueueConsumerManager<M extends TbQueueMsg, C extends QueueConfi
            }

            if (consumer == null) {
                consumer = new TbQueueConsumerTask<>(queueKey, consumerCreator.apply(config));
                consumer = new TbQueueConsumerTask<>(queueKey, () -> consumerCreator.apply(config, null)); // no partitionId passed
            }
            consumer.subscribe(partitions);
            if (!consumer.isRunning()) {

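Note: the consumerCreator contract changes here from Function<C, TbQueueConsumer<M>> to BiFunction<C, Integer, TbQueueConsumer<M>>. In consumer-per-partition mode the manager passes the partition id of the topic being assigned; the single shared ("fat") consumer path passes null. A minimal sketch of how a caller might wire the new builder (illustrative only; the generic parameters, the Queue config type and the build() call are assumptions, not lines from this commit):

    MainQueueConsumerManager.<TbProtoQueueMsg<ToRuleEngineMsg>, Queue>builder()
            .queueKey(queueKey)
            .config(queue)
            .msgPackProcessor(this::processMsgs)
            // partitionId is null for the shared consumer, or the TB partition number in consumer-per-partition mode
            .consumerCreator((config, partitionId) -> queueFactory.createToRuleEngineMsgConsumer(config, partitionId))
            .consumerExecutor(consumersExecutor)
            .scheduler(scheduler)
            .taskExecutor(mgmtExecutor)
            .build();
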
@@ -16,37 +16,57 @@
package org.thingsboard.server.service.queue.ruleengine;

import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
import org.thingsboard.server.queue.TbQueueConsumer;
import org.thingsboard.server.queue.TbQueueMsg;

import java.util.Objects;
import java.util.Set;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

@RequiredArgsConstructor
@Slf4j
public class TbQueueConsumerTask<M extends TbQueueMsg> {

    @Getter
    private final Object key;
    @Getter
    private final TbQueueConsumer<M> consumer;
    private volatile TbQueueConsumer<M> consumer;
    private volatile Supplier<TbQueueConsumer<M>> consumerSupplier;

    @Setter
    private Future<?> task;

    public TbQueueConsumerTask(Object key, Supplier<TbQueueConsumer<M>> consumerSupplier) {
        this.key = key;
        this.consumer = null;
        this.consumerSupplier = consumerSupplier;
    }

    public TbQueueConsumer<M> getConsumer() {
        if (consumer == null) {
            synchronized (this) {
                if (consumer == null) {
                    Objects.requireNonNull(consumerSupplier, "consumerSupplier for key [" + key + "] is null");
                    consumer = consumerSupplier.get();
                    Objects.requireNonNull(consumer, "consumer for key [" + key + "] is null");
                    consumerSupplier = null;
                }
            }
        }
        return consumer;
    }

    public void subscribe(Set<TopicPartitionInfo> partitions) {
        log.trace("[{}] Subscribing to partitions: {}", key, partitions);
        consumer.subscribe(partitions);
        getConsumer().subscribe(partitions);
    }

    public void initiateStop() {
        log.debug("[{}] Initiating stop", key);
        consumer.stop();
        getConsumer().stop();
    }

    public void awaitCompletion() {
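
Note: TbQueueConsumerTask now builds its queue consumer lazily. The constructor stores a Supplier, and getConsumer() materializes the consumer on first use with double-checked locking over the volatile field; subscribe() and initiateStop() go through getConsumer(), so a Kafka consumer (and therefore its consumer group) is only created once a partition is actually assigned to this node. A tiny illustration (the queue, tpi and key values are hypothetical, not from this commit):

    // Nothing is created at construction time; the supplier runs on the first getConsumer() call.
    TbQueueConsumerTask<TbProtoQueueMsg<ToRuleEngineMsg>> task =
            new TbQueueConsumerTask<>("Main-3", () -> queueFactory.createToRuleEngineMsgConsumer(queue, 3));
    task.subscribe(Set.of(tpi)); // getConsumer() builds the consumer here, then subscribes it
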
@@ -21,10 +21,10 @@ import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;
import org.mockito.junit.jupiter.MockitoSettings;
import org.mockito.quality.Strictness;
import org.testcontainers.shaded.org.apache.commons.lang3.RandomUtils;
@@ -55,6 +55,8 @@ import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import org.thingsboard.server.queue.discovery.PartitionService;
import org.thingsboard.server.queue.discovery.QueueKey;
import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
import org.thingsboard.server.queue.provider.KafkaMonolithQueueFactory;
import org.thingsboard.server.queue.provider.KafkaTbRuleEngineQueueFactory;
import org.thingsboard.server.queue.provider.TbQueueProducerProvider;
import org.thingsboard.server.queue.provider.TbRuleEngineQueueFactory;
import org.thingsboard.server.service.queue.processing.TbRuleEngineProcessingStrategyFactory;
@@ -79,10 +81,12 @@ import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.awaitility.Awaitility.await;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.BDDMockito.willCallRealMethod;
import static org.mockito.Mockito.after;
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.atLeastOnce;
@@ -114,6 +118,7 @@ public class TbRuleEngineQueueConsumerManagerTest {
    private PartitionService partitionService;
    @Mock
    private TbQueueProducerProvider producerProvider;
    @Mock
    private TbQueueProducer<TbProtoQueueMsg<ToRuleEngineMsg>> ruleEngineMsgProducer;
    @Mock
    private TbQueueAdmin queueAdmin;
@@ -148,7 +153,7 @@ public class TbRuleEngineQueueConsumerManagerTest {
            log.trace("totalProcessedMsgs = {}", totalProcessedMsgs);
            return null;
        }).when(actorContext).tell(any());
        ruleEngineMsgProducer = mock(TbQueueProducer.class);

        when(producerProvider.getRuleEngineMsgProducer()).thenReturn(ruleEngineMsgProducer);
        consumersExecutor = Executors.newCachedThreadPool(ThingsBoardThreadFactory.forName("tb-rule-engine-consumer"));
        mgmtExecutor = ThingsBoardExecutors.newWorkStealingPool(3, "tb-rule-engine-mgmt");
@@ -180,7 +185,7 @@ public class TbRuleEngineQueueConsumerManagerTest {
            }
            consumers.add(consumer);
            return consumer;
        }).when(queueFactory).createToRuleEngineMsgConsumer(any());
        }).when(queueFactory).createToRuleEngineMsgConsumer(any(), any());

        QueueKey queueKey = new QueueKey(ServiceType.TB_RULE_ENGINE, queue);
        consumerManager = TbRuleEngineQueueConsumerManager.create()
@@ -210,6 +215,20 @@ public class TbRuleEngineQueueConsumerManagerTest {
        }
    }

    @ParameterizedTest
    @ValueSource(classes = {KafkaMonolithQueueFactory.class, KafkaTbRuleEngineQueueFactory.class})
    public void testUnsupported_createToRuleEngineMsgConsumer_KafkaTbRuleEngineQueueFactory(Class<TbRuleEngineQueueFactory> factoryClass) {
        // obsolete, but need to pass the afterEach
        queue.setConsumerPerPartition(false);
        consumerManager.init(queue);

        var factory = mock(factoryClass);
        willCallRealMethod().given(factory).createToRuleEngineMsgConsumer(any());
        assertThatThrownBy(() -> factory.createToRuleEngineMsgConsumer(mock(Queue.class)))
                .isInstanceOf(UnsupportedOperationException.class);

    }

    @Test
    public void testInit_consumerPerPartition() {
        queue.setConsumerPerPartition(true);
@@ -247,7 +266,8 @@ public class TbRuleEngineQueueConsumerManagerTest {

        Set<TopicPartitionInfo> partitions = Collections.emptySet();
        consumerManager.update(partitions);
        verify(queueFactory, after(1000).never()).createToRuleEngineMsgConsumer(any());
        verify(queueFactory, after(1000).never()).createToRuleEngineMsgConsumer(any(), any());
        verify(queueFactory, never()).createToRuleEngineMsgConsumer(any());

        partitions = createTpis(1);
        consumerManager.update(partitions);
@@ -278,7 +298,8 @@ public class TbRuleEngineQueueConsumerManagerTest {
        consumerManager.init(queue);

        consumerManager.update(Collections.emptySet());
        verify(queueFactory, after(1000).never()).createToRuleEngineMsgConsumer(any());
        verify(queueFactory, after(1000).never()).createToRuleEngineMsgConsumer(any(), any());
        verify(queueFactory, never()).createToRuleEngineMsgConsumer(any());

        consumerManager.update(createTpis(1));
        TestConsumer consumer1 = getConsumer(1);
@@ -420,7 +441,8 @@ public class TbRuleEngineQueueConsumerManagerTest {
        consumerManager.update(createTpis(1));
        TestConsumer consumer = getConsumer(1);
        verifySubscribedAndLaunched(consumer, 1);
        verify(queueFactory, times(1)).createToRuleEngineMsgConsumer(any());
        verify(queueFactory, times(1)).createToRuleEngineMsgConsumer(any(), any());
        verify(queueFactory, never()).createToRuleEngineMsgConsumer(any());

        consumerManager.stop();
        consumerManager.update(createTpis(1, 2, 3, 4)); // to check that no new tasks after stop are processed

@@ -64,4 +64,17 @@ public class TopicService {
    public String buildTopicName(String topic) {
        return prefix.isBlank() ? topic : prefix + "." + topic;
    }
}

    public String buildConsumerGroupId(String servicePrefix, TenantId tenantId, String queueName, Integer partitionId) {
        return this.buildTopicName(
                servicePrefix + queueName
                + (tenantId.isSysTenantId() ? "" : ("-isolated-" + tenantId))
                + "-consumer"
                + suffix(partitionId));
    }

    String suffix(Integer partitionId) {
        return partitionId == null ? "" : "-" + partitionId;
    }

}

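Note: worked examples of the new group naming (assuming an empty topic prefix, so buildTopicName() returns its argument unchanged; these lines are illustrative, not part of the commit):

    // buildConsumerGroupId("re-", sysTenantId, "Main", null) -> "re-Main-consumer"      (legacy "fat" group)
    // buildConsumerGroupId("re-", sysTenantId, "Main", 3)    -> "re-Main-consumer-3"    (consumer-per-partition group)
    // buildConsumerGroupId("re-", isolatedTenantId, "HP", 0) -> "re-HP-isolated-<tenantId>-consumer-0"
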
@@ -15,9 +15,13 @@
 */
package org.thingsboard.server.queue.kafka;

import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TopicExistsException;
import org.thingsboard.server.queue.TbQueueAdmin;
import org.thingsboard.server.queue.util.PropertyUtils;
@@ -27,6 +31,8 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
 * Created by ashvayka on 24.09.18.
@@ -120,4 +126,53 @@ public class TbKafkaAdmin implements TbQueueAdmin {
    public void destroy() {
    }

    /**
     * Sync offsets from a fat group to a single-partition group
     * Migration back from single-partition consumer to a fat group is not supported
     * TODO: The best possible approach to synchronize the offsets is to do the synchronization as a part of the save Queue parameters with stop all consumers
     * */
    public void syncOffsets(String fatGroupId, String newGroupId, Integer partitionId) {
        try {
            syncOffsetsUnsafe(fatGroupId, newGroupId, partitionId);
        } catch (Exception e) {
            log.warn("Failed to syncOffsets from {} to {} partitionId {}", fatGroupId, newGroupId, partitionId, e);
        }
    }

    void syncOffsetsUnsafe(String fatGroupId, String newGroupId, Integer partitionId) throws ExecutionException, InterruptedException, TimeoutException {
        log.info("syncOffsets [{}][{}][{}]", fatGroupId, newGroupId, partitionId);
        if (partitionId == null) {
            return;
        }
        Map<TopicPartition, OffsetAndMetadata> oldOffsets =
                settings.getAdminClient().listConsumerGroupOffsets(fatGroupId).partitionsToOffsetAndMetadata().get(10, TimeUnit.SECONDS);
        if (oldOffsets.isEmpty()) {
            return;
        }

        for (var consumerOffset : oldOffsets.entrySet()) {
            var tp = consumerOffset.getKey();
            if (!tp.topic().endsWith("." + partitionId)) {
                continue;
            }
            var om = consumerOffset.getValue();
            Map<TopicPartition, OffsetAndMetadata> newOffsets =
                    settings.getAdminClient().listConsumerGroupOffsets(newGroupId).partitionsToOffsetAndMetadata().get(10, TimeUnit.SECONDS);

            var existingOffset = newOffsets.get(tp);
            if (existingOffset == null) {
                log.info("[{}] topic offset does not exists in the new node group {}, all found offsets {}", tp, newGroupId, newOffsets);
            } else if (existingOffset.offset() >= om.offset()) {
                log.info("[{}] topic offset {} >= than old node group offset {}", tp, existingOffset.offset(), om.offset());
                break;
            } else {
                log.info("[{}] SHOULD alter topic offset [{}] less than old node group offset [{}]", tp, existingOffset.offset(), om.offset());
            }
            settings.getAdminClient().alterConsumerGroupOffsets(newGroupId, Map.of(tp, om)).all().get(10, TimeUnit.SECONDS);
            log.info("[{}] altered new consumer groupId {}", tp, newGroupId);
            break;
        }

    }

}

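Note: syncOffsets() migrates committed offsets one way only, from the legacy shared ("fat") consumer group to the new per-partition group. For the requested partition it finds the fat group's offset on the matching per-partition topic and copies it via alterConsumerGroupOffsets(), unless the new group already holds an equal or newer offset, so an upgrade neither reprocesses nor skips messages. A worked example (the topic name assumes ThingsBoard's default per-partition topic naming and is not taken from this commit):

    // syncOffsets("re-Main-consumer", "re-Main-consumer-3", 3):
    //   the fat group "re-Main-consumer" has offset 12345 committed for topic "tb_rule_engine.main.3";
    //   "re-Main-consumer-3" has no offset (or a smaller one) for that topic-partition;
    //   -> offset 12345 is copied to group "re-Main-consumer-3" and the loop stops.
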
@@ -75,7 +75,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi
    private final TbKafkaConsumerStatsService consumerStatsService;

    private final TbQueueAdmin coreAdmin;
    private final TbQueueAdmin ruleEngineAdmin;
    private final TbKafkaAdmin ruleEngineAdmin;
    private final TbQueueAdmin jsExecutorRequestAdmin;
    private final TbQueueAdmin jsExecutorResponseAdmin;
    private final TbQueueAdmin transportApiRequestAdmin;
@@ -187,18 +187,29 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi

    @Override
    public TbQueueConsumer<TbProtoQueueMsg<ToRuleEngineMsg>> createToRuleEngineMsgConsumer(Queue configuration) {
        throw new UnsupportedOperationException("Rule engine consumer should use a partitionId");
    }

    @Override
    public TbQueueConsumer<TbProtoQueueMsg<ToRuleEngineMsg>> createToRuleEngineMsgConsumer(Queue configuration, Integer partitionId) {
        String queueName = configuration.getName();
        String groupId = topicService.buildConsumerGroupId("re-", configuration.getTenantId(), queueName, partitionId);

        ruleEngineAdmin.syncOffsets(topicService.buildConsumerGroupId("re-", configuration.getTenantId(), queueName, null), // the fat groupId
                groupId, partitionId);

        TbKafkaConsumerTemplate.TbKafkaConsumerTemplateBuilder<TbProtoQueueMsg<ToRuleEngineMsg>> consumerBuilder = TbKafkaConsumerTemplate.builder();
        consumerBuilder.settings(kafkaSettings);
        consumerBuilder.topic(topicService.buildTopicName(configuration.getTopic()));
        consumerBuilder.clientId("re-" + queueName + "-consumer-" + serviceInfoProvider.getServiceId() + "-" + consumerCount.incrementAndGet());
        consumerBuilder.groupId(topicService.buildTopicName("re-" + queueName + (configuration.getTenantId().isSysTenantId() ? "" : ("-isolated-" + configuration.getTenantId())) + "-consumer"));
        consumerBuilder.groupId(groupId);
        consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToRuleEngineMsg.parseFrom(msg.getData()), msg.getHeaders()));
        consumerBuilder.admin(ruleEngineAdmin);
        consumerBuilder.statsService(consumerStatsService);
        return consumerBuilder.build();
    }

    @Override
    public TbQueueConsumer<TbProtoQueueMsg<ToRuleEngineNotificationMsg>> createToRuleEngineNotificationsMsgConsumer() {
        TbKafkaConsumerTemplate.TbKafkaConsumerTemplateBuilder<TbProtoQueueMsg<ToRuleEngineNotificationMsg>> consumerBuilder = TbKafkaConsumerTemplate.builder();

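Note: putting the pieces together for the system tenant's Main queue and partition 0 (illustrative values, assuming no topic prefix), the factory builds the per-partition group id, syncs offsets from the fat group into it before the consumer exists, and then uses it for the new consumer:

    // groupId            = "re-Main-consumer-0"
    // legacy (fat) group = "re-Main-consumer"
    // ruleEngineAdmin.syncOffsets("re-Main-consumer", "re-Main-consumer-0", 0) runs first,
    // so the new group starts from the offsets the fat group had already committed.
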
@@ -68,7 +68,7 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory {
    private final TbQueueTransportNotificationSettings transportNotificationSettings;

    private final TbQueueAdmin coreAdmin;
    private final TbQueueAdmin ruleEngineAdmin;
    private final TbKafkaAdmin ruleEngineAdmin;
    private final TbQueueAdmin jsExecutorRequestAdmin;
    private final TbQueueAdmin jsExecutorResponseAdmin;
    private final TbQueueAdmin notificationAdmin;
@@ -164,12 +164,22 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory {

    @Override
    public TbQueueConsumer<TbProtoQueueMsg<ToRuleEngineMsg>> createToRuleEngineMsgConsumer(Queue configuration) {
        throw new UnsupportedOperationException("Rule engine consumer should use a partitionId");
    }

    @Override
    public TbQueueConsumer<TbProtoQueueMsg<ToRuleEngineMsg>> createToRuleEngineMsgConsumer(Queue configuration, Integer partitionId) {
        String queueName = configuration.getName();
        String groupId = topicService.buildConsumerGroupId("re-", configuration.getTenantId(), queueName, partitionId);

        ruleEngineAdmin.syncOffsets(topicService.buildConsumerGroupId("re-", configuration.getTenantId(), queueName, null), // the fat groupId
                groupId, partitionId);

        TbKafkaConsumerTemplate.TbKafkaConsumerTemplateBuilder<TbProtoQueueMsg<ToRuleEngineMsg>> consumerBuilder = TbKafkaConsumerTemplate.builder();
        consumerBuilder.settings(kafkaSettings);
        consumerBuilder.topic(topicService.buildTopicName(configuration.getTopic()));
        consumerBuilder.clientId("re-" + queueName + "-consumer-" + serviceInfoProvider.getServiceId() + "-" + consumerCount.incrementAndGet());
        consumerBuilder.groupId(topicService.buildTopicName("re-" + queueName + (configuration.getTenantId().isSysTenantId() ? "" : ("-isolated-" + configuration.getTenantId())) + "-consumer"));
        consumerBuilder.groupId(groupId);
        consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToRuleEngineMsg.parseFrom(msg.getData()), msg.getHeaders()));
        consumerBuilder.admin(ruleEngineAdmin);
        consumerBuilder.statsService(consumerStatsService);

@@ -77,13 +77,25 @@ public interface TbRuleEngineQueueFactory extends TbUsageStatsClientQueueFactory
    TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToOtaPackageStateServiceMsg>> createToOtaPackageStateServiceMsgProducer();

    /**
     * Used to consume messages by TB Core Service
     * Used to consume messages by TB Rule Engine Service
     *
     * @return
     * @param configuration
     */
    TbQueueConsumer<TbProtoQueueMsg<ToRuleEngineMsg>> createToRuleEngineMsgConsumer(Queue configuration);

    /**
     * Used to consume messages by TB Rule Engine Service
     * Intended usage for consumer per partition strategy
     *
     * @return TbQueueConsumer
     * @param configuration
     * @param partitionId as a suffix for consumer name
     */
    default TbQueueConsumer<TbProtoQueueMsg<ToRuleEngineMsg>> createToRuleEngineMsgConsumer(Queue configuration, Integer partitionId) {
        return createToRuleEngineMsgConsumer(configuration);
    }

    /**
     * Used to consume high priority messages by TB Core Service
     *

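Note: because the two-argument overload has a default implementation that simply delegates to the single-argument method, non-Kafka factory implementations keep compiling and behaving as before; only the Kafka factories above override it (and now reject the legacy single-argument call). A minimal sketch (the in-memory factory variable is hypothetical, not from this commit):

    // Any implementation that only overrides createToRuleEngineMsgConsumer(Queue):
    TbQueueConsumer<TbProtoQueueMsg<ToRuleEngineMsg>> consumer =
            inMemoryFactory.createToRuleEngineMsgConsumer(queue, 3); // default method ignores partitionId and delegates
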
@@ -0,0 +1,148 @@
/**
 * Copyright © 2016-2024 The Thingsboard Authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.thingsboard.server.queue.kafka;

import com.google.common.collect.MapDifference;
import com.google.common.collect.Maps;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.ConsumerGroupListing;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;

import java.util.Collection;
import java.util.Comparator;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

@Slf4j
class TbKafkaAdminTest {

    Properties props;
    AdminClient admin;

    @BeforeEach
    void setUp() {
        props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        admin = AdminClient.create(props);
    }

    @AfterEach
    void tearDown() {
        admin.close();
    }

    @Disabled
    @Test
    void testListOffsets() throws ExecutionException, InterruptedException {
        log.info("Getting consumer groups list...");
        Collection<ConsumerGroupListing> consumerGroupListings = admin.listConsumerGroups().all().get();
        consumerGroupListings = consumerGroupListings.stream().sorted(Comparator.comparing(ConsumerGroupListing::groupId)).toList();
        for (ConsumerGroupListing consumerGroup : consumerGroupListings) {
            String groupId = consumerGroup.groupId();
            log.info("=== consumer group: {}", groupId);
            Map<TopicPartition, OffsetAndMetadata> consumerOffsets = admin.listConsumerGroupOffsets(groupId)
                    .partitionsToOffsetAndMetadata().get();

            // Printing the fetched offsets
            consumerOffsets.forEach((tp, om) -> log.info(tp.topic() + " partition " + tp.partition() + " offset " + om.offset()));
            if (groupId.startsWith("re-") && groupId.endsWith("consumer")) {
                log.info("****** Migrating groupId [{}] ...", groupId);

                for (var consumerOffset : consumerOffsets.entrySet()) {
                    var tp = consumerOffset.getKey();
                    var om = consumerOffset.getValue();
                    final Integer tbPartitionId = parsePartitionIdFromTopicName(tp.topic());
                    if (tbPartitionId == null || tbPartitionId < 0) {
                        continue;
                    }

                    String newGroupId = groupId + "-" + tbPartitionId;
                    log.info("Getting offsets for consumer groupId [{}]", newGroupId);
                    Map<TopicPartition, OffsetAndMetadata> newConsumerOffsets = admin.listConsumerGroupOffsets(newGroupId)
                            .partitionsToOffsetAndMetadata().get();

                    if (!newConsumerOffsets.isEmpty()) {
                        log.info("Found existing new group ConsumerOffsets {}", newConsumerOffsets);
                    }

                    var existingOffset = newConsumerOffsets.get(tp);
                    if (existingOffset == null) {
                        log.info("topic offset does not exists in the new node group, all found offsets");
                    } else if (existingOffset.offset() >= om.offset()) {
                        log.info("topic offset {} >= than old node group offset {}", existingOffset.offset(), om.offset());
                        continue;
                    } else {
                        log.info("SHOULD alter topic offset [{}] less than old node group offset [{}]", existingOffset.offset(), om.offset());
                    }

                    Map<TopicPartition, OffsetAndMetadata> newOffsets = Map.of(tp, om);

                    log.warn("@@@@@ alterConsumerGroupOffsets [{}] with new offsets [{}]", newGroupId, newOffsets);
                    admin.alterConsumerGroupOffsets(newGroupId, newOffsets).all().whenComplete((res, err) -> {
                        if (err != null) {
                            log.error("Failed to alterConsumerGroupOffsets for groupId [{}], new offsets [{}]", newGroupId, newOffsets, err);
                        } else {
                            log.info("Updated new consumer group [{}], offsets [{}]", newGroupId, newOffsets);
                        }
                    }).get();  // Handle asynchronously as appropriate

                    //Verify

                    Map<TopicPartition, OffsetAndMetadata> resultedConsumerOffsets = admin.listConsumerGroupOffsets(newGroupId)
                            .partitionsToOffsetAndMetadata().get();

                    MapDifference<TopicPartition, OffsetAndMetadata> diff = Maps.difference(newOffsets, resultedConsumerOffsets);

                    if (!diff.areEqual()) {
                        log.error("Verify failed for groupId [{}], current offset {} is not the same as expected {}", newGroupId, resultedConsumerOffsets, newOffsets);
                    } else {
                        log.info("Verify passed for groupId [{}]", newGroupId);
                    }

                }

            }
        }

    }

    Integer parsePartitionIdFromTopicName(String topic) {
        if (topic == null) {
            return null;
        }
        int dotIndex = topic.lastIndexOf('.');
        if (dotIndex <= 0) {
            return null;
        }

        String indexStr = topic.substring(dotIndex + 1);
        try {
            return Integer.parseInt(indexStr);
        } catch (Throwable t) {
            log.warn("Can't parse partition Id from topic name [{}]", topic, t);
        }
        return null;
    }

}
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user