diff --git a/application/src/main/java/org/thingsboard/server/service/edqs/DefaultEdqsApiService.java b/application/src/main/java/org/thingsboard/server/service/edqs/DefaultEdqsApiService.java index 51c963ed2f..c7e17b62ae 100644 --- a/application/src/main/java/org/thingsboard/server/service/edqs/DefaultEdqsApiService.java +++ b/application/src/main/java/org/thingsboard/server/service/edqs/DefaultEdqsApiService.java @@ -76,8 +76,9 @@ public class DefaultEdqsApiService implements EdqsApiService { requestMsg.setCustomerIdLSB(customerId.getId().getLeastSignificantBits()); } - Integer partition = edqsPartitionService.resolvePartition(tenantId); - ListenableFuture> resultFuture = requestTemplate.send(new TbProtoQueueMsg<>(UUID.randomUUID(), requestMsg.build()), partition); + UUID key = UUID.randomUUID(); + Integer partition = edqsPartitionService.resolvePartition(tenantId, key); + ListenableFuture> resultFuture = requestTemplate.send(new TbProtoQueueMsg<>(key, requestMsg.build()), partition); return Futures.transform(resultFuture, msg -> { TransportProtos.EdqsResponseMsg responseMsg = msg.getValue().getResponseMsg(); return JacksonUtil.fromString(responseMsg.getValue(), EdqsResponse.class); diff --git a/application/src/main/java/org/thingsboard/server/service/edqs/DefaultEdqsService.java b/application/src/main/java/org/thingsboard/server/service/edqs/DefaultEdqsService.java index e823dee4e7..7d5a0cb0fd 100644 --- a/application/src/main/java/org/thingsboard/server/service/edqs/DefaultEdqsService.java +++ b/application/src/main/java/org/thingsboard/server/service/edqs/DefaultEdqsService.java @@ -96,10 +96,8 @@ public class DefaultEdqsService implements EdqsService { private void init() { executor = ThingsBoardExecutors.newWorkStealingPool(12, getClass()); eventsProducer = EdqsProducer.builder() - .queue(EdqsQueue.EVENTS) - .partitionService(edqsPartitionService) - .topicService(topicService) .producer(queueFactory.createEdqsMsgProducer(EdqsQueue.EVENTS)) + .partitionService(edqsPartitionService) .build(); syncLock = distributedLockService.getLock("edqs_sync"); } diff --git a/application/src/main/java/org/thingsboard/server/service/edqs/KafkaEdqsSyncService.java b/application/src/main/java/org/thingsboard/server/service/edqs/KafkaEdqsSyncService.java index 4ef552521b..ad7b7b970d 100644 --- a/application/src/main/java/org/thingsboard/server/service/edqs/KafkaEdqsSyncService.java +++ b/application/src/main/java/org/thingsboard/server/service/edqs/KafkaEdqsSyncService.java @@ -17,11 +17,15 @@ package org.thingsboard.server.service.edqs; import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; import org.springframework.stereotype.Service; +import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; +import org.thingsboard.server.queue.edqs.EdqsConfig; import org.thingsboard.server.queue.edqs.EdqsQueue; import org.thingsboard.server.queue.kafka.TbKafkaAdmin; import org.thingsboard.server.queue.kafka.TbKafkaSettings; import java.util.Collections; +import java.util.stream.Collectors; +import java.util.stream.IntStream; @Service @ConditionalOnExpression("'${queue.edqs.sync.enabled:true}' == 'true' && '${queue.type:null}' == 'kafka'") @@ -29,9 +33,14 @@ public class KafkaEdqsSyncService extends EdqsSyncService { private final boolean syncNeeded; - public KafkaEdqsSyncService(TbKafkaSettings kafkaSettings) { + public KafkaEdqsSyncService(TbKafkaSettings kafkaSettings, EdqsConfig edqsConfig) { TbKafkaAdmin kafkaAdmin = new TbKafkaAdmin(kafkaSettings, Collections.emptyMap()); - this.syncNeeded = kafkaAdmin.isTopicEmpty(EdqsQueue.EVENTS.getTopic()); + this.syncNeeded = kafkaAdmin.areAllTopicsEmpty(IntStream.range(0, edqsConfig.getPartitions()) + .mapToObj(partition -> TopicPartitionInfo.builder() + .topic(EdqsQueue.EVENTS.getTopic()) + .partition(partition) + .build().getFullTopicName()) + .collect(Collectors.toSet())); } @Override diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index c61a993d56..eac787d52b 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -1645,12 +1645,12 @@ queue: calculated-field: "${TB_QUEUE_KAFKA_CF_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:52428800;retention.bytes:1048576000;partitions:1;min.insync.replicas:1}" # Kafka properties for Calculated Field State topics calculated-field-state: "${TB_QUEUE_KAFKA_CF_STATE_TOPIC_PROPERTIES:retention.ms:-1;segment.bytes:52428800;retention.bytes:104857600000;partitions:1;min.insync.replicas:1;cleanup.policy:compact}" - # Kafka properties for EDQS events topics. Partitions number must be the same as queue.edqs.partitions - edqs-events: "${TB_QUEUE_KAFKA_EDQS_EVENTS_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:52428800;retention.bytes:-1;partitions:12;min.insync.replicas:1}" - # Kafka properties for EDQS requests topic (default: 3 minutes retention). Partitions number must be the same as queue.edqs.partitions - edqs-requests: "${TB_QUEUE_KAFKA_EDQS_REQUESTS_TOPIC_PROPERTIES:retention.ms:180000;segment.bytes:52428800;retention.bytes:1048576000;partitions:12;min.insync.replicas:1}" - # Kafka properties for EDQS state topic (infinite retention, compaction). Partitions number must be the same as queue.edqs.partitions - edqs-state: "${TB_QUEUE_KAFKA_EDQS_STATE_TOPIC_PROPERTIES:retention.ms:-1;segment.bytes:52428800;retention.bytes:-1;partitions:12;min.insync.replicas:1;cleanup.policy:compact}" + # Kafka properties for EDQS events topics + edqs-events: "${TB_QUEUE_KAFKA_EDQS_EVENTS_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:52428800;retention.bytes:-1;partitions:1;min.insync.replicas:1}" + # Kafka properties for EDQS requests topic (default: 3 minutes retention) + edqs-requests: "${TB_QUEUE_KAFKA_EDQS_REQUESTS_TOPIC_PROPERTIES:retention.ms:180000;segment.bytes:52428800;retention.bytes:1048576000;partitions:1;min.insync.replicas:1}" + # Kafka properties for EDQS state topic (infinite retention, compaction) + edqs-state: "${TB_QUEUE_KAFKA_EDQS_STATE_TOPIC_PROPERTIES:retention.ms:-1;segment.bytes:52428800;retention.bytes:-1;partitions:1;min.insync.replicas:1;cleanup.policy:compact}" consumer-stats: # Prints lag between consumer group offset and last messages offset in Kafka topics enabled: "${TB_QUEUE_KAFKA_CONSUMER_STATS_ENABLED:true}" diff --git a/common/edqs/src/main/java/org/thingsboard/server/edqs/processor/EdqsProcessor.java b/common/edqs/src/main/java/org/thingsboard/server/edqs/processor/EdqsProcessor.java index 7ddc9147df..fb15c3fc73 100644 --- a/common/edqs/src/main/java/org/thingsboard/server/edqs/processor/EdqsProcessor.java +++ b/common/edqs/src/main/java/org/thingsboard/server/edqs/processor/EdqsProcessor.java @@ -164,25 +164,27 @@ public class EdqsProcessor implements TbQueueHandler, } try { Set newPartitions = event.getNewPartitions().get(new QueueKey(ServiceType.EDQS)); - Set partitions = newPartitions.stream() - .map(tpi -> tpi.withUseInternalPartition(true)) - .collect(Collectors.toSet()); - stateService.process(withTopic(partitions, EdqsQueue.STATE.getTopic())); + stateService.process(withTopic(newPartitions, EdqsQueue.STATE.getTopic())); // eventsConsumer's partitions are updated by stateService - responseTemplate.subscribe(withTopic(partitions, config.getRequestsTopic())); // FIXME: we subscribe to partitions before we are ready. implement consumer-per-partition version for request template + responseTemplate.subscribe(withTopic(newPartitions, config.getRequestsTopic())); // TODO: we subscribe to partitions before we are ready. implement consumer-per-partition version for request template Set oldPartitions = event.getOldPartitions().get(new QueueKey(ServiceType.EDQS)); if (CollectionsUtil.isNotEmpty(oldPartitions)) { Set removedPartitions = Sets.difference(oldPartitions, newPartitions).stream() .map(tpi -> tpi.getPartition().orElse(-1)).collect(Collectors.toSet()); - if (config.getPartitioningStrategy() != EdqsPartitioningStrategy.TENANT && !removedPartitions.isEmpty()) { + if (removedPartitions.isEmpty()) { + return; + } + + if (config.getPartitioningStrategy() == EdqsPartitioningStrategy.TENANT) { + repository.clearIf(tenantId -> { + Integer partition = partitionService.resolvePartition(tenantId, null); + return removedPartitions.contains(partition); + }); + } else { log.warn("Partitions {} were removed but shouldn't be (due to NONE partitioning strategy)", removedPartitions); } - repository.clearIf(tenantId -> { - Integer partition = partitionService.resolvePartition(tenantId); - return partition != null && removedPartitions.contains(partition); - }); } } catch (Throwable t) { log.error("Failed to handle partition change event {}", event, t); diff --git a/common/edqs/src/main/java/org/thingsboard/server/edqs/processor/EdqsProducer.java b/common/edqs/src/main/java/org/thingsboard/server/edqs/processor/EdqsProducer.java index be1f0481be..cc4f913d38 100644 --- a/common/edqs/src/main/java/org/thingsboard/server/edqs/processor/EdqsProducer.java +++ b/common/edqs/src/main/java/org/thingsboard/server/edqs/processor/EdqsProducer.java @@ -16,6 +16,7 @@ package org.thingsboard.server.edqs.processor; import lombok.Builder; +import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.common.errors.RecordTooLargeException; import org.thingsboard.server.common.data.ObjectType; @@ -27,60 +28,41 @@ import org.thingsboard.server.queue.TbQueueCallback; import org.thingsboard.server.queue.TbQueueMsgMetadata; import org.thingsboard.server.queue.TbQueueProducer; import org.thingsboard.server.queue.common.TbProtoQueueMsg; -import org.thingsboard.server.queue.discovery.TopicService; -import org.thingsboard.server.queue.edqs.EdqsQueue; import org.thingsboard.server.queue.kafka.TbKafkaProducerTemplate; @Slf4j +@Builder +@RequiredArgsConstructor public class EdqsProducer { - private final EdqsQueue queue; - private final EdqsPartitionService partitionService; - private final TopicService topicService; - private final TbQueueProducer> producer; - - @Builder - public EdqsProducer(EdqsQueue queue, - EdqsPartitionService partitionService, - TopicService topicService, - TbQueueProducer> producer) { - this.queue = queue; - this.partitionService = partitionService; - this.topicService = topicService; - this.producer = producer; - } + private final EdqsPartitionService partitionService; public void send(TenantId tenantId, ObjectType type, String key, ToEdqsMsg msg) { - String topic = topicService.buildTopicName(queue.getTopic()); + TopicPartitionInfo tpi = TopicPartitionInfo.builder() + .topic(producer.getDefaultTopic()) + .partition(partitionService.resolvePartition(tenantId, key)) + .build(); TbQueueCallback callback = new TbQueueCallback() { @Override public void onSuccess(TbQueueMsgMetadata metadata) { - log.trace("[{}][{}][{}] Published msg to {}: {}", tenantId, type, key, topic, msg); + log.trace("[{}][{}][{}] Published msg to {}: {}", tenantId, type, key, tpi, msg); } @Override public void onFailure(Throwable t) { if (t instanceof RecordTooLargeException) { if (!log.isDebugEnabled()) { - log.warn("[{}][{}][{}] Failed to publish msg to {}", tenantId, type, key, topic, t); // not logging the whole message + log.warn("[{}][{}][{}] Failed to publish msg to {}", tenantId, type, key, tpi, t); // not logging the whole message return; } } - log.warn("[{}][{}][{}] Failed to publish msg to {}: {}", tenantId, type, key, topic, msg, t); + log.warn("[{}][{}][{}] Failed to publish msg to {}: {}", tenantId, type, key, tpi, msg, t); } }; if (producer instanceof TbKafkaProducerTemplate> kafkaProducer) { - TopicPartitionInfo tpi = TopicPartitionInfo.builder() - .topic(topic) - .partition(partitionService.resolvePartition(tenantId)) - .useInternalPartition(true) - .build(); kafkaProducer.send(tpi, key, new TbProtoQueueMsg<>(null, msg), callback); // specifying custom key for compaction } else { - TopicPartitionInfo tpi = TopicPartitionInfo.builder() - .topic(topic) - .build(); producer.send(tpi, new TbProtoQueueMsg<>(null, msg), callback); } } diff --git a/common/edqs/src/main/java/org/thingsboard/server/edqs/state/EdqsPartitionService.java b/common/edqs/src/main/java/org/thingsboard/server/edqs/state/EdqsPartitionService.java index 94e9437650..e2fbf9a981 100644 --- a/common/edqs/src/main/java/org/thingsboard/server/edqs/state/EdqsPartitionService.java +++ b/common/edqs/src/main/java/org/thingsboard/server/edqs/state/EdqsPartitionService.java @@ -29,11 +29,14 @@ public class EdqsPartitionService { private final HashPartitionService hashPartitionService; private final EdqsConfig edqsConfig; - public Integer resolvePartition(TenantId tenantId) { + public Integer resolvePartition(TenantId tenantId, Object key) { if (edqsConfig.getPartitioningStrategy() == EdqsPartitioningStrategy.TENANT) { return hashPartitionService.resolvePartitionIndex(tenantId.getId(), edqsConfig.getPartitions()); } else { - return null; + if (key == null) { + throw new IllegalArgumentException("Partitioning key is missing but partitioning strategy is not TENANT"); + } + return hashPartitionService.resolvePartitionIndex(key.toString(), edqsConfig.getPartitions()); } } diff --git a/common/edqs/src/main/java/org/thingsboard/server/edqs/state/KafkaEdqsStateService.java b/common/edqs/src/main/java/org/thingsboard/server/edqs/state/KafkaEdqsStateService.java index 85b8e92387..c59707c9c3 100644 --- a/common/edqs/src/main/java/org/thingsboard/server/edqs/state/KafkaEdqsStateService.java +++ b/common/edqs/src/main/java/org/thingsboard/server/edqs/state/KafkaEdqsStateService.java @@ -45,6 +45,8 @@ import org.thingsboard.server.queue.edqs.KafkaEdqsComponent; import java.util.Set; import java.util.UUID; import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; +import java.util.stream.IntStream; @Service @RequiredArgsConstructor @@ -141,17 +143,21 @@ public class KafkaEdqsStateService implements EdqsStateService { .build(); stateProducer = EdqsProducer.builder() - .queue(EdqsQueue.STATE) - .partitionService(partitionService) - .topicService(topicService) .producer(queueFactory.createEdqsMsgProducer(EdqsQueue.STATE)) + .partitionService(partitionService) .build(); } @Override public void process(Set partitions) { if (queueStateService.getPartitions().isEmpty()) { - eventsToBackupConsumer.subscribe(); + Set allPartitions = IntStream.range(0, config.getPartitions()) + .mapToObj(partition -> TopicPartitionInfo.builder() + .topic(EdqsQueue.EVENTS.getTopic()) + .partition(partition) + .build()) + .collect(Collectors.toSet()); + eventsToBackupConsumer.subscribe(allPartitions); eventsToBackupConsumer.launch(); } queueStateService.update(new QueueKey(ServiceType.EDQS), partitions); diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/common/DefaultTbQueueRequestTemplate.java b/common/queue/src/main/java/org/thingsboard/server/queue/common/DefaultTbQueueRequestTemplate.java index 4efb297491..1beb505595 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/common/DefaultTbQueueRequestTemplate.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/common/DefaultTbQueueRequestTemplate.java @@ -263,7 +263,6 @@ public class DefaultTbQueueRequestTemplate Hashing.murmur3_32(); diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/PartitionService.java b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/PartitionService.java index 7abd68e25f..5dda413f17 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/PartitionService.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/PartitionService.java @@ -79,4 +79,6 @@ public interface PartitionService { int resolvePartitionIndex(UUID entityId, int partitions); + int resolvePartitionIndex(String key, int partitions); + } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/edqs/KafkaEdqsQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/edqs/KafkaEdqsQueueFactory.java index e985696040..a322cc5434 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/edqs/KafkaEdqsQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/edqs/KafkaEdqsQueueFactory.java @@ -95,6 +95,7 @@ public class KafkaEdqsQueueFactory implements EdqsQueueFactory { public TbQueueProducer> createEdqsMsgProducer(EdqsQueue queue) { return TbKafkaProducerTemplate.>builder() .clientId("edqs-" + queue.name().toLowerCase() + "-producer-" + serviceInfoProvider.getServiceId()) + .defaultTopic(topicService.buildTopicName(queue.getTopic())) .settings(kafkaSettings) .admin(queue == EdqsQueue.STATE ? edqsStateAdmin : edqsEventsAdmin) .build(); diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaAdmin.java b/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaAdmin.java index 4ae744be67..3496aac76a 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaAdmin.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaAdmin.java @@ -188,31 +188,46 @@ public class TbKafkaAdmin implements TbQueueAdmin { } public boolean isTopicEmpty(String topic) { + return areAllTopicsEmpty(Set.of(topic)); + } + + public boolean areAllTopicsEmpty(Set topics) { try { - if (!getTopics().contains(topic)) { + List existingTopics = getTopics().stream().filter(topics::contains).toList(); + if (existingTopics.isEmpty()) { return true; } - TopicDescription topicDescription = settings.getAdminClient().describeTopics(Collections.singletonList(topic)).topicNameValues().get(topic).get(); - List partitions = topicDescription.partitions().stream().map(partitionInfo -> new TopicPartition(topic, partitionInfo.partition())).toList(); - Map beginningOffsets = settings.getAdminClient().listOffsets(partitions.stream() + List allPartitions = settings.getAdminClient().describeTopics(existingTopics).topicNameValues().entrySet().stream() + .flatMap(entry -> { + String topic = entry.getKey(); + TopicDescription topicDescription; + try { + topicDescription = entry.getValue().get(); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } + return topicDescription.partitions().stream().map(partitionInfo -> new TopicPartition(topic, partitionInfo.partition())); + }) + .toList(); + + Map beginningOffsets = settings.getAdminClient().listOffsets(allPartitions.stream() .collect(Collectors.toMap(partition -> partition, partition -> OffsetSpec.earliest()))).all().get(); - - Map endOffsets = settings.getAdminClient().listOffsets(partitions.stream() + Map endOffsets = settings.getAdminClient().listOffsets(allPartitions.stream() .collect(Collectors.toMap(partition -> partition, partition -> OffsetSpec.latest()))).all().get(); - for (TopicPartition partition : partitions) { + for (TopicPartition partition : allPartitions) { long beginningOffset = beginningOffsets.get(partition).offset(); long endOffset = endOffsets.get(partition).offset(); if (beginningOffset != endOffset) { - log.debug("Partition [{}] of topic [{}] is not empty. Returning false.", partition.partition(), topic); + log.debug("Partition [{}] of topic [{}] is not empty. Returning false.", partition.partition(), partition.topic()); return false; } } return true; } catch (InterruptedException | ExecutionException e) { - log.error("Failed to check if topic [{}] is empty.", topic, e); + log.error("Failed to check if topics [{}] empty.", topics, e); return false; } } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java index d688003115..c50344a23f 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java @@ -593,6 +593,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi public TbQueueProducer> createEdqsMsgProducer(EdqsQueue queue) { return TbKafkaProducerTemplate.>builder() .clientId("edqs-producer-" + queue.name().toLowerCase() + "-" + serviceInfoProvider.getServiceId()) + .defaultTopic(topicService.buildTopicName(queue.getTopic())) .settings(kafkaSettings) .admin(edqsEventsAdmin) .build(); diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbCoreQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbCoreQueueFactory.java index 3c6d144a0c..2a1dc4171f 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbCoreQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbCoreQueueFactory.java @@ -483,6 +483,7 @@ public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory { public TbQueueProducer> createEdqsMsgProducer(EdqsQueue queue) { return TbKafkaProducerTemplate.>builder() .clientId("edqs-producer-" + queue.name().toLowerCase() + "-" + serviceInfoProvider.getServiceId()) + .defaultTopic(topicService.buildTopicName(queue.getTopic())) .settings(kafkaSettings) .admin(edqsEventsAdmin) .build(); diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbRuleEngineQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbRuleEngineQueueFactory.java index cb5c25141e..2e342b0dd2 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbRuleEngineQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbRuleEngineQueueFactory.java @@ -388,6 +388,7 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory { public TbQueueProducer> createEdqsMsgProducer(EdqsQueue queue) { return TbKafkaProducerTemplate.>builder() .clientId("edqs-producer-" + queue.name().toLowerCase() + "-" + serviceInfoProvider.getServiceId()) + .defaultTopic(topicService.buildTopicName(queue.getTopic())) .settings(kafkaSettings) .admin(edqsEventsAdmin) .build(); diff --git a/edqs/src/main/resources/edqs.yml b/edqs/src/main/resources/edqs.yml index f7d0eda841..c101eff68e 100644 --- a/edqs/src/main/resources/edqs.yml +++ b/edqs/src/main/resources/edqs.yml @@ -148,12 +148,12 @@ queue: # - key: "session.timeout.ms" # refer to https://docs.confluent.io/platform/current/installation/configuration/consumer-configs.html#consumerconfigs_session.timeout.ms # value: "${TB_QUEUE_KAFKA_SESSION_TIMEOUT_MS:10000}" # (10 seconds) topic-properties: - # Kafka properties for EDQS events topics. Partitions number must be the same as queue.edqs.partitions - edqs-events: "${TB_QUEUE_KAFKA_EDQS_EVENTS_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:52428800;retention.bytes:-1;partitions:12;min.insync.replicas:1}" - # Kafka properties for EDQS requests topic (default: 3 minutes retention). Partitions number must be the same as queue.edqs.partitions - edqs-requests: "${TB_QUEUE_KAFKA_EDQS_REQUESTS_TOPIC_PROPERTIES:retention.ms:180000;segment.bytes:52428800;retention.bytes:1048576000;partitions:12;min.insync.replicas:1}" - # Kafka properties for EDQS state topic (infinite retention, compaction). Partitions number must be the same as queue.edqs.partitions - edqs-state: "${TB_QUEUE_KAFKA_EDQS_STATE_TOPIC_PROPERTIES:retention.ms:-1;segment.bytes:52428800;retention.bytes:-1;partitions:12;min.insync.replicas:1;cleanup.policy:compact}" + # Kafka properties for EDQS events topics + edqs-events: "${TB_QUEUE_KAFKA_EDQS_EVENTS_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:52428800;retention.bytes:-1;partitions:1;min.insync.replicas:1}" + # Kafka properties for EDQS requests topic (default: 3 minutes retention) + edqs-requests: "${TB_QUEUE_KAFKA_EDQS_REQUESTS_TOPIC_PROPERTIES:retention.ms:180000;segment.bytes:52428800;retention.bytes:1048576000;partitions:1;min.insync.replicas:1}" + # Kafka properties for EDQS state topic (infinite retention, compaction) + edqs-state: "${TB_QUEUE_KAFKA_EDQS_STATE_TOPIC_PROPERTIES:retention.ms:-1;segment.bytes:52428800;retention.bytes:-1;partitions:1;min.insync.replicas:1;cleanup.policy:compact}" consumer-stats: # Prints lag between consumer group offset and last messages offset in Kafka topics enabled: "${TB_QUEUE_KAFKA_CONSUMER_STATS_ENABLED:true}" diff --git a/monitoring/src/main/java/org/thingsboard/monitoring/data/Latencies.java b/monitoring/src/main/java/org/thingsboard/monitoring/data/Latencies.java index a190b7b791..b42e9138e4 100644 --- a/monitoring/src/main/java/org/thingsboard/monitoring/data/Latencies.java +++ b/monitoring/src/main/java/org/thingsboard/monitoring/data/Latencies.java @@ -20,6 +20,7 @@ public class Latencies { public static final String WS_CONNECT = "wsConnect"; public static final String WS_SUBSCRIBE = "wsSubscribe"; public static final String LOG_IN = "logIn"; + public static final String EDQS_QUERY = "edqsQuery"; public static String request(String key) { return String.format("%sRequest", key); diff --git a/monitoring/src/main/java/org/thingsboard/monitoring/data/MonitoredServiceKey.java b/monitoring/src/main/java/org/thingsboard/monitoring/data/MonitoredServiceKey.java index 342ee121ef..9c3ee5b786 100644 --- a/monitoring/src/main/java/org/thingsboard/monitoring/data/MonitoredServiceKey.java +++ b/monitoring/src/main/java/org/thingsboard/monitoring/data/MonitoredServiceKey.java @@ -18,5 +18,6 @@ package org.thingsboard.monitoring.data; public class MonitoredServiceKey { public static final String GENERAL = "Monitoring"; + public static final String EDQS = "*EDQS*"; } diff --git a/monitoring/src/main/java/org/thingsboard/monitoring/service/BaseMonitoringService.java b/monitoring/src/main/java/org/thingsboard/monitoring/service/BaseMonitoringService.java index 9157e9f31c..7219e4f9aa 100644 --- a/monitoring/src/main/java/org/thingsboard/monitoring/service/BaseMonitoringService.java +++ b/monitoring/src/main/java/org/thingsboard/monitoring/service/BaseMonitoringService.java @@ -15,10 +15,13 @@ */ package org.thingsboard.monitoring.service; +import com.google.common.collect.Sets; import jakarta.annotation.PostConstruct; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; import org.springframework.context.ApplicationContext; import org.thingsboard.monitoring.client.TbClient; import org.thingsboard.monitoring.client.WsClient; @@ -27,13 +30,26 @@ import org.thingsboard.monitoring.config.MonitoringConfig; import org.thingsboard.monitoring.config.MonitoringTarget; import org.thingsboard.monitoring.data.Latencies; import org.thingsboard.monitoring.data.MonitoredServiceKey; +import org.thingsboard.monitoring.data.ServiceFailureException; import org.thingsboard.monitoring.service.transport.TransportHealthChecker; import org.thingsboard.monitoring.util.TbStopWatch; +import org.thingsboard.server.common.data.EntityType; +import org.thingsboard.server.common.data.page.PageData; +import org.thingsboard.server.common.data.query.EntityData; +import org.thingsboard.server.common.data.query.EntityDataPageLink; +import org.thingsboard.server.common.data.query.EntityDataQuery; +import org.thingsboard.server.common.data.query.EntityDataSortOrder; +import org.thingsboard.server.common.data.query.EntityKey; +import org.thingsboard.server.common.data.query.EntityKeyType; +import org.thingsboard.server.common.data.query.EntityTypeFilter; +import org.thingsboard.server.common.data.query.TsValue; import java.net.InetAddress; import java.net.URI; import java.net.URISyntaxException; import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.LinkedList; import java.util.List; @@ -41,6 +57,7 @@ import java.util.Map; import java.util.Set; import java.util.UUID; import java.util.stream.Collectors; +import java.util.stream.Stream; @Slf4j public abstract class BaseMonitoringService, T extends MonitoringTarget> { @@ -61,6 +78,9 @@ public abstract class BaseMonitoringService, T ext @Autowired protected ApplicationContext applicationContext; + @Value("${monitoring.edqs.enabled:false}") + private boolean edqsMonitoringEnabled; + @PostConstruct private void init() { if (configs == null || configs.isEmpty()) { @@ -108,6 +128,21 @@ public abstract class BaseMonitoringService, T ext check(healthChecker, wsClient); } } + + if (edqsMonitoringEnabled) { + try { + stopWatch.start(); + checkEdqs(); + reporter.reportLatency(Latencies.EDQS_QUERY, stopWatch.getTime()); + + reporter.serviceIsOk(MonitoredServiceKey.EDQS); + } catch (ServiceFailureException e) { + reporter.serviceFailure(MonitoredServiceKey.EDQS, e); + } catch (Exception e) { + reporter.serviceFailure(MonitoredServiceKey.GENERAL, e); + } + } + reporter.reportLatencies(tbClient); log.debug("Finished {}", getName()); } catch (Throwable error) { @@ -149,6 +184,39 @@ public abstract class BaseMonitoringService, T ext } } + private void checkEdqs() { + EntityTypeFilter entityTypeFilter = new EntityTypeFilter(); + entityTypeFilter.setEntityType(EntityType.DEVICE); + EntityDataPageLink pageLink = new EntityDataPageLink(100, 0, null, new EntityDataSortOrder(new EntityKey(EntityKeyType.ENTITY_FIELD, "name"))); + EntityDataQuery entityDataQuery = new EntityDataQuery(entityTypeFilter, pageLink, + List.of(new EntityKey(EntityKeyType.ENTITY_FIELD, "name"), new EntityKey(EntityKeyType.ENTITY_FIELD, "type")), + List.of(new EntityKey(EntityKeyType.TIME_SERIES, "testData")), + Collections.emptyList()); + + PageData result = tbClient.findEntityDataByQuery(entityDataQuery); + Set devices = result.getData().stream() + .map(entityData -> entityData.getEntityId().getId()) + .collect(Collectors.toSet()); + Set missing = Sets.difference(new HashSet<>(this.devices), devices); + if (!missing.isEmpty()) { + throw new ServiceFailureException("Missing devices in the response: " + missing); + } + + result.getData().stream() + .filter(entityData -> this.devices.contains(entityData.getEntityId().getId())) + .forEach(entityData -> { + Map values = new HashMap<>(entityData.getLatest().get(EntityKeyType.ENTITY_FIELD)); + values.putAll(entityData.getLatest().get(EntityKeyType.TIME_SERIES)); + + Stream.of("name", "type", "testData").forEach(key -> { + TsValue value = values.get(key); + if (value == null || StringUtils.isBlank(value.getValue())) { + throw new ServiceFailureException("Missing " + key + " for device " + entityData.getEntityId()); + } + }); + }); + } + @SneakyThrows private Set getAssociatedUrls(String baseUrl) { URI url = new URI(baseUrl); diff --git a/monitoring/src/main/resources/tb-monitoring.yml b/monitoring/src/main/resources/tb-monitoring.yml index ae0d265ed6..6cc2e79cb4 100644 --- a/monitoring/src/main/resources/tb-monitoring.yml +++ b/monitoring/src/main/resources/tb-monitoring.yml @@ -105,6 +105,9 @@ monitoring: # To add more targets, use following environment variables: # monitoring.transports.lwm2m.targets[1].base_url, monitoring.transports.lwm2m.targets[2].base_url, etc. + edqs: + enabled: "${EDQS_MONITORING_ENABLED:false}" + notifications: message_prefix: '${NOTIFICATION_MESSAGE_PREFIX:}' slack: