Merge pull request #12963 from thingsboard/fix/edqs

Fix invalid queue admin reference
This commit is contained in:
Viacheslav Klimov 2025-03-18 12:19:14 +02:00 committed by GitHub
commit f34cc14621
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
12 changed files with 40 additions and 16 deletions

View File

@ -30,13 +30,12 @@ import org.thingsboard.server.common.msg.queue.TbCallback;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldStateProto;
import org.thingsboard.server.gen.transport.TransportProtos.ToCalculatedFieldMsg;
import org.thingsboard.server.queue.TbQueueAdmin;
import org.thingsboard.server.queue.TbQueueCallback;
import org.thingsboard.server.queue.TbQueueMsgHeaders;
import org.thingsboard.server.queue.TbQueueMsgMetadata;
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import org.thingsboard.server.queue.common.state.KafkaQueueStateService;
import org.thingsboard.server.queue.common.consumer.PartitionedQueueConsumerManager;
import org.thingsboard.server.queue.common.state.KafkaQueueStateService;
import org.thingsboard.server.queue.discovery.PartitionService;
import org.thingsboard.server.queue.discovery.QueueKey;
import org.thingsboard.server.queue.kafka.TbKafkaProducerTemplate;
@ -59,7 +58,6 @@ public class KafkaCalculatedFieldStateService extends AbstractCalculatedFieldSta
private final TbRuleEngineQueueFactory queueFactory;
private final PartitionService partitionService;
private final TbQueueAdmin queueAdmin;
@Value("${queue.calculated_fields.poll_interval:25}")
private long pollInterval;
@ -94,7 +92,7 @@ public class KafkaCalculatedFieldStateService extends AbstractCalculatedFieldSta
}
})
.consumerCreator((config, partitionId) -> queueFactory.createCalculatedFieldStateConsumer())
.queueAdmin(queueAdmin)
.queueAdmin(queueFactory.getCalculatedFieldQueueAdmin())
.consumerExecutor(eventConsumer.getConsumerExecutor())
.scheduler(eventConsumer.getScheduler())
.taskExecutor(eventConsumer.getTaskExecutor())

View File

@ -42,7 +42,6 @@ import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldLinke
import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldTelemetryMsgProto;
import org.thingsboard.server.gen.transport.TransportProtos.ToCalculatedFieldMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToCalculatedFieldNotificationMsg;
import org.thingsboard.server.queue.TbQueueAdmin;
import org.thingsboard.server.queue.TbQueueConsumer;
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import org.thingsboard.server.queue.common.consumer.PartitionedQueueConsumerManager;
@ -81,7 +80,6 @@ public class DefaultTbCalculatedFieldConsumerService extends AbstractConsumerSer
private long packProcessingTimeout;
private final TbRuleEngineQueueFactory queueFactory;
private final TbQueueAdmin queueAdmin;
private final CalculatedFieldStateService stateService;
public DefaultTbCalculatedFieldConsumerService(TbRuleEngineQueueFactory tbQueueFactory,
@ -94,12 +92,10 @@ public class DefaultTbCalculatedFieldConsumerService extends AbstractConsumerSer
ApplicationEventPublisher eventPublisher,
JwtSettingsService jwtSettingsService,
CalculatedFieldCache calculatedFieldCache,
TbQueueAdmin queueAdmin,
CalculatedFieldStateService stateService) {
super(actorContext, tenantProfileCache, deviceProfileCache, assetProfileCache, calculatedFieldCache, apiUsageStateService, partitionService,
eventPublisher, jwtSettingsService);
this.queueFactory = tbQueueFactory;
this.queueAdmin = queueAdmin;
this.stateService = stateService;
}
@ -114,7 +110,7 @@ public class DefaultTbCalculatedFieldConsumerService extends AbstractConsumerSer
.pollInterval(pollInterval)
.msgPackProcessor(this::processMsgs)
.consumerCreator((config, partitionId) -> queueFactory.createToCalculatedFieldMsgConsumer())
.queueAdmin(queueAdmin)
.queueAdmin(queueFactory.getCalculatedFieldQueueAdmin())
.consumerExecutor(consumersExecutor)
.scheduler(scheduler)
.taskExecutor(mgmtExecutor)

View File

