Fix invalid queue admin reference

This commit is contained in:
ViacheslavKlimov 2025-03-18 12:11:29 +02:00
parent 643b2f5c92
commit daeb398d47
12 changed files with 40 additions and 16 deletions

View File

@ -30,13 +30,12 @@ import org.thingsboard.server.common.msg.queue.TbCallback;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldStateProto; import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldStateProto;
import org.thingsboard.server.gen.transport.TransportProtos.ToCalculatedFieldMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToCalculatedFieldMsg;
import org.thingsboard.server.queue.TbQueueAdmin;
import org.thingsboard.server.queue.TbQueueCallback; import org.thingsboard.server.queue.TbQueueCallback;
import org.thingsboard.server.queue.TbQueueMsgHeaders; import org.thingsboard.server.queue.TbQueueMsgHeaders;
import org.thingsboard.server.queue.TbQueueMsgMetadata; import org.thingsboard.server.queue.TbQueueMsgMetadata;
import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import org.thingsboard.server.queue.common.state.KafkaQueueStateService;
import org.thingsboard.server.queue.common.consumer.PartitionedQueueConsumerManager; import org.thingsboard.server.queue.common.consumer.PartitionedQueueConsumerManager;
import org.thingsboard.server.queue.common.state.KafkaQueueStateService;
import org.thingsboard.server.queue.discovery.PartitionService; import org.thingsboard.server.queue.discovery.PartitionService;
import org.thingsboard.server.queue.discovery.QueueKey; import org.thingsboard.server.queue.discovery.QueueKey;
import org.thingsboard.server.queue.kafka.TbKafkaProducerTemplate; import org.thingsboard.server.queue.kafka.TbKafkaProducerTemplate;
@ -59,7 +58,6 @@ public class KafkaCalculatedFieldStateService extends AbstractCalculatedFieldSta
private final TbRuleEngineQueueFactory queueFactory; private final TbRuleEngineQueueFactory queueFactory;
private final PartitionService partitionService; private final PartitionService partitionService;
private final TbQueueAdmin queueAdmin;
@Value("${queue.calculated_fields.poll_interval:25}") @Value("${queue.calculated_fields.poll_interval:25}")
private long pollInterval; private long pollInterval;
@ -94,7 +92,7 @@ public class KafkaCalculatedFieldStateService extends AbstractCalculatedFieldSta
} }
}) })
.consumerCreator((config, partitionId) -> queueFactory.createCalculatedFieldStateConsumer()) .consumerCreator((config, partitionId) -> queueFactory.createCalculatedFieldStateConsumer())
.queueAdmin(queueAdmin) .queueAdmin(queueFactory.getCalculatedFieldQueueAdmin())
.consumerExecutor(eventConsumer.getConsumerExecutor()) .consumerExecutor(eventConsumer.getConsumerExecutor())
.scheduler(eventConsumer.getScheduler()) .scheduler(eventConsumer.getScheduler())
.taskExecutor(eventConsumer.getTaskExecutor()) .taskExecutor(eventConsumer.getTaskExecutor())

View File

