diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/KafkaCalculatedFieldStateService.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/KafkaCalculatedFieldStateService.java index 76cd2cfcf3..90d4056afc 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/KafkaCalculatedFieldStateService.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/KafkaCalculatedFieldStateService.java @@ -30,13 +30,12 @@ import org.thingsboard.server.common.msg.queue.TbCallback; import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldStateProto; import org.thingsboard.server.gen.transport.TransportProtos.ToCalculatedFieldMsg; -import org.thingsboard.server.queue.TbQueueAdmin; import org.thingsboard.server.queue.TbQueueCallback; import org.thingsboard.server.queue.TbQueueMsgHeaders; import org.thingsboard.server.queue.TbQueueMsgMetadata; import org.thingsboard.server.queue.common.TbProtoQueueMsg; -import org.thingsboard.server.queue.common.state.KafkaQueueStateService; import org.thingsboard.server.queue.common.consumer.PartitionedQueueConsumerManager; +import org.thingsboard.server.queue.common.state.KafkaQueueStateService; import org.thingsboard.server.queue.discovery.PartitionService; import org.thingsboard.server.queue.discovery.QueueKey; import org.thingsboard.server.queue.kafka.TbKafkaProducerTemplate; @@ -59,7 +58,6 @@ public class KafkaCalculatedFieldStateService extends AbstractCalculatedFieldSta private final TbRuleEngineQueueFactory queueFactory; private final PartitionService partitionService; - private final TbQueueAdmin queueAdmin; @Value("${queue.calculated_fields.poll_interval:25}") private long pollInterval; @@ -94,7 +92,7 @@ public class KafkaCalculatedFieldStateService extends AbstractCalculatedFieldSta } }) .consumerCreator((config, partitionId) -> queueFactory.createCalculatedFieldStateConsumer()) - .queueAdmin(queueAdmin) + .queueAdmin(queueFactory.getCalculatedFieldQueueAdmin()) .consumerExecutor(eventConsumer.getConsumerExecutor()) .scheduler(eventConsumer.getScheduler()) .taskExecutor(eventConsumer.getTaskExecutor()) diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCalculatedFieldConsumerService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCalculatedFieldConsumerService.java index fe42684b2b..ac62576714 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCalculatedFieldConsumerService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCalculatedFieldConsumerService.java @@ -42,7 +42,6 @@ import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldLinke import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldTelemetryMsgProto; import org.thingsboard.server.gen.transport.TransportProtos.ToCalculatedFieldMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToCalculatedFieldNotificationMsg; -import org.thingsboard.server.queue.TbQueueAdmin; import org.thingsboard.server.queue.TbQueueConsumer; import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.queue.common.consumer.PartitionedQueueConsumerManager; @@ -81,7 +80,6 @@ public class DefaultTbCalculatedFieldConsumerService extends AbstractConsumerSer private long packProcessingTimeout; private final TbRuleEngineQueueFactory queueFactory; - private final TbQueueAdmin queueAdmin; private final CalculatedFieldStateService stateService; public DefaultTbCalculatedFieldConsumerService(TbRuleEngineQueueFactory tbQueueFactory, @@ -94,12 +92,10 @@ public class DefaultTbCalculatedFieldConsumerService extends AbstractConsumerSer ApplicationEventPublisher eventPublisher, JwtSettingsService jwtSettingsService, CalculatedFieldCache calculatedFieldCache, - TbQueueAdmin queueAdmin, CalculatedFieldStateService stateService) { super(actorContext, tenantProfileCache, deviceProfileCache, assetProfileCache, calculatedFieldCache, apiUsageStateService, partitionService, eventPublisher, jwtSettingsService); this.queueFactory = tbQueueFactory; - this.queueAdmin = queueAdmin; this.stateService = stateService; } @@ -114,7 +110,7 @@ public class DefaultTbCalculatedFieldConsumerService extends AbstractConsumerSer .pollInterval(pollInterval) .msgPackProcessor(this::processMsgs) .consumerCreator((config, partitionId) -> queueFactory.createToCalculatedFieldMsgConsumer()) - .queueAdmin(queueAdmin) + .queueAdmin(queueFactory.getCalculatedFieldQueueAdmin()) .consumerExecutor(consumersExecutor) .scheduler(scheduler) .taskExecutor(mgmtExecutor) diff --git a/common/edqs/src/main/java/org/thingsboard/server/edqs/processor/EdqsProcessor.java b/common/edqs/src/main/java/org/thingsboard/server/edqs/processor/EdqsProcessor.java index e18c8171af..7ddc9147df 100644 --- a/common/edqs/src/main/java/org/thingsboard/server/edqs/processor/EdqsProcessor.java +++ b/common/edqs/src/main/java/org/thingsboard/server/edqs/processor/EdqsProcessor.java @@ -53,7 +53,6 @@ import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.gen.transport.TransportProtos.EdqsEventMsg; import org.thingsboard.server.gen.transport.TransportProtos.FromEdqsMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToEdqsMsg; -import org.thingsboard.server.queue.TbQueueAdmin; import org.thingsboard.server.queue.TbQueueHandler; import org.thingsboard.server.queue.TbQueueResponseTemplate; import org.thingsboard.server.queue.common.TbProtoQueueMsg; @@ -92,7 +91,6 @@ public class EdqsProcessor implements TbQueueHandler, private final EdqsPartitionService partitionService; private final ConfigurableApplicationContext applicationContext; private final EdqsStateService stateService; - private final TbQueueAdmin queueAdmin; private PartitionedQueueConsumerManager> eventConsumer; private TbQueueResponseTemplate, TbProtoQueueMsg> responseTemplate; @@ -143,7 +141,7 @@ public class EdqsProcessor implements TbQueueHandler, consumer.commit(); }) .consumerCreator((config, partitionId) -> queueFactory.createEdqsMsgConsumer(EdqsQueue.EVENTS)) - .queueAdmin(queueAdmin) + .queueAdmin(queueFactory.getEdqsQueueAdmin()) .consumerExecutor(consumersExecutor) .taskExecutor(taskExecutor) .scheduler(scheduler) diff --git a/common/edqs/src/main/java/org/thingsboard/server/edqs/state/KafkaEdqsStateService.java b/common/edqs/src/main/java/org/thingsboard/server/edqs/state/KafkaEdqsStateService.java index 6d938d2976..85b8e92387 100644 --- a/common/edqs/src/main/java/org/thingsboard/server/edqs/state/KafkaEdqsStateService.java +++ b/common/edqs/src/main/java/org/thingsboard/server/edqs/state/KafkaEdqsStateService.java @@ -30,7 +30,6 @@ import org.thingsboard.server.edqs.processor.EdqsProducer; import org.thingsboard.server.edqs.util.VersionsStore; import org.thingsboard.server.gen.transport.TransportProtos.EdqsEventMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToEdqsMsg; -import org.thingsboard.server.queue.TbQueueAdmin; import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.queue.common.consumer.PartitionedQueueConsumerManager; import org.thingsboard.server.queue.common.consumer.QueueConsumerManager; @@ -56,7 +55,6 @@ public class KafkaEdqsStateService implements EdqsStateService { private final EdqsConfig config; private final EdqsPartitionService partitionService; private final EdqsQueueFactory queueFactory; - private final TbQueueAdmin queueAdmin; private final TopicService topicService; @Autowired @Lazy private EdqsProcessor edqsProcessor; @@ -92,7 +90,7 @@ public class KafkaEdqsStateService implements EdqsStateService { consumer.commit(); }) .consumerCreator((config, partitionId) -> queueFactory.createEdqsMsgConsumer(EdqsQueue.STATE)) - .queueAdmin(queueAdmin) + .queueAdmin(queueFactory.getEdqsQueueAdmin()) .consumerExecutor(eventConsumer.getConsumerExecutor()) .taskExecutor(eventConsumer.getTaskExecutor()) .scheduler(eventConsumer.getScheduler()) diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/edqs/EdqsQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/edqs/EdqsQueueFactory.java index fed786e120..b5541c740b 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/edqs/EdqsQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/edqs/EdqsQueueFactory.java @@ -17,6 +17,7 @@ package org.thingsboard.server.queue.edqs; import org.thingsboard.server.gen.transport.TransportProtos.FromEdqsMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToEdqsMsg; +import org.thingsboard.server.queue.TbQueueAdmin; import org.thingsboard.server.queue.TbQueueConsumer; import org.thingsboard.server.queue.TbQueueProducer; import org.thingsboard.server.queue.TbQueueResponseTemplate; @@ -32,4 +33,6 @@ public interface EdqsQueueFactory { TbQueueResponseTemplate, TbProtoQueueMsg> createEdqsResponseTemplate(); + TbQueueAdmin getEdqsQueueAdmin(); + } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/edqs/InMemoryEdqsQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/edqs/InMemoryEdqsQueueFactory.java index 0b6cc1909d..0801399c14 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/edqs/InMemoryEdqsQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/edqs/InMemoryEdqsQueueFactory.java @@ -22,6 +22,7 @@ import org.thingsboard.server.common.stats.StatsFactory; import org.thingsboard.server.common.stats.StatsType; import org.thingsboard.server.gen.transport.TransportProtos.FromEdqsMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToEdqsMsg; +import org.thingsboard.server.queue.TbQueueAdmin; import org.thingsboard.server.queue.TbQueueConsumer; import org.thingsboard.server.queue.TbQueueProducer; import org.thingsboard.server.queue.TbQueueResponseTemplate; @@ -39,6 +40,7 @@ public class InMemoryEdqsQueueFactory implements EdqsQueueFactory { private final InMemoryStorage storage; private final EdqsConfig edqsConfig; private final StatsFactory statsFactory; + private final TbQueueAdmin queueAdmin; @Override public TbQueueConsumer> createEdqsMsgConsumer(EdqsQueue queue) { @@ -76,4 +78,9 @@ public class InMemoryEdqsQueueFactory implements EdqsQueueFactory { .build(); } + @Override + public TbQueueAdmin getEdqsQueueAdmin() { + return queueAdmin; + } + } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/edqs/KafkaEdqsQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/edqs/KafkaEdqsQueueFactory.java index 42ca604841..e985696040 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/edqs/KafkaEdqsQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/edqs/KafkaEdqsQueueFactory.java @@ -22,6 +22,7 @@ import org.thingsboard.server.common.stats.StatsType; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.gen.transport.TransportProtos.FromEdqsMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToEdqsMsg; +import org.thingsboard.server.queue.TbQueueAdmin; import org.thingsboard.server.queue.TbQueueConsumer; import org.thingsboard.server.queue.TbQueueProducer; import org.thingsboard.server.queue.TbQueueResponseTemplate; @@ -126,4 +127,9 @@ public class KafkaEdqsQueueFactory implements EdqsQueueFactory { .build(); } + @Override + public TbQueueAdmin getEdqsQueueAdmin() { + return edqsEventsAdmin; + } + } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaConsumerTemplate.java b/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaConsumerTemplate.java index d219428941..4bd3bf0fe6 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaConsumerTemplate.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaConsumerTemplate.java @@ -160,7 +160,7 @@ public class TbKafkaConsumerTemplate extends AbstractTbQue int partition = record.partition(); Long endOffset = endOffsets.get(partition); if (endOffset == null) { - log.warn("End offset not found for {} [{}]", record.topic(), partition); + log.debug("End offset not found for {} [{}]", record.topic(), partition); return; } log.trace("[{}-{}] Got record offset {}, expected end offset: {}", record.topic(), partition, record.offset(), endOffset - 1); diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/InMemoryMonolithQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/InMemoryMonolithQueueFactory.java index e97af10ecc..fac2cf5d5e 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/InMemoryMonolithQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/InMemoryMonolithQueueFactory.java @@ -138,6 +138,11 @@ public class InMemoryMonolithQueueFactory implements TbCoreQueueFactory, TbRuleE return new InMemoryTbQueueConsumer<>(storage, topicService.buildTopicName(calculatedFieldSettings.getEventTopic())); } + @Override + public TbQueueAdmin getCalculatedFieldQueueAdmin() { + return queueAdmin; + } + @Override public TbQueueProducer> createToCalculatedFieldMsgProducer() { return new InMemoryTbQueueProducer<>(storage, topicService.buildTopicName(calculatedFieldSettings.getEventTopic())); diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java index f3d1e2d158..f07bd9dcbb 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java @@ -526,6 +526,11 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi return consumerBuilder.build(); } + @Override + public TbQueueAdmin getCalculatedFieldQueueAdmin() { + return cfAdmin; + } + @Override public TbQueueProducer> createToCalculatedFieldMsgProducer() { TbKafkaProducerTemplate.TbKafkaProducerTemplateBuilder> requestBuilder = TbKafkaProducerTemplate.builder(); diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbRuleEngineQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbRuleEngineQueueFactory.java index 43fbb5efeb..d0e6c2f123 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbRuleEngineQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbRuleEngineQueueFactory.java @@ -321,6 +321,11 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory { return consumerBuilder.build(); } + @Override + public TbQueueAdmin getCalculatedFieldQueueAdmin() { + return cfAdmin; + } + @Override public TbQueueProducer> createToCalculatedFieldMsgProducer() { TbKafkaProducerTemplate.TbKafkaProducerTemplateBuilder> requestBuilder = TbKafkaProducerTemplate.builder(); diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/TbRuleEngineQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/TbRuleEngineQueueFactory.java index 767fea9f0c..03f662d8bb 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/TbRuleEngineQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/TbRuleEngineQueueFactory.java @@ -29,6 +29,7 @@ import org.thingsboard.server.gen.transport.TransportProtos.ToOtaPackageStateSer import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineNotificationMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg; +import org.thingsboard.server.queue.TbQueueAdmin; import org.thingsboard.server.queue.TbQueueConsumer; import org.thingsboard.server.queue.TbQueueProducer; import org.thingsboard.server.queue.TbQueueRequestTemplate; @@ -122,6 +123,8 @@ public interface TbRuleEngineQueueFactory extends TbUsageStatsClientQueueFactory TbQueueConsumer> createToCalculatedFieldMsgConsumer(); + TbQueueAdmin getCalculatedFieldQueueAdmin(); + TbQueueProducer> createToCalculatedFieldMsgProducer(); TbQueueConsumer> createToCalculatedFieldNotificationsMsgConsumer();