commit
						cb8b7bbbeb
					
				@ -76,8 +76,9 @@ public class DefaultEdqsApiService implements EdqsApiService {
 | 
			
		||||
            requestMsg.setCustomerIdLSB(customerId.getId().getLeastSignificantBits());
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        Integer partition = edqsPartitionService.resolvePartition(tenantId);
 | 
			
		||||
        ListenableFuture<TbProtoQueueMsg<FromEdqsMsg>> resultFuture = requestTemplate.send(new TbProtoQueueMsg<>(UUID.randomUUID(), requestMsg.build()), partition);
 | 
			
		||||
        UUID key = UUID.randomUUID();
 | 
			
		||||
        Integer partition = edqsPartitionService.resolvePartition(tenantId, key);
 | 
			
		||||
        ListenableFuture<TbProtoQueueMsg<FromEdqsMsg>> resultFuture = requestTemplate.send(new TbProtoQueueMsg<>(key, requestMsg.build()), partition);
 | 
			
		||||
        return Futures.transform(resultFuture, msg -> {
 | 
			
		||||
            TransportProtos.EdqsResponseMsg responseMsg = msg.getValue().getResponseMsg();
 | 
			
		||||
            return JacksonUtil.fromString(responseMsg.getValue(), EdqsResponse.class);
 | 
			
		||||
 | 
			
		||||
@ -96,10 +96,8 @@ public class DefaultEdqsService implements EdqsService {
 | 
			
		||||
    private void init() {
 | 
			
		||||
        executor = ThingsBoardExecutors.newWorkStealingPool(12, getClass());
 | 
			
		||||
        eventsProducer = EdqsProducer.builder()
 | 
			
		||||
                .queue(EdqsQueue.EVENTS)
 | 
			
		||||
                .partitionService(edqsPartitionService)
 | 
			
		||||
                .topicService(topicService)
 | 
			
		||||
                .producer(queueFactory.createEdqsMsgProducer(EdqsQueue.EVENTS))
 | 
			
		||||
                .partitionService(edqsPartitionService)
 | 
			
		||||
                .build();
 | 
			
		||||
        syncLock = distributedLockService.getLock("edqs_sync");
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
@ -17,11 +17,15 @@ package org.thingsboard.server.service.edqs;
 | 
			
		||||
 | 
			
		||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
 | 
			
		||||
import org.springframework.stereotype.Service;
 | 
			
		||||
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
 | 
			
		||||
import org.thingsboard.server.queue.edqs.EdqsConfig;
 | 
			
		||||
import org.thingsboard.server.queue.edqs.EdqsQueue;
 | 
			
		||||
import org.thingsboard.server.queue.kafka.TbKafkaAdmin;
 | 
			
		||||
import org.thingsboard.server.queue.kafka.TbKafkaSettings;
 | 
			
		||||
 | 
			
		||||
import java.util.Collections;
 | 
			
		||||
import java.util.stream.Collectors;
 | 
			
		||||
import java.util.stream.IntStream;
 | 
			
		||||
 | 
			
		||||
@Service
 | 
			
		||||
@ConditionalOnExpression("'${queue.edqs.sync.enabled:true}' == 'true' && '${queue.type:null}' == 'kafka'")
 | 
			
		||||
@ -29,9 +33,14 @@ public class KafkaEdqsSyncService extends EdqsSyncService {
 | 
			
		||||
 | 
			
		||||
    private final boolean syncNeeded;
 | 
			
		||||
 | 
			
		||||
    public KafkaEdqsSyncService(TbKafkaSettings kafkaSettings) {
 | 
			
		||||
    public KafkaEdqsSyncService(TbKafkaSettings kafkaSettings, EdqsConfig edqsConfig) {
 | 
			
		||||
        TbKafkaAdmin kafkaAdmin = new TbKafkaAdmin(kafkaSettings, Collections.emptyMap());
 | 
			
		||||
        this.syncNeeded = kafkaAdmin.isTopicEmpty(EdqsQueue.EVENTS.getTopic());
 | 
			
		||||
        this.syncNeeded = kafkaAdmin.areAllTopicsEmpty(IntStream.range(0, edqsConfig.getPartitions())
 | 
			
		||||
                .mapToObj(partition -> TopicPartitionInfo.builder()
 | 
			
		||||
                        .topic(EdqsQueue.EVENTS.getTopic())
 | 
			
		||||
                        .partition(partition)
 | 
			
		||||
                        .build().getFullTopicName())
 | 
			
		||||
                .collect(Collectors.toSet()));
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
 | 
			
		||||
@ -1645,12 +1645,12 @@ queue:
 | 
			
		||||
      calculated-field: "${TB_QUEUE_KAFKA_CF_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:52428800;retention.bytes:1048576000;partitions:1;min.insync.replicas:1}"
 | 
			
		||||
      # Kafka properties for Calculated Field State topics
 | 
			
		||||
      calculated-field-state: "${TB_QUEUE_KAFKA_CF_STATE_TOPIC_PROPERTIES:retention.ms:-1;segment.bytes:52428800;retention.bytes:104857600000;partitions:1;min.insync.replicas:1;cleanup.policy:compact}"
 | 
			
		||||
      # Kafka properties for EDQS events topics. Partitions number must be the same as queue.edqs.partitions
 | 
			
		||||
      edqs-events: "${TB_QUEUE_KAFKA_EDQS_EVENTS_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:52428800;retention.bytes:-1;partitions:12;min.insync.replicas:1}"
 | 
			
		||||
      # Kafka properties for EDQS requests topic (default: 3 minutes retention). Partitions number must be the same as queue.edqs.partitions
 | 
			
		||||
      edqs-requests: "${TB_QUEUE_KAFKA_EDQS_REQUESTS_TOPIC_PROPERTIES:retention.ms:180000;segment.bytes:52428800;retention.bytes:1048576000;partitions:12;min.insync.replicas:1}"
 | 
			
		||||
      # Kafka properties for EDQS state topic (infinite retention, compaction). Partitions number must be the same as queue.edqs.partitions
 | 
			
		||||
      edqs-state: "${TB_QUEUE_KAFKA_EDQS_STATE_TOPIC_PROPERTIES:retention.ms:-1;segment.bytes:52428800;retention.bytes:-1;partitions:12;min.insync.replicas:1;cleanup.policy:compact}"
 | 
			
		||||
      # Kafka properties for EDQS events topics
 | 
			
		||||
      edqs-events: "${TB_QUEUE_KAFKA_EDQS_EVENTS_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:52428800;retention.bytes:-1;partitions:1;min.insync.replicas:1}"
 | 
			
		||||
      # Kafka properties for EDQS requests topic (default: 3 minutes retention)
 | 
			
		||||
      edqs-requests: "${TB_QUEUE_KAFKA_EDQS_REQUESTS_TOPIC_PROPERTIES:retention.ms:180000;segment.bytes:52428800;retention.bytes:1048576000;partitions:1;min.insync.replicas:1}"
 | 
			
		||||
      # Kafka properties for EDQS state topic (infinite retention, compaction)
 | 
			
		||||
      edqs-state: "${TB_QUEUE_KAFKA_EDQS_STATE_TOPIC_PROPERTIES:retention.ms:-1;segment.bytes:52428800;retention.bytes:-1;partitions:1;min.insync.replicas:1;cleanup.policy:compact}"
 | 
			
		||||
    consumer-stats:
 | 
			
		||||
      # Prints lag between consumer group offset and last messages offset in Kafka topics
 | 
			
		||||
      enabled: "${TB_QUEUE_KAFKA_CONSUMER_STATS_ENABLED:true}"
 | 
			
		||||
 | 
			
		||||
@ -164,25 +164,27 @@ public class EdqsProcessor implements TbQueueHandler<TbProtoQueueMsg<ToEdqsMsg>,
 | 
			
		||||
        }
 | 
			
		||||
        try {
 | 
			
		||||
            Set<TopicPartitionInfo> newPartitions = event.getNewPartitions().get(new QueueKey(ServiceType.EDQS));
 | 
			
		||||
            Set<TopicPartitionInfo> partitions = newPartitions.stream()
 | 
			
		||||
                    .map(tpi -> tpi.withUseInternalPartition(true))
 | 
			
		||||
                    .collect(Collectors.toSet());
 | 
			
		||||
 | 
			
		||||
            stateService.process(withTopic(partitions, EdqsQueue.STATE.getTopic()));
 | 
			
		||||
            stateService.process(withTopic(newPartitions, EdqsQueue.STATE.getTopic()));
 | 
			
		||||
            // eventsConsumer's partitions are updated by stateService
 | 
			
		||||
            responseTemplate.subscribe(withTopic(partitions, config.getRequestsTopic())); // FIXME: we subscribe to partitions before we are ready. implement consumer-per-partition version for request template
 | 
			
		||||
            responseTemplate.subscribe(withTopic(newPartitions, config.getRequestsTopic())); // TODO: we subscribe to partitions before we are ready. implement consumer-per-partition version for request template
 | 
			
		||||
 | 
			
		||||
            Set<TopicPartitionInfo> oldPartitions = event.getOldPartitions().get(new QueueKey(ServiceType.EDQS));
 | 
			
		||||
            if (CollectionsUtil.isNotEmpty(oldPartitions)) {
 | 
			
		||||
                Set<Integer> removedPartitions = Sets.difference(oldPartitions, newPartitions).stream()
 | 
			
		||||
                        .map(tpi -> tpi.getPartition().orElse(-1)).collect(Collectors.toSet());
 | 
			
		||||
                if (config.getPartitioningStrategy() != EdqsPartitioningStrategy.TENANT && !removedPartitions.isEmpty()) {
 | 
			
		||||
                if (removedPartitions.isEmpty()) {
 | 
			
		||||
                    return;
 | 
			
		||||
                }
 | 
			
		||||
 | 
			
		||||
                if (config.getPartitioningStrategy() == EdqsPartitioningStrategy.TENANT) {
 | 
			
		||||
                    repository.clearIf(tenantId -> {
 | 
			
		||||
                        Integer partition = partitionService.resolvePartition(tenantId, null);
 | 
			
		||||
                        return removedPartitions.contains(partition);
 | 
			
		||||
                    });
 | 
			
		||||
                } else {
 | 
			
		||||
                    log.warn("Partitions {} were removed but shouldn't be (due to NONE partitioning strategy)", removedPartitions);
 | 
			
		||||
                }
 | 
			
		||||
                repository.clearIf(tenantId -> {
 | 
			
		||||
                    Integer partition = partitionService.resolvePartition(tenantId);
 | 
			
		||||
                    return partition != null && removedPartitions.contains(partition);
 | 
			
		||||
                });
 | 
			
		||||
            }
 | 
			
		||||
        } catch (Throwable t) {
 | 
			
		||||
            log.error("Failed to handle partition change event {}", event, t);
 | 
			
		||||
 | 
			
		||||
@ -16,6 +16,7 @@
 | 
			
		||||
package org.thingsboard.server.edqs.processor;
 | 
			
		||||
 | 
			
		||||
import lombok.Builder;
 | 
			
		||||
import lombok.RequiredArgsConstructor;
 | 
			
		||||
import lombok.extern.slf4j.Slf4j;
 | 
			
		||||
import org.apache.kafka.common.errors.RecordTooLargeException;
 | 
			
		||||
import org.thingsboard.server.common.data.ObjectType;
 | 
			
		||||
@ -27,60 +28,41 @@ import org.thingsboard.server.queue.TbQueueCallback;
 | 
			
		||||
import org.thingsboard.server.queue.TbQueueMsgMetadata;
 | 
			
		||||
import org.thingsboard.server.queue.TbQueueProducer;
 | 
			
		||||
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
 | 
			
		||||
import org.thingsboard.server.queue.discovery.TopicService;
 | 
			
		||||
import org.thingsboard.server.queue.edqs.EdqsQueue;
 | 
			
		||||
import org.thingsboard.server.queue.kafka.TbKafkaProducerTemplate;
 | 
			
		||||
 | 
			
		||||
@Slf4j
 | 
			
		||||
@Builder
 | 
			
		||||
@RequiredArgsConstructor
 | 
			
		||||
public class EdqsProducer {
 | 
			
		||||
 | 
			
		||||
    private final EdqsQueue queue;
 | 
			
		||||
    private final EdqsPartitionService partitionService;
 | 
			
		||||
    private final TopicService topicService;
 | 
			
		||||
 | 
			
		||||
    private final TbQueueProducer<TbProtoQueueMsg<ToEdqsMsg>> producer;
 | 
			
		||||
 | 
			
		||||
    @Builder
 | 
			
		||||
    public EdqsProducer(EdqsQueue queue,
 | 
			
		||||
                        EdqsPartitionService partitionService,
 | 
			
		||||
                        TopicService topicService,
 | 
			
		||||
                        TbQueueProducer<TbProtoQueueMsg<ToEdqsMsg>> producer) {
 | 
			
		||||
        this.queue = queue;
 | 
			
		||||
        this.partitionService = partitionService;
 | 
			
		||||
        this.topicService = topicService;
 | 
			
		||||
        this.producer = producer;
 | 
			
		||||
    }
 | 
			
		||||
    private final EdqsPartitionService partitionService;
 | 
			
		||||
 | 
			
		||||
    public void send(TenantId tenantId, ObjectType type, String key, ToEdqsMsg msg) {
 | 
			
		||||
        String topic = topicService.buildTopicName(queue.getTopic());
 | 
			
		||||
        TopicPartitionInfo tpi = TopicPartitionInfo.builder()
 | 
			
		||||
                .topic(producer.getDefaultTopic())
 | 
			
		||||
                .partition(partitionService.resolvePartition(tenantId, key))
 | 
			
		||||
                .build();
 | 
			
		||||
        TbQueueCallback callback = new TbQueueCallback() {
 | 
			
		||||
            @Override
 | 
			
		||||
            public void onSuccess(TbQueueMsgMetadata metadata) {
 | 
			
		||||
                log.trace("[{}][{}][{}] Published msg to {}: {}", tenantId, type, key, topic, msg);
 | 
			
		||||
                log.trace("[{}][{}][{}] Published msg to {}: {}", tenantId, type, key, tpi, msg);
 | 
			
		||||
            }
 | 
			
		||||
 | 
			
		||||
            @Override
 | 
			
		||||
            public void onFailure(Throwable t) {
 | 
			
		||||
                if (t instanceof RecordTooLargeException) {
 | 
			
		||||
                    if (!log.isDebugEnabled()) {
 | 
			
		||||
                        log.warn("[{}][{}][{}] Failed to publish msg to {}", tenantId, type, key, topic, t); // not logging the whole message
 | 
			
		||||
                        log.warn("[{}][{}][{}] Failed to publish msg to {}", tenantId, type, key, tpi, t); // not logging the whole message
 | 
			
		||||
                        return;
 | 
			
		||||
                    }
 | 
			
		||||
                }
 | 
			
		||||
                log.warn("[{}][{}][{}] Failed to publish msg to {}: {}", tenantId, type, key, topic, msg, t);
 | 
			
		||||
                log.warn("[{}][{}][{}] Failed to publish msg to {}: {}", tenantId, type, key, tpi, msg, t);
 | 
			
		||||
            }
 | 
			
		||||
        };
 | 
			
		||||
        if (producer instanceof TbKafkaProducerTemplate<TbProtoQueueMsg<ToEdqsMsg>> kafkaProducer) {
 | 
			
		||||
            TopicPartitionInfo tpi = TopicPartitionInfo.builder()
 | 
			
		||||
                    .topic(topic)
 | 
			
		||||
                    .partition(partitionService.resolvePartition(tenantId))
 | 
			
		||||
                    .useInternalPartition(true)
 | 
			
		||||
                    .build();
 | 
			
		||||
            kafkaProducer.send(tpi, key, new TbProtoQueueMsg<>(null, msg), callback); // specifying custom key for compaction
 | 
			
		||||
        } else {
 | 
			
		||||
            TopicPartitionInfo tpi = TopicPartitionInfo.builder()
 | 
			
		||||
                    .topic(topic)
 | 
			
		||||
                    .build();
 | 
			
		||||
            producer.send(tpi, new TbProtoQueueMsg<>(null, msg), callback);
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
@ -29,11 +29,14 @@ public class EdqsPartitionService {
 | 
			
		||||
    private final HashPartitionService hashPartitionService;
 | 
			
		||||
    private final EdqsConfig edqsConfig;
 | 
			
		||||
 | 
			
		||||
    public Integer resolvePartition(TenantId tenantId) {
 | 
			
		||||
    public Integer resolvePartition(TenantId tenantId, Object key) {
 | 
			
		||||
        if (edqsConfig.getPartitioningStrategy() == EdqsPartitioningStrategy.TENANT) {
 | 
			
		||||
            return hashPartitionService.resolvePartitionIndex(tenantId.getId(), edqsConfig.getPartitions());
 | 
			
		||||
        } else {
 | 
			
		||||
            return null;
 | 
			
		||||
            if (key == null) {
 | 
			
		||||
                throw new IllegalArgumentException("Partitioning key is missing but partitioning strategy is not TENANT");
 | 
			
		||||
            }
 | 
			
		||||
            return hashPartitionService.resolvePartitionIndex(key.toString(), edqsConfig.getPartitions());
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -45,6 +45,8 @@ import org.thingsboard.server.queue.edqs.KafkaEdqsComponent;
 | 
			
		||||
import java.util.Set;
 | 
			
		||||
import java.util.UUID;
 | 
			
		||||
import java.util.concurrent.atomic.AtomicInteger;
 | 
			
		||||
import java.util.stream.Collectors;
 | 
			
		||||
import java.util.stream.IntStream;
 | 
			
		||||
 | 
			
		||||
@Service
 | 
			
		||||
@RequiredArgsConstructor
 | 
			
		||||
@ -141,17 +143,21 @@ public class KafkaEdqsStateService implements EdqsStateService {
 | 
			
		||||
                .build();
 | 
			
		||||
 | 
			
		||||
        stateProducer = EdqsProducer.builder()
 | 
			
		||||
                .queue(EdqsQueue.STATE)
 | 
			
		||||
                .partitionService(partitionService)
 | 
			
		||||
                .topicService(topicService)
 | 
			
		||||
                .producer(queueFactory.createEdqsMsgProducer(EdqsQueue.STATE))
 | 
			
		||||
                .partitionService(partitionService)
 | 
			
		||||
                .build();
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public void process(Set<TopicPartitionInfo> partitions) {
 | 
			
		||||
        if (queueStateService.getPartitions().isEmpty()) {
 | 
			
		||||
            eventsToBackupConsumer.subscribe();
 | 
			
		||||
            Set<TopicPartitionInfo> allPartitions = IntStream.range(0, config.getPartitions())
 | 
			
		||||
                    .mapToObj(partition -> TopicPartitionInfo.builder()
 | 
			
		||||
                            .topic(EdqsQueue.EVENTS.getTopic())
 | 
			
		||||
                            .partition(partition)
 | 
			
		||||
                            .build())
 | 
			
		||||
                    .collect(Collectors.toSet());
 | 
			
		||||
            eventsToBackupConsumer.subscribe(allPartitions);
 | 
			
		||||
            eventsToBackupConsumer.launch();
 | 
			
		||||
        }
 | 
			
		||||
        queueStateService.update(new QueueKey(ServiceType.EDQS), partitions);
 | 
			
		||||
 | 
			
		||||
@ -263,7 +263,6 @@ public class DefaultTbQueueRequestTemplate<Request extends TbQueueMsg, Response
 | 
			
		||||
        TopicPartitionInfo tpi = TopicPartitionInfo.builder()
 | 
			
		||||
                .topic(requestTemplate.getDefaultTopic())
 | 
			
		||||
                .partition(partition)
 | 
			
		||||
                .useInternalPartition(partition != null)
 | 
			
		||||
                .build();
 | 
			
		||||
        requestTemplate.send(tpi, request, new TbQueueCallback() {
 | 
			
		||||
            @Override
 | 
			
		||||
 | 
			
		||||
@ -39,6 +39,7 @@ import org.thingsboard.server.queue.discovery.event.PartitionChangeEvent;
 | 
			
		||||
import org.thingsboard.server.queue.discovery.event.ServiceListChangedEvent;
 | 
			
		||||
import org.thingsboard.server.queue.util.AfterStartUp;
 | 
			
		||||
 | 
			
		||||
import java.nio.charset.StandardCharsets;
 | 
			
		||||
import java.util.ArrayList;
 | 
			
		||||
import java.util.Collection;
 | 
			
		||||
import java.util.Collections;
 | 
			
		||||
@ -559,7 +560,15 @@ public class HashPartitionService implements PartitionService {
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public int resolvePartitionIndex(UUID entityId, int partitions) {
 | 
			
		||||
        int hash = hash(entityId);
 | 
			
		||||
        return resolvePartitionIndex(hash(entityId), partitions);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public int resolvePartitionIndex(String key, int partitions) {
 | 
			
		||||
        return resolvePartitionIndex(hash(key), partitions);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private int resolvePartitionIndex(int hash, int partitions) {
 | 
			
		||||
        return Math.abs(hash % partitions);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
@ -725,6 +734,12 @@ public class HashPartitionService implements PartitionService {
 | 
			
		||||
                .hash().asInt();
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private int hash(String key) {
 | 
			
		||||
        return hashFunction.newHasher()
 | 
			
		||||
                .putString(key, StandardCharsets.UTF_8)
 | 
			
		||||
                .hash().asInt();
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    public static HashFunction forName(String name) {
 | 
			
		||||
        return switch (name) {
 | 
			
		||||
            case "murmur3_32" -> Hashing.murmur3_32();
 | 
			
		||||
 | 
			
		||||
@ -79,4 +79,6 @@ public interface PartitionService {
 | 
			
		||||
 | 
			
		||||
    int resolvePartitionIndex(UUID entityId, int partitions);
 | 
			
		||||
 | 
			
		||||
    int resolvePartitionIndex(String key, int partitions);
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -95,6 +95,7 @@ public class KafkaEdqsQueueFactory implements EdqsQueueFactory {
 | 
			
		||||
    public TbQueueProducer<TbProtoQueueMsg<ToEdqsMsg>> createEdqsMsgProducer(EdqsQueue queue) {
 | 
			
		||||
        return TbKafkaProducerTemplate.<TbProtoQueueMsg<ToEdqsMsg>>builder()
 | 
			
		||||
                .clientId("edqs-" + queue.name().toLowerCase() + "-producer-" + serviceInfoProvider.getServiceId())
 | 
			
		||||
                .defaultTopic(topicService.buildTopicName(queue.getTopic()))
 | 
			
		||||
                .settings(kafkaSettings)
 | 
			
		||||
                .admin(queue == EdqsQueue.STATE ? edqsStateAdmin : edqsEventsAdmin)
 | 
			
		||||
                .build();
 | 
			
		||||
 | 
			
		||||
@ -188,31 +188,46 @@ public class TbKafkaAdmin implements TbQueueAdmin {
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    public boolean isTopicEmpty(String topic) {
 | 
			
		||||
        return areAllTopicsEmpty(Set.of(topic));
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    public boolean areAllTopicsEmpty(Set<String> topics) {
 | 
			
		||||
        try {
 | 
			
		||||
            if (!getTopics().contains(topic)) {
 | 
			
		||||
            List<String> existingTopics = getTopics().stream().filter(topics::contains).toList();
 | 
			
		||||
            if (existingTopics.isEmpty()) {
 | 
			
		||||
                return true;
 | 
			
		||||
            }
 | 
			
		||||
            TopicDescription topicDescription = settings.getAdminClient().describeTopics(Collections.singletonList(topic)).topicNameValues().get(topic).get();
 | 
			
		||||
            List<TopicPartition> partitions = topicDescription.partitions().stream().map(partitionInfo -> new TopicPartition(topic, partitionInfo.partition())).toList();
 | 
			
		||||
 | 
			
		||||
            Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> beginningOffsets = settings.getAdminClient().listOffsets(partitions.stream()
 | 
			
		||||
            List<TopicPartition> allPartitions = settings.getAdminClient().describeTopics(existingTopics).topicNameValues().entrySet().stream()
 | 
			
		||||
                    .flatMap(entry -> {
 | 
			
		||||
                        String topic = entry.getKey();
 | 
			
		||||
                        TopicDescription topicDescription;
 | 
			
		||||
                        try {
 | 
			
		||||
                            topicDescription = entry.getValue().get();
 | 
			
		||||
                        } catch (InterruptedException | ExecutionException e) {
 | 
			
		||||
                            throw new RuntimeException(e);
 | 
			
		||||
                        }
 | 
			
		||||
                        return topicDescription.partitions().stream().map(partitionInfo -> new TopicPartition(topic, partitionInfo.partition()));
 | 
			
		||||
                    })
 | 
			
		||||
                    .toList();
 | 
			
		||||
 | 
			
		||||
            Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> beginningOffsets = settings.getAdminClient().listOffsets(allPartitions.stream()
 | 
			
		||||
                    .collect(Collectors.toMap(partition -> partition, partition -> OffsetSpec.earliest()))).all().get();
 | 
			
		||||
 | 
			
		||||
            Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> endOffsets = settings.getAdminClient().listOffsets(partitions.stream()
 | 
			
		||||
            Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> endOffsets = settings.getAdminClient().listOffsets(allPartitions.stream()
 | 
			
		||||
                    .collect(Collectors.toMap(partition -> partition, partition -> OffsetSpec.latest()))).all().get();
 | 
			
		||||
 | 
			
		||||
            for (TopicPartition partition : partitions) {
 | 
			
		||||
            for (TopicPartition partition : allPartitions) {
 | 
			
		||||
                long beginningOffset = beginningOffsets.get(partition).offset();
 | 
			
		||||
                long endOffset = endOffsets.get(partition).offset();
 | 
			
		||||
 | 
			
		||||
                if (beginningOffset != endOffset) {
 | 
			
		||||
                    log.debug("Partition [{}] of topic [{}] is not empty. Returning false.", partition.partition(), topic);
 | 
			
		||||
                    log.debug("Partition [{}] of topic [{}] is not empty. Returning false.", partition.partition(), partition.topic());
 | 
			
		||||
                    return false;
 | 
			
		||||
                }
 | 
			
		||||
            }
 | 
			
		||||
            return true;
 | 
			
		||||
        } catch (InterruptedException | ExecutionException e) {
 | 
			
		||||
            log.error("Failed to check if topic [{}] is empty.", topic, e);
 | 
			
		||||
            log.error("Failed to check if topics [{}] empty.", topics, e);
 | 
			
		||||
            return false;
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
@ -593,6 +593,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi
 | 
			
		||||
    public TbQueueProducer<TbProtoQueueMsg<ToEdqsMsg>> createEdqsMsgProducer(EdqsQueue queue) {
 | 
			
		||||
        return TbKafkaProducerTemplate.<TbProtoQueueMsg<ToEdqsMsg>>builder()
 | 
			
		||||
                .clientId("edqs-producer-" + queue.name().toLowerCase() + "-" + serviceInfoProvider.getServiceId())
 | 
			
		||||
                .defaultTopic(topicService.buildTopicName(queue.getTopic()))
 | 
			
		||||
                .settings(kafkaSettings)
 | 
			
		||||
                .admin(edqsEventsAdmin)
 | 
			
		||||
                .build();
 | 
			
		||||
 | 
			
		||||
@ -483,6 +483,7 @@ public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory {
 | 
			
		||||
    public TbQueueProducer<TbProtoQueueMsg<ToEdqsMsg>> createEdqsMsgProducer(EdqsQueue queue) {
 | 
			
		||||
        return TbKafkaProducerTemplate.<TbProtoQueueMsg<ToEdqsMsg>>builder()
 | 
			
		||||
                .clientId("edqs-producer-" + queue.name().toLowerCase() + "-" + serviceInfoProvider.getServiceId())
 | 
			
		||||
                .defaultTopic(topicService.buildTopicName(queue.getTopic()))
 | 
			
		||||
                .settings(kafkaSettings)
 | 
			
		||||
                .admin(edqsEventsAdmin)
 | 
			
		||||
                .build();
 | 
			
		||||
 | 
			
		||||
@ -388,6 +388,7 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory {
 | 
			
		||||
    public TbQueueProducer<TbProtoQueueMsg<ToEdqsMsg>> createEdqsMsgProducer(EdqsQueue queue) {
 | 
			
		||||
        return TbKafkaProducerTemplate.<TbProtoQueueMsg<ToEdqsMsg>>builder()
 | 
			
		||||
                .clientId("edqs-producer-" + queue.name().toLowerCase() + "-" + serviceInfoProvider.getServiceId())
 | 
			
		||||
                .defaultTopic(topicService.buildTopicName(queue.getTopic()))
 | 
			
		||||
                .settings(kafkaSettings)
 | 
			
		||||
                .admin(edqsEventsAdmin)
 | 
			
		||||
                .build();
 | 
			
		||||
 | 
			
		||||
@ -148,12 +148,12 @@ queue:
 | 
			
		||||
    #  - key: "session.timeout.ms" # refer to https://docs.confluent.io/platform/current/installation/configuration/consumer-configs.html#consumerconfigs_session.timeout.ms
 | 
			
		||||
    #    value: "${TB_QUEUE_KAFKA_SESSION_TIMEOUT_MS:10000}" # (10 seconds)
 | 
			
		||||
    topic-properties:
 | 
			
		||||
      # Kafka properties for EDQS events topics. Partitions number must be the same as queue.edqs.partitions
 | 
			
		||||
      edqs-events: "${TB_QUEUE_KAFKA_EDQS_EVENTS_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:52428800;retention.bytes:-1;partitions:12;min.insync.replicas:1}"
 | 
			
		||||
      # Kafka properties for EDQS requests topic (default: 3 minutes retention). Partitions number must be the same as queue.edqs.partitions
 | 
			
		||||
      edqs-requests: "${TB_QUEUE_KAFKA_EDQS_REQUESTS_TOPIC_PROPERTIES:retention.ms:180000;segment.bytes:52428800;retention.bytes:1048576000;partitions:12;min.insync.replicas:1}"
 | 
			
		||||
      # Kafka properties for EDQS state topic (infinite retention, compaction). Partitions number must be the same as queue.edqs.partitions
 | 
			
		||||
      edqs-state: "${TB_QUEUE_KAFKA_EDQS_STATE_TOPIC_PROPERTIES:retention.ms:-1;segment.bytes:52428800;retention.bytes:-1;partitions:12;min.insync.replicas:1;cleanup.policy:compact}"
 | 
			
		||||
      # Kafka properties for EDQS events topics
 | 
			
		||||
      edqs-events: "${TB_QUEUE_KAFKA_EDQS_EVENTS_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:52428800;retention.bytes:-1;partitions:1;min.insync.replicas:1}"
 | 
			
		||||
      # Kafka properties for EDQS requests topic (default: 3 minutes retention)
 | 
			
		||||
      edqs-requests: "${TB_QUEUE_KAFKA_EDQS_REQUESTS_TOPIC_PROPERTIES:retention.ms:180000;segment.bytes:52428800;retention.bytes:1048576000;partitions:1;min.insync.replicas:1}"
 | 
			
		||||
      # Kafka properties for EDQS state topic (infinite retention, compaction)
 | 
			
		||||
      edqs-state: "${TB_QUEUE_KAFKA_EDQS_STATE_TOPIC_PROPERTIES:retention.ms:-1;segment.bytes:52428800;retention.bytes:-1;partitions:1;min.insync.replicas:1;cleanup.policy:compact}"
 | 
			
		||||
    consumer-stats:
 | 
			
		||||
      # Prints lag between consumer group offset and last messages offset in Kafka topics
 | 
			
		||||
      enabled: "${TB_QUEUE_KAFKA_CONSUMER_STATS_ENABLED:true}"
 | 
			
		||||
 | 
			
		||||
@ -20,6 +20,7 @@ public class Latencies {
 | 
			
		||||
    public static final String WS_CONNECT = "wsConnect";
 | 
			
		||||
    public static final String WS_SUBSCRIBE = "wsSubscribe";
 | 
			
		||||
    public static final String LOG_IN = "logIn";
 | 
			
		||||
    public static final String EDQS_QUERY = "edqsQuery";
 | 
			
		||||
 | 
			
		||||
    public static String request(String key) {
 | 
			
		||||
        return String.format("%sRequest", key);
 | 
			
		||||
 | 
			
		||||
@ -18,5 +18,6 @@ package org.thingsboard.monitoring.data;
 | 
			
		||||
public class MonitoredServiceKey {
 | 
			
		||||
 | 
			
		||||
    public static final String GENERAL = "Monitoring";
 | 
			
		||||
    public static final String EDQS = "*EDQS*";
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -15,10 +15,13 @@
 | 
			
		||||
 */
 | 
			
		||||
package org.thingsboard.monitoring.service;
 | 
			
		||||
 | 
			
		||||
import com.google.common.collect.Sets;
 | 
			
		||||
import jakarta.annotation.PostConstruct;
 | 
			
		||||
import lombok.SneakyThrows;
 | 
			
		||||
import lombok.extern.slf4j.Slf4j;
 | 
			
		||||
import org.apache.commons.lang3.StringUtils;
 | 
			
		||||
import org.springframework.beans.factory.annotation.Autowired;
 | 
			
		||||
import org.springframework.beans.factory.annotation.Value;
 | 
			
		||||
import org.springframework.context.ApplicationContext;
 | 
			
		||||
import org.thingsboard.monitoring.client.TbClient;
 | 
			
		||||
import org.thingsboard.monitoring.client.WsClient;
 | 
			
		||||
@ -27,13 +30,26 @@ import org.thingsboard.monitoring.config.MonitoringConfig;
 | 
			
		||||
import org.thingsboard.monitoring.config.MonitoringTarget;
 | 
			
		||||
import org.thingsboard.monitoring.data.Latencies;
 | 
			
		||||
import org.thingsboard.monitoring.data.MonitoredServiceKey;
 | 
			
		||||
import org.thingsboard.monitoring.data.ServiceFailureException;
 | 
			
		||||
import org.thingsboard.monitoring.service.transport.TransportHealthChecker;
 | 
			
		||||
import org.thingsboard.monitoring.util.TbStopWatch;
 | 
			
		||||
import org.thingsboard.server.common.data.EntityType;
 | 
			
		||||
import org.thingsboard.server.common.data.page.PageData;
 | 
			
		||||
import org.thingsboard.server.common.data.query.EntityData;
 | 
			
		||||
import org.thingsboard.server.common.data.query.EntityDataPageLink;
 | 
			
		||||
import org.thingsboard.server.common.data.query.EntityDataQuery;
 | 
			
		||||
import org.thingsboard.server.common.data.query.EntityDataSortOrder;
 | 
			
		||||
import org.thingsboard.server.common.data.query.EntityKey;
 | 
			
		||||
import org.thingsboard.server.common.data.query.EntityKeyType;
 | 
			
		||||
import org.thingsboard.server.common.data.query.EntityTypeFilter;
 | 
			
		||||
import org.thingsboard.server.common.data.query.TsValue;
 | 
			
		||||
 | 
			
		||||
import java.net.InetAddress;
 | 
			
		||||
import java.net.URI;
 | 
			
		||||
import java.net.URISyntaxException;
 | 
			
		||||
import java.util.Arrays;
 | 
			
		||||
import java.util.Collections;
 | 
			
		||||
import java.util.HashMap;
 | 
			
		||||
import java.util.HashSet;
 | 
			
		||||
import java.util.LinkedList;
 | 
			
		||||
import java.util.List;
 | 
			
		||||
@ -41,6 +57,7 @@ import java.util.Map;
 | 
			
		||||
import java.util.Set;
 | 
			
		||||
import java.util.UUID;
 | 
			
		||||
import java.util.stream.Collectors;
 | 
			
		||||
import java.util.stream.Stream;
 | 
			
		||||
 | 
			
		||||
@Slf4j
 | 
			
		||||
public abstract class BaseMonitoringService<C extends MonitoringConfig<T>, T extends MonitoringTarget> {
 | 
			
		||||
@ -61,6 +78,9 @@ public abstract class BaseMonitoringService<C extends MonitoringConfig<T>, T ext
 | 
			
		||||
    @Autowired
 | 
			
		||||
    protected ApplicationContext applicationContext;
 | 
			
		||||
 | 
			
		||||
    @Value("${monitoring.edqs.enabled:false}")
 | 
			
		||||
    private boolean edqsMonitoringEnabled;
 | 
			
		||||
 | 
			
		||||
    @PostConstruct
 | 
			
		||||
    private void init() {
 | 
			
		||||
        if (configs == null || configs.isEmpty()) {
 | 
			
		||||
@ -108,6 +128,21 @@ public abstract class BaseMonitoringService<C extends MonitoringConfig<T>, T ext
 | 
			
		||||
                    check(healthChecker, wsClient);
 | 
			
		||||
                }
 | 
			
		||||
            }
 | 
			
		||||
 | 
			
		||||
            if (edqsMonitoringEnabled) {
 | 
			
		||||
                try {
 | 
			
		||||
                    stopWatch.start();
 | 
			
		||||
                    checkEdqs();
 | 
			
		||||
                    reporter.reportLatency(Latencies.EDQS_QUERY, stopWatch.getTime());
 | 
			
		||||
 | 
			
		||||
                    reporter.serviceIsOk(MonitoredServiceKey.EDQS);
 | 
			
		||||
                } catch (ServiceFailureException e) {
 | 
			
		||||
                    reporter.serviceFailure(MonitoredServiceKey.EDQS, e);
 | 
			
		||||
                } catch (Exception e) {
 | 
			
		||||
                    reporter.serviceFailure(MonitoredServiceKey.GENERAL, e);
 | 
			
		||||
                }
 | 
			
		||||
            }
 | 
			
		||||
 | 
			
		||||
            reporter.reportLatencies(tbClient);
 | 
			
		||||
            log.debug("Finished {}", getName());
 | 
			
		||||
        } catch (Throwable error) {
 | 
			
		||||
@ -149,6 +184,39 @@ public abstract class BaseMonitoringService<C extends MonitoringConfig<T>, T ext
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private void checkEdqs() {
 | 
			
		||||
        EntityTypeFilter entityTypeFilter = new EntityTypeFilter();
 | 
			
		||||
        entityTypeFilter.setEntityType(EntityType.DEVICE);
 | 
			
		||||
        EntityDataPageLink pageLink = new EntityDataPageLink(100, 0, null, new EntityDataSortOrder(new EntityKey(EntityKeyType.ENTITY_FIELD, "name")));
 | 
			
		||||
        EntityDataQuery entityDataQuery = new EntityDataQuery(entityTypeFilter, pageLink,
 | 
			
		||||
                List.of(new EntityKey(EntityKeyType.ENTITY_FIELD, "name"), new EntityKey(EntityKeyType.ENTITY_FIELD, "type")),
 | 
			
		||||
                List.of(new EntityKey(EntityKeyType.TIME_SERIES, "testData")),
 | 
			
		||||
                Collections.emptyList());
 | 
			
		||||
 | 
			
		||||
        PageData<EntityData> result = tbClient.findEntityDataByQuery(entityDataQuery);
 | 
			
		||||
        Set<UUID> devices = result.getData().stream()
 | 
			
		||||
                .map(entityData -> entityData.getEntityId().getId())
 | 
			
		||||
                .collect(Collectors.toSet());
 | 
			
		||||
        Set<UUID> missing = Sets.difference(new HashSet<>(this.devices), devices);
 | 
			
		||||
        if (!missing.isEmpty()) {
 | 
			
		||||
            throw new ServiceFailureException("Missing devices in the response: " + missing);
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        result.getData().stream()
 | 
			
		||||
                .filter(entityData -> this.devices.contains(entityData.getEntityId().getId()))
 | 
			
		||||
                .forEach(entityData -> {
 | 
			
		||||
                    Map<String, TsValue> values = new HashMap<>(entityData.getLatest().get(EntityKeyType.ENTITY_FIELD));
 | 
			
		||||
                    values.putAll(entityData.getLatest().get(EntityKeyType.TIME_SERIES));
 | 
			
		||||
 | 
			
		||||
                    Stream.of("name", "type", "testData").forEach(key -> {
 | 
			
		||||
                        TsValue value = values.get(key);
 | 
			
		||||
                        if (value == null || StringUtils.isBlank(value.getValue())) {
 | 
			
		||||
                            throw new ServiceFailureException("Missing " + key + " for device " + entityData.getEntityId());
 | 
			
		||||
                        }
 | 
			
		||||
                    });
 | 
			
		||||
                });
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @SneakyThrows
 | 
			
		||||
    private Set<String> getAssociatedUrls(String baseUrl) {
 | 
			
		||||
        URI url = new URI(baseUrl);
 | 
			
		||||
 | 
			
		||||
@ -105,6 +105,9 @@ monitoring:
 | 
			
		||||
        # To add more targets, use following environment variables:
 | 
			
		||||
        # monitoring.transports.lwm2m.targets[1].base_url, monitoring.transports.lwm2m.targets[2].base_url, etc.
 | 
			
		||||
 | 
			
		||||
  edqs:
 | 
			
		||||
    enabled: "${EDQS_MONITORING_ENABLED:false}"
 | 
			
		||||
 | 
			
		||||
  notifications:
 | 
			
		||||
    message_prefix: '${NOTIFICATION_MESSAGE_PREFIX:}'
 | 
			
		||||
    slack:
 | 
			
		||||
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user