From 2b3af165301c2098f3b72012a5429300f43f8ed8 Mon Sep 17 00:00:00 2001 From: ViacheslavKlimov Date: Wed, 19 Feb 2025 17:27:12 +0200 Subject: [PATCH] CF states restore improvements --- .../AbstractCalculatedFieldStateService.java | 10 +++ .../cf/CalculatedFieldStateService.java | 7 +++ .../KafkaCalculatedFieldStateService.java | 62 ++++++------------- .../RocksDBCalculatedFieldStateService.java | 23 ++++--- ...faultTbCalculatedFieldConsumerService.java | 52 ++++++---------- .../src/main/resources/thingsboard.yml | 2 - .../consumer/MainQueueConsumerManager.java | 3 + .../PartitionedQueueConsumerManager.java | 5 -- .../common/consumer/QueueStateService.java | 8 ++- 9 files changed, 79 insertions(+), 93 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/cf/AbstractCalculatedFieldStateService.java b/application/src/main/java/org/thingsboard/server/service/cf/AbstractCalculatedFieldStateService.java index c55ec00379..2d24f4f44d 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/AbstractCalculatedFieldStateService.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/AbstractCalculatedFieldStateService.java @@ -21,6 +21,9 @@ import org.thingsboard.server.actors.calculatedField.CalculatedFieldStateRestore import org.thingsboard.server.common.msg.queue.TbCallback; import org.thingsboard.server.exception.CalculatedFieldStateException; import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldStateProto; +import org.thingsboard.server.gen.transport.TransportProtos.ToCalculatedFieldMsg; +import org.thingsboard.server.queue.common.TbProtoQueueMsg; +import org.thingsboard.server.queue.common.consumer.PartitionedQueueConsumerManager; import org.thingsboard.server.service.cf.ctx.CalculatedFieldEntityCtxId; import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldState; @@ -32,6 +35,13 @@ public abstract class AbstractCalculatedFieldStateService implements CalculatedF @Autowired private ActorSystemContext actorSystemContext; + protected PartitionedQueueConsumerManager> eventConsumer; + + @Override + public void init(PartitionedQueueConsumerManager> eventConsumer) { + this.eventConsumer = eventConsumer; + } + @Override public final void persistState(CalculatedFieldEntityCtxId stateId, CalculatedFieldState state, TbCallback callback) { if (state.isStateTooLarge()) { diff --git a/application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldStateService.java b/application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldStateService.java index df2b04c20a..18d5518da3 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldStateService.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldStateService.java @@ -18,6 +18,9 @@ package org.thingsboard.server.service.cf; import org.thingsboard.server.common.msg.queue.TbCallback; import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; import org.thingsboard.server.exception.CalculatedFieldStateException; +import org.thingsboard.server.gen.transport.TransportProtos.ToCalculatedFieldMsg; +import org.thingsboard.server.queue.common.TbProtoQueueMsg; +import org.thingsboard.server.queue.common.consumer.PartitionedQueueConsumerManager; import org.thingsboard.server.service.cf.ctx.CalculatedFieldEntityCtxId; import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldState; @@ -25,10 +28,14 @@ import java.util.Set; public interface CalculatedFieldStateService { + void init(PartitionedQueueConsumerManager> eventConsumer); + void persistState(CalculatedFieldEntityCtxId stateId, CalculatedFieldState state, TbCallback callback) throws CalculatedFieldStateException; void removeState(CalculatedFieldEntityCtxId stateId, TbCallback callback); void restore(Set partitions); + void stop(); + } diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/KafkaCalculatedFieldStateService.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/KafkaCalculatedFieldStateService.java index c05517acba..a9cee454f2 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/KafkaCalculatedFieldStateService.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/KafkaCalculatedFieldStateService.java @@ -15,15 +15,11 @@ */ package org.thingsboard.server.service.cf.ctx.state; -import jakarta.annotation.PostConstruct; -import jakarta.annotation.PreDestroy; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; import org.springframework.stereotype.Service; -import org.thingsboard.common.util.ThingsBoardExecutors; -import org.thingsboard.common.util.ThingsBoardThreadFactory; import org.thingsboard.server.common.data.id.CalculatedFieldId; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.EntityIdFactory; @@ -31,25 +27,22 @@ import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.msg.queue.TbCallback; import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldStateProto; +import org.thingsboard.server.gen.transport.TransportProtos.ToCalculatedFieldMsg; import org.thingsboard.server.queue.TbQueueCallback; import org.thingsboard.server.queue.TbQueueMsgHeaders; import org.thingsboard.server.queue.TbQueueMsgMetadata; import org.thingsboard.server.queue.common.TbProtoQueueMsg; +import org.thingsboard.server.queue.common.consumer.PartitionedQueueConsumerManager; +import org.thingsboard.server.queue.common.consumer.QueueStateService; import org.thingsboard.server.queue.discovery.PartitionService; import org.thingsboard.server.queue.discovery.QueueKey; import org.thingsboard.server.queue.kafka.TbKafkaProducerTemplate; import org.thingsboard.server.queue.provider.TbRuleEngineQueueFactory; import org.thingsboard.server.service.cf.AbstractCalculatedFieldStateService; import org.thingsboard.server.service.cf.ctx.CalculatedFieldEntityCtxId; -import org.thingsboard.server.service.queue.DefaultTbCalculatedFieldConsumerService.CalculatedFieldQueueConfig; -import org.thingsboard.server.queue.common.consumer.MainQueueConsumerManager; import java.util.Set; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.atomic.AtomicInteger; -import java.util.stream.Collectors; import static org.thingsboard.server.queue.common.AbstractTbQueueTemplate.bytesToString; import static org.thingsboard.server.queue.common.AbstractTbQueueTemplate.bytesToUuid; @@ -67,27 +60,20 @@ public class KafkaCalculatedFieldStateService extends AbstractCalculatedFieldSta @Value("${queue.calculated_fields.poll_interval:25}") private long pollInterval; - @Value("${queue.calculated_fields.consumer_per_partition:true}") - private boolean consumerPerPartition; - private MainQueueConsumerManager, CalculatedFieldQueueConfig> stateConsumer; + private PartitionedQueueConsumerManager> stateConsumer; private TbKafkaProducerTemplate> stateProducer; - - protected ExecutorService consumersExecutor; - protected ExecutorService mgmtExecutor; - protected ScheduledExecutorService scheduler; + private QueueStateService, TbProtoQueueMsg> queueStateService; private final AtomicInteger counter = new AtomicInteger(); - @PostConstruct - private void init() { - this.consumersExecutor = Executors.newCachedThreadPool(ThingsBoardThreadFactory.forName("cf-state-consumer")); - this.mgmtExecutor = ThingsBoardExecutors.newWorkStealingPool(Math.max(Runtime.getRuntime().availableProcessors(), 4), "cf-state-mgmt"); - this.scheduler = ThingsBoardExecutors.newSingleThreadScheduledExecutor("cf-state-consumer-scheduler"); - - this.stateConsumer = MainQueueConsumerManager., CalculatedFieldQueueConfig>builder() + @Override + public void init(PartitionedQueueConsumerManager> eventConsumer) { + super.init(eventConsumer); + this.stateConsumer = PartitionedQueueConsumerManager.>create() .queueKey(QueueKey.CF_STATES) - .config(CalculatedFieldQueueConfig.of(consumerPerPartition, (int) pollInterval)) + .topic(partitionService.getTopic(QueueKey.CF_STATES)) + .pollInterval(pollInterval) .msgPackProcessor((msgs, consumer, config) -> { for (TbProtoQueueMsg msg : msgs) { try { @@ -107,11 +93,13 @@ public class KafkaCalculatedFieldStateService extends AbstractCalculatedFieldSta } }) .consumerCreator((config, partitionId) -> queueFactory.createCalculatedFieldStateConsumer()) - .consumerExecutor(consumersExecutor) - .scheduler(scheduler) - .taskExecutor(mgmtExecutor) + .consumerExecutor(eventConsumer.getConsumerExecutor()) + .scheduler(eventConsumer.getScheduler()) + .taskExecutor(eventConsumer.getTaskExecutor()) .build(); this.stateProducer = (TbKafkaProducerTemplate>) queueFactory.createCalculatedFieldStateProducer(); + this.queueStateService = new QueueStateService<>(); + this.queueStateService.init(stateConsumer, super.eventConsumer); } @Override @@ -145,15 +133,7 @@ public class KafkaCalculatedFieldStateService extends AbstractCalculatedFieldSta @Override public void restore(Set partitions) { - partitions = partitions.stream().map(tpi -> tpi.newByTopic(partitionService.getTopic(QueueKey.CF_STATES))).collect(Collectors.toSet()); - log.info("Restoring calculated field states for partitions: {}", partitions.stream().map(TopicPartitionInfo::getFullTopicName).toList()); - long startTs = System.currentTimeMillis(); - counter.set(0); - - stateConsumer.doUpdate(partitions); // calling blocking doUpdate instead of update - stateConsumer.awaitStop(0);// consumers should stop on their own because stopWhenRead is true, we just need to wait - - log.info("Restored {} calculated field states in {} ms", counter.get(), System.currentTimeMillis() - startTs); + queueStateService.update(partitions); } private void putStateId(TbQueueMsgHeaders headers, CalculatedFieldEntityCtxId stateId) { @@ -170,15 +150,11 @@ public class KafkaCalculatedFieldStateService extends AbstractCalculatedFieldSta return new CalculatedFieldEntityCtxId(tenantId, cfId, entityId); } - @PreDestroy - private void preDestroy() { + @Override + public void stop() { stateConsumer.stop(); stateConsumer.awaitStop(); stateProducer.stop(); - - consumersExecutor.shutdownNow(); - mgmtExecutor.shutdownNow(); - scheduler.shutdownNow(); } } diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/RocksDBCalculatedFieldStateService.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/RocksDBCalculatedFieldStateService.java index 61e752b9d1..f5f2051fb4 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/RocksDBCalculatedFieldStateService.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/RocksDBCalculatedFieldStateService.java @@ -54,18 +54,21 @@ public class RocksDBCalculatedFieldStateService extends AbstractCalculatedFieldS @Override public void restore(Set partitions) { if (this.partitions == null) { - this.partitions = partitions; - } else { - return; + cfRocksDb.forEach((key, value) -> { + try { + processRestoredState(CalculatedFieldStateProto.parseFrom(value)); + } catch (InvalidProtocolBufferException e) { + log.error("[{}] Failed to process restored state", key, e); + } + }); } - cfRocksDb.forEach((key, value) -> { - try { - processRestoredState(CalculatedFieldStateProto.parseFrom(value)); - } catch (InvalidProtocolBufferException e) { - log.error("[{}] Failed to process restored state", key, e); - } - }); + eventConsumer.update(partitions); + this.partitions = partitions; + } + + @Override + public void stop() { } } diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCalculatedFieldConsumerService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCalculatedFieldConsumerService.java index 5e741c5064..d579d1e538 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCalculatedFieldConsumerService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCalculatedFieldConsumerService.java @@ -19,13 +19,11 @@ import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; import jakarta.annotation.PostConstruct; import jakarta.annotation.PreDestroy; -import lombok.Data; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.ApplicationEventPublisher; import org.springframework.stereotype.Service; import org.thingsboard.common.util.ThingsBoardExecutors; -import org.thingsboard.common.util.ThingsBoardThreadFactory; import org.thingsboard.server.actors.ActorSystemContext; import org.thingsboard.server.actors.calculatedField.CalculatedFieldLinkedTelemetryMsg; import org.thingsboard.server.actors.calculatedField.CalculatedFieldTelemetryMsg; @@ -45,6 +43,7 @@ import org.thingsboard.server.gen.transport.TransportProtos.ToCalculatedFieldMsg import org.thingsboard.server.gen.transport.TransportProtos.ToCalculatedFieldNotificationMsg; import org.thingsboard.server.queue.TbQueueConsumer; import org.thingsboard.server.queue.common.TbProtoQueueMsg; +import org.thingsboard.server.queue.common.consumer.PartitionedQueueConsumerManager; import org.thingsboard.server.queue.discovery.PartitionService; import org.thingsboard.server.queue.discovery.QueueKey; import org.thingsboard.server.queue.discovery.event.PartitionChangeEvent; @@ -55,7 +54,6 @@ import org.thingsboard.server.service.cf.CalculatedFieldCache; import org.thingsboard.server.service.cf.CalculatedFieldStateService; import org.thingsboard.server.service.profile.TbAssetProfileCache; import org.thingsboard.server.service.profile.TbDeviceProfileCache; -import org.thingsboard.server.queue.common.consumer.MainQueueConsumerManager; import org.thingsboard.server.service.queue.processing.AbstractConsumerService; import org.thingsboard.server.service.queue.processing.IdMsgPair; import org.thingsboard.server.service.security.auth.jwt.settings.JwtSettingsService; @@ -66,8 +64,6 @@ import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @@ -83,18 +79,15 @@ public class DefaultTbCalculatedFieldConsumerService extends AbstractConsumerSer private long pollInterval; @Value("${queue.calculated_fields.pack_processing_timeout:60000}") private long packProcessingTimeout; - @Value("${queue.calculated_fields.consumer_per_partition:true}") - private boolean consumerPerPartition; @Value("${queue.calculated_fields.pool_size:8}") private int poolSize; private final TbRuleEngineQueueFactory queueFactory; private final CalculatedFieldStateService stateService; - private MainQueueConsumerManager, CalculatedFieldQueueConfig> mainConsumer; + private PartitionedQueueConsumerManager> eventConsumer; private ListeningExecutorService calculatedFieldsExecutor; - private ExecutorService repartitionExecutor; public DefaultTbCalculatedFieldConsumerService(TbRuleEngineQueueFactory tbQueueFactory, ActorSystemContext actorContext, @@ -117,17 +110,18 @@ public class DefaultTbCalculatedFieldConsumerService extends AbstractConsumerSer public void init() { super.init("tb-cf"); this.calculatedFieldsExecutor = MoreExecutors.listeningDecorator(ThingsBoardExecutors.newWorkStealingPool(poolSize, "tb-cf-executor")); // TODO: multiple threads. - this.repartitionExecutor = Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("tb-cf-repartition")); - this.mainConsumer = MainQueueConsumerManager., CalculatedFieldQueueConfig>builder() + this.eventConsumer = PartitionedQueueConsumerManager.>create() .queueKey(QueueKey.CF) - .config(CalculatedFieldQueueConfig.of(consumerPerPartition, (int) pollInterval)) + .topic(partitionService.getTopic(QueueKey.CF)) + .pollInterval(pollInterval) .msgPackProcessor(this::processMsgs) .consumerCreator((config, partitionId) -> queueFactory.createToCalculatedFieldMsgConsumer()) .consumerExecutor(consumersExecutor) .scheduler(scheduler) .taskExecutor(mgmtExecutor) .build(); + stateService.init(eventConsumer); } @PreDestroy @@ -146,17 +140,16 @@ public class DefaultTbCalculatedFieldConsumerService extends AbstractConsumerSer @Override protected void onTbApplicationEvent(PartitionChangeEvent event) { var partitions = event.getCfPartitions(); - repartitionExecutor.submit(() -> { - try { - stateService.restore(partitions); - mainConsumer.update(partitions); - // Cleanup old entities after corresponding consumers are stopped. - // Any periodic tasks need to check that the entity is still managed by the current server before processing. - actorContext.tell(new CalculatedFieldPartitionChangeMsg(partitionsToBooleanIndexArray(partitions))); - } catch (Throwable t) { - log.error("Failed to process partition change event: {}", event, t); - } - }); + try { + stateService.restore(partitions); + // eventConsumer's partitions will be updated by stateService + + // Cleanup old entities after corresponding consumers are stopped. + // Any periodic tasks need to check that the entity is still managed by the current server before processing. + actorContext.tell(new CalculatedFieldPartitionChangeMsg(partitionsToBooleanIndexArray(partitions))); + } catch (Throwable t) { + log.error("Failed to process partition change event: {}", event, t); + } } private boolean[] partitionsToBooleanIndexArray(Set partitions) { @@ -167,7 +160,7 @@ public class DefaultTbCalculatedFieldConsumerService extends AbstractConsumerSer return myPartitions; } - private void processMsgs(List> msgs, TbQueueConsumer> consumer, CalculatedFieldQueueConfig config) throws Exception { + private void processMsgs(List> msgs, TbQueueConsumer> consumer, QueueConfig config) throws Exception { List> orderedMsgList = msgs.stream().map(msg -> new IdMsgPair<>(UUID.randomUUID(), msg)).toList(); ConcurrentMap> pendingMap = orderedMsgList.stream().collect( Collectors.toConcurrentMap(IdMsgPair::getUuid, IdMsgPair::getMsg)); @@ -275,14 +268,9 @@ public class DefaultTbCalculatedFieldConsumerService extends AbstractConsumerSer @Override protected void stopConsumers() { super.stopConsumers(); - mainConsumer.stop(); - mainConsumer.awaitStop(); - } - - @Data(staticConstructor = "of") - public static class CalculatedFieldQueueConfig implements QueueConfig { - private final boolean consumerPerPartition; - private final int pollInterval; + eventConsumer.stop(); + eventConsumer.awaitStop(); + stateService.stop(); } } diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index 6cbca3844e..46950dc744 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -1757,8 +1757,6 @@ queue: partitions: "${TB_QUEUE_CF_PARTITIONS:10}" # Timeout for processing a message pack by CF microservices pack_processing_timeout: "${TB_QUEUE_CF_PACK_PROCESSING_TIMEOUT_MS:60000}" - # Enable/disable a separate consumer per partition for CF queue - consumer_per_partition: "${TB_QUEUE_CF_CONSUMER_PER_PARTITION:true}" # Thread pool size for processing of the incoming messages pool_size: "${TB_QUEUE_CF_POOL_SIZE:8}" # RocksDB path for storing CF states diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/common/consumer/MainQueueConsumerManager.java b/common/queue/src/main/java/org/thingsboard/server/queue/common/consumer/MainQueueConsumerManager.java index bf04491050..f9ade10b1d 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/common/consumer/MainQueueConsumerManager.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/common/consumer/MainQueueConsumerManager.java @@ -53,8 +53,11 @@ public class MainQueueConsumerManager msgPackProcessor; protected final BiFunction> consumerCreator; + @Getter protected final ExecutorService consumerExecutor; + @Getter protected final ScheduledExecutorService scheduler; + @Getter protected final ExecutorService taskExecutor; private final java.util.Queue tasks = new ConcurrentLinkedQueue<>(); diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/common/consumer/PartitionedQueueConsumerManager.java b/common/queue/src/main/java/org/thingsboard/server/queue/common/consumer/PartitionedQueueConsumerManager.java index 1d51563bee..f8ba1671ae 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/common/consumer/PartitionedQueueConsumerManager.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/common/consumer/PartitionedQueueConsumerManager.java @@ -49,11 +49,6 @@ public class PartitionedQueueConsumerManager extends MainQ this.consumerWrapper = (ConsumerPerPartitionWrapper) super.consumerWrapper; } - @Override - public void update(Set partitions) { - throw new UnsupportedOperationException("Use manual addPartitions and removePartitions"); - } - @Override protected void processTask(TbQueueConsumerManagerTask task) { if (task instanceof AddPartitionsTask addPartitionsTask) { diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/common/consumer/QueueStateService.java b/common/queue/src/main/java/org/thingsboard/server/queue/common/consumer/QueueStateService.java index 3c444b8495..0931b9ed5b 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/common/consumer/QueueStateService.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/common/consumer/QueueStateService.java @@ -41,6 +41,7 @@ public class QueueStateService { } public void update(Set newPartitions) { + newPartitions = withTopic(newPartitions, stateConsumer.getTopic()); lock.lock(); Set oldPartitions = this.partitions != null ? this.partitions : Collections.emptySet(); Set addedPartitions; @@ -54,9 +55,10 @@ public class QueueStateService { } finally { lock.unlock(); } + if (!removedPartitions.isEmpty()) { stateConsumer.removePartitions(removedPartitions); - eventConsumer.removePartitions(removedPartitions.stream().map(tpi -> tpi.newByTopic(eventConsumer.getTopic())).collect(Collectors.toSet())); + eventConsumer.removePartitions(withTopic(removedPartitions, eventConsumer.getTopic())); } if (!addedPartitions.isEmpty()) { @@ -73,4 +75,8 @@ public class QueueStateService { } } + private Set withTopic(Set partitions, String topic) { + return partitions.stream().map(tpi -> tpi.newByTopic(topic)).collect(Collectors.toSet()); + } + }