EDQS: don't use Kafka manual partitions assignment
This commit is contained in:
		
							parent
							
								
									ef0a0a2d50
								
							
						
					
					
						commit
						9ddd22edcf
					
				@ -43,6 +43,7 @@ import org.thingsboard.server.dao.model.sqlts.dictionary.KeyDictionaryEntry;
 | 
			
		||||
import org.thingsboard.server.dao.model.sqlts.latest.TsKvLatestEntity;
 | 
			
		||||
import org.thingsboard.server.dao.sql.relation.RelationRepository;
 | 
			
		||||
import org.thingsboard.server.dao.sqlts.latest.TsKvLatestRepository;
 | 
			
		||||
import org.thingsboard.server.queue.edqs.EdqsConfig;
 | 
			
		||||
 | 
			
		||||
import java.util.List;
 | 
			
		||||
import java.util.Map;
 | 
			
		||||
@ -75,6 +76,8 @@ public abstract class EdqsSyncService {
 | 
			
		||||
    @Autowired
 | 
			
		||||
    @Lazy
 | 
			
		||||
    private DefaultEdqsService edqsService;
 | 
			
		||||
    @Autowired
 | 
			
		||||
    protected EdqsConfig edqsConfig;
 | 
			
		||||
 | 
			
		||||
    private final ConcurrentHashMap<UUID, EntityIdInfo> entityInfoMap = new ConcurrentHashMap<>();
 | 
			
		||||
    private final ConcurrentHashMap<Integer, String> keys = new ConcurrentHashMap<>();
 | 
			
		||||
 | 
			
		||||
@ -17,11 +17,14 @@ package org.thingsboard.server.service.edqs;
 | 
			
		||||
 | 
			
		||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
 | 
			
		||||
import org.springframework.stereotype.Service;
 | 
			
		||||
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
 | 
			
		||||
import org.thingsboard.server.queue.edqs.EdqsQueue;
 | 
			
		||||
import org.thingsboard.server.queue.kafka.TbKafkaAdmin;
 | 
			
		||||
import org.thingsboard.server.queue.kafka.TbKafkaSettings;
 | 
			
		||||
 | 
			
		||||
import java.util.Collections;
 | 
			
		||||
import java.util.stream.Collectors;
 | 
			
		||||
import java.util.stream.IntStream;
 | 
			
		||||
 | 
			
		||||
@Service
 | 
			
		||||
@ConditionalOnExpression("'${queue.edqs.sync.enabled:true}' == 'true' && '${queue.type:null}' == 'kafka'")
 | 
			
		||||
@ -31,7 +34,12 @@ public class KafkaEdqsSyncService extends EdqsSyncService {
 | 
			
		||||
 | 
			
		||||
    public KafkaEdqsSyncService(TbKafkaSettings kafkaSettings) {
 | 
			
		||||
        TbKafkaAdmin kafkaAdmin = new TbKafkaAdmin(kafkaSettings, Collections.emptyMap());
 | 
			
		||||
        this.syncNeeded = kafkaAdmin.isTopicEmpty(EdqsQueue.EVENTS.getTopic());
 | 
			
		||||
        this.syncNeeded = kafkaAdmin.areAllTopicsEmpty(IntStream.range(0, edqsConfig.getPartitions())
 | 
			
		||||
                .mapToObj(partition -> TopicPartitionInfo.builder()
 | 
			
		||||
                        .topic(EdqsQueue.EVENTS.getTopic())
 | 
			
		||||
                        .partition(partition)
 | 
			
		||||
                        .build().getFullTopicName())
 | 
			
		||||
                .collect(Collectors.toSet()));
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
 | 
			
		||||
@ -1645,12 +1645,12 @@ queue:
 | 
			
		||||
      calculated-field: "${TB_QUEUE_KAFKA_CF_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:52428800;retention.bytes:1048576000;partitions:1;min.insync.replicas:1}"
 | 
			
		||||
      # Kafka properties for Calculated Field State topics
 | 
			
		||||
      calculated-field-state: "${TB_QUEUE_KAFKA_CF_STATE_TOPIC_PROPERTIES:retention.ms:-1;segment.bytes:52428800;retention.bytes:104857600000;partitions:1;min.insync.replicas:1;cleanup.policy:compact}"
 | 
			
		||||
      # Kafka properties for EDQS events topics. Partitions number must be the same as queue.edqs.partitions
 | 
			
		||||
      edqs-events: "${TB_QUEUE_KAFKA_EDQS_EVENTS_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:52428800;retention.bytes:-1;partitions:12;min.insync.replicas:1}"
 | 
			
		||||
      # Kafka properties for EDQS requests topic (default: 3 minutes retention). Partitions number must be the same as queue.edqs.partitions
 | 
			
		||||
      edqs-requests: "${TB_QUEUE_KAFKA_EDQS_REQUESTS_TOPIC_PROPERTIES:retention.ms:180000;segment.bytes:52428800;retention.bytes:1048576000;partitions:12;min.insync.replicas:1}"
 | 
			
		||||
      # Kafka properties for EDQS state topic (infinite retention, compaction). Partitions number must be the same as queue.edqs.partitions
 | 
			
		||||
      edqs-state: "${TB_QUEUE_KAFKA_EDQS_STATE_TOPIC_PROPERTIES:retention.ms:-1;segment.bytes:52428800;retention.bytes:-1;partitions:12;min.insync.replicas:1;cleanup.policy:compact}"
 | 
			
		||||
      # Kafka properties for EDQS events topics
 | 
			
		||||
      edqs-events: "${TB_QUEUE_KAFKA_EDQS_EVENTS_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:52428800;retention.bytes:-1;partitions:1;min.insync.replicas:1}"
 | 
			
		||||
      # Kafka properties for EDQS requests topic (default: 3 minutes retention)
 | 
			
		||||
      edqs-requests: "${TB_QUEUE_KAFKA_EDQS_REQUESTS_TOPIC_PROPERTIES:retention.ms:180000;segment.bytes:52428800;retention.bytes:1048576000;partitions:1;min.insync.replicas:1}"
 | 
			
		||||
      # Kafka properties for EDQS state topic (infinite retention, compaction)
 | 
			
		||||
      edqs-state: "${TB_QUEUE_KAFKA_EDQS_STATE_TOPIC_PROPERTIES:retention.ms:-1;segment.bytes:52428800;retention.bytes:-1;partitions:1;min.insync.replicas:1;cleanup.policy:compact}"
 | 
			
		||||
    consumer-stats:
 | 
			
		||||
      # Prints lag between consumer group offset and last messages offset in Kafka topics
 | 
			
		||||
      enabled: "${TB_QUEUE_KAFKA_CONSUMER_STATS_ENABLED:true}"
 | 
			
		||||
 | 
			
		||||
@ -164,13 +164,10 @@ public class EdqsProcessor implements TbQueueHandler<TbProtoQueueMsg<ToEdqsMsg>,
 | 
			
		||||
        }
 | 
			
		||||
        try {
 | 
			
		||||
            Set<TopicPartitionInfo> newPartitions = event.getNewPartitions().get(new QueueKey(ServiceType.EDQS));
 | 
			
		||||
            Set<TopicPartitionInfo> partitions = newPartitions.stream()
 | 
			
		||||
                    .map(tpi -> tpi.withUseInternalPartition(true))
 | 
			
		||||
                    .collect(Collectors.toSet());
 | 
			
		||||
 | 
			
		||||
            stateService.process(withTopic(partitions, EdqsQueue.STATE.getTopic()));
 | 
			
		||||
            stateService.process(withTopic(newPartitions, EdqsQueue.STATE.getTopic()));
 | 
			
		||||
            // eventsConsumer's partitions are updated by stateService
 | 
			
		||||
            responseTemplate.subscribe(withTopic(partitions, config.getRequestsTopic())); // FIXME: we subscribe to partitions before we are ready. implement consumer-per-partition version for request template
 | 
			
		||||
            responseTemplate.subscribe(withTopic(newPartitions, config.getRequestsTopic())); // TODO: we subscribe to partitions before we are ready. implement consumer-per-partition version for request template
 | 
			
		||||
 | 
			
		||||
            Set<TopicPartitionInfo> oldPartitions = event.getOldPartitions().get(new QueueKey(ServiceType.EDQS));
 | 
			
		||||
            if (CollectionsUtil.isNotEmpty(oldPartitions)) {
 | 
			
		||||
 | 
			
		||||
@ -70,17 +70,13 @@ public class EdqsProducer {
 | 
			
		||||
                log.warn("[{}][{}][{}] Failed to publish msg to {}: {}", tenantId, type, key, topic, msg, t);
 | 
			
		||||
            }
 | 
			
		||||
        };
 | 
			
		||||
        TopicPartitionInfo tpi = TopicPartitionInfo.builder()
 | 
			
		||||
                .topic(topic)
 | 
			
		||||
                .partition(partitionService.resolvePartition(tenantId))
 | 
			
		||||
                .build();
 | 
			
		||||
        if (producer instanceof TbKafkaProducerTemplate<TbProtoQueueMsg<ToEdqsMsg>> kafkaProducer) {
 | 
			
		||||
            TopicPartitionInfo tpi = TopicPartitionInfo.builder()
 | 
			
		||||
                    .topic(topic)
 | 
			
		||||
                    .partition(partitionService.resolvePartition(tenantId))
 | 
			
		||||
                    .useInternalPartition(true)
 | 
			
		||||
                    .build();
 | 
			
		||||
            kafkaProducer.send(tpi, key, new TbProtoQueueMsg<>(null, msg), callback); // specifying custom key for compaction
 | 
			
		||||
        } else {
 | 
			
		||||
            TopicPartitionInfo tpi = TopicPartitionInfo.builder()
 | 
			
		||||
                    .topic(topic)
 | 
			
		||||
                    .build();
 | 
			
		||||
            producer.send(tpi, new TbProtoQueueMsg<>(null, msg), callback);
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
@ -45,6 +45,8 @@ import org.thingsboard.server.queue.edqs.KafkaEdqsComponent;
 | 
			
		||||
import java.util.Set;
 | 
			
		||||
import java.util.UUID;
 | 
			
		||||
import java.util.concurrent.atomic.AtomicInteger;
 | 
			
		||||
import java.util.stream.Collectors;
 | 
			
		||||
import java.util.stream.IntStream;
 | 
			
		||||
 | 
			
		||||
@Service
 | 
			
		||||
@RequiredArgsConstructor
 | 
			
		||||
@ -151,7 +153,13 @@ public class KafkaEdqsStateService implements EdqsStateService {
 | 
			
		||||
    @Override
 | 
			
		||||
    public void process(Set<TopicPartitionInfo> partitions) {
 | 
			
		||||
        if (queueStateService.getPartitions().isEmpty()) {
 | 
			
		||||
            eventsToBackupConsumer.subscribe();
 | 
			
		||||
            Set<TopicPartitionInfo> allPartitions = IntStream.range(0, config.getPartitions())
 | 
			
		||||
                    .mapToObj(partition -> TopicPartitionInfo.builder()
 | 
			
		||||
                            .topic(EdqsQueue.EVENTS.getTopic())
 | 
			
		||||
                            .partition(partition)
 | 
			
		||||
                            .build())
 | 
			
		||||
                    .collect(Collectors.toSet());
 | 
			
		||||
            eventsToBackupConsumer.subscribe(allPartitions);
 | 
			
		||||
            eventsToBackupConsumer.launch();
 | 
			
		||||
        }
 | 
			
		||||
        queueStateService.update(new QueueKey(ServiceType.EDQS), partitions);
 | 
			
		||||
 | 
			
		||||
@ -263,7 +263,6 @@ public class DefaultTbQueueRequestTemplate<Request extends TbQueueMsg, Response
 | 
			
		||||
        TopicPartitionInfo tpi = TopicPartitionInfo.builder()
 | 
			
		||||
                .topic(requestTemplate.getDefaultTopic())
 | 
			
		||||
                .partition(partition)
 | 
			
		||||
                .useInternalPartition(partition != null)
 | 
			
		||||
                .build();
 | 
			
		||||
        requestTemplate.send(tpi, request, new TbQueueCallback() {
 | 
			
		||||
            @Override
 | 
			
		||||
 | 
			
		||||
@ -188,31 +188,46 @@ public class TbKafkaAdmin implements TbQueueAdmin {
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    public boolean isTopicEmpty(String topic) {
 | 
			
		||||
        return areAllTopicsEmpty(Set.of(topic));
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    public boolean areAllTopicsEmpty(Set<String> topics) {
 | 
			
		||||
        try {
 | 
			
		||||
            if (!getTopics().contains(topic)) {
 | 
			
		||||
            List<String> existingTopics = getTopics().stream().filter(topics::contains).toList();
 | 
			
		||||
            if (existingTopics.isEmpty()) {
 | 
			
		||||
                return true;
 | 
			
		||||
            }
 | 
			
		||||
            TopicDescription topicDescription = settings.getAdminClient().describeTopics(Collections.singletonList(topic)).topicNameValues().get(topic).get();
 | 
			
		||||
            List<TopicPartition> partitions = topicDescription.partitions().stream().map(partitionInfo -> new TopicPartition(topic, partitionInfo.partition())).toList();
 | 
			
		||||
 | 
			
		||||
            Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> beginningOffsets = settings.getAdminClient().listOffsets(partitions.stream()
 | 
			
		||||
            List<TopicPartition> allPartitions = settings.getAdminClient().describeTopics(existingTopics).topicNameValues().entrySet().stream()
 | 
			
		||||
                    .flatMap(entry -> {
 | 
			
		||||
                        String topic = entry.getKey();
 | 
			
		||||
                        TopicDescription topicDescription;
 | 
			
		||||
                        try {
 | 
			
		||||
                            topicDescription = entry.getValue().get();
 | 
			
		||||
                        } catch (InterruptedException | ExecutionException e) {
 | 
			
		||||
                            throw new RuntimeException(e);
 | 
			
		||||
                        }
 | 
			
		||||
                        return topicDescription.partitions().stream().map(partitionInfo -> new TopicPartition(topic, partitionInfo.partition()));
 | 
			
		||||
                    })
 | 
			
		||||
                    .toList();
 | 
			
		||||
 | 
			
		||||
            Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> beginningOffsets = settings.getAdminClient().listOffsets(allPartitions.stream()
 | 
			
		||||
                    .collect(Collectors.toMap(partition -> partition, partition -> OffsetSpec.earliest()))).all().get();
 | 
			
		||||
 | 
			
		||||
            Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> endOffsets = settings.getAdminClient().listOffsets(partitions.stream()
 | 
			
		||||
            Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> endOffsets = settings.getAdminClient().listOffsets(allPartitions.stream()
 | 
			
		||||
                    .collect(Collectors.toMap(partition -> partition, partition -> OffsetSpec.latest()))).all().get();
 | 
			
		||||
 | 
			
		||||
            for (TopicPartition partition : partitions) {
 | 
			
		||||
            for (TopicPartition partition : allPartitions) {
 | 
			
		||||
                long beginningOffset = beginningOffsets.get(partition).offset();
 | 
			
		||||
                long endOffset = endOffsets.get(partition).offset();
 | 
			
		||||
 | 
			
		||||
                if (beginningOffset != endOffset) {
 | 
			
		||||
                    log.debug("Partition [{}] of topic [{}] is not empty. Returning false.", partition.partition(), topic);
 | 
			
		||||
                    log.debug("Partition [{}] of topic [{}] is not empty. Returning false.", partition.partition(), partition.topic());
 | 
			
		||||
                    return false;
 | 
			
		||||
                }
 | 
			
		||||
            }
 | 
			
		||||
            return true;
 | 
			
		||||
        } catch (InterruptedException | ExecutionException e) {
 | 
			
		||||
            log.error("Failed to check if topic [{}] is empty.", topic, e);
 | 
			
		||||
            log.error("Failed to check if topics [{}] empty.", topics, e);
 | 
			
		||||
            return false;
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
@ -148,12 +148,12 @@ queue:
 | 
			
		||||
    #  - key: "session.timeout.ms" # refer to https://docs.confluent.io/platform/current/installation/configuration/consumer-configs.html#consumerconfigs_session.timeout.ms
 | 
			
		||||
    #    value: "${TB_QUEUE_KAFKA_SESSION_TIMEOUT_MS:10000}" # (10 seconds)
 | 
			
		||||
    topic-properties:
 | 
			
		||||
      # Kafka properties for EDQS events topics. Partitions number must be the same as queue.edqs.partitions
 | 
			
		||||
      edqs-events: "${TB_QUEUE_KAFKA_EDQS_EVENTS_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:52428800;retention.bytes:-1;partitions:12;min.insync.replicas:1}"
 | 
			
		||||
      # Kafka properties for EDQS requests topic (default: 3 minutes retention). Partitions number must be the same as queue.edqs.partitions
 | 
			
		||||
      edqs-requests: "${TB_QUEUE_KAFKA_EDQS_REQUESTS_TOPIC_PROPERTIES:retention.ms:180000;segment.bytes:52428800;retention.bytes:1048576000;partitions:12;min.insync.replicas:1}"
 | 
			
		||||
      # Kafka properties for EDQS state topic (infinite retention, compaction). Partitions number must be the same as queue.edqs.partitions
 | 
			
		||||
      edqs-state: "${TB_QUEUE_KAFKA_EDQS_STATE_TOPIC_PROPERTIES:retention.ms:-1;segment.bytes:52428800;retention.bytes:-1;partitions:12;min.insync.replicas:1;cleanup.policy:compact}"
 | 
			
		||||
      # Kafka properties for EDQS events topics
 | 
			
		||||
      edqs-events: "${TB_QUEUE_KAFKA_EDQS_EVENTS_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:52428800;retention.bytes:-1;partitions:1;min.insync.replicas:1}"
 | 
			
		||||
      # Kafka properties for EDQS requests topic (default: 3 minutes retention)
 | 
			
		||||
      edqs-requests: "${TB_QUEUE_KAFKA_EDQS_REQUESTS_TOPIC_PROPERTIES:retention.ms:180000;segment.bytes:52428800;retention.bytes:1048576000;partitions:1;min.insync.replicas:1}"
 | 
			
		||||
      # Kafka properties for EDQS state topic (infinite retention, compaction)
 | 
			
		||||
      edqs-state: "${TB_QUEUE_KAFKA_EDQS_STATE_TOPIC_PROPERTIES:retention.ms:-1;segment.bytes:52428800;retention.bytes:-1;partitions:1;min.insync.replicas:1;cleanup.policy:compact}"
 | 
			
		||||
    consumer-stats:
 | 
			
		||||
      # Prints lag between consumer group offset and last messages offset in Kafka topics
 | 
			
		||||
      enabled: "${TB_QUEUE_KAFKA_CONSUMER_STATS_ENABLED:true}"
 | 
			
		||||
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user