diff --git a/msa/js-executor/config/custom-environment-variables.yml b/msa/js-executor/config/custom-environment-variables.yml index 848351217b..bbe567d138 100644 --- a/msa/js-executor/config/custom-environment-variables.yml +++ b/msa/js-executor/config/custom-environment-variables.yml @@ -27,6 +27,7 @@ kafka: replication_factor: "TB_QUEUE_KAFKA_REPLICATION_FACTOR" topic_properties: "TB_QUEUE_KAFKA_JE_TOPIC_PROPERTIES" use_confluent_cloud: "TB_QUEUE_KAFKA_USE_CONFLUENT_CLOUD" + client_id: "KAFKA_CLIENT_ID" #inject pod name to easy identify the client using /opt/kafka/bin/kafka-consumer-groups.sh confluent: sasl: mechanism: "TB_QUEUE_KAFKA_CONFLUENT_SASL_MECHANISM" diff --git a/msa/js-executor/config/default.yml b/msa/js-executor/config/default.yml index 8ada8f691b..69c73abf9c 100644 --- a/msa/js-executor/config/default.yml +++ b/msa/js-executor/config/default.yml @@ -27,6 +27,7 @@ kafka: replication_factor: "1" topic_properties: "retention.ms:604800000;segment.bytes:26214400;retention.bytes:104857600;partitions:100;min.insync.replicas:1" use_confluent_cloud: false + client_id: "kafkajs" #inject pod name to easy identify the client using /opt/kafka/bin/kafka-consumer-groups.sh confluent: sasl: mechanism: "PLAIN" diff --git a/msa/js-executor/queue/kafkaTemplate.js b/msa/js-executor/queue/kafkaTemplate.js index b0b4de5e81..ae1e6ae3ca 100644 --- a/msa/js-executor/queue/kafkaTemplate.js +++ b/msa/js-executor/queue/kafkaTemplate.js @@ -21,6 +21,7 @@ const config = require('config'), KafkaJsWinstonLogCreator = require('../config/logger').KafkaJsWinstonLogCreator; const replicationFactor = config.get('kafka.replication_factor'); const topicProperties = config.get('kafka.topic_properties'); +const kafkaClientId = config.get('kafka.client_id'); let kafkaClient; let kafkaAdmin; @@ -62,6 +63,12 @@ function KafkaProducer() { logCreator: KafkaJsWinstonLogCreator }; + if (kafkaClientId) { + kafkaConfig['clientId'] = kafkaClientId; + } else { + logger.warn('KAFKA_CLIENT_ID is undefined. Consider to define the env variable KAFKA_CLIENT_ID'); + } + if (useConfluent) { kafkaConfig['sasl'] = { mechanism: config.get('kafka.confluent.sasl.mechanism'),