allow to configure kafkajs connectionTimeout
This commit is contained in:
		
							parent
							
								
									c7babf2085
								
							
						
					
					
						commit
						198f3ca21d
					
				@ -32,6 +32,7 @@ kafka:
 | 
			
		||||
  linger_ms: "TB_KAFKA_LINGER_MS" # for producer
 | 
			
		||||
  partitions_consumed_concurrently: "TB_KAFKA_PARTITIONS_CONSUMED_CONCURRENTLY" # (EXPERIMENTAL) increase this value if you are planning to handle more than one partition (scale up, scale down) - this will decrease the latency
 | 
			
		||||
  requestTimeout: "TB_QUEUE_KAFKA_REQUEST_TIMEOUT_MS"
 | 
			
		||||
  connectionTimeout: "TB_KAFKA_CONNECTION_TIMEOUT_MS"
 | 
			
		||||
  compression: "TB_QUEUE_KAFKA_COMPRESSION" # gzip or uncompressed
 | 
			
		||||
  topic_properties: "TB_QUEUE_KAFKA_JE_TOPIC_PROPERTIES"
 | 
			
		||||
  use_confluent_cloud: "TB_QUEUE_KAFKA_USE_CONFLUENT_CLOUD"
 | 
			
		||||
 | 
			
		||||
@ -32,6 +32,7 @@ kafka:
 | 
			
		||||
  linger_ms: "5" # for producer
 | 
			
		||||
  partitions_consumed_concurrently: "1" # (EXPERIMENTAL) increase this value if you are planning to handle more than one partition (scale up, scale down) - this will decrease the latency
 | 
			
		||||
  requestTimeout: "30000" # The default value in kafkajs is: 30000
 | 
			
		||||
  connectionTimeout: "1000" # The default value in kafkajs is: 1000
 | 
			
		||||
  compression: "gzip" # gzip or uncompressed
 | 
			
		||||
  topic_properties: "retention.ms:604800000;segment.bytes:26214400;retention.bytes:104857600;partitions:100;min.insync.replicas:1"
 | 
			
		||||
  use_confluent_cloud: false
 | 
			
		||||
 | 
			
		||||
@ -42,6 +42,7 @@ export class KafkaTemplate implements IQueue {
 | 
			
		||||
    private maxBatchSize = Number(config.get('kafka.batch_size'));
 | 
			
		||||
    private linger = Number(config.get('kafka.linger_ms'));
 | 
			
		||||
    private requestTimeout = Number(config.get('kafka.requestTimeout'));
 | 
			
		||||
    private connectionTimeout = Number(config.get('kafka.connection_timeout_ms'));
 | 
			
		||||
    private compressionType = (config.get('kafka.compression') === "gzip") ? CompressionTypes.GZIP : CompressionTypes.None;
 | 
			
		||||
    private partitionsConsumedConcurrently = Number(config.get('kafka.partitions_consumed_concurrently'));
 | 
			
		||||
 | 
			
		||||
@ -80,6 +81,8 @@ export class KafkaTemplate implements IQueue {
 | 
			
		||||
 | 
			
		||||
        kafkaConfig['requestTimeout'] = this.requestTimeout;
 | 
			
		||||
 | 
			
		||||
        kafkaConfig['connectionTimeout'] = this.connectionTimeout;
 | 
			
		||||
 | 
			
		||||
        if (useConfluent) {
 | 
			
		||||
            kafkaConfig['sasl'] = {
 | 
			
		||||
                mechanism: config.get('kafka.confluent.sasl.mechanism') as any,
 | 
			
		||||
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user