diff --git a/application/src/test/java/org/thingsboard/server/controller/EdqsEntityQueryControllerTest.java b/application/src/test/java/org/thingsboard/server/controller/EdqsEntityQueryControllerTest.java index 1add0dae37..6833381f2d 100644 --- a/application/src/test/java/org/thingsboard/server/controller/EdqsEntityQueryControllerTest.java +++ b/application/src/test/java/org/thingsboard/server/controller/EdqsEntityQueryControllerTest.java @@ -33,8 +33,8 @@ import static org.awaitility.Awaitility.await; @DaoSqlTest @TestPropertySource(properties = { -// "queue.type=kafka", // uncomment to use Kafka -// "queue.kafka.bootstrap.servers=10.7.1.254:9092", + "queue.type=kafka", // uncomment to use Kafka + "queue.kafka.bootstrap.servers=192.168.0.105:9092", "queue.edqs.sync.enabled=true", "queue.edqs.api_enabled=true", "queue.edqs.mode=local" diff --git a/common/edqs/src/main/java/org/thingsboard/server/edqs/processor/EdqsProcessor.java b/common/edqs/src/main/java/org/thingsboard/server/edqs/processor/EdqsProcessor.java index fa1309f932..968e637a3d 100644 --- a/common/edqs/src/main/java/org/thingsboard/server/edqs/processor/EdqsProcessor.java +++ b/common/edqs/src/main/java/org/thingsboard/server/edqs/processor/EdqsProcessor.java @@ -92,12 +92,11 @@ public class EdqsProcessor implements TbQueueHandler, @Autowired @Lazy private EdqsStateService stateService; - @Getter - private PartitionedQueueConsumerManager> eventsConsumer; + private PartitionedQueueConsumerManager> eventConsumer; private TbQueueResponseTemplate, TbProtoQueueMsg> responseTemplate; private ExecutorService consumersExecutor; - private ExecutorService mgmtExecutor; + private ExecutorService taskExecutor; private ScheduledExecutorService scheduler; private ListeningExecutorService requestExecutor; private ExecutorService repartitionExecutor; @@ -112,7 +111,7 @@ public class EdqsProcessor implements TbQueueHandler, @PostConstruct private void init() { consumersExecutor = Executors.newCachedThreadPool(ThingsBoardThreadFactory.forName("edqs-consumer")); - mgmtExecutor = ThingsBoardExecutors.newWorkStealingPool(4, "edqs-consumer-mgmt"); + taskExecutor = ThingsBoardExecutors.newWorkStealingPool(4, "edqs-consumer-task-executor"); scheduler = ThingsBoardExecutors.newSingleThreadScheduledExecutor("edqs-scheduler"); requestExecutor = MoreExecutors.listeningDecorator(ThingsBoardExecutors.newWorkStealingPool(12, "edqs-requests")); repartitionExecutor = Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("edqs-repartition")); @@ -125,7 +124,7 @@ public class EdqsProcessor implements TbQueueHandler, } }; - eventsConsumer = PartitionedQueueConsumerManager.>create() + eventConsumer = PartitionedQueueConsumerManager.>create() .queueKey(new QueueKey(ServiceType.EDQS, EdqsQueue.EVENTS.getTopic())) .topic(EdqsQueue.EVENTS.getTopic()) .pollInterval(config.getPollInterval()) @@ -146,10 +145,12 @@ public class EdqsProcessor implements TbQueueHandler, }) .consumerCreator((config, partitionId) -> queueFactory.createEdqsMsgConsumer(EdqsQueue.EVENTS)) .consumerExecutor(consumersExecutor) - .taskExecutor(mgmtExecutor) + .taskExecutor(taskExecutor) .scheduler(scheduler) .uncaughtErrorHandler(errorHandler) .build(); + stateService.init(eventConsumer); + responseTemplate = queueFactory.createEdqsResponseTemplate(); } @@ -171,7 +172,7 @@ public class EdqsProcessor implements TbQueueHandler, stateService.process(withTopic(partitions, EdqsQueue.STATE.getTopic())); // eventsConsumer's partitions are updated by stateService - responseTemplate.subscribe(withTopic(partitions, config.getRequestsTopic())); + responseTemplate.subscribe(withTopic(partitions, config.getRequestsTopic())); // FIXME: we subscribe to partitions before we are ready. implement consumer-per-partition version for request template Set oldPartitions = event.getOldPartitions().get(new QueueKey(ServiceType.EDQS)); if (CollectionsUtil.isNotEmpty(oldPartitions)) { @@ -280,12 +281,13 @@ public class EdqsProcessor implements TbQueueHandler, @PreDestroy public void destroy() throws InterruptedException { - eventsConsumer.stop(); - eventsConsumer.awaitStop(); + eventConsumer.stop(); + eventConsumer.awaitStop(); responseTemplate.stop(); + stateService.stop(); consumersExecutor.shutdownNow(); - mgmtExecutor.shutdownNow(); + taskExecutor.shutdownNow(); scheduler.shutdownNow(); requestExecutor.shutdownNow(); repartitionExecutor.shutdownNow(); diff --git a/common/edqs/src/main/java/org/thingsboard/server/edqs/state/EdqsController.java b/common/edqs/src/main/java/org/thingsboard/server/edqs/state/EdqsController.java deleted file mode 100644 index 06d5f89aa9..0000000000 --- a/common/edqs/src/main/java/org/thingsboard/server/edqs/state/EdqsController.java +++ /dev/null @@ -1,42 +0,0 @@ -/** - * Copyright © 2016-2024 The Thingsboard Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.thingsboard.server.edqs.state; - -import lombok.RequiredArgsConstructor; -import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; -import org.springframework.http.ResponseEntity; -import org.springframework.web.bind.annotation.GetMapping; -import org.springframework.web.bind.annotation.RequestMapping; -import org.springframework.web.bind.annotation.RestController; - -@RestController -@RequiredArgsConstructor -@ConditionalOnExpression("'${service.type:null}'=='edqs'") -@RequestMapping("/api/edqs") -public class EdqsController { - - private final EdqsStateService edqsStateService; - - @GetMapping("/ready") - public ResponseEntity isReady() { - if (edqsStateService.isReady()) { - return ResponseEntity.ok().build(); - } else { - return ResponseEntity.badRequest().build(); - } - } - -} diff --git a/common/edqs/src/main/java/org/thingsboard/server/edqs/state/EdqsStateService.java b/common/edqs/src/main/java/org/thingsboard/server/edqs/state/EdqsStateService.java index d45cc0de14..1966618d4b 100644 --- a/common/edqs/src/main/java/org/thingsboard/server/edqs/state/EdqsStateService.java +++ b/common/edqs/src/main/java/org/thingsboard/server/edqs/state/EdqsStateService.java @@ -20,15 +20,19 @@ import org.thingsboard.server.common.data.edqs.EdqsEventType; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; import org.thingsboard.server.gen.transport.TransportProtos.ToEdqsMsg; +import org.thingsboard.server.queue.common.TbProtoQueueMsg; +import org.thingsboard.server.queue.common.consumer.PartitionedQueueConsumerManager; import java.util.Set; public interface EdqsStateService { + void init(PartitionedQueueConsumerManager> eventConsumer); + void process(Set partitions); void save(TenantId tenantId, ObjectType type, String key, EdqsEventType eventType, ToEdqsMsg msg); - boolean isReady(); + void stop(); } diff --git a/common/edqs/src/main/java/org/thingsboard/server/edqs/state/KafkaEdqsStateService.java b/common/edqs/src/main/java/org/thingsboard/server/edqs/state/KafkaEdqsStateService.java index fc7f8d40d9..f7382c9f15 100644 --- a/common/edqs/src/main/java/org/thingsboard/server/edqs/state/KafkaEdqsStateService.java +++ b/common/edqs/src/main/java/org/thingsboard/server/edqs/state/KafkaEdqsStateService.java @@ -15,13 +15,9 @@ */ package org.thingsboard.server.edqs.state; -import jakarta.annotation.PostConstruct; -import jakarta.annotation.PreDestroy; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; -import org.thingsboard.common.util.ThingsBoardExecutors; -import org.thingsboard.common.util.ThingsBoardThreadFactory; import org.thingsboard.server.common.data.ObjectType; import org.thingsboard.server.common.data.edqs.EdqsEventType; import org.thingsboard.server.common.data.id.TenantId; @@ -44,16 +40,13 @@ import org.thingsboard.server.queue.edqs.KafkaEdqsComponent; import java.util.Set; import java.util.UUID; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.atomic.AtomicInteger; @Service @RequiredArgsConstructor @KafkaEdqsComponent @Slf4j -public class KafkaEdqsStateService extends QueueStateService, TbProtoQueueMsg> implements EdqsStateService { +public class KafkaEdqsStateService implements EdqsStateService { private final EdqsConfig config; private final EdqsPartitionService partitionService; @@ -61,25 +54,19 @@ public class KafkaEdqsStateService extends QueueStateService> stateConsumer; + private QueueStateService, TbProtoQueueMsg> queueStateService; private QueueConsumerManager> eventsToBackupConsumer; private EdqsProducer stateProducer; - private ExecutorService consumersExecutor; - private ExecutorService mgmtExecutor; - private ScheduledExecutorService scheduler; - private final VersionsStore versionsStore = new VersionsStore(); private final AtomicInteger stateReadCount = new AtomicInteger(); private final AtomicInteger eventsReadCount = new AtomicInteger(); - @PostConstruct - private void init() { - consumersExecutor = Executors.newCachedThreadPool(ThingsBoardThreadFactory.forName("edqs-consumer")); - mgmtExecutor = ThingsBoardExecutors.newWorkStealingPool(4, "edqs-backup-consumer-mgmt"); - scheduler = ThingsBoardExecutors.newSingleThreadScheduledExecutor("edqs-backup-scheduler"); - - stateConsumer = PartitionedQueueConsumerManager.>create() // FIXME Slavik: if topic is empty + @Override + public void init(PartitionedQueueConsumerManager> eventConsumer) { + stateConsumer = PartitionedQueueConsumerManager.>create() .queueKey(new QueueKey(ServiceType.EDQS, EdqsQueue.STATE.getTopic())) + .topic(EdqsQueue.STATE.getTopic()) .pollInterval(config.getPollInterval()) .msgPackProcessor((msgs, consumer, config) -> { for (TbProtoQueueMsg queueMsg : msgs) { @@ -100,12 +87,13 @@ public class KafkaEdqsStateService extends QueueStateService queueFactory.createEdqsMsgConsumer(EdqsQueue.STATE)) - .consumerExecutor(consumersExecutor) - .taskExecutor(mgmtExecutor) - .scheduler(scheduler) + .consumerExecutor(eventConsumer.getConsumerExecutor()) + .taskExecutor(eventConsumer.getTaskExecutor()) + .scheduler(eventConsumer.getScheduler()) .uncaughtErrorHandler(edqsProcessor.getErrorHandler()) .build(); - super.init(stateConsumer, edqsProcessor.getEventsConsumer()); + queueStateService = new QueueStateService<>(); + queueStateService.init(stateConsumer, eventConsumer); eventsToBackupConsumer = QueueConsumerManager.>builder() .name("edqs-events-to-backup-consumer") @@ -145,7 +133,7 @@ public class KafkaEdqsStateService extends QueueStateService queueFactory.createEdqsMsgConsumer(EdqsQueue.EVENTS, "events-to-backup-consumer-group")) // shared by all instances consumer group - .consumerExecutor(consumersExecutor) + .consumerExecutor(eventConsumer.getConsumerExecutor()) .threadPrefix("edqs-events-to-backup") .build(); @@ -158,11 +146,11 @@ public class KafkaEdqsStateService extends QueueStateService partitions) { - if (getPartitions() == null) { + if (queueStateService.getPartitions() == null) { eventsToBackupConsumer.subscribe(); eventsToBackupConsumer.launch(); } - super.update(partitions); + queueStateService.update(partitions); } @Override @@ -170,25 +158,16 @@ public class KafkaEdqsStateService extends QueueStateService> eventConsumer; private Set partitions; + @Override + public void init(PartitionedQueueConsumerManager> eventConsumer) { + this.eventConsumer = eventConsumer; + } + @Override public void process(Set partitions) { - if (this.partitions != null) { + if (this.partitions == null) { db.forEach((key, value) -> { try { ToEdqsMsg edqsMsg = ToEdqsMsg.parseFrom(value); @@ -54,7 +62,7 @@ public class LocalEdqsStateService implements EdqsStateService { } }); } - processor.getEventsConsumer().update(partitions); + eventConsumer.update(partitions); this.partitions = partitions; } @@ -73,8 +81,7 @@ public class LocalEdqsStateService implements EdqsStateService { } @Override - public boolean isReady() { - return partitions != null; + public void stop() { } } diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/queue/TopicPartitionInfo.java b/common/message/src/main/java/org/thingsboard/server/common/msg/queue/TopicPartitionInfo.java index 552bdf50d6..3bb489f209 100644 --- a/common/message/src/main/java/org/thingsboard/server/common/msg/queue/TopicPartitionInfo.java +++ b/common/message/src/main/java/org/thingsboard/server/common/msg/queue/TopicPartitionInfo.java @@ -58,7 +58,7 @@ public class TopicPartitionInfo { } public TopicPartitionInfo newByTopic(String topic) { - return new TopicPartitionInfo(topic, this.tenantId, this.partition, this.myPartition); + return new TopicPartitionInfo(topic, this.tenantId, this.partition, this.useInternalPartition, this.myPartition); } public String getTopic() { diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/common/consumer/MainQueueConsumerManager.java b/common/queue/src/main/java/org/thingsboard/server/queue/common/consumer/MainQueueConsumerManager.java index 7bd8076ce9..df683b94cd 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/common/consumer/MainQueueConsumerManager.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/common/consumer/MainQueueConsumerManager.java @@ -54,8 +54,11 @@ public class MainQueueConsumerManager msgPackProcessor; protected final BiFunction> consumerCreator; + @Getter protected final ExecutorService consumerExecutor; + @Getter protected final ScheduledExecutorService scheduler; + @Getter protected final ExecutorService taskExecutor; protected final Consumer uncaughtErrorHandler; diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/common/consumer/PartitionedQueueConsumerManager.java b/common/queue/src/main/java/org/thingsboard/server/queue/common/consumer/PartitionedQueueConsumerManager.java index 57f0950cdb..1b47d4c073 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/common/consumer/PartitionedQueueConsumerManager.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/common/consumer/PartitionedQueueConsumerManager.java @@ -49,11 +49,6 @@ public class PartitionedQueueConsumerManager extends MainQ this.consumerWrapper = (ConsumerPerPartitionWrapper) super.consumerWrapper; } - @Override - public void update(Set partitions) { - throw new UnsupportedOperationException("Use manual addPartitions and removePartitions"); - } - @Override protected void processTask(TbQueueConsumerManagerTask task) { if (task instanceof AddPartitionsTask addPartitionsTask) { diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/common/consumer/QueueStateService.java b/common/queue/src/main/java/org/thingsboard/server/queue/common/consumer/QueueStateService.java index bffe441f7c..d022ad14de 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/common/consumer/QueueStateService.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/common/consumer/QueueStateService.java @@ -41,6 +41,7 @@ public class QueueStateService { } public void update(Set newPartitions) { + newPartitions = withTopic(newPartitions, stateConsumer.getTopic()); lock.lock(); Set oldPartitions = this.partitions != null ? this.partitions : Collections.emptySet(); Set addedPartitions; @@ -54,9 +55,10 @@ public class QueueStateService { } finally { lock.unlock(); } + if (!removedPartitions.isEmpty()) { stateConsumer.removePartitions(removedPartitions); - eventConsumer.removePartitions(removedPartitions.stream().map(tpi -> tpi.withTopic(eventConsumer.getTopic())).collect(Collectors.toSet())); + eventConsumer.removePartitions(withTopic(removedPartitions, eventConsumer.getTopic())); } if (!addedPartitions.isEmpty()) { @@ -73,4 +75,8 @@ public class QueueStateService { } } + private Set withTopic(Set partitions, String topic) { + return partitions.stream().map(tpi -> tpi.withTopic(topic)).collect(Collectors.toSet()); + } + }