diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java index 538d91aa0f..a138f8c40f 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java @@ -153,7 +153,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi consumerBuilder.settings(kafkaSettings); consumerBuilder.topic(ruleEngineSettings.getTopic()); consumerBuilder.clientId("re-" + queueName + "-consumer-" + serviceInfoProvider.getServiceId()); - consumerBuilder.groupId("re-" + queueName + "-consumer-" + serviceInfoProvider.getServiceId()); + consumerBuilder.groupId("re-" + queueName + "-consumer"); consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToRuleEngineMsg.parseFrom(msg.getData()), msg.getHeaders())); consumerBuilder.admin(ruleEngineAdmin); return consumerBuilder.build(); @@ -177,7 +177,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi consumerBuilder.settings(kafkaSettings); consumerBuilder.topic(coreSettings.getTopic()); consumerBuilder.clientId("monolith-core-consumer-" + serviceInfoProvider.getServiceId()); - consumerBuilder.groupId("monolith-core-consumer-" + serviceInfoProvider.getServiceId()); + consumerBuilder.groupId("monolith-core-consumer"); consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToCoreMsg.parseFrom(msg.getData()), msg.getHeaders())); consumerBuilder.admin(coreAdmin); return consumerBuilder.build(); @@ -256,7 +256,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi consumerBuilder.settings(kafkaSettings); consumerBuilder.topic(coreSettings.getUsageStatsTopic()); consumerBuilder.clientId("monolith-us-consumer-" + serviceInfoProvider.getServiceId()); - consumerBuilder.groupId("monolith-us-consumer-" + serviceInfoProvider.getServiceId()); + consumerBuilder.groupId("monolith-us-consumer"); consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToUsageStatsServiceMsg.parseFrom(msg.getData()), msg.getHeaders())); consumerBuilder.admin(coreAdmin); return consumerBuilder.build(); diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbCoreQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbCoreQueueFactory.java index 5e1b398e91..caaa727749 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbCoreQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbCoreQueueFactory.java @@ -147,7 +147,7 @@ public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory { consumerBuilder.settings(kafkaSettings); consumerBuilder.topic(coreSettings.getTopic()); consumerBuilder.clientId("tb-core-consumer-" + serviceInfoProvider.getServiceId()); - consumerBuilder.groupId("tb-core-node-" + serviceInfoProvider.getServiceId()); + consumerBuilder.groupId("tb-core-node"); consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToCoreMsg.parseFrom(msg.getData()), msg.getHeaders())); consumerBuilder.admin(coreAdmin); return consumerBuilder.build(); @@ -226,7 +226,7 @@ public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory { consumerBuilder.settings(kafkaSettings); consumerBuilder.topic(coreSettings.getUsageStatsTopic()); consumerBuilder.clientId("tb-core-us-consumer-" + serviceInfoProvider.getServiceId()); - consumerBuilder.groupId("tb-core-us-consumer-" + serviceInfoProvider.getServiceId()); + consumerBuilder.groupId("tb-core-us-consumer"); consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToUsageStatsServiceMsg.parseFrom(msg.getData()), msg.getHeaders())); consumerBuilder.admin(coreAdmin); return consumerBuilder.build(); diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbRuleEngineQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbRuleEngineQueueFactory.java index 118f0f3316..3bfbf751c5 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbRuleEngineQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbRuleEngineQueueFactory.java @@ -142,7 +142,7 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory { consumerBuilder.settings(kafkaSettings); consumerBuilder.topic(ruleEngineSettings.getTopic()); consumerBuilder.clientId("re-" + queueName + "-consumer-" + serviceInfoProvider.getServiceId()); - consumerBuilder.groupId("re-" + queueName + "-consumer-" + serviceInfoProvider.getServiceId()); + consumerBuilder.groupId("re-" + queueName + "-consumer"); consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToRuleEngineMsg.parseFrom(msg.getData()), msg.getHeaders())); consumerBuilder.admin(ruleEngineAdmin); return consumerBuilder.build();