Auto offset reset config for Kafka
This commit is contained in:
		
							parent
							
								
									55775f2815
								
							
						
					
					
						commit
						137cc18099
					
				@ -1036,6 +1036,7 @@ queue:
 | 
			
		||||
    fetch_max_bytes: "${TB_QUEUE_KAFKA_FETCH_MAX_BYTES:134217728}"
 | 
			
		||||
    request.timeout.ms: "${TB_QUEUE_KAFKA_REQUEST_TIMEOUT_MS:30000}" # (30 seconds) # refer to https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html#producerconfigs_request.timeout.ms
 | 
			
		||||
    session.timeout.ms: "${TB_QUEUE_KAFKA_SESSION_TIMEOUT_MS:10000}" # (10 seconds) # refer to https://docs.confluent.io/platform/current/installation/configuration/consumer-configs.html#consumerconfigs_session.timeout.ms
 | 
			
		||||
    auto_offset_reset: "${TB_QUEUE_KAFKA_AUTO_OFFSET_RESET:earliest}" # earliest, latest or none
 | 
			
		||||
    use_confluent_cloud: "${TB_QUEUE_KAFKA_USE_CONFLUENT_CLOUD:false}"
 | 
			
		||||
    confluent:
 | 
			
		||||
      ssl.algorithm: "${TB_QUEUE_KAFKA_CONFLUENT_SSL_ALGORITHM:https}"
 | 
			
		||||
 | 
			
		||||
@ -115,6 +115,9 @@ public class TbKafkaSettings {
 | 
			
		||||
    @Value("${queue.kafka.session.timeout.ms:10000}")
 | 
			
		||||
    private int sessionTimeoutMs;
 | 
			
		||||
 | 
			
		||||
    @Value("${queue.kafka.auto_offset_reset:earliest}")
 | 
			
		||||
    private String autoOffsetReset;
 | 
			
		||||
 | 
			
		||||
    @Value("${queue.kafka.use_confluent_cloud:false}")
 | 
			
		||||
    private boolean useConfluent;
 | 
			
		||||
 | 
			
		||||
@ -155,6 +158,7 @@ public class TbKafkaSettings {
 | 
			
		||||
        props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, maxPartitionFetchBytes);
 | 
			
		||||
        props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, fetchMaxBytes);
 | 
			
		||||
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxPollIntervalMs);
 | 
			
		||||
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
 | 
			
		||||
 | 
			
		||||
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
 | 
			
		||||
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
 | 
			
		||||
 | 
			
		||||
@ -72,6 +72,7 @@ queue:
 | 
			
		||||
    fetch_max_bytes: "${TB_QUEUE_KAFKA_FETCH_MAX_BYTES:134217728}"
 | 
			
		||||
    request.timeout.ms: "${TB_QUEUE_KAFKA_REQUEST_TIMEOUT_MS:30000}" # (30 seconds) # refer to https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html#producerconfigs_request.timeout.ms
 | 
			
		||||
    session.timeout.ms: "${TB_QUEUE_KAFKA_SESSION_TIMEOUT_MS:10000}" # (10 seconds) # refer to https://docs.confluent.io/platform/current/installation/configuration/consumer-configs.html#consumerconfigs_session.timeout.ms
 | 
			
		||||
    auto_offset_reset: "${TB_QUEUE_KAFKA_AUTO_OFFSET_RESET:earliest}" # earliest, latest or none
 | 
			
		||||
    use_confluent_cloud: "${TB_QUEUE_KAFKA_USE_CONFLUENT_CLOUD:false}"
 | 
			
		||||
    confluent:
 | 
			
		||||
      ssl.algorithm: "${TB_QUEUE_KAFKA_CONFLUENT_SSL_ALGORITHM:https}"
 | 
			
		||||
 | 
			
		||||
@ -173,6 +173,7 @@ queue:
 | 
			
		||||
    fetch_max_bytes: "${TB_QUEUE_KAFKA_FETCH_MAX_BYTES:134217728}"
 | 
			
		||||
    request.timeout.ms: "${TB_QUEUE_KAFKA_REQUEST_TIMEOUT_MS:30000}" # (30 seconds) # refer to https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html#producerconfigs_request.timeout.ms
 | 
			
		||||
    session.timeout.ms: "${TB_QUEUE_KAFKA_SESSION_TIMEOUT_MS:10000}" # (10 seconds) # refer to https://docs.confluent.io/platform/current/installation/configuration/consumer-configs.html#consumerconfigs_session.timeout.ms
 | 
			
		||||
    auto_offset_reset: "${TB_QUEUE_KAFKA_AUTO_OFFSET_RESET:earliest}" # earliest, latest or none
 | 
			
		||||
    use_confluent_cloud: "${TB_QUEUE_KAFKA_USE_CONFLUENT_CLOUD:false}"
 | 
			
		||||
    confluent:
 | 
			
		||||
      ssl.algorithm: "${TB_QUEUE_KAFKA_CONFLUENT_SSL_ALGORITHM:https}"
 | 
			
		||||
 | 
			
		||||
