diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index f745e03ffc..75f18685b6 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -987,6 +987,13 @@ queue: print-interval-ms: "${TB_QUEUE_IN_MEMORY_STATS_PRINT_INTERVAL_MS:60000}" kafka: bootstrap.servers: "${TB_KAFKA_SERVERS:localhost:9092}" + ssl: + enabled: "${TB_KAFKA_SSL_ENABLED:false}" + truststore.location: "${TB_KAFKA_SSL_TRUSTSTORE_LOCATION:}" + truststore.password: "${TB_KAFKA_SSL_TRUSTSTORE_PASSWORD:}" + keystore.location: "${TB_KAFKA_SSL_KEYSTORE_LOCATION:}" + keystore.password: "${TB_KAFKA_SSL_KEYSTORE_PASSWORD:}" + key.password: "${TB_KAFKA_SSL_KEY_PASSWORD:}" acks: "${TB_KAFKA_ACKS:all}" retries: "${TB_KAFKA_RETRIES:1}" compression.type: "${TB_KAFKA_COMPRESSION_TYPE:none}" # none or gzip diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaSettings.java b/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaSettings.java index 92c4202a79..7826c19415 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaSettings.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaSettings.java @@ -19,6 +19,7 @@ import lombok.Getter; import lombok.Setter; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.common.config.SslConfigs; import org.apache.kafka.clients.admin.AdminClientConfig; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.producer.ProducerConfig; @@ -49,6 +50,24 @@ public class TbKafkaSettings { @Value("${queue.kafka.bootstrap.servers}") private String servers; + @Value("${queue.kafka.ssl.enabled:false}") + private boolean sslEnabled; + + @Value("${queue.kafka.ssl.truststore.location}") + private String sslTruststoreLocation; + + @Value("${queue.kafka.ssl.truststore.password}") + private String sslTruststorePassword; + + @Value("${queue.kafka.ssl.keystore.location}") + private String sslKeystoreLocation; + + @Value("${queue.kafka.ssl.keystore.password}") + private String sslKeystorePassword; + + @Value("${queue.kafka.ssl.key.password}") + private String sslKeyPassword; + @Value("${queue.kafka.acks}") private String acks; @@ -115,6 +134,15 @@ public class TbKafkaSettings { props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, servers); props.put(AdminClientConfig.RETRIES_CONFIG, retries); + if (sslEnabled) { + props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL"); + props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, sslTruststoreLocation); + props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, sslTruststorePassword); + props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, sslKeystoreLocation); + props.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, sslKeystorePassword); + props.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, sslKeyPassword); + } + return props; } @@ -126,6 +154,15 @@ public class TbKafkaSettings { props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, fetchMaxBytes); props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxPollIntervalMs); + if (sslEnabled) { + props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL"); + props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, sslTruststoreLocation); + props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, sslTruststorePassword); + props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, sslKeystoreLocation); + props.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, sslKeystorePassword); + props.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, sslKeyPassword); + } + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class); @@ -164,6 +201,16 @@ public class TbKafkaSettings { if (other != null) { other.forEach(kv -> props.put(kv.getKey(), kv.getValue())); } + + if (sslEnabled) { + props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL"); + props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, sslTruststoreLocation); + props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, sslTruststorePassword); + props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, sslKeystoreLocation); + props.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, sslKeystorePassword); + props.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, sslKeyPassword); + } + return props; } diff --git a/msa/vc-executor/src/main/resources/tb-vc-executor.yml b/msa/vc-executor/src/main/resources/tb-vc-executor.yml index ed1377e1b9..d7e3dd3357 100644 --- a/msa/vc-executor/src/main/resources/tb-vc-executor.yml +++ b/msa/vc-executor/src/main/resources/tb-vc-executor.yml @@ -50,6 +50,13 @@ queue: print-interval-ms: "${TB_QUEUE_IN_MEMORY_STATS_PRINT_INTERVAL_MS:60000}" kafka: bootstrap.servers: "${TB_KAFKA_SERVERS:localhost:9092}" + ssl: + enabled: "${TB_KAFKA_SSL_ENABLED:false}" + truststore.location: "${TB_KAFKA_SSL_TRUSTSTORE_LOCATION:}" + truststore.password: "${TB_KAFKA_SSL_TRUSTSTORE_PASSWORD:}" + keystore.location: "${TB_KAFKA_SSL_KEYSTORE_LOCATION:}" + keystore.password: "${TB_KAFKA_SSL_KEYSTORE_PASSWORD:}" + key.password: "${TB_KAFKA_SSL_KEY_PASSWORD:}" acks: "${TB_KAFKA_ACKS:all}" retries: "${TB_KAFKA_RETRIES:1}" compression.type: "${TB_KAFKA_COMPRESSION_TYPE:none}" # none or gzip diff --git a/transport/coap/src/main/resources/tb-coap-transport.yml b/transport/coap/src/main/resources/tb-coap-transport.yml index bcf1e8ddd7..9b0957ac2f 100644 --- a/transport/coap/src/main/resources/tb-coap-transport.yml +++ b/transport/coap/src/main/resources/tb-coap-transport.yml @@ -151,6 +151,13 @@ queue: type: "${TB_QUEUE_TYPE:kafka}" # kafka (Apache Kafka) or aws-sqs (AWS SQS) or pubsub (PubSub) or service-bus (Azure Service Bus) or rabbitmq (RabbitMQ) kafka: bootstrap.servers: "${TB_KAFKA_SERVERS:localhost:9092}" + ssl: + enabled: "${TB_KAFKA_SSL_ENABLED:false}" + truststore.location: "${TB_KAFKA_SSL_TRUSTSTORE_LOCATION:}" + truststore.password: "${TB_KAFKA_SSL_TRUSTSTORE_PASSWORD:}" + keystore.location: "${TB_KAFKA_SSL_KEYSTORE_LOCATION:}" + keystore.password: "${TB_KAFKA_SSL_KEYSTORE_PASSWORD:}" + key.password: "${TB_KAFKA_SSL_KEY_PASSWORD:}" acks: "${TB_KAFKA_ACKS:all}" retries: "${TB_KAFKA_RETRIES:1}" compression.type: "${TB_KAFKA_COMPRESSION_TYPE:none}" # none or gzip diff --git a/transport/http/src/main/resources/tb-http-transport.yml b/transport/http/src/main/resources/tb-http-transport.yml index 56a8419e76..cd154f2fba 100644 --- a/transport/http/src/main/resources/tb-http-transport.yml +++ b/transport/http/src/main/resources/tb-http-transport.yml @@ -136,6 +136,13 @@ queue: type: "${TB_QUEUE_TYPE:kafka}" # kafka (Apache Kafka) or aws-sqs (AWS SQS) or pubsub (PubSub) or service-bus (Azure Service Bus) or rabbitmq (RabbitMQ) kafka: bootstrap.servers: "${TB_KAFKA_SERVERS:localhost:9092}" + ssl: + enabled: "${TB_KAFKA_SSL_ENABLED:false}" + truststore.location: "${TB_KAFKA_SSL_TRUSTSTORE_LOCATION:}" + truststore.password: "${TB_KAFKA_SSL_TRUSTSTORE_PASSWORD:}" + keystore.location: "${TB_KAFKA_SSL_KEYSTORE_LOCATION:}" + keystore.password: "${TB_KAFKA_SSL_KEYSTORE_PASSWORD:}" + key.password: "${TB_KAFKA_SSL_KEY_PASSWORD:}" acks: "${TB_KAFKA_ACKS:all}" retries: "${TB_KAFKA_RETRIES:1}" compression.type: "${TB_KAFKA_COMPRESSION_TYPE:none}" # none or gzip diff --git a/transport/lwm2m/src/main/resources/tb-lwm2m-transport.yml b/transport/lwm2m/src/main/resources/tb-lwm2m-transport.yml index 70d7a895fd..8f2c0ca859 100644 --- a/transport/lwm2m/src/main/resources/tb-lwm2m-transport.yml +++ b/transport/lwm2m/src/main/resources/tb-lwm2m-transport.yml @@ -217,6 +217,13 @@ queue: type: "${TB_QUEUE_TYPE:kafka}" # kafka (Apache Kafka) or aws-sqs (AWS SQS) or pubsub (PubSub) or service-bus (Azure Service Bus) or rabbitmq (RabbitMQ) kafka: bootstrap.servers: "${TB_KAFKA_SERVERS:localhost:9092}" + ssl: + enabled: "${TB_KAFKA_SSL_ENABLED:false}" + truststore.location: "${TB_KAFKA_SSL_TRUSTSTORE_LOCATION:}" + truststore.password: "${TB_KAFKA_SSL_TRUSTSTORE_PASSWORD:}" + keystore.location: "${TB_KAFKA_SSL_KEYSTORE_LOCATION:}" + keystore.password: "${TB_KAFKA_SSL_KEYSTORE_PASSWORD:}" + key.password: "${TB_KAFKA_SSL_KEY_PASSWORD:}" acks: "${TB_KAFKA_ACKS:all}" retries: "${TB_KAFKA_RETRIES:1}" compression.type: "${TB_KAFKA_COMPRESSION_TYPE:none}" # none or gzip diff --git a/transport/mqtt/src/main/resources/tb-mqtt-transport.yml b/transport/mqtt/src/main/resources/tb-mqtt-transport.yml index 99907e797e..a876123175 100644 --- a/transport/mqtt/src/main/resources/tb-mqtt-transport.yml +++ b/transport/mqtt/src/main/resources/tb-mqtt-transport.yml @@ -166,6 +166,13 @@ queue: type: "${TB_QUEUE_TYPE:kafka}" # kafka (Apache Kafka) or aws-sqs (AWS SQS) or pubsub (PubSub) or service-bus (Azure Service Bus) or rabbitmq (RabbitMQ) kafka: bootstrap.servers: "${TB_KAFKA_SERVERS:localhost:9092}" + ssl: + enabled: "${TB_KAFKA_SSL_ENABLED:false}" + truststore.location: "${TB_KAFKA_SSL_TRUSTSTORE_LOCATION:}" + truststore.password: "${TB_KAFKA_SSL_TRUSTSTORE_PASSWORD:}" + keystore.location: "${TB_KAFKA_SSL_KEYSTORE_LOCATION:}" + keystore.password: "${TB_KAFKA_SSL_KEYSTORE_PASSWORD:}" + key.password: "${TB_KAFKA_SSL_KEY_PASSWORD:}" acks: "${TB_KAFKA_ACKS:all}" retries: "${TB_KAFKA_RETRIES:1}" compression.type: "${TB_KAFKA_COMPRESSION_TYPE:none}" # none or gzip diff --git a/transport/snmp/src/main/resources/tb-snmp-transport.yml b/transport/snmp/src/main/resources/tb-snmp-transport.yml index 16bfb6397a..0672309cfe 100644 --- a/transport/snmp/src/main/resources/tb-snmp-transport.yml +++ b/transport/snmp/src/main/resources/tb-snmp-transport.yml @@ -112,6 +112,13 @@ queue: type: "${TB_QUEUE_TYPE:kafka}" # kafka (Apache Kafka) or aws-sqs (AWS SQS) or pubsub (PubSub) or service-bus (Azure Service Bus) or rabbitmq (RabbitMQ) kafka: bootstrap.servers: "${TB_KAFKA_SERVERS:localhost:9092}" + ssl: + enabled: "${TB_KAFKA_SSL_ENABLED:false}" + truststore.location: "${TB_KAFKA_SSL_TRUSTSTORE_LOCATION:}" + truststore.password: "${TB_KAFKA_SSL_TRUSTSTORE_PASSWORD:}" + keystore.location: "${TB_KAFKA_SSL_KEYSTORE_LOCATION:}" + keystore.password: "${TB_KAFKA_SSL_KEYSTORE_PASSWORD:}" + key.password: "${TB_KAFKA_SSL_KEY_PASSWORD:}" acks: "${TB_KAFKA_ACKS:all}" retries: "${TB_KAFKA_RETRIES:1}" compression.type: "${TB_KAFKA_COMPRESSION_TYPE:none}" # none or gzip