added implementations for consumer/producer methods

This commit is contained in:
IrynaMatveieva 2025-01-23 15:49:31 +02:00
parent fdada09ea2
commit 2cc0d6f513
10 changed files with 202 additions and 14 deletions

View File

@ -1,12 +1,12 @@
/**
* Copyright © 2016-2024 The Thingsboard Authors
* <p>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

View File

@ -1,3 +1,18 @@
/**
* Copyright © 2016-2024 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.service.queue;
import org.springframework.context.ApplicationListener;

View File

@ -1619,6 +1619,8 @@ queue:
edge: "${TB_QUEUE_KAFKA_EDGE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:52428800;retention.bytes:1048576000;partitions:1;min.insync.replicas:1}"
# Kafka properties for Edge event topic
edge-event: "${TB_QUEUE_KAFKA_EDGE_EVENT_TOPIC_PROPERTIES:retention.ms:2592000000;segment.bytes:52428800;retention.bytes:1048576000;partitions:1;min.insync.replicas:1}"
# Kafka properties for Calculated Field topics
calculated-field: "${TB_QUEUE_KAFKA_CF_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:52428800;retention.bytes:1048576000;partitions:1;min.insync.replicas:1}"
consumer-stats:
# Prints lag between consumer group offset and last messages offset in Kafka topics
enabled: "${TB_QUEUE_KAFKA_CONSUMER_STATS_ENABLED:true}"

View File

@ -52,6 +52,8 @@ public class TbKafkaTopicConfigs {
private String housekeeperProperties;
@Value("${queue.kafka.topic-properties.housekeeper-reprocessing:}")
private String housekeeperReprocessingProperties;
@Value("${queue.kafka.topic-properties.calculated-field:}")
private String calculatedFieldProperties;
@Getter
private Map<String, String> coreConfigs;
@ -79,6 +81,8 @@ public class TbKafkaTopicConfigs {
private Map<String, String> edgeConfigs;
@Getter
private Map<String, String> edgeEventConfigs;
@Getter
private Map<String, String> calculatedFieldConfigs;
@PostConstruct
private void init() {
@ -97,6 +101,7 @@ public class TbKafkaTopicConfigs {
housekeeperReprocessingConfigs = PropertyUtils.getProps(housekeeperReprocessingProperties);
edgeConfigs = PropertyUtils.getProps(edgeProperties);
edgeEventConfigs = PropertyUtils.getProps(edgeEventProperties);
calculatedFieldConfigs = PropertyUtils.getProps(calculatedFieldProperties);
}
}

View File

@ -25,6 +25,9 @@ import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.queue.Queue;
import org.thingsboard.server.common.msg.queue.ServiceType;
import org.thingsboard.server.gen.js.JsInvokeProtos;
import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldStateProto;
import org.thingsboard.server.gen.transport.TransportProtos.ToCalculatedFieldMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToCalculatedFieldNotificationMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToCoreNotificationMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToEdgeEventNotificationMsg;
@ -54,6 +57,7 @@ import org.thingsboard.server.queue.kafka.TbKafkaConsumerTemplate;
import org.thingsboard.server.queue.kafka.TbKafkaProducerTemplate;
import org.thingsboard.server.queue.kafka.TbKafkaSettings;
import org.thingsboard.server.queue.kafka.TbKafkaTopicConfigs;
import org.thingsboard.server.queue.settings.TbQueueCalculatedFieldSettings;
import org.thingsboard.server.queue.settings.TbQueueCoreSettings;
import org.thingsboard.server.queue.settings.TbQueueEdgeSettings;
import org.thingsboard.server.queue.settings.TbQueueRemoteJsInvokeSettings;
@ -79,6 +83,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi
private final TbQueueRemoteJsInvokeSettings jsInvokeSettings;
private final TbQueueVersionControlSettings vcSettings;
private final TbQueueEdgeSettings edgeSettings;
private final TbQueueCalculatedFieldSettings calculatedFieldSettings;
private final TbKafkaConsumerStatsService consumerStatsService;
private final TbQueueAdmin coreAdmin;
@ -94,6 +99,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi
private final TbQueueAdmin housekeeperReprocessingAdmin;
private final TbQueueAdmin edgeAdmin;
private final TbQueueAdmin edgeEventAdmin;
private final TbQueueAdmin cfAdmin;
private final AtomicLong consumerCount = new AtomicLong();
@ -106,6 +112,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi
TbQueueRemoteJsInvokeSettings jsInvokeSettings,
TbQueueVersionControlSettings vcSettings,
TbQueueEdgeSettings edgeSettings,
TbQueueCalculatedFieldSettings calculatedFieldSettings,
TbKafkaConsumerStatsService consumerStatsService,
TbKafkaTopicConfigs kafkaTopicConfigs) {
this.topicService = topicService;
@ -119,6 +126,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi
this.vcSettings = vcSettings;
this.consumerStatsService = consumerStatsService;
this.edgeSettings = edgeSettings;
this.calculatedFieldSettings = calculatedFieldSettings;
this.coreAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getCoreConfigs());
this.ruleEngineAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getRuleEngineConfigs());
@ -133,6 +141,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi
this.housekeeperReprocessingAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getHousekeeperReprocessingConfigs());
this.edgeAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getEdgeConfigs());
this.edgeEventAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getEdgeEventConfigs());
this.cfAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getCalculatedFieldConfigs());
}
@Override
@ -490,6 +499,75 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi
return requestBuilder.build();
}
@Override
public TbQueueConsumer<TbProtoQueueMsg<ToCalculatedFieldMsg>> createToCalculatedFieldMsgConsumer() {
TbKafkaConsumerTemplate.TbKafkaConsumerTemplateBuilder<TbProtoQueueMsg<ToCalculatedFieldMsg>> consumerBuilder = TbKafkaConsumerTemplate.builder();
consumerBuilder.settings(kafkaSettings);
consumerBuilder.topic(topicService.buildTopicName(calculatedFieldSettings.getEventTopic()));
consumerBuilder.clientId("monolith-calculated-field-consumer-" + serviceInfoProvider.getServiceId() + "-" + consumerCount.incrementAndGet());
consumerBuilder.groupId(topicService.buildTopicName("monolith-calculated-field-consumer"));
consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToCalculatedFieldMsg.parseFrom(msg.getData()), msg.getHeaders()));
consumerBuilder.admin(cfAdmin);
consumerBuilder.statsService(consumerStatsService);
return consumerBuilder.build();
}
@Override
public TbQueueProducer<TbProtoQueueMsg<ToCalculatedFieldMsg>> createToCalculatedFieldMsgProducer() {
TbKafkaProducerTemplate.TbKafkaProducerTemplateBuilder<TbProtoQueueMsg<ToCalculatedFieldMsg>> requestBuilder = TbKafkaProducerTemplate.builder();
requestBuilder.settings(kafkaSettings);
requestBuilder.clientId("monolith-calculated-field-" + serviceInfoProvider.getServiceId());
requestBuilder.defaultTopic(topicService.buildTopicName(calculatedFieldSettings.getEventTopic()));
requestBuilder.admin(cfAdmin);
return requestBuilder.build();
}
@Override
public TbQueueConsumer<TbProtoQueueMsg<ToCalculatedFieldNotificationMsg>> createToCalculatedFieldNotificationsMsgConsumer() {
TbKafkaConsumerTemplate.TbKafkaConsumerTemplateBuilder<TbProtoQueueMsg<ToCalculatedFieldNotificationMsg>> consumerBuilder = TbKafkaConsumerTemplate.builder();
consumerBuilder.settings(kafkaSettings);
consumerBuilder.topic(topicService.getCalculatedFieldNotificationsTopic(serviceInfoProvider.getServiceId()).getFullTopicName());
consumerBuilder.clientId("monolith-calculated-field-notifications-consumer-" + serviceInfoProvider.getServiceId());
consumerBuilder.groupId(topicService.buildTopicName("monolith-calculated-field-notifications-consumer-" + serviceInfoProvider.getServiceId()));
consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToCalculatedFieldNotificationMsg.parseFrom(msg.getData()), msg.getHeaders()));
consumerBuilder.admin(notificationAdmin);
consumerBuilder.statsService(consumerStatsService);
return consumerBuilder.build();
}
@Override
public TbQueueProducer<TbProtoQueueMsg<ToCalculatedFieldNotificationMsg>> createToCalculatedFieldNotificationMsgProducer() {
TbKafkaProducerTemplate.TbKafkaProducerTemplateBuilder<TbProtoQueueMsg<ToCalculatedFieldNotificationMsg>> requestBuilder = TbKafkaProducerTemplate.builder();
requestBuilder.settings(kafkaSettings);
requestBuilder.clientId("monolith-calculated-field-notifications-" + serviceInfoProvider.getServiceId());
requestBuilder.defaultTopic(topicService.buildTopicName(calculatedFieldSettings.getEventTopic()));
requestBuilder.admin(notificationAdmin);
return requestBuilder.build();
}
@Override
public TbQueueConsumer<TbProtoQueueMsg<CalculatedFieldStateProto>> createCalculatedFieldStateConsumer() {
TbKafkaConsumerTemplate.TbKafkaConsumerTemplateBuilder<TbProtoQueueMsg<CalculatedFieldStateProto>> consumerBuilder = TbKafkaConsumerTemplate.builder();
consumerBuilder.settings(kafkaSettings);
consumerBuilder.topic(topicService.buildTopicName(calculatedFieldSettings.getStateTopic()));
consumerBuilder.clientId("monolith-calculated-field-state-consumer-" + serviceInfoProvider.getServiceId() + "-" + consumerCount.incrementAndGet());
consumerBuilder.groupId(topicService.buildTopicName("monolith-calculated-field-state-consumer"));
consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), CalculatedFieldStateProto.parseFrom(msg.getData()), msg.getHeaders()));
consumerBuilder.admin(cfAdmin);
consumerBuilder.statsService(consumerStatsService);
return consumerBuilder.build();
}
@Override
public TbQueueProducer<TbProtoQueueMsg<CalculatedFieldStateProto>> createCalculatedFieldStateProducer() {
TbKafkaProducerTemplate.TbKafkaProducerTemplateBuilder<TbProtoQueueMsg<CalculatedFieldStateProto>> requestBuilder = TbKafkaProducerTemplate.builder();
requestBuilder.settings(kafkaSettings);
requestBuilder.clientId("monolith-calculated-field-state-" + serviceInfoProvider.getServiceId());
requestBuilder.defaultTopic(topicService.buildTopicName(calculatedFieldSettings.getStateTopic()));
requestBuilder.admin(cfAdmin);
return requestBuilder.build();
}
@PreDestroy
private void destroy() {
if (coreAdmin != null) {
@ -522,5 +600,8 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi
if (edgeAdmin != null) {
edgeAdmin.destroy();
}
if (cfAdmin != null) {
cfAdmin.destroy();
}
}
}

View File

@ -24,6 +24,7 @@ import org.thingsboard.server.common.data.id.EdgeId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.msg.queue.ServiceType;
import org.thingsboard.server.gen.js.JsInvokeProtos;
import org.thingsboard.server.gen.transport.TransportProtos.ToCalculatedFieldNotificationMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToCoreNotificationMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToEdgeEventNotificationMsg;
@ -53,6 +54,7 @@ import org.thingsboard.server.queue.kafka.TbKafkaConsumerTemplate;
import org.thingsboard.server.queue.kafka.TbKafkaProducerTemplate;
import org.thingsboard.server.queue.kafka.TbKafkaSettings;
import org.thingsboard.server.queue.kafka.TbKafkaTopicConfigs;
import org.thingsboard.server.queue.settings.TbQueueCalculatedFieldSettings;
import org.thingsboard.server.queue.settings.TbQueueCoreSettings;
import org.thingsboard.server.queue.settings.TbQueueEdgeSettings;
import org.thingsboard.server.queue.settings.TbQueueRemoteJsInvokeSettings;
@ -79,6 +81,7 @@ public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory {
private final TbKafkaConsumerStatsService consumerStatsService;
private final TbQueueTransportNotificationSettings transportNotificationSettings;
private final TbQueueEdgeSettings edgeSettings;
private final TbQueueCalculatedFieldSettings calculatedFieldSettings;
private final TbQueueAdmin coreAdmin;
private final TbQueueAdmin ruleEngineAdmin;
@ -107,6 +110,7 @@ public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory {
TbQueueEdgeSettings edgeSettings,
TbKafkaConsumerStatsService consumerStatsService,
TbQueueTransportNotificationSettings transportNotificationSettings,
TbQueueCalculatedFieldSettings calculatedFieldSettings,
TbKafkaTopicConfigs kafkaTopicConfigs) {
this.topicService = topicService;
this.kafkaSettings = kafkaSettings;
@ -119,6 +123,7 @@ public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory {
this.consumerStatsService = consumerStatsService;
this.transportNotificationSettings = transportNotificationSettings;
this.edgeSettings = edgeSettings;
this.calculatedFieldSettings = calculatedFieldSettings;
this.coreAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getCoreConfigs());
this.ruleEngineAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getRuleEngineConfigs());
@ -439,6 +444,16 @@ public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory {
return requestBuilder.build();
}
@Override
public TbQueueProducer<TbProtoQueueMsg<ToCalculatedFieldNotificationMsg>> createToCalculatedFieldNotificationMsgProducer() {
TbKafkaProducerTemplate.TbKafkaProducerTemplateBuilder<TbProtoQueueMsg<ToCalculatedFieldNotificationMsg>> requestBuilder = TbKafkaProducerTemplate.builder();
requestBuilder.settings(kafkaSettings);
requestBuilder.clientId("tb-core-calculated-field-notifications-" + serviceInfoProvider.getServiceId());
requestBuilder.defaultTopic(topicService.buildTopicName(calculatedFieldSettings.getEventTopic()));
requestBuilder.admin(notificationAdmin);
return requestBuilder.build();
}
@PreDestroy
private void destroy() {
if (coreAdmin != null) {

View File

@ -23,6 +23,9 @@ import org.springframework.stereotype.Component;
import org.thingsboard.server.common.data.queue.Queue;
import org.thingsboard.server.common.msg.queue.ServiceType;
import org.thingsboard.server.gen.js.JsInvokeProtos;
import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldStateProto;
import org.thingsboard.server.gen.transport.TransportProtos.ToCalculatedFieldMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToCalculatedFieldNotificationMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToCoreNotificationMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToEdgeEventNotificationMsg;
@ -49,6 +52,7 @@ import org.thingsboard.server.queue.kafka.TbKafkaConsumerTemplate;
import org.thingsboard.server.queue.kafka.TbKafkaProducerTemplate;
import org.thingsboard.server.queue.kafka.TbKafkaSettings;
import org.thingsboard.server.queue.kafka.TbKafkaTopicConfigs;
import org.thingsboard.server.queue.settings.TbQueueCalculatedFieldSettings;
import org.thingsboard.server.queue.settings.TbQueueCoreSettings;
import org.thingsboard.server.queue.settings.TbQueueEdgeSettings;
import org.thingsboard.server.queue.settings.TbQueueRemoteJsInvokeSettings;
@ -71,6 +75,7 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory {
private final TbKafkaConsumerStatsService consumerStatsService;
private final TbQueueTransportNotificationSettings transportNotificationSettings;
private final TbQueueEdgeSettings edgeSettings;
private final TbQueueCalculatedFieldSettings calculatedFieldSettings;
private final TbQueueAdmin coreAdmin;
private final TbKafkaAdmin ruleEngineAdmin;
@ -81,6 +86,7 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory {
private final TbQueueAdmin housekeeperAdmin;
private final TbQueueAdmin edgeAdmin;
private final TbQueueAdmin edgeEventAdmin;
private final TbQueueAdmin cfAdmin;
private final AtomicLong consumerCount = new AtomicLong();
public KafkaTbRuleEngineQueueFactory(TopicService topicService, TbKafkaSettings kafkaSettings,
@ -90,7 +96,7 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory {
TbQueueRemoteJsInvokeSettings jsInvokeSettings,
TbKafkaConsumerStatsService consumerStatsService,
TbQueueTransportNotificationSettings transportNotificationSettings,
TbQueueEdgeSettings edgeSettings,
TbQueueEdgeSettings edgeSettings, TbQueueCalculatedFieldSettings calculatedFieldSettings,
TbKafkaTopicConfigs kafkaTopicConfigs) {
this.topicService = topicService;
this.kafkaSettings = kafkaSettings;
@ -101,6 +107,7 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory {
this.consumerStatsService = consumerStatsService;
this.transportNotificationSettings = transportNotificationSettings;
this.edgeSettings = edgeSettings;
this.calculatedFieldSettings = calculatedFieldSettings;
this.coreAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getCoreConfigs());
this.ruleEngineAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getRuleEngineConfigs());
@ -111,6 +118,7 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory {
this.housekeeperAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getHousekeeperConfigs());
this.edgeAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getEdgeConfigs());
this.edgeEventAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getEdgeEventConfigs());
this.cfAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getCalculatedFieldConfigs());
}
@Override
@ -293,6 +301,65 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory {
.build();
}
@Override
public TbQueueConsumer<TbProtoQueueMsg<ToCalculatedFieldMsg>> createToCalculatedFieldMsgConsumer() {
TbKafkaConsumerTemplate.TbKafkaConsumerTemplateBuilder<TbProtoQueueMsg<ToCalculatedFieldMsg>> consumerBuilder = TbKafkaConsumerTemplate.builder();
consumerBuilder.settings(kafkaSettings);
consumerBuilder.topic(topicService.buildTopicName(calculatedFieldSettings.getEventTopic()));
consumerBuilder.clientId("tb-rule-engine-calculated-field-consumer-" + serviceInfoProvider.getServiceId() + "-" + consumerCount.incrementAndGet());
consumerBuilder.groupId(topicService.buildTopicName("tb-rule-engine-calculated-field-consumer"));
consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToCalculatedFieldMsg.parseFrom(msg.getData()), msg.getHeaders()));
consumerBuilder.admin(cfAdmin);
consumerBuilder.statsService(consumerStatsService);
return consumerBuilder.build();
}
@Override
public TbQueueProducer<TbProtoQueueMsg<ToCalculatedFieldMsg>> createToCalculatedFieldMsgProducer() {
TbKafkaProducerTemplate.TbKafkaProducerTemplateBuilder<TbProtoQueueMsg<ToCalculatedFieldMsg>> requestBuilder = TbKafkaProducerTemplate.builder();
requestBuilder.settings(kafkaSettings);
requestBuilder.clientId("tb-rule-engine-to-calculated-field-" + serviceInfoProvider.getServiceId());
requestBuilder.defaultTopic(topicService.buildTopicName(calculatedFieldSettings.getEventTopic()));
requestBuilder.admin(cfAdmin);
return requestBuilder.build();
}
@Override
public TbQueueConsumer<TbProtoQueueMsg<ToCalculatedFieldNotificationMsg>> createToCalculatedFieldNotificationsMsgConsumer() {
TbKafkaConsumerTemplate.TbKafkaConsumerTemplateBuilder<TbProtoQueueMsg<ToCalculatedFieldNotificationMsg>> consumerBuilder = TbKafkaConsumerTemplate.builder();
consumerBuilder.settings(kafkaSettings);
consumerBuilder.topic(topicService.getCalculatedFieldNotificationsTopic(serviceInfoProvider.getServiceId()).getFullTopicName());
consumerBuilder.clientId("tb-calculated-field-notifications-consumer-" + serviceInfoProvider.getServiceId());
consumerBuilder.groupId(topicService.buildTopicName("tb-calculated-field-notifications-node-") + serviceInfoProvider.getServiceId());
consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToCalculatedFieldNotificationMsg.parseFrom(msg.getData()), msg.getHeaders()));
consumerBuilder.admin(notificationAdmin);
consumerBuilder.statsService(consumerStatsService);
return consumerBuilder.build();
}
@Override
public TbQueueConsumer<TbProtoQueueMsg<CalculatedFieldStateProto>> createCalculatedFieldStateConsumer() {
TbKafkaConsumerTemplate.TbKafkaConsumerTemplateBuilder<TbProtoQueueMsg<CalculatedFieldStateProto>> consumerBuilder = TbKafkaConsumerTemplate.builder();
consumerBuilder.settings(kafkaSettings);
consumerBuilder.topic(topicService.buildTopicName(calculatedFieldSettings.getStateTopic()));
consumerBuilder.clientId("tb-rule-engine-calculated-field-state-consumer-" + serviceInfoProvider.getServiceId() + "-" + consumerCount.incrementAndGet());
consumerBuilder.groupId(topicService.buildTopicName("tb-rule-engine-calculated-field-state-consumer"));
consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), CalculatedFieldStateProto.parseFrom(msg.getData()), msg.getHeaders()));
consumerBuilder.admin(cfAdmin);
consumerBuilder.statsService(consumerStatsService);
return consumerBuilder.build();
}
@Override
public TbQueueProducer<TbProtoQueueMsg<CalculatedFieldStateProto>> createCalculatedFieldStateProducer() {
TbKafkaProducerTemplate.TbKafkaProducerTemplateBuilder<TbProtoQueueMsg<CalculatedFieldStateProto>> requestBuilder = TbKafkaProducerTemplate.builder();
requestBuilder.settings(kafkaSettings);
requestBuilder.clientId("tb-rule-engine-to-calculated-field-state-" + serviceInfoProvider.getServiceId());
requestBuilder.defaultTopic(topicService.buildTopicName(calculatedFieldSettings.getEventTopic()));
requestBuilder.admin(cfAdmin);
return requestBuilder.build();
}
@PreDestroy
private void destroy() {
if (coreAdmin != null) {
@ -313,5 +380,8 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory {
if (fwUpdatesAdmin != null) {
fwUpdatesAdmin.destroy();
}
if (cfAdmin != null) {
cfAdmin.destroy();
}
}
}

View File

@ -1,12 +1,12 @@
/**
* Copyright © 2016-2024 The Thingsboard Authors
* <p>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

View File

@ -1,12 +1,12 @@
/**
* Copyright © 2016-2024 The Thingsboard Authors
* <p>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

View File

@ -83,7 +83,7 @@ public class EventInsertRepository {
" (id, tenant_id, ts, entity_id, service_id, e_message, e_error) " +
"VALUES (?, ?, ?, ?, ?, ?, ?) ON CONFLICT DO NOTHING;");
insertStmtMap.put(EventType.DEBUG_CALCULATED_FIELD, "INSERT INTO " + EventType.DEBUG_CALCULATED_FIELD.getTable() +
" (id, tenant_id, tsб entity_id, service_id, cf_id, e_entity_id, e_entity_type, e_msg_id, e_msg_type, e_args, e_result, e_error) " +
" (id, tenant_id, ts, entity_id, service_id, cf_id, e_entity_id, e_entity_type, e_msg_id, e_msg_type, e_args, e_result, e_error) " +
"VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ON CONFLICT DO NOTHING;");
}