@ -158,6 +158,7 @@ queue:
 | 
			
		||||
    fetch_max_bytes: "${TB_QUEUE_KAFKA_FETCH_MAX_BYTES:134217728}"
 | 
			
		||||
    request.timeout.ms: "${TB_QUEUE_KAFKA_REQUEST_TIMEOUT_MS:30000}" # (30 seconds) # refer to https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html#producerconfigs_request.timeout.ms
 | 
			
		||||
    session.timeout.ms: "${TB_QUEUE_KAFKA_SESSION_TIMEOUT_MS:10000}" # (10 seconds) # refer to https://docs.confluent.io/platform/current/installation/configuration/consumer-configs.html#consumerconfigs_session.timeout.ms
 | 
			
		||||
    auto_offset_reset: "${TB_QUEUE_KAFKA_AUTO_OFFSET_RESET:earliest}" # earliest, latest or none
 | 
			
		||||
    use_confluent_cloud: "${TB_QUEUE_KAFKA_USE_CONFLUENT_CLOUD:false}"
 | 
			
		||||
    confluent:
 | 
			
		||||
      ssl.algorithm: "${TB_QUEUE_KAFKA_CONFLUENT_SSL_ALGORITHM:https}"
 | 
			
		||||
 | 
			
		||||
@ -239,6 +239,7 @@ queue:
 | 
			
		||||
    fetch_max_bytes: "${TB_QUEUE_KAFKA_FETCH_MAX_BYTES:134217728}"
 | 
			
		||||
    request.timeout.ms: "${TB_QUEUE_KAFKA_REQUEST_TIMEOUT_MS:30000}" # (30 seconds) # refer to https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html#producerconfigs_request.timeout.ms
 | 
			
		||||
    session.timeout.ms: "${TB_QUEUE_KAFKA_SESSION_TIMEOUT_MS:10000}" # (10 seconds) # refer to https://docs.confluent.io/platform/current/installation/configuration/consumer-configs.html#consumerconfigs_session.timeout.ms
 | 
			
		||||
    auto_offset_reset: "${TB_QUEUE_KAFKA_AUTO_OFFSET_RESET:earliest}" # earliest, latest or none
 | 
			
		||||
    use_confluent_cloud: "${TB_QUEUE_KAFKA_USE_CONFLUENT_CLOUD:false}"
 | 
			
		||||
    confluent:
 | 
			
		||||
      ssl.algorithm: "${TB_QUEUE_KAFKA_CONFLUENT_SSL_ALGORITHM:https}"
 | 
			
		||||
 | 
			
		||||
@ -188,6 +188,7 @@ queue:
 | 
			
		||||
    fetch_max_bytes: "${TB_QUEUE_KAFKA_FETCH_MAX_BYTES:134217728}"
 | 
			
		||||
    request.timeout.ms: "${TB_QUEUE_KAFKA_REQUEST_TIMEOUT_MS:30000}" # (30 seconds) # refer to https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html#producerconfigs_request.timeout.ms
 | 
			
		||||
    session.timeout.ms: "${TB_QUEUE_KAFKA_SESSION_TIMEOUT_MS:10000}" # (10 seconds) # refer to https://docs.confluent.io/platform/current/installation/configuration/consumer-configs.html#consumerconfigs_session.timeout.ms
 | 
			
		||||
    auto_offset_reset: "${TB_QUEUE_KAFKA_AUTO_OFFSET_RESET:earliest}" # earliest, latest or none
 | 
			
		||||
    use_confluent_cloud: "${TB_QUEUE_KAFKA_USE_CONFLUENT_CLOUD:false}"
 | 
			
		||||
    confluent:
 | 
			
		||||
      ssl.algorithm: "${TB_QUEUE_KAFKA_CONFLUENT_SSL_ALGORITHM:https}"
 | 
			
		||||
 | 
			
		||||
@ -134,6 +134,7 @@ queue:
 | 
			
		||||
    fetch_max_bytes: "${TB_QUEUE_KAFKA_FETCH_MAX_BYTES:134217728}"
 | 
			
		||||
    request.timeout.ms: "${TB_QUEUE_KAFKA_REQUEST_TIMEOUT_MS:30000}" # (30 seconds) # refer to https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html#producerconfigs_request.timeout.ms
 | 
			
		||||
    session.timeout.ms: "${TB_QUEUE_KAFKA_SESSION_TIMEOUT_MS:10000}" # (10 seconds) # refer to https://docs.confluent.io/platform/current/installation/configuration/consumer-configs.html#consumerconfigs_session.timeout.ms
 | 
			
		||||
    auto_offset_reset: "${TB_QUEUE_KAFKA_AUTO_OFFSET_RESET:earliest}" # earliest, latest or none
 | 
			
		||||
    use_confluent_cloud: "${TB_QUEUE_KAFKA_USE_CONFLUENT_CLOUD:false}"
 | 
			
		||||
    confluent:
 | 
			
		||||
      ssl.algorithm: "${TB_QUEUE_KAFKA_CONFLUENT_SSL_ALGORITHM:https}"
 | 
			
		||||
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user