diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCalculatedFieldConsumerService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCalculatedFieldConsumerService.java
index 85ee01ac5f..ff4634f957 100644
--- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCalculatedFieldConsumerService.java
+++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCalculatedFieldConsumerService.java
@@ -1,12 +1,12 @@
/**
* Copyright © 2016-2024 The Thingsboard Authors
- *
+ *
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
diff --git a/application/src/main/java/org/thingsboard/server/service/queue/TbCalculatedFieldConsumerService.java b/application/src/main/java/org/thingsboard/server/service/queue/TbCalculatedFieldConsumerService.java
index 387bdd7143..bba4b1e35b 100644
--- a/application/src/main/java/org/thingsboard/server/service/queue/TbCalculatedFieldConsumerService.java
+++ b/application/src/main/java/org/thingsboard/server/service/queue/TbCalculatedFieldConsumerService.java
@@ -1,3 +1,18 @@
+/**
+ * Copyright © 2016-2024 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.thingsboard.server.service.queue;
import org.springframework.context.ApplicationListener;
diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml
index 94c1bef71c..fdd9923292 100644
--- a/application/src/main/resources/thingsboard.yml
+++ b/application/src/main/resources/thingsboard.yml
@@ -1619,6 +1619,8 @@ queue:
edge: "${TB_QUEUE_KAFKA_EDGE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:52428800;retention.bytes:1048576000;partitions:1;min.insync.replicas:1}"
# Kafka properties for Edge event topic
edge-event: "${TB_QUEUE_KAFKA_EDGE_EVENT_TOPIC_PROPERTIES:retention.ms:2592000000;segment.bytes:52428800;retention.bytes:1048576000;partitions:1;min.insync.replicas:1}"
+ # Kafka properties for Calculated Field topics
+ calculated-field: "${TB_QUEUE_KAFKA_CF_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:52428800;retention.bytes:1048576000;partitions:1;min.insync.replicas:1}"
consumer-stats:
# Prints lag between consumer group offset and last messages offset in Kafka topics
enabled: "${TB_QUEUE_KAFKA_CONSUMER_STATS_ENABLED:true}"
diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaTopicConfigs.java b/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaTopicConfigs.java
index ee529e8a68..cdd0add38b 100644
--- a/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaTopicConfigs.java
+++ b/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaTopicConfigs.java
@@ -52,6 +52,8 @@ public class TbKafkaTopicConfigs {
private String housekeeperProperties;
@Value("${queue.kafka.topic-properties.housekeeper-reprocessing:}")
private String housekeeperReprocessingProperties;
+ @Value("${queue.kafka.topic-properties.calculated-field:}")
+ private String calculatedFieldProperties;
@Getter
private Map coreConfigs;
@@ -79,6 +81,8 @@ public class TbKafkaTopicConfigs {
private Map edgeConfigs;
@Getter
private Map edgeEventConfigs;
+ @Getter
+ private Map calculatedFieldConfigs;
@PostConstruct
private void init() {
@@ -97,6 +101,7 @@ public class TbKafkaTopicConfigs {
housekeeperReprocessingConfigs = PropertyUtils.getProps(housekeeperReprocessingProperties);
edgeConfigs = PropertyUtils.getProps(edgeProperties);
edgeEventConfigs = PropertyUtils.getProps(edgeEventProperties);
+ calculatedFieldConfigs = PropertyUtils.getProps(calculatedFieldProperties);
}
}
diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java
index dd5d61e834..01e174023c 100644
--- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java
+++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java
@@ -25,6 +25,9 @@ import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.queue.Queue;
import org.thingsboard.server.common.msg.queue.ServiceType;
import org.thingsboard.server.gen.js.JsInvokeProtos;
+import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldStateProto;
+import org.thingsboard.server.gen.transport.TransportProtos.ToCalculatedFieldMsg;
+import org.thingsboard.server.gen.transport.TransportProtos.ToCalculatedFieldNotificationMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToCoreNotificationMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToEdgeEventNotificationMsg;
@@ -54,6 +57,7 @@ import org.thingsboard.server.queue.kafka.TbKafkaConsumerTemplate;
import org.thingsboard.server.queue.kafka.TbKafkaProducerTemplate;
import org.thingsboard.server.queue.kafka.TbKafkaSettings;
import org.thingsboard.server.queue.kafka.TbKafkaTopicConfigs;
+import org.thingsboard.server.queue.settings.TbQueueCalculatedFieldSettings;
import org.thingsboard.server.queue.settings.TbQueueCoreSettings;
import org.thingsboard.server.queue.settings.TbQueueEdgeSettings;
import org.thingsboard.server.queue.settings.TbQueueRemoteJsInvokeSettings;
@@ -79,6 +83,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi
private final TbQueueRemoteJsInvokeSettings jsInvokeSettings;
private final TbQueueVersionControlSettings vcSettings;
private final TbQueueEdgeSettings edgeSettings;
+ private final TbQueueCalculatedFieldSettings calculatedFieldSettings;
private final TbKafkaConsumerStatsService consumerStatsService;
private final TbQueueAdmin coreAdmin;
@@ -94,6 +99,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi
private final TbQueueAdmin housekeeperReprocessingAdmin;
private final TbQueueAdmin edgeAdmin;
private final TbQueueAdmin edgeEventAdmin;
+ private final TbQueueAdmin cfAdmin;
private final AtomicLong consumerCount = new AtomicLong();
@@ -106,6 +112,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi
TbQueueRemoteJsInvokeSettings jsInvokeSettings,
TbQueueVersionControlSettings vcSettings,
TbQueueEdgeSettings edgeSettings,
+ TbQueueCalculatedFieldSettings calculatedFieldSettings,
TbKafkaConsumerStatsService consumerStatsService,
TbKafkaTopicConfigs kafkaTopicConfigs) {
this.topicService = topicService;
@@ -119,6 +126,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi
this.vcSettings = vcSettings;
this.consumerStatsService = consumerStatsService;
this.edgeSettings = edgeSettings;
+ this.calculatedFieldSettings = calculatedFieldSettings;
this.coreAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getCoreConfigs());
this.ruleEngineAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getRuleEngineConfigs());
@@ -133,6 +141,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi
this.housekeeperReprocessingAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getHousekeeperReprocessingConfigs());
this.edgeAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getEdgeConfigs());
this.edgeEventAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getEdgeEventConfigs());
+ this.cfAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getCalculatedFieldConfigs());
}
@Override
@@ -490,6 +499,75 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi
return requestBuilder.build();
}
+ @Override
+ public TbQueueConsumer> createToCalculatedFieldMsgConsumer() {
+ TbKafkaConsumerTemplate.TbKafkaConsumerTemplateBuilder> consumerBuilder = TbKafkaConsumerTemplate.builder();
+ consumerBuilder.settings(kafkaSettings);
+ consumerBuilder.topic(topicService.buildTopicName(calculatedFieldSettings.getEventTopic()));
+ consumerBuilder.clientId("monolith-calculated-field-consumer-" + serviceInfoProvider.getServiceId() + "-" + consumerCount.incrementAndGet());
+ consumerBuilder.groupId(topicService.buildTopicName("monolith-calculated-field-consumer"));
+ consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToCalculatedFieldMsg.parseFrom(msg.getData()), msg.getHeaders()));
+ consumerBuilder.admin(cfAdmin);
+ consumerBuilder.statsService(consumerStatsService);
+ return consumerBuilder.build();
+ }
+
+ @Override
+ public TbQueueProducer> createToCalculatedFieldMsgProducer() {
+ TbKafkaProducerTemplate.TbKafkaProducerTemplateBuilder> requestBuilder = TbKafkaProducerTemplate.builder();
+ requestBuilder.settings(kafkaSettings);
+ requestBuilder.clientId("monolith-calculated-field-" + serviceInfoProvider.getServiceId());
+ requestBuilder.defaultTopic(topicService.buildTopicName(calculatedFieldSettings.getEventTopic()));
+ requestBuilder.admin(cfAdmin);
+ return requestBuilder.build();
+ }
+
+ @Override
+ public TbQueueConsumer> createToCalculatedFieldNotificationsMsgConsumer() {
+ TbKafkaConsumerTemplate.TbKafkaConsumerTemplateBuilder> consumerBuilder = TbKafkaConsumerTemplate.builder();
+ consumerBuilder.settings(kafkaSettings);
+ consumerBuilder.topic(topicService.getCalculatedFieldNotificationsTopic(serviceInfoProvider.getServiceId()).getFullTopicName());
+ consumerBuilder.clientId("monolith-calculated-field-notifications-consumer-" + serviceInfoProvider.getServiceId());
+ consumerBuilder.groupId(topicService.buildTopicName("monolith-calculated-field-notifications-consumer-" + serviceInfoProvider.getServiceId()));
+ consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToCalculatedFieldNotificationMsg.parseFrom(msg.getData()), msg.getHeaders()));
+ consumerBuilder.admin(notificationAdmin);
+ consumerBuilder.statsService(consumerStatsService);
+ return consumerBuilder.build();
+ }
+
+ @Override
+ public TbQueueProducer> createToCalculatedFieldNotificationMsgProducer() {
+ TbKafkaProducerTemplate.TbKafkaProducerTemplateBuilder> requestBuilder = TbKafkaProducerTemplate.builder();
+ requestBuilder.settings(kafkaSettings);
+ requestBuilder.clientId("monolith-calculated-field-notifications-" + serviceInfoProvider.getServiceId());
+ requestBuilder.defaultTopic(topicService.buildTopicName(calculatedFieldSettings.getEventTopic()));
+ requestBuilder.admin(notificationAdmin);
+ return requestBuilder.build();
+ }
+
+ @Override
+ public TbQueueConsumer> createCalculatedFieldStateConsumer() {
+ TbKafkaConsumerTemplate.TbKafkaConsumerTemplateBuilder> consumerBuilder = TbKafkaConsumerTemplate.builder();
+ consumerBuilder.settings(kafkaSettings);
+ consumerBuilder.topic(topicService.buildTopicName(calculatedFieldSettings.getStateTopic()));
+ consumerBuilder.clientId("monolith-calculated-field-state-consumer-" + serviceInfoProvider.getServiceId() + "-" + consumerCount.incrementAndGet());
+ consumerBuilder.groupId(topicService.buildTopicName("monolith-calculated-field-state-consumer"));
+ consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), CalculatedFieldStateProto.parseFrom(msg.getData()), msg.getHeaders()));
+ consumerBuilder.admin(cfAdmin);
+ consumerBuilder.statsService(consumerStatsService);
+ return consumerBuilder.build();
+ }
+
+ @Override
+ public TbQueueProducer> createCalculatedFieldStateProducer() {
+ TbKafkaProducerTemplate.TbKafkaProducerTemplateBuilder> requestBuilder = TbKafkaProducerTemplate.builder();
+ requestBuilder.settings(kafkaSettings);
+ requestBuilder.clientId("monolith-calculated-field-state-" + serviceInfoProvider.getServiceId());
+ requestBuilder.defaultTopic(topicService.buildTopicName(calculatedFieldSettings.getStateTopic()));
+ requestBuilder.admin(cfAdmin);
+ return requestBuilder.build();
+ }
+
@PreDestroy
private void destroy() {
if (coreAdmin != null) {
@@ -522,5 +600,8 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi
if (edgeAdmin != null) {
edgeAdmin.destroy();
}
+ if (cfAdmin != null) {
+ cfAdmin.destroy();
+ }
}
}
diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbCoreQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbCoreQueueFactory.java
index cc0e044917..aceccf7e58 100644
--- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbCoreQueueFactory.java
+++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbCoreQueueFactory.java
@@ -24,6 +24,7 @@ import org.thingsboard.server.common.data.id.EdgeId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.msg.queue.ServiceType;
import org.thingsboard.server.gen.js.JsInvokeProtos;
+import org.thingsboard.server.gen.transport.TransportProtos.ToCalculatedFieldNotificationMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToCoreNotificationMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToEdgeEventNotificationMsg;
@@ -53,6 +54,7 @@ import org.thingsboard.server.queue.kafka.TbKafkaConsumerTemplate;
import org.thingsboard.server.queue.kafka.TbKafkaProducerTemplate;
import org.thingsboard.server.queue.kafka.TbKafkaSettings;
import org.thingsboard.server.queue.kafka.TbKafkaTopicConfigs;
+import org.thingsboard.server.queue.settings.TbQueueCalculatedFieldSettings;
import org.thingsboard.server.queue.settings.TbQueueCoreSettings;
import org.thingsboard.server.queue.settings.TbQueueEdgeSettings;
import org.thingsboard.server.queue.settings.TbQueueRemoteJsInvokeSettings;
@@ -79,6 +81,7 @@ public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory {
private final TbKafkaConsumerStatsService consumerStatsService;
private final TbQueueTransportNotificationSettings transportNotificationSettings;
private final TbQueueEdgeSettings edgeSettings;
+ private final TbQueueCalculatedFieldSettings calculatedFieldSettings;
private final TbQueueAdmin coreAdmin;
private final TbQueueAdmin ruleEngineAdmin;
@@ -107,6 +110,7 @@ public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory {
TbQueueEdgeSettings edgeSettings,
TbKafkaConsumerStatsService consumerStatsService,
TbQueueTransportNotificationSettings transportNotificationSettings,
+ TbQueueCalculatedFieldSettings calculatedFieldSettings,
TbKafkaTopicConfigs kafkaTopicConfigs) {
this.topicService = topicService;
this.kafkaSettings = kafkaSettings;
@@ -119,6 +123,7 @@ public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory {
this.consumerStatsService = consumerStatsService;
this.transportNotificationSettings = transportNotificationSettings;
this.edgeSettings = edgeSettings;
+ this.calculatedFieldSettings = calculatedFieldSettings;
this.coreAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getCoreConfigs());
this.ruleEngineAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getRuleEngineConfigs());
@@ -439,6 +444,16 @@ public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory {
return requestBuilder.build();
}
+ @Override
+ public TbQueueProducer> createToCalculatedFieldNotificationMsgProducer() {
+ TbKafkaProducerTemplate.TbKafkaProducerTemplateBuilder> requestBuilder = TbKafkaProducerTemplate.builder();
+ requestBuilder.settings(kafkaSettings);
+ requestBuilder.clientId("tb-core-calculated-field-notifications-" + serviceInfoProvider.getServiceId());
+ requestBuilder.defaultTopic(topicService.buildTopicName(calculatedFieldSettings.getEventTopic()));
+ requestBuilder.admin(notificationAdmin);
+ return requestBuilder.build();
+ }
+
@PreDestroy
private void destroy() {
if (coreAdmin != null) {
diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbRuleEngineQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbRuleEngineQueueFactory.java
index 87a1a69c2e..46b35f9acd 100644
--- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbRuleEngineQueueFactory.java
+++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbRuleEngineQueueFactory.java
@@ -23,6 +23,9 @@ import org.springframework.stereotype.Component;
import org.thingsboard.server.common.data.queue.Queue;
import org.thingsboard.server.common.msg.queue.ServiceType;
import org.thingsboard.server.gen.js.JsInvokeProtos;
+import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldStateProto;
+import org.thingsboard.server.gen.transport.TransportProtos.ToCalculatedFieldMsg;
+import org.thingsboard.server.gen.transport.TransportProtos.ToCalculatedFieldNotificationMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToCoreNotificationMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToEdgeEventNotificationMsg;
@@ -49,6 +52,7 @@ import org.thingsboard.server.queue.kafka.TbKafkaConsumerTemplate;
import org.thingsboard.server.queue.kafka.TbKafkaProducerTemplate;
import org.thingsboard.server.queue.kafka.TbKafkaSettings;
import org.thingsboard.server.queue.kafka.TbKafkaTopicConfigs;
+import org.thingsboard.server.queue.settings.TbQueueCalculatedFieldSettings;
import org.thingsboard.server.queue.settings.TbQueueCoreSettings;
import org.thingsboard.server.queue.settings.TbQueueEdgeSettings;
import org.thingsboard.server.queue.settings.TbQueueRemoteJsInvokeSettings;
@@ -71,6 +75,7 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory {
private final TbKafkaConsumerStatsService consumerStatsService;
private final TbQueueTransportNotificationSettings transportNotificationSettings;
private final TbQueueEdgeSettings edgeSettings;
+ private final TbQueueCalculatedFieldSettings calculatedFieldSettings;
private final TbQueueAdmin coreAdmin;
private final TbKafkaAdmin ruleEngineAdmin;
@@ -81,6 +86,7 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory {
private final TbQueueAdmin housekeeperAdmin;
private final TbQueueAdmin edgeAdmin;
private final TbQueueAdmin edgeEventAdmin;
+ private final TbQueueAdmin cfAdmin;
private final AtomicLong consumerCount = new AtomicLong();
public KafkaTbRuleEngineQueueFactory(TopicService topicService, TbKafkaSettings kafkaSettings,
@@ -90,7 +96,7 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory {
TbQueueRemoteJsInvokeSettings jsInvokeSettings,
TbKafkaConsumerStatsService consumerStatsService,
TbQueueTransportNotificationSettings transportNotificationSettings,
- TbQueueEdgeSettings edgeSettings,
+ TbQueueEdgeSettings edgeSettings, TbQueueCalculatedFieldSettings calculatedFieldSettings,
TbKafkaTopicConfigs kafkaTopicConfigs) {
this.topicService = topicService;
this.kafkaSettings = kafkaSettings;
@@ -101,6 +107,7 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory {
this.consumerStatsService = consumerStatsService;
this.transportNotificationSettings = transportNotificationSettings;
this.edgeSettings = edgeSettings;
+ this.calculatedFieldSettings = calculatedFieldSettings;
this.coreAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getCoreConfigs());
this.ruleEngineAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getRuleEngineConfigs());
@@ -111,6 +118,7 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory {
this.housekeeperAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getHousekeeperConfigs());
this.edgeAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getEdgeConfigs());
this.edgeEventAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getEdgeEventConfigs());
+ this.cfAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getCalculatedFieldConfigs());
}
@Override
@@ -293,6 +301,65 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory {
.build();
}
+ @Override
+ public TbQueueConsumer> createToCalculatedFieldMsgConsumer() {
+ TbKafkaConsumerTemplate.TbKafkaConsumerTemplateBuilder> consumerBuilder = TbKafkaConsumerTemplate.builder();
+ consumerBuilder.settings(kafkaSettings);
+ consumerBuilder.topic(topicService.buildTopicName(calculatedFieldSettings.getEventTopic()));
+ consumerBuilder.clientId("tb-rule-engine-calculated-field-consumer-" + serviceInfoProvider.getServiceId() + "-" + consumerCount.incrementAndGet());
+ consumerBuilder.groupId(topicService.buildTopicName("tb-rule-engine-calculated-field-consumer"));
+ consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToCalculatedFieldMsg.parseFrom(msg.getData()), msg.getHeaders()));
+ consumerBuilder.admin(cfAdmin);
+ consumerBuilder.statsService(consumerStatsService);
+ return consumerBuilder.build();
+ }
+
+ @Override
+ public TbQueueProducer> createToCalculatedFieldMsgProducer() {
+ TbKafkaProducerTemplate.TbKafkaProducerTemplateBuilder> requestBuilder = TbKafkaProducerTemplate.builder();
+ requestBuilder.settings(kafkaSettings);
+ requestBuilder.clientId("tb-rule-engine-to-calculated-field-" + serviceInfoProvider.getServiceId());
+ requestBuilder.defaultTopic(topicService.buildTopicName(calculatedFieldSettings.getEventTopic()));
+ requestBuilder.admin(cfAdmin);
+ return requestBuilder.build();
+ }
+
+ @Override
+ public TbQueueConsumer> createToCalculatedFieldNotificationsMsgConsumer() {
+ TbKafkaConsumerTemplate.TbKafkaConsumerTemplateBuilder> consumerBuilder = TbKafkaConsumerTemplate.builder();
+ consumerBuilder.settings(kafkaSettings);
+ consumerBuilder.topic(topicService.getCalculatedFieldNotificationsTopic(serviceInfoProvider.getServiceId()).getFullTopicName());
+ consumerBuilder.clientId("tb-calculated-field-notifications-consumer-" + serviceInfoProvider.getServiceId());
+ consumerBuilder.groupId(topicService.buildTopicName("tb-calculated-field-notifications-node-") + serviceInfoProvider.getServiceId());
+ consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToCalculatedFieldNotificationMsg.parseFrom(msg.getData()), msg.getHeaders()));
+ consumerBuilder.admin(notificationAdmin);
+ consumerBuilder.statsService(consumerStatsService);
+ return consumerBuilder.build();
+ }
+
+ @Override
+ public TbQueueConsumer> createCalculatedFieldStateConsumer() {
+ TbKafkaConsumerTemplate.TbKafkaConsumerTemplateBuilder> consumerBuilder = TbKafkaConsumerTemplate.builder();
+ consumerBuilder.settings(kafkaSettings);
+ consumerBuilder.topic(topicService.buildTopicName(calculatedFieldSettings.getStateTopic()));
+ consumerBuilder.clientId("tb-rule-engine-calculated-field-state-consumer-" + serviceInfoProvider.getServiceId() + "-" + consumerCount.incrementAndGet());
+ consumerBuilder.groupId(topicService.buildTopicName("tb-rule-engine-calculated-field-state-consumer"));
+ consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), CalculatedFieldStateProto.parseFrom(msg.getData()), msg.getHeaders()));
+ consumerBuilder.admin(cfAdmin);
+ consumerBuilder.statsService(consumerStatsService);
+ return consumerBuilder.build();
+ }
+
+ @Override
+ public TbQueueProducer> createCalculatedFieldStateProducer() {
+ TbKafkaProducerTemplate.TbKafkaProducerTemplateBuilder> requestBuilder = TbKafkaProducerTemplate.builder();
+ requestBuilder.settings(kafkaSettings);
+ requestBuilder.clientId("tb-rule-engine-to-calculated-field-state-" + serviceInfoProvider.getServiceId());
+ requestBuilder.defaultTopic(topicService.buildTopicName(calculatedFieldSettings.getEventTopic()));
+ requestBuilder.admin(cfAdmin);
+ return requestBuilder.build();
+ }
+
@PreDestroy
private void destroy() {
if (coreAdmin != null) {
@@ -313,5 +380,8 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory {
if (fwUpdatesAdmin != null) {
fwUpdatesAdmin.destroy();
}
+ if (cfAdmin != null) {
+ cfAdmin.destroy();
+ }
}
}
diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/TbCoreQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/TbCoreQueueFactory.java
index 0b3df5bccf..673ac3a434 100644
--- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/TbCoreQueueFactory.java
+++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/TbCoreQueueFactory.java
@@ -1,12 +1,12 @@
/**
* Copyright © 2016-2024 The Thingsboard Authors
- *
+ *
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/TbRuleEngineQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/TbRuleEngineQueueFactory.java
index 76dad05393..d3c1d09399 100644
--- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/TbRuleEngineQueueFactory.java
+++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/TbRuleEngineQueueFactory.java
@@ -1,12 +1,12 @@
/**
* Copyright © 2016-2024 The Thingsboard Authors
- *
+ *
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/event/EventInsertRepository.java b/dao/src/main/java/org/thingsboard/server/dao/sql/event/EventInsertRepository.java
index 9307b9e9be..bdb675cbb7 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/sql/event/EventInsertRepository.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/sql/event/EventInsertRepository.java
@@ -83,7 +83,7 @@ public class EventInsertRepository {
" (id, tenant_id, ts, entity_id, service_id, e_message, e_error) " +
"VALUES (?, ?, ?, ?, ?, ?, ?) ON CONFLICT DO NOTHING;");
insertStmtMap.put(EventType.DEBUG_CALCULATED_FIELD, "INSERT INTO " + EventType.DEBUG_CALCULATED_FIELD.getTable() +
- " (id, tenant_id, tsб entity_id, service_id, cf_id, e_entity_id, e_entity_type, e_msg_id, e_msg_type, e_args, e_result, e_error) " +
+ " (id, tenant_id, ts, entity_id, service_id, cf_id, e_entity_id, e_entity_type, e_msg_id, e_msg_type, e_args, e_result, e_error) " +
"VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ON CONFLICT DO NOTHING;");
}