@ -53,7 +53,6 @@ import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.gen.transport.TransportProtos.EdqsEventMsg;
import org.thingsboard.server.gen.transport.TransportProtos.FromEdqsMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToEdqsMsg;
import org.thingsboard.server.queue.TbQueueAdmin;
import org.thingsboard.server.queue.TbQueueHandler;
import org.thingsboard.server.queue.TbQueueResponseTemplate;
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
@ -92,7 +91,6 @@ public class EdqsProcessor implements TbQueueHandler<TbProtoQueueMsg<ToEdqsMsg>,
private final EdqsPartitionService partitionService;
private final ConfigurableApplicationContext applicationContext;
private final EdqsStateService stateService;
private final TbQueueAdmin queueAdmin;
private PartitionedQueueConsumerManager<TbProtoQueueMsg<ToEdqsMsg>> eventConsumer;
private TbQueueResponseTemplate<TbProtoQueueMsg<ToEdqsMsg>, TbProtoQueueMsg<FromEdqsMsg>> responseTemplate;
@ -143,7 +141,7 @@ public class EdqsProcessor implements TbQueueHandler<TbProtoQueueMsg<ToEdqsMsg>,
consumer.commit();
})
.consumerCreator((config, partitionId) -> queueFactory.createEdqsMsgConsumer(EdqsQueue.EVENTS))
.queueAdmin(queueAdmin)
.queueAdmin(queueFactory.getEdqsQueueAdmin())
.consumerExecutor(consumersExecutor)
.taskExecutor(taskExecutor)
.scheduler(scheduler)

View File

@ -30,7 +30,6 @@ import org.thingsboard.server.edqs.processor.EdqsProducer;
import org.thingsboard.server.edqs.util.VersionsStore;
import org.thingsboard.server.gen.transport.TransportProtos.EdqsEventMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToEdqsMsg;
import org.thingsboard.server.queue.TbQueueAdmin;
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import org.thingsboard.server.queue.common.consumer.PartitionedQueueConsumerManager;
import org.thingsboard.server.queue.common.consumer.QueueConsumerManager;
@ -56,7 +55,6 @@ public class KafkaEdqsStateService implements EdqsStateService {
private final EdqsConfig config;
private final EdqsPartitionService partitionService;
private final EdqsQueueFactory queueFactory;
private final TbQueueAdmin queueAdmin;
private final TopicService topicService;
@Autowired @Lazy
private EdqsProcessor edqsProcessor;
@ -92,7 +90,7 @@ public class KafkaEdqsStateService implements EdqsStateService {
consumer.commit();
})
.consumerCreator((config, partitionId) -> queueFactory.createEdqsMsgConsumer(EdqsQueue.STATE))
.queueAdmin(queueAdmin)
.queueAdmin(queueFactory.getEdqsQueueAdmin())
.consumerExecutor(eventConsumer.getConsumerExecutor())
.taskExecutor(eventConsumer.getTaskExecutor())
.scheduler(eventConsumer.getScheduler())

View File

@ -17,6 +17,7 @@ package org.thingsboard.server.queue.edqs;
import org.thingsboard.server.gen.transport.TransportProtos.FromEdqsMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToEdqsMsg;
import org.thingsboard.server.queue.TbQueueAdmin;
import org.thingsboard.server.queue.TbQueueConsumer;
import org.thingsboard.server.queue.TbQueueProducer;
import org.thingsboard.server.queue.TbQueueResponseTemplate;
@ -32,4 +33,6 @@ public interface EdqsQueueFactory {
TbQueueResponseTemplate<TbProtoQueueMsg<ToEdqsMsg>, TbProtoQueueMsg<FromEdqsMsg>> createEdqsResponseTemplate();
TbQueueAdmin getEdqsQueueAdmin();
}

View File

