diff --git a/msa/js-executor/config/custom-environment-variables.yml b/msa/js-executor/config/custom-environment-variables.yml index 2ebea4ccc1..09f4fb0788 100644 --- a/msa/js-executor/config/custom-environment-variables.yml +++ b/msa/js-executor/config/custom-environment-variables.yml @@ -32,6 +32,7 @@ kafka: linger_ms: "TB_KAFKA_LINGER_MS" # for producer partitions_consumed_concurrently: "TB_KAFKA_PARTITIONS_CONSUMED_CONCURRENTLY" # (EXPERIMENTAL) increase this value if you are planning to handle more than one partition (scale up, scale down) - this will decrease the latency requestTimeout: "TB_QUEUE_KAFKA_REQUEST_TIMEOUT_MS" + connectionTimeout: "TB_KAFKA_CONNECTION_TIMEOUT_MS" compression: "TB_QUEUE_KAFKA_COMPRESSION" # gzip or uncompressed topic_properties: "TB_QUEUE_KAFKA_JE_TOPIC_PROPERTIES" use_confluent_cloud: "TB_QUEUE_KAFKA_USE_CONFLUENT_CLOUD" diff --git a/msa/js-executor/config/default.yml b/msa/js-executor/config/default.yml index 96f3401da5..66a153bad4 100644 --- a/msa/js-executor/config/default.yml +++ b/msa/js-executor/config/default.yml @@ -32,6 +32,7 @@ kafka: linger_ms: "5" # for producer partitions_consumed_concurrently: "1" # (EXPERIMENTAL) increase this value if you are planning to handle more than one partition (scale up, scale down) - this will decrease the latency requestTimeout: "30000" # The default value in kafkajs is: 30000 + connectionTimeout: "1000" # The default value in kafkajs is: 1000 compression: "gzip" # gzip or uncompressed topic_properties: "retention.ms:604800000;segment.bytes:26214400;retention.bytes:104857600;partitions:100;min.insync.replicas:1" use_confluent_cloud: false diff --git a/msa/js-executor/queue/kafkaTemplate.ts b/msa/js-executor/queue/kafkaTemplate.ts index 6cb8281837..9b089dbb57 100644 --- a/msa/js-executor/queue/kafkaTemplate.ts +++ b/msa/js-executor/queue/kafkaTemplate.ts @@ -42,6 +42,7 @@ export class KafkaTemplate implements IQueue { private maxBatchSize = Number(config.get('kafka.batch_size')); private linger = Number(config.get('kafka.linger_ms')); private requestTimeout = Number(config.get('kafka.requestTimeout')); + private connectionTimeout = Number(config.get('kafka.connection_timeout_ms')); private compressionType = (config.get('kafka.compression') === "gzip") ? CompressionTypes.GZIP : CompressionTypes.None; private partitionsConsumedConcurrently = Number(config.get('kafka.partitions_consumed_concurrently')); @@ -80,6 +81,8 @@ export class KafkaTemplate implements IQueue { kafkaConfig['requestTimeout'] = this.requestTimeout; + kafkaConfig['connectionTimeout'] = this.connectionTimeout; + if (useConfluent) { kafkaConfig['sasl'] = { mechanism: config.get('kafka.confluent.sasl.mechanism') as any,