diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index 163c325f52..96ee2793ed 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -1036,6 +1036,7 @@ queue: fetch_max_bytes: "${TB_QUEUE_KAFKA_FETCH_MAX_BYTES:134217728}" request.timeout.ms: "${TB_QUEUE_KAFKA_REQUEST_TIMEOUT_MS:30000}" # (30 seconds) # refer to https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html#producerconfigs_request.timeout.ms session.timeout.ms: "${TB_QUEUE_KAFKA_SESSION_TIMEOUT_MS:10000}" # (10 seconds) # refer to https://docs.confluent.io/platform/current/installation/configuration/consumer-configs.html#consumerconfigs_session.timeout.ms + auto_offset_reset: "${TB_QUEUE_KAFKA_AUTO_OFFSET_RESET:earliest}" # earliest, latest or none use_confluent_cloud: "${TB_QUEUE_KAFKA_USE_CONFLUENT_CLOUD:false}" confluent: ssl.algorithm: "${TB_QUEUE_KAFKA_CONFLUENT_SSL_ALGORITHM:https}" diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaSettings.java b/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaSettings.java index 41ae0d5ea3..d2a54128e3 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaSettings.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaSettings.java @@ -115,6 +115,9 @@ public class TbKafkaSettings { @Value("${queue.kafka.session.timeout.ms:10000}") private int sessionTimeoutMs; + @Value("${queue.kafka.auto_offset_reset:earliest}") + private String autoOffsetReset; + @Value("${queue.kafka.use_confluent_cloud:false}") private boolean useConfluent; @@ -155,6 +158,7 @@ public class TbKafkaSettings { props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, maxPartitionFetchBytes); props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, fetchMaxBytes); props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxPollIntervalMs); + props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class); diff --git a/msa/vc-executor/src/main/resources/tb-vc-executor.yml b/msa/vc-executor/src/main/resources/tb-vc-executor.yml index 352f94e091..9aa149280a 100644 --- a/msa/vc-executor/src/main/resources/tb-vc-executor.yml +++ b/msa/vc-executor/src/main/resources/tb-vc-executor.yml @@ -72,6 +72,7 @@ queue: fetch_max_bytes: "${TB_QUEUE_KAFKA_FETCH_MAX_BYTES:134217728}" request.timeout.ms: "${TB_QUEUE_KAFKA_REQUEST_TIMEOUT_MS:30000}" # (30 seconds) # refer to https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html#producerconfigs_request.timeout.ms session.timeout.ms: "${TB_QUEUE_KAFKA_SESSION_TIMEOUT_MS:10000}" # (10 seconds) # refer to https://docs.confluent.io/platform/current/installation/configuration/consumer-configs.html#consumerconfigs_session.timeout.ms + auto_offset_reset: "${TB_QUEUE_KAFKA_AUTO_OFFSET_RESET:earliest}" # earliest, latest or none use_confluent_cloud: "${TB_QUEUE_KAFKA_USE_CONFLUENT_CLOUD:false}" confluent: ssl.algorithm: "${TB_QUEUE_KAFKA_CONFLUENT_SSL_ALGORITHM:https}" diff --git a/transport/coap/src/main/resources/tb-coap-transport.yml b/transport/coap/src/main/resources/tb-coap-transport.yml index 7ea553fe5c..5e0a7f2a9d 100644 --- a/transport/coap/src/main/resources/tb-coap-transport.yml +++ b/transport/coap/src/main/resources/tb-coap-transport.yml @@ -173,6 +173,7 @@ queue: fetch_max_bytes: "${TB_QUEUE_KAFKA_FETCH_MAX_BYTES:134217728}" request.timeout.ms: "${TB_QUEUE_KAFKA_REQUEST_TIMEOUT_MS:30000}" # (30 seconds) # refer to https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html#producerconfigs_request.timeout.ms session.timeout.ms: "${TB_QUEUE_KAFKA_SESSION_TIMEOUT_MS:10000}" # (10 seconds) # refer to https://docs.confluent.io/platform/current/installation/configuration/consumer-configs.html#consumerconfigs_session.timeout.ms + auto_offset_reset: "${TB_QUEUE_KAFKA_AUTO_OFFSET_RESET:earliest}" # earliest, latest or none use_confluent_cloud: "${TB_QUEUE_KAFKA_USE_CONFLUENT_CLOUD:false}" confluent: ssl.algorithm: "${TB_QUEUE_KAFKA_CONFLUENT_SSL_ALGORITHM:https}" diff --git a/transport/http/src/main/resources/tb-http-transport.yml b/transport/http/src/main/resources/tb-http-transport.yml index 346ec48eae..f7af5c979a 100644 --- a/transport/http/src/main/resources/tb-http-transport.yml +++ b/transport/http/src/main/resources/tb-http-transport.yml @@ -158,6 +158,7 @@ queue: fetch_max_bytes: "${TB_QUEUE_KAFKA_FETCH_MAX_BYTES:134217728}" request.timeout.ms: "${TB_QUEUE_KAFKA_REQUEST_TIMEOUT_MS:30000}" # (30 seconds) # refer to https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html#producerconfigs_request.timeout.ms session.timeout.ms: "${TB_QUEUE_KAFKA_SESSION_TIMEOUT_MS:10000}" # (10 seconds) # refer to https://docs.confluent.io/platform/current/installation/configuration/consumer-configs.html#consumerconfigs_session.timeout.ms + auto_offset_reset: "${TB_QUEUE_KAFKA_AUTO_OFFSET_RESET:earliest}" # earliest, latest or none use_confluent_cloud: "${TB_QUEUE_KAFKA_USE_CONFLUENT_CLOUD:false}" confluent: ssl.algorithm: "${TB_QUEUE_KAFKA_CONFLUENT_SSL_ALGORITHM:https}" diff --git a/transport/lwm2m/src/main/resources/tb-lwm2m-transport.yml b/transport/lwm2m/src/main/resources/tb-lwm2m-transport.yml index 4e8167d89d..3d649b3c2b 100644 --- a/transport/lwm2m/src/main/resources/tb-lwm2m-transport.yml +++ b/transport/lwm2m/src/main/resources/tb-lwm2m-transport.yml @@ -239,6 +239,7 @@ queue: fetch_max_bytes: "${TB_QUEUE_KAFKA_FETCH_MAX_BYTES:134217728}" request.timeout.ms: "${TB_QUEUE_KAFKA_REQUEST_TIMEOUT_MS:30000}" # (30 seconds) # refer to https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html#producerconfigs_request.timeout.ms session.timeout.ms: "${TB_QUEUE_KAFKA_SESSION_TIMEOUT_MS:10000}" # (10 seconds) # refer to https://docs.confluent.io/platform/current/installation/configuration/consumer-configs.html#consumerconfigs_session.timeout.ms + auto_offset_reset: "${TB_QUEUE_KAFKA_AUTO_OFFSET_RESET:earliest}" # earliest, latest or none use_confluent_cloud: "${TB_QUEUE_KAFKA_USE_CONFLUENT_CLOUD:false}" confluent: ssl.algorithm: "${TB_QUEUE_KAFKA_CONFLUENT_SSL_ALGORITHM:https}" diff --git a/transport/mqtt/src/main/resources/tb-mqtt-transport.yml b/transport/mqtt/src/main/resources/tb-mqtt-transport.yml index 1e0b1ebcd4..38f569cfd7 100644 --- a/transport/mqtt/src/main/resources/tb-mqtt-transport.yml +++ b/transport/mqtt/src/main/resources/tb-mqtt-transport.yml @@ -188,6 +188,7 @@ queue: fetch_max_bytes: "${TB_QUEUE_KAFKA_FETCH_MAX_BYTES:134217728}" request.timeout.ms: "${TB_QUEUE_KAFKA_REQUEST_TIMEOUT_MS:30000}" # (30 seconds) # refer to https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html#producerconfigs_request.timeout.ms session.timeout.ms: "${TB_QUEUE_KAFKA_SESSION_TIMEOUT_MS:10000}" # (10 seconds) # refer to https://docs.confluent.io/platform/current/installation/configuration/consumer-configs.html#consumerconfigs_session.timeout.ms + auto_offset_reset: "${TB_QUEUE_KAFKA_AUTO_OFFSET_RESET:earliest}" # earliest, latest or none use_confluent_cloud: "${TB_QUEUE_KAFKA_USE_CONFLUENT_CLOUD:false}" confluent: ssl.algorithm: "${TB_QUEUE_KAFKA_CONFLUENT_SSL_ALGORITHM:https}" diff --git a/transport/snmp/src/main/resources/tb-snmp-transport.yml b/transport/snmp/src/main/resources/tb-snmp-transport.yml index 9f086bcbc5..c4d76dbf30 100644 --- a/transport/snmp/src/main/resources/tb-snmp-transport.yml +++ b/transport/snmp/src/main/resources/tb-snmp-transport.yml @@ -134,6 +134,7 @@ queue: fetch_max_bytes: "${TB_QUEUE_KAFKA_FETCH_MAX_BYTES:134217728}" request.timeout.ms: "${TB_QUEUE_KAFKA_REQUEST_TIMEOUT_MS:30000}" # (30 seconds) # refer to https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html#producerconfigs_request.timeout.ms session.timeout.ms: "${TB_QUEUE_KAFKA_SESSION_TIMEOUT_MS:10000}" # (10 seconds) # refer to https://docs.confluent.io/platform/current/installation/configuration/consumer-configs.html#consumerconfigs_session.timeout.ms + auto_offset_reset: "${TB_QUEUE_KAFKA_AUTO_OFFSET_RESET:earliest}" # earliest, latest or none use_confluent_cloud: "${TB_QUEUE_KAFKA_USE_CONFLUENT_CLOUD:false}" confluent: ssl.algorithm: "${TB_QUEUE_KAFKA_CONFLUENT_SSL_ALGORITHM:https}"