diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java index cd9d577b86..0767e5e303 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java @@ -96,6 +96,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi private final TbQueueAdmin edgeEventAdmin; private final AtomicLong consumerCount = new AtomicLong(); + private final AtomicLong edgeConsumerCount = new AtomicLong(); public KafkaMonolithQueueFactory(TopicService topicService, TbKafkaSettings kafkaSettings, TbServiceInfoProvider serviceInfoProvider, @@ -472,7 +473,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi TbKafkaConsumerTemplate.TbKafkaConsumerTemplateBuilder> consumerBuilder = TbKafkaConsumerTemplate.builder(); consumerBuilder.settings(kafkaSettings); consumerBuilder.topic(topicService.buildTopicName("tb_edge_event.notifications." + tenantId + "." + edgeId)); - consumerBuilder.clientId("monolith-to-edge-event-consumer-" + serviceInfoProvider.getServiceId() + "-" + consumerCount.incrementAndGet()); + consumerBuilder.clientId("monolith-to-edge-event-consumer-" + serviceInfoProvider.getServiceId() + "-" + edgeConsumerCount.incrementAndGet()); consumerBuilder.groupId(topicService.buildTopicName("monolith-edge-event-consumer")); consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToEdgeEventNotificationMsg.parseFrom(msg.getData()), msg.getHeaders())); consumerBuilder.admin(edgeEventAdmin); diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbCoreQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbCoreQueueFactory.java index dffb127c70..06fd6147a8 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbCoreQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbCoreQueueFactory.java @@ -95,6 +95,7 @@ public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory { private final TbQueueAdmin edgeEventAdmin; private final AtomicLong consumerCount = new AtomicLong(); + private final AtomicLong edgeConsumerCount = new AtomicLong(); public KafkaTbCoreQueueFactory(TopicService topicService, TbKafkaSettings kafkaSettings, @@ -421,7 +422,7 @@ public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory { TbKafkaConsumerTemplate.TbKafkaConsumerTemplateBuilder> consumerBuilder = TbKafkaConsumerTemplate.builder(); consumerBuilder.settings(kafkaSettings); consumerBuilder.topic(topicService.buildTopicName("tb_edge_event.notifications." + tenantId + "." + edgeId)); - consumerBuilder.clientId("tb-core-edge-event-consumer-" + serviceInfoProvider.getServiceId() + "-" + consumerCount.incrementAndGet()); + consumerBuilder.clientId("tb-core-edge-event-consumer-" + serviceInfoProvider.getServiceId() + "-" + edgeConsumerCount.incrementAndGet()); consumerBuilder.groupId(topicService.buildTopicName("tb-core-edge-event-consumer")); consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToEdgeEventNotificationMsg.parseFrom(msg.getData()), msg.getHeaders())); consumerBuilder.admin(edgeEventAdmin);