Merge pull request #13148 from AndriiLandiak/fix-autocommit-cf

Make Kafka consumer properties configurable for overridden topic names
This commit is contained in:
Viacheslav Klimov 2025-05-06 15:46:53 +03:00 committed by GitHub
commit 825f0b1ed7
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
12 changed files with 118 additions and 7 deletions

View File

@ -1631,6 +1631,12 @@ queue:
- key: max.poll.records
# Max poll records for edqs.state topic
value: "${TB_QUEUE_KAFKA_EDQS_STATE_MAX_POLL_RECORDS:512}"
# If you override any default Kafka topic name using environment variables, you must also specify the related consumer properties
# for the new topic in `consumer-properties-per-topic-inline`. Otherwise, the topic will not inherit its expected configuration (e.g., max.poll.records, timeouts, etc.).
# Each entry sets a single property for a specific topic. To define multiple properties for a topic, repeat the topic key.
# Format: "topic1:key=value;topic1:key=value;topic2:key=value"
# Example: tb_core_updated:max.poll.records=10;tb_core_updated:bootstrap.servers=kafka1:9092,kafka2:9092;tb_edge_updated:auto.offset.reset=latest
consumer-properties-per-topic-inline: "${TB_QUEUE_KAFKA_CONSUMER_PROPERTIES_PER_TOPIC_INLINE:}"
other-inline: "${TB_QUEUE_KAFKA_OTHER_PROPERTIES:}" # In this section you can specify custom parameters (semicolon separated) for Kafka consumer/producer/admin # Example "metrics.recording.level:INFO;metrics.sample.window.ms:30000"
other: # DEPRECATED. In this section, you can specify custom parameters for Kafka consumer/producer and expose the env variables to configure outside
# - key: "request.timeout.ms" # refer to https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html#producerconfigs_request.timeout.ms

View File

@ -15,14 +15,16 @@
*/
package org.thingsboard.server.common.data;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
 * A simple key/value pair used to carry queue (e.g. Kafka) configuration properties.
 * Getters, setters, equals/hashCode/toString and both constructors are generated by Lombok.
 *
 * Created by ashvayka on 25.09.18.
 */
@Data
@NoArgsConstructor
@AllArgsConstructor
public class TbProperty {
// Property name, e.g. "max.poll.records"
private String key;
// Property value, stored as a raw string
private String value;
}

View File

