added global queue prefix support for js-executor

This commit is contained in:
dashevchenko 2023-10-30 16:17:12 +02:00
parent fc63257369
commit c8b8fb2147
7 changed files with 12 additions and 5 deletions

View File

@ -15,6 +15,7 @@
#
queue_type: "TB_QUEUE_TYPE" #kafka (Apache Kafka) or aws-sqs (AWS SQS) or pubsub (PubSub) or service-bus (Azure Service Bus) or rabbitmq (RabbitMQ)
queue_prefix: "TB_QUEUE_PREFIX"
request_topic: "REMOTE_JS_EVAL_REQUEST_TOPIC"
http_port: "HTTP_PORT" # /livenessProbe

View File

@ -16,6 +16,7 @@
queue_type: "kafka"
request_topic: "js_eval.requests"
queue_prefix: ""
http_port: "8888" # /livenessProbe
js:

View File

@ -38,7 +38,8 @@ import uuid from 'uuid-random';
export class AwsSqsTemplate implements IQueue {
private logger = _logger(`awsSqsTemplate`);
private requestTopic: string = config.get('request_topic');
private queuePrefix: string = config.get('queue_prefix');
private requestTopic: string = this.queuePrefix ? this.queuePrefix + "." + config.get('request_topic') : config.get('request_topic');
private accessKeyId: string = config.get('aws_sqs.access_key_id');
private secretAccessKey: string = config.get('aws_sqs.secret_access_key');
private region: string = config.get('aws_sqs.region');

View File

@ -61,7 +61,8 @@ export class KafkaTemplate implements IQueue {
async init(): Promise<void> {
const kafkaBootstrapServers: string = config.get('kafka.bootstrap.servers');
const requestTopic: string = config.get('request_topic');
const queuePrefix: string = config.get('queue_prefix');
const requestTopic: string = queuePrefix ? queuePrefix + "." + config.get('request_topic') : config.get('request_topic');
const useConfluent = config.get('kafka.use_confluent_cloud');
this.logger.info('Kafka Bootstrap Servers: %s', kafkaBootstrapServers);

View File

@ -26,7 +26,8 @@ export class PubSubTemplate implements IQueue {
private logger = _logger(`pubSubTemplate`);
private projectId: string = config.get('pubsub.project_id');
private credentials = JSON.parse(config.get('pubsub.service_account'));
private requestTopic: string = config.get('request_topic');
private queuePrefix: string = config.get('queue_prefix');
private requestTopic: string = this.queuePrefix ? this.queuePrefix + "." + config.get('request_topic') : config.get('request_topic');
private queueProperties: string = config.get('pubsub.queue_properties');
private pubSubClient: PubSub;

View File

@ -24,7 +24,8 @@ import { Options, Replies } from 'amqplib/properties';
export class RabbitMqTemplate implements IQueue {
private logger = _logger(`rabbitmqTemplate`);
private requestTopic: string = config.get('request_topic');
private queuePrefix: string = config.get('queue_prefix');
private requestTopic: string = this.queuePrefix ? this.queuePrefix + "." + config.get('request_topic') : config.get('request_topic');
private host = config.get('rabbitmq.host');
private port = config.get('rabbitmq.port');
private vhost = config.get('rabbitmq.virtual_host');

View File

@ -31,7 +31,8 @@ import {
export class ServiceBusTemplate implements IQueue {
private logger = _logger(`serviceBusTemplate`);
private requestTopic: string = config.get('request_topic');
private queuePrefix: string = config.get('queue_prefix');
private requestTopic: string = this.queuePrefix ? this.queuePrefix + "." + config.get('request_topic') : config.get('request_topic');
private namespaceName = config.get('service_bus.namespace_name');
private sasKeyName = config.get('service_bus.sas_key_name');
private sasKey = config.get('service_bus.sas_key');