Introduced separate count for edge consumers

This commit is contained in:
Volodymyr Babak 2025-02-10 18:51:13 +02:00
parent 3860d79613
commit 1b4930ebaf
2 changed files with 4 additions and 2 deletions

View File

@ -96,6 +96,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi
private final TbQueueAdmin edgeEventAdmin; private final TbQueueAdmin edgeEventAdmin;
private final AtomicLong consumerCount = new AtomicLong(); private final AtomicLong consumerCount = new AtomicLong();
private final AtomicLong edgeConsumerCount = new AtomicLong();
public KafkaMonolithQueueFactory(TopicService topicService, TbKafkaSettings kafkaSettings, public KafkaMonolithQueueFactory(TopicService topicService, TbKafkaSettings kafkaSettings,
TbServiceInfoProvider serviceInfoProvider, TbServiceInfoProvider serviceInfoProvider,
@ -472,7 +473,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi
TbKafkaConsumerTemplate.TbKafkaConsumerTemplateBuilder<TbProtoQueueMsg<ToEdgeEventNotificationMsg>> consumerBuilder = TbKafkaConsumerTemplate.builder(); TbKafkaConsumerTemplate.TbKafkaConsumerTemplateBuilder<TbProtoQueueMsg<ToEdgeEventNotificationMsg>> consumerBuilder = TbKafkaConsumerTemplate.builder();
consumerBuilder.settings(kafkaSettings); consumerBuilder.settings(kafkaSettings);
consumerBuilder.topic(topicService.buildTopicName("tb_edge_event.notifications." + tenantId + "." + edgeId)); consumerBuilder.topic(topicService.buildTopicName("tb_edge_event.notifications." + tenantId + "." + edgeId));
consumerBuilder.clientId("monolith-to-edge-event-consumer-" + serviceInfoProvider.getServiceId() + "-" + consumerCount.incrementAndGet()); consumerBuilder.clientId("monolith-to-edge-event-consumer-" + serviceInfoProvider.getServiceId() + "-" + edgeConsumerCount.incrementAndGet());
consumerBuilder.groupId(topicService.buildTopicName("monolith-edge-event-consumer")); consumerBuilder.groupId(topicService.buildTopicName("monolith-edge-event-consumer"));
consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToEdgeEventNotificationMsg.parseFrom(msg.getData()), msg.getHeaders())); consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToEdgeEventNotificationMsg.parseFrom(msg.getData()), msg.getHeaders()));
consumerBuilder.admin(edgeEventAdmin); consumerBuilder.admin(edgeEventAdmin);

View File

@ -95,6 +95,7 @@ public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory {
private final TbQueueAdmin edgeEventAdmin; private final TbQueueAdmin edgeEventAdmin;
private final AtomicLong consumerCount = new AtomicLong(); private final AtomicLong consumerCount = new AtomicLong();
private final AtomicLong edgeConsumerCount = new AtomicLong();
public KafkaTbCoreQueueFactory(TopicService topicService, public KafkaTbCoreQueueFactory(TopicService topicService,
TbKafkaSettings kafkaSettings, TbKafkaSettings kafkaSettings,
@ -421,7 +422,7 @@ public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory {
TbKafkaConsumerTemplate.TbKafkaConsumerTemplateBuilder<TbProtoQueueMsg<ToEdgeEventNotificationMsg>> consumerBuilder = TbKafkaConsumerTemplate.builder(); TbKafkaConsumerTemplate.TbKafkaConsumerTemplateBuilder<TbProtoQueueMsg<ToEdgeEventNotificationMsg>> consumerBuilder = TbKafkaConsumerTemplate.builder();
consumerBuilder.settings(kafkaSettings); consumerBuilder.settings(kafkaSettings);
consumerBuilder.topic(topicService.buildTopicName("tb_edge_event.notifications." + tenantId + "." + edgeId)); consumerBuilder.topic(topicService.buildTopicName("tb_edge_event.notifications." + tenantId + "." + edgeId));
consumerBuilder.clientId("tb-core-edge-event-consumer-" + serviceInfoProvider.getServiceId() + "-" + consumerCount.incrementAndGet()); consumerBuilder.clientId("tb-core-edge-event-consumer-" + serviceInfoProvider.getServiceId() + "-" + edgeConsumerCount.incrementAndGet());
consumerBuilder.groupId(topicService.buildTopicName("tb-core-edge-event-consumer")); consumerBuilder.groupId(topicService.buildTopicName("tb-core-edge-event-consumer"));
consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToEdgeEventNotificationMsg.parseFrom(msg.getData()), msg.getHeaders())); consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToEdgeEventNotificationMsg.parseFrom(msg.getData()), msg.getHeaders()));
consumerBuilder.admin(edgeEventAdmin); consumerBuilder.admin(edgeEventAdmin);