From c8b8fb21470c4eda115b17aca77ab45a4777a511 Mon Sep 17 00:00:00 2001 From: dashevchenko Date: Mon, 30 Oct 2023 16:17:12 +0200 Subject: [PATCH] added global queue prefix support for js-executor --- msa/js-executor/config/custom-environment-variables.yml | 1 + msa/js-executor/config/default.yml | 1 + msa/js-executor/queue/awsSqsTemplate.ts | 3 ++- msa/js-executor/queue/kafkaTemplate.ts | 3 ++- msa/js-executor/queue/pubSubTemplate.ts | 3 ++- msa/js-executor/queue/rabbitmqTemplate.ts | 3 ++- msa/js-executor/queue/serviceBusTemplate.ts | 3 ++- 7 files changed, 12 insertions(+), 5 deletions(-) diff --git a/msa/js-executor/config/custom-environment-variables.yml b/msa/js-executor/config/custom-environment-variables.yml index 5ca66d71a1..c92ae78553 100644 --- a/msa/js-executor/config/custom-environment-variables.yml +++ b/msa/js-executor/config/custom-environment-variables.yml @@ -15,6 +15,7 @@ # queue_type: "TB_QUEUE_TYPE" #kafka (Apache Kafka) or aws-sqs (AWS SQS) or pubsub (PubSub) or service-bus (Azure Service Bus) or rabbitmq (RabbitMQ) +queue_prefix: "TB_QUEUE_PREFIX" request_topic: "REMOTE_JS_EVAL_REQUEST_TOPIC" http_port: "HTTP_PORT" # /livenessProbe diff --git a/msa/js-executor/config/default.yml b/msa/js-executor/config/default.yml index 43712f0f61..e6927c0bff 100644 --- a/msa/js-executor/config/default.yml +++ b/msa/js-executor/config/default.yml @@ -16,6 +16,7 @@ queue_type: "kafka" request_topic: "js_eval.requests" +queue_prefix: "" http_port: "8888" # /livenessProbe js: diff --git a/msa/js-executor/queue/awsSqsTemplate.ts b/msa/js-executor/queue/awsSqsTemplate.ts index c9951008c7..1486df802b 100644 --- a/msa/js-executor/queue/awsSqsTemplate.ts +++ b/msa/js-executor/queue/awsSqsTemplate.ts @@ -38,7 +38,8 @@ import uuid from 'uuid-random'; export class AwsSqsTemplate implements IQueue { private logger = _logger(`awsSqsTemplate`); - private requestTopic: string = config.get('request_topic'); + private queuePrefix: string = config.get('queue_prefix'); + private requestTopic: string = this.queuePrefix ? this.queuePrefix + "." + config.get('request_topic') : config.get('request_topic'); private accessKeyId: string = config.get('aws_sqs.access_key_id'); private secretAccessKey: string = config.get('aws_sqs.secret_access_key'); private region: string = config.get('aws_sqs.region'); diff --git a/msa/js-executor/queue/kafkaTemplate.ts b/msa/js-executor/queue/kafkaTemplate.ts index 5d1b6cc6c1..9759c095c9 100644 --- a/msa/js-executor/queue/kafkaTemplate.ts +++ b/msa/js-executor/queue/kafkaTemplate.ts @@ -61,7 +61,8 @@ export class KafkaTemplate implements IQueue { async init(): Promise { const kafkaBootstrapServers: string = config.get('kafka.bootstrap.servers'); - const requestTopic: string = config.get('request_topic'); + const queuePrefix: string = config.get('queue_prefix'); + const requestTopic: string = queuePrefix ? queuePrefix + "." + config.get('request_topic') : config.get('request_topic'); const useConfluent = config.get('kafka.use_confluent_cloud'); this.logger.info('Kafka Bootstrap Servers: %s', kafkaBootstrapServers); diff --git a/msa/js-executor/queue/pubSubTemplate.ts b/msa/js-executor/queue/pubSubTemplate.ts index 71c9b72d5f..a1b05ab7bf 100644 --- a/msa/js-executor/queue/pubSubTemplate.ts +++ b/msa/js-executor/queue/pubSubTemplate.ts @@ -26,7 +26,8 @@ export class PubSubTemplate implements IQueue { private logger = _logger(`pubSubTemplate`); private projectId: string = config.get('pubsub.project_id'); private credentials = JSON.parse(config.get('pubsub.service_account')); - private requestTopic: string = config.get('request_topic'); + private queuePrefix: string = config.get('queue_prefix'); + private requestTopic: string = this.queuePrefix ? this.queuePrefix + "." + config.get('request_topic') : config.get('request_topic'); private queueProperties: string = config.get('pubsub.queue_properties'); private pubSubClient: PubSub; diff --git a/msa/js-executor/queue/rabbitmqTemplate.ts b/msa/js-executor/queue/rabbitmqTemplate.ts index fca4b1e5bd..7bb078c047 100644 --- a/msa/js-executor/queue/rabbitmqTemplate.ts +++ b/msa/js-executor/queue/rabbitmqTemplate.ts @@ -24,7 +24,8 @@ import { Options, Replies } from 'amqplib/properties'; export class RabbitMqTemplate implements IQueue { private logger = _logger(`rabbitmqTemplate`); - private requestTopic: string = config.get('request_topic'); + private queuePrefix: string = config.get('queue_prefix'); + private requestTopic: string = this.queuePrefix ? this.queuePrefix + "." + config.get('request_topic') : config.get('request_topic'); private host = config.get('rabbitmq.host'); private port = config.get('rabbitmq.port'); private vhost = config.get('rabbitmq.virtual_host'); diff --git a/msa/js-executor/queue/serviceBusTemplate.ts b/msa/js-executor/queue/serviceBusTemplate.ts index 16950b7bce..70f0d68d3b 100644 --- a/msa/js-executor/queue/serviceBusTemplate.ts +++ b/msa/js-executor/queue/serviceBusTemplate.ts @@ -31,7 +31,8 @@ import { export class ServiceBusTemplate implements IQueue { private logger = _logger(`serviceBusTemplate`); - private requestTopic: string = config.get('request_topic'); + private queuePrefix: string = config.get('queue_prefix'); + private requestTopic: string = this.queuePrefix ? this.queuePrefix + "." + config.get('request_topic') : config.get('request_topic'); private namespaceName = config.get('service_bus.namespace_name'); private sasKeyName = config.get('service_bus.sas_key_name'); private sasKey = config.get('service_bus.sas_key');