consumer groups updates

This commit is contained in:
IrynaMatveieva 2025-03-31 16:56:34 +03:00
parent 8fca058be9
commit f84e5da9df
9 changed files with 41 additions and 36 deletions

View File

@ -107,7 +107,7 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM
public void process(CalculatedFieldStateRestoreMsg msg) {
CalculatedFieldId cfId = msg.getId().cfId();
log.info("[{}] [{}] Processing CF state restore msg.", msg.getId().entityId(), cfId);
log.debug("[{}] [{}] Processing CF state restore msg.", msg.getId().entityId(), cfId);
if (msg.getState() != null) {
states.put(cfId, msg.getState());
} else {
@ -116,10 +116,10 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM
}
public void process(EntityInitCalculatedFieldMsg msg) throws CalculatedFieldException {
log.info("[{}] Processing entity init CF msg.", msg.getCtx().getCfId());
log.debug("[{}] Processing entity init CF msg.", msg.getCtx().getCfId());
var ctx = msg.getCtx();
if (msg.isForceReinit()) {
log.info("Force reinitialization of CF: [{}].", ctx.getCfId());
log.debug("Force reinitialization of CF: [{}].", ctx.getCfId());
states.remove(ctx.getCfId());
}
try {
@ -138,7 +138,7 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM
}
public void process(CalculatedFieldEntityDeleteMsg msg) {
log.info("[{}] Processing CF entity delete msg.", msg.getEntityId());
log.debug("[{}] Processing CF entity delete msg.", msg.getEntityId());
if (this.entityId.equals(msg.getEntityId())) {
if (states.isEmpty()) {
msg.getCallback().onSuccess();
@ -244,7 +244,7 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM
private void processArgumentValuesUpdate(CalculatedFieldCtx ctx, List<CalculatedFieldId> cfIdList, MultipleTbCallback callback,
Map<String, ArgumentEntry> newArgValues, UUID tbMsgId, TbMsgType tbMsgType) throws CalculatedFieldException {
if (newArgValues.isEmpty()) {
log.info("[{}] No new argument values to process for CF.", ctx.getCfId());
log.debug("[{}] No new argument values to process for CF.", ctx.getCfId());
callback.onSuccess(CALLBACKS_PER_CF);
}
CalculatedFieldState state = states.get(ctx.getCfId());

View File

@ -236,12 +236,12 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware
private void onCfCreated(ComponentLifecycleMsg msg, TbCallback callback) throws CalculatedFieldException {
var cfId = new CalculatedFieldId(msg.getEntityId().getId());
if (calculatedFields.containsKey(cfId)) {
log.warn("[{}] CF was already initialized [{}]", tenantId, cfId);
log.debug("[{}] CF was already initialized [{}]", tenantId, cfId);
callback.onSuccess();
} else {
var cf = cfDaoService.findById(msg.getTenantId(), cfId);
if (cf == null) {
log.warn("[{}] Failed to lookup CF by id [{}]", tenantId, cfId);
log.debug("[{}] Failed to lookup CF by id [{}]", tenantId, cfId);
callback.onSuccess();
} else {
var cfCtx = new CalculatedFieldCtx(cf, systemContext.getTbelInvokeService(), systemContext.getApiLimitService());
@ -268,7 +268,7 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware
} else {
var newCf = cfDaoService.findById(msg.getTenantId(), cfId);
if (newCf == null) {
log.warn("[{}] Failed to lookup CF by id [{}]", tenantId, cfId);
log.debug("[{}] Failed to lookup CF by id [{}]", tenantId, cfId);
callback.onSuccess();
} else {
var newCfCtx = new CalculatedFieldCtx(newCf, systemContext.getTbelInvokeService(), systemContext.getApiLimitService());
@ -313,7 +313,7 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware
var cfId = new CalculatedFieldId(msg.getEntityId().getId());
var cfCtx = calculatedFields.remove(cfId);
if (cfCtx == null) {
log.warn("[{}] CF was already deleted [{}]", tenantId, cfId);
log.debug("[{}] CF was already deleted [{}]", tenantId, cfId);
callback.onSuccess();
} else {
entityIdCalculatedFields.get(cfCtx.getEntityId()).remove(cfCtx);

View File

@ -68,7 +68,7 @@ public class KafkaCalculatedFieldStateService extends AbstractCalculatedFieldSta
@Override
public void init(PartitionedQueueConsumerManager<TbProtoQueueMsg<ToCalculatedFieldMsg>> eventConsumer) {
var queueKey = new QueueKey(ServiceType.TB_RULE_ENGINE, DataConstants.CF_STATES_QUEUE_NAME);
var queueKey = new QueueKey(ServiceType.TB_RULE_ENGINE, DataConstants.CF_QUEUE_NAME);
PartitionedQueueConsumerManager<TbProtoQueueMsg<CalculatedFieldStateProto>> stateConsumer = PartitionedQueueConsumerManager.<TbProtoQueueMsg<CalculatedFieldStateProto>>create()
.queueKey(queueKey)
.topic(partitionService.getTopic(queueKey))
@ -106,7 +106,7 @@ public class KafkaCalculatedFieldStateService extends AbstractCalculatedFieldSta
@Override
protected void doPersist(CalculatedFieldEntityCtxId stateId, CalculatedFieldStateProto stateMsgProto, TbCallback callback) {
TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_RULE_ENGINE, DataConstants.CF_STATES_QUEUE_NAME, stateId.tenantId(), stateId.entityId());
TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_RULE_ENGINE, DataConstants.CF_QUEUE_NAME, stateId.tenantId(), stateId.entityId());
TbProtoQueueMsg<CalculatedFieldStateProto> msg = new TbProtoQueueMsg<>(stateId.entityId().getId(), stateMsgProto);
if (stateMsgProto == null) {
putStateId(msg.getHeaders(), stateId);

View File

@ -46,7 +46,7 @@ public class RocksDBCalculatedFieldStateService extends AbstractCalculatedFieldS
@Override
public void init(PartitionedQueueConsumerManager<TbProtoQueueMsg<ToCalculatedFieldMsg>> eventConsumer) {
super.stateServices.put(new QueueKey(ServiceType.TB_RULE_ENGINE, DataConstants.CF_STATES_QUEUE_NAME), new DefaultQueueStateService<>(eventConsumer));
super.stateServices.put(new QueueKey(ServiceType.TB_RULE_ENGINE, DataConstants.CF_QUEUE_NAME), new DefaultQueueStateService<>(eventConsumer));
}
@Override

View File

@ -29,16 +29,16 @@ import org.thingsboard.server.common.data.DataConstants;
import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.id.EntityIdFactory;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.page.PageDataIterable;
import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent;
import org.thingsboard.server.common.data.queue.Queue;
import org.thingsboard.server.common.data.queue.QueueConfig;
import org.thingsboard.server.common.msg.cf.CalculatedFieldPartitionChangeMsg;
import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg;
import org.thingsboard.server.common.msg.queue.ServiceType;
import org.thingsboard.server.common.msg.queue.TbCallback;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
import org.thingsboard.server.dao.queue.QueueService;
import org.thingsboard.server.dao.tenant.TbTenantProfileCache;
import org.thingsboard.server.dao.tenant.TenantService;
import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldLinkedTelemetryMsgProto;
import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldTelemetryMsgProto;
import org.thingsboard.server.gen.transport.TransportProtos.ToCalculatedFieldMsg;
@ -81,7 +81,7 @@ public class DefaultTbCalculatedFieldConsumerService extends AbstractPartitionBa
private long packProcessingTimeout;
private final TbRuleEngineQueueFactory queueFactory;
private final QueueService queueService;
private final TenantService tenantService;
private final CalculatedFieldStateService stateService;
private final ConcurrentMap<QueueKey, PartitionedQueueConsumerManager<TbProtoQueueMsg<ToCalculatedFieldMsg>>> consumers = new ConcurrentHashMap<>();
@ -96,33 +96,33 @@ public class DefaultTbCalculatedFieldConsumerService extends AbstractPartitionBa
ApplicationEventPublisher eventPublisher,
JwtSettingsService jwtSettingsService,
CalculatedFieldCache calculatedFieldCache,
QueueService queueService,
CalculatedFieldStateService stateService) {
CalculatedFieldStateService stateService,
TenantService tenantService) {
super(actorContext, tenantProfileCache, deviceProfileCache, assetProfileCache, calculatedFieldCache, apiUsageStateService, partitionService,
eventPublisher, jwtSettingsService);
this.queueFactory = tbQueueFactory;
this.queueService = queueService;
this.stateService = stateService;
this.tenantService = tenantService;
}
@Override
protected void onStartUp() {
List<Queue> queues = queueService.findAllQueues();
for (Queue configuration : queues) {
if (partitionService.isManagedByCurrentService(configuration.getTenantId())) {
stateService.init(createConsumer(configuration));
PageDataIterable<TenantId> iterator = new PageDataIterable<>(tenantService::findTenantsIds, 1024);
for (TenantId tenantId : iterator) {
if (partitionService.isManagedByCurrentService(tenantId)) {
stateService.init(createConsumer(tenantId));
}
}
}
private PartitionedQueueConsumerManager<TbProtoQueueMsg<ToCalculatedFieldMsg>> createConsumer(Queue queue) {
QueueKey queueKey = new QueueKey(ServiceType.TB_RULE_ENGINE, DataConstants.CF_QUEUE_NAME, queue.getTenantId());
private PartitionedQueueConsumerManager<TbProtoQueueMsg<ToCalculatedFieldMsg>> createConsumer(TenantId tenantId) {
QueueKey queueKey = new QueueKey(ServiceType.TB_RULE_ENGINE, DataConstants.CF_QUEUE_NAME);
var eventConsumer = PartitionedQueueConsumerManager.<TbProtoQueueMsg<ToCalculatedFieldMsg>>create()
.queueKey(queueKey)
.topic(partitionService.getTopic(queueKey))
.pollInterval(pollInterval)
.msgPackProcessor(this::processMsgs)
.consumerCreator((config, partitionId) -> queueFactory.createToCalculatedFieldMsgConsumer(queue, partitionId))
.consumerCreator((config, partitionId) -> queueFactory.createToCalculatedFieldMsgConsumer(tenantId, partitionId))
.queueAdmin(queueFactory.getCalculatedFieldQueueAdmin())
.consumerExecutor(consumersExecutor)
.scheduler(scheduler)
@ -251,7 +251,7 @@ public class DefaultTbCalculatedFieldConsumerService extends AbstractPartitionBa
if (!CollectionUtils.isEmpty(partitions)) {
stateService.delete(queueKey, partitions);
}
var consumer = consumers.remove(queueKey);
var consumer = consumers.get(queueKey);
if (consumer != null) {
consumer.stop();
}

View File

@ -20,6 +20,7 @@ import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.queue.Queue;
import org.thingsboard.server.common.msg.queue.ServiceType;
import org.thingsboard.server.gen.js.JsInvokeProtos;
@ -133,7 +134,7 @@ public class InMemoryMonolithQueueFactory implements TbCoreQueueFactory, TbRuleE
}
@Override
public TbQueueConsumer<TbProtoQueueMsg<TransportProtos.ToCalculatedFieldMsg>> createToCalculatedFieldMsgConsumer(Queue configuration, Integer partitionId) {
public TbQueueConsumer<TbProtoQueueMsg<TransportProtos.ToCalculatedFieldMsg>> createToCalculatedFieldMsgConsumer(TenantId tenantId, Integer partitionId) {
return new InMemoryTbQueueConsumer<>(storage, topicService.buildTopicName(calculatedFieldSettings.getEventTopic()));
}

View File

@ -20,6 +20,7 @@ import jakarta.annotation.PreDestroy;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
import org.thingsboard.server.common.data.DataConstants;
import org.thingsboard.server.common.data.id.EdgeId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.queue.Queue;
@ -513,11 +514,11 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi
}
@Override
public TbQueueConsumer<TbProtoQueueMsg<ToCalculatedFieldMsg>> createToCalculatedFieldMsgConsumer(Queue configuration, Integer partitionId) {
String queueName = configuration.getName();
String groupId = topicService.buildConsumerGroupId("cf-", configuration.getTenantId(), queueName, partitionId);
public TbQueueConsumer<TbProtoQueueMsg<ToCalculatedFieldMsg>> createToCalculatedFieldMsgConsumer(TenantId tenantId, Integer partitionId) {
String queueName = DataConstants.CF_QUEUE_NAME;
String groupId = topicService.buildConsumerGroupId("cf-", tenantId, queueName, partitionId);
cfAdmin.syncOffsets(topicService.buildConsumerGroupId("cf-", configuration.getTenantId(), queueName, null), // the fat groupId
cfAdmin.syncOffsets(topicService.buildConsumerGroupId("cf-", tenantId, queueName, null), // the fat groupId
groupId, partitionId);
TbKafkaConsumerTemplate.TbKafkaConsumerTemplateBuilder<TbProtoQueueMsg<ToCalculatedFieldMsg>> consumerBuilder = TbKafkaConsumerTemplate.builder();

View File

@ -20,6 +20,8 @@ import jakarta.annotation.PreDestroy;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
import org.thingsboard.server.common.data.DataConstants;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.queue.Queue;
import org.thingsboard.server.common.msg.queue.ServiceType;
import org.thingsboard.server.gen.js.JsInvokeProtos;
@ -313,11 +315,11 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory {
}
@Override
public TbQueueConsumer<TbProtoQueueMsg<ToCalculatedFieldMsg>> createToCalculatedFieldMsgConsumer(Queue configuration, Integer partitionId) {
String queueName = configuration.getName();
String groupId = topicService.buildConsumerGroupId("cf-", configuration.getTenantId(), queueName, partitionId);
public TbQueueConsumer<TbProtoQueueMsg<ToCalculatedFieldMsg>> createToCalculatedFieldMsgConsumer(TenantId tenantId, Integer partitionId) {
String queueName = DataConstants.CF_QUEUE_NAME;
String groupId = topicService.buildConsumerGroupId("cf-", tenantId, queueName, partitionId);
cfAdmin.syncOffsets(topicService.buildConsumerGroupId("cf-", configuration.getTenantId(), queueName, null), // the fat groupId
cfAdmin.syncOffsets(topicService.buildConsumerGroupId("cf-", tenantId, queueName, null), // the fat groupId
groupId, partitionId);
TbKafkaConsumerTemplate.TbKafkaConsumerTemplateBuilder<TbProtoQueueMsg<ToCalculatedFieldMsg>> consumerBuilder = TbKafkaConsumerTemplate.builder();

View File

@ -15,6 +15,7 @@
*/
package org.thingsboard.server.queue.provider;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.queue.Queue;
import org.thingsboard.server.gen.js.JsInvokeProtos;
import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldStateProto;
@ -121,7 +122,7 @@ public interface TbRuleEngineQueueFactory extends TbUsageStatsClientQueueFactory
TbQueueRequestTemplate<TbProtoJsQueueMsg<JsInvokeProtos.RemoteJsRequest>, TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse>> createRemoteJsRequestTemplate();
TbQueueConsumer<TbProtoQueueMsg<ToCalculatedFieldMsg>> createToCalculatedFieldMsgConsumer(Queue configuration, Integer partitionId);
TbQueueConsumer<TbProtoQueueMsg<ToCalculatedFieldMsg>> createToCalculatedFieldMsgConsumer(TenantId tenantId, Integer partitionId);
TbQueueAdmin getCalculatedFieldQueueAdmin();