diff --git a/common/edqs/src/main/java/org/thingsboard/server/edqs/processor/EdqsProcessor.java b/common/edqs/src/main/java/org/thingsboard/server/edqs/processor/EdqsProcessor.java index 9349946a80..ac7af4691d 100644 --- a/common/edqs/src/main/java/org/thingsboard/server/edqs/processor/EdqsProcessor.java +++ b/common/edqs/src/main/java/org/thingsboard/server/edqs/processor/EdqsProcessor.java @@ -76,6 +76,8 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; import java.util.stream.Collectors; +import static org.thingsboard.server.common.msg.queue.TopicPartitionInfo.withTopic; + @EdqsComponent @Service @RequiredArgsConstructor @@ -280,12 +282,6 @@ public class EdqsProcessor implements TbQueueHandler, } } - private Set withTopic(Set partitions, String topic) { - return partitions.stream() - .map(tpi -> tpi.withTopic(topic)) - .collect(Collectors.toSet()); - } - @PreDestroy public void destroy() throws InterruptedException { eventConsumer.stop(); diff --git a/common/edqs/src/main/java/org/thingsboard/server/edqs/state/LocalEdqsStateService.java b/common/edqs/src/main/java/org/thingsboard/server/edqs/state/LocalEdqsStateService.java index c1620e5ec7..67724dd8b1 100644 --- a/common/edqs/src/main/java/org/thingsboard/server/edqs/state/LocalEdqsStateService.java +++ b/common/edqs/src/main/java/org/thingsboard/server/edqs/state/LocalEdqsStateService.java @@ -17,6 +17,8 @@ package org.thingsboard.server.edqs.state; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Lazy; import org.springframework.stereotype.Service; import org.thingsboard.server.common.data.ObjectType; import org.thingsboard.server.common.data.edqs.EdqsEventType; @@ -32,14 +34,17 @@ import org.thingsboard.server.queue.edqs.InMemoryEdqsComponent; import java.util.Set; +import static org.thingsboard.server.common.msg.queue.TopicPartitionInfo.withTopic; + @Service @RequiredArgsConstructor @InMemoryEdqsComponent @Slf4j public class LocalEdqsStateService implements EdqsStateService { - private final EdqsProcessor processor; private final EdqsRocksDb db; + @Autowired @Lazy + private EdqsProcessor processor; private PartitionedQueueConsumerManager> eventConsumer; private Set partitions; @@ -61,8 +66,9 @@ public class LocalEdqsStateService implements EdqsStateService { log.error("[{}] Failed to restore value", key, e); } }); + log.info("Restore completed"); } - eventConsumer.update(partitions); + eventConsumer.update(withTopic(partitions, EdqsQueue.EVENTS.getTopic())); this.partitions = partitions; } diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/queue/TopicPartitionInfo.java b/common/message/src/main/java/org/thingsboard/server/common/msg/queue/TopicPartitionInfo.java index da7f16ec0b..c7a23451d1 100644 --- a/common/message/src/main/java/org/thingsboard/server/common/msg/queue/TopicPartitionInfo.java +++ b/common/message/src/main/java/org/thingsboard/server/common/msg/queue/TopicPartitionInfo.java @@ -21,6 +21,8 @@ import org.thingsboard.server.common.data.id.TenantId; import java.util.Objects; import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; public class TopicPartitionInfo { @@ -75,6 +77,10 @@ public class TopicPartitionInfo { return new TopicPartitionInfo(topic, this.tenantId, this.partition, this.useInternalPartition, this.myPartition); } + public static Set withTopic(Set partitions, String topic) { + return partitions.stream().map(tpi -> tpi.withTopic(topic)).collect(Collectors.toSet()); + } + public TopicPartitionInfo withUseInternalPartition(boolean useInternalPartition) { return new TopicPartitionInfo(this.topic, this.tenantId, this.partition, useInternalPartition, this.myPartition); } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/common/AbstractTbQueueConsumerTemplate.java b/common/queue/src/main/java/org/thingsboard/server/queue/common/AbstractTbQueueConsumerTemplate.java index 0e399929f4..88e8f60dee 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/common/AbstractTbQueueConsumerTemplate.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/common/AbstractTbQueueConsumerTemplate.java @@ -31,7 +31,6 @@ import java.util.Set; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantLock; -import java.util.stream.Collectors; import static java.util.Collections.emptyList; @@ -95,7 +94,7 @@ public abstract class AbstractTbQueueConsumerTemplate i partitions = subscribeQueue.poll(); } if (!subscribed) { - log.info("Subscribing to topics {}", getFullTopicNames()); + log.info("Subscribing to {}", partitions); doSubscribe(partitions); subscribed = true; } @@ -168,7 +167,7 @@ public abstract class AbstractTbQueueConsumerTemplate i @Override public void unsubscribe() { - log.info("Unsubscribing and stopping consumer for topics {}", getFullTopicNames()); + log.info("Unsubscribing and stopping consumer for {}", partitions); stopped = true; consumerLock.lock(); try { @@ -201,9 +200,8 @@ public abstract class AbstractTbQueueConsumerTemplate i return Collections.emptyList(); } return partitions.stream() - .map(tpi -> tpi.getFullTopicName() + (tpi.isUseInternalPartition() ? - "[" + tpi.getPartition().orElse(-1) + "]" : "")) - .collect(Collectors.toList()); + .map(TopicPartitionInfo::getFullTopicName) + .toList(); } protected boolean isLongPollingSupported() { diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/common/consumer/QueueStateService.java b/common/queue/src/main/java/org/thingsboard/server/queue/common/consumer/QueueStateService.java index 00015457c1..2671f03b95 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/common/consumer/QueueStateService.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/common/consumer/QueueStateService.java @@ -26,7 +26,8 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; -import java.util.stream.Collectors; + +import static org.thingsboard.server.common.msg.queue.TopicPartitionInfo.withTopic; @Slf4j public class QueueStateService { @@ -88,10 +89,6 @@ public class QueueStateService { initialized = true; } - private Set withTopic(Set partitions, String topic) { - return partitions.stream().map(tpi -> tpi.withTopic(topic)).collect(Collectors.toSet()); - } - public Set getPartitionsInProgress() { return initialized ? partitionsInProgress : null; } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/edqs/InMemoryEdqsQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/edqs/InMemoryEdqsQueueFactory.java index 20b4beadcf..dc6e9f4791 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/edqs/InMemoryEdqsQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/edqs/InMemoryEdqsQueueFactory.java @@ -18,7 +18,8 @@ package org.thingsboard.server.queue.edqs; import lombok.RequiredArgsConstructor; import org.springframework.stereotype.Component; import org.thingsboard.common.util.ThingsBoardExecutors; -import org.thingsboard.server.common.stats.DummyMessagesStats; +import org.thingsboard.server.common.stats.StatsFactory; +import org.thingsboard.server.common.stats.StatsType; import org.thingsboard.server.gen.transport.TransportProtos.FromEdqsMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToEdqsMsg; import org.thingsboard.server.queue.TbQueueConsumer; @@ -37,6 +38,7 @@ public class InMemoryEdqsQueueFactory implements EdqsQueueFactory { private final InMemoryStorage storage; private final EdqsConfig edqsConfig; + private final StatsFactory statsFactory; @Override public TbQueueConsumer> createEdqsMsgConsumer(EdqsQueue queue) { @@ -69,7 +71,7 @@ public class InMemoryEdqsQueueFactory implements EdqsQueueFactory { .maxPendingRequests(edqsConfig.getMaxPendingRequests()) .requestTimeout(edqsConfig.getMaxRequestTimeout()) .pollInterval(edqsConfig.getPollInterval()) - .stats(new DummyMessagesStats()) // FIXME + .stats(statsFactory.createMessagesStats(StatsType.EDQS.getName())) .executor(ThingsBoardExecutors.newWorkStealingPool(5, "edqs")) .build(); } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/edqs/KafkaEdqsQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/edqs/KafkaEdqsQueueFactory.java index 9f973ce3f1..9561849dd1 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/edqs/KafkaEdqsQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/edqs/KafkaEdqsQueueFactory.java @@ -17,7 +17,8 @@ package org.thingsboard.server.queue.edqs; import org.springframework.stereotype.Component; import org.thingsboard.common.util.ThingsBoardExecutors; -import org.thingsboard.server.common.stats.DummyMessagesStats; +import org.thingsboard.server.common.stats.StatsFactory; +import org.thingsboard.server.common.stats.StatsType; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.gen.transport.TransportProtos.FromEdqsMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToEdqsMsg; @@ -49,12 +50,14 @@ public class KafkaEdqsQueueFactory implements EdqsQueueFactory { private final TbServiceInfoProvider serviceInfoProvider; private final TbKafkaConsumerStatsService consumerStatsService; private final TopicService topicService; + private final StatsFactory statsFactory; private final AtomicInteger consumerCounter = new AtomicInteger(); public KafkaEdqsQueueFactory(TbKafkaSettings kafkaSettings, TbKafkaTopicConfigs topicConfigs, EdqsConfig edqsConfig, TbServiceInfoProvider serviceInfoProvider, - TbKafkaConsumerStatsService consumerStatsService, TopicService topicService) { + TbKafkaConsumerStatsService consumerStatsService, TopicService topicService, + StatsFactory statsFactory) { this.edqsEventsAdmin = new TbKafkaAdmin(kafkaSettings, topicConfigs.getEdqsEventsConfigs()); this.edqsRequestsAdmin = new TbKafkaAdmin(kafkaSettings, topicConfigs.getEdqsRequestsConfigs()); this.edqsStateAdmin = new TbKafkaAdmin(kafkaSettings, topicConfigs.getEdqsStateConfigs()); @@ -63,6 +66,7 @@ public class KafkaEdqsQueueFactory implements EdqsQueueFactory { this.serviceInfoProvider = serviceInfoProvider; this.consumerStatsService = consumerStatsService; this.topicService = topicService; + this.statsFactory = statsFactory; } @Override @@ -117,7 +121,7 @@ public class KafkaEdqsQueueFactory implements EdqsQueueFactory { .maxPendingRequests(edqsConfig.getMaxPendingRequests()) .requestTimeout(edqsConfig.getMaxRequestTimeout()) .pollInterval(edqsConfig.getPollInterval()) - .stats(new DummyMessagesStats()) // FIXME + .stats(statsFactory.createMessagesStats(StatsType.EDQS.getName())) .executor(ThingsBoardExecutors.newWorkStealingPool(5, "edqs")) .build(); } diff --git a/common/stats/src/main/java/org/thingsboard/server/common/stats/DefaultStatsFactory.java b/common/stats/src/main/java/org/thingsboard/server/common/stats/DefaultStatsFactory.java index e1940e55fe..ca97792186 100644 --- a/common/stats/src/main/java/org/thingsboard/server/common/stats/DefaultStatsFactory.java +++ b/common/stats/src/main/java/org/thingsboard/server/common/stats/DefaultStatsFactory.java @@ -38,7 +38,7 @@ public class DefaultStatsFactory implements StatsFactory { private static final Counter STUB_COUNTER = new StubCounter(); - @Autowired // FIXME Slavik !!! + @Autowired private MeterRegistry meterRegistry; @Value("${metrics.enabled:false}")