Improvement after review
This commit is contained in:
parent
ccb8ae6378
commit
edd7d6392a
@ -1630,9 +1630,10 @@ queue:
|
|||||||
value: "${TB_QUEUE_KAFKA_EDQS_STATE_MAX_POLL_RECORDS:512}"
|
value: "${TB_QUEUE_KAFKA_EDQS_STATE_MAX_POLL_RECORDS:512}"
|
||||||
# If you override any default Kafka topic name using environment variables, you must also specify the related consumer properties
|
# If you override any default Kafka topic name using environment variables, you must also specify the related consumer properties
|
||||||
# for the new topic in `consumer-properties-per-topic-inline`. Otherwise, the topic will not inherit its expected configuration (e.g., max.poll.records, timeouts, etc).
|
# for the new topic in `consumer-properties-per-topic-inline`. Otherwise, the topic will not inherit its expected configuration (e.g., max.poll.records, timeouts, etc).
|
||||||
# Format: "topic1:key1=value1,key2=value2;topic2:key=value"
|
# Each entry sets a single property for a specific topic. To define multiple properties for a topic, repeat the topic key.
|
||||||
# Example: "tb_core_modified.notifications:max.poll.records=10;tb_edge_modified:max.poll.records=10,enable.auto.commit=true"
|
# Format: "topic1:key=value;topic1:key=value;topic2:key=value"
|
||||||
consumer-properties-per-topic-inline: "${TB_QUEUE_KAFKA_CONSUMER_PROPERTIES_PER_TOPIC_INLINE:}"
|
# Example: tb_core_updated:max.poll.records=10;tb_core_updated:bootstrap.servers=kafka1:9092,kafka2:9092;tb_edge_updated:auto.offset.reset=latest
|
||||||
|
consumer-properties-per-topic-inline: "${TB_QUEUE_KAFKA_CONSUMER_PROPERTIES_PER_TOPIC_INLINE:tb_core_updated:max.poll.records=10;tb_core_updated:enable.auto.commit=true;tb_core_updated:bootstrap.servers=kafka1:9092,kafka2:9092;tb_edge_updated:max.poll.records=5;tb_edge_updated:auto.offset.reset=latest}"
|
||||||
other-inline: "${TB_QUEUE_KAFKA_OTHER_PROPERTIES:}" # In this section you can specify custom parameters (semicolon separated) for Kafka consumer/producer/admin # Example "metrics.recording.level:INFO;metrics.sample.window.ms:30000"
|
other-inline: "${TB_QUEUE_KAFKA_OTHER_PROPERTIES:}" # In this section you can specify custom parameters (semicolon separated) for Kafka consumer/producer/admin # Example "metrics.recording.level:INFO;metrics.sample.window.ms:30000"
|
||||||
other: # DEPRECATED. In this section, you can specify custom parameters for Kafka consumer/producer and expose the env variables to configure outside
|
other: # DEPRECATED. In this section, you can specify custom parameters for Kafka consumer/producer and expose the env variables to configure outside
|
||||||
# - key: "request.timeout.ms" # refer to https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html#producerconfigs_request.timeout.ms
|
# - key: "request.timeout.ms" # refer to https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html#producerconfigs_request.timeout.ms
|
||||||
|
|||||||
@ -37,9 +37,8 @@ import org.springframework.stereotype.Component;
|
|||||||
import org.thingsboard.server.common.data.TbProperty;
|
import org.thingsboard.server.common.data.TbProperty;
|
||||||
import org.thingsboard.server.queue.util.PropertyUtils;
|
import org.thingsboard.server.queue.util.PropertyUtils;
|
||||||
|
|
||||||
import java.util.Arrays;
|
|
||||||
import java.util.Collections;
|
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
|
import java.util.LinkedHashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Properties;
|
import java.util.Properties;
|
||||||
@ -149,7 +148,7 @@ public class TbKafkaSettings {
|
|||||||
private List<TbProperty> other;
|
private List<TbProperty> other;
|
||||||
|
|
||||||
@Setter
|
@Setter
|
||||||
private Map<String, List<TbProperty>> consumerPropertiesPerTopic = Collections.emptyMap();
|
private Map<String, List<TbProperty>> consumerPropertiesPerTopic = new HashMap<>();
|
||||||
|
|
||||||
private volatile AdminClient adminClient;
|
private volatile AdminClient adminClient;
|
||||||
|
|
||||||
@ -260,23 +259,22 @@ public class TbKafkaSettings {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private Map<String, List<TbProperty>> parseTopicPropertyList(String inlineProperties) {
|
private Map<String, List<TbProperty>> parseTopicPropertyList(String inlineProperties) {
|
||||||
|
Map<String, List<String>> grouped = PropertyUtils.getGroupedProps(inlineProperties);
|
||||||
Map<String, List<TbProperty>> result = new HashMap<>();
|
Map<String, List<TbProperty>> result = new HashMap<>();
|
||||||
Map<String, String> rawTopicToPropertyString = PropertyUtils.getProps(inlineProperties);
|
|
||||||
|
|
||||||
for (Map.Entry<String, String> entry : rawTopicToPropertyString.entrySet()) {
|
grouped.forEach((topic, entries) -> {
|
||||||
String topic = entry.getKey().trim();
|
Map<String, String> merged = new LinkedHashMap<>();
|
||||||
String propertiesStr = entry.getValue();
|
for (String entry : entries) {
|
||||||
|
String[] kv = entry.split("=", 2);
|
||||||
List<TbProperty> tbProperties = Arrays.stream(propertiesStr.split(","))
|
if (kv.length == 2) {
|
||||||
.map(kv -> kv.split("=", 2))
|
merged.put(kv[0].trim(), kv[1].trim());
|
||||||
.filter(kvArr -> kvArr.length == 2)
|
}
|
||||||
.map(kvArr -> new TbProperty(kvArr[0].trim(), kvArr[1].trim()))
|
|
||||||
.toList();
|
|
||||||
|
|
||||||
if (!tbProperties.isEmpty()) {
|
|
||||||
result.put(topic, tbProperties);
|
|
||||||
}
|
}
|
||||||
}
|
List<TbProperty> props = merged.entrySet().stream()
|
||||||
|
.map(e -> new TbProperty(e.getKey(), e.getValue()))
|
||||||
|
.toList();
|
||||||
|
result.put(topic, props);
|
||||||
|
});
|
||||||
|
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|||||||
@ -17,7 +17,9 @@ package org.thingsboard.server.queue.util;
|
|||||||
|
|
||||||
import org.thingsboard.server.common.data.StringUtils;
|
import org.thingsboard.server.common.data.StringUtils;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.function.Function;
|
import java.util.function.Function;
|
||||||
|
|
||||||
@ -38,6 +40,21 @@ public class PropertyUtils {
|
|||||||
return configs;
|
return configs;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static Map<String, List<String>> getGroupedProps(String properties) {
|
||||||
|
Map<String, List<String>> configs = new HashMap<>();
|
||||||
|
if (StringUtils.isNotEmpty(properties)) {
|
||||||
|
for (String property : properties.split(";")) {
|
||||||
|
if (StringUtils.isNotEmpty(property)) {
|
||||||
|
int delimiterPosition = property.indexOf(":");
|
||||||
|
String topic = property.substring(0, delimiterPosition).trim();
|
||||||
|
String value = property.substring(delimiterPosition + 1).trim();
|
||||||
|
configs.computeIfAbsent(topic, k -> new ArrayList<>()).add(value);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return configs;
|
||||||
|
}
|
||||||
|
|
||||||
public static Map<String, String> getProps(Map<String, String> defaultProperties, String propertiesStr) {
|
public static Map<String, String> getProps(Map<String, String> defaultProperties, String propertiesStr) {
|
||||||
return getProps(defaultProperties, propertiesStr, PropertyUtils::getProps);
|
return getProps(defaultProperties, propertiesStr, PropertyUtils::getProps);
|
||||||
}
|
}
|
||||||
|
|||||||
@ -33,6 +33,12 @@ import static org.mockito.Mockito.spy;
|
|||||||
"queue.type=kafka",
|
"queue.type=kafka",
|
||||||
"queue.kafka.bootstrap.servers=localhost:9092",
|
"queue.kafka.bootstrap.servers=localhost:9092",
|
||||||
"queue.kafka.other-inline=metrics.recording.level:INFO;metrics.sample.window.ms:30000",
|
"queue.kafka.other-inline=metrics.recording.level:INFO;metrics.sample.window.ms:30000",
|
||||||
|
"queue.kafka.consumer-properties-per-topic-inline=" +
|
||||||
|
"tb_core_updated:max.poll.records=10;" +
|
||||||
|
"tb_core_updated:enable.auto.commit=true;" +
|
||||||
|
"tb_core_updated:bootstrap.servers=kafka1:9092,kafka2:9092;" +
|
||||||
|
"tb_edge_updated:max.poll.records=5;" +
|
||||||
|
"tb_edge_updated:auto.offset.reset=latest"
|
||||||
})
|
})
|
||||||
class TbKafkaSettingsTest {
|
class TbKafkaSettingsTest {
|
||||||
|
|
||||||
@ -79,4 +85,16 @@ class TbKafkaSettingsTest {
|
|||||||
Mockito.verify(settings).configureSSL(any());
|
Mockito.verify(settings).configureSSL(any());
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
@Test
|
||||||
|
void givenMultipleTopicsInInlineConfig_whenParsed_thenEachTopicGetsExpectedProperties() {
|
||||||
|
Properties coreProps = settings.toConsumerProps("tb_core_updated");
|
||||||
|
assertThat(coreProps.getProperty("max.poll.records")).isEqualTo("10");
|
||||||
|
assertThat(coreProps.getProperty("enable.auto.commit")).isEqualTo("true");
|
||||||
|
assertThat(coreProps.getProperty("bootstrap.servers")).isEqualTo("kafka1:9092,kafka2:9092");
|
||||||
|
|
||||||
|
Properties edgeProps = settings.toConsumerProps("tb_edge_updated");
|
||||||
|
assertThat(edgeProps.getProperty("max.poll.records")).isEqualTo("5");
|
||||||
|
assertThat(edgeProps.getProperty("auto.offset.reset")).isEqualTo("latest");
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user