From 5c6b4122d355c4bf16bf9dfc8d345231d1c1dcaa Mon Sep 17 00:00:00 2001 From: Vladyslav_Prykhodko Date: Thu, 16 May 2024 18:02:23 +0300 Subject: [PATCH] Add SSL support for Kafka queue connection in JS executor --- msa/js-executor/api/utils.ts | 4 +++ .../config/custom-environment-variables.yml | 11 +++++++ msa/js-executor/config/default.yml | 2 ++ msa/js-executor/queue/kafkaTemplate.ts | 32 ++++++++++++++++++- 4 files changed, 48 insertions(+), 1 deletion(-) diff --git a/msa/js-executor/api/utils.ts b/msa/js-executor/api/utils.ts index 963ea1ddae..c228d98315 100644 --- a/msa/js-executor/api/utils.ts +++ b/msa/js-executor/api/utils.ts @@ -62,3 +62,7 @@ export function parseJsErrorDetails(err: any): string | undefined { export function isNotUUID(candidate: string) { return candidate.length != 36 || !candidate.includes('-'); } + +export function isNotEmptyStr(value: any): boolean { + return typeof value === 'string' && value.trim().length > 0; +} diff --git a/msa/js-executor/config/custom-environment-variables.yml b/msa/js-executor/config/custom-environment-variables.yml index 1f5e5d711a..7c79eda0d1 100644 --- a/msa/js-executor/config/custom-environment-variables.yml +++ b/msa/js-executor/config/custom-environment-variables.yml @@ -38,6 +38,17 @@ kafka: topic_properties: "TB_QUEUE_KAFKA_JE_TOPIC_PROPERTIES" use_confluent_cloud: "TB_QUEUE_KAFKA_USE_CONFLUENT_CLOUD" client_id: "KAFKA_CLIENT_ID" #inject pod name to easy identify the client using /opt/kafka/bin/kafka-consumer-groups.sh + ssl: + # Enable or disable SSL for Kafka communication. + enabled: "TB_KAFKA_SSL_ENABLED" + # Path to the server certificate file. This file can hold the server certificate or a certificate chain and may include the server private key. + cert_file: "TB_KAFKA_SSL_PEM_CERTIFICATE" + # Optional: Path to the server certificate private key file. Required if the private key is not included in the server certificate file. + key_file: "TB_KAFKA_SSL_PEM_KEY" + # Optional: Password for the server certificate private key, if applicable. + key_password: "TB_KAFKA_SSL_PEM_KEY_PASSWORD" + # Optional: Path to a custom CA certificate file. Defaults to trusting well-known CAs curated by Mozilla. + ca_file: "TB_KAFKA_SSL_PEM_TRUSTS_CERTIFICATE" confluent: sasl: mechanism: "TB_QUEUE_KAFKA_CONFLUENT_SASL_MECHANISM" diff --git a/msa/js-executor/config/default.yml b/msa/js-executor/config/default.yml index b340425a31..b2e84d0d46 100644 --- a/msa/js-executor/config/default.yml +++ b/msa/js-executor/config/default.yml @@ -38,6 +38,8 @@ kafka: topic_properties: "retention.ms:604800000;segment.bytes:26214400;retention.bytes:104857600;partitions:100;min.insync.replicas:1" use_confluent_cloud: false client_id: "kafkajs" #inject pod name to easy identify the client using /opt/kafka/bin/kafka-consumer-groups.sh + ssl: + enabled: false confluent: sasl: mechanism: "PLAIN" diff --git a/msa/js-executor/queue/kafkaTemplate.ts b/msa/js-executor/queue/kafkaTemplate.ts index 4ba3a1aee1..659d1fb8b2 100644 --- a/msa/js-executor/queue/kafkaTemplate.ts +++ b/msa/js-executor/queue/kafkaTemplate.ts @@ -15,6 +15,7 @@ /// import config from 'config'; +import fs from 'node:fs'; import { _logger, KafkaJsWinstonLogCreator } from '../config/logger'; import { JsInvokeMessageProcessor } from '../api/jsInvokeMessageProcessor' import { IQueue } from './queue.models'; @@ -29,8 +30,10 @@ import { Producer, TopicMessages } from 'kafkajs'; +import { isNotEmptyStr } from '../api/utils'; +import { KeyObject } from 'tls'; -import process, { kill, exit } from 'process'; +import process, { exit, kill } from 'process'; export class KafkaTemplate implements IQueue { @@ -64,6 +67,7 @@ export class KafkaTemplate implements IQueue { const queuePrefix: string = config.get('queue_prefix'); const requestTopic: string = queuePrefix ? queuePrefix + "." + config.get('request_topic') : config.get('request_topic'); const useConfluent = config.get('kafka.use_confluent_cloud'); + const enabledSsl = Boolean(config.get('kafka.ssl.enabled')); const groupId:string = queuePrefix ? queuePrefix + ".js-executor-group" : "js-executor-group"; this.logger.info('Kafka Bootstrap Servers: %s', kafkaBootstrapServers); this.logger.info('Kafka Requests Topic: %s', requestTopic); @@ -93,6 +97,31 @@ export class KafkaTemplate implements IQueue { kafkaConfig['ssl'] = true; } + if (enabledSsl) { + const certFilePath: string = config.has('kafka.ssl.cert_file') ? config.get('kafka.ssl.cert_file') : ''; + const keyFilePath: string = config.has('kafka.ssl.key_file') ? config.get('kafka.ssl.key_file') : ''; + const keyPassword: string = config.has('kafka.ssl.key_password') ? config.get('kafka.ssl.key_password') : ''; + const caFilePath: string = config.has('kafka.ssl.ca_file') ? config.get('kafka.ssl.ca_file') : ''; + + kafkaConfig.ssl = {}; + + if (isNotEmptyStr(certFilePath)) { + kafkaConfig.ssl.cert = fs.readFileSync(certFilePath, 'utf-8'); + } + + if (isNotEmptyStr(keyFilePath)) { + const keyConfig: KeyObject = {pem: fs.readFileSync(keyFilePath, 'utf-8')}; + if (isNotEmptyStr(keyPassword)) { + keyConfig.passphrase = keyPassword; + } + kafkaConfig.ssl.key = [keyConfig]; + } + + if (isNotEmptyStr(caFilePath)) { + kafkaConfig.ssl.ca = fs.readFileSync(caFilePath, 'utf-8'); + } + } + this.parseTopicProperties(); this.kafkaClient = new Kafka(kafkaConfig); @@ -213,6 +242,7 @@ export class KafkaTemplate implements IQueue { private createTopic(topic: string, partitions: number): Promise { return this.kafkaAdmin.createTopics({ + timeout: this.requestTimeout, topics: [{ topic: topic, numPartitions: partitions,