@ -22,6 +22,7 @@ import org.thingsboard.server.common.stats.StatsFactory;
import org.thingsboard.server.common.stats.StatsType;
import org.thingsboard.server.gen.transport.TransportProtos.FromEdqsMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToEdqsMsg;
import org.thingsboard.server.queue.TbQueueAdmin;
import org.thingsboard.server.queue.TbQueueConsumer;
import org.thingsboard.server.queue.TbQueueProducer;
import org.thingsboard.server.queue.TbQueueResponseTemplate;
@ -39,6 +40,7 @@ public class InMemoryEdqsQueueFactory implements EdqsQueueFactory {
private final InMemoryStorage storage;
private final EdqsConfig edqsConfig;
private final StatsFactory statsFactory;
private final TbQueueAdmin queueAdmin;
@Override
public TbQueueConsumer<TbProtoQueueMsg<ToEdqsMsg>> createEdqsMsgConsumer(EdqsQueue queue) {
@ -76,4 +78,9 @@ public class InMemoryEdqsQueueFactory implements EdqsQueueFactory {
.build();
}
@Override
public TbQueueAdmin getEdqsQueueAdmin() {
return queueAdmin;
}
}

View File

@ -22,6 +22,7 @@ import org.thingsboard.server.common.stats.StatsType;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.gen.transport.TransportProtos.FromEdqsMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToEdqsMsg;
import org.thingsboard.server.queue.TbQueueAdmin;
import org.thingsboard.server.queue.TbQueueConsumer;
import org.thingsboard.server.queue.TbQueueProducer;
import org.thingsboard.server.queue.TbQueueResponseTemplate;
@ -126,4 +127,9 @@ public class KafkaEdqsQueueFactory implements EdqsQueueFactory {
.build();
}
@Override
public TbQueueAdmin getEdqsQueueAdmin() {
return edqsEventsAdmin;
}
}

View File

@ -160,7 +160,7 @@ public class TbKafkaConsumerTemplate<T extends TbQueueMsg> extends AbstractTbQue
int partition = record.partition();
Long endOffset = endOffsets.get(partition);
if (endOffset == null) {
log.warn("End offset not found for {} [{}]", record.topic(), partition);
log.debug("End offset not found for {} [{}]", record.topic(), partition);
return;
}
log.trace("[{}-{}] Got record offset {}, expected end offset: {}", record.topic(), partition, record.offset(), endOffset - 1);

View File

@ -138,6 +138,11 @@ public class InMemoryMonolithQueueFactory implements TbCoreQueueFactory, TbRuleE
return new InMemoryTbQueueConsumer<>(storage, topicService.buildTopicName(calculatedFieldSettings.getEventTopic()));
}
@Override
public TbQueueAdmin getCalculatedFieldQueueAdmin() {
return queueAdmin;
}
@Override
public TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToCalculatedFieldMsg>> createToCalculatedFieldMsgProducer() {
return new InMemoryTbQueueProducer<>(storage, topicService.buildTopicName(calculatedFieldSettings.getEventTopic()));

View File

@ -526,6 +526,11 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi
return consumerBuilder.build();
}
@Override
public TbQueueAdmin getCalculatedFieldQueueAdmin() {
return cfAdmin;
}
@Override
public TbQueueProducer<TbProtoQueueMsg<ToCalculatedFieldMsg>> createToCalculatedFieldMsgProducer() {
TbKafkaProducerTemplate.TbKafkaProducerTemplateBuilder<TbProtoQueueMsg<ToCalculatedFieldMsg>> requestBuilder = TbKafkaProducerTemplate.builder();

View File

@ -321,6 +321,11 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory {
return consumerBuilder.build();
}
@Override
public TbQueueAdmin getCalculatedFieldQueueAdmin() {
return cfAdmin;
}
@Override
public TbQueueProducer<TbProtoQueueMsg<ToCalculatedFieldMsg>> createToCalculatedFieldMsgProducer() {
TbKafkaProducerTemplate.TbKafkaProducerTemplateBuilder<TbProtoQueueMsg<ToCalculatedFieldMsg>> requestBuilder = TbKafkaProducerTemplate.builder();

View File

@ -29,6 +29,7 @@ import org.thingsboard.server.gen.transport.TransportProtos.ToOtaPackageStateSer
import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineNotificationMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg;
import org.thingsboard.server.queue.TbQueueAdmin;
import org.thingsboard.server.queue.TbQueueConsumer;
import org.thingsboard.server.queue.TbQueueProducer;
import org.thingsboard.server.queue.TbQueueRequestTemplate;
@ -122,6 +123,8 @@ public interface TbRuleEngineQueueFactory extends TbUsageStatsClientQueueFactory
TbQueueConsumer<TbProtoQueueMsg<ToCalculatedFieldMsg>> createToCalculatedFieldMsgConsumer();
TbQueueAdmin getCalculatedFieldQueueAdmin();
TbQueueProducer<TbProtoQueueMsg<ToCalculatedFieldMsg>> createToCalculatedFieldMsgProducer();
TbQueueConsumer<TbProtoQueueMsg<ToCalculatedFieldNotificationMsg>> createToCalculatedFieldNotificationsMsgConsumer();