Use consumer-properties-per-topic-inline to configure custom topics
This commit is contained in:
parent
e70bba0dd6
commit
3bd2afad83
@ -1628,6 +1628,11 @@ queue:
|
|||||||
- key: max.poll.records
|
- key: max.poll.records
|
||||||
# Max poll records for edqs.state topic
|
# Max poll records for edqs.state topic
|
||||||
value: "${TB_QUEUE_KAFKA_EDQS_STATE_MAX_POLL_RECORDS:512}"
|
value: "${TB_QUEUE_KAFKA_EDQS_STATE_MAX_POLL_RECORDS:512}"
|
||||||
|
# If you override any default Kafka topic name using environment variables, you must also specify the related consumer properties
|
||||||
|
# for the new topic in `consumer-properties-per-topic-inline`. Otherwise, the topic will not inherit its expected configuration (e.g., max.poll.records, timeouts, etc).
|
||||||
|
# Format: "topic1:key1=value1,key2=value2;topic2:key=value"
|
||||||
|
# Example: "tb_core_modified.notifications:max.poll.records=10;tb_edge_modified:max.poll.records=10,enable.auto.commit=true"
|
||||||
|
consumer-properties-per-topic-inline: "${TB_QUEUE_KAFKA_CONSUMER_PROPERTIES_PER_TOPIC_INLINE:}"
|
||||||
other-inline: "${TB_QUEUE_KAFKA_OTHER_PROPERTIES:}" # In this section you can specify custom parameters (semicolon separated) for Kafka consumer/producer/admin # Example "metrics.recording.level:INFO;metrics.sample.window.ms:30000"
|
other-inline: "${TB_QUEUE_KAFKA_OTHER_PROPERTIES:}" # In this section you can specify custom parameters (semicolon separated) for Kafka consumer/producer/admin # Example "metrics.recording.level:INFO;metrics.sample.window.ms:30000"
|
||||||
other: # DEPRECATED. In this section, you can specify custom parameters for Kafka consumer/producer and expose the env variables to configure outside
|
other: # DEPRECATED. In this section, you can specify custom parameters for Kafka consumer/producer and expose the env variables to configure outside
|
||||||
# - key: "request.timeout.ms" # refer to https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html#producerconfigs_request.timeout.ms
|
# - key: "request.timeout.ms" # refer to https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html#producerconfigs_request.timeout.ms
|
||||||
|
|||||||
@ -15,14 +15,16 @@
|
|||||||
*/
|
*/
|
||||||
package org.thingsboard.server.common.data;
|
package org.thingsboard.server.common.data;
|
||||||
|
|
||||||
|
import lombok.AllArgsConstructor;
|
||||||
import lombok.Data;
|
import lombok.Data;
|
||||||
|
import lombok.NoArgsConstructor;
|
||||||
|
|
||||||
/**
|
|
||||||
* Created by ashvayka on 25.09.18.
|
|
||||||
*/
|
|
||||||
@Data
|
@Data
|
||||||
|
@NoArgsConstructor
|
||||||
|
@AllArgsConstructor
|
||||||
public class TbProperty {
|
public class TbProperty {
|
||||||
|
|
||||||
private String key;
|
private String key;
|
||||||
private String value;
|
private String value;
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@ -15,6 +15,7 @@
|
|||||||
*/
|
*/
|
||||||
package org.thingsboard.server.queue.kafka;
|
package org.thingsboard.server.queue.kafka;
|
||||||
|
|
||||||
|
import jakarta.annotation.PostConstruct;
|
||||||
import jakarta.annotation.PreDestroy;
|
import jakarta.annotation.PreDestroy;
|
||||||
import lombok.Getter;
|
import lombok.Getter;
|
||||||
import lombok.Setter;
|
import lombok.Setter;
|
||||||
@ -36,7 +37,9 @@ import org.springframework.stereotype.Component;
|
|||||||
import org.thingsboard.server.common.data.TbProperty;
|
import org.thingsboard.server.common.data.TbProperty;
|
||||||
import org.thingsboard.server.queue.util.PropertyUtils;
|
import org.thingsboard.server.queue.util.PropertyUtils;
|
||||||
|
|
||||||
|
import java.util.Arrays;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
|
import java.util.HashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Properties;
|
import java.util.Properties;
|
||||||
@ -138,6 +141,9 @@ public class TbKafkaSettings {
|
|||||||
@Value("${queue.kafka.other-inline:}")
|
@Value("${queue.kafka.other-inline:}")
|
||||||
private String otherInline;
|
private String otherInline;
|
||||||
|
|
||||||
|
@Value("${queue.kafka.consumer-properties-per-topic-inline:}")
|
||||||
|
private String consumerPropertiesPerTopicInline;
|
||||||
|
|
||||||
@Deprecated
|
@Deprecated
|
||||||
@Setter
|
@Setter
|
||||||
private List<TbProperty> other;
|
private List<TbProperty> other;
|
||||||
@ -147,6 +153,14 @@ public class TbKafkaSettings {
|
|||||||
|
|
||||||
private volatile AdminClient adminClient;
|
private volatile AdminClient adminClient;
|
||||||
|
|
||||||
|
@PostConstruct
|
||||||
|
public void initInlineTopicProperties() {
|
||||||
|
Map<String, List<TbProperty>> inlineProps = parseTopicPropertyList(consumerPropertiesPerTopicInline);
|
||||||
|
if (!inlineProps.isEmpty()) {
|
||||||
|
consumerPropertiesPerTopic.putAll(inlineProps);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
public Properties toConsumerProps(String topic) {
|
public Properties toConsumerProps(String topic) {
|
||||||
Properties props = toProps();
|
Properties props = toProps();
|
||||||
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
|
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
|
||||||
@ -245,6 +259,28 @@ public class TbKafkaSettings {
|
|||||||
return props;
|
return props;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private Map<String, List<TbProperty>> parseTopicPropertyList(String inlineProperties) {
|
||||||
|
Map<String, List<TbProperty>> result = new HashMap<>();
|
||||||
|
Map<String, String> rawTopicToPropertyString = PropertyUtils.getProps(inlineProperties);
|
||||||
|
|
||||||
|
for (Map.Entry<String, String> entry : rawTopicToPropertyString.entrySet()) {
|
||||||
|
String topic = entry.getKey().trim();
|
||||||
|
String propertiesStr = entry.getValue();
|
||||||
|
|
||||||
|
List<TbProperty> tbProperties = Arrays.stream(propertiesStr.split(","))
|
||||||
|
.map(kv -> kv.split("=", 2))
|
||||||
|
.filter(kvArr -> kvArr.length == 2)
|
||||||
|
.map(kvArr -> new TbProperty(kvArr[0].trim(), kvArr[1].trim()))
|
||||||
|
.toList();
|
||||||
|
|
||||||
|
if (!tbProperties.isEmpty()) {
|
||||||
|
result.put(topic, tbProperties);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
@PreDestroy
|
@PreDestroy
|
||||||
private void destroy() {
|
private void destroy() {
|
||||||
if (adminClient != null) {
|
if (adminClient != null) {
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user