@ -42,7 +42,6 @@ import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldLinke
import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldTelemetryMsgProto; import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldTelemetryMsgProto;
import org.thingsboard.server.gen.transport.TransportProtos.ToCalculatedFieldMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToCalculatedFieldMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToCalculatedFieldNotificationMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToCalculatedFieldNotificationMsg;
import org.thingsboard.server.queue.TbQueueAdmin;
import org.thingsboard.server.queue.TbQueueConsumer; import org.thingsboard.server.queue.TbQueueConsumer;
import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import org.thingsboard.server.queue.common.consumer.PartitionedQueueConsumerManager; import org.thingsboard.server.queue.common.consumer.PartitionedQueueConsumerManager;
@ -81,7 +80,6 @@ public class DefaultTbCalculatedFieldConsumerService extends AbstractConsumerSer
private long packProcessingTimeout; private long packProcessingTimeout;
private final TbRuleEngineQueueFactory queueFactory; private final TbRuleEngineQueueFactory queueFactory;
private final TbQueueAdmin queueAdmin;
private final CalculatedFieldStateService stateService; private final CalculatedFieldStateService stateService;
public DefaultTbCalculatedFieldConsumerService(TbRuleEngineQueueFactory tbQueueFactory, public DefaultTbCalculatedFieldConsumerService(TbRuleEngineQueueFactory tbQueueFactory,
@ -94,12 +92,10 @@ public class DefaultTbCalculatedFieldConsumerService extends AbstractConsumerSer
ApplicationEventPublisher eventPublisher, ApplicationEventPublisher eventPublisher,
JwtSettingsService jwtSettingsService, JwtSettingsService jwtSettingsService,
CalculatedFieldCache calculatedFieldCache, CalculatedFieldCache calculatedFieldCache,
TbQueueAdmin queueAdmin,
CalculatedFieldStateService stateService) { CalculatedFieldStateService stateService) {
super(actorContext, tenantProfileCache, deviceProfileCache, assetProfileCache, calculatedFieldCache, apiUsageStateService, partitionService, super(actorContext, tenantProfileCache, deviceProfileCache, assetProfileCache, calculatedFieldCache, apiUsageStateService, partitionService,
eventPublisher, jwtSettingsService); eventPublisher, jwtSettingsService);
this.queueFactory = tbQueueFactory; this.queueFactory = tbQueueFactory;
this.queueAdmin = queueAdmin;
this.stateService = stateService; this.stateService = stateService;
} }
@ -114,7 +110,7 @@ public class DefaultTbCalculatedFieldConsumerService extends AbstractConsumerSer
.pollInterval(pollInterval) .pollInterval(pollInterval)
.msgPackProcessor(this::processMsgs) .msgPackProcessor(this::processMsgs)
.consumerCreator((config, partitionId) -> queueFactory.createToCalculatedFieldMsgConsumer()) .consumerCreator((config, partitionId) -> queueFactory.createToCalculatedFieldMsgConsumer())
.queueAdmin(queueAdmin) .queueAdmin(queueFactory.getCalculatedFieldQueueAdmin())
.consumerExecutor(consumersExecutor) .consumerExecutor(consumersExecutor)
.scheduler(scheduler) .scheduler(scheduler)
.taskExecutor(mgmtExecutor) .taskExecutor(mgmtExecutor)

View File

@ -53,7 +53,6 @@ import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.gen.transport.TransportProtos.EdqsEventMsg; import org.thingsboard.server.gen.transport.TransportProtos.EdqsEventMsg;
import org.thingsboard.server.gen.transport.TransportProtos.FromEdqsMsg; import org.thingsboard.server.gen.transport.TransportProtos.FromEdqsMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToEdqsMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToEdqsMsg;
import org.thingsboard.server.queue.TbQueueAdmin;
import org.thingsboard.server.queue.TbQueueHandler; import org.thingsboard.server.queue.TbQueueHandler;
import org.thingsboard.server.queue.TbQueueResponseTemplate; import org.thingsboard.server.queue.TbQueueResponseTemplate;
import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.queue.common.TbProtoQueueMsg;
@ -92,7 +91,6 @@ public class EdqsProcessor implements TbQueueHandler<TbProtoQueueMsg<ToEdqsMsg>,
private final EdqsPartitionService partitionService; private final EdqsPartitionService partitionService;
private final ConfigurableApplicationContext applicationContext; private final ConfigurableApplicationContext applicationContext;
private final EdqsStateService stateService; private final EdqsStateService stateService;
private final TbQueueAdmin queueAdmin;
private PartitionedQueueConsumerManager<TbProtoQueueMsg<ToEdqsMsg>> eventConsumer; private PartitionedQueueConsumerManager<TbProtoQueueMsg<ToEdqsMsg>> eventConsumer;
private TbQueueResponseTemplate<TbProtoQueueMsg<ToEdqsMsg>, TbProtoQueueMsg<FromEdqsMsg>> responseTemplate; private TbQueueResponseTemplate<TbProtoQueueMsg<ToEdqsMsg>, TbProtoQueueMsg<FromEdqsMsg>> responseTemplate;
@ -143,7 +141,7 @@ public class EdqsProcessor implements TbQueueHandler<TbProtoQueueMsg<ToEdqsMsg>,
consumer.commit(); consumer.commit();
}) })
.consumerCreator((config, partitionId) -> queueFactory.createEdqsMsgConsumer(EdqsQueue.EVENTS)) .consumerCreator((config, partitionId) -> queueFactory.createEdqsMsgConsumer(EdqsQueue.EVENTS))
.queueAdmin(queueAdmin) .queueAdmin(queueFactory.getEdqsQueueAdmin())
.consumerExecutor(consumersExecutor) .consumerExecutor(consumersExecutor)
.taskExecutor(taskExecutor) .taskExecutor(taskExecutor)
.scheduler(scheduler) .scheduler(scheduler)

