diff --git a/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityMessageProcessor.java index db65248a8f..5ea79e42a8 100644 --- a/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityMessageProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityMessageProcessor.java @@ -107,7 +107,7 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM public void process(CalculatedFieldStateRestoreMsg msg) { CalculatedFieldId cfId = msg.getId().cfId(); - log.info("[{}] [{}] Processing CF state restore msg.", msg.getId().entityId(), cfId); + log.debug("[{}] [{}] Processing CF state restore msg.", msg.getId().entityId(), cfId); if (msg.getState() != null) { states.put(cfId, msg.getState()); } else { @@ -116,10 +116,10 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM } public void process(EntityInitCalculatedFieldMsg msg) throws CalculatedFieldException { - log.info("[{}] Processing entity init CF msg.", msg.getCtx().getCfId()); + log.debug("[{}] Processing entity init CF msg.", msg.getCtx().getCfId()); var ctx = msg.getCtx(); if (msg.isForceReinit()) { - log.info("Force reinitialization of CF: [{}].", ctx.getCfId()); + log.debug("Force reinitialization of CF: [{}].", ctx.getCfId()); states.remove(ctx.getCfId()); } try { @@ -138,7 +138,7 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM } public void process(CalculatedFieldEntityDeleteMsg msg) { - log.info("[{}] Processing CF entity delete msg.", msg.getEntityId()); + log.debug("[{}] Processing CF entity delete msg.", msg.getEntityId()); if (this.entityId.equals(msg.getEntityId())) { if (states.isEmpty()) { msg.getCallback().onSuccess(); @@ -244,7 +244,7 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM private void processArgumentValuesUpdate(CalculatedFieldCtx ctx, List cfIdList, MultipleTbCallback callback, Map newArgValues, UUID tbMsgId, TbMsgType tbMsgType) throws CalculatedFieldException { if (newArgValues.isEmpty()) { - log.info("[{}] No new argument values to process for CF.", ctx.getCfId()); + log.debug("[{}] No new argument values to process for CF.", ctx.getCfId()); callback.onSuccess(CALLBACKS_PER_CF); } CalculatedFieldState state = states.get(ctx.getCfId()); diff --git a/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldManagerMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldManagerMessageProcessor.java index 521503a82e..060893ec8d 100644 --- a/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldManagerMessageProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldManagerMessageProcessor.java @@ -236,12 +236,12 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware private void onCfCreated(ComponentLifecycleMsg msg, TbCallback callback) throws CalculatedFieldException { var cfId = new CalculatedFieldId(msg.getEntityId().getId()); if (calculatedFields.containsKey(cfId)) { - log.warn("[{}] CF was already initialized [{}]", tenantId, cfId); + log.debug("[{}] CF was already initialized [{}]", tenantId, cfId); callback.onSuccess(); } else { var cf = cfDaoService.findById(msg.getTenantId(), cfId); if (cf == null) { - log.warn("[{}] Failed to lookup CF by id [{}]", tenantId, cfId); + log.debug("[{}] Failed to lookup CF by id [{}]", tenantId, cfId); callback.onSuccess(); } else { var cfCtx = new CalculatedFieldCtx(cf, systemContext.getTbelInvokeService(), systemContext.getApiLimitService()); @@ -268,7 +268,7 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware } else { var newCf = cfDaoService.findById(msg.getTenantId(), cfId); if (newCf == null) { - log.warn("[{}] Failed to lookup CF by id [{}]", tenantId, cfId); + log.debug("[{}] Failed to lookup CF by id [{}]", tenantId, cfId); callback.onSuccess(); } else { var newCfCtx = new CalculatedFieldCtx(newCf, systemContext.getTbelInvokeService(), systemContext.getApiLimitService()); @@ -313,7 +313,7 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware var cfId = new CalculatedFieldId(msg.getEntityId().getId()); var cfCtx = calculatedFields.remove(cfId); if (cfCtx == null) { - log.warn("[{}] CF was already deleted [{}]", tenantId, cfId); + log.debug("[{}] CF was already deleted [{}]", tenantId, cfId); callback.onSuccess(); } else { entityIdCalculatedFields.get(cfCtx.getEntityId()).remove(cfCtx); diff --git a/application/src/main/java/org/thingsboard/server/service/cf/AbstractCalculatedFieldStateService.java b/application/src/main/java/org/thingsboard/server/service/cf/AbstractCalculatedFieldStateService.java index 91c08ab6e0..1347705a23 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/AbstractCalculatedFieldStateService.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/AbstractCalculatedFieldStateService.java @@ -30,7 +30,9 @@ import org.thingsboard.server.service.cf.ctx.CalculatedFieldEntityCtxId; import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldState; import java.util.Collection; +import java.util.Map; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; import static org.thingsboard.server.utils.CalculatedFieldUtils.fromProto; @@ -41,7 +43,7 @@ public abstract class AbstractCalculatedFieldStateService implements CalculatedF @Autowired private ActorSystemContext actorSystemContext; - protected QueueStateService, TbProtoQueueMsg> stateService; + protected Map, TbProtoQueueMsg>> stateServices = new ConcurrentHashMap<>(); @Override public final void persistState(CalculatedFieldEntityCtxId stateId, CalculatedFieldState state, TbCallback callback) { @@ -72,22 +74,22 @@ public abstract class AbstractCalculatedFieldStateService implements CalculatedF @Override public void restore(QueueKey queueKey, Set partitions) { - stateService.update(queueKey, partitions); + stateServices.get(queueKey).update(queueKey, partitions); } @Override - public void delete(Set partitions) { - stateService.delete(partitions); + public void delete(QueueKey queueKey, Set partitions) { + stateServices.get(queueKey).delete(partitions); } @Override - public Set getPartitions() { - return stateService.getPartitions().values().stream().flatMap(Collection::stream).collect(Collectors.toSet()); + public Set getPartitions(QueueKey queueKey) { + return stateServices.get(queueKey).getPartitions().values().stream().flatMap(Collection::stream).collect(Collectors.toSet()); } @Override - public void stop() { - stateService.stop(); + public void stop(QueueKey queueKey) { + stateServices.get(queueKey).stop(); } } diff --git a/application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldStateService.java b/application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldStateService.java index d0b34f18e8..a8b0ccf30e 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldStateService.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldStateService.java @@ -37,10 +37,10 @@ public interface CalculatedFieldStateService { void restore(QueueKey queueKey, Set partitions); - void delete(Set partitions); + void delete(QueueKey queueKey, Set partitions); - Set getPartitions(); + Set getPartitions(QueueKey queueKey); - void stop(); + void stop(QueueKey queueKey); } diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/KafkaCalculatedFieldStateService.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/KafkaCalculatedFieldStateService.java index 3620ad3639..37f79edf6e 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/KafkaCalculatedFieldStateService.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/KafkaCalculatedFieldStateService.java @@ -68,7 +68,7 @@ public class KafkaCalculatedFieldStateService extends AbstractCalculatedFieldSta @Override public void init(PartitionedQueueConsumerManager> eventConsumer) { - var queueKey = new QueueKey(ServiceType.TB_RULE_ENGINE, DataConstants.CF_STATES_QUEUE_NAME); + var queueKey = new QueueKey(ServiceType.TB_RULE_ENGINE, DataConstants.CF_QUEUE_NAME); PartitionedQueueConsumerManager> stateConsumer = PartitionedQueueConsumerManager.>create() .queueKey(queueKey) .topic(partitionService.getTopic(queueKey)) @@ -97,16 +97,16 @@ public class KafkaCalculatedFieldStateService extends AbstractCalculatedFieldSta .scheduler(eventConsumer.getScheduler()) .taskExecutor(eventConsumer.getTaskExecutor()) .build(); - super.stateService = KafkaQueueStateService., TbProtoQueueMsg>builder() + super.stateServices.put(queueKey, KafkaQueueStateService., TbProtoQueueMsg>builder() .eventConsumer(eventConsumer) .stateConsumer(stateConsumer) - .build(); + .build()); this.stateProducer = (TbKafkaProducerTemplate>) queueFactory.createCalculatedFieldStateProducer(); } @Override protected void doPersist(CalculatedFieldEntityCtxId stateId, CalculatedFieldStateProto stateMsgProto, TbCallback callback) { - TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_RULE_ENGINE, DataConstants.CF_STATES_QUEUE_NAME, stateId.tenantId(), stateId.entityId()); + TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_RULE_ENGINE, DataConstants.CF_QUEUE_NAME, stateId.tenantId(), stateId.entityId()); TbProtoQueueMsg msg = new TbProtoQueueMsg<>(stateId.entityId().getId(), stateMsgProto); if (stateMsgProto == null) { putStateId(msg.getHeaders(), stateId); @@ -148,8 +148,8 @@ public class KafkaCalculatedFieldStateService extends AbstractCalculatedFieldSta } @Override - public void stop() { - super.stop(); + public void stop(QueueKey queueKey) { + super.stop(queueKey); stateProducer.stop(); } diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/RocksDBCalculatedFieldStateService.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/RocksDBCalculatedFieldStateService.java index 0eaa506dfd..3d2f522440 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/RocksDBCalculatedFieldStateService.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/RocksDBCalculatedFieldStateService.java @@ -20,13 +20,15 @@ import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; import org.springframework.stereotype.Service; +import org.thingsboard.server.common.data.DataConstants; +import org.thingsboard.server.common.msg.queue.ServiceType; import org.thingsboard.server.common.msg.queue.TbCallback; import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; -import org.thingsboard.server.queue.common.state.DefaultQueueStateService; import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldStateProto; import org.thingsboard.server.gen.transport.TransportProtos.ToCalculatedFieldMsg; import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.queue.common.consumer.PartitionedQueueConsumerManager; +import org.thingsboard.server.queue.common.state.DefaultQueueStateService; import org.thingsboard.server.queue.discovery.QueueKey; import org.thingsboard.server.service.cf.AbstractCalculatedFieldStateService; import org.thingsboard.server.service.cf.CfRocksDb; @@ -44,7 +46,7 @@ public class RocksDBCalculatedFieldStateService extends AbstractCalculatedFieldS @Override public void init(PartitionedQueueConsumerManager> eventConsumer) { - super.stateService = new DefaultQueueStateService<>(eventConsumer); + super.stateServices.put(new QueueKey(ServiceType.TB_RULE_ENGINE, DataConstants.CF_QUEUE_NAME), new DefaultQueueStateService<>(eventConsumer)); } @Override @@ -61,7 +63,7 @@ public class RocksDBCalculatedFieldStateService extends AbstractCalculatedFieldS @Override public void restore(QueueKey queueKey, Set partitions) { - if (stateService.getPartitions().isEmpty()) { + if (stateServices.get(queueKey).getPartitions().isEmpty()) { cfRocksDb.forEach((key, value) -> { try { processRestoredState(CalculatedFieldStateProto.parseFrom(value)); diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCalculatedFieldConsumerService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCalculatedFieldConsumerService.java index 7dfcb1e766..d4a88c2fe6 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCalculatedFieldConsumerService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCalculatedFieldConsumerService.java @@ -29,6 +29,7 @@ import org.thingsboard.server.common.data.DataConstants; import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.id.EntityIdFactory; import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.page.PageDataIterable; import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent; import org.thingsboard.server.common.data.queue.QueueConfig; import org.thingsboard.server.common.msg.cf.CalculatedFieldPartitionChangeMsg; @@ -37,6 +38,7 @@ import org.thingsboard.server.common.msg.queue.ServiceType; import org.thingsboard.server.common.msg.queue.TbCallback; import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; import org.thingsboard.server.dao.tenant.TbTenantProfileCache; +import org.thingsboard.server.dao.tenant.TenantService; import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldLinkedTelemetryMsgProto; import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldTelemetryMsgProto; import org.thingsboard.server.gen.transport.TransportProtos.ToCalculatedFieldMsg; @@ -80,9 +82,13 @@ public class DefaultTbCalculatedFieldConsumerService extends AbstractPartitionBa private long packProcessingTimeout; private final TbRuleEngineQueueFactory queueFactory; + private final TenantService tenantService; private final CalculatedFieldStateService stateService; private final CalculatedFieldEntityProfileCache entityProfileCache; + private final ConcurrentMap>> consumers = new ConcurrentHashMap<>(); + + public DefaultTbCalculatedFieldConsumerService(TbRuleEngineQueueFactory tbQueueFactory, ActorSystemContext actorContext, TbDeviceProfileCache deviceProfileCache, @@ -94,29 +100,41 @@ public class DefaultTbCalculatedFieldConsumerService extends AbstractPartitionBa JwtSettingsService jwtSettingsService, CalculatedFieldCache calculatedFieldCache, CalculatedFieldStateService stateService, - CalculatedFieldEntityProfileCache entityProfileCache) { + CalculatedFieldEntityProfileCache entityProfileCache, + TenantService tenantService) { super(actorContext, tenantProfileCache, deviceProfileCache, assetProfileCache, calculatedFieldCache, apiUsageStateService, partitionService, eventPublisher, jwtSettingsService); this.queueFactory = tbQueueFactory; this.stateService = stateService; this.entityProfileCache = entityProfileCache; + this.tenantService = tenantService; } @Override protected void onStartUp() { - var queueKey = new QueueKey(ServiceType.TB_RULE_ENGINE, DataConstants.CF_QUEUE_NAME); - PartitionedQueueConsumerManager> eventConsumer = PartitionedQueueConsumerManager.>create() + PageDataIterable iterator = new PageDataIterable<>(tenantService::findTenantsIds, 1024); + for (TenantId tenantId : iterator) { + if (partitionService.isManagedByCurrentService(tenantId)) { + stateService.init(createConsumer(tenantId)); + } + } + } + + private PartitionedQueueConsumerManager> createConsumer(TenantId tenantId) { + QueueKey queueKey = new QueueKey(ServiceType.TB_RULE_ENGINE, DataConstants.CF_QUEUE_NAME); + var eventConsumer = PartitionedQueueConsumerManager.>create() .queueKey(queueKey) .topic(partitionService.getTopic(queueKey)) .pollInterval(pollInterval) .msgPackProcessor(this::processMsgs) - .consumerCreator((config, partitionId) -> queueFactory.createToCalculatedFieldMsgConsumer()) + .consumerCreator((config, partitionId) -> queueFactory.createToCalculatedFieldMsgConsumer(tenantId, partitionId)) .queueAdmin(queueFactory.getCalculatedFieldQueueAdmin()) .consumerExecutor(consumersExecutor) .scheduler(scheduler) .taskExecutor(mgmtExecutor) .build(); - stateService.init(eventConsumer); + consumers.put(queueKey, eventConsumer); + return eventConsumer; } @PreDestroy @@ -232,13 +250,24 @@ public class DefaultTbCalculatedFieldConsumerService extends AbstractPartitionBa if (event.getEntityId().getEntityType() == EntityType.TENANT) { if (event.getEvent() == ComponentLifecycleEvent.DELETED) { entityProfileCache.removeTenant(event.getTenantId()); - Set partitions = stateService.getPartitions(); - if (CollectionUtils.isEmpty(partitions)) { - return; + consumers.keySet().removeIf(queueKey -> { + boolean toRemove = queueKey.getTenantId().equals(event.getTenantId()); + if (toRemove) { + Set partitions = stateService.getPartitions(queueKey); + if (!CollectionUtils.isEmpty(partitions)) { + stateService.delete(queueKey, partitions); + } + var consumer = consumers.get(queueKey); + if (consumer != null) { + consumer.stop(); + } + } + return toRemove; + }); + } else if (event.getEvent() == ComponentLifecycleEvent.CREATED) { + if (partitionService.isManagedByCurrentService(event.getTenantId())) { + stateService.init(createConsumer(event.getTenantId())); } - stateService.delete(partitions.stream() - .filter(tpi -> tpi.getTenantId().isPresent() && tpi.getTenantId().get().equals(event.getTenantId())) - .collect(Collectors.toSet())); } } else if (event.getEntityId().getEntityType() == EntityType.ASSET_PROFILE) { if (event.getEvent() == ComponentLifecycleEvent.DELETED) { @@ -271,7 +300,7 @@ public class DefaultTbCalculatedFieldConsumerService extends AbstractPartitionBa @Override protected void stopConsumers() { super.stopConsumers(); - stateService.stop(); // eventConsumer will be stopped by stateService + consumers.keySet().forEach(stateService::stop); // eventConsumer will be stopped by stateService } } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/InMemoryMonolithQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/InMemoryMonolithQueueFactory.java index 89d83af826..94d53e4864 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/InMemoryMonolithQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/InMemoryMonolithQueueFactory.java @@ -20,6 +20,7 @@ import lombok.extern.slf4j.Slf4j; import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; +import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.queue.Queue; import org.thingsboard.server.common.msg.queue.ServiceType; import org.thingsboard.server.gen.js.JsInvokeProtos; @@ -133,7 +134,7 @@ public class InMemoryMonolithQueueFactory implements TbCoreQueueFactory, TbRuleE } @Override - public TbQueueConsumer> createToCalculatedFieldMsgConsumer() { + public TbQueueConsumer> createToCalculatedFieldMsgConsumer(TenantId tenantId, Integer partitionId) { return new InMemoryTbQueueConsumer<>(storage, topicService.buildTopicName(calculatedFieldSettings.getEventTopic())); } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java index dcebe085b3..4cb8fc5793 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java @@ -20,6 +20,7 @@ import jakarta.annotation.PreDestroy; import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; import org.springframework.context.annotation.Bean; import org.springframework.stereotype.Component; +import org.thingsboard.server.common.data.DataConstants; import org.thingsboard.server.common.data.id.EdgeId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.queue.Queue; @@ -103,7 +104,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi private final TbQueueAdmin housekeeperReprocessingAdmin; private final TbQueueAdmin edgeAdmin; private final TbQueueAdmin edgeEventAdmin; - private final TbQueueAdmin cfAdmin; + private final TbKafkaAdmin cfAdmin; private final TbQueueAdmin cfStateAdmin; private final TbQueueAdmin edqsEventsAdmin; private final TbKafkaAdmin edqsRequestsAdmin; @@ -513,15 +514,22 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi } @Override - public TbQueueConsumer> createToCalculatedFieldMsgConsumer() { + public TbQueueConsumer> createToCalculatedFieldMsgConsumer(TenantId tenantId, Integer partitionId) { + String queueName = DataConstants.CF_QUEUE_NAME; + String groupId = topicService.buildConsumerGroupId("cf-", tenantId, queueName, partitionId); + + cfAdmin.syncOffsets(topicService.buildConsumerGroupId("cf-", tenantId, queueName, null), // the fat groupId + groupId, partitionId); + TbKafkaConsumerTemplate.TbKafkaConsumerTemplateBuilder> consumerBuilder = TbKafkaConsumerTemplate.builder(); consumerBuilder.settings(kafkaSettings); consumerBuilder.topic(topicService.buildTopicName(calculatedFieldSettings.getEventTopic())); - consumerBuilder.clientId("monolith-calculated-field-consumer-" + serviceInfoProvider.getServiceId() + "-" + consumerCount.incrementAndGet()); - consumerBuilder.groupId(topicService.buildTopicName("monolith-calculated-field-consumer")); + consumerBuilder.clientId("cf-" + queueName + "-consumer-" + serviceInfoProvider.getServiceId() + "-" + consumerCount.incrementAndGet()); + consumerBuilder.groupId(groupId); consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToCalculatedFieldMsg.parseFrom(msg.getData()), msg.getHeaders())); consumerBuilder.admin(cfAdmin); consumerBuilder.statsService(consumerStatsService); + return consumerBuilder.build(); } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbRuleEngineQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbRuleEngineQueueFactory.java index d43ef5c9ac..ae565a0827 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbRuleEngineQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbRuleEngineQueueFactory.java @@ -20,6 +20,8 @@ import jakarta.annotation.PreDestroy; import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; import org.springframework.context.annotation.Bean; import org.springframework.stereotype.Component; +import org.thingsboard.server.common.data.DataConstants; +import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.queue.Queue; import org.thingsboard.server.common.msg.queue.ServiceType; import org.thingsboard.server.gen.js.JsInvokeProtos; @@ -90,7 +92,7 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory { private final TbQueueAdmin housekeeperAdmin; private final TbQueueAdmin edgeAdmin; private final TbQueueAdmin edgeEventAdmin; - private final TbQueueAdmin cfAdmin; + private final TbKafkaAdmin cfAdmin; private final TbQueueAdmin cfStateAdmin; private final TbQueueAdmin edqsEventsAdmin; private final AtomicLong consumerCount = new AtomicLong(); @@ -313,12 +315,18 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory { } @Override - public TbQueueConsumer> createToCalculatedFieldMsgConsumer() { + public TbQueueConsumer> createToCalculatedFieldMsgConsumer(TenantId tenantId, Integer partitionId) { + String queueName = DataConstants.CF_QUEUE_NAME; + String groupId = topicService.buildConsumerGroupId("cf-", tenantId, queueName, partitionId); + + cfAdmin.syncOffsets(topicService.buildConsumerGroupId("cf-", tenantId, queueName, null), // the fat groupId + groupId, partitionId); + TbKafkaConsumerTemplate.TbKafkaConsumerTemplateBuilder> consumerBuilder = TbKafkaConsumerTemplate.builder(); consumerBuilder.settings(kafkaSettings); consumerBuilder.topic(topicService.buildTopicName(calculatedFieldSettings.getEventTopic())); - consumerBuilder.clientId("tb-rule-engine-calculated-field-consumer-" + serviceInfoProvider.getServiceId() + "-" + consumerCount.incrementAndGet()); - consumerBuilder.groupId(topicService.buildTopicName("tb-rule-engine-calculated-field-consumer")); + consumerBuilder.clientId("cf-" + queueName + "-consumer-" + serviceInfoProvider.getServiceId() + "-" + consumerCount.incrementAndGet()); + consumerBuilder.groupId(groupId); consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToCalculatedFieldMsg.parseFrom(msg.getData()), msg.getHeaders())); consumerBuilder.admin(cfAdmin); consumerBuilder.statsService(consumerStatsService); diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/TbRuleEngineQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/TbRuleEngineQueueFactory.java index 329e3e346a..b71f6a0471 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/TbRuleEngineQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/TbRuleEngineQueueFactory.java @@ -15,6 +15,7 @@ */ package org.thingsboard.server.queue.provider; +import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.queue.Queue; import org.thingsboard.server.gen.js.JsInvokeProtos; import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldStateProto; @@ -121,7 +122,7 @@ public interface TbRuleEngineQueueFactory extends TbUsageStatsClientQueueFactory TbQueueRequestTemplate, TbProtoQueueMsg> createRemoteJsRequestTemplate(); - TbQueueConsumer> createToCalculatedFieldMsgConsumer(); + TbQueueConsumer> createToCalculatedFieldMsgConsumer(TenantId tenantId, Integer partitionId); TbQueueAdmin getCalculatedFieldQueueAdmin();