diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaSettings.java b/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaSettings.java index 7826c19415..5c63fe6e7d 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaSettings.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaSettings.java @@ -134,14 +134,7 @@ public class TbKafkaSettings { props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, servers); props.put(AdminClientConfig.RETRIES_CONFIG, retries); - if (sslEnabled) { - props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL"); - props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, sslTruststoreLocation); - props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, sslTruststorePassword); - props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, sslKeystoreLocation); - props.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, sslKeystorePassword); - props.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, sslKeyPassword); - } + configureSSL(props); return props; } @@ -154,14 +147,7 @@ public class TbKafkaSettings { props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, fetchMaxBytes); props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxPollIntervalMs); - if (sslEnabled) { - props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL"); - props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, sslTruststoreLocation); - props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, sslTruststorePassword); - props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, sslKeystoreLocation); - props.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, sslKeystorePassword); - props.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, sslKeyPassword); - } + configureSSL(props); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class); @@ -202,6 +188,12 @@ public class TbKafkaSettings { other.forEach(kv -> props.put(kv.getKey(), kv.getValue())); } + configureSSL(props); + + return props; + } + + private void configureSSL(Properties props) { if (sslEnabled) { props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL"); props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, sslTruststoreLocation); @@ -210,8 +202,6 @@ public class TbKafkaSettings { props.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, sslKeystorePassword); props.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, sslKeyPassword); } - - return props; } }