Merge branch 'master' of github.com:thingsboard/thingsboard

This commit is contained in:
Igor Kulikov 2020-12-30 17:13:14 +02:00
commit d7fa77438f
3 changed files with 6 additions and 6 deletions

View File

@ -153,7 +153,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi
consumerBuilder.settings(kafkaSettings);
consumerBuilder.topic(ruleEngineSettings.getTopic());
consumerBuilder.clientId("re-" + queueName + "-consumer-" + serviceInfoProvider.getServiceId());
consumerBuilder.groupId("re-" + queueName + "-consumer-" + serviceInfoProvider.getServiceId());
consumerBuilder.groupId("re-" + queueName + "-consumer");
consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToRuleEngineMsg.parseFrom(msg.getData()), msg.getHeaders()));
consumerBuilder.admin(ruleEngineAdmin);
return consumerBuilder.build();
@ -177,7 +177,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi
consumerBuilder.settings(kafkaSettings);
consumerBuilder.topic(coreSettings.getTopic());
consumerBuilder.clientId("monolith-core-consumer-" + serviceInfoProvider.getServiceId());
consumerBuilder.groupId("monolith-core-consumer-" + serviceInfoProvider.getServiceId());
consumerBuilder.groupId("monolith-core-consumer");
consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToCoreMsg.parseFrom(msg.getData()), msg.getHeaders()));
consumerBuilder.admin(coreAdmin);
return consumerBuilder.build();
@ -256,7 +256,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi
consumerBuilder.settings(kafkaSettings);
consumerBuilder.topic(coreSettings.getUsageStatsTopic());
consumerBuilder.clientId("monolith-us-consumer-" + serviceInfoProvider.getServiceId());
consumerBuilder.groupId("monolith-us-consumer-" + serviceInfoProvider.getServiceId());
consumerBuilder.groupId("monolith-us-consumer");
consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToUsageStatsServiceMsg.parseFrom(msg.getData()), msg.getHeaders()));
consumerBuilder.admin(coreAdmin);
return consumerBuilder.build();

View File

@ -147,7 +147,7 @@ public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory {
consumerBuilder.settings(kafkaSettings);
consumerBuilder.topic(coreSettings.getTopic());
consumerBuilder.clientId("tb-core-consumer-" + serviceInfoProvider.getServiceId());
consumerBuilder.groupId("tb-core-node-" + serviceInfoProvider.getServiceId());
consumerBuilder.groupId("tb-core-node");
consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToCoreMsg.parseFrom(msg.getData()), msg.getHeaders()));
consumerBuilder.admin(coreAdmin);
return consumerBuilder.build();
@ -226,7 +226,7 @@ public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory {
consumerBuilder.settings(kafkaSettings);
consumerBuilder.topic(coreSettings.getUsageStatsTopic());
consumerBuilder.clientId("tb-core-us-consumer-" + serviceInfoProvider.getServiceId());
consumerBuilder.groupId("tb-core-us-consumer-" + serviceInfoProvider.getServiceId());
consumerBuilder.groupId("tb-core-us-consumer");
consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToUsageStatsServiceMsg.parseFrom(msg.getData()), msg.getHeaders()));
consumerBuilder.admin(coreAdmin);
return consumerBuilder.build();

View File

@ -142,7 +142,7 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory {
consumerBuilder.settings(kafkaSettings);
consumerBuilder.topic(ruleEngineSettings.getTopic());
consumerBuilder.clientId("re-" + queueName + "-consumer-" + serviceInfoProvider.getServiceId());
consumerBuilder.groupId("re-" + queueName + "-consumer-" + serviceInfoProvider.getServiceId());
consumerBuilder.groupId("re-" + queueName + "-consumer");
consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToRuleEngineMsg.parseFrom(msg.getData()), msg.getHeaders()));
consumerBuilder.admin(ruleEngineAdmin);
return consumerBuilder.build();