added impl for cf notification producer

This commit is contained in:
IrynaMatveieva 2025-03-19 15:07:48 +02:00
parent 5a5d1e2710
commit f88be5b863
6 changed files with 18 additions and 5 deletions

View File

@ -214,7 +214,7 @@ public class DefaultTbCalculatedFieldConsumerService extends AbstractConsumerSer
@Override
protected TbQueueConsumer<TbProtoQueueMsg<ToCalculatedFieldNotificationMsg>> createNotificationsConsumer() {
return queueFactory.createToCalculatedFieldNotificationsMsgConsumer();
return queueFactory.createToCalculatedFieldNotificationMsgConsumer();
}
@Override

View File

@ -144,7 +144,7 @@ public class InMemoryMonolithQueueFactory implements TbCoreQueueFactory, TbRuleE
}
@Override
public TbQueueConsumer<TbProtoQueueMsg<TransportProtos.ToCalculatedFieldNotificationMsg>> createToCalculatedFieldNotificationsMsgConsumer() {
public TbQueueConsumer<TbProtoQueueMsg<TransportProtos.ToCalculatedFieldNotificationMsg>> createToCalculatedFieldNotificationMsgConsumer() {
return new InMemoryTbQueueConsumer<>(storage, topicService.getCalculatedFieldNotificationsTopic(serviceInfoProvider.getServiceId()).getFullTopicName());
}

View File

@ -537,7 +537,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi
}
@Override
public TbQueueConsumer<TbProtoQueueMsg<ToCalculatedFieldNotificationMsg>> createToCalculatedFieldNotificationsMsgConsumer() {
public TbQueueConsumer<TbProtoQueueMsg<ToCalculatedFieldNotificationMsg>> createToCalculatedFieldNotificationMsgConsumer() {
TbKafkaConsumerTemplate.TbKafkaConsumerTemplateBuilder<TbProtoQueueMsg<ToCalculatedFieldNotificationMsg>> consumerBuilder = TbKafkaConsumerTemplate.builder();
consumerBuilder.settings(kafkaSettings);
consumerBuilder.topic(topicService.getCalculatedFieldNotificationsTopic(serviceInfoProvider.getServiceId()).getFullTopicName());

View File

@ -332,7 +332,7 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory {
}
@Override
public TbQueueConsumer<TbProtoQueueMsg<ToCalculatedFieldNotificationMsg>> createToCalculatedFieldNotificationsMsgConsumer() {
public TbQueueConsumer<TbProtoQueueMsg<ToCalculatedFieldNotificationMsg>> createToCalculatedFieldNotificationMsgConsumer() {
TbKafkaConsumerTemplate.TbKafkaConsumerTemplateBuilder<TbProtoQueueMsg<ToCalculatedFieldNotificationMsg>> consumerBuilder = TbKafkaConsumerTemplate.builder();
consumerBuilder.settings(kafkaSettings);
consumerBuilder.topic(topicService.getCalculatedFieldNotificationsTopic(serviceInfoProvider.getServiceId()).getFullTopicName());
@ -344,6 +344,16 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory {
return consumerBuilder.build();
}
@Override
public TbQueueProducer<TbProtoQueueMsg<ToCalculatedFieldNotificationMsg>> createToCalculatedFieldNotificationMsgProducer() {
TbKafkaProducerTemplate.TbKafkaProducerTemplateBuilder<TbProtoQueueMsg<ToCalculatedFieldNotificationMsg>> requestBuilder = TbKafkaProducerTemplate.builder();
requestBuilder.settings(kafkaSettings);
requestBuilder.clientId("tb-calculated-field-notifications-producer-" + serviceInfoProvider.getServiceId());
requestBuilder.defaultTopic(topicService.getCalculatedFieldNotificationsTopic(serviceInfoProvider.getServiceId()).getFullTopicName());
requestBuilder.admin(notificationAdmin);
return requestBuilder.build();
}
@Override
public TbQueueConsumer<TbProtoQueueMsg<CalculatedFieldStateProto>> createCalculatedFieldStateConsumer() {
return TbKafkaConsumerTemplate.<TbProtoQueueMsg<CalculatedFieldStateProto>>builder()

View File

@ -69,6 +69,7 @@ public class TbRuleEngineProducerProvider implements TbQueueProducerProvider {
this.toEdgeNotifications = tbQueueProvider.createEdgeNotificationsMsgProducer();
this.toEdgeEvents = tbQueueProvider.createEdgeEventMsgProducer();
this.toCalculatedFields = tbQueueProvider.createToCalculatedFieldMsgProducer();
this.toCalculatedFieldNotifications = tbQueueProvider.createToCalculatedFieldNotificationMsgProducer();
}
@Override

View File

@ -124,7 +124,9 @@ public interface TbRuleEngineQueueFactory extends TbUsageStatsClientQueueFactory
TbQueueProducer<TbProtoQueueMsg<ToCalculatedFieldMsg>> createToCalculatedFieldMsgProducer();
TbQueueConsumer<TbProtoQueueMsg<ToCalculatedFieldNotificationMsg>> createToCalculatedFieldNotificationsMsgConsumer();
TbQueueConsumer<TbProtoQueueMsg<ToCalculatedFieldNotificationMsg>> createToCalculatedFieldNotificationMsgConsumer();
TbQueueProducer<TbProtoQueueMsg<ToCalculatedFieldNotificationMsg>> createToCalculatedFieldNotificationMsgProducer();
TbQueueConsumer<TbProtoQueueMsg<CalculatedFieldStateProto>> createCalculatedFieldStateConsumer();