diff --git a/common/edqs/src/main/java/org/thingsboard/server/edqs/processor/EdqsProcessor.java b/common/edqs/src/main/java/org/thingsboard/server/edqs/processor/EdqsProcessor.java index 7e9d60d09a..fa1309f932 100644 --- a/common/edqs/src/main/java/org/thingsboard/server/edqs/processor/EdqsProcessor.java +++ b/common/edqs/src/main/java/org/thingsboard/server/edqs/processor/EdqsProcessor.java @@ -43,14 +43,13 @@ import org.thingsboard.server.common.data.edqs.query.QueryResult; import org.thingsboard.server.common.data.id.CustomerId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.page.PageData; -import org.thingsboard.server.common.data.queue.QueueConfig; import org.thingsboard.server.common.data.util.CollectionsUtil; import org.thingsboard.server.common.msg.queue.ServiceType; import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; import org.thingsboard.server.edqs.repo.EdqRepository; +import org.thingsboard.server.edqs.state.EdqsPartitionService; import org.thingsboard.server.edqs.state.EdqsStateService; import org.thingsboard.server.edqs.util.EdqsConverter; -import org.thingsboard.server.edqs.state.EdqsPartitionService; import org.thingsboard.server.edqs.util.VersionsStore; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.gen.transport.TransportProtos.EdqsEventMsg; @@ -59,7 +58,7 @@ import org.thingsboard.server.gen.transport.TransportProtos.ToEdqsMsg; import org.thingsboard.server.queue.TbQueueHandler; import org.thingsboard.server.queue.TbQueueResponseTemplate; import org.thingsboard.server.queue.common.TbProtoQueueMsg; -import org.thingsboard.server.queue.common.consumer.MainQueueConsumerManager; +import org.thingsboard.server.queue.common.consumer.PartitionedQueueConsumerManager; import org.thingsboard.server.queue.discovery.QueueKey; import org.thingsboard.server.queue.discovery.event.PartitionChangeEvent; import org.thingsboard.server.queue.edqs.EdqsComponent; @@ -93,7 +92,8 @@ public class EdqsProcessor implements TbQueueHandler, @Autowired @Lazy private EdqsStateService stateService; - private MainQueueConsumerManager, QueueConfig> eventsConsumer; + @Getter + private PartitionedQueueConsumerManager> eventsConsumer; private TbQueueResponseTemplate, TbProtoQueueMsg> responseTemplate; private ExecutorService consumersExecutor; @@ -125,11 +125,15 @@ public class EdqsProcessor implements TbQueueHandler, } }; - eventsConsumer = MainQueueConsumerManager., QueueConfig>builder() + eventsConsumer = PartitionedQueueConsumerManager.>create() .queueKey(new QueueKey(ServiceType.EDQS, EdqsQueue.EVENTS.getTopic())) - .config(QueueConfig.of(true, config.getPollInterval())) + .topic(EdqsQueue.EVENTS.getTopic()) + .pollInterval(config.getPollInterval()) .msgPackProcessor((msgs, consumer, config) -> { for (TbProtoQueueMsg queueMsg : msgs) { + if (consumer.isStopped()) { + return; + } try { ToEdqsMsg msg = queueMsg.getValue(); log.trace("Processing message: {}", msg); @@ -159,37 +163,31 @@ public class EdqsProcessor implements TbQueueHandler, if (event.getServiceType() != ServiceType.EDQS) { return; } - repartitionExecutor.submit(() -> { // todo: maybe cancel the task if new event comes - try { - Set newPartitions = event.getNewPartitions().get(new QueueKey(ServiceType.EDQS)); - Set partitions = newPartitions.stream() - .map(tpi -> tpi.withUseInternalPartition(true)) - .collect(Collectors.toSet()); + try { + Set newPartitions = event.getNewPartitions().get(new QueueKey(ServiceType.EDQS)); + Set partitions = newPartitions.stream() + .map(tpi -> tpi.withUseInternalPartition(true)) + .collect(Collectors.toSet()); - try { - stateService.restore(withTopic(partitions, EdqsQueue.STATE.getTopic())); // blocks until restored - } catch (Exception e) { - log.error("Failed to process restore for partitions {}", partitions, e); - } - eventsConsumer.update(withTopic(partitions, EdqsQueue.EVENTS.getTopic())); - responseTemplate.subscribe(withTopic(partitions, config.getRequestsTopic())); + stateService.process(withTopic(partitions, EdqsQueue.STATE.getTopic())); + // eventsConsumer's partitions are updated by stateService + responseTemplate.subscribe(withTopic(partitions, config.getRequestsTopic())); - Set oldPartitions = event.getOldPartitions().get(new QueueKey(ServiceType.EDQS)); - if (CollectionsUtil.isNotEmpty(oldPartitions)) { - Set removedPartitions = Sets.difference(oldPartitions, newPartitions).stream() - .map(tpi -> tpi.getPartition().orElse(-1)).collect(Collectors.toSet()); - if (config.getPartitioningStrategy() != EdqsPartitioningStrategy.TENANT && !removedPartitions.isEmpty()) { - log.warn("Partitions {} were removed but shouldn't be (due to NONE partitioning strategy)", removedPartitions); - } - repository.clearIf(tenantId -> { - Integer partition = partitionService.resolvePartition(tenantId); - return partition != null && removedPartitions.contains(partition); - }); + Set oldPartitions = event.getOldPartitions().get(new QueueKey(ServiceType.EDQS)); + if (CollectionsUtil.isNotEmpty(oldPartitions)) { + Set removedPartitions = Sets.difference(oldPartitions, newPartitions).stream() + .map(tpi -> tpi.getPartition().orElse(-1)).collect(Collectors.toSet()); + if (config.getPartitioningStrategy() != EdqsPartitioningStrategy.TENANT && !removedPartitions.isEmpty()) { + log.warn("Partitions {} were removed but shouldn't be (due to NONE partitioning strategy)", removedPartitions); } - } catch (Throwable t) { - log.error("Failed to handle partition change event {}", event, t); + repository.clearIf(tenantId -> { + Integer partition = partitionService.resolvePartition(tenantId); + return partition != null && removedPartitions.contains(partition); + }); } - }); + } catch (Throwable t) { + log.error("Failed to handle partition change event {}", event, t); + } } @Override diff --git a/common/edqs/src/main/java/org/thingsboard/server/edqs/state/EdqsStateService.java b/common/edqs/src/main/java/org/thingsboard/server/edqs/state/EdqsStateService.java index 06d080ac64..d45cc0de14 100644 --- a/common/edqs/src/main/java/org/thingsboard/server/edqs/state/EdqsStateService.java +++ b/common/edqs/src/main/java/org/thingsboard/server/edqs/state/EdqsStateService.java @@ -25,7 +25,7 @@ import java.util.Set; public interface EdqsStateService { - void restore(Set partitions); + void process(Set partitions); void save(TenantId tenantId, ObjectType type, String key, EdqsEventType eventType, ToEdqsMsg msg); diff --git a/common/edqs/src/main/java/org/thingsboard/server/edqs/state/KafkaEdqsStateService.java b/common/edqs/src/main/java/org/thingsboard/server/edqs/state/KafkaEdqsStateService.java index 6ab1fb9e1f..fc7f8d40d9 100644 --- a/common/edqs/src/main/java/org/thingsboard/server/edqs/state/KafkaEdqsStateService.java +++ b/common/edqs/src/main/java/org/thingsboard/server/edqs/state/KafkaEdqsStateService.java @@ -25,7 +25,6 @@ import org.thingsboard.common.util.ThingsBoardThreadFactory; import org.thingsboard.server.common.data.ObjectType; import org.thingsboard.server.common.data.edqs.EdqsEventType; import org.thingsboard.server.common.data.id.TenantId; -import org.thingsboard.server.common.data.queue.QueueConfig; import org.thingsboard.server.common.msg.queue.ServiceType; import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; import org.thingsboard.server.edqs.processor.EdqsProcessor; @@ -34,8 +33,9 @@ import org.thingsboard.server.edqs.util.VersionsStore; import org.thingsboard.server.gen.transport.TransportProtos.EdqsEventMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToEdqsMsg; import org.thingsboard.server.queue.common.TbProtoQueueMsg; -import org.thingsboard.server.queue.common.consumer.MainQueueConsumerManager; +import org.thingsboard.server.queue.common.consumer.PartitionedQueueConsumerManager; import org.thingsboard.server.queue.common.consumer.QueueConsumerManager; +import org.thingsboard.server.queue.common.consumer.QueueStateService; import org.thingsboard.server.queue.discovery.QueueKey; import org.thingsboard.server.queue.edqs.EdqsConfig; import org.thingsboard.server.queue.edqs.EdqsQueue; @@ -44,7 +44,6 @@ import org.thingsboard.server.queue.edqs.KafkaEdqsComponent; import java.util.Set; import java.util.UUID; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; @@ -54,19 +53,17 @@ import java.util.concurrent.atomic.AtomicInteger; @RequiredArgsConstructor @KafkaEdqsComponent @Slf4j -public class KafkaEdqsStateService implements EdqsStateService { +public class KafkaEdqsStateService extends QueueStateService, TbProtoQueueMsg> implements EdqsStateService { private final EdqsConfig config; private final EdqsPartitionService partitionService; private final EdqsQueueFactory queueFactory; private final EdqsProcessor edqsProcessor; - private MainQueueConsumerManager, QueueConfig> stateConsumer; - private QueueConsumerManager> eventsConsumer; + private PartitionedQueueConsumerManager> stateConsumer; + private QueueConsumerManager> eventsToBackupConsumer; private EdqsProducer stateProducer; - private boolean initialRestoreDone; - private ExecutorService consumersExecutor; private ExecutorService mgmtExecutor; private ScheduledExecutorService scheduler; @@ -81,11 +78,14 @@ public class KafkaEdqsStateService implements EdqsStateService { mgmtExecutor = ThingsBoardExecutors.newWorkStealingPool(4, "edqs-backup-consumer-mgmt"); scheduler = ThingsBoardExecutors.newSingleThreadScheduledExecutor("edqs-backup-scheduler"); - stateConsumer = MainQueueConsumerManager., QueueConfig>builder() // FIXME Slavik: if topic is empty + stateConsumer = PartitionedQueueConsumerManager.>create() // FIXME Slavik: if topic is empty .queueKey(new QueueKey(ServiceType.EDQS, EdqsQueue.STATE.getTopic())) - .config(QueueConfig.of(true, config.getPollInterval())) + .pollInterval(config.getPollInterval()) .msgPackProcessor((msgs, consumer, config) -> { for (TbProtoQueueMsg queueMsg : msgs) { + if (consumer.isStopped()) { + return; + } try { ToEdqsMsg msg = queueMsg.getValue(); log.trace("Processing message: {}", msg); @@ -94,7 +94,7 @@ public class KafkaEdqsStateService implements EdqsStateService { log.info("[state] Processed {} msgs", stateReadCount.get()); } } catch (Exception e) { - log.error("Failed to process message: {}", queueMsg, e); // TODO: do something about the error - e.g. reprocess + log.error("Failed to process message: {}", queueMsg, e); } } consumer.commit(); @@ -105,46 +105,43 @@ public class KafkaEdqsStateService implements EdqsStateService { .scheduler(scheduler) .uncaughtErrorHandler(edqsProcessor.getErrorHandler()) .build(); + super.init(stateConsumer, edqsProcessor.getEventsConsumer()); - ExecutorService backupExecutor = ThingsBoardExecutors.newLimitedTasksExecutor(12, 1000, "events-to-backup-executor"); - eventsConsumer = QueueConsumerManager.>builder() // FIXME Slavik writes to the state while we read it, slows down the start. maybe start backup consumer after restore is finished + eventsToBackupConsumer = QueueConsumerManager.>builder() .name("edqs-events-to-backup-consumer") .pollInterval(config.getPollInterval()) .msgPackProcessor((msgs, consumer) -> { - CountDownLatch resultLatch = new CountDownLatch(msgs.size()); for (TbProtoQueueMsg queueMsg : msgs) { - backupExecutor.submit(() -> { - try { - ToEdqsMsg msg = queueMsg.getValue(); - log.trace("Processing message: {}", msg); + if (consumer.isStopped()) { + return; + } + try { + ToEdqsMsg msg = queueMsg.getValue(); + log.trace("Processing message: {}", msg); - if (msg.hasEventMsg()) { - EdqsEventMsg eventMsg = msg.getEventMsg(); - String key = eventMsg.getKey(); - int count = eventsReadCount.incrementAndGet(); - if (count % 100000 == 0) { - log.info("[events-to-backup] Processed {} msgs", count); - } - if (eventMsg.hasVersion()) { - if (!versionsStore.isNew(key, eventMsg.getVersion())) { - return; - } - } - - TenantId tenantId = getTenantId(msg); - ObjectType objectType = ObjectType.valueOf(eventMsg.getObjectType()); - EdqsEventType eventType = EdqsEventType.valueOf(eventMsg.getEventType()); - log.debug("[{}] Saving to backup [{}] [{}] [{}]", tenantId, objectType, eventType, key); - stateProducer.send(tenantId, objectType, key, msg); + if (msg.hasEventMsg()) { + EdqsEventMsg eventMsg = msg.getEventMsg(); + String key = eventMsg.getKey(); + int count = eventsReadCount.incrementAndGet(); + if (count % 100000 == 0) { + log.info("[events-to-backup] Processed {} msgs", count); } - } catch (Throwable t) { - log.error("Failed to process message: {}", queueMsg, t); - } finally { - resultLatch.countDown(); + if (eventMsg.hasVersion()) { + if (!versionsStore.isNew(key, eventMsg.getVersion())) { + continue; + } + } + + TenantId tenantId = getTenantId(msg); + ObjectType objectType = ObjectType.valueOf(eventMsg.getObjectType()); + EdqsEventType eventType = EdqsEventType.valueOf(eventMsg.getEventType()); + log.debug("[{}] Saving to backup [{}] [{}] [{}]", tenantId, objectType, eventType, key); + stateProducer.send(tenantId, objectType, key, msg); } - }); + } catch (Throwable t) { + log.error("Failed to process message: {}", queueMsg, t); + } } - resultLatch.await(); consumer.commit(); }) .consumerCreator(() -> queueFactory.createEdqsMsgConsumer(EdqsQueue.EVENTS, "events-to-backup-consumer-group")) // shared by all instances consumer group @@ -160,20 +157,12 @@ public class KafkaEdqsStateService implements EdqsStateService { } @Override - public void restore(Set partitions) { - stateReadCount.set(0); //TODO Slavik: do not support remote mode in monolith setup - long startTs = System.currentTimeMillis(); - log.info("Restore started for partitions {}", partitions.stream().map(tpi -> tpi.getPartition().orElse(-1)).sorted().toList()); - stateConsumer.doUpdate(partitions); // calling blocking doUpdate instead of update - stateConsumer.awaitStop(0); // consumers should stop on their own because EdqsQueue.STATE.stopWhenRead is true, we just need to wait - log.info("Restore finished in {} ms. Processed {} msgs", (System.currentTimeMillis() - startTs), stateReadCount.get()); - - if (!initialRestoreDone) { - initialRestoreDone = true; - - eventsConsumer.subscribe(); - eventsConsumer.launch(); + public void process(Set partitions) { + if (getPartitions() == null) { + eventsToBackupConsumer.subscribe(); + eventsToBackupConsumer.launch(); } + super.update(partitions); } @Override @@ -194,7 +183,7 @@ public class KafkaEdqsStateService implements EdqsStateService { private void preDestroy() { stateConsumer.stop(); stateConsumer.awaitStop(); - eventsConsumer.stop(); + eventsToBackupConsumer.stop(); stateProducer.stop(); consumersExecutor.shutdownNow(); diff --git a/common/edqs/src/main/java/org/thingsboard/server/edqs/state/LocalEdqsStateService.java b/common/edqs/src/main/java/org/thingsboard/server/edqs/state/LocalEdqsStateService.java index 6a61488182..32f4159f9f 100644 --- a/common/edqs/src/main/java/org/thingsboard/server/edqs/state/LocalEdqsStateService.java +++ b/common/edqs/src/main/java/org/thingsboard/server/edqs/state/LocalEdqsStateService.java @@ -17,8 +17,6 @@ package org.thingsboard.server.edqs.state; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.context.annotation.Lazy; import org.springframework.stereotype.Service; import org.thingsboard.server.common.data.ObjectType; import org.thingsboard.server.common.data.edqs.EdqsEventType; @@ -38,29 +36,26 @@ import java.util.Set; @Slf4j public class LocalEdqsStateService implements EdqsStateService { - @Autowired @Lazy - private EdqsProcessor processor; - @Autowired - private EdqsRocksDb db; + private final EdqsProcessor processor; + private final EdqsRocksDb db; - private boolean restoreDone; + private Set partitions; @Override - public void restore(Set partitions) { - if (restoreDone) { - return; + public void process(Set partitions) { + if (this.partitions != null) { + db.forEach((key, value) -> { + try { + ToEdqsMsg edqsMsg = ToEdqsMsg.parseFrom(value); + log.trace("[{}] Restored msg from RocksDB: {}", key, edqsMsg); + processor.process(edqsMsg, EdqsQueue.STATE); + } catch (Exception e) { + log.error("[{}] Failed to restore value", key, e); + } + }); } - - db.forEach((key, value) -> { - try { - ToEdqsMsg edqsMsg = ToEdqsMsg.parseFrom(value); - log.trace("[{}] Restored msg from RocksDB: {}", key, edqsMsg); - processor.process(edqsMsg, EdqsQueue.STATE); - } catch (Exception e) { - log.error("[{}] Failed to restore value", key, e); - } - }); - restoreDone = true; + processor.getEventsConsumer().update(partitions); + this.partitions = partitions; } @Override @@ -79,7 +74,7 @@ public class LocalEdqsStateService implements EdqsStateService { @Override public boolean isReady() { - return restoreDone; + return partitions != null; } }