TbKafkaNode - replace \ and n symbols with new line symbol \n in SSL PEM configuration properties

This commit is contained in:
Volodymyr Babak 2023-12-14 18:50:20 +02:00
parent f4761519b6
commit 4e366f5f66

View File

@ -22,6 +22,7 @@ import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.header.Headers; import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeader; import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.header.internals.RecordHeaders; import org.apache.kafka.common.header.internals.RecordHeaders;
@ -91,7 +92,14 @@ public class TbKafkaNode extends TbAbstractExternalNode {
properties.put(ProducerConfig.LINGER_MS_CONFIG, config.getLinger()); properties.put(ProducerConfig.LINGER_MS_CONFIG, config.getLinger());
properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, config.getBufferMemory()); properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, config.getBufferMemory());
if (config.getOtherProperties() != null) { if (config.getOtherProperties() != null) {
config.getOtherProperties().forEach(properties::put); config.getOtherProperties().forEach((k, v) -> {
if (SslConfigs.SSL_KEYSTORE_CERTIFICATE_CHAIN_CONFIG.equals(k)
|| SslConfigs.SSL_KEYSTORE_KEY_CONFIG.equals(k)
|| SslConfigs.SSL_TRUSTSTORE_CERTIFICATES_CONFIG.equals(k)) {
v = v.replace("\\n", "\n");
}
properties.put(k, v);
});
} }
addMetadataKeyValuesAsKafkaHeaders = BooleanUtils.toBooleanDefaultIfNull(config.isAddMetadataKeyValuesAsKafkaHeaders(), false); addMetadataKeyValuesAsKafkaHeaders = BooleanUtils.toBooleanDefaultIfNull(config.isAddMetadataKeyValuesAsKafkaHeaders(), false);
toBytesCharset = config.getKafkaHeadersCharset() != null ? Charset.forName(config.getKafkaHeadersCharset()) : StandardCharsets.UTF_8; toBytesCharset = config.getKafkaHeadersCharset() != null ? Charset.forName(config.getKafkaHeadersCharset()) : StandardCharsets.UTF_8;