From 4de258f2aed48e4e6df6191a0b2bfa6c71810d2a Mon Sep 17 00:00:00 2001 From: YevhenBondarenko Date: Tue, 29 Dec 2020 20:03:33 +0200 Subject: [PATCH 1/2] removed ServiceId from kafka consumer groupId --- .../queue/provider/KafkaMonolithQueueFactory.java | 12 ++++++------ .../queue/provider/KafkaTbCoreQueueFactory.java | 8 ++++---- .../provider/KafkaTbRuleEngineQueueFactory.java | 6 +++--- .../queue/provider/KafkaTbTransportQueueFactory.java | 4 ++-- 4 files changed, 15 insertions(+), 15 deletions(-) diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java index 538d91aa0f..68a3922de0 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java @@ -153,7 +153,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi consumerBuilder.settings(kafkaSettings); consumerBuilder.topic(ruleEngineSettings.getTopic()); consumerBuilder.clientId("re-" + queueName + "-consumer-" + serviceInfoProvider.getServiceId()); - consumerBuilder.groupId("re-" + queueName + "-consumer-" + serviceInfoProvider.getServiceId()); + consumerBuilder.groupId("re-" + queueName + "-consumer"); consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToRuleEngineMsg.parseFrom(msg.getData()), msg.getHeaders())); consumerBuilder.admin(ruleEngineAdmin); return consumerBuilder.build(); @@ -165,7 +165,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi consumerBuilder.settings(kafkaSettings); consumerBuilder.topic(partitionService.getNotificationsTopic(ServiceType.TB_RULE_ENGINE, serviceInfoProvider.getServiceId()).getFullTopicName()); consumerBuilder.clientId("monolith-rule-engine-notifications-consumer-" + serviceInfoProvider.getServiceId()); - consumerBuilder.groupId("monolith-rule-engine-notifications-consumer-" + serviceInfoProvider.getServiceId()); + consumerBuilder.groupId("monolith-rule-engine-notifications-consumer"); consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToRuleEngineNotificationMsg.parseFrom(msg.getData()), msg.getHeaders())); consumerBuilder.admin(notificationAdmin); return consumerBuilder.build(); @@ -177,7 +177,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi consumerBuilder.settings(kafkaSettings); consumerBuilder.topic(coreSettings.getTopic()); consumerBuilder.clientId("monolith-core-consumer-" + serviceInfoProvider.getServiceId()); - consumerBuilder.groupId("monolith-core-consumer-" + serviceInfoProvider.getServiceId()); + consumerBuilder.groupId("monolith-core-consumer"); consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToCoreMsg.parseFrom(msg.getData()), msg.getHeaders())); consumerBuilder.admin(coreAdmin); return consumerBuilder.build(); @@ -189,7 +189,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi consumerBuilder.settings(kafkaSettings); consumerBuilder.topic(partitionService.getNotificationsTopic(ServiceType.TB_CORE, serviceInfoProvider.getServiceId()).getFullTopicName()); consumerBuilder.clientId("monolith-core-notifications-consumer-" + serviceInfoProvider.getServiceId()); - consumerBuilder.groupId("monolith-core-notifications-consumer-" + serviceInfoProvider.getServiceId()); + consumerBuilder.groupId("monolith-core-notifications-consumer"); consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToCoreNotificationMsg.parseFrom(msg.getData()), msg.getHeaders())); consumerBuilder.admin(notificationAdmin); return consumerBuilder.build(); @@ -230,7 +230,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi responseBuilder.settings(kafkaSettings); responseBuilder.topic(jsInvokeSettings.getResponseTopic() + "." + serviceInfoProvider.getServiceId()); responseBuilder.clientId("js-" + serviceInfoProvider.getServiceId()); - responseBuilder.groupId("rule-engine-node-" + serviceInfoProvider.getServiceId()); + responseBuilder.groupId("rule-engine-node"); responseBuilder.decoder(msg -> { JsInvokeProtos.RemoteJsResponse.Builder builder = JsInvokeProtos.RemoteJsResponse.newBuilder(); JsonFormat.parser().ignoringUnknownFields().merge(new String(msg.getData(), StandardCharsets.UTF_8), builder); @@ -256,7 +256,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi consumerBuilder.settings(kafkaSettings); consumerBuilder.topic(coreSettings.getUsageStatsTopic()); consumerBuilder.clientId("monolith-us-consumer-" + serviceInfoProvider.getServiceId()); - consumerBuilder.groupId("monolith-us-consumer-" + serviceInfoProvider.getServiceId()); + consumerBuilder.groupId("monolith-us-consumer"); consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToUsageStatsServiceMsg.parseFrom(msg.getData()), msg.getHeaders())); consumerBuilder.admin(coreAdmin); return consumerBuilder.build(); diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbCoreQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbCoreQueueFactory.java index 5e1b398e91..29fbfd2ae6 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbCoreQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbCoreQueueFactory.java @@ -147,7 +147,7 @@ public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory { consumerBuilder.settings(kafkaSettings); consumerBuilder.topic(coreSettings.getTopic()); consumerBuilder.clientId("tb-core-consumer-" + serviceInfoProvider.getServiceId()); - consumerBuilder.groupId("tb-core-node-" + serviceInfoProvider.getServiceId()); + consumerBuilder.groupId("tb-core-node"); consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToCoreMsg.parseFrom(msg.getData()), msg.getHeaders())); consumerBuilder.admin(coreAdmin); return consumerBuilder.build(); @@ -159,7 +159,7 @@ public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory { consumerBuilder.settings(kafkaSettings); consumerBuilder.topic(partitionService.getNotificationsTopic(ServiceType.TB_CORE, serviceInfoProvider.getServiceId()).getFullTopicName()); consumerBuilder.clientId("tb-core-notifications-consumer-" + serviceInfoProvider.getServiceId()); - consumerBuilder.groupId("tb-core-notifications-node-" + serviceInfoProvider.getServiceId()); + consumerBuilder.groupId("tb-core-notifications-node"); consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToCoreNotificationMsg.parseFrom(msg.getData()), msg.getHeaders())); consumerBuilder.admin(notificationAdmin); return consumerBuilder.build(); @@ -200,7 +200,7 @@ public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory { responseBuilder.settings(kafkaSettings); responseBuilder.topic(jsInvokeSettings.getResponseTopic() + "." + serviceInfoProvider.getServiceId()); responseBuilder.clientId("js-" + serviceInfoProvider.getServiceId()); - responseBuilder.groupId("rule-engine-node-" + serviceInfoProvider.getServiceId()); + responseBuilder.groupId("rule-engine-node"); responseBuilder.decoder(msg -> { JsInvokeProtos.RemoteJsResponse.Builder builder = JsInvokeProtos.RemoteJsResponse.newBuilder(); JsonFormat.parser().ignoringUnknownFields().merge(new String(msg.getData(), StandardCharsets.UTF_8), builder); @@ -226,7 +226,7 @@ public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory { consumerBuilder.settings(kafkaSettings); consumerBuilder.topic(coreSettings.getUsageStatsTopic()); consumerBuilder.clientId("tb-core-us-consumer-" + serviceInfoProvider.getServiceId()); - consumerBuilder.groupId("tb-core-us-consumer-" + serviceInfoProvider.getServiceId()); + consumerBuilder.groupId("tb-core-us-consumer"); consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToUsageStatsServiceMsg.parseFrom(msg.getData()), msg.getHeaders())); consumerBuilder.admin(coreAdmin); return consumerBuilder.build(); diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbRuleEngineQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbRuleEngineQueueFactory.java index 118f0f3316..5df727f06f 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbRuleEngineQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbRuleEngineQueueFactory.java @@ -142,7 +142,7 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory { consumerBuilder.settings(kafkaSettings); consumerBuilder.topic(ruleEngineSettings.getTopic()); consumerBuilder.clientId("re-" + queueName + "-consumer-" + serviceInfoProvider.getServiceId()); - consumerBuilder.groupId("re-" + queueName + "-consumer-" + serviceInfoProvider.getServiceId()); + consumerBuilder.groupId("re-" + queueName + "-consumer"); consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToRuleEngineMsg.parseFrom(msg.getData()), msg.getHeaders())); consumerBuilder.admin(ruleEngineAdmin); return consumerBuilder.build(); @@ -154,7 +154,7 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory { consumerBuilder.settings(kafkaSettings); consumerBuilder.topic(partitionService.getNotificationsTopic(ServiceType.TB_RULE_ENGINE, serviceInfoProvider.getServiceId()).getFullTopicName()); consumerBuilder.clientId("tb-rule-engine-notifications-consumer-" + serviceInfoProvider.getServiceId()); - consumerBuilder.groupId("tb-rule-engine-notifications-node-" + serviceInfoProvider.getServiceId()); + consumerBuilder.groupId("tb-rule-engine-notifications-node"); consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToRuleEngineNotificationMsg.parseFrom(msg.getData()), msg.getHeaders())); consumerBuilder.admin(notificationAdmin); return consumerBuilder.build(); @@ -173,7 +173,7 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory { responseBuilder.settings(kafkaSettings); responseBuilder.topic(jsInvokeSettings.getResponseTopic() + "." + serviceInfoProvider.getServiceId()); responseBuilder.clientId("js-" + serviceInfoProvider.getServiceId()); - responseBuilder.groupId("rule-engine-node-" + serviceInfoProvider.getServiceId()); + responseBuilder.groupId("rule-engine-node"); responseBuilder.decoder(msg -> { JsInvokeProtos.RemoteJsResponse.Builder builder = JsInvokeProtos.RemoteJsResponse.newBuilder(); JsonFormat.parser().ignoringUnknownFields().merge(new String(msg.getData(), StandardCharsets.UTF_8), builder); diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbTransportQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbTransportQueueFactory.java index 7961bb97bb..f3c294d5b1 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbTransportQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbTransportQueueFactory.java @@ -92,7 +92,7 @@ public class KafkaTbTransportQueueFactory implements TbTransportQueueFactory { responseBuilder.settings(kafkaSettings); responseBuilder.topic(transportApiSettings.getResponsesTopic() + "." + serviceInfoProvider.getServiceId()); responseBuilder.clientId("transport-api-response-" + serviceInfoProvider.getServiceId()); - responseBuilder.groupId("transport-node-" + serviceInfoProvider.getServiceId()); + responseBuilder.groupId("transport-node"); responseBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportApiResponseMsg.parseFrom(msg.getData()), msg.getHeaders())); responseBuilder.admin(transportApiAdmin); @@ -133,7 +133,7 @@ public class KafkaTbTransportQueueFactory implements TbTransportQueueFactory { responseBuilder.settings(kafkaSettings); responseBuilder.topic(transportNotificationSettings.getNotificationsTopic() + "." + serviceInfoProvider.getServiceId()); responseBuilder.clientId("transport-api-notifications-" + serviceInfoProvider.getServiceId()); - responseBuilder.groupId("transport-node-" + serviceInfoProvider.getServiceId()); + responseBuilder.groupId("transport-node"); responseBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToTransportMsg.parseFrom(msg.getData()), msg.getHeaders())); responseBuilder.admin(notificationAdmin); return responseBuilder.build(); From 1b905ef7270ca73e328e7d7ed936c489a0c69d3a Mon Sep 17 00:00:00 2001 From: YevhenBondarenko Date: Wed, 30 Dec 2020 15:13:23 +0200 Subject: [PATCH 2/2] refactored --- .../server/queue/provider/KafkaMonolithQueueFactory.java | 6 +++--- .../server/queue/provider/KafkaTbCoreQueueFactory.java | 4 ++-- .../queue/provider/KafkaTbRuleEngineQueueFactory.java | 4 ++-- .../server/queue/provider/KafkaTbTransportQueueFactory.java | 4 ++-- 4 files changed, 9 insertions(+), 9 deletions(-) diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java index 68a3922de0..a138f8c40f 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java @@ -165,7 +165,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi consumerBuilder.settings(kafkaSettings); consumerBuilder.topic(partitionService.getNotificationsTopic(ServiceType.TB_RULE_ENGINE, serviceInfoProvider.getServiceId()).getFullTopicName()); consumerBuilder.clientId("monolith-rule-engine-notifications-consumer-" + serviceInfoProvider.getServiceId()); - consumerBuilder.groupId("monolith-rule-engine-notifications-consumer"); + consumerBuilder.groupId("monolith-rule-engine-notifications-consumer-" + serviceInfoProvider.getServiceId()); consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToRuleEngineNotificationMsg.parseFrom(msg.getData()), msg.getHeaders())); consumerBuilder.admin(notificationAdmin); return consumerBuilder.build(); @@ -189,7 +189,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi consumerBuilder.settings(kafkaSettings); consumerBuilder.topic(partitionService.getNotificationsTopic(ServiceType.TB_CORE, serviceInfoProvider.getServiceId()).getFullTopicName()); consumerBuilder.clientId("monolith-core-notifications-consumer-" + serviceInfoProvider.getServiceId()); - consumerBuilder.groupId("monolith-core-notifications-consumer"); + consumerBuilder.groupId("monolith-core-notifications-consumer-" + serviceInfoProvider.getServiceId()); consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToCoreNotificationMsg.parseFrom(msg.getData()), msg.getHeaders())); consumerBuilder.admin(notificationAdmin); return consumerBuilder.build(); @@ -230,7 +230,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi responseBuilder.settings(kafkaSettings); responseBuilder.topic(jsInvokeSettings.getResponseTopic() + "." + serviceInfoProvider.getServiceId()); responseBuilder.clientId("js-" + serviceInfoProvider.getServiceId()); - responseBuilder.groupId("rule-engine-node"); + responseBuilder.groupId("rule-engine-node-" + serviceInfoProvider.getServiceId()); responseBuilder.decoder(msg -> { JsInvokeProtos.RemoteJsResponse.Builder builder = JsInvokeProtos.RemoteJsResponse.newBuilder(); JsonFormat.parser().ignoringUnknownFields().merge(new String(msg.getData(), StandardCharsets.UTF_8), builder); diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbCoreQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbCoreQueueFactory.java index 29fbfd2ae6..caaa727749 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbCoreQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbCoreQueueFactory.java @@ -159,7 +159,7 @@ public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory { consumerBuilder.settings(kafkaSettings); consumerBuilder.topic(partitionService.getNotificationsTopic(ServiceType.TB_CORE, serviceInfoProvider.getServiceId()).getFullTopicName()); consumerBuilder.clientId("tb-core-notifications-consumer-" + serviceInfoProvider.getServiceId()); - consumerBuilder.groupId("tb-core-notifications-node"); + consumerBuilder.groupId("tb-core-notifications-node-" + serviceInfoProvider.getServiceId()); consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToCoreNotificationMsg.parseFrom(msg.getData()), msg.getHeaders())); consumerBuilder.admin(notificationAdmin); return consumerBuilder.build(); @@ -200,7 +200,7 @@ public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory { responseBuilder.settings(kafkaSettings); responseBuilder.topic(jsInvokeSettings.getResponseTopic() + "." + serviceInfoProvider.getServiceId()); responseBuilder.clientId("js-" + serviceInfoProvider.getServiceId()); - responseBuilder.groupId("rule-engine-node"); + responseBuilder.groupId("rule-engine-node-" + serviceInfoProvider.getServiceId()); responseBuilder.decoder(msg -> { JsInvokeProtos.RemoteJsResponse.Builder builder = JsInvokeProtos.RemoteJsResponse.newBuilder(); JsonFormat.parser().ignoringUnknownFields().merge(new String(msg.getData(), StandardCharsets.UTF_8), builder); diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbRuleEngineQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbRuleEngineQueueFactory.java index 5df727f06f..3bfbf751c5 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbRuleEngineQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbRuleEngineQueueFactory.java @@ -154,7 +154,7 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory { consumerBuilder.settings(kafkaSettings); consumerBuilder.topic(partitionService.getNotificationsTopic(ServiceType.TB_RULE_ENGINE, serviceInfoProvider.getServiceId()).getFullTopicName()); consumerBuilder.clientId("tb-rule-engine-notifications-consumer-" + serviceInfoProvider.getServiceId()); - consumerBuilder.groupId("tb-rule-engine-notifications-node"); + consumerBuilder.groupId("tb-rule-engine-notifications-node-" + serviceInfoProvider.getServiceId()); consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToRuleEngineNotificationMsg.parseFrom(msg.getData()), msg.getHeaders())); consumerBuilder.admin(notificationAdmin); return consumerBuilder.build(); @@ -173,7 +173,7 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory { responseBuilder.settings(kafkaSettings); responseBuilder.topic(jsInvokeSettings.getResponseTopic() + "." + serviceInfoProvider.getServiceId()); responseBuilder.clientId("js-" + serviceInfoProvider.getServiceId()); - responseBuilder.groupId("rule-engine-node"); + responseBuilder.groupId("rule-engine-node-" + serviceInfoProvider.getServiceId()); responseBuilder.decoder(msg -> { JsInvokeProtos.RemoteJsResponse.Builder builder = JsInvokeProtos.RemoteJsResponse.newBuilder(); JsonFormat.parser().ignoringUnknownFields().merge(new String(msg.getData(), StandardCharsets.UTF_8), builder); diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbTransportQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbTransportQueueFactory.java index f3c294d5b1..7961bb97bb 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbTransportQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbTransportQueueFactory.java @@ -92,7 +92,7 @@ public class KafkaTbTransportQueueFactory implements TbTransportQueueFactory { responseBuilder.settings(kafkaSettings); responseBuilder.topic(transportApiSettings.getResponsesTopic() + "." + serviceInfoProvider.getServiceId()); responseBuilder.clientId("transport-api-response-" + serviceInfoProvider.getServiceId()); - responseBuilder.groupId("transport-node"); + responseBuilder.groupId("transport-node-" + serviceInfoProvider.getServiceId()); responseBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportApiResponseMsg.parseFrom(msg.getData()), msg.getHeaders())); responseBuilder.admin(transportApiAdmin); @@ -133,7 +133,7 @@ public class KafkaTbTransportQueueFactory implements TbTransportQueueFactory { responseBuilder.settings(kafkaSettings); responseBuilder.topic(transportNotificationSettings.getNotificationsTopic() + "." + serviceInfoProvider.getServiceId()); responseBuilder.clientId("transport-api-notifications-" + serviceInfoProvider.getServiceId()); - responseBuilder.groupId("transport-node"); + responseBuilder.groupId("transport-node-" + serviceInfoProvider.getServiceId()); responseBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToTransportMsg.parseFrom(msg.getData()), msg.getHeaders())); responseBuilder.admin(notificationAdmin); return responseBuilder.build();