From f88be5b86355ef6d51789a27f1b0e275a1e4e913 Mon Sep 17 00:00:00 2001 From: IrynaMatveieva Date: Wed, 19 Mar 2025 15:07:48 +0200 Subject: [PATCH] added impl for cf notification producer --- .../DefaultTbCalculatedFieldConsumerService.java | 2 +- .../queue/provider/InMemoryMonolithQueueFactory.java | 2 +- .../queue/provider/KafkaMonolithQueueFactory.java | 2 +- .../provider/KafkaTbRuleEngineQueueFactory.java | 12 +++++++++++- .../queue/provider/TbRuleEngineProducerProvider.java | 1 + .../queue/provider/TbRuleEngineQueueFactory.java | 4 +++- 6 files changed, 18 insertions(+), 5 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCalculatedFieldConsumerService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCalculatedFieldConsumerService.java index fe42684b2b..c91441347b 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCalculatedFieldConsumerService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCalculatedFieldConsumerService.java @@ -214,7 +214,7 @@ public class DefaultTbCalculatedFieldConsumerService extends AbstractConsumerSer @Override protected TbQueueConsumer> createNotificationsConsumer() { - return queueFactory.createToCalculatedFieldNotificationsMsgConsumer(); + return queueFactory.createToCalculatedFieldNotificationMsgConsumer(); } @Override diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/InMemoryMonolithQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/InMemoryMonolithQueueFactory.java index e97af10ecc..b29a501c0e 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/InMemoryMonolithQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/InMemoryMonolithQueueFactory.java @@ -144,7 +144,7 @@ public class InMemoryMonolithQueueFactory implements TbCoreQueueFactory, TbRuleE } @Override - public TbQueueConsumer> createToCalculatedFieldNotificationsMsgConsumer() { + public TbQueueConsumer> createToCalculatedFieldNotificationMsgConsumer() { return new InMemoryTbQueueConsumer<>(storage, topicService.getCalculatedFieldNotificationsTopic(serviceInfoProvider.getServiceId()).getFullTopicName()); } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java index f3d1e2d158..5d3d58cd46 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java @@ -537,7 +537,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi } @Override - public TbQueueConsumer> createToCalculatedFieldNotificationsMsgConsumer() { + public TbQueueConsumer> createToCalculatedFieldNotificationMsgConsumer() { TbKafkaConsumerTemplate.TbKafkaConsumerTemplateBuilder> consumerBuilder = TbKafkaConsumerTemplate.builder(); consumerBuilder.settings(kafkaSettings); consumerBuilder.topic(topicService.getCalculatedFieldNotificationsTopic(serviceInfoProvider.getServiceId()).getFullTopicName()); diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbRuleEngineQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbRuleEngineQueueFactory.java index 43fbb5efeb..3bdbfd2851 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbRuleEngineQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbRuleEngineQueueFactory.java @@ -332,7 +332,7 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory { } @Override - public TbQueueConsumer> createToCalculatedFieldNotificationsMsgConsumer() { + public TbQueueConsumer> createToCalculatedFieldNotificationMsgConsumer() { TbKafkaConsumerTemplate.TbKafkaConsumerTemplateBuilder> consumerBuilder = TbKafkaConsumerTemplate.builder(); consumerBuilder.settings(kafkaSettings); consumerBuilder.topic(topicService.getCalculatedFieldNotificationsTopic(serviceInfoProvider.getServiceId()).getFullTopicName()); @@ -344,6 +344,16 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory { return consumerBuilder.build(); } + @Override + public TbQueueProducer> createToCalculatedFieldNotificationMsgProducer() { + TbKafkaProducerTemplate.TbKafkaProducerTemplateBuilder> requestBuilder = TbKafkaProducerTemplate.builder(); + requestBuilder.settings(kafkaSettings); + requestBuilder.clientId("tb-calculated-field-notifications-producer-" + serviceInfoProvider.getServiceId()); + requestBuilder.defaultTopic(topicService.getCalculatedFieldNotificationsTopic(serviceInfoProvider.getServiceId()).getFullTopicName()); + requestBuilder.admin(notificationAdmin); + return requestBuilder.build(); + } + @Override public TbQueueConsumer> createCalculatedFieldStateConsumer() { return TbKafkaConsumerTemplate.>builder() diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/TbRuleEngineProducerProvider.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/TbRuleEngineProducerProvider.java index d3b507910a..8e1952fc14 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/TbRuleEngineProducerProvider.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/TbRuleEngineProducerProvider.java @@ -69,6 +69,7 @@ public class TbRuleEngineProducerProvider implements TbQueueProducerProvider { this.toEdgeNotifications = tbQueueProvider.createEdgeNotificationsMsgProducer(); this.toEdgeEvents = tbQueueProvider.createEdgeEventMsgProducer(); this.toCalculatedFields = tbQueueProvider.createToCalculatedFieldMsgProducer(); + this.toCalculatedFieldNotifications = tbQueueProvider.createToCalculatedFieldNotificationMsgProducer(); } @Override diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/TbRuleEngineQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/TbRuleEngineQueueFactory.java index 767fea9f0c..ad8ace6b6c 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/TbRuleEngineQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/TbRuleEngineQueueFactory.java @@ -124,7 +124,9 @@ public interface TbRuleEngineQueueFactory extends TbUsageStatsClientQueueFactory TbQueueProducer> createToCalculatedFieldMsgProducer(); - TbQueueConsumer> createToCalculatedFieldNotificationsMsgConsumer(); + TbQueueConsumer> createToCalculatedFieldNotificationMsgConsumer(); + + TbQueueProducer> createToCalculatedFieldNotificationMsgProducer(); TbQueueConsumer> createCalculatedFieldStateConsumer();