Refactoring for EDQS

This commit is contained in:
ViacheslavKlimov 2025-03-21 14:25:54 +02:00
parent b7604a8d0a
commit 0caa6ad86e
9 changed files with 19 additions and 35 deletions

View File

@ -96,10 +96,8 @@ public class DefaultEdqsService implements EdqsService {
private void init() {
executor = ThingsBoardExecutors.newWorkStealingPool(12, getClass());
eventsProducer = EdqsProducer.builder()
.queue(EdqsQueue.EVENTS)
.partitionService(edqsPartitionService)
.topicService(topicService)
.producer(queueFactory.createEdqsMsgProducer(EdqsQueue.EVENTS))
.partitionService(edqsPartitionService)
.build();
syncLock = distributedLockService.getLock("edqs_sync");
}

View File

@ -43,7 +43,6 @@ import org.thingsboard.server.dao.model.sqlts.dictionary.KeyDictionaryEntry;
import org.thingsboard.server.dao.model.sqlts.latest.TsKvLatestEntity;
import org.thingsboard.server.dao.sql.relation.RelationRepository;
import org.thingsboard.server.dao.sqlts.latest.TsKvLatestRepository;
import org.thingsboard.server.queue.edqs.EdqsConfig;
import java.util.List;
import java.util.Map;
@ -76,8 +75,6 @@ public abstract class EdqsSyncService {
@Autowired
@Lazy
private DefaultEdqsService edqsService;
@Autowired
protected EdqsConfig edqsConfig;
private final ConcurrentHashMap<UUID, EntityIdInfo> entityInfoMap = new ConcurrentHashMap<>();
private final ConcurrentHashMap<Integer, String> keys = new ConcurrentHashMap<>();

View File

@ -18,6 +18,7 @@ package org.thingsboard.server.service.edqs;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.stereotype.Service;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
import org.thingsboard.server.queue.edqs.EdqsConfig;
import org.thingsboard.server.queue.edqs.EdqsQueue;
import org.thingsboard.server.queue.kafka.TbKafkaAdmin;
import org.thingsboard.server.queue.kafka.TbKafkaSettings;
@ -32,7 +33,7 @@ public class KafkaEdqsSyncService extends EdqsSyncService {
private final boolean syncNeeded;
public KafkaEdqsSyncService(TbKafkaSettings kafkaSettings) {
public KafkaEdqsSyncService(TbKafkaSettings kafkaSettings, EdqsConfig edqsConfig) {
TbKafkaAdmin kafkaAdmin = new TbKafkaAdmin(kafkaSettings, Collections.emptyMap());
this.syncNeeded = kafkaAdmin.areAllTopicsEmpty(IntStream.range(0, edqsConfig.getPartitions())
.mapToObj(partition -> TopicPartitionInfo.builder()

View File

@ -16,6 +16,7 @@
package org.thingsboard.server.edqs.processor;
import lombok.Builder;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.common.errors.RecordTooLargeException;
import org.thingsboard.server.common.data.ObjectType;
@ -27,53 +28,38 @@ import org.thingsboard.server.queue.TbQueueCallback;
import org.thingsboard.server.queue.TbQueueMsgMetadata;
import org.thingsboard.server.queue.TbQueueProducer;
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import org.thingsboard.server.queue.discovery.TopicService;
import org.thingsboard.server.queue.edqs.EdqsQueue;
import org.thingsboard.server.queue.kafka.TbKafkaProducerTemplate;
@Slf4j
@Builder
@RequiredArgsConstructor
public class EdqsProducer {
private final EdqsQueue queue;
private final EdqsPartitionService partitionService;
private final TopicService topicService;
private final TbQueueProducer<TbProtoQueueMsg<ToEdqsMsg>> producer;
@Builder
public EdqsProducer(EdqsQueue queue,
EdqsPartitionService partitionService,
TopicService topicService,
TbQueueProducer<TbProtoQueueMsg<ToEdqsMsg>> producer) {
this.queue = queue;
this.partitionService = partitionService;
this.topicService = topicService;
this.producer = producer;
}
private final EdqsPartitionService partitionService;
public void send(TenantId tenantId, ObjectType type, String key, ToEdqsMsg msg) {
String topic = topicService.buildTopicName(queue.getTopic());
TopicPartitionInfo tpi = TopicPartitionInfo.builder()
.topic(producer.getDefaultTopic())
.partition(partitionService.resolvePartition(tenantId, key))
.build();
TbQueueCallback callback = new TbQueueCallback() {
@Override
public void onSuccess(TbQueueMsgMetadata metadata) {
log.trace("[{}][{}][{}] Published msg to {}: {}", tenantId, type, key, topic, msg);
log.trace("[{}][{}][{}] Published msg to {}: {}", tenantId, type, key, tpi, msg);
}
@Override
public void onFailure(Throwable t) {
if (t instanceof RecordTooLargeException) {
if (!log.isDebugEnabled()) {
log.warn("[{}][{}][{}] Failed to publish msg to {}", tenantId, type, key, topic, t); // not logging the whole message
log.warn("[{}][{}][{}] Failed to publish msg to {}", tenantId, type, key, tpi, t); // not logging the whole message
return;
}
}
log.warn("[{}][{}][{}] Failed to publish msg to {}: {}", tenantId, type, key, topic, msg, t);
log.warn("[{}][{}][{}] Failed to publish msg to {}: {}", tenantId, type, key, tpi, msg, t);
}
};
TopicPartitionInfo tpi = TopicPartitionInfo.builder()
.topic(topic)
.partition(partitionService.resolvePartition(tenantId, key))
.build();
if (producer instanceof TbKafkaProducerTemplate<TbProtoQueueMsg<ToEdqsMsg>> kafkaProducer) {
kafkaProducer.send(tpi, key, new TbProtoQueueMsg<>(null, msg), callback); // specifying custom key for compaction
} else {

View File

@ -143,10 +143,8 @@ public class KafkaEdqsStateService implements EdqsStateService {
.build();
stateProducer = EdqsProducer.builder()
.queue(EdqsQueue.STATE)
.partitionService(partitionService)
.topicService(topicService)
.producer(queueFactory.createEdqsMsgProducer(EdqsQueue.STATE))
.partitionService(partitionService)
.build();
}

View File

@ -95,6 +95,7 @@ public class KafkaEdqsQueueFactory implements EdqsQueueFactory {
public TbQueueProducer<TbProtoQueueMsg<ToEdqsMsg>> createEdqsMsgProducer(EdqsQueue queue) {
return TbKafkaProducerTemplate.<TbProtoQueueMsg<ToEdqsMsg>>builder()
.clientId("edqs-" + queue.name().toLowerCase() + "-producer-" + serviceInfoProvider.getServiceId())
.defaultTopic(topicService.buildTopicName(queue.getTopic()))
.settings(kafkaSettings)
.admin(queue == EdqsQueue.STATE ? edqsStateAdmin : edqsEventsAdmin)
.build();

View File

@ -593,6 +593,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi
public TbQueueProducer<TbProtoQueueMsg<ToEdqsMsg>> createEdqsMsgProducer(EdqsQueue queue) {
return TbKafkaProducerTemplate.<TbProtoQueueMsg<ToEdqsMsg>>builder()
.clientId("edqs-producer-" + queue.name().toLowerCase() + "-" + serviceInfoProvider.getServiceId())
.defaultTopic(topicService.buildTopicName(queue.getTopic()))
.settings(kafkaSettings)
.admin(edqsEventsAdmin)
.build();

View File

@ -483,6 +483,7 @@ public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory {
public TbQueueProducer<TbProtoQueueMsg<ToEdqsMsg>> createEdqsMsgProducer(EdqsQueue queue) {
return TbKafkaProducerTemplate.<TbProtoQueueMsg<ToEdqsMsg>>builder()
.clientId("edqs-producer-" + queue.name().toLowerCase() + "-" + serviceInfoProvider.getServiceId())
.defaultTopic(topicService.buildTopicName(queue.getTopic()))
.settings(kafkaSettings)
.admin(edqsEventsAdmin)
.build();

View File

@ -378,6 +378,7 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory {
public TbQueueProducer<TbProtoQueueMsg<ToEdqsMsg>> createEdqsMsgProducer(EdqsQueue queue) {
return TbKafkaProducerTemplate.<TbProtoQueueMsg<ToEdqsMsg>>builder()
.clientId("edqs-producer-" + queue.name().toLowerCase() + "-" + serviceInfoProvider.getServiceId())
.defaultTopic(topicService.buildTopicName(queue.getTopic()))
.settings(kafkaSettings)
.admin(edqsEventsAdmin)
.build();