@ -15,6 +15,7 @@
*/
package org.thingsboard.server.queue.kafka;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import lombok.Getter;
import lombok.Setter;
@ -36,7 +37,8 @@ import org.springframework.stereotype.Component;
import org.thingsboard.server.common.data.TbProperty;
import org.thingsboard.server.queue.util.PropertyUtils;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
@ -138,15 +140,26 @@ public class TbKafkaSettings {
@Value("${queue.kafka.other-inline:}")
private String otherInline;
@Value("${queue.kafka.consumer-properties-per-topic-inline:}")
private String consumerPropertiesPerTopicInline;
@Deprecated
@Setter
private List<TbProperty> other;
@Setter
private Map<String, List<TbProperty>> consumerPropertiesPerTopic = Collections.emptyMap();
private Map<String, List<TbProperty>> consumerPropertiesPerTopic = new HashMap<>();
private volatile AdminClient adminClient;
@PostConstruct
public void initInlineTopicProperties() {
    // Merges per-topic consumer properties supplied inline (queue.kafka.consumer-properties-per-topic-inline)
    // into the per-topic map. Inline entries override entries with the same topic key.
    Map<String, List<TbProperty>> inlineProps = parseTopicPropertyList(consumerPropertiesPerTopicInline);
    if (!inlineProps.isEmpty()) {
        // Merge into a fresh map: consumerPropertiesPerTopic has a public setter and the
        // injected map may be unmodifiable, in which case putAll would throw
        // UnsupportedOperationException.
        Map<String, List<TbProperty>> merged = new HashMap<>(consumerPropertiesPerTopic);
        merged.putAll(inlineProps);
        consumerPropertiesPerTopic = merged;
    }
}
public Properties toConsumerProps(String topic) {
Properties props = toProps();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
@ -245,6 +258,27 @@ public class TbKafkaSettings {
return props;
}
/**
 * Parses the inline per-topic property string (format: "topic:key=value;topic:key=value")
 * into a map of topic name to its consumer properties.
 * When the same key appears twice for a topic, the last value wins.
 */
private Map<String, List<TbProperty>> parseTopicPropertyList(String inlineProperties) {
    Map<String, List<String>> grouped = PropertyUtils.getGroupedProps(inlineProperties);
    Map<String, List<TbProperty>> result = new HashMap<>();
    for (Map.Entry<String, List<String>> groupEntry : grouped.entrySet()) {
        // LinkedHashMap keeps first-seen key order while letting later entries overwrite earlier ones
        Map<String, String> deduplicated = new LinkedHashMap<>();
        for (String rawPair : groupEntry.getValue()) {
            int eqPosition = rawPair.indexOf('=');
            if (eqPosition >= 0) {
                String key = rawPair.substring(0, eqPosition).trim();
                String value = rawPair.substring(eqPosition + 1).trim();
                deduplicated.put(key, value);
            }
        }
        List<TbProperty> topicProps = new ArrayList<>(deduplicated.size());
        deduplicated.forEach((key, value) -> topicProps.add(new TbProperty(key, value)));
        // List.copyOf keeps the result unmodifiable, matching Stream.toList() semantics
        result.put(groupEntry.getKey(), List.copyOf(topicProps));
    }
    return result;
}
@PreDestroy
private void destroy() {
if (adminClient != null) {

View File

@ -17,7 +17,9 @@ package org.thingsboard.server.queue.util;
import org.thingsboard.server.common.data.StringUtils;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
@ -38,6 +40,21 @@ public class PropertyUtils {
return configs;
}
/**
 * Splits a semicolon-separated list of "prefix:value" entries and groups the values by prefix.
 * Example input "t1:a=1;t1:b=2;t2:c=3" yields {t1=[a=1, b=2], t2=[c=3]}.
 * Entries without a ":" delimiter are ignored instead of failing the whole parse.
 *
 * @param properties raw inline string; may be null or empty, in which case an empty map is returned
 * @return mutable map of prefix to the list of values seen for it, in input order
 */
public static Map<String, List<String>> getGroupedProps(String properties) {
    Map<String, List<String>> configs = new HashMap<>();
    if (StringUtils.isNotEmpty(properties)) {
        for (String property : properties.split(";")) {
            if (StringUtils.isNotEmpty(property)) {
                int delimiterPosition = property.indexOf(":");
                if (delimiterPosition < 0) {
                    // Malformed entry without a "prefix:" part; substring(0, -1) would throw
                    // StringIndexOutOfBoundsException, so skip it instead
                    continue;
                }
                String topic = property.substring(0, delimiterPosition).trim();
                String value = property.substring(delimiterPosition + 1).trim();
                configs.computeIfAbsent(topic, k -> new ArrayList<>()).add(value);
            }
        }
    }
    return configs;
}
// Convenience overload: parses propertiesStr with the default single-string parser
// (presumably semicolon-separated "key:value" pairs — see the other getProps overloads)
// and merges the result over defaultProperties.
public static Map<String, String> getProps(Map<String, String> defaultProperties, String propertiesStr) {
    return getProps(defaultProperties, propertiesStr, PropertyUtils::getProps);
}

View File

@ -33,6 +33,12 @@ import static org.mockito.Mockito.spy;
"queue.type=kafka",
"queue.kafka.bootstrap.servers=localhost:9092",
"queue.kafka.other-inline=metrics.recording.level:INFO;metrics.sample.window.ms:30000",
"queue.kafka.consumer-properties-per-topic-inline=" +
"tb_core_updated:max.poll.records=10;" +
"tb_core_updated:enable.auto.commit=true;" +
"tb_core_updated:bootstrap.servers=kafka1:9092,kafka2:9092;" +
"tb_edge_updated:max.poll.records=5;" +
"tb_edge_updated:auto.offset.reset=latest"
})
class TbKafkaSettingsTest {
@ -79,4 +85,16 @@ class TbKafkaSettingsTest {
Mockito.verify(settings).configureSSL(any());
}
}
@Test
void givenMultipleTopicsInInlineConfig_whenParsed_thenEachTopicGetsExpectedProperties() {
    // The inline string set in @TestPropertySource defines properties for two topics;
    // each topic's consumer config must contain exactly the values declared for it.
    Properties coreProps = settings.toConsumerProps("tb_core_updated");
    assertThat(coreProps.getProperty("max.poll.records")).isEqualTo("10");
    assertThat(coreProps.getProperty("enable.auto.commit")).isEqualTo("true");
    // Per-topic override must win over the global queue.kafka.bootstrap.servers value
    assertThat(coreProps.getProperty("bootstrap.servers")).isEqualTo("kafka1:9092,kafka2:9092");
    Properties edgeProps = settings.toConsumerProps("tb_edge_updated");
    assertThat(edgeProps.getProperty("max.poll.records")).isEqualTo("5");
    assertThat(edgeProps.getProperty("auto.offset.reset")).isEqualTo("latest");
}
}

View File

@ -148,7 +148,11 @@ queue:
- key: max.poll.records
# Max poll records for edqs.state topic
value: "${TB_QUEUE_KAFKA_EDQS_STATE_MAX_POLL_RECORDS:512}"
# If you override any default Kafka topic name using environment variables, you must also specify the related consumer properties
# for the new topic in `consumer-properties-per-topic-inline`. Otherwise, the topic will not inherit its expected configuration (e.g., max.poll.records, timeouts, etc.).
# Format: "topic1:key1=value1,key2=value2;topic2:key=value"
# Example: "tb_core_modified.notifications:max.poll.records=10;tb_edge_modified:max.poll.records=10,enable.auto.commit=true"
consumer-properties-per-topic-inline: "${TB_QUEUE_KAFKA_CONSUMER_PROPERTIES_PER_TOPIC_INLINE:}"
other-inline: "${TB_QUEUE_KAFKA_OTHER_PROPERTIES:}" # In this section you can specify custom parameters (semicolon separated) for Kafka consumer/producer/admin # Example "metrics.recording.level:INFO;metrics.sample.window.ms:30000"
other: # DEPRECATED. In this section, you can specify custom parameters for Kafka consumer/producer and expose the env variables to configure outside
# - key: "request.timeout.ms" # refer to https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html#producerconfigs_request.timeout.ms

View File

@ -124,6 +124,11 @@ queue:
# tb_rule_engine.sq:
# - key: max.poll.records
# value: "${TB_QUEUE_KAFKA_SQ_MAX_POLL_RECORDS:1024}"
# If you override any default Kafka topic name using environment variables, you must also specify the related consumer properties
# for the new topic in `consumer-properties-per-topic-inline`. Otherwise, the topic will not inherit its expected configuration (e.g., max.poll.records, timeouts, etc.).
# Format: "topic1:key1=value1,key2=value2;topic2:key=value"
# Example: "tb_core_modified.notifications:max.poll.records=10;tb_edge_modified:max.poll.records=10,enable.auto.commit=true"
consumer-properties-per-topic-inline: "${TB_QUEUE_KAFKA_CONSUMER_PROPERTIES_PER_TOPIC_INLINE:}"
other-inline: "${TB_QUEUE_KAFKA_OTHER_PROPERTIES:}" # In this section you can specify custom parameters (semicolon separated) for Kafka consumer/producer/admin # Example "metrics.recording.level:INFO;metrics.sample.window.ms:30000"
other: # DEPRECATED. In this section you can specify custom parameters for Kafka consumer/producer and expose the env variables to configure outside
# - key: "request.timeout.ms" # refer to https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html#producerconfigs_request.timeout.ms

View File

@ -310,6 +310,11 @@ queue:
sasl.config: "${TB_QUEUE_KAFKA_CONFLUENT_SASL_JAAS_CONFIG:org.apache.kafka.common.security.plain.PlainLoginModule required username=\"CLUSTER_API_KEY\" password=\"CLUSTER_API_SECRET\";}"
# Protocol used to communicate with brokers. Valid values are: PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL
security.protocol: "${TB_QUEUE_KAFKA_CONFLUENT_SECURITY_PROTOCOL:SASL_SSL}"
# If you override any default Kafka topic name using environment variables, you must also specify the related consumer properties
# for the new topic in `consumer-properties-per-topic-inline`. Otherwise, the topic will not inherit its expected configuration (e.g., max.poll.records, timeouts, etc.).
# Format: "topic1:key1=value1,key2=value2;topic2:key=value"
# Example: "tb_core_modified.notifications:max.poll.records=10;tb_edge_modified:max.poll.records=10,enable.auto.commit=true"
consumer-properties-per-topic-inline: "${TB_QUEUE_KAFKA_CONSUMER_PROPERTIES_PER_TOPIC_INLINE:}"
other-inline: "${TB_QUEUE_KAFKA_OTHER_PROPERTIES:}" # In this section you can specify custom parameters (semicolon separated) for Kafka consumer/producer/admin # Example "metrics.recording.level:INFO;metrics.sample.window.ms:30000"
other: # DEPRECATED. In this section you can specify custom parameters for Kafka consumer/producer and expose the env variables to configure outside
# - key: "request.timeout.ms" # refer to https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html#producerconfigs_request.timeout.ms

View File

@ -259,6 +259,11 @@ queue:
sasl.config: "${TB_QUEUE_KAFKA_CONFLUENT_SASL_JAAS_CONFIG:org.apache.kafka.common.security.plain.PlainLoginModule required username=\"CLUSTER_API_KEY\" password=\"CLUSTER_API_SECRET\";}"
# Protocol used to communicate with brokers. Valid values are: PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL
security.protocol: "${TB_QUEUE_KAFKA_CONFLUENT_SECURITY_PROTOCOL:SASL_SSL}"
# If you override any default Kafka topic name using environment variables, you must also specify the related consumer properties
# for the new topic in `consumer-properties-per-topic-inline`. Otherwise, the topic will not inherit its expected configuration (e.g., max.poll.records, timeouts, etc.).
# Format: "topic1:key1=value1,key2=value2;topic2:key=value"
# Example: "tb_core_modified.notifications:max.poll.records=10;tb_edge_modified:max.poll.records=10,enable.auto.commit=true"
consumer-properties-per-topic-inline: "${TB_QUEUE_KAFKA_CONSUMER_PROPERTIES_PER_TOPIC_INLINE:}"
other-inline: "${TB_QUEUE_KAFKA_OTHER_PROPERTIES:}" # In this section you can specify custom parameters (semicolon separated) for Kafka consumer/producer/admin # Example "metrics.recording.level:INFO;metrics.sample.window.ms:30000"
other: # DEPRECATED. In this section you can specify custom parameters for Kafka consumer/producer and expose the env variables to configure outside
# - key: "request.timeout.ms" # refer to https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html#producerconfigs_request.timeout.ms

View File

@ -360,6 +360,11 @@ queue:
sasl.config: "${TB_QUEUE_KAFKA_CONFLUENT_SASL_JAAS_CONFIG:org.apache.kafka.common.security.plain.PlainLoginModule required username=\"CLUSTER_API_KEY\" password=\"CLUSTER_API_SECRET\";}"
# Protocol used to communicate with brokers. Valid values are: PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL
security.protocol: "${TB_QUEUE_KAFKA_CONFLUENT_SECURITY_PROTOCOL:SASL_SSL}"
# If you override any default Kafka topic name using environment variables, you must also specify the related consumer properties
# for the new topic in `consumer-properties-per-topic-inline`. Otherwise, the topic will not inherit its expected configuration (e.g., max.poll.records, timeouts, etc.).
# Format: "topic1:key1=value1,key2=value2;topic2:key=value"
# Example: "tb_core_modified.notifications:max.poll.records=10;tb_edge_modified:max.poll.records=10,enable.auto.commit=true"
consumer-properties-per-topic-inline: "${TB_QUEUE_KAFKA_CONSUMER_PROPERTIES_PER_TOPIC_INLINE:}"
other-inline: "${TB_QUEUE_KAFKA_OTHER_PROPERTIES:}" # In this section you can specify custom parameters (semicolon separated) for Kafka consumer/producer/admin # Example "metrics.recording.level:INFO;metrics.sample.window.ms:30000"
other: # DEPRECATED. In this section you can specify custom parameters for Kafka consumer/producer and expose the env variables to configure outside
# - key: "request.timeout.ms" # refer to https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html#producerconfigs_request.timeout.ms

View File

@ -293,6 +293,11 @@ queue:
sasl.config: "${TB_QUEUE_KAFKA_CONFLUENT_SASL_JAAS_CONFIG:org.apache.kafka.common.security.plain.PlainLoginModule required username=\"CLUSTER_API_KEY\" password=\"CLUSTER_API_SECRET\";}"
# Protocol used to communicate with brokers. Valid values are: PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL
security.protocol: "${TB_QUEUE_KAFKA_CONFLUENT_SECURITY_PROTOCOL:SASL_SSL}"
# If you override any default Kafka topic name using environment variables, you must also specify the related consumer properties
# for the new topic in `consumer-properties-per-topic-inline`. Otherwise, the topic will not inherit its expected configuration (e.g., max.poll.records, timeouts, etc.).
# Format: "topic1:key1=value1,key2=value2;topic2:key=value"
# Example: "tb_core_modified.notifications:max.poll.records=10;tb_edge_modified:max.poll.records=10,enable.auto.commit=true"
consumer-properties-per-topic-inline: "${TB_QUEUE_KAFKA_CONSUMER_PROPERTIES_PER_TOPIC_INLINE:}"
other-inline: "${TB_QUEUE_KAFKA_OTHER_PROPERTIES:}" # In this section you can specify custom parameters (semicolon separated) for Kafka consumer/producer/admin # Example "metrics.recording.level:INFO;metrics.sample.window.ms:30000"
other: # DEPRECATED. In this section you can specify custom parameters for Kafka consumer/producer and expose the env variables to configure outside
# - key: "request.timeout.ms" # refer to https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html#producerconfigs_request.timeout.ms

View File

@ -239,6 +239,11 @@ queue:
sasl.config: "${TB_QUEUE_KAFKA_CONFLUENT_SASL_JAAS_CONFIG:org.apache.kafka.common.security.plain.PlainLoginModule required username=\"CLUSTER_API_KEY\" password=\"CLUSTER_API_SECRET\";}"
# Protocol used to communicate with brokers. Valid values are: PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL
security.protocol: "${TB_QUEUE_KAFKA_CONFLUENT_SECURITY_PROTOCOL:SASL_SSL}"
# If you override any default Kafka topic name using environment variables, you must also specify the related consumer properties
# for the new topic in `consumer-properties-per-topic-inline`. Otherwise, the topic will not inherit its expected configuration (e.g., max.poll.records, timeouts, etc.).
# Format: "topic1:key1=value1,key2=value2;topic2:key=value"
# Example: "tb_core_modified.notifications:max.poll.records=10;tb_edge_modified:max.poll.records=10,enable.auto.commit=true"
consumer-properties-per-topic-inline: "${TB_QUEUE_KAFKA_CONSUMER_PROPERTIES_PER_TOPIC_INLINE:}"
other-inline: "${TB_QUEUE_KAFKA_OTHER_PROPERTIES:}" # In this section you can specify custom parameters (semicolon separated) for Kafka consumer/producer/admin # Example "metrics.recording.level:INFO;metrics.sample.window.ms:30000"
other: # DEPRECATED. In this section you can specify custom parameters for Kafka consumer/producer and expose the env variables to configure outside
# - key: "request.timeout.ms" # refer to https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html#producerconfigs_request.timeout.ms