View File

@ -30,7 +30,6 @@ import org.thingsboard.server.edqs.processor.EdqsProducer;
import org.thingsboard.server.edqs.util.VersionsStore; import org.thingsboard.server.edqs.util.VersionsStore;
import org.thingsboard.server.gen.transport.TransportProtos.EdqsEventMsg; import org.thingsboard.server.gen.transport.TransportProtos.EdqsEventMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToEdqsMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToEdqsMsg;
import org.thingsboard.server.queue.TbQueueAdmin;
import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import org.thingsboard.server.queue.common.consumer.PartitionedQueueConsumerManager; import org.thingsboard.server.queue.common.consumer.PartitionedQueueConsumerManager;
import org.thingsboard.server.queue.common.consumer.QueueConsumerManager; import org.thingsboard.server.queue.common.consumer.QueueConsumerManager;
@ -56,7 +55,6 @@ public class KafkaEdqsStateService implements EdqsStateService {
private final EdqsConfig config; private final EdqsConfig config;
private final EdqsPartitionService partitionService; private final EdqsPartitionService partitionService;
private final EdqsQueueFactory queueFactory; private final EdqsQueueFactory queueFactory;
private final TbQueueAdmin queueAdmin;
private final TopicService topicService; private final TopicService topicService;
@Autowired @Lazy @Autowired @Lazy
private EdqsProcessor edqsProcessor; private EdqsProcessor edqsProcessor;
@ -92,7 +90,7 @@ public class KafkaEdqsStateService implements EdqsStateService {
consumer.commit(); consumer.commit();
}) })
.consumerCreator((config, partitionId) -> queueFactory.createEdqsMsgConsumer(EdqsQueue.STATE)) .consumerCreator((config, partitionId) -> queueFactory.createEdqsMsgConsumer(EdqsQueue.STATE))
.queueAdmin(queueAdmin) .queueAdmin(queueFactory.getEdqsQueueAdmin())
.consumerExecutor(eventConsumer.getConsumerExecutor()) .consumerExecutor(eventConsumer.getConsumerExecutor())
.taskExecutor(eventConsumer.getTaskExecutor()) .taskExecutor(eventConsumer.getTaskExecutor())
.scheduler(eventConsumer.getScheduler()) .scheduler(eventConsumer.getScheduler())

View File

@ -17,6 +17,7 @@ package org.thingsboard.server.queue.edqs;
import org.thingsboard.server.gen.transport.TransportProtos.FromEdqsMsg; import org.thingsboard.server.gen.transport.TransportProtos.FromEdqsMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToEdqsMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToEdqsMsg;
import org.thingsboard.server.queue.TbQueueAdmin;
import org.thingsboard.server.queue.TbQueueConsumer; import org.thingsboard.server.queue.TbQueueConsumer;
import org.thingsboard.server.queue.TbQueueProducer; import org.thingsboard.server.queue.TbQueueProducer;
import org.thingsboard.server.queue.TbQueueResponseTemplate; import org.thingsboard.server.queue.TbQueueResponseTemplate;
@ -32,4 +33,6 @@ public interface EdqsQueueFactory {
TbQueueResponseTemplate<TbProtoQueueMsg<ToEdqsMsg>, TbProtoQueueMsg<FromEdqsMsg>> createEdqsResponseTemplate(); TbQueueResponseTemplate<TbProtoQueueMsg<ToEdqsMsg>, TbProtoQueueMsg<FromEdqsMsg>> createEdqsResponseTemplate();
TbQueueAdmin getEdqsQueueAdmin();
} }

