From 2cc0d6f513bdd4f5873e62af1db51d5d6b10eab4 Mon Sep 17 00:00:00 2001 From: IrynaMatveieva Date: Thu, 23 Jan 2025 15:49:31 +0200 Subject: [PATCH] added implementations for consumer/producer methods --- ...faultTbCalculatedFieldConsumerService.java | 8 +- .../TbCalculatedFieldConsumerService.java | 15 ++++ .../src/main/resources/thingsboard.yml | 2 + .../queue/kafka/TbKafkaTopicConfigs.java | 5 ++ .../provider/KafkaMonolithQueueFactory.java | 81 +++++++++++++++++++ .../provider/KafkaTbCoreQueueFactory.java | 15 ++++ .../KafkaTbRuleEngineQueueFactory.java | 72 ++++++++++++++++- .../queue/provider/TbCoreQueueFactory.java | 8 +- .../provider/TbRuleEngineQueueFactory.java | 8 +- .../dao/sql/event/EventInsertRepository.java | 2 +- 10 files changed, 202 insertions(+), 14 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCalculatedFieldConsumerService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCalculatedFieldConsumerService.java index 85ee01ac5f..ff4634f957 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCalculatedFieldConsumerService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCalculatedFieldConsumerService.java @@ -1,12 +1,12 @@ /** * Copyright © 2016-2024 The Thingsboard Authors - *

+ * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

+ * + * http://www.apache.org/licenses/LICENSE-2.0 + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. diff --git a/application/src/main/java/org/thingsboard/server/service/queue/TbCalculatedFieldConsumerService.java b/application/src/main/java/org/thingsboard/server/service/queue/TbCalculatedFieldConsumerService.java index 387bdd7143..bba4b1e35b 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/TbCalculatedFieldConsumerService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/TbCalculatedFieldConsumerService.java @@ -1,3 +1,18 @@ +/** + * Copyright © 2016-2024 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.thingsboard.server.service.queue; import org.springframework.context.ApplicationListener; diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index 94c1bef71c..fdd9923292 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -1619,6 +1619,8 @@ queue: edge: "${TB_QUEUE_KAFKA_EDGE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:52428800;retention.bytes:1048576000;partitions:1;min.insync.replicas:1}" # Kafka properties for Edge event topic edge-event: "${TB_QUEUE_KAFKA_EDGE_EVENT_TOPIC_PROPERTIES:retention.ms:2592000000;segment.bytes:52428800;retention.bytes:1048576000;partitions:1;min.insync.replicas:1}" + # Kafka properties for Calculated Field topics + calculated-field: "${TB_QUEUE_KAFKA_CF_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:52428800;retention.bytes:1048576000;partitions:1;min.insync.replicas:1}" consumer-stats: # Prints lag between consumer group offset and last messages offset in Kafka topics enabled: "${TB_QUEUE_KAFKA_CONSUMER_STATS_ENABLED:true}" diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaTopicConfigs.java b/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaTopicConfigs.java index ee529e8a68..cdd0add38b 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaTopicConfigs.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaTopicConfigs.java @@ -52,6 +52,8 @@ public class TbKafkaTopicConfigs { private String housekeeperProperties; @Value("${queue.kafka.topic-properties.housekeeper-reprocessing:}") private String housekeeperReprocessingProperties; + @Value("${queue.kafka.topic-properties.calculated-field:}") + private String calculatedFieldProperties; @Getter private Map coreConfigs; @@ -79,6 +81,8 @@ public class TbKafkaTopicConfigs { private Map edgeConfigs; @Getter private Map edgeEventConfigs; + @Getter + private Map calculatedFieldConfigs; @PostConstruct private void init() { @@ -97,6 +101,7 @@ public class TbKafkaTopicConfigs { housekeeperReprocessingConfigs = PropertyUtils.getProps(housekeeperReprocessingProperties); edgeConfigs = PropertyUtils.getProps(edgeProperties); edgeEventConfigs = PropertyUtils.getProps(edgeEventProperties); + calculatedFieldConfigs = PropertyUtils.getProps(calculatedFieldProperties); } } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java index dd5d61e834..01e174023c 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java @@ -25,6 +25,9 @@ import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.queue.Queue; import org.thingsboard.server.common.msg.queue.ServiceType; import org.thingsboard.server.gen.js.JsInvokeProtos; +import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldStateProto; +import org.thingsboard.server.gen.transport.TransportProtos.ToCalculatedFieldMsg; +import org.thingsboard.server.gen.transport.TransportProtos.ToCalculatedFieldNotificationMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToCoreNotificationMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToEdgeEventNotificationMsg; @@ -54,6 +57,7 @@ import org.thingsboard.server.queue.kafka.TbKafkaConsumerTemplate; import org.thingsboard.server.queue.kafka.TbKafkaProducerTemplate; import org.thingsboard.server.queue.kafka.TbKafkaSettings; import org.thingsboard.server.queue.kafka.TbKafkaTopicConfigs; +import org.thingsboard.server.queue.settings.TbQueueCalculatedFieldSettings; import org.thingsboard.server.queue.settings.TbQueueCoreSettings; import org.thingsboard.server.queue.settings.TbQueueEdgeSettings; import org.thingsboard.server.queue.settings.TbQueueRemoteJsInvokeSettings; @@ -79,6 +83,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi private final TbQueueRemoteJsInvokeSettings jsInvokeSettings; private final TbQueueVersionControlSettings vcSettings; private final TbQueueEdgeSettings edgeSettings; + private final TbQueueCalculatedFieldSettings calculatedFieldSettings; private final TbKafkaConsumerStatsService consumerStatsService; private final TbQueueAdmin coreAdmin; @@ -94,6 +99,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi private final TbQueueAdmin housekeeperReprocessingAdmin; private final TbQueueAdmin edgeAdmin; private final TbQueueAdmin edgeEventAdmin; + private final TbQueueAdmin cfAdmin; private final AtomicLong consumerCount = new AtomicLong(); @@ -106,6 +112,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi TbQueueRemoteJsInvokeSettings jsInvokeSettings, TbQueueVersionControlSettings vcSettings, TbQueueEdgeSettings edgeSettings, + TbQueueCalculatedFieldSettings calculatedFieldSettings, TbKafkaConsumerStatsService consumerStatsService, TbKafkaTopicConfigs kafkaTopicConfigs) { this.topicService = topicService; @@ -119,6 +126,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi this.vcSettings = vcSettings; this.consumerStatsService = consumerStatsService; this.edgeSettings = edgeSettings; + this.calculatedFieldSettings = calculatedFieldSettings; this.coreAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getCoreConfigs()); this.ruleEngineAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getRuleEngineConfigs()); @@ -133,6 +141,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi this.housekeeperReprocessingAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getHousekeeperReprocessingConfigs()); this.edgeAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getEdgeConfigs()); this.edgeEventAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getEdgeEventConfigs()); + this.cfAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getCalculatedFieldConfigs()); } @Override @@ -490,6 +499,75 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi return requestBuilder.build(); } + @Override + public TbQueueConsumer> createToCalculatedFieldMsgConsumer() { + TbKafkaConsumerTemplate.TbKafkaConsumerTemplateBuilder> consumerBuilder = TbKafkaConsumerTemplate.builder(); + consumerBuilder.settings(kafkaSettings); + consumerBuilder.topic(topicService.buildTopicName(calculatedFieldSettings.getEventTopic())); + consumerBuilder.clientId("monolith-calculated-field-consumer-" + serviceInfoProvider.getServiceId() + "-" + consumerCount.incrementAndGet()); + consumerBuilder.groupId(topicService.buildTopicName("monolith-calculated-field-consumer")); + consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToCalculatedFieldMsg.parseFrom(msg.getData()), msg.getHeaders())); + consumerBuilder.admin(cfAdmin); + consumerBuilder.statsService(consumerStatsService); + return consumerBuilder.build(); + } + + @Override + public TbQueueProducer> createToCalculatedFieldMsgProducer() { + TbKafkaProducerTemplate.TbKafkaProducerTemplateBuilder> requestBuilder = TbKafkaProducerTemplate.builder(); + requestBuilder.settings(kafkaSettings); + requestBuilder.clientId("monolith-calculated-field-" + serviceInfoProvider.getServiceId()); + requestBuilder.defaultTopic(topicService.buildTopicName(calculatedFieldSettings.getEventTopic())); + requestBuilder.admin(cfAdmin); + return requestBuilder.build(); + } + + @Override + public TbQueueConsumer> createToCalculatedFieldNotificationsMsgConsumer() { + TbKafkaConsumerTemplate.TbKafkaConsumerTemplateBuilder> consumerBuilder = TbKafkaConsumerTemplate.builder(); + consumerBuilder.settings(kafkaSettings); + consumerBuilder.topic(topicService.getCalculatedFieldNotificationsTopic(serviceInfoProvider.getServiceId()).getFullTopicName()); + consumerBuilder.clientId("monolith-calculated-field-notifications-consumer-" + serviceInfoProvider.getServiceId()); + consumerBuilder.groupId(topicService.buildTopicName("monolith-calculated-field-notifications-consumer-" + serviceInfoProvider.getServiceId())); + consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToCalculatedFieldNotificationMsg.parseFrom(msg.getData()), msg.getHeaders())); + consumerBuilder.admin(notificationAdmin); + consumerBuilder.statsService(consumerStatsService); + return consumerBuilder.build(); + } + + @Override + public TbQueueProducer> createToCalculatedFieldNotificationMsgProducer() { + TbKafkaProducerTemplate.TbKafkaProducerTemplateBuilder> requestBuilder = TbKafkaProducerTemplate.builder(); + requestBuilder.settings(kafkaSettings); + requestBuilder.clientId("monolith-calculated-field-notifications-" + serviceInfoProvider.getServiceId()); + requestBuilder.defaultTopic(topicService.buildTopicName(calculatedFieldSettings.getEventTopic())); + requestBuilder.admin(notificationAdmin); + return requestBuilder.build(); + } + + @Override + public TbQueueConsumer> createCalculatedFieldStateConsumer() { + TbKafkaConsumerTemplate.TbKafkaConsumerTemplateBuilder> consumerBuilder = TbKafkaConsumerTemplate.builder(); + consumerBuilder.settings(kafkaSettings); + consumerBuilder.topic(topicService.buildTopicName(calculatedFieldSettings.getStateTopic())); + consumerBuilder.clientId("monolith-calculated-field-state-consumer-" + serviceInfoProvider.getServiceId() + "-" + consumerCount.incrementAndGet()); + consumerBuilder.groupId(topicService.buildTopicName("monolith-calculated-field-state-consumer")); + consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), CalculatedFieldStateProto.parseFrom(msg.getData()), msg.getHeaders())); + consumerBuilder.admin(cfAdmin); + consumerBuilder.statsService(consumerStatsService); + return consumerBuilder.build(); + } + + @Override + public TbQueueProducer> createCalculatedFieldStateProducer() { + TbKafkaProducerTemplate.TbKafkaProducerTemplateBuilder> requestBuilder = TbKafkaProducerTemplate.builder(); + requestBuilder.settings(kafkaSettings); + requestBuilder.clientId("monolith-calculated-field-state-" + serviceInfoProvider.getServiceId()); + requestBuilder.defaultTopic(topicService.buildTopicName(calculatedFieldSettings.getStateTopic())); + requestBuilder.admin(cfAdmin); + return requestBuilder.build(); + } + @PreDestroy private void destroy() { if (coreAdmin != null) { @@ -522,5 +600,8 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi if (edgeAdmin != null) { edgeAdmin.destroy(); } + if (cfAdmin != null) { + cfAdmin.destroy(); + } } } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbCoreQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbCoreQueueFactory.java index cc0e044917..aceccf7e58 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbCoreQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbCoreQueueFactory.java @@ -24,6 +24,7 @@ import org.thingsboard.server.common.data.id.EdgeId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.msg.queue.ServiceType; import org.thingsboard.server.gen.js.JsInvokeProtos; +import org.thingsboard.server.gen.transport.TransportProtos.ToCalculatedFieldNotificationMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToCoreNotificationMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToEdgeEventNotificationMsg; @@ -53,6 +54,7 @@ import org.thingsboard.server.queue.kafka.TbKafkaConsumerTemplate; import org.thingsboard.server.queue.kafka.TbKafkaProducerTemplate; import org.thingsboard.server.queue.kafka.TbKafkaSettings; import org.thingsboard.server.queue.kafka.TbKafkaTopicConfigs; +import org.thingsboard.server.queue.settings.TbQueueCalculatedFieldSettings; import org.thingsboard.server.queue.settings.TbQueueCoreSettings; import org.thingsboard.server.queue.settings.TbQueueEdgeSettings; import org.thingsboard.server.queue.settings.TbQueueRemoteJsInvokeSettings; @@ -79,6 +81,7 @@ public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory { private final TbKafkaConsumerStatsService consumerStatsService; private final TbQueueTransportNotificationSettings transportNotificationSettings; private final TbQueueEdgeSettings edgeSettings; + private final TbQueueCalculatedFieldSettings calculatedFieldSettings; private final TbQueueAdmin coreAdmin; private final TbQueueAdmin ruleEngineAdmin; @@ -107,6 +110,7 @@ public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory { TbQueueEdgeSettings edgeSettings, TbKafkaConsumerStatsService consumerStatsService, TbQueueTransportNotificationSettings transportNotificationSettings, + TbQueueCalculatedFieldSettings calculatedFieldSettings, TbKafkaTopicConfigs kafkaTopicConfigs) { this.topicService = topicService; this.kafkaSettings = kafkaSettings; @@ -119,6 +123,7 @@ public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory { this.consumerStatsService = consumerStatsService; this.transportNotificationSettings = transportNotificationSettings; this.edgeSettings = edgeSettings; + this.calculatedFieldSettings = calculatedFieldSettings; this.coreAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getCoreConfigs()); this.ruleEngineAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getRuleEngineConfigs()); @@ -439,6 +444,16 @@ public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory { return requestBuilder.build(); } + @Override + public TbQueueProducer> createToCalculatedFieldNotificationMsgProducer() { + TbKafkaProducerTemplate.TbKafkaProducerTemplateBuilder> requestBuilder = TbKafkaProducerTemplate.builder(); + requestBuilder.settings(kafkaSettings); + requestBuilder.clientId("tb-core-calculated-field-notifications-" + serviceInfoProvider.getServiceId()); + requestBuilder.defaultTopic(topicService.buildTopicName(calculatedFieldSettings.getEventTopic())); + requestBuilder.admin(notificationAdmin); + return requestBuilder.build(); + } + @PreDestroy private void destroy() { if (coreAdmin != null) { diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbRuleEngineQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbRuleEngineQueueFactory.java index 87a1a69c2e..46b35f9acd 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbRuleEngineQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbRuleEngineQueueFactory.java @@ -23,6 +23,9 @@ import org.springframework.stereotype.Component; import org.thingsboard.server.common.data.queue.Queue; import org.thingsboard.server.common.msg.queue.ServiceType; import org.thingsboard.server.gen.js.JsInvokeProtos; +import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldStateProto; +import org.thingsboard.server.gen.transport.TransportProtos.ToCalculatedFieldMsg; +import org.thingsboard.server.gen.transport.TransportProtos.ToCalculatedFieldNotificationMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToCoreNotificationMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToEdgeEventNotificationMsg; @@ -49,6 +52,7 @@ import org.thingsboard.server.queue.kafka.TbKafkaConsumerTemplate; import org.thingsboard.server.queue.kafka.TbKafkaProducerTemplate; import org.thingsboard.server.queue.kafka.TbKafkaSettings; import org.thingsboard.server.queue.kafka.TbKafkaTopicConfigs; +import org.thingsboard.server.queue.settings.TbQueueCalculatedFieldSettings; import org.thingsboard.server.queue.settings.TbQueueCoreSettings; import org.thingsboard.server.queue.settings.TbQueueEdgeSettings; import org.thingsboard.server.queue.settings.TbQueueRemoteJsInvokeSettings; @@ -71,6 +75,7 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory { private final TbKafkaConsumerStatsService consumerStatsService; private final TbQueueTransportNotificationSettings transportNotificationSettings; private final TbQueueEdgeSettings edgeSettings; + private final TbQueueCalculatedFieldSettings calculatedFieldSettings; private final TbQueueAdmin coreAdmin; private final TbKafkaAdmin ruleEngineAdmin; @@ -81,6 +86,7 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory { private final TbQueueAdmin housekeeperAdmin; private final TbQueueAdmin edgeAdmin; private final TbQueueAdmin edgeEventAdmin; + private final TbQueueAdmin cfAdmin; private final AtomicLong consumerCount = new AtomicLong(); public KafkaTbRuleEngineQueueFactory(TopicService topicService, TbKafkaSettings kafkaSettings, @@ -90,7 +96,7 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory { TbQueueRemoteJsInvokeSettings jsInvokeSettings, TbKafkaConsumerStatsService consumerStatsService, TbQueueTransportNotificationSettings transportNotificationSettings, - TbQueueEdgeSettings edgeSettings, + TbQueueEdgeSettings edgeSettings, TbQueueCalculatedFieldSettings calculatedFieldSettings, TbKafkaTopicConfigs kafkaTopicConfigs) { this.topicService = topicService; this.kafkaSettings = kafkaSettings; @@ -101,6 +107,7 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory { this.consumerStatsService = consumerStatsService; this.transportNotificationSettings = transportNotificationSettings; this.edgeSettings = edgeSettings; + this.calculatedFieldSettings = calculatedFieldSettings; this.coreAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getCoreConfigs()); this.ruleEngineAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getRuleEngineConfigs()); @@ -111,6 +118,7 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory { this.housekeeperAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getHousekeeperConfigs()); this.edgeAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getEdgeConfigs()); this.edgeEventAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getEdgeEventConfigs()); + this.cfAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getCalculatedFieldConfigs()); } @Override @@ -293,6 +301,65 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory { .build(); } + @Override + public TbQueueConsumer> createToCalculatedFieldMsgConsumer() { + TbKafkaConsumerTemplate.TbKafkaConsumerTemplateBuilder> consumerBuilder = TbKafkaConsumerTemplate.builder(); + consumerBuilder.settings(kafkaSettings); + consumerBuilder.topic(topicService.buildTopicName(calculatedFieldSettings.getEventTopic())); + consumerBuilder.clientId("tb-rule-engine-calculated-field-consumer-" + serviceInfoProvider.getServiceId() + "-" + consumerCount.incrementAndGet()); + consumerBuilder.groupId(topicService.buildTopicName("tb-rule-engine-calculated-field-consumer")); + consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToCalculatedFieldMsg.parseFrom(msg.getData()), msg.getHeaders())); + consumerBuilder.admin(cfAdmin); + consumerBuilder.statsService(consumerStatsService); + return consumerBuilder.build(); + } + + @Override + public TbQueueProducer> createToCalculatedFieldMsgProducer() { + TbKafkaProducerTemplate.TbKafkaProducerTemplateBuilder> requestBuilder = TbKafkaProducerTemplate.builder(); + requestBuilder.settings(kafkaSettings); + requestBuilder.clientId("tb-rule-engine-to-calculated-field-" + serviceInfoProvider.getServiceId()); + requestBuilder.defaultTopic(topicService.buildTopicName(calculatedFieldSettings.getEventTopic())); + requestBuilder.admin(cfAdmin); + return requestBuilder.build(); + } + + @Override + public TbQueueConsumer> createToCalculatedFieldNotificationsMsgConsumer() { + TbKafkaConsumerTemplate.TbKafkaConsumerTemplateBuilder> consumerBuilder = TbKafkaConsumerTemplate.builder(); + consumerBuilder.settings(kafkaSettings); + consumerBuilder.topic(topicService.getCalculatedFieldNotificationsTopic(serviceInfoProvider.getServiceId()).getFullTopicName()); + consumerBuilder.clientId("tb-calculated-field-notifications-consumer-" + serviceInfoProvider.getServiceId()); + consumerBuilder.groupId(topicService.buildTopicName("tb-calculated-field-notifications-node-") + serviceInfoProvider.getServiceId()); + consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToCalculatedFieldNotificationMsg.parseFrom(msg.getData()), msg.getHeaders())); + consumerBuilder.admin(notificationAdmin); + consumerBuilder.statsService(consumerStatsService); + return consumerBuilder.build(); + } + + @Override + public TbQueueConsumer> createCalculatedFieldStateConsumer() { + TbKafkaConsumerTemplate.TbKafkaConsumerTemplateBuilder> consumerBuilder = TbKafkaConsumerTemplate.builder(); + consumerBuilder.settings(kafkaSettings); + consumerBuilder.topic(topicService.buildTopicName(calculatedFieldSettings.getStateTopic())); + consumerBuilder.clientId("tb-rule-engine-calculated-field-state-consumer-" + serviceInfoProvider.getServiceId() + "-" + consumerCount.incrementAndGet()); + consumerBuilder.groupId(topicService.buildTopicName("tb-rule-engine-calculated-field-state-consumer")); + consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), CalculatedFieldStateProto.parseFrom(msg.getData()), msg.getHeaders())); + consumerBuilder.admin(cfAdmin); + consumerBuilder.statsService(consumerStatsService); + return consumerBuilder.build(); + } + + @Override + public TbQueueProducer> createCalculatedFieldStateProducer() { + TbKafkaProducerTemplate.TbKafkaProducerTemplateBuilder> requestBuilder = TbKafkaProducerTemplate.builder(); + requestBuilder.settings(kafkaSettings); + requestBuilder.clientId("tb-rule-engine-to-calculated-field-state-" + serviceInfoProvider.getServiceId()); + requestBuilder.defaultTopic(topicService.buildTopicName(calculatedFieldSettings.getEventTopic())); + requestBuilder.admin(cfAdmin); + return requestBuilder.build(); + } + @PreDestroy private void destroy() { if (coreAdmin != null) { @@ -313,5 +380,8 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory { if (fwUpdatesAdmin != null) { fwUpdatesAdmin.destroy(); } + if (cfAdmin != null) { + cfAdmin.destroy(); + } } } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/TbCoreQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/TbCoreQueueFactory.java index 0b3df5bccf..673ac3a434 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/TbCoreQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/TbCoreQueueFactory.java @@ -1,12 +1,12 @@ /** * Copyright © 2016-2024 The Thingsboard Authors - *

+ * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

+ * + * http://www.apache.org/licenses/LICENSE-2.0 + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/TbRuleEngineQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/TbRuleEngineQueueFactory.java index 76dad05393..d3c1d09399 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/TbRuleEngineQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/TbRuleEngineQueueFactory.java @@ -1,12 +1,12 @@ /** * Copyright © 2016-2024 The Thingsboard Authors - *

+ * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

+ * + * http://www.apache.org/licenses/LICENSE-2.0 + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/event/EventInsertRepository.java b/dao/src/main/java/org/thingsboard/server/dao/sql/event/EventInsertRepository.java index 9307b9e9be..bdb675cbb7 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/event/EventInsertRepository.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/event/EventInsertRepository.java @@ -83,7 +83,7 @@ public class EventInsertRepository { " (id, tenant_id, ts, entity_id, service_id, e_message, e_error) " + "VALUES (?, ?, ?, ?, ?, ?, ?) ON CONFLICT DO NOTHING;"); insertStmtMap.put(EventType.DEBUG_CALCULATED_FIELD, "INSERT INTO " + EventType.DEBUG_CALCULATED_FIELD.getTable() + - " (id, tenant_id, tsб entity_id, service_id, cf_id, e_entity_id, e_entity_type, e_msg_id, e_msg_type, e_args, e_result, e_error) " + + " (id, tenant_id, ts, entity_id, service_id, cf_id, e_entity_id, e_entity_type, e_msg_id, e_msg_type, e_args, e_result, e_error) " + "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ON CONFLICT DO NOTHING;"); }