diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index ebd001aa97..e013473f98 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -1630,9 +1630,10 @@ queue: value: "${TB_QUEUE_KAFKA_EDQS_STATE_MAX_POLL_RECORDS:512}" # If you override any default Kafka topic name using environment variables, you must also specify the related consumer properties # for the new topic in `consumer-properties-per-topic-inline`. Otherwise, the topic will not inherit its expected configuration (e.g., max.poll.records, timeouts, etc). - # Format: "topic1:key1=value1,key2=value2;topic2:key=value" - # Example: "tb_core_modified.notifications:max.poll.records=10;tb_edge_modified:max.poll.records=10,enable.auto.commit=true" - consumer-properties-per-topic-inline: "${TB_QUEUE_KAFKA_CONSUMER_PROPERTIES_PER_TOPIC_INLINE:}" + # Each entry sets a single property for a specific topic. To define multiple properties for a topic, repeat the topic key. + # Format: "topic1:key=value;topic1:key=value;topic2:key=value" + # Example: tb_core_updated:max.poll.records=10;tb_core_updated:bootstrap.servers=kafka1:9092,kafka2:9092;tb_edge_updated:auto.offset.reset=latest + consumer-properties-per-topic-inline: "${TB_QUEUE_KAFKA_CONSUMER_PROPERTIES_PER_TOPIC_INLINE:tb_core_updated:max.poll.records=10;tb_core_updated:enable.auto.commit=true;tb_core_updated:bootstrap.servers=kafka1:9092,kafka2:9092;tb_edge_updated:max.poll.records=5;tb_edge_updated:auto.offset.reset=latest}" other-inline: "${TB_QUEUE_KAFKA_OTHER_PROPERTIES:}" # In this section you can specify custom parameters (semicolon separated) for Kafka consumer/producer/admin # Example "metrics.recording.level:INFO;metrics.sample.window.ms:30000" other: # DEPRECATED. In this section, you can specify custom parameters for Kafka consumer/producer and expose the env variables to configure outside # - key: "request.timeout.ms" # refer to https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html#producerconfigs_request.timeout.ms diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaSettings.java b/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaSettings.java index e0682f6364..06ccfe3a69 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaSettings.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaSettings.java @@ -37,9 +37,8 @@ import org.springframework.stereotype.Component; import org.thingsboard.server.common.data.TbProperty; import org.thingsboard.server.queue.util.PropertyUtils; -import java.util.Arrays; -import java.util.Collections; import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Properties; @@ -149,7 +148,7 @@ public class TbKafkaSettings { private List other; @Setter - private Map> consumerPropertiesPerTopic = Collections.emptyMap(); + private Map> consumerPropertiesPerTopic = new HashMap<>(); private volatile AdminClient adminClient; @@ -260,23 +259,22 @@ public class TbKafkaSettings { } private Map> parseTopicPropertyList(String inlineProperties) { + Map> grouped = PropertyUtils.getGroupedProps(inlineProperties); Map> result = new HashMap<>(); - Map rawTopicToPropertyString = PropertyUtils.getProps(inlineProperties); - for (Map.Entry entry : rawTopicToPropertyString.entrySet()) { - String topic = entry.getKey().trim(); - String propertiesStr = entry.getValue(); - - List tbProperties = Arrays.stream(propertiesStr.split(",")) - .map(kv -> kv.split("=", 2)) - .filter(kvArr -> kvArr.length == 2) - .map(kvArr -> new TbProperty(kvArr[0].trim(), kvArr[1].trim())) - .toList(); - - if (!tbProperties.isEmpty()) { - result.put(topic, tbProperties); + grouped.forEach((topic, entries) -> { + Map merged = new LinkedHashMap<>(); + for (String entry : entries) { + String[] kv = entry.split("=", 2); + if (kv.length == 2) { + merged.put(kv[0].trim(), kv[1].trim()); + } } - } + List props = merged.entrySet().stream() + .map(e -> new TbProperty(e.getKey(), e.getValue())) + .toList(); + result.put(topic, props); + }); return result; } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/util/PropertyUtils.java b/common/queue/src/main/java/org/thingsboard/server/queue/util/PropertyUtils.java index 6030eb278d..629ee29f6f 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/util/PropertyUtils.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/util/PropertyUtils.java @@ -17,7 +17,9 @@ package org.thingsboard.server.queue.util; import org.thingsboard.server.common.data.StringUtils; +import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.function.Function; @@ -38,6 +40,21 @@ public class PropertyUtils { return configs; } + public static Map> getGroupedProps(String properties) { + Map> configs = new HashMap<>(); + if (StringUtils.isNotEmpty(properties)) { + for (String property : properties.split(";")) { + if (StringUtils.isNotEmpty(property)) { + int delimiterPosition = property.indexOf(":"); + String topic = property.substring(0, delimiterPosition).trim(); + String value = property.substring(delimiterPosition + 1).trim(); + configs.computeIfAbsent(topic, k -> new ArrayList<>()).add(value); + } + } + } + return configs; + } + public static Map getProps(Map defaultProperties, String propertiesStr) { return getProps(defaultProperties, propertiesStr, PropertyUtils::getProps); } diff --git a/common/queue/src/test/java/org/thingsboard/server/queue/kafka/TbKafkaSettingsTest.java b/common/queue/src/test/java/org/thingsboard/server/queue/kafka/TbKafkaSettingsTest.java index 5cdebc3996..ad026c63aa 100644 --- a/common/queue/src/test/java/org/thingsboard/server/queue/kafka/TbKafkaSettingsTest.java +++ b/common/queue/src/test/java/org/thingsboard/server/queue/kafka/TbKafkaSettingsTest.java @@ -33,6 +33,12 @@ import static org.mockito.Mockito.spy; "queue.type=kafka", "queue.kafka.bootstrap.servers=localhost:9092", "queue.kafka.other-inline=metrics.recording.level:INFO;metrics.sample.window.ms:30000", + "queue.kafka.consumer-properties-per-topic-inline=" + + "tb_core_updated:max.poll.records=10;" + + "tb_core_updated:enable.auto.commit=true;" + + "tb_core_updated:bootstrap.servers=kafka1:9092,kafka2:9092;" + + "tb_edge_updated:max.poll.records=5;" + + "tb_edge_updated:auto.offset.reset=latest" }) class TbKafkaSettingsTest { @@ -79,4 +85,16 @@ class TbKafkaSettingsTest { Mockito.verify(settings).configureSSL(any()); } -} \ No newline at end of file + @Test + void givenMultipleTopicsInInlineConfig_whenParsed_thenEachTopicGetsExpectedProperties() { + Properties coreProps = settings.toConsumerProps("tb_core_updated"); + assertThat(coreProps.getProperty("max.poll.records")).isEqualTo("10"); + assertThat(coreProps.getProperty("enable.auto.commit")).isEqualTo("true"); + assertThat(coreProps.getProperty("bootstrap.servers")).isEqualTo("kafka1:9092,kafka2:9092"); + + Properties edgeProps = settings.toConsumerProps("tb_edge_updated"); + assertThat(edgeProps.getProperty("max.poll.records")).isEqualTo("5"); + assertThat(edgeProps.getProperty("auto.offset.reset")).isEqualTo("latest"); + } + +}