Fixes for EDQS

This commit is contained in:
ViacheslavKlimov 2025-02-21 15:48:33 +02:00
parent 965210f17b
commit 278ec36f37
8 changed files with 34 additions and 25 deletions

View File

@ -76,6 +76,8 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer; import java.util.function.Consumer;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import static org.thingsboard.server.common.msg.queue.TopicPartitionInfo.withTopic;
@EdqsComponent @EdqsComponent
@Service @Service
@RequiredArgsConstructor @RequiredArgsConstructor
@ -280,12 +282,6 @@ public class EdqsProcessor implements TbQueueHandler<TbProtoQueueMsg<ToEdqsMsg>,
} }
} }
private Set<TopicPartitionInfo> withTopic(Set<TopicPartitionInfo> partitions, String topic) {
return partitions.stream()
.map(tpi -> tpi.withTopic(topic))
.collect(Collectors.toSet());
}
@PreDestroy @PreDestroy
public void destroy() throws InterruptedException { public void destroy() throws InterruptedException {
eventConsumer.stop(); eventConsumer.stop();

View File

@ -17,6 +17,8 @@ package org.thingsboard.server.edqs.state;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.thingsboard.server.common.data.ObjectType; import org.thingsboard.server.common.data.ObjectType;
import org.thingsboard.server.common.data.edqs.EdqsEventType; import org.thingsboard.server.common.data.edqs.EdqsEventType;
@ -32,14 +34,17 @@ import org.thingsboard.server.queue.edqs.InMemoryEdqsComponent;
import java.util.Set; import java.util.Set;
import static org.thingsboard.server.common.msg.queue.TopicPartitionInfo.withTopic;
@Service @Service
@RequiredArgsConstructor @RequiredArgsConstructor
@InMemoryEdqsComponent @InMemoryEdqsComponent
@Slf4j @Slf4j
public class LocalEdqsStateService implements EdqsStateService { public class LocalEdqsStateService implements EdqsStateService {
private final EdqsProcessor processor;
private final EdqsRocksDb db; private final EdqsRocksDb db;
@Autowired @Lazy
private EdqsProcessor processor;
private PartitionedQueueConsumerManager<TbProtoQueueMsg<ToEdqsMsg>> eventConsumer; private PartitionedQueueConsumerManager<TbProtoQueueMsg<ToEdqsMsg>> eventConsumer;
private Set<TopicPartitionInfo> partitions; private Set<TopicPartitionInfo> partitions;
@ -61,8 +66,9 @@ public class LocalEdqsStateService implements EdqsStateService {
log.error("[{}] Failed to restore value", key, e); log.error("[{}] Failed to restore value", key, e);
} }
}); });
log.info("Restore completed");
} }
eventConsumer.update(partitions); eventConsumer.update(withTopic(partitions, EdqsQueue.EVENTS.getTopic()));
this.partitions = partitions; this.partitions = partitions;
} }

View File

