implemented consumerPropertiesPerTopic

This commit is contained in:
YevhenBondarenko 2021-04-23 10:56:49 +03:00
parent c9439b3976
commit 2658de715a
4 changed files with 16 additions and 3 deletions

View File

@ -743,6 +743,10 @@ queue:
sasl.mechanism: "${TB_QUEUE_KAFKA_CONFLUENT_SASL_MECHANISM:PLAIN}" sasl.mechanism: "${TB_QUEUE_KAFKA_CONFLUENT_SASL_MECHANISM:PLAIN}"
sasl.config: "${TB_QUEUE_KAFKA_CONFLUENT_SASL_JAAS_CONFIG:org.apache.kafka.common.security.plain.PlainLoginModule required username=\"CLUSTER_API_KEY\" password=\"CLUSTER_API_SECRET\";}" sasl.config: "${TB_QUEUE_KAFKA_CONFLUENT_SASL_JAAS_CONFIG:org.apache.kafka.common.security.plain.PlainLoginModule required username=\"CLUSTER_API_KEY\" password=\"CLUSTER_API_SECRET\";}"
security.protocol: "${TB_QUEUE_KAFKA_CONFLUENT_SECURITY_PROTOCOL:SASL_SSL}" security.protocol: "${TB_QUEUE_KAFKA_CONFLUENT_SECURITY_PROTOCOL:SASL_SSL}"
consumerPropertiesPerTopic:
tb_firmware:
- key: max.poll.records
value: 10
other: other:
topic-properties: topic-properties:
rule-engine: "${TB_QUEUE_KAFKA_RE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1;min.insync.replicas:1}" rule-engine: "${TB_QUEUE_KAFKA_RE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1;min.insync.replicas:1}"

View File

@ -69,7 +69,7 @@ public class TbKafkaConsumerStatsService {
this.adminClient = AdminClient.create(kafkaSettings.toAdminProps()); this.adminClient = AdminClient.create(kafkaSettings.toAdminProps());
this.statsPrintScheduler = Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName("kafka-consumer-stats")); this.statsPrintScheduler = Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName("kafka-consumer-stats"));
Properties consumerProps = kafkaSettings.toConsumerProps(); Properties consumerProps = kafkaSettings.toConsumerProps(null);
consumerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, "consumer-stats-loader-client"); consumerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, "consumer-stats-loader-client");
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer-stats-loader-client-group"); consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer-stats-loader-client-group");
this.consumer = new KafkaConsumer<>(consumerProps); this.consumer = new KafkaConsumer<>(consumerProps);

View File

@ -50,7 +50,7 @@ public class TbKafkaConsumerTemplate<T extends TbQueueMsg> extends AbstractTbQue
String clientId, String groupId, String topic, String clientId, String groupId, String topic,
TbQueueAdmin admin, TbKafkaConsumerStatsService statsService) { TbQueueAdmin admin, TbKafkaConsumerStatsService statsService) {
super(topic); super(topic);
Properties props = settings.toConsumerProps(); Properties props = settings.toConsumerProps(topic);
props.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId); props.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId);
if (groupId != null) { if (groupId != null) {
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);

View File

@ -31,7 +31,9 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.Properties; import java.util.Properties;
/** /**
@ -95,6 +97,9 @@ public class TbKafkaSettings {
@Setter @Setter
private List<TbKafkaProperty> other; private List<TbKafkaProperty> other;
@Setter
private Map<String, List<TbKafkaProperty>> consumerPropertiesPerTopic;
public Properties toAdminProps() { public Properties toAdminProps() {
Properties props = toProps(); Properties props = toProps();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, servers); props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
@ -103,7 +108,7 @@ public class TbKafkaSettings {
return props; return props;
} }
public Properties toConsumerProps() { public Properties toConsumerProps(String topic) {
Properties props = toProps(); Properties props = toProps();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords); props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
@ -113,6 +118,10 @@ public class TbKafkaSettings {
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
consumerPropertiesPerTopic
.getOrDefault(topic, Collections.emptyList())
.forEach(kv -> props.put(kv.getKey(), kv.getValue()));
return props; return props;
} }