Add SSL option for Kafka queue connection
This commit is contained in:
parent
eb9cd79c7e
commit
47542eb8c1
@ -987,6 +987,13 @@ queue:
|
|||||||
print-interval-ms: "${TB_QUEUE_IN_MEMORY_STATS_PRINT_INTERVAL_MS:60000}"
|
print-interval-ms: "${TB_QUEUE_IN_MEMORY_STATS_PRINT_INTERVAL_MS:60000}"
|
||||||
kafka:
|
kafka:
|
||||||
bootstrap.servers: "${TB_KAFKA_SERVERS:localhost:9092}"
|
bootstrap.servers: "${TB_KAFKA_SERVERS:localhost:9092}"
|
||||||
|
ssl:
|
||||||
|
enabled: "${TB_KAFKA_SSL_ENABLED:false}"
|
||||||
|
truststore.location: "${TB_KAFKA_SSL_TRUSTSTORE_LOCATION:}"
|
||||||
|
truststore.password: "${TB_KAFKA_SSL_TRUSTSTORE_PASSWORD:}"
|
||||||
|
keystore.location: "${TB_KAFKA_SSL_KEYSTORE_LOCATION:}"
|
||||||
|
keystore.password: "${TB_KAFKA_SSL_KEYSTORE_PASSWORD:}"
|
||||||
|
key.password: "${TB_KAFKA_SSL_KEY_PASSWORD:}"
|
||||||
acks: "${TB_KAFKA_ACKS:all}"
|
acks: "${TB_KAFKA_ACKS:all}"
|
||||||
retries: "${TB_KAFKA_RETRIES:1}"
|
retries: "${TB_KAFKA_RETRIES:1}"
|
||||||
compression.type: "${TB_KAFKA_COMPRESSION_TYPE:none}" # none or gzip
|
compression.type: "${TB_KAFKA_COMPRESSION_TYPE:none}" # none or gzip
|
||||||
|
|||||||
@ -19,6 +19,7 @@ import lombok.Getter;
|
|||||||
import lombok.Setter;
|
import lombok.Setter;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.apache.kafka.clients.CommonClientConfigs;
|
import org.apache.kafka.clients.CommonClientConfigs;
|
||||||
|
import org.apache.kafka.common.config.SslConfigs;
|
||||||
import org.apache.kafka.clients.admin.AdminClientConfig;
|
import org.apache.kafka.clients.admin.AdminClientConfig;
|
||||||
import org.apache.kafka.clients.consumer.ConsumerConfig;
|
import org.apache.kafka.clients.consumer.ConsumerConfig;
|
||||||
import org.apache.kafka.clients.producer.ProducerConfig;
|
import org.apache.kafka.clients.producer.ProducerConfig;
|
||||||
@ -49,6 +50,24 @@ public class TbKafkaSettings {
|
|||||||
@Value("${queue.kafka.bootstrap.servers}")
|
@Value("${queue.kafka.bootstrap.servers}")
|
||||||
private String servers;
|
private String servers;
|
||||||
|
|
||||||
|
@Value("${queue.kafka.ssl.enabled:false}")
|
||||||
|
private boolean sslEnabled;
|
||||||
|
|
||||||
|
@Value("${queue.kafka.ssl.truststore.location}")
|
||||||
|
private String sslTruststoreLocation;
|
||||||
|
|
||||||
|
@Value("${queue.kafka.ssl.truststore.password}")
|
||||||
|
private String sslTruststorePassword;
|
||||||
|
|
||||||
|
@Value("${queue.kafka.ssl.keystore.location}")
|
||||||
|
private String sslKeystoreLocation;
|
||||||
|
|
||||||
|
@Value("${queue.kafka.ssl.keystore.password}")
|
||||||
|
private String sslKeystorePassword;
|
||||||
|
|
||||||
|
@Value("${queue.kafka.ssl.key.password}")
|
||||||
|
private String sslKeyPassword;
|
||||||
|
|
||||||
@Value("${queue.kafka.acks}")
|
@Value("${queue.kafka.acks}")
|
||||||
private String acks;
|
private String acks;
|
||||||
|
|
||||||
@ -115,6 +134,15 @@ public class TbKafkaSettings {
|
|||||||
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
|
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
|
||||||
props.put(AdminClientConfig.RETRIES_CONFIG, retries);
|
props.put(AdminClientConfig.RETRIES_CONFIG, retries);
|
||||||
|
|
||||||
|
if (sslEnabled) {
|
||||||
|
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
|
||||||
|
props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, sslTruststoreLocation);
|
||||||
|
props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, sslTruststorePassword);
|
||||||
|
props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, sslKeystoreLocation);
|
||||||
|
props.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, sslKeystorePassword);
|
||||||
|
props.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, sslKeyPassword);
|
||||||
|
}
|
||||||
|
|
||||||
return props;
|
return props;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -126,6 +154,15 @@ public class TbKafkaSettings {
|
|||||||
props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, fetchMaxBytes);
|
props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, fetchMaxBytes);
|
||||||
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxPollIntervalMs);
|
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxPollIntervalMs);
|
||||||
|
|
||||||
|
if (sslEnabled) {
|
||||||
|
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
|
||||||
|
props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, sslTruststoreLocation);
|
||||||
|
props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, sslTruststorePassword);
|
||||||
|
props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, sslKeystoreLocation);
|
||||||
|
props.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, sslKeystorePassword);
|
||||||
|
props.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, sslKeyPassword);
|
||||||
|
}
|
||||||
|
|
||||||
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
|
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
|
||||||
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
|
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
|
||||||
|
|
||||||
@ -164,6 +201,16 @@ public class TbKafkaSettings {
|
|||||||
if (other != null) {
|
if (other != null) {
|
||||||
other.forEach(kv -> props.put(kv.getKey(), kv.getValue()));
|
other.forEach(kv -> props.put(kv.getKey(), kv.getValue()));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (sslEnabled) {
|
||||||
|
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
|
||||||
|
props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, sslTruststoreLocation);
|
||||||
|
props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, sslTruststorePassword);
|
||||||
|
props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, sslKeystoreLocation);
|
||||||
|
props.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, sslKeystorePassword);
|
||||||
|
props.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, sslKeyPassword);
|
||||||
|
}
|
||||||
|
|
||||||
return props;
|
return props;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -50,6 +50,13 @@ queue:
|
|||||||
print-interval-ms: "${TB_QUEUE_IN_MEMORY_STATS_PRINT_INTERVAL_MS:60000}"
|
print-interval-ms: "${TB_QUEUE_IN_MEMORY_STATS_PRINT_INTERVAL_MS:60000}"
|
||||||
kafka:
|
kafka:
|
||||||
bootstrap.servers: "${TB_KAFKA_SERVERS:localhost:9092}"
|
bootstrap.servers: "${TB_KAFKA_SERVERS:localhost:9092}"
|
||||||
|
ssl:
|
||||||
|
enabled: "${TB_KAFKA_SSL_ENABLED:false}"
|
||||||
|
truststore.location: "${TB_KAFKA_SSL_TRUSTSTORE_LOCATION:}"
|
||||||
|
truststore.password: "${TB_KAFKA_SSL_TRUSTSTORE_PASSWORD:}"
|
||||||
|
keystore.location: "${TB_KAFKA_SSL_KEYSTORE_LOCATION:}"
|
||||||
|
keystore.password: "${TB_KAFKA_SSL_KEYSTORE_PASSWORD:}"
|
||||||
|
key.password: "${TB_KAFKA_SSL_KEY_PASSWORD:}"
|
||||||
acks: "${TB_KAFKA_ACKS:all}"
|
acks: "${TB_KAFKA_ACKS:all}"
|
||||||
retries: "${TB_KAFKA_RETRIES:1}"
|
retries: "${TB_KAFKA_RETRIES:1}"
|
||||||
compression.type: "${TB_KAFKA_COMPRESSION_TYPE:none}" # none or gzip
|
compression.type: "${TB_KAFKA_COMPRESSION_TYPE:none}" # none or gzip
|
||||||
|
|||||||
@ -151,6 +151,13 @@ queue:
|
|||||||
type: "${TB_QUEUE_TYPE:kafka}" # kafka (Apache Kafka) or aws-sqs (AWS SQS) or pubsub (PubSub) or service-bus (Azure Service Bus) or rabbitmq (RabbitMQ)
|
type: "${TB_QUEUE_TYPE:kafka}" # kafka (Apache Kafka) or aws-sqs (AWS SQS) or pubsub (PubSub) or service-bus (Azure Service Bus) or rabbitmq (RabbitMQ)
|
||||||
kafka:
|
kafka:
|
||||||
bootstrap.servers: "${TB_KAFKA_SERVERS:localhost:9092}"
|
bootstrap.servers: "${TB_KAFKA_SERVERS:localhost:9092}"
|
||||||
|
ssl:
|
||||||
|
enabled: "${TB_KAFKA_SSL_ENABLED:false}"
|
||||||
|
truststore.location: "${TB_KAFKA_SSL_TRUSTSTORE_LOCATION:}"
|
||||||
|
truststore.password: "${TB_KAFKA_SSL_TRUSTSTORE_PASSWORD:}"
|
||||||
|
keystore.location: "${TB_KAFKA_SSL_KEYSTORE_LOCATION:}"
|
||||||
|
keystore.password: "${TB_KAFKA_SSL_KEYSTORE_PASSWORD:}"
|
||||||
|
key.password: "${TB_KAFKA_SSL_KEY_PASSWORD:}"
|
||||||
acks: "${TB_KAFKA_ACKS:all}"
|
acks: "${TB_KAFKA_ACKS:all}"
|
||||||
retries: "${TB_KAFKA_RETRIES:1}"
|
retries: "${TB_KAFKA_RETRIES:1}"
|
||||||
compression.type: "${TB_KAFKA_COMPRESSION_TYPE:none}" # none or gzip
|
compression.type: "${TB_KAFKA_COMPRESSION_TYPE:none}" # none or gzip
|
||||||
|
|||||||
@ -136,6 +136,13 @@ queue:
|
|||||||
type: "${TB_QUEUE_TYPE:kafka}" # kafka (Apache Kafka) or aws-sqs (AWS SQS) or pubsub (PubSub) or service-bus (Azure Service Bus) or rabbitmq (RabbitMQ)
|
type: "${TB_QUEUE_TYPE:kafka}" # kafka (Apache Kafka) or aws-sqs (AWS SQS) or pubsub (PubSub) or service-bus (Azure Service Bus) or rabbitmq (RabbitMQ)
|
||||||
kafka:
|
kafka:
|
||||||
bootstrap.servers: "${TB_KAFKA_SERVERS:localhost:9092}"
|
bootstrap.servers: "${TB_KAFKA_SERVERS:localhost:9092}"
|
||||||
|
ssl:
|
||||||
|
enabled: "${TB_KAFKA_SSL_ENABLED:false}"
|
||||||
|
truststore.location: "${TB_KAFKA_SSL_TRUSTSTORE_LOCATION:}"
|
||||||
|
truststore.password: "${TB_KAFKA_SSL_TRUSTSTORE_PASSWORD:}"
|
||||||
|
keystore.location: "${TB_KAFKA_SSL_KEYSTORE_LOCATION:}"
|
||||||
|
keystore.password: "${TB_KAFKA_SSL_KEYSTORE_PASSWORD:}"
|
||||||
|
key.password: "${TB_KAFKA_SSL_KEY_PASSWORD:}"
|
||||||
acks: "${TB_KAFKA_ACKS:all}"
|
acks: "${TB_KAFKA_ACKS:all}"
|
||||||
retries: "${TB_KAFKA_RETRIES:1}"
|
retries: "${TB_KAFKA_RETRIES:1}"
|
||||||
compression.type: "${TB_KAFKA_COMPRESSION_TYPE:none}" # none or gzip
|
compression.type: "${TB_KAFKA_COMPRESSION_TYPE:none}" # none or gzip
|
||||||
|
|||||||
@ -217,6 +217,13 @@ queue:
|
|||||||
type: "${TB_QUEUE_TYPE:kafka}" # kafka (Apache Kafka) or aws-sqs (AWS SQS) or pubsub (PubSub) or service-bus (Azure Service Bus) or rabbitmq (RabbitMQ)
|
type: "${TB_QUEUE_TYPE:kafka}" # kafka (Apache Kafka) or aws-sqs (AWS SQS) or pubsub (PubSub) or service-bus (Azure Service Bus) or rabbitmq (RabbitMQ)
|
||||||
kafka:
|
kafka:
|
||||||
bootstrap.servers: "${TB_KAFKA_SERVERS:localhost:9092}"
|
bootstrap.servers: "${TB_KAFKA_SERVERS:localhost:9092}"
|
||||||
|
ssl:
|
||||||
|
enabled: "${TB_KAFKA_SSL_ENABLED:false}"
|
||||||
|
truststore.location: "${TB_KAFKA_SSL_TRUSTSTORE_LOCATION:}"
|
||||||
|
truststore.password: "${TB_KAFKA_SSL_TRUSTSTORE_PASSWORD:}"
|
||||||
|
keystore.location: "${TB_KAFKA_SSL_KEYSTORE_LOCATION:}"
|
||||||
|
keystore.password: "${TB_KAFKA_SSL_KEYSTORE_PASSWORD:}"
|
||||||
|
key.password: "${TB_KAFKA_SSL_KEY_PASSWORD:}"
|
||||||
acks: "${TB_KAFKA_ACKS:all}"
|
acks: "${TB_KAFKA_ACKS:all}"
|
||||||
retries: "${TB_KAFKA_RETRIES:1}"
|
retries: "${TB_KAFKA_RETRIES:1}"
|
||||||
compression.type: "${TB_KAFKA_COMPRESSION_TYPE:none}" # none or gzip
|
compression.type: "${TB_KAFKA_COMPRESSION_TYPE:none}" # none or gzip
|
||||||
|
|||||||
@ -166,6 +166,13 @@ queue:
|
|||||||
type: "${TB_QUEUE_TYPE:kafka}" # kafka (Apache Kafka) or aws-sqs (AWS SQS) or pubsub (PubSub) or service-bus (Azure Service Bus) or rabbitmq (RabbitMQ)
|
type: "${TB_QUEUE_TYPE:kafka}" # kafka (Apache Kafka) or aws-sqs (AWS SQS) or pubsub (PubSub) or service-bus (Azure Service Bus) or rabbitmq (RabbitMQ)
|
||||||
kafka:
|
kafka:
|
||||||
bootstrap.servers: "${TB_KAFKA_SERVERS:localhost:9092}"
|
bootstrap.servers: "${TB_KAFKA_SERVERS:localhost:9092}"
|
||||||
|
ssl:
|
||||||
|
enabled: "${TB_KAFKA_SSL_ENABLED:false}"
|
||||||
|
truststore.location: "${TB_KAFKA_SSL_TRUSTSTORE_LOCATION:}"
|
||||||
|
truststore.password: "${TB_KAFKA_SSL_TRUSTSTORE_PASSWORD:}"
|
||||||
|
keystore.location: "${TB_KAFKA_SSL_KEYSTORE_LOCATION:}"
|
||||||
|
keystore.password: "${TB_KAFKA_SSL_KEYSTORE_PASSWORD:}"
|
||||||
|
key.password: "${TB_KAFKA_SSL_KEY_PASSWORD:}"
|
||||||
acks: "${TB_KAFKA_ACKS:all}"
|
acks: "${TB_KAFKA_ACKS:all}"
|
||||||
retries: "${TB_KAFKA_RETRIES:1}"
|
retries: "${TB_KAFKA_RETRIES:1}"
|
||||||
compression.type: "${TB_KAFKA_COMPRESSION_TYPE:none}" # none or gzip
|
compression.type: "${TB_KAFKA_COMPRESSION_TYPE:none}" # none or gzip
|
||||||
|
|||||||
@ -112,6 +112,13 @@ queue:
|
|||||||
type: "${TB_QUEUE_TYPE:kafka}" # kafka (Apache Kafka) or aws-sqs (AWS SQS) or pubsub (PubSub) or service-bus (Azure Service Bus) or rabbitmq (RabbitMQ)
|
type: "${TB_QUEUE_TYPE:kafka}" # kafka (Apache Kafka) or aws-sqs (AWS SQS) or pubsub (PubSub) or service-bus (Azure Service Bus) or rabbitmq (RabbitMQ)
|
||||||
kafka:
|
kafka:
|
||||||
bootstrap.servers: "${TB_KAFKA_SERVERS:localhost:9092}"
|
bootstrap.servers: "${TB_KAFKA_SERVERS:localhost:9092}"
|
||||||
|
ssl:
|
||||||
|
enabled: "${TB_KAFKA_SSL_ENABLED:false}"
|
||||||
|
truststore.location: "${TB_KAFKA_SSL_TRUSTSTORE_LOCATION:}"
|
||||||
|
truststore.password: "${TB_KAFKA_SSL_TRUSTSTORE_PASSWORD:}"
|
||||||
|
keystore.location: "${TB_KAFKA_SSL_KEYSTORE_LOCATION:}"
|
||||||
|
keystore.password: "${TB_KAFKA_SSL_KEYSTORE_PASSWORD:}"
|
||||||
|
key.password: "${TB_KAFKA_SSL_KEY_PASSWORD:}"
|
||||||
acks: "${TB_KAFKA_ACKS:all}"
|
acks: "${TB_KAFKA_ACKS:all}"
|
||||||
retries: "${TB_KAFKA_RETRIES:1}"
|
retries: "${TB_KAFKA_RETRIES:1}"
|
||||||
compression.type: "${TB_KAFKA_COMPRESSION_TYPE:none}" # none or gzip
|
compression.type: "${TB_KAFKA_COMPRESSION_TYPE:none}" # none or gzip
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user