View File

@ -22,6 +22,7 @@ import org.thingsboard.server.common.stats.StatsFactory;
import org.thingsboard.server.common.stats.StatsType; import org.thingsboard.server.common.stats.StatsType;
import org.thingsboard.server.gen.transport.TransportProtos.FromEdqsMsg; import org.thingsboard.server.gen.transport.TransportProtos.FromEdqsMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToEdqsMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToEdqsMsg;
import org.thingsboard.server.queue.TbQueueAdmin;
import org.thingsboard.server.queue.TbQueueConsumer; import org.thingsboard.server.queue.TbQueueConsumer;
import org.thingsboard.server.queue.TbQueueProducer; import org.thingsboard.server.queue.TbQueueProducer;
import org.thingsboard.server.queue.TbQueueResponseTemplate; import org.thingsboard.server.queue.TbQueueResponseTemplate;
@ -39,6 +40,7 @@ public class InMemoryEdqsQueueFactory implements EdqsQueueFactory {
private final InMemoryStorage storage; private final InMemoryStorage storage;
private final EdqsConfig edqsConfig; private final EdqsConfig edqsConfig;
private final StatsFactory statsFactory; private final StatsFactory statsFactory;
private final TbQueueAdmin queueAdmin;
@Override @Override
public TbQueueConsumer<TbProtoQueueMsg<ToEdqsMsg>> createEdqsMsgConsumer(EdqsQueue queue) { public TbQueueConsumer<TbProtoQueueMsg<ToEdqsMsg>> createEdqsMsgConsumer(EdqsQueue queue) {
@ -76,4 +78,9 @@ public class InMemoryEdqsQueueFactory implements EdqsQueueFactory {
.build(); .build();
} }
@Override
public TbQueueAdmin getEdqsQueueAdmin() {
return queueAdmin;
}
} }

View File

@ -22,6 +22,7 @@ import org.thingsboard.server.common.stats.StatsType;
import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.gen.transport.TransportProtos.FromEdqsMsg; import org.thingsboard.server.gen.transport.TransportProtos.FromEdqsMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToEdqsMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToEdqsMsg;
import org.thingsboard.server.queue.TbQueueAdmin;
import org.thingsboard.server.queue.TbQueueConsumer; import org.thingsboard.server.queue.TbQueueConsumer;
import org.thingsboard.server.queue.TbQueueProducer; import org.thingsboard.server.queue.TbQueueProducer;
import org.thingsboard.server.queue.TbQueueResponseTemplate; import org.thingsboard.server.queue.TbQueueResponseTemplate;
@ -126,4 +127,9 @@ public class KafkaEdqsQueueFactory implements EdqsQueueFactory {
.build(); .build();
} }
@Override
public TbQueueAdmin getEdqsQueueAdmin() {
return edqsEventsAdmin;
}
} }

View File

@ -160,7 +160,7 @@ public class TbKafkaConsumerTemplate<T extends TbQueueMsg> extends AbstractTbQue
int partition = record.partition(); int partition = record.partition();
Long endOffset = endOffsets.get(partition); Long endOffset = endOffsets.get(partition);
if (endOffset == null) { if (endOffset == null) {
log.warn("End offset not found for {} [{}]", record.topic(), partition); log.debug("End offset not found for {} [{}]", record.topic(), partition);
return; return;
} }
log.trace("[{}-{}] Got record offset {}, expected end offset: {}", record.topic(), partition, record.offset(), endOffset - 1); log.trace("[{}-{}] Got record offset {}, expected end offset: {}", record.topic(), partition, record.offset(), endOffset - 1);

View File

