diff --git a/application/src/main/java/org/thingsboard/server/service/edqs/DefaultEdqsService.java b/application/src/main/java/org/thingsboard/server/service/edqs/DefaultEdqsService.java index e823dee4e7..7d5a0cb0fd 100644 --- a/application/src/main/java/org/thingsboard/server/service/edqs/DefaultEdqsService.java +++ b/application/src/main/java/org/thingsboard/server/service/edqs/DefaultEdqsService.java @@ -96,10 +96,8 @@ public class DefaultEdqsService implements EdqsService { private void init() { executor = ThingsBoardExecutors.newWorkStealingPool(12, getClass()); eventsProducer = EdqsProducer.builder() - .queue(EdqsQueue.EVENTS) - .partitionService(edqsPartitionService) - .topicService(topicService) .producer(queueFactory.createEdqsMsgProducer(EdqsQueue.EVENTS)) + .partitionService(edqsPartitionService) .build(); syncLock = distributedLockService.getLock("edqs_sync"); } diff --git a/application/src/main/java/org/thingsboard/server/service/edqs/EdqsSyncService.java b/application/src/main/java/org/thingsboard/server/service/edqs/EdqsSyncService.java index 6ea2f959b7..79e0e60983 100644 --- a/application/src/main/java/org/thingsboard/server/service/edqs/EdqsSyncService.java +++ b/application/src/main/java/org/thingsboard/server/service/edqs/EdqsSyncService.java @@ -43,7 +43,6 @@ import org.thingsboard.server.dao.model.sqlts.dictionary.KeyDictionaryEntry; import org.thingsboard.server.dao.model.sqlts.latest.TsKvLatestEntity; import org.thingsboard.server.dao.sql.relation.RelationRepository; import org.thingsboard.server.dao.sqlts.latest.TsKvLatestRepository; -import org.thingsboard.server.queue.edqs.EdqsConfig; import java.util.List; import java.util.Map; @@ -76,8 +75,6 @@ public abstract class EdqsSyncService { @Autowired @Lazy private DefaultEdqsService edqsService; - @Autowired - protected EdqsConfig edqsConfig; private final ConcurrentHashMap entityInfoMap = new ConcurrentHashMap<>(); private final ConcurrentHashMap keys = new ConcurrentHashMap<>(); diff --git a/application/src/main/java/org/thingsboard/server/service/edqs/KafkaEdqsSyncService.java b/application/src/main/java/org/thingsboard/server/service/edqs/KafkaEdqsSyncService.java index 201964c955..ad7b7b970d 100644 --- a/application/src/main/java/org/thingsboard/server/service/edqs/KafkaEdqsSyncService.java +++ b/application/src/main/java/org/thingsboard/server/service/edqs/KafkaEdqsSyncService.java @@ -18,6 +18,7 @@ package org.thingsboard.server.service.edqs; import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; import org.springframework.stereotype.Service; import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; +import org.thingsboard.server.queue.edqs.EdqsConfig; import org.thingsboard.server.queue.edqs.EdqsQueue; import org.thingsboard.server.queue.kafka.TbKafkaAdmin; import org.thingsboard.server.queue.kafka.TbKafkaSettings; @@ -32,7 +33,7 @@ public class KafkaEdqsSyncService extends EdqsSyncService { private final boolean syncNeeded; - public KafkaEdqsSyncService(TbKafkaSettings kafkaSettings) { + public KafkaEdqsSyncService(TbKafkaSettings kafkaSettings, EdqsConfig edqsConfig) { TbKafkaAdmin kafkaAdmin = new TbKafkaAdmin(kafkaSettings, Collections.emptyMap()); this.syncNeeded = kafkaAdmin.areAllTopicsEmpty(IntStream.range(0, edqsConfig.getPartitions()) .mapToObj(partition -> TopicPartitionInfo.builder() diff --git a/common/edqs/src/main/java/org/thingsboard/server/edqs/processor/EdqsProducer.java b/common/edqs/src/main/java/org/thingsboard/server/edqs/processor/EdqsProducer.java index a06836339f..cc4f913d38 100644 --- a/common/edqs/src/main/java/org/thingsboard/server/edqs/processor/EdqsProducer.java +++ b/common/edqs/src/main/java/org/thingsboard/server/edqs/processor/EdqsProducer.java @@ -16,6 +16,7 @@ package org.thingsboard.server.edqs.processor; import lombok.Builder; +import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.common.errors.RecordTooLargeException; import org.thingsboard.server.common.data.ObjectType; @@ -27,53 +28,38 @@ import org.thingsboard.server.queue.TbQueueCallback; import org.thingsboard.server.queue.TbQueueMsgMetadata; import org.thingsboard.server.queue.TbQueueProducer; import org.thingsboard.server.queue.common.TbProtoQueueMsg; -import org.thingsboard.server.queue.discovery.TopicService; -import org.thingsboard.server.queue.edqs.EdqsQueue; import org.thingsboard.server.queue.kafka.TbKafkaProducerTemplate; @Slf4j +@Builder +@RequiredArgsConstructor public class EdqsProducer { - private final EdqsQueue queue; - private final EdqsPartitionService partitionService; - private final TopicService topicService; - private final TbQueueProducer> producer; - - @Builder - public EdqsProducer(EdqsQueue queue, - EdqsPartitionService partitionService, - TopicService topicService, - TbQueueProducer> producer) { - this.queue = queue; - this.partitionService = partitionService; - this.topicService = topicService; - this.producer = producer; - } + private final EdqsPartitionService partitionService; public void send(TenantId tenantId, ObjectType type, String key, ToEdqsMsg msg) { - String topic = topicService.buildTopicName(queue.getTopic()); + TopicPartitionInfo tpi = TopicPartitionInfo.builder() + .topic(producer.getDefaultTopic()) + .partition(partitionService.resolvePartition(tenantId, key)) + .build(); TbQueueCallback callback = new TbQueueCallback() { @Override public void onSuccess(TbQueueMsgMetadata metadata) { - log.trace("[{}][{}][{}] Published msg to {}: {}", tenantId, type, key, topic, msg); + log.trace("[{}][{}][{}] Published msg to {}: {}", tenantId, type, key, tpi, msg); } @Override public void onFailure(Throwable t) { if (t instanceof RecordTooLargeException) { if (!log.isDebugEnabled()) { - log.warn("[{}][{}][{}] Failed to publish msg to {}", tenantId, type, key, topic, t); // not logging the whole message + log.warn("[{}][{}][{}] Failed to publish msg to {}", tenantId, type, key, tpi, t); // not logging the whole message return; } } - log.warn("[{}][{}][{}] Failed to publish msg to {}: {}", tenantId, type, key, topic, msg, t); + log.warn("[{}][{}][{}] Failed to publish msg to {}: {}", tenantId, type, key, tpi, msg, t); } }; - TopicPartitionInfo tpi = TopicPartitionInfo.builder() - .topic(topic) - .partition(partitionService.resolvePartition(tenantId, key)) - .build(); if (producer instanceof TbKafkaProducerTemplate> kafkaProducer) { kafkaProducer.send(tpi, key, new TbProtoQueueMsg<>(null, msg), callback); // specifying custom key for compaction } else { diff --git a/common/edqs/src/main/java/org/thingsboard/server/edqs/state/KafkaEdqsStateService.java b/common/edqs/src/main/java/org/thingsboard/server/edqs/state/KafkaEdqsStateService.java index 067c730bfe..c59707c9c3 100644 --- a/common/edqs/src/main/java/org/thingsboard/server/edqs/state/KafkaEdqsStateService.java +++ b/common/edqs/src/main/java/org/thingsboard/server/edqs/state/KafkaEdqsStateService.java @@ -143,10 +143,8 @@ public class KafkaEdqsStateService implements EdqsStateService { .build(); stateProducer = EdqsProducer.builder() - .queue(EdqsQueue.STATE) - .partitionService(partitionService) - .topicService(topicService) .producer(queueFactory.createEdqsMsgProducer(EdqsQueue.STATE)) + .partitionService(partitionService) .build(); } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/edqs/KafkaEdqsQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/edqs/KafkaEdqsQueueFactory.java index e985696040..a322cc5434 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/edqs/KafkaEdqsQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/edqs/KafkaEdqsQueueFactory.java @@ -95,6 +95,7 @@ public class KafkaEdqsQueueFactory implements EdqsQueueFactory { public TbQueueProducer> createEdqsMsgProducer(EdqsQueue queue) { return TbKafkaProducerTemplate.>builder() .clientId("edqs-" + queue.name().toLowerCase() + "-producer-" + serviceInfoProvider.getServiceId()) + .defaultTopic(topicService.buildTopicName(queue.getTopic())) .settings(kafkaSettings) .admin(queue == EdqsQueue.STATE ? edqsStateAdmin : edqsEventsAdmin) .build(); diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java index f07bd9dcbb..1a8ee1dcee 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java @@ -593,6 +593,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi public TbQueueProducer> createEdqsMsgProducer(EdqsQueue queue) { return TbKafkaProducerTemplate.>builder() .clientId("edqs-producer-" + queue.name().toLowerCase() + "-" + serviceInfoProvider.getServiceId()) + .defaultTopic(topicService.buildTopicName(queue.getTopic())) .settings(kafkaSettings) .admin(edqsEventsAdmin) .build(); diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbCoreQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbCoreQueueFactory.java index 3c6d144a0c..2a1dc4171f 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbCoreQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbCoreQueueFactory.java @@ -483,6 +483,7 @@ public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory { public TbQueueProducer> createEdqsMsgProducer(EdqsQueue queue) { return TbKafkaProducerTemplate.>builder() .clientId("edqs-producer-" + queue.name().toLowerCase() + "-" + serviceInfoProvider.getServiceId()) + .defaultTopic(topicService.buildTopicName(queue.getTopic())) .settings(kafkaSettings) .admin(edqsEventsAdmin) .build(); diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbRuleEngineQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbRuleEngineQueueFactory.java index d0e6c2f123..f5300badbe 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbRuleEngineQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbRuleEngineQueueFactory.java @@ -378,6 +378,7 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory { public TbQueueProducer> createEdqsMsgProducer(EdqsQueue queue) { return TbKafkaProducerTemplate.>builder() .clientId("edqs-producer-" + queue.name().toLowerCase() + "-" + serviceInfoProvider.getServiceId()) + .defaultTopic(topicService.buildTopicName(queue.getTopic())) .settings(kafkaSettings) .admin(edqsEventsAdmin) .build();