removed ServiceId from kafka consumer groupId
This commit is contained in:
parent
ccab1aa2c9
commit
1d09fa2bd8
@ -152,7 +152,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi
|
|||||||
consumerBuilder.settings(kafkaSettings);
|
consumerBuilder.settings(kafkaSettings);
|
||||||
consumerBuilder.topic(ruleEngineSettings.getTopic());
|
consumerBuilder.topic(ruleEngineSettings.getTopic());
|
||||||
consumerBuilder.clientId("re-" + queueName + "-consumer-" + serviceInfoProvider.getServiceId());
|
consumerBuilder.clientId("re-" + queueName + "-consumer-" + serviceInfoProvider.getServiceId());
|
||||||
consumerBuilder.groupId("re-" + queueName + "-consumer-" + serviceInfoProvider.getServiceId());
|
consumerBuilder.groupId("re-" + queueName + "-consumer");
|
||||||
consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToRuleEngineMsg.parseFrom(msg.getData()), msg.getHeaders()));
|
consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToRuleEngineMsg.parseFrom(msg.getData()), msg.getHeaders()));
|
||||||
consumerBuilder.admin(ruleEngineAdmin);
|
consumerBuilder.admin(ruleEngineAdmin);
|
||||||
return consumerBuilder.build();
|
return consumerBuilder.build();
|
||||||
@ -164,7 +164,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi
|
|||||||
consumerBuilder.settings(kafkaSettings);
|
consumerBuilder.settings(kafkaSettings);
|
||||||
consumerBuilder.topic(partitionService.getNotificationsTopic(ServiceType.TB_RULE_ENGINE, serviceInfoProvider.getServiceId()).getFullTopicName());
|
consumerBuilder.topic(partitionService.getNotificationsTopic(ServiceType.TB_RULE_ENGINE, serviceInfoProvider.getServiceId()).getFullTopicName());
|
||||||
consumerBuilder.clientId("monolith-rule-engine-notifications-consumer-" + serviceInfoProvider.getServiceId());
|
consumerBuilder.clientId("monolith-rule-engine-notifications-consumer-" + serviceInfoProvider.getServiceId());
|
||||||
consumerBuilder.groupId("monolith-rule-engine-notifications-consumer-" + serviceInfoProvider.getServiceId());
|
consumerBuilder.groupId("monolith-rule-engine-notifications-consumer");
|
||||||
consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToRuleEngineNotificationMsg.parseFrom(msg.getData()), msg.getHeaders()));
|
consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToRuleEngineNotificationMsg.parseFrom(msg.getData()), msg.getHeaders()));
|
||||||
consumerBuilder.admin(notificationAdmin);
|
consumerBuilder.admin(notificationAdmin);
|
||||||
return consumerBuilder.build();
|
return consumerBuilder.build();
|
||||||
@ -176,7 +176,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi
|
|||||||
consumerBuilder.settings(kafkaSettings);
|
consumerBuilder.settings(kafkaSettings);
|
||||||
consumerBuilder.topic(coreSettings.getTopic());
|
consumerBuilder.topic(coreSettings.getTopic());
|
||||||
consumerBuilder.clientId("monolith-core-consumer-" + serviceInfoProvider.getServiceId());
|
consumerBuilder.clientId("monolith-core-consumer-" + serviceInfoProvider.getServiceId());
|
||||||
consumerBuilder.groupId("monolith-core-consumer-" + serviceInfoProvider.getServiceId());
|
consumerBuilder.groupId("monolith-core-consumer");
|
||||||
consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToCoreMsg.parseFrom(msg.getData()), msg.getHeaders()));
|
consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToCoreMsg.parseFrom(msg.getData()), msg.getHeaders()));
|
||||||
consumerBuilder.admin(coreAdmin);
|
consumerBuilder.admin(coreAdmin);
|
||||||
return consumerBuilder.build();
|
return consumerBuilder.build();
|
||||||
@ -188,7 +188,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi
|
|||||||
consumerBuilder.settings(kafkaSettings);
|
consumerBuilder.settings(kafkaSettings);
|
||||||
consumerBuilder.topic(partitionService.getNotificationsTopic(ServiceType.TB_CORE, serviceInfoProvider.getServiceId()).getFullTopicName());
|
consumerBuilder.topic(partitionService.getNotificationsTopic(ServiceType.TB_CORE, serviceInfoProvider.getServiceId()).getFullTopicName());
|
||||||
consumerBuilder.clientId("monolith-core-notifications-consumer-" + serviceInfoProvider.getServiceId());
|
consumerBuilder.clientId("monolith-core-notifications-consumer-" + serviceInfoProvider.getServiceId());
|
||||||
consumerBuilder.groupId("monolith-core-notifications-consumer-" + serviceInfoProvider.getServiceId());
|
consumerBuilder.groupId("monolith-core-notifications-consumer");
|
||||||
consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToCoreNotificationMsg.parseFrom(msg.getData()), msg.getHeaders()));
|
consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToCoreNotificationMsg.parseFrom(msg.getData()), msg.getHeaders()));
|
||||||
consumerBuilder.admin(notificationAdmin);
|
consumerBuilder.admin(notificationAdmin);
|
||||||
return consumerBuilder.build();
|
return consumerBuilder.build();
|
||||||
@ -229,7 +229,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi
|
|||||||
responseBuilder.settings(kafkaSettings);
|
responseBuilder.settings(kafkaSettings);
|
||||||
responseBuilder.topic(jsInvokeSettings.getResponseTopic() + "." + serviceInfoProvider.getServiceId());
|
responseBuilder.topic(jsInvokeSettings.getResponseTopic() + "." + serviceInfoProvider.getServiceId());
|
||||||
responseBuilder.clientId("js-" + serviceInfoProvider.getServiceId());
|
responseBuilder.clientId("js-" + serviceInfoProvider.getServiceId());
|
||||||
responseBuilder.groupId("rule-engine-node-" + serviceInfoProvider.getServiceId());
|
responseBuilder.groupId("rule-engine-node");
|
||||||
responseBuilder.decoder(msg -> {
|
responseBuilder.decoder(msg -> {
|
||||||
JsInvokeProtos.RemoteJsResponse.Builder builder = JsInvokeProtos.RemoteJsResponse.newBuilder();
|
JsInvokeProtos.RemoteJsResponse.Builder builder = JsInvokeProtos.RemoteJsResponse.newBuilder();
|
||||||
JsonFormat.parser().ignoringUnknownFields().merge(new String(msg.getData(), StandardCharsets.UTF_8), builder);
|
JsonFormat.parser().ignoringUnknownFields().merge(new String(msg.getData(), StandardCharsets.UTF_8), builder);
|
||||||
|
|||||||
@ -146,7 +146,7 @@ public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory {
|
|||||||
consumerBuilder.settings(kafkaSettings);
|
consumerBuilder.settings(kafkaSettings);
|
||||||
consumerBuilder.topic(coreSettings.getTopic());
|
consumerBuilder.topic(coreSettings.getTopic());
|
||||||
consumerBuilder.clientId("tb-core-consumer-" + serviceInfoProvider.getServiceId());
|
consumerBuilder.clientId("tb-core-consumer-" + serviceInfoProvider.getServiceId());
|
||||||
consumerBuilder.groupId("tb-core-node-" + serviceInfoProvider.getServiceId());
|
consumerBuilder.groupId("tb-core-node");
|
||||||
consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToCoreMsg.parseFrom(msg.getData()), msg.getHeaders()));
|
consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToCoreMsg.parseFrom(msg.getData()), msg.getHeaders()));
|
||||||
consumerBuilder.admin(coreAdmin);
|
consumerBuilder.admin(coreAdmin);
|
||||||
return consumerBuilder.build();
|
return consumerBuilder.build();
|
||||||
@ -158,7 +158,7 @@ public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory {
|
|||||||
consumerBuilder.settings(kafkaSettings);
|
consumerBuilder.settings(kafkaSettings);
|
||||||
consumerBuilder.topic(partitionService.getNotificationsTopic(ServiceType.TB_CORE, serviceInfoProvider.getServiceId()).getFullTopicName());
|
consumerBuilder.topic(partitionService.getNotificationsTopic(ServiceType.TB_CORE, serviceInfoProvider.getServiceId()).getFullTopicName());
|
||||||
consumerBuilder.clientId("tb-core-notifications-consumer-" + serviceInfoProvider.getServiceId());
|
consumerBuilder.clientId("tb-core-notifications-consumer-" + serviceInfoProvider.getServiceId());
|
||||||
consumerBuilder.groupId("tb-core-notifications-node-" + serviceInfoProvider.getServiceId());
|
consumerBuilder.groupId("tb-core-notifications-node");
|
||||||
consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToCoreNotificationMsg.parseFrom(msg.getData()), msg.getHeaders()));
|
consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToCoreNotificationMsg.parseFrom(msg.getData()), msg.getHeaders()));
|
||||||
consumerBuilder.admin(notificationAdmin);
|
consumerBuilder.admin(notificationAdmin);
|
||||||
return consumerBuilder.build();
|
return consumerBuilder.build();
|
||||||
@ -199,7 +199,7 @@ public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory {
|
|||||||
responseBuilder.settings(kafkaSettings);
|
responseBuilder.settings(kafkaSettings);
|
||||||
responseBuilder.topic(jsInvokeSettings.getResponseTopic() + "." + serviceInfoProvider.getServiceId());
|
responseBuilder.topic(jsInvokeSettings.getResponseTopic() + "." + serviceInfoProvider.getServiceId());
|
||||||
responseBuilder.clientId("js-" + serviceInfoProvider.getServiceId());
|
responseBuilder.clientId("js-" + serviceInfoProvider.getServiceId());
|
||||||
responseBuilder.groupId("rule-engine-node-" + serviceInfoProvider.getServiceId());
|
responseBuilder.groupId("rule-engine-node");
|
||||||
responseBuilder.decoder(msg -> {
|
responseBuilder.decoder(msg -> {
|
||||||
JsInvokeProtos.RemoteJsResponse.Builder builder = JsInvokeProtos.RemoteJsResponse.newBuilder();
|
JsInvokeProtos.RemoteJsResponse.Builder builder = JsInvokeProtos.RemoteJsResponse.newBuilder();
|
||||||
JsonFormat.parser().ignoringUnknownFields().merge(new String(msg.getData(), StandardCharsets.UTF_8), builder);
|
JsonFormat.parser().ignoringUnknownFields().merge(new String(msg.getData(), StandardCharsets.UTF_8), builder);
|
||||||
|
|||||||
@ -141,7 +141,7 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory {
|
|||||||
consumerBuilder.settings(kafkaSettings);
|
consumerBuilder.settings(kafkaSettings);
|
||||||
consumerBuilder.topic(ruleEngineSettings.getTopic());
|
consumerBuilder.topic(ruleEngineSettings.getTopic());
|
||||||
consumerBuilder.clientId("re-" + queueName + "-consumer-" + serviceInfoProvider.getServiceId());
|
consumerBuilder.clientId("re-" + queueName + "-consumer-" + serviceInfoProvider.getServiceId());
|
||||||
consumerBuilder.groupId("re-" + queueName + "-consumer-" + serviceInfoProvider.getServiceId());
|
consumerBuilder.groupId("re-" + queueName + "-consumer");
|
||||||
consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToRuleEngineMsg.parseFrom(msg.getData()), msg.getHeaders()));
|
consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToRuleEngineMsg.parseFrom(msg.getData()), msg.getHeaders()));
|
||||||
consumerBuilder.admin(ruleEngineAdmin);
|
consumerBuilder.admin(ruleEngineAdmin);
|
||||||
return consumerBuilder.build();
|
return consumerBuilder.build();
|
||||||
@ -153,7 +153,7 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory {
|
|||||||
consumerBuilder.settings(kafkaSettings);
|
consumerBuilder.settings(kafkaSettings);
|
||||||
consumerBuilder.topic(partitionService.getNotificationsTopic(ServiceType.TB_RULE_ENGINE, serviceInfoProvider.getServiceId()).getFullTopicName());
|
consumerBuilder.topic(partitionService.getNotificationsTopic(ServiceType.TB_RULE_ENGINE, serviceInfoProvider.getServiceId()).getFullTopicName());
|
||||||
consumerBuilder.clientId("tb-rule-engine-notifications-consumer-" + serviceInfoProvider.getServiceId());
|
consumerBuilder.clientId("tb-rule-engine-notifications-consumer-" + serviceInfoProvider.getServiceId());
|
||||||
consumerBuilder.groupId("tb-rule-engine-notifications-node-" + serviceInfoProvider.getServiceId());
|
consumerBuilder.groupId("tb-rule-engine-notifications-node");
|
||||||
consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToRuleEngineNotificationMsg.parseFrom(msg.getData()), msg.getHeaders()));
|
consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToRuleEngineNotificationMsg.parseFrom(msg.getData()), msg.getHeaders()));
|
||||||
consumerBuilder.admin(notificationAdmin);
|
consumerBuilder.admin(notificationAdmin);
|
||||||
return consumerBuilder.build();
|
return consumerBuilder.build();
|
||||||
@ -172,7 +172,7 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory {
|
|||||||
responseBuilder.settings(kafkaSettings);
|
responseBuilder.settings(kafkaSettings);
|
||||||
responseBuilder.topic(jsInvokeSettings.getResponseTopic() + "." + serviceInfoProvider.getServiceId());
|
responseBuilder.topic(jsInvokeSettings.getResponseTopic() + "." + serviceInfoProvider.getServiceId());
|
||||||
responseBuilder.clientId("js-" + serviceInfoProvider.getServiceId());
|
responseBuilder.clientId("js-" + serviceInfoProvider.getServiceId());
|
||||||
responseBuilder.groupId("rule-engine-node-" + serviceInfoProvider.getServiceId());
|
responseBuilder.groupId("rule-engine-node");
|
||||||
responseBuilder.decoder(msg -> {
|
responseBuilder.decoder(msg -> {
|
||||||
JsInvokeProtos.RemoteJsResponse.Builder builder = JsInvokeProtos.RemoteJsResponse.newBuilder();
|
JsInvokeProtos.RemoteJsResponse.Builder builder = JsInvokeProtos.RemoteJsResponse.newBuilder();
|
||||||
JsonFormat.parser().ignoringUnknownFields().merge(new String(msg.getData(), StandardCharsets.UTF_8), builder);
|
JsonFormat.parser().ignoringUnknownFields().merge(new String(msg.getData(), StandardCharsets.UTF_8), builder);
|
||||||
|
|||||||
@ -91,7 +91,7 @@ public class KafkaTbTransportQueueFactory implements TbTransportQueueFactory {
|
|||||||
responseBuilder.settings(kafkaSettings);
|
responseBuilder.settings(kafkaSettings);
|
||||||
responseBuilder.topic(transportApiSettings.getResponsesTopic() + "." + serviceInfoProvider.getServiceId());
|
responseBuilder.topic(transportApiSettings.getResponsesTopic() + "." + serviceInfoProvider.getServiceId());
|
||||||
responseBuilder.clientId("transport-api-response-" + serviceInfoProvider.getServiceId());
|
responseBuilder.clientId("transport-api-response-" + serviceInfoProvider.getServiceId());
|
||||||
responseBuilder.groupId("transport-node-" + serviceInfoProvider.getServiceId());
|
responseBuilder.groupId("transport-node");
|
||||||
responseBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportApiResponseMsg.parseFrom(msg.getData()), msg.getHeaders()));
|
responseBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportApiResponseMsg.parseFrom(msg.getData()), msg.getHeaders()));
|
||||||
responseBuilder.admin(transportApiAdmin);
|
responseBuilder.admin(transportApiAdmin);
|
||||||
|
|
||||||
@ -132,7 +132,7 @@ public class KafkaTbTransportQueueFactory implements TbTransportQueueFactory {
|
|||||||
responseBuilder.settings(kafkaSettings);
|
responseBuilder.settings(kafkaSettings);
|
||||||
responseBuilder.topic(transportNotificationSettings.getNotificationsTopic() + "." + serviceInfoProvider.getServiceId());
|
responseBuilder.topic(transportNotificationSettings.getNotificationsTopic() + "." + serviceInfoProvider.getServiceId());
|
||||||
responseBuilder.clientId("transport-api-notifications-" + serviceInfoProvider.getServiceId());
|
responseBuilder.clientId("transport-api-notifications-" + serviceInfoProvider.getServiceId());
|
||||||
responseBuilder.groupId("transport-node-" + serviceInfoProvider.getServiceId());
|
responseBuilder.groupId("transport-node");
|
||||||
responseBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToTransportMsg.parseFrom(msg.getData()), msg.getHeaders()));
|
responseBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToTransportMsg.parseFrom(msg.getData()), msg.getHeaders()));
|
||||||
responseBuilder.admin(notificationAdmin);
|
responseBuilder.admin(notificationAdmin);
|
||||||
return responseBuilder.build();
|
return responseBuilder.build();
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user