From fbcfe9281d715130525d6eeaf306918bf7735af7 Mon Sep 17 00:00:00 2001 From: IrynaMatveieva Date: Fri, 28 Mar 2025 17:02:41 +0200 Subject: [PATCH 1/3] wip cf consumer groups --- .../AbstractCalculatedFieldStateService.java | 18 +++---- .../cf/CalculatedFieldStateService.java | 6 +-- .../KafkaCalculatedFieldStateService.java | 6 +-- .../RocksDBCalculatedFieldStateService.java | 8 ++-- ...faultTbCalculatedFieldConsumerService.java | 48 ++++++++++++++----- .../InMemoryMonolithQueueFactory.java | 2 +- .../provider/KafkaMonolithQueueFactory.java | 15 ++++-- .../KafkaTbRuleEngineQueueFactory.java | 14 ++++-- .../provider/TbRuleEngineQueueFactory.java | 2 +- 9 files changed, 80 insertions(+), 39 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/cf/AbstractCalculatedFieldStateService.java b/application/src/main/java/org/thingsboard/server/service/cf/AbstractCalculatedFieldStateService.java index 91c08ab6e0..1347705a23 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/AbstractCalculatedFieldStateService.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/AbstractCalculatedFieldStateService.java @@ -30,7 +30,9 @@ import org.thingsboard.server.service.cf.ctx.CalculatedFieldEntityCtxId; import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldState; import java.util.Collection; +import java.util.Map; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; import static org.thingsboard.server.utils.CalculatedFieldUtils.fromProto; @@ -41,7 +43,7 @@ public abstract class AbstractCalculatedFieldStateService implements CalculatedF @Autowired private ActorSystemContext actorSystemContext; - protected QueueStateService, TbProtoQueueMsg> stateService; + protected Map, TbProtoQueueMsg>> stateServices = new ConcurrentHashMap<>(); @Override public final void persistState(CalculatedFieldEntityCtxId stateId, CalculatedFieldState state, TbCallback callback) { @@ -72,22 +74,22 @@ public abstract class AbstractCalculatedFieldStateService implements CalculatedF @Override public void restore(QueueKey queueKey, Set partitions) { - stateService.update(queueKey, partitions); + stateServices.get(queueKey).update(queueKey, partitions); } @Override - public void delete(Set partitions) { - stateService.delete(partitions); + public void delete(QueueKey queueKey, Set partitions) { + stateServices.get(queueKey).delete(partitions); } @Override - public Set getPartitions() { - return stateService.getPartitions().values().stream().flatMap(Collection::stream).collect(Collectors.toSet()); + public Set getPartitions(QueueKey queueKey) { + return stateServices.get(queueKey).getPartitions().values().stream().flatMap(Collection::stream).collect(Collectors.toSet()); } @Override - public void stop() { - stateService.stop(); + public void stop(QueueKey queueKey) { + stateServices.get(queueKey).stop(); } } diff --git a/application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldStateService.java b/application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldStateService.java index d0b34f18e8..a8b0ccf30e 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldStateService.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldStateService.java @@ -37,10 +37,10 @@ public interface CalculatedFieldStateService { void restore(QueueKey queueKey, Set partitions); - void delete(Set partitions); + void delete(QueueKey queueKey, Set partitions); - Set getPartitions(); + Set getPartitions(QueueKey queueKey); - void stop(); + void stop(QueueKey queueKey); } diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/KafkaCalculatedFieldStateService.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/KafkaCalculatedFieldStateService.java index 90d4056afc..9d59cbce95 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/KafkaCalculatedFieldStateService.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/KafkaCalculatedFieldStateService.java @@ -97,7 +97,7 @@ public class KafkaCalculatedFieldStateService extends AbstractCalculatedFieldSta .scheduler(eventConsumer.getScheduler()) .taskExecutor(eventConsumer.getTaskExecutor()) .build(); - super.stateService = new KafkaQueueStateService<>(eventConsumer, stateConsumer); + super.stateServices.put(queueKey, new KafkaQueueStateService<>(eventConsumer, stateConsumer)); this.stateProducer = (TbKafkaProducerTemplate>) queueFactory.createCalculatedFieldStateProducer(); } @@ -145,8 +145,8 @@ public class KafkaCalculatedFieldStateService extends AbstractCalculatedFieldSta } @Override - public void stop() { - super.stop(); + public void stop(QueueKey queueKey) { + super.stop(queueKey); stateProducer.stop(); } diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/RocksDBCalculatedFieldStateService.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/RocksDBCalculatedFieldStateService.java index 0eaa506dfd..889d8fcc01 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/RocksDBCalculatedFieldStateService.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/RocksDBCalculatedFieldStateService.java @@ -20,13 +20,15 @@ import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; import org.springframework.stereotype.Service; +import org.thingsboard.server.common.data.DataConstants; +import org.thingsboard.server.common.msg.queue.ServiceType; import org.thingsboard.server.common.msg.queue.TbCallback; import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; -import org.thingsboard.server.queue.common.state.DefaultQueueStateService; import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldStateProto; import org.thingsboard.server.gen.transport.TransportProtos.ToCalculatedFieldMsg; import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.queue.common.consumer.PartitionedQueueConsumerManager; +import org.thingsboard.server.queue.common.state.DefaultQueueStateService; import org.thingsboard.server.queue.discovery.QueueKey; import org.thingsboard.server.service.cf.AbstractCalculatedFieldStateService; import org.thingsboard.server.service.cf.CfRocksDb; @@ -44,7 +46,7 @@ public class RocksDBCalculatedFieldStateService extends AbstractCalculatedFieldS @Override public void init(PartitionedQueueConsumerManager> eventConsumer) { - super.stateService = new DefaultQueueStateService<>(eventConsumer); + super.stateServices.put(new QueueKey(ServiceType.TB_RULE_ENGINE, DataConstants.CF_STATES_QUEUE_NAME), new DefaultQueueStateService<>(eventConsumer)); } @Override @@ -61,7 +63,7 @@ public class RocksDBCalculatedFieldStateService extends AbstractCalculatedFieldS @Override public void restore(QueueKey queueKey, Set partitions) { - if (stateService.getPartitions().isEmpty()) { + if (stateServices.get(queueKey).getPartitions().isEmpty()) { cfRocksDb.forEach((key, value) -> { try { processRestoredState(CalculatedFieldStateProto.parseFrom(value)); diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCalculatedFieldConsumerService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCalculatedFieldConsumerService.java index bce1992932..7149b58c57 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCalculatedFieldConsumerService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCalculatedFieldConsumerService.java @@ -30,12 +30,14 @@ import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.id.EntityIdFactory; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent; +import org.thingsboard.server.common.data.queue.Queue; import org.thingsboard.server.common.data.queue.QueueConfig; import org.thingsboard.server.common.msg.cf.CalculatedFieldPartitionChangeMsg; import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg; import org.thingsboard.server.common.msg.queue.ServiceType; import org.thingsboard.server.common.msg.queue.TbCallback; import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; +import org.thingsboard.server.dao.queue.QueueService; import org.thingsboard.server.dao.tenant.TbTenantProfileCache; import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldLinkedTelemetryMsgProto; import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldTelemetryMsgProto; @@ -79,7 +81,10 @@ public class DefaultTbCalculatedFieldConsumerService extends AbstractConsumerPar private long packProcessingTimeout; private final TbRuleEngineQueueFactory queueFactory; + private final QueueService queueService; + private final CalculatedFieldStateService stateService; + private final ConcurrentMap>> consumers = new ConcurrentHashMap<>(); public DefaultTbCalculatedFieldConsumerService(TbRuleEngineQueueFactory tbQueueFactory, ActorSystemContext actorContext, @@ -91,28 +96,40 @@ public class DefaultTbCalculatedFieldConsumerService extends AbstractConsumerPar ApplicationEventPublisher eventPublisher, JwtSettingsService jwtSettingsService, CalculatedFieldCache calculatedFieldCache, + QueueService queueService, CalculatedFieldStateService stateService) { super(actorContext, tenantProfileCache, deviceProfileCache, assetProfileCache, calculatedFieldCache, apiUsageStateService, partitionService, eventPublisher, jwtSettingsService); this.queueFactory = tbQueueFactory; + this.queueService = queueService; this.stateService = stateService; } @Override protected void doAfterStartUp() { - var queueKey = new QueueKey(ServiceType.TB_RULE_ENGINE, DataConstants.CF_QUEUE_NAME); - PartitionedQueueConsumerManager> eventConsumer = PartitionedQueueConsumerManager.>create() + List queues = queueService.findAllQueues(); + for (Queue configuration : queues) { + if (partitionService.isManagedByCurrentService(configuration.getTenantId())) { + stateService.init(createConsumer(configuration)); + } + } + } + + private PartitionedQueueConsumerManager> createConsumer(Queue queue) { + QueueKey queueKey = new QueueKey(ServiceType.TB_RULE_ENGINE, DataConstants.CF_QUEUE_NAME, queue.getTenantId()); + var eventConsumer = PartitionedQueueConsumerManager.>create() .queueKey(queueKey) .topic(partitionService.getTopic(queueKey)) .pollInterval(pollInterval) .msgPackProcessor(this::processMsgs) - .consumerCreator((config, partitionId) -> queueFactory.createToCalculatedFieldMsgConsumer()) + .consumerCreator((config, partitionId) -> queueFactory.createToCalculatedFieldMsgConsumer(queue, partitionId)) .queueAdmin(queueFactory.getCalculatedFieldQueueAdmin()) .consumerExecutor(consumersExecutor) .scheduler(scheduler) .taskExecutor(mgmtExecutor) .build(); - stateService.init(eventConsumer); + consumers.put(queueKey, eventConsumer); + return eventConsumer; } @PreDestroy @@ -227,13 +244,20 @@ public class DefaultTbCalculatedFieldConsumerService extends AbstractConsumerPar public void handleComponentLifecycleEvent(ComponentLifecycleMsg event) { if (event.getEntityId().getEntityType() == EntityType.TENANT) { if (event.getEvent() == ComponentLifecycleEvent.DELETED) { - Set partitions = stateService.getPartitions(); - if (CollectionUtils.isEmpty(partitions)) { - return; - } - stateService.delete(partitions.stream() - .filter(tpi -> tpi.getTenantId().isPresent() && tpi.getTenantId().get().equals(event.getTenantId())) - .collect(Collectors.toSet())); + consumers.keySet().removeIf(queueKey -> { + boolean toRemove = queueKey.getTenantId().equals(event.getTenantId()); + if (toRemove) { + Set partitions = stateService.getPartitions(queueKey); + if (!CollectionUtils.isEmpty(partitions)) { + stateService.delete(queueKey, partitions); + } + var consumer = consumers.remove(queueKey); + if (consumer != null) { + consumer.stop(); + } + } + return toRemove; + }); } } } @@ -258,7 +282,7 @@ public class DefaultTbCalculatedFieldConsumerService extends AbstractConsumerPar @Override protected void stopConsumers() { super.stopConsumers(); - stateService.stop(); // eventConsumer will be stopped by stateService + consumers.keySet().forEach(stateService::stop); // eventConsumer will be stopped by stateService } } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/InMemoryMonolithQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/InMemoryMonolithQueueFactory.java index bd8b4bd4f8..8e724b9145 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/InMemoryMonolithQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/InMemoryMonolithQueueFactory.java @@ -134,7 +134,7 @@ public class InMemoryMonolithQueueFactory implements TbCoreQueueFactory, TbRuleE } @Override - public TbQueueConsumer> createToCalculatedFieldMsgConsumer() { + public TbQueueConsumer> createToCalculatedFieldMsgConsumer(Queue configuration, Integer partitionId) { return new InMemoryTbQueueConsumer<>(storage, topicService.buildTopicName(calculatedFieldSettings.getEventTopic())); } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java index c50344a23f..841a5b29d8 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java @@ -104,7 +104,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi private final TbQueueAdmin housekeeperReprocessingAdmin; private final TbQueueAdmin edgeAdmin; private final TbQueueAdmin edgeEventAdmin; - private final TbQueueAdmin cfAdmin; + private final TbKafkaAdmin cfAdmin; private final TbQueueAdmin cfStateAdmin; private final TbQueueAdmin edqsEventsAdmin; private final TbKafkaAdmin edqsRequestsAdmin; @@ -514,15 +514,22 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi } @Override - public TbQueueConsumer> createToCalculatedFieldMsgConsumer() { + public TbQueueConsumer> createToCalculatedFieldMsgConsumer(Queue configuration, Integer partitionId) { + String queueName = configuration.getName(); + String groupId = topicService.buildConsumerGroupId("cf-", configuration.getTenantId(), queueName, partitionId); + + cfAdmin.syncOffsets(topicService.buildConsumerGroupId("cf-", configuration.getTenantId(), queueName, null), // the fat groupId + groupId, partitionId); + TbKafkaConsumerTemplate.TbKafkaConsumerTemplateBuilder> consumerBuilder = TbKafkaConsumerTemplate.builder(); consumerBuilder.settings(kafkaSettings); consumerBuilder.topic(topicService.buildTopicName(calculatedFieldSettings.getEventTopic())); - consumerBuilder.clientId("monolith-calculated-field-consumer-" + serviceInfoProvider.getServiceId() + "-" + consumerCount.incrementAndGet()); - consumerBuilder.groupId(topicService.buildTopicName("monolith-calculated-field-consumer")); + consumerBuilder.clientId("cf-" + queueName + "-consumer-" + serviceInfoProvider.getServiceId() + "-" + consumerCount.incrementAndGet()); + consumerBuilder.groupId(groupId); consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToCalculatedFieldMsg.parseFrom(msg.getData()), msg.getHeaders())); consumerBuilder.admin(cfAdmin); consumerBuilder.statsService(consumerStatsService); + return consumerBuilder.build(); } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbRuleEngineQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbRuleEngineQueueFactory.java index 2e342b0dd2..e0f7348d9f 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbRuleEngineQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbRuleEngineQueueFactory.java @@ -89,7 +89,7 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory { private final TbQueueAdmin housekeeperAdmin; private final TbQueueAdmin edgeAdmin; private final TbQueueAdmin edgeEventAdmin; - private final TbQueueAdmin cfAdmin; + private final TbKafkaAdmin cfAdmin; private final TbQueueAdmin cfStateAdmin; private final TbQueueAdmin edqsEventsAdmin; private final AtomicLong consumerCount = new AtomicLong(); @@ -309,12 +309,18 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory { } @Override - public TbQueueConsumer> createToCalculatedFieldMsgConsumer() { + public TbQueueConsumer> createToCalculatedFieldMsgConsumer(Queue configuration, Integer partitionId) { + String queueName = configuration.getName(); + String groupId = topicService.buildConsumerGroupId("cf-", configuration.getTenantId(), queueName, partitionId); + + cfAdmin.syncOffsets(topicService.buildConsumerGroupId("cf-", configuration.getTenantId(), queueName, null), // the fat groupId + groupId, partitionId); + TbKafkaConsumerTemplate.TbKafkaConsumerTemplateBuilder> consumerBuilder = TbKafkaConsumerTemplate.builder(); consumerBuilder.settings(kafkaSettings); consumerBuilder.topic(topicService.buildTopicName(calculatedFieldSettings.getEventTopic())); - consumerBuilder.clientId("tb-rule-engine-calculated-field-consumer-" + serviceInfoProvider.getServiceId() + "-" + consumerCount.incrementAndGet()); - consumerBuilder.groupId(topicService.buildTopicName("tb-rule-engine-calculated-field-consumer")); + consumerBuilder.clientId("cf-" + queueName + "-consumer-" + serviceInfoProvider.getServiceId() + "-" + consumerCount.incrementAndGet()); + consumerBuilder.groupId(groupId); consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToCalculatedFieldMsg.parseFrom(msg.getData()), msg.getHeaders())); consumerBuilder.admin(cfAdmin); consumerBuilder.statsService(consumerStatsService); diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/TbRuleEngineQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/TbRuleEngineQueueFactory.java index 329e3e346a..4a4b36b6ae 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/TbRuleEngineQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/TbRuleEngineQueueFactory.java @@ -121,7 +121,7 @@ public interface TbRuleEngineQueueFactory extends TbUsageStatsClientQueueFactory TbQueueRequestTemplate, TbProtoQueueMsg> createRemoteJsRequestTemplate(); - TbQueueConsumer> createToCalculatedFieldMsgConsumer(); + TbQueueConsumer> createToCalculatedFieldMsgConsumer(Queue configuration, Integer partitionId); TbQueueAdmin getCalculatedFieldQueueAdmin(); From f84e5da9df500084fb93c5b88bc2db6116eaf7f8 Mon Sep 17 00:00:00 2001 From: IrynaMatveieva Date: Mon, 31 Mar 2025 16:56:34 +0300 Subject: [PATCH 2/3] consumer groups updates --- ...CalculatedFieldEntityMessageProcessor.java | 10 +++---- ...alculatedFieldManagerMessageProcessor.java | 8 +++--- .../KafkaCalculatedFieldStateService.java | 4 +-- .../RocksDBCalculatedFieldStateService.java | 2 +- ...faultTbCalculatedFieldConsumerService.java | 28 +++++++++---------- .../InMemoryMonolithQueueFactory.java | 3 +- .../provider/KafkaMonolithQueueFactory.java | 9 +++--- .../KafkaTbRuleEngineQueueFactory.java | 10 ++++--- .../provider/TbRuleEngineQueueFactory.java | 3 +- 9 files changed, 41 insertions(+), 36 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityMessageProcessor.java index db65248a8f..5ea79e42a8 100644 --- a/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityMessageProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityMessageProcessor.java @@ -107,7 +107,7 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM public void process(CalculatedFieldStateRestoreMsg msg) { CalculatedFieldId cfId = msg.getId().cfId(); - log.info("[{}] [{}] Processing CF state restore msg.", msg.getId().entityId(), cfId); + log.debug("[{}] [{}] Processing CF state restore msg.", msg.getId().entityId(), cfId); if (msg.getState() != null) { states.put(cfId, msg.getState()); } else { @@ -116,10 +116,10 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM } public void process(EntityInitCalculatedFieldMsg msg) throws CalculatedFieldException { - log.info("[{}] Processing entity init CF msg.", msg.getCtx().getCfId()); + log.debug("[{}] Processing entity init CF msg.", msg.getCtx().getCfId()); var ctx = msg.getCtx(); if (msg.isForceReinit()) { - log.info("Force reinitialization of CF: [{}].", ctx.getCfId()); + log.debug("Force reinitialization of CF: [{}].", ctx.getCfId()); states.remove(ctx.getCfId()); } try { @@ -138,7 +138,7 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM } public void process(CalculatedFieldEntityDeleteMsg msg) { - log.info("[{}] Processing CF entity delete msg.", msg.getEntityId()); + log.debug("[{}] Processing CF entity delete msg.", msg.getEntityId()); if (this.entityId.equals(msg.getEntityId())) { if (states.isEmpty()) { msg.getCallback().onSuccess(); @@ -244,7 +244,7 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM private void processArgumentValuesUpdate(CalculatedFieldCtx ctx, List cfIdList, MultipleTbCallback callback, Map newArgValues, UUID tbMsgId, TbMsgType tbMsgType) throws CalculatedFieldException { if (newArgValues.isEmpty()) { - log.info("[{}] No new argument values to process for CF.", ctx.getCfId()); + log.debug("[{}] No new argument values to process for CF.", ctx.getCfId()); callback.onSuccess(CALLBACKS_PER_CF); } CalculatedFieldState state = states.get(ctx.getCfId()); diff --git a/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldManagerMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldManagerMessageProcessor.java index e109e83e4d..15a8fbebb9 100644 --- a/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldManagerMessageProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldManagerMessageProcessor.java @@ -236,12 +236,12 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware private void onCfCreated(ComponentLifecycleMsg msg, TbCallback callback) throws CalculatedFieldException { var cfId = new CalculatedFieldId(msg.getEntityId().getId()); if (calculatedFields.containsKey(cfId)) { - log.warn("[{}] CF was already initialized [{}]", tenantId, cfId); + log.debug("[{}] CF was already initialized [{}]", tenantId, cfId); callback.onSuccess(); } else { var cf = cfDaoService.findById(msg.getTenantId(), cfId); if (cf == null) { - log.warn("[{}] Failed to lookup CF by id [{}]", tenantId, cfId); + log.debug("[{}] Failed to lookup CF by id [{}]", tenantId, cfId); callback.onSuccess(); } else { var cfCtx = new CalculatedFieldCtx(cf, systemContext.getTbelInvokeService(), systemContext.getApiLimitService()); @@ -268,7 +268,7 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware } else { var newCf = cfDaoService.findById(msg.getTenantId(), cfId); if (newCf == null) { - log.warn("[{}] Failed to lookup CF by id [{}]", tenantId, cfId); + log.debug("[{}] Failed to lookup CF by id [{}]", tenantId, cfId); callback.onSuccess(); } else { var newCfCtx = new CalculatedFieldCtx(newCf, systemContext.getTbelInvokeService(), systemContext.getApiLimitService()); @@ -313,7 +313,7 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware var cfId = new CalculatedFieldId(msg.getEntityId().getId()); var cfCtx = calculatedFields.remove(cfId); if (cfCtx == null) { - log.warn("[{}] CF was already deleted [{}]", tenantId, cfId); + log.debug("[{}] CF was already deleted [{}]", tenantId, cfId); callback.onSuccess(); } else { entityIdCalculatedFields.get(cfCtx.getEntityId()).remove(cfCtx); diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/KafkaCalculatedFieldStateService.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/KafkaCalculatedFieldStateService.java index ffbd0d4b03..37f79edf6e 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/KafkaCalculatedFieldStateService.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/KafkaCalculatedFieldStateService.java @@ -68,7 +68,7 @@ public class KafkaCalculatedFieldStateService extends AbstractCalculatedFieldSta @Override public void init(PartitionedQueueConsumerManager> eventConsumer) { - var queueKey = new QueueKey(ServiceType.TB_RULE_ENGINE, DataConstants.CF_STATES_QUEUE_NAME); + var queueKey = new QueueKey(ServiceType.TB_RULE_ENGINE, DataConstants.CF_QUEUE_NAME); PartitionedQueueConsumerManager> stateConsumer = PartitionedQueueConsumerManager.>create() .queueKey(queueKey) .topic(partitionService.getTopic(queueKey)) @@ -106,7 +106,7 @@ public class KafkaCalculatedFieldStateService extends AbstractCalculatedFieldSta @Override protected void doPersist(CalculatedFieldEntityCtxId stateId, CalculatedFieldStateProto stateMsgProto, TbCallback callback) { - TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_RULE_ENGINE, DataConstants.CF_STATES_QUEUE_NAME, stateId.tenantId(), stateId.entityId()); + TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_RULE_ENGINE, DataConstants.CF_QUEUE_NAME, stateId.tenantId(), stateId.entityId()); TbProtoQueueMsg msg = new TbProtoQueueMsg<>(stateId.entityId().getId(), stateMsgProto); if (stateMsgProto == null) { putStateId(msg.getHeaders(), stateId); diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/RocksDBCalculatedFieldStateService.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/RocksDBCalculatedFieldStateService.java index 889d8fcc01..3d2f522440 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/RocksDBCalculatedFieldStateService.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/RocksDBCalculatedFieldStateService.java @@ -46,7 +46,7 @@ public class RocksDBCalculatedFieldStateService extends AbstractCalculatedFieldS @Override public void init(PartitionedQueueConsumerManager> eventConsumer) { - super.stateServices.put(new QueueKey(ServiceType.TB_RULE_ENGINE, DataConstants.CF_STATES_QUEUE_NAME), new DefaultQueueStateService<>(eventConsumer)); + super.stateServices.put(new QueueKey(ServiceType.TB_RULE_ENGINE, DataConstants.CF_QUEUE_NAME), new DefaultQueueStateService<>(eventConsumer)); } @Override diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCalculatedFieldConsumerService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCalculatedFieldConsumerService.java index 8b1cc8be1e..1af2116805 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCalculatedFieldConsumerService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCalculatedFieldConsumerService.java @@ -29,16 +29,16 @@ import org.thingsboard.server.common.data.DataConstants; import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.id.EntityIdFactory; import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.page.PageDataIterable; import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent; -import org.thingsboard.server.common.data.queue.Queue; import org.thingsboard.server.common.data.queue.QueueConfig; import org.thingsboard.server.common.msg.cf.CalculatedFieldPartitionChangeMsg; import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg; import org.thingsboard.server.common.msg.queue.ServiceType; import org.thingsboard.server.common.msg.queue.TbCallback; import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; -import org.thingsboard.server.dao.queue.QueueService; import org.thingsboard.server.dao.tenant.TbTenantProfileCache; +import org.thingsboard.server.dao.tenant.TenantService; import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldLinkedTelemetryMsgProto; import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldTelemetryMsgProto; import org.thingsboard.server.gen.transport.TransportProtos.ToCalculatedFieldMsg; @@ -81,7 +81,7 @@ public class DefaultTbCalculatedFieldConsumerService extends AbstractPartitionBa private long packProcessingTimeout; private final TbRuleEngineQueueFactory queueFactory; - private final QueueService queueService; + private final TenantService tenantService; private final CalculatedFieldStateService stateService; private final ConcurrentMap>> consumers = new ConcurrentHashMap<>(); @@ -96,33 +96,33 @@ public class DefaultTbCalculatedFieldConsumerService extends AbstractPartitionBa ApplicationEventPublisher eventPublisher, JwtSettingsService jwtSettingsService, CalculatedFieldCache calculatedFieldCache, - QueueService queueService, - CalculatedFieldStateService stateService) { + CalculatedFieldStateService stateService, + TenantService tenantService) { super(actorContext, tenantProfileCache, deviceProfileCache, assetProfileCache, calculatedFieldCache, apiUsageStateService, partitionService, eventPublisher, jwtSettingsService); this.queueFactory = tbQueueFactory; - this.queueService = queueService; this.stateService = stateService; + this.tenantService = tenantService; } @Override protected void onStartUp() { - List queues = queueService.findAllQueues(); - for (Queue configuration : queues) { - if (partitionService.isManagedByCurrentService(configuration.getTenantId())) { - stateService.init(createConsumer(configuration)); + PageDataIterable iterator = new PageDataIterable<>(tenantService::findTenantsIds, 1024); + for (TenantId tenantId : iterator) { + if (partitionService.isManagedByCurrentService(tenantId)) { + stateService.init(createConsumer(tenantId)); } } } - private PartitionedQueueConsumerManager> createConsumer(Queue queue) { - QueueKey queueKey = new QueueKey(ServiceType.TB_RULE_ENGINE, DataConstants.CF_QUEUE_NAME, queue.getTenantId()); + private PartitionedQueueConsumerManager> createConsumer(TenantId tenantId) { + QueueKey queueKey = new QueueKey(ServiceType.TB_RULE_ENGINE, DataConstants.CF_QUEUE_NAME); var eventConsumer = PartitionedQueueConsumerManager.>create() .queueKey(queueKey) .topic(partitionService.getTopic(queueKey)) .pollInterval(pollInterval) .msgPackProcessor(this::processMsgs) - .consumerCreator((config, partitionId) -> queueFactory.createToCalculatedFieldMsgConsumer(queue, partitionId)) + .consumerCreator((config, partitionId) -> queueFactory.createToCalculatedFieldMsgConsumer(tenantId, partitionId)) .queueAdmin(queueFactory.getCalculatedFieldQueueAdmin()) .consumerExecutor(consumersExecutor) .scheduler(scheduler) @@ -251,7 +251,7 @@ public class DefaultTbCalculatedFieldConsumerService extends AbstractPartitionBa if (!CollectionUtils.isEmpty(partitions)) { stateService.delete(queueKey, partitions); } - var consumer = consumers.remove(queueKey); + var consumer = consumers.get(queueKey); if (consumer != null) { consumer.stop(); } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/InMemoryMonolithQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/InMemoryMonolithQueueFactory.java index ec7da1975d..94d53e4864 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/InMemoryMonolithQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/InMemoryMonolithQueueFactory.java @@ -20,6 +20,7 @@ import lombok.extern.slf4j.Slf4j; import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; +import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.queue.Queue; import org.thingsboard.server.common.msg.queue.ServiceType; import org.thingsboard.server.gen.js.JsInvokeProtos; @@ -133,7 +134,7 @@ public class InMemoryMonolithQueueFactory implements TbCoreQueueFactory, TbRuleE } @Override - public TbQueueConsumer> createToCalculatedFieldMsgConsumer(Queue configuration, Integer partitionId) { + public TbQueueConsumer> createToCalculatedFieldMsgConsumer(TenantId tenantId, Integer partitionId) { return new InMemoryTbQueueConsumer<>(storage, topicService.buildTopicName(calculatedFieldSettings.getEventTopic())); } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java index 14ee1c212a..4cb8fc5793 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java @@ -20,6 +20,7 @@ import jakarta.annotation.PreDestroy; import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; import org.springframework.context.annotation.Bean; import org.springframework.stereotype.Component; +import org.thingsboard.server.common.data.DataConstants; import org.thingsboard.server.common.data.id.EdgeId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.queue.Queue; @@ -513,11 +514,11 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi } @Override - public TbQueueConsumer> createToCalculatedFieldMsgConsumer(Queue configuration, Integer partitionId) { - String queueName = configuration.getName(); - String groupId = topicService.buildConsumerGroupId("cf-", configuration.getTenantId(), queueName, partitionId); + public TbQueueConsumer> createToCalculatedFieldMsgConsumer(TenantId tenantId, Integer partitionId) { + String queueName = DataConstants.CF_QUEUE_NAME; + String groupId = topicService.buildConsumerGroupId("cf-", tenantId, queueName, partitionId); - cfAdmin.syncOffsets(topicService.buildConsumerGroupId("cf-", configuration.getTenantId(), queueName, null), // the fat groupId + cfAdmin.syncOffsets(topicService.buildConsumerGroupId("cf-", tenantId, queueName, null), // the fat groupId groupId, partitionId); TbKafkaConsumerTemplate.TbKafkaConsumerTemplateBuilder> consumerBuilder = TbKafkaConsumerTemplate.builder(); diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbRuleEngineQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbRuleEngineQueueFactory.java index 74c78e1aff..ae565a0827 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbRuleEngineQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbRuleEngineQueueFactory.java @@ -20,6 +20,8 @@ import jakarta.annotation.PreDestroy; import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; import org.springframework.context.annotation.Bean; import org.springframework.stereotype.Component; +import org.thingsboard.server.common.data.DataConstants; +import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.queue.Queue; import org.thingsboard.server.common.msg.queue.ServiceType; import org.thingsboard.server.gen.js.JsInvokeProtos; @@ -313,11 +315,11 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory { } @Override - public TbQueueConsumer> createToCalculatedFieldMsgConsumer(Queue configuration, Integer partitionId) { - String queueName = configuration.getName(); - String groupId = topicService.buildConsumerGroupId("cf-", configuration.getTenantId(), queueName, partitionId); + public TbQueueConsumer> createToCalculatedFieldMsgConsumer(TenantId tenantId, Integer partitionId) { + String queueName = DataConstants.CF_QUEUE_NAME; + String groupId = topicService.buildConsumerGroupId("cf-", tenantId, queueName, partitionId); - cfAdmin.syncOffsets(topicService.buildConsumerGroupId("cf-", configuration.getTenantId(), queueName, null), // the fat groupId + cfAdmin.syncOffsets(topicService.buildConsumerGroupId("cf-", tenantId, queueName, null), // the fat groupId groupId, partitionId); TbKafkaConsumerTemplate.TbKafkaConsumerTemplateBuilder> consumerBuilder = TbKafkaConsumerTemplate.builder(); diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/TbRuleEngineQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/TbRuleEngineQueueFactory.java index 4a4b36b6ae..b71f6a0471 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/TbRuleEngineQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/TbRuleEngineQueueFactory.java @@ -15,6 +15,7 @@ */ package org.thingsboard.server.queue.provider; +import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.queue.Queue; import org.thingsboard.server.gen.js.JsInvokeProtos; import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldStateProto; @@ -121,7 +122,7 @@ public interface TbRuleEngineQueueFactory extends TbUsageStatsClientQueueFactory TbQueueRequestTemplate, TbProtoQueueMsg> createRemoteJsRequestTemplate(); - TbQueueConsumer> createToCalculatedFieldMsgConsumer(Queue configuration, Integer partitionId); + TbQueueConsumer> createToCalculatedFieldMsgConsumer(TenantId tenantId, Integer partitionId); TbQueueAdmin getCalculatedFieldQueueAdmin(); From ae894f9ed3b4a1251e95131889e491d5c6a5c82b Mon Sep 17 00:00:00 2001 From: IrynaMatveieva Date: Tue, 1 Apr 2025 10:49:16 +0300 Subject: [PATCH 3/3] create consumer when create tenant --- .../queue/DefaultTbCalculatedFieldConsumerService.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCalculatedFieldConsumerService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCalculatedFieldConsumerService.java index 1af2116805..b01554f936 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCalculatedFieldConsumerService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCalculatedFieldConsumerService.java @@ -258,6 +258,10 @@ public class DefaultTbCalculatedFieldConsumerService extends AbstractPartitionBa } return toRemove; }); + } else if (event.getEvent() == ComponentLifecycleEvent.CREATED) { + if (partitionService.isManagedByCurrentService(event.getTenantId())) { + stateService.init(createConsumer(event.getTenantId())); + } } } }