@ -138,6 +138,11 @@ public class InMemoryMonolithQueueFactory implements TbCoreQueueFactory, TbRuleE
return new InMemoryTbQueueConsumer<>(storage, topicService.buildTopicName(calculatedFieldSettings.getEventTopic())); return new InMemoryTbQueueConsumer<>(storage, topicService.buildTopicName(calculatedFieldSettings.getEventTopic()));
} }
@Override
public TbQueueAdmin getCalculatedFieldQueueAdmin() {
return queueAdmin;
}
@Override @Override
public TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToCalculatedFieldMsg>> createToCalculatedFieldMsgProducer() { public TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToCalculatedFieldMsg>> createToCalculatedFieldMsgProducer() {
return new InMemoryTbQueueProducer<>(storage, topicService.buildTopicName(calculatedFieldSettings.getEventTopic())); return new InMemoryTbQueueProducer<>(storage, topicService.buildTopicName(calculatedFieldSettings.getEventTopic()));

View File

@ -526,6 +526,11 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi
return consumerBuilder.build(); return consumerBuilder.build();
} }
@Override
public TbQueueAdmin getCalculatedFieldQueueAdmin() {
return cfAdmin;
}
@Override @Override
public TbQueueProducer<TbProtoQueueMsg<ToCalculatedFieldMsg>> createToCalculatedFieldMsgProducer() { public TbQueueProducer<TbProtoQueueMsg<ToCalculatedFieldMsg>> createToCalculatedFieldMsgProducer() {
TbKafkaProducerTemplate.TbKafkaProducerTemplateBuilder<TbProtoQueueMsg<ToCalculatedFieldMsg>> requestBuilder = TbKafkaProducerTemplate.builder(); TbKafkaProducerTemplate.TbKafkaProducerTemplateBuilder<TbProtoQueueMsg<ToCalculatedFieldMsg>> requestBuilder = TbKafkaProducerTemplate.builder();

View File

@ -321,6 +321,11 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory {
return consumerBuilder.build(); return consumerBuilder.build();
} }
@Override
public TbQueueAdmin getCalculatedFieldQueueAdmin() {
return cfAdmin;
}
@Override @Override
public TbQueueProducer<TbProtoQueueMsg<ToCalculatedFieldMsg>> createToCalculatedFieldMsgProducer() { public TbQueueProducer<TbProtoQueueMsg<ToCalculatedFieldMsg>> createToCalculatedFieldMsgProducer() {
TbKafkaProducerTemplate.TbKafkaProducerTemplateBuilder<TbProtoQueueMsg<ToCalculatedFieldMsg>> requestBuilder = TbKafkaProducerTemplate.builder(); TbKafkaProducerTemplate.TbKafkaProducerTemplateBuilder<TbProtoQueueMsg<ToCalculatedFieldMsg>> requestBuilder = TbKafkaProducerTemplate.builder();

View File

@ -29,6 +29,7 @@ import org.thingsboard.server.gen.transport.TransportProtos.ToOtaPackageStateSer
import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineNotificationMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineNotificationMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg;
import org.thingsboard.server.queue.TbQueueAdmin;
import org.thingsboard.server.queue.TbQueueConsumer; import org.thingsboard.server.queue.TbQueueConsumer;
import org.thingsboard.server.queue.TbQueueProducer; import org.thingsboard.server.queue.TbQueueProducer;
import org.thingsboard.server.queue.TbQueueRequestTemplate; import org.thingsboard.server.queue.TbQueueRequestTemplate;
@ -122,6 +123,8 @@ public interface TbRuleEngineQueueFactory extends TbUsageStatsClientQueueFactory
TbQueueConsumer<TbProtoQueueMsg<ToCalculatedFieldMsg>> createToCalculatedFieldMsgConsumer(); TbQueueConsumer<TbProtoQueueMsg<ToCalculatedFieldMsg>> createToCalculatedFieldMsgConsumer();
TbQueueAdmin getCalculatedFieldQueueAdmin();
TbQueueProducer<TbProtoQueueMsg<ToCalculatedFieldMsg>> createToCalculatedFieldMsgProducer(); TbQueueProducer<TbProtoQueueMsg<ToCalculatedFieldMsg>> createToCalculatedFieldMsgProducer();
TbQueueConsumer<TbProtoQueueMsg<ToCalculatedFieldNotificationMsg>> createToCalculatedFieldNotificationsMsgConsumer(); TbQueueConsumer<TbProtoQueueMsg<ToCalculatedFieldNotificationMsg>> createToCalculatedFieldNotificationsMsgConsumer();