@ -21,6 +21,8 @@ import org.thingsboard.server.common.data.id.TenantId;
import java.util.Objects; import java.util.Objects;
import java.util.Optional; import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
public class TopicPartitionInfo { public class TopicPartitionInfo {
@ -75,6 +77,10 @@ public class TopicPartitionInfo {
return new TopicPartitionInfo(topic, this.tenantId, this.partition, this.useInternalPartition, this.myPartition); return new TopicPartitionInfo(topic, this.tenantId, this.partition, this.useInternalPartition, this.myPartition);
} }
public static Set<TopicPartitionInfo> withTopic(Set<TopicPartitionInfo> partitions, String topic) {
return partitions.stream().map(tpi -> tpi.withTopic(topic)).collect(Collectors.toSet());
}
public TopicPartitionInfo withUseInternalPartition(boolean useInternalPartition) { public TopicPartitionInfo withUseInternalPartition(boolean useInternalPartition) {
return new TopicPartitionInfo(this.topic, this.tenantId, this.partition, useInternalPartition, this.myPartition); return new TopicPartitionInfo(this.topic, this.tenantId, this.partition, useInternalPartition, this.myPartition);
} }

View File

@ -31,7 +31,6 @@ import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import static java.util.Collections.emptyList; import static java.util.Collections.emptyList;
@ -95,7 +94,7 @@ public abstract class AbstractTbQueueConsumerTemplate<R, T extends TbQueueMsg> i
partitions = subscribeQueue.poll(); partitions = subscribeQueue.poll();
} }
if (!subscribed) { if (!subscribed) {
log.info("Subscribing to topics {}", getFullTopicNames()); log.info("Subscribing to {}", partitions);
doSubscribe(partitions); doSubscribe(partitions);
subscribed = true; subscribed = true;
} }
@ -168,7 +167,7 @@ public abstract class AbstractTbQueueConsumerTemplate<R, T extends TbQueueMsg> i
@Override @Override
public void unsubscribe() { public void unsubscribe() {
log.info("Unsubscribing and stopping consumer for topics {}", getFullTopicNames()); log.info("Unsubscribing and stopping consumer for {}", partitions);
stopped = true; stopped = true;
consumerLock.lock(); consumerLock.lock();
try { try {
@ -201,9 +200,8 @@ public abstract class AbstractTbQueueConsumerTemplate<R, T extends TbQueueMsg> i
return Collections.emptyList(); return Collections.emptyList();
} }
return partitions.stream() return partitions.stream()
.map(tpi -> tpi.getFullTopicName() + (tpi.isUseInternalPartition() ? .map(TopicPartitionInfo::getFullTopicName)
"[" + tpi.getPartition().orElse(-1) + "]" : "")) .toList();
.collect(Collectors.toList());
} }
protected boolean isLongPollingSupported() { protected boolean isLongPollingSupported() {

View File

@ -26,7 +26,8 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import static org.thingsboard.server.common.msg.queue.TopicPartitionInfo.withTopic;
@Slf4j @Slf4j
public class QueueStateService<E extends TbQueueMsg, S extends TbQueueMsg> { public class QueueStateService<E extends TbQueueMsg, S extends TbQueueMsg> {
@ -88,10 +89,6 @@ public class QueueStateService<E extends TbQueueMsg, S extends TbQueueMsg> {
initialized = true; initialized = true;
} }
private Set<TopicPartitionInfo> withTopic(Set<TopicPartitionInfo> partitions, String topic) {
return partitions.stream().map(tpi -> tpi.withTopic(topic)).collect(Collectors.toSet());
}
public Set<TopicPartitionInfo> getPartitionsInProgress() { public Set<TopicPartitionInfo> getPartitionsInProgress() {
return initialized ? partitionsInProgress : null; return initialized ? partitionsInProgress : null;
} }

View File

@ -18,7 +18,8 @@ package org.thingsboard.server.queue.edqs;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import org.thingsboard.common.util.ThingsBoardExecutors; import org.thingsboard.common.util.ThingsBoardExecutors;
import org.thingsboard.server.common.stats.DummyMessagesStats; import org.thingsboard.server.common.stats.StatsFactory;
import org.thingsboard.server.common.stats.StatsType;
import org.thingsboard.server.gen.transport.TransportProtos.FromEdqsMsg; import org.thingsboard.server.gen.transport.TransportProtos.FromEdqsMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToEdqsMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToEdqsMsg;
import org.thingsboard.server.queue.TbQueueConsumer; import org.thingsboard.server.queue.TbQueueConsumer;
@ -37,6 +38,7 @@ public class InMemoryEdqsQueueFactory implements EdqsQueueFactory {
private final InMemoryStorage storage; private final InMemoryStorage storage;
private final EdqsConfig edqsConfig; private final EdqsConfig edqsConfig;
private final StatsFactory statsFactory;
@Override @Override
public TbQueueConsumer<TbProtoQueueMsg<ToEdqsMsg>> createEdqsMsgConsumer(EdqsQueue queue) { public TbQueueConsumer<TbProtoQueueMsg<ToEdqsMsg>> createEdqsMsgConsumer(EdqsQueue queue) {
@ -69,7 +71,7 @@ public class InMemoryEdqsQueueFactory implements EdqsQueueFactory {
.maxPendingRequests(edqsConfig.getMaxPendingRequests()) .maxPendingRequests(edqsConfig.getMaxPendingRequests())
.requestTimeout(edqsConfig.getMaxRequestTimeout()) .requestTimeout(edqsConfig.getMaxRequestTimeout())
.pollInterval(edqsConfig.getPollInterval()) .pollInterval(edqsConfig.getPollInterval())
.stats(new DummyMessagesStats()) // FIXME .stats(statsFactory.createMessagesStats(StatsType.EDQS.getName()))
.executor(ThingsBoardExecutors.newWorkStealingPool(5, "edqs")) .executor(ThingsBoardExecutors.newWorkStealingPool(5, "edqs"))
.build(); .build();
} }

View File

@ -17,7 +17,8 @@ package org.thingsboard.server.queue.edqs;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import org.thingsboard.common.util.ThingsBoardExecutors; import org.thingsboard.common.util.ThingsBoardExecutors;
import org.thingsboard.server.common.stats.DummyMessagesStats; import org.thingsboard.server.common.stats.StatsFactory;
import org.thingsboard.server.common.stats.StatsType;
import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.gen.transport.TransportProtos.FromEdqsMsg; import org.thingsboard.server.gen.transport.TransportProtos.FromEdqsMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToEdqsMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToEdqsMsg;
@ -49,12 +50,14 @@ public class KafkaEdqsQueueFactory implements EdqsQueueFactory {
private final TbServiceInfoProvider serviceInfoProvider; private final TbServiceInfoProvider serviceInfoProvider;
private final TbKafkaConsumerStatsService consumerStatsService; private final TbKafkaConsumerStatsService consumerStatsService;
private final TopicService topicService; private final TopicService topicService;
private final StatsFactory statsFactory;
private final AtomicInteger consumerCounter = new AtomicInteger(); private final AtomicInteger consumerCounter = new AtomicInteger();
public KafkaEdqsQueueFactory(TbKafkaSettings kafkaSettings, TbKafkaTopicConfigs topicConfigs, public KafkaEdqsQueueFactory(TbKafkaSettings kafkaSettings, TbKafkaTopicConfigs topicConfigs,
EdqsConfig edqsConfig, TbServiceInfoProvider serviceInfoProvider, EdqsConfig edqsConfig, TbServiceInfoProvider serviceInfoProvider,
TbKafkaConsumerStatsService consumerStatsService, TopicService topicService) { TbKafkaConsumerStatsService consumerStatsService, TopicService topicService,
StatsFactory statsFactory) {
this.edqsEventsAdmin = new TbKafkaAdmin(kafkaSettings, topicConfigs.getEdqsEventsConfigs()); this.edqsEventsAdmin = new TbKafkaAdmin(kafkaSettings, topicConfigs.getEdqsEventsConfigs());
this.edqsRequestsAdmin = new TbKafkaAdmin(kafkaSettings, topicConfigs.getEdqsRequestsConfigs()); this.edqsRequestsAdmin = new TbKafkaAdmin(kafkaSettings, topicConfigs.getEdqsRequestsConfigs());
this.edqsStateAdmin = new TbKafkaAdmin(kafkaSettings, topicConfigs.getEdqsStateConfigs()); this.edqsStateAdmin = new TbKafkaAdmin(kafkaSettings, topicConfigs.getEdqsStateConfigs());
@ -63,6 +66,7 @@ public class KafkaEdqsQueueFactory implements EdqsQueueFactory {
this.serviceInfoProvider = serviceInfoProvider; this.serviceInfoProvider = serviceInfoProvider;
this.consumerStatsService = consumerStatsService; this.consumerStatsService = consumerStatsService;
this.topicService = topicService; this.topicService = topicService;
this.statsFactory = statsFactory;
} }
@Override @Override
@ -117,7 +121,7 @@ public class KafkaEdqsQueueFactory implements EdqsQueueFactory {
.maxPendingRequests(edqsConfig.getMaxPendingRequests()) .maxPendingRequests(edqsConfig.getMaxPendingRequests())
.requestTimeout(edqsConfig.getMaxRequestTimeout()) .requestTimeout(edqsConfig.getMaxRequestTimeout())
.pollInterval(edqsConfig.getPollInterval()) .pollInterval(edqsConfig.getPollInterval())
.stats(new DummyMessagesStats()) // FIXME .stats(statsFactory.createMessagesStats(StatsType.EDQS.getName()))
.executor(ThingsBoardExecutors.newWorkStealingPool(5, "edqs")) .executor(ThingsBoardExecutors.newWorkStealingPool(5, "edqs"))
.build(); .build();
} }

View File

@ -38,7 +38,7 @@ public class DefaultStatsFactory implements StatsFactory {
private static final Counter STUB_COUNTER = new StubCounter(); private static final Counter STUB_COUNTER = new StubCounter();
@Autowired // FIXME Slavik !!! @Autowired
private MeterRegistry meterRegistry; private MeterRegistry meterRegistry;
@Value("${metrics.enabled:false}") @Value("${metrics.enabled:false}")