From f84e5da9df500084fb93c5b88bc2db6116eaf7f8 Mon Sep 17 00:00:00 2001 From: IrynaMatveieva Date: Mon, 31 Mar 2025 16:56:34 +0300 Subject: [PATCH] consumer groups updates --- ...CalculatedFieldEntityMessageProcessor.java | 10 +++---- ...alculatedFieldManagerMessageProcessor.java | 8 +++--- .../KafkaCalculatedFieldStateService.java | 4 +-- .../RocksDBCalculatedFieldStateService.java | 2 +- ...faultTbCalculatedFieldConsumerService.java | 28 +++++++++---------- .../InMemoryMonolithQueueFactory.java | 3 +- .../provider/KafkaMonolithQueueFactory.java | 9 +++--- .../KafkaTbRuleEngineQueueFactory.java | 10 ++++--- .../provider/TbRuleEngineQueueFactory.java | 3 +- 9 files changed, 41 insertions(+), 36 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityMessageProcessor.java index db65248a8f..5ea79e42a8 100644 --- a/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityMessageProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityMessageProcessor.java @@ -107,7 +107,7 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM public void process(CalculatedFieldStateRestoreMsg msg) { CalculatedFieldId cfId = msg.getId().cfId(); - log.info("[{}] [{}] Processing CF state restore msg.", msg.getId().entityId(), cfId); + log.debug("[{}] [{}] Processing CF state restore msg.", msg.getId().entityId(), cfId); if (msg.getState() != null) { states.put(cfId, msg.getState()); } else { @@ -116,10 +116,10 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM } public void process(EntityInitCalculatedFieldMsg msg) throws CalculatedFieldException { - log.info("[{}] Processing entity init CF msg.", msg.getCtx().getCfId()); + log.debug("[{}] Processing entity init CF msg.", msg.getCtx().getCfId()); var ctx = msg.getCtx(); if (msg.isForceReinit()) { - log.info("Force reinitialization of CF: [{}].", ctx.getCfId()); + log.debug("Force reinitialization of CF: [{}].", ctx.getCfId()); states.remove(ctx.getCfId()); } try { @@ -138,7 +138,7 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM } public void process(CalculatedFieldEntityDeleteMsg msg) { - log.info("[{}] Processing CF entity delete msg.", msg.getEntityId()); + log.debug("[{}] Processing CF entity delete msg.", msg.getEntityId()); if (this.entityId.equals(msg.getEntityId())) { if (states.isEmpty()) { msg.getCallback().onSuccess(); @@ -244,7 +244,7 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM private void processArgumentValuesUpdate(CalculatedFieldCtx ctx, List cfIdList, MultipleTbCallback callback, Map newArgValues, UUID tbMsgId, TbMsgType tbMsgType) throws CalculatedFieldException { if (newArgValues.isEmpty()) { - log.info("[{}] No new argument values to process for CF.", ctx.getCfId()); + log.debug("[{}] No new argument values to process for CF.", ctx.getCfId()); callback.onSuccess(CALLBACKS_PER_CF); } CalculatedFieldState state = states.get(ctx.getCfId()); diff --git a/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldManagerMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldManagerMessageProcessor.java index e109e83e4d..15a8fbebb9 100644 --- a/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldManagerMessageProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldManagerMessageProcessor.java @@ -236,12 +236,12 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware private void onCfCreated(ComponentLifecycleMsg msg, TbCallback callback) throws CalculatedFieldException { var cfId = new CalculatedFieldId(msg.getEntityId().getId()); if (calculatedFields.containsKey(cfId)) { - log.warn("[{}] CF was already initialized [{}]", tenantId, cfId); + log.debug("[{}] CF was already initialized [{}]", tenantId, cfId); callback.onSuccess(); } else { var cf = cfDaoService.findById(msg.getTenantId(), cfId); if (cf == null) { - log.warn("[{}] Failed to lookup CF by id [{}]", tenantId, cfId); + log.debug("[{}] Failed to lookup CF by id [{}]", tenantId, cfId); callback.onSuccess(); } else { var cfCtx = new CalculatedFieldCtx(cf, systemContext.getTbelInvokeService(), systemContext.getApiLimitService()); @@ -268,7 +268,7 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware } else { var newCf = cfDaoService.findById(msg.getTenantId(), cfId); if (newCf == null) { - log.warn("[{}] Failed to lookup CF by id [{}]", tenantId, cfId); + log.debug("[{}] Failed to lookup CF by id [{}]", tenantId, cfId); callback.onSuccess(); } else { var newCfCtx = new CalculatedFieldCtx(newCf, systemContext.getTbelInvokeService(), systemContext.getApiLimitService()); @@ -313,7 +313,7 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware var cfId = new CalculatedFieldId(msg.getEntityId().getId()); var cfCtx = calculatedFields.remove(cfId); if (cfCtx == null) { - log.warn("[{}] CF was already deleted [{}]", tenantId, cfId); + log.debug("[{}] CF was already deleted [{}]", tenantId, cfId); callback.onSuccess(); } else { entityIdCalculatedFields.get(cfCtx.getEntityId()).remove(cfCtx); diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/KafkaCalculatedFieldStateService.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/KafkaCalculatedFieldStateService.java index ffbd0d4b03..37f79edf6e 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/KafkaCalculatedFieldStateService.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/KafkaCalculatedFieldStateService.java @@ -68,7 +68,7 @@ public class KafkaCalculatedFieldStateService extends AbstractCalculatedFieldSta @Override public void init(PartitionedQueueConsumerManager> eventConsumer) { - var queueKey = new QueueKey(ServiceType.TB_RULE_ENGINE, DataConstants.CF_STATES_QUEUE_NAME); + var queueKey = new QueueKey(ServiceType.TB_RULE_ENGINE, DataConstants.CF_QUEUE_NAME); PartitionedQueueConsumerManager> stateConsumer = PartitionedQueueConsumerManager.>create() .queueKey(queueKey) .topic(partitionService.getTopic(queueKey)) @@ -106,7 +106,7 @@ public class KafkaCalculatedFieldStateService extends AbstractCalculatedFieldSta @Override protected void doPersist(CalculatedFieldEntityCtxId stateId, CalculatedFieldStateProto stateMsgProto, TbCallback callback) { - TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_RULE_ENGINE, DataConstants.CF_STATES_QUEUE_NAME, stateId.tenantId(), stateId.entityId()); + TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_RULE_ENGINE, DataConstants.CF_QUEUE_NAME, stateId.tenantId(), stateId.entityId()); TbProtoQueueMsg msg = new TbProtoQueueMsg<>(stateId.entityId().getId(), stateMsgProto); if (stateMsgProto == null) { putStateId(msg.getHeaders(), stateId); diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/RocksDBCalculatedFieldStateService.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/RocksDBCalculatedFieldStateService.java index 889d8fcc01..3d2f522440 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/RocksDBCalculatedFieldStateService.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/RocksDBCalculatedFieldStateService.java @@ -46,7 +46,7 @@ public class RocksDBCalculatedFieldStateService extends AbstractCalculatedFieldS @Override public void init(PartitionedQueueConsumerManager> eventConsumer) { - super.stateServices.put(new QueueKey(ServiceType.TB_RULE_ENGINE, DataConstants.CF_STATES_QUEUE_NAME), new DefaultQueueStateService<>(eventConsumer)); + super.stateServices.put(new QueueKey(ServiceType.TB_RULE_ENGINE, DataConstants.CF_QUEUE_NAME), new DefaultQueueStateService<>(eventConsumer)); } @Override diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCalculatedFieldConsumerService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCalculatedFieldConsumerService.java index 8b1cc8be1e..1af2116805 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCalculatedFieldConsumerService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCalculatedFieldConsumerService.java @@ -29,16 +29,16 @@ import org.thingsboard.server.common.data.DataConstants; import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.id.EntityIdFactory; import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.page.PageDataIterable; import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent; -import org.thingsboard.server.common.data.queue.Queue; import org.thingsboard.server.common.data.queue.QueueConfig; import org.thingsboard.server.common.msg.cf.CalculatedFieldPartitionChangeMsg; import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg; import org.thingsboard.server.common.msg.queue.ServiceType; import org.thingsboard.server.common.msg.queue.TbCallback; import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; -import org.thingsboard.server.dao.queue.QueueService; import org.thingsboard.server.dao.tenant.TbTenantProfileCache; +import org.thingsboard.server.dao.tenant.TenantService; import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldLinkedTelemetryMsgProto; import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldTelemetryMsgProto; import org.thingsboard.server.gen.transport.TransportProtos.ToCalculatedFieldMsg; @@ -81,7 +81,7 @@ public class DefaultTbCalculatedFieldConsumerService extends AbstractPartitionBa private long packProcessingTimeout; private final TbRuleEngineQueueFactory queueFactory; - private final QueueService queueService; + private final TenantService tenantService; private final CalculatedFieldStateService stateService; private final ConcurrentMap>> consumers = new ConcurrentHashMap<>(); @@ -96,33 +96,33 @@ public class DefaultTbCalculatedFieldConsumerService extends AbstractPartitionBa ApplicationEventPublisher eventPublisher, JwtSettingsService jwtSettingsService, CalculatedFieldCache calculatedFieldCache, - QueueService queueService, - CalculatedFieldStateService stateService) { + CalculatedFieldStateService stateService, + TenantService tenantService) { super(actorContext, tenantProfileCache, deviceProfileCache, assetProfileCache, calculatedFieldCache, apiUsageStateService, partitionService, eventPublisher, jwtSettingsService); this.queueFactory = tbQueueFactory; - this.queueService = queueService; this.stateService = stateService; + this.tenantService = tenantService; } @Override protected void onStartUp() { - List queues = queueService.findAllQueues(); - for (Queue configuration : queues) { - if (partitionService.isManagedByCurrentService(configuration.getTenantId())) { - stateService.init(createConsumer(configuration)); + PageDataIterable iterator = new PageDataIterable<>(tenantService::findTenantsIds, 1024); + for (TenantId tenantId : iterator) { + if (partitionService.isManagedByCurrentService(tenantId)) { + stateService.init(createConsumer(tenantId)); } } } - private PartitionedQueueConsumerManager> createConsumer(Queue queue) { - QueueKey queueKey = new QueueKey(ServiceType.TB_RULE_ENGINE, DataConstants.CF_QUEUE_NAME, queue.getTenantId()); + private PartitionedQueueConsumerManager> createConsumer(TenantId tenantId) { + QueueKey queueKey = new QueueKey(ServiceType.TB_RULE_ENGINE, DataConstants.CF_QUEUE_NAME); var eventConsumer = PartitionedQueueConsumerManager.>create() .queueKey(queueKey) .topic(partitionService.getTopic(queueKey)) .pollInterval(pollInterval) .msgPackProcessor(this::processMsgs) - .consumerCreator((config, partitionId) -> queueFactory.createToCalculatedFieldMsgConsumer(queue, partitionId)) + .consumerCreator((config, partitionId) -> queueFactory.createToCalculatedFieldMsgConsumer(tenantId, partitionId)) .queueAdmin(queueFactory.getCalculatedFieldQueueAdmin()) .consumerExecutor(consumersExecutor) .scheduler(scheduler) @@ -251,7 +251,7 @@ public class DefaultTbCalculatedFieldConsumerService extends AbstractPartitionBa if (!CollectionUtils.isEmpty(partitions)) { stateService.delete(queueKey, partitions); } - var consumer = consumers.remove(queueKey); + var consumer = consumers.get(queueKey); if (consumer != null) { consumer.stop(); } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/InMemoryMonolithQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/InMemoryMonolithQueueFactory.java index ec7da1975d..94d53e4864 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/InMemoryMonolithQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/InMemoryMonolithQueueFactory.java @@ -20,6 +20,7 @@ import lombok.extern.slf4j.Slf4j; import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; +import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.queue.Queue; import org.thingsboard.server.common.msg.queue.ServiceType; import org.thingsboard.server.gen.js.JsInvokeProtos; @@ -133,7 +134,7 @@ public class InMemoryMonolithQueueFactory implements TbCoreQueueFactory, TbRuleE } @Override - public TbQueueConsumer> createToCalculatedFieldMsgConsumer(Queue configuration, Integer partitionId) { + public TbQueueConsumer> createToCalculatedFieldMsgConsumer(TenantId tenantId, Integer partitionId) { return new InMemoryTbQueueConsumer<>(storage, topicService.buildTopicName(calculatedFieldSettings.getEventTopic())); } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java index 14ee1c212a..4cb8fc5793 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java @@ -20,6 +20,7 @@ import jakarta.annotation.PreDestroy; import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; import org.springframework.context.annotation.Bean; import org.springframework.stereotype.Component; +import org.thingsboard.server.common.data.DataConstants; import org.thingsboard.server.common.data.id.EdgeId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.queue.Queue; @@ -513,11 +514,11 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi } @Override - public TbQueueConsumer> createToCalculatedFieldMsgConsumer(Queue configuration, Integer partitionId) { - String queueName = configuration.getName(); - String groupId = topicService.buildConsumerGroupId("cf-", configuration.getTenantId(), queueName, partitionId); + public TbQueueConsumer> createToCalculatedFieldMsgConsumer(TenantId tenantId, Integer partitionId) { + String queueName = DataConstants.CF_QUEUE_NAME; + String groupId = topicService.buildConsumerGroupId("cf-", tenantId, queueName, partitionId); - cfAdmin.syncOffsets(topicService.buildConsumerGroupId("cf-", configuration.getTenantId(), queueName, null), // the fat groupId + cfAdmin.syncOffsets(topicService.buildConsumerGroupId("cf-", tenantId, queueName, null), // the fat groupId groupId, partitionId); TbKafkaConsumerTemplate.TbKafkaConsumerTemplateBuilder> consumerBuilder = TbKafkaConsumerTemplate.builder(); diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbRuleEngineQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbRuleEngineQueueFactory.java index 74c78e1aff..ae565a0827 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbRuleEngineQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbRuleEngineQueueFactory.java @@ -20,6 +20,8 @@ import jakarta.annotation.PreDestroy; import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; import org.springframework.context.annotation.Bean; import org.springframework.stereotype.Component; +import org.thingsboard.server.common.data.DataConstants; +import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.queue.Queue; import org.thingsboard.server.common.msg.queue.ServiceType; import org.thingsboard.server.gen.js.JsInvokeProtos; @@ -313,11 +315,11 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory { } @Override - public TbQueueConsumer> createToCalculatedFieldMsgConsumer(Queue configuration, Integer partitionId) { - String queueName = configuration.getName(); - String groupId = topicService.buildConsumerGroupId("cf-", configuration.getTenantId(), queueName, partitionId); + public TbQueueConsumer> createToCalculatedFieldMsgConsumer(TenantId tenantId, Integer partitionId) { + String queueName = DataConstants.CF_QUEUE_NAME; + String groupId = topicService.buildConsumerGroupId("cf-", tenantId, queueName, partitionId); - cfAdmin.syncOffsets(topicService.buildConsumerGroupId("cf-", configuration.getTenantId(), queueName, null), // the fat groupId + cfAdmin.syncOffsets(topicService.buildConsumerGroupId("cf-", tenantId, queueName, null), // the fat groupId groupId, partitionId); TbKafkaConsumerTemplate.TbKafkaConsumerTemplateBuilder> consumerBuilder = TbKafkaConsumerTemplate.builder(); diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/TbRuleEngineQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/TbRuleEngineQueueFactory.java index 4a4b36b6ae..b71f6a0471 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/TbRuleEngineQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/TbRuleEngineQueueFactory.java @@ -15,6 +15,7 @@ */ package org.thingsboard.server.queue.provider; +import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.queue.Queue; import org.thingsboard.server.gen.js.JsInvokeProtos; import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldStateProto; @@ -121,7 +122,7 @@ public interface TbRuleEngineQueueFactory extends TbUsageStatsClientQueueFactory TbQueueRequestTemplate, TbProtoQueueMsg> createRemoteJsRequestTemplate(); - TbQueueConsumer> createToCalculatedFieldMsgConsumer(Queue configuration, Integer partitionId); + TbQueueConsumer> createToCalculatedFieldMsgConsumer(TenantId tenantId, Integer partitionId); TbQueueAdmin getCalculatedFieldQueueAdmin();