diff --git a/msa/js-executor/api/httpServer.ts b/msa/js-executor/api/httpServer.ts index f3671a1b89..e1c294fdff 100644 --- a/msa/js-executor/api/httpServer.ts +++ b/msa/js-executor/api/httpServer.ts @@ -16,12 +16,15 @@ import express from 'express'; import { _logger} from '../config/logger'; +import http from 'http'; +import { Socket } from 'net'; export class HttpServer { private logger = _logger('httpServer'); private app = express(); - private server; + private server: http.Server | null; + private connections: Socket[] = []; constructor(httpPort: number) { this.app.get('/livenessProbe', async (req, res) => { @@ -32,15 +35,31 @@ export class HttpServer { }) this.server = this.app.listen(httpPort, () => { - this.logger.info('Started http endpoint on port %s. Please, use /livenessProbe !', httpPort); + this.logger.info('Started HTTP endpoint on port %s. Please, use /livenessProbe !', httpPort); }).on('error', (error) => { this.logger.error(error); }); - } - stop() { - this.server.close(() => { - this.logger.info('Http server stop'); + this.server.on('connection', connection => { + this.connections.push(connection); + connection.on('close', () => this.connections = this.connections.filter(curr => curr !== connection)); }); } + + async stop() { + if (this.server) { + this.logger.info('Stopping HTTP Server...'); + const _server = this.server; + this.server = null; + this.connections.forEach(curr => curr.end(() => curr.destroy())); + await new Promise( + (resolve, reject) => { + _server.close((err) => { + this.logger.info('HTTP Server stopped.'); + resolve(); + }); + } + ); + } + } } diff --git a/msa/js-executor/docker/Dockerfile b/msa/js-executor/docker/Dockerfile index 620d712ad1..d17138d923 100644 --- a/msa/js-executor/docker/Dockerfile +++ b/msa/js-executor/docker/Dockerfile @@ -29,6 +29,7 @@ COPY package/linux/conf ./conf COPY package/linux/conf ./config COPY src/api ./api COPY src/queue ./queue +COPY src/config ./config COPY src/server.js ./ RUN chmod a+x /tmp/*.sh \ diff --git a/msa/js-executor/package.json b/msa/js-executor/package.json index 1d1059aa90..6bc7668e15 100644 --- a/msa/js-executor/package.json +++ b/msa/js-executor/package.json @@ -20,7 +20,7 @@ "config": "^3.3.7", "express": "^4.18.1", "js-yaml": "^4.1.0", - "kafkajs": "^2.0.2", + "kafkajs": "^2.1.0", "long": "^5.2.0", "uuid-parse": "^1.1.0", "uuid-random": "^1.3.2", diff --git a/msa/js-executor/queue/awsSqsTemplate.ts b/msa/js-executor/queue/awsSqsTemplate.ts index 7bbf2b28fc..36ec9b5270 100644 --- a/msa/js-executor/queue/awsSqsTemplate.ts +++ b/msa/js-executor/queue/awsSqsTemplate.ts @@ -54,82 +54,76 @@ export class AwsSqsTemplate implements IQueue { FifoQueue: 'true' }; + name = 'AWS SQS'; + constructor() { } async init() { - try { - this.logger.info('Starting ThingsBoard JavaScript Executor Microservice...'); + this.sqsClient = new SQSClient({ + apiVersion: '2012-11-05', + credentials: { + accessKeyId: this.accessKeyId, + secretAccessKey: this.secretAccessKey + }, + region: this.region + }); - this.sqsClient = new SQSClient({ - apiVersion: '2012-11-05', - credentials: { - accessKeyId: this.accessKeyId, - secretAccessKey: this.secretAccessKey - }, - region: this.region + const queues = await this.getQueues(); + + if (queues.QueueUrls) { + queues.QueueUrls.forEach(queueUrl => { + const delimiterPosition = queueUrl.lastIndexOf('/'); + const queueName = queueUrl.substring(delimiterPosition + 1); + this.queueUrls.set(queueName, queueUrl); }); + } - const queues = await this.getQueues(); + this.parseQueueProperties(); - if (queues.QueueUrls) { - queues.QueueUrls.forEach(queueUrl => { - const delimiterPosition = queueUrl.lastIndexOf('/'); - const queueName = queueUrl.substring(delimiterPosition + 1); - this.queueUrls.set(queueName, queueUrl); - }); - } + this.requestQueueURL = this.queueUrls.get(AwsSqsTemplate.topicToSqsQueueName(this.requestTopic)) || ''; + if (!this.requestQueueURL) { + this.requestQueueURL = await this.createQueue(this.requestTopic); + } - this.parseQueueProperties(); + const messageProcessor = new JsInvokeMessageProcessor(this); - this.requestQueueURL = this.queueUrls.get(AwsSqsTemplate.topicToSqsQueueName(this.requestTopic)) || ''; - if (!this.requestQueueURL) { - this.requestQueueURL = await this.createQueue(this.requestTopic); - } + const params: ReceiveMessageRequest = { + MaxNumberOfMessages: 10, + QueueUrl: this.requestQueueURL, + WaitTimeSeconds: this.pollInterval / 1000 + }; + while (!this.stopped) { + let pollStartTs = new Date().getTime(); + const messagesResponse: ReceiveMessageResult = await this.sqsClient.send(new ReceiveMessageCommand(params)); + const messages = messagesResponse.Messages; - const messageProcessor = new JsInvokeMessageProcessor(this); + if (messages && messages.length > 0) { + const entries: DeleteMessageBatchRequestEntry[] = []; - const params: ReceiveMessageRequest = { - MaxNumberOfMessages: 10, - QueueUrl: this.requestQueueURL, - WaitTimeSeconds: this.pollInterval / 1000 - }; - while (!this.stopped) { - let pollStartTs = new Date().getTime(); - const messagesResponse: ReceiveMessageResult = await this.sqsClient.send(new ReceiveMessageCommand(params)); - const messages = messagesResponse.Messages; - - if (messages && messages.length > 0) { - const entries: DeleteMessageBatchRequestEntry[] = []; - - messages.forEach(message => { - entries.push({ - Id: message.MessageId, - ReceiptHandle: message.ReceiptHandle - }); - messageProcessor.onJsInvokeMessage(JSON.parse(message.Body || '')); + messages.forEach(message => { + entries.push({ + Id: message.MessageId, + ReceiptHandle: message.ReceiptHandle }); + messageProcessor.onJsInvokeMessage(JSON.parse(message.Body || '')); + }); - const deleteBatch: DeleteMessageBatchRequest = { - QueueUrl: this.requestQueueURL, - Entries: entries - }; - try { - await this.sqsClient.send(new DeleteMessageBatchCommand(deleteBatch)) - } catch (err: any) { - this.logger.error("Failed to delete messages from queue.", err.message); - } - } else { - let pollDuration = new Date().getTime() - pollStartTs; - if (pollDuration < this.pollInterval) { - await sleep(this.pollInterval - pollDuration); - } + const deleteBatch: DeleteMessageBatchRequest = { + QueueUrl: this.requestQueueURL, + Entries: entries + }; + try { + await this.sqsClient.send(new DeleteMessageBatchCommand(deleteBatch)) + } catch (err: any) { + this.logger.error("Failed to delete messages from queue.", err.message); + } + } else { + let pollDuration = new Date().getTime() - pollStartTs; + if (pollDuration < this.pollInterval) { + await sleep(this.pollInterval - pollDuration); } } - } catch (e: any) { - this.logger.error('Failed to start ThingsBoard JavaScript Executor Microservice: %s', e.message); - this.logger.error(e.stack); - await this.exit(-1); } } @@ -187,29 +181,21 @@ export class AwsSqsTemplate implements IQueue { return result.QueueUrl || ''; } - static async build(): Promise { - const queue = new AwsSqsTemplate(); - await queue.init(); - return queue; - } - - async exit(status: number) { + async destroy(): Promise { this.stopped = true; - this.logger.info('Exiting with status: %d ...', status); + this.logger.info('Stopping AWS SQS resources...'); if (this.sqsClient) { - this.logger.info('Stopping Aws Sqs client.') + this.logger.info('Stopping AWS SQS client...'); try { - this.sqsClient.destroy(); + const _sqsClient = this.sqsClient; // @ts-ignore delete this.sqsClient; - this.logger.info('Aws Sqs client stopped.') - process.exit(status); + _sqsClient.destroy(); + this.logger.info('AWS SQS client stopped.'); } catch (e: any) { - this.logger.info('Aws Sqs client stop error.'); - process.exit(status); + this.logger.info('AWS SQS client stop error.'); } - } else { - process.exit(status); } + this.logger.info('AWS SQS resources stopped.') } } diff --git a/msa/js-executor/queue/kafkaTemplate.ts b/msa/js-executor/queue/kafkaTemplate.ts index 8c7e7736c8..51fa6e291b 100644 --- a/msa/js-executor/queue/kafkaTemplate.ts +++ b/msa/js-executor/queue/kafkaTemplate.ts @@ -51,111 +51,103 @@ export class KafkaTemplate implements IQueue { private batchMessages: TopicMessages[] = []; private sendLoopInstance: NodeJS.Timeout; + name = 'Kafka'; + constructor() { } async init(): Promise { - try { - this.logger.info('Starting ThingsBoard JavaScript Executor Microservice...'); + const kafkaBootstrapServers: string = config.get('kafka.bootstrap.servers'); + const requestTopic: string = config.get('request_topic'); + const useConfluent = config.get('kafka.use_confluent_cloud'); - const kafkaBootstrapServers: string = config.get('kafka.bootstrap.servers'); - const requestTopic: string = config.get('request_topic'); - const useConfluent = config.get('kafka.use_confluent_cloud'); + this.logger.info('Kafka Bootstrap Servers: %s', kafkaBootstrapServers); + this.logger.info('Kafka Requests Topic: %s', requestTopic); - this.logger.info('Kafka Bootstrap Servers: %s', kafkaBootstrapServers); - this.logger.info('Kafka Requests Topic: %s', requestTopic); + let kafkaConfig: KafkaConfig = { + brokers: kafkaBootstrapServers.split(','), + logLevel: logLevel.INFO, + logCreator: KafkaJsWinstonLogCreator + }; - let kafkaConfig: KafkaConfig = { - brokers: kafkaBootstrapServers.split(','), - logLevel: logLevel.INFO, - logCreator: KafkaJsWinstonLogCreator - }; - - if (this.kafkaClientId) { - kafkaConfig['clientId'] = this.kafkaClientId; - } else { - this.logger.warn('KAFKA_CLIENT_ID is undefined. Consider to define the env variable KAFKA_CLIENT_ID'); - } - - kafkaConfig['requestTimeout'] = this.requestTimeout; - - if (useConfluent) { - kafkaConfig['sasl'] = { - mechanism: config.get('kafka.confluent.sasl.mechanism') as any, - username: config.get('kafka.confluent.username'), - password: config.get('kafka.confluent.password') - }; - kafkaConfig['ssl'] = true; - } - - this.parseTopicProperties(); - - this.kafkaClient = new Kafka(kafkaConfig); - this.kafkaAdmin = this.kafkaClient.admin(); - await this.kafkaAdmin.connect(); - - let partitions = 1; - - for (let i = 0; i < this.configEntries.length; i++) { - let param = this.configEntries[i]; - if (param.name === 'partitions') { - partitions = param.value; - this.configEntries.splice(i, 1); - break; - } - } - - let topics = await this.kafkaAdmin.listTopics(); - - if (!topics.includes(requestTopic)) { - let createRequestTopicResult = await this.createTopic(requestTopic, partitions); - if (createRequestTopicResult) { - this.logger.info('Created new topic: %s', requestTopic); - } - } - - this.consumer = this.kafkaClient.consumer({groupId: 'js-executor-group'}); - this.producer = this.kafkaClient.producer({createPartitioner: Partitioners.DefaultPartitioner}); - - const {CRASH} = this.consumer.events; - - this.consumer.on(CRASH, e => { - this.logger.error(`Got consumer CRASH event, should restart: ${e.payload.restart}`); - if (!e.payload.restart) { - this.logger.error('Going to exit due to not retryable error!'); - this.exit(-1); - } - }); - - const messageProcessor = new JsInvokeMessageProcessor(this); - await this.consumer.connect(); - await this.producer.connect(); - this.sendLoopWithLinger(); - await this.consumer.subscribe({topic: requestTopic}); - - this.logger.info('Started ThingsBoard JavaScript Executor Microservice.'); - await this.consumer.run({ - partitionsConsumedConcurrently: this.partitionsConsumedConcurrently, - eachMessage: async ({topic, partition, message}) => { - let headers = message.headers; - let key = message.key || new Buffer([]); - let msg = { - key: key.toString('utf8'), - data: message.value, - headers: { - data: headers - } - }; - messageProcessor.onJsInvokeMessage(msg); - }, - }); - - } catch (e: any) { - this.logger.error('Failed to start ThingsBoard JavaScript Executor Microservice: %s', e.message); - this.logger.error(e.stack); - await this.exit(-1); + if (this.kafkaClientId) { + kafkaConfig['clientId'] = this.kafkaClientId; + } else { + this.logger.warn('KAFKA_CLIENT_ID is undefined. Consider to define the env variable KAFKA_CLIENT_ID'); } - } + + kafkaConfig['requestTimeout'] = this.requestTimeout; + + if (useConfluent) { + kafkaConfig['sasl'] = { + mechanism: config.get('kafka.confluent.sasl.mechanism') as any, + username: config.get('kafka.confluent.username'), + password: config.get('kafka.confluent.password') + }; + kafkaConfig['ssl'] = true; + } + + this.parseTopicProperties(); + + this.kafkaClient = new Kafka(kafkaConfig); + this.kafkaAdmin = this.kafkaClient.admin(); + await this.kafkaAdmin.connect(); + + let partitions = 1; + + for (let i = 0; i < this.configEntries.length; i++) { + let param = this.configEntries[i]; + if (param.name === 'partitions') { + partitions = param.value; + this.configEntries.splice(i, 1); + break; + } + } + + let topics = await this.kafkaAdmin.listTopics(); + + if (!topics.includes(requestTopic)) { + let createRequestTopicResult = await this.createTopic(requestTopic, partitions); + if (createRequestTopicResult) { + this.logger.info('Created new topic: %s', requestTopic); + } + } + + this.consumer = this.kafkaClient.consumer({groupId: 'js-executor-group'}); + this.producer = this.kafkaClient.producer({createPartitioner: Partitioners.DefaultPartitioner}); + + const {CRASH} = this.consumer.events; + + this.consumer.on(CRASH, async (e) => { + this.logger.error(`Got consumer CRASH event, should restart: ${e.payload.restart}`); + if (!e.payload.restart) { + this.logger.error('Going to exit due to not retryable error!'); + await this.destroy(); + } + }); + + const messageProcessor = new JsInvokeMessageProcessor(this); + await this.consumer.connect(); + await this.producer.connect(); + this.sendLoopWithLinger(); + await this.consumer.subscribe({topic: requestTopic}); + + await this.consumer.run({ + partitionsConsumedConcurrently: this.partitionsConsumedConcurrently, + eachMessage: async ({topic, partition, message}) => { + let headers = message.headers; + let key = message.key || new Buffer([]); + let msg = { + key: key.toString('utf8'), + data: message.value, + headers: { + data: headers + } + }; + messageProcessor.onJsInvokeMessage(msg); + }, + }); +} async send(responseTopic: string, scriptId: string, rawResponse: Buffer, headers: any): Promise { this.logger.debug('Pending queue response, scriptId: [%s]', scriptId); @@ -235,41 +227,33 @@ export class KafkaTemplate implements IQueue { }, this.linger); } - static async build(): Promise { - const queue = new KafkaTemplate(); - await queue.init(); - return queue; - } - - - async exit(status: number): Promise { - this.logger.info('Exiting with status: %d ...', status); + async destroy(): Promise { + this.logger.info('Stopping Kafka resources...'); if (this.kafkaAdmin) { this.logger.info('Stopping Kafka Admin...'); - await this.kafkaAdmin.disconnect(); + const _kafkaAdmin = this.kafkaAdmin; // @ts-ignore delete this.kafkaAdmin; + await _kafkaAdmin.disconnect(); this.logger.info('Kafka Admin stopped.'); } if (this.consumer) { this.logger.info('Stopping Kafka Consumer...'); try { - await this.consumer.disconnect(); + const _consumer = this.consumer; // @ts-ignore delete this.consumer; + await _consumer.disconnect(); this.logger.info('Kafka Consumer stopped.'); await this.disconnectProducer(); - process.exit(status); } catch (e: any) { this.logger.info('Kafka Consumer stop error.'); await this.disconnectProducer(); - process.exit(status); } - } else { - process.exit(status); } + this.logger.info('Kafka resources stopped.'); } private async disconnectProducer(): Promise { @@ -279,13 +263,15 @@ export class KafkaTemplate implements IQueue { this.logger.info('Stopping loop...'); clearTimeout(this.sendLoopInstance); await this.sendMessagesAsBatch(); - await this.producer.disconnect(); + const _producer = this.producer; // @ts-ignore delete this.producer; + await _producer.disconnect(); this.logger.info('Kafka Producer stopped.'); } catch (e) { this.logger.info('Kafka Producer stop error.'); } } } + } diff --git a/msa/js-executor/queue/pubSubTemplate.ts b/msa/js-executor/queue/pubSubTemplate.ts index f14aaf5771..eff35017ba 100644 --- a/msa/js-executor/queue/pubSubTemplate.ts +++ b/msa/js-executor/queue/pubSubTemplate.ts @@ -34,56 +34,50 @@ export class PubSubTemplate implements IQueue { private topics: string[] = []; private subscriptions: string[] = []; + name = 'Pub/Sub'; + constructor() { } async init() { - try { - this.logger.info('Starting ThingsBoard JavaScript Executor Microservice...'); - this.pubSubClient = new PubSub({ - projectId: this.projectId, - credentials: this.credentials + this.pubSubClient = new PubSub({ + projectId: this.projectId, + credentials: this.credentials + }); + + this.parseQueueProperties(); + + const topicList = await this.pubSubClient.getTopics(); + + if (topicList) { + topicList[0].forEach(topic => { + this.topics.push(PubSubTemplate.getName(topic.name)); }); - - this.parseQueueProperties(); - - const topicList = await this.pubSubClient.getTopics(); - - if (topicList) { - topicList[0].forEach(topic => { - this.topics.push(PubSubTemplate.getName(topic.name)); - }); - } - - const subscriptionList = await this.pubSubClient.getSubscriptions(); - - if (subscriptionList) { - topicList[0].forEach(sub => { - this.subscriptions.push(PubSubTemplate.getName(sub.name)); - }); - } - - if (!(this.subscriptions.includes(this.requestTopic) && this.topics.includes(this.requestTopic))) { - await this.createTopic(this.requestTopic); - await this.createSubscription(this.requestTopic); - } - - const subscription = this.pubSubClient.subscription(this.requestTopic); - - const messageProcessor = new JsInvokeMessageProcessor(this); - - const messageHandler = (message: Message) => { - messageProcessor.onJsInvokeMessage(JSON.parse(message.data.toString('utf8'))); - message.ack(); - }; - - subscription.on('message', messageHandler); - - } catch (e: any) { - this.logger.error('Failed to start ThingsBoard JavaScript Executor Microservice: %s', e.message); - this.logger.error(e.stack); - await this.exit(-1); } + + const subscriptionList = await this.pubSubClient.getSubscriptions(); + + if (subscriptionList) { + topicList[0].forEach(sub => { + this.subscriptions.push(PubSubTemplate.getName(sub.name)); + }); + } + + if (!(this.subscriptions.includes(this.requestTopic) && this.topics.includes(this.requestTopic))) { + await this.createTopic(this.requestTopic); + await this.createSubscription(this.requestTopic); + } + + const subscription = this.pubSubClient.subscription(this.requestTopic); + + const messageProcessor = new JsInvokeMessageProcessor(this); + + const messageHandler = (message: Message) => { + messageProcessor.onJsInvokeMessage(JSON.parse(message.data.toString('utf8'))); + message.ack(); + }; + + subscription.on('message', messageHandler); } async send(responseTopic: string, scriptId: string, rawResponse: Buffer, headers: any): Promise { @@ -147,29 +141,21 @@ export class PubSubTemplate implements IQueue { } } - static async build(): Promise { - const queue = new PubSubTemplate(); - await queue.init(); - return queue; - } - - async exit(status: number): Promise { - this.logger.info('Exiting with status: %d ...', status); + async destroy(): Promise { + this.logger.info('Stopping Pub/Sub resources...'); if (this.pubSubClient) { - this.logger.info('Stopping Pub/Sub client.') + this.logger.info('Stopping Pub/Sub client...'); try { - await this.pubSubClient.close(); + const _pubSubClient = this.pubSubClient; // @ts-ignore delete this.pubSubClient; - this.logger.info('Pub/Sub client stopped.') - process.exit(status); + await _pubSubClient.close(); + this.logger.info('Pub/Sub client stopped.'); } catch (e) { this.logger.info('Pub/Sub client stop error.'); - process.exit(status); } - } else { - process.exit(status); } + this.logger.info('Pub/Sub resources stopped.'); } } diff --git a/msa/js-executor/queue/queue.models.ts b/msa/js-executor/queue/queue.models.ts index 18ce1f06b6..a86dc8fd1d 100644 --- a/msa/js-executor/queue/queue.models.ts +++ b/msa/js-executor/queue/queue.models.ts @@ -15,7 +15,8 @@ /// export interface IQueue { + name: string; init(): Promise; send(responseTopic: string, scriptId: string, rawResponse: Buffer, headers: any): Promise; - exit(status: number): Promise; + destroy(): Promise; } diff --git a/msa/js-executor/queue/rabbitmqTemplate.ts b/msa/js-executor/queue/rabbitmqTemplate.ts index d18ba80be7..9369f170f5 100644 --- a/msa/js-executor/queue/rabbitmqTemplate.ts +++ b/msa/js-executor/queue/rabbitmqTemplate.ts @@ -44,41 +44,35 @@ export class RabbitMqTemplate implements IQueue { private stopped = false; private topics: string[] = []; + name = 'RabbitMQ'; + constructor() { } async init(): Promise { - try { - this.logger.info('Starting ThingsBoard JavaScript Executor Microservice...'); + const url = `amqp://${this.username}:${this.password}@${this.host}:${this.port}${this.vhost}`; + this.connection = await amqp.connect(url); + this.channel = await this.connection.createConfirmChannel(); - const url = `amqp://${this.username}:${this.password}@${this.host}:${this.port}${this.vhost}`; - this.connection = await amqp.connect(url); - this.channel = await this.connection.createConfirmChannel(); + this.parseQueueProperties(); - this.parseQueueProperties(); + await this.createQueue(this.requestTopic); - await this.createQueue(this.requestTopic); + const messageProcessor = new JsInvokeMessageProcessor(this); - const messageProcessor = new JsInvokeMessageProcessor(this); + while (!this.stopped) { + let pollStartTs = new Date().getTime(); + let message = await this.channel.get(this.requestTopic); - while (!this.stopped) { - let pollStartTs = new Date().getTime(); - let message = await this.channel.get(this.requestTopic); - - if (message) { - messageProcessor.onJsInvokeMessage(JSON.parse(message.content.toString('utf8'))); - this.channel.ack(message); - } else { - let pollDuration = new Date().getTime() - pollStartTs; - if (pollDuration < this.pollInterval) { - await sleep(this.pollInterval - pollDuration); - } + if (message) { + messageProcessor.onJsInvokeMessage(JSON.parse(message.content.toString('utf8'))); + this.channel.ack(message); + } else { + let pollDuration = new Date().getTime() - pollStartTs; + if (pollDuration < this.pollInterval) { + await sleep(this.pollInterval - pollDuration); } } - } catch (e: any) { - this.logger.error('Failed to start ThingsBoard JavaScript Executor Microservice: %s', e.message); - this.logger.error(e.stack); - await this.exit(-1); } } @@ -114,38 +108,31 @@ export class RabbitMqTemplate implements IQueue { return this.channel.assertQueue(topic, this.queueOptions); } - static async build(): Promise { - const queue = new RabbitMqTemplate(); - await queue.init(); - return queue; - } - - async exit(status: number) { - this.logger.info('Exiting with status: %d ...', status); + async destroy() { + this.logger.info('Stopping RabbitMQ resources...'); if (this.channel) { - this.logger.info('Stopping RabbitMq chanel.') - await this.channel.close(); + this.logger.info('Stopping RabbitMQ chanel...'); + const _channel = this.channel; // @ts-ignore delete this.channel; - this.logger.info('RabbitMq chanel stopped'); + await _channel.close(); + this.logger.info('RabbitMQ chanel stopped'); } if (this.connection) { - this.logger.info('Stopping RabbitMq connection.') + this.logger.info('Stopping RabbitMQ connection...') try { - await this.connection.close(); + const _connection = this.connection; // @ts-ignore delete this.connection; - this.logger.info('RabbitMq client connection.') - process.exit(status); + await _connection.close(); + this.logger.info('RabbitMQ client connection.'); } catch (e) { - this.logger.info('RabbitMq connection stop error.'); - process.exit(status); + this.logger.info('RabbitMQ connection stop error.'); } - } else { - process.exit(status); } + this.logger.info('RabbitMQ resources stopped.') } } diff --git a/msa/js-executor/queue/serviceBusTemplate.ts b/msa/js-executor/queue/serviceBusTemplate.ts index 3cfaf3e10c..76d87e8068 100644 --- a/msa/js-executor/queue/serviceBusTemplate.ts +++ b/msa/js-executor/queue/serviceBusTemplate.ts @@ -44,48 +44,42 @@ export class ServiceBusTemplate implements IQueue { private receiver: ServiceBusReceiver; private senderMap = new Map(); + name = 'Azure Service Bus'; + constructor() { } async init() { - try { - this.logger.info('Starting ThingsBoard JavaScript Executor Microservice...'); + const connectionString = `Endpoint=sb://${this.namespaceName}.servicebus.windows.net/;SharedAccessKeyName=${this.sasKeyName};SharedAccessKey=${this.sasKey}`; + this.sbClient = new ServiceBusClient(connectionString) + this.serviceBusService = new ServiceBusAdministrationClient(connectionString); - const connectionString = `Endpoint=sb://${this.namespaceName}.servicebus.windows.net/;SharedAccessKeyName=${this.sasKeyName};SharedAccessKey=${this.sasKey}`; - this.sbClient = new ServiceBusClient(connectionString) - this.serviceBusService = new ServiceBusAdministrationClient(connectionString); + this.parseQueueProperties(); - this.parseQueueProperties(); - - const listQueues = await this.serviceBusService.listQueues(); - for await (const queue of listQueues) { - this.queues.push(queue.name); - } - - if (!this.queues.includes(this.requestTopic)) { - await this.createQueueIfNotExist(this.requestTopic); - this.queues.push(this.requestTopic); - } - - this.receiver = this.sbClient.createReceiver(this.requestTopic, {receiveMode: 'peekLock'}); - - const messageProcessor = new JsInvokeMessageProcessor(this); - - const messageHandler = async (message: ServiceBusReceivedMessage) => { - if (message) { - messageProcessor.onJsInvokeMessage(message.body); - await this.receiver.completeMessage(message); - } - }; - const errorHandler = async (error: ProcessErrorArgs) => { - this.logger.error('Failed to receive message from queue.', error); - }; - this.receiver.subscribe({processMessage: messageHandler, processError: errorHandler}) - } catch (e: any) { - this.logger.error('Failed to start ThingsBoard JavaScript Executor Microservice: %s', e.message); - this.logger.error(e.stack); - await this.exit(-1); + const listQueues = await this.serviceBusService.listQueues(); + for await (const queue of listQueues) { + this.queues.push(queue.name); } + + if (!this.queues.includes(this.requestTopic)) { + await this.createQueueIfNotExist(this.requestTopic); + this.queues.push(this.requestTopic); + } + + this.receiver = this.sbClient.createReceiver(this.requestTopic, {receiveMode: 'peekLock'}); + + const messageProcessor = new JsInvokeMessageProcessor(this); + + const messageHandler = async (message: ServiceBusReceivedMessage) => { + if (message) { + messageProcessor.onJsInvokeMessage(message.body); + await this.receiver.completeMessage(message); + } + }; + const errorHandler = async (error: ProcessErrorArgs) => { + this.logger.error('Failed to receive message from queue.', error); + }; + this.receiver.subscribe({processMessage: messageHandler, processError: errorHandler}) } async send(responseTopic: string, scriptId: string, rawResponse: Buffer, headers: any): Promise { @@ -135,41 +129,46 @@ export class ServiceBusTemplate implements IQueue { } } - static async build(): Promise { - const queue = new ServiceBusTemplate(); - await queue.init(); - return queue; - } - - async exit(status: number) { - this.logger.info('Exiting with status: %d ...', status); + async destroy() { this.logger.info('Stopping Azure Service Bus resources...') if (this.receiver) { + this.logger.info('Stopping Service Bus Receiver...'); try { - await this.receiver.close(); + const _receiver = this.receiver; // @ts-ignore delete this.receiver; + await _receiver.close(); + this.logger.info('Service Bus Receiver stopped.'); } catch (e) { + this.logger.info('Service Bus Receiver stop error.'); } } - this.senderMap.forEach(k => { - try { - k.close(); - } catch (e) { - } + this.logger.info('Stopping Service Bus Senders...'); + const senders: Promise[] = []; + this.senderMap.forEach((sender) => { + senders.push(sender.close()); }); this.senderMap.clear(); + try { + await Promise.all(senders); + this.logger.info('Service Bus Senders stopped.'); + } catch (e) { + this.logger.info('Service Bus Senders stop error.'); + } if (this.sbClient) { + this.logger.info('Stopping Service Bus Client...'); try { - await this.sbClient.close(); + const _sbClient = this.sbClient; // @ts-ignore delete this.sbClient; + await _sbClient.close(); + this.logger.info('Service Bus Client stopped.'); } catch (e) { + this.logger.info('Service Bus Client stop error.'); } } this.logger.info('Azure Service Bus resources stopped.') - process.exit(status); } } diff --git a/msa/js-executor/server.ts b/msa/js-executor/server.ts index 3d59d7fe00..446f9d34aa 100644 --- a/msa/js-executor/server.ts +++ b/msa/js-executor/server.ts @@ -30,57 +30,66 @@ logger.info('===CONFIG BEGIN==='); logger.info(JSON.stringify(config, null, 4)); logger.info('===CONFIG END==='); -const serviceType = config.get('queue_type'); +const serviceType: string = config.get('queue_type'); const httpPort = Number(config.get('http_port')); -let queues: IQueue; -let httpServer: HttpServer; +let queues: IQueue | null; +let httpServer: HttpServer | null; (async () => { - switch (serviceType) { - case 'kafka': - logger.info('Starting kafka template.'); - queues = await KafkaTemplate.build(); - logger.info('kafka template started.'); - break; - case 'pubsub': - logger.info('Starting Pub/Sub template.') - queues = await PubSubTemplate.build(); - logger.info('Pub/Sub template started.') - break; - case 'aws-sqs': - logger.info('Starting Aws Sqs template.') - queues = await AwsSqsTemplate.build(); - logger.info('Aws Sqs template started.') - break; - case 'rabbitmq': - logger.info('Starting RabbitMq template.') - queues = await RabbitMqTemplate.build(); - logger.info('RabbitMq template started.') - break; - case 'service-bus': - logger.info('Starting Azure Service Bus template.') - queues = await ServiceBusTemplate.build(); - logger.info('Azure Service Bus template started.') - break; - default: - logger.error('Unknown service type: ', serviceType); - process.exit(-1); + logger.info('Starting ThingsBoard JavaScript Executor Microservice...'); + try { + queues = await createQueue(serviceType); + logger.info(`Starting ${queues.name} template...`); + await queues.init(); + logger.info(`${queues.name} template started.`); + httpServer = new HttpServer(httpPort); + } catch (e: any) { + logger.error('Failed to start ThingsBoard JavaScript Executor Microservice: %s', e.message); + logger.error(e.stack); + await exit(-1); } - httpServer = new HttpServer(httpPort); })(); -process.on('SIGTERM', () => { - logger.info('SIGTERM signal received'); - process.exit(0); +async function createQueue(serviceType: string): Promise { + switch (serviceType) { + case 'kafka': + return new KafkaTemplate(); + case 'pubsub': + return new PubSubTemplate(); + case 'aws-sqs': + return new AwsSqsTemplate(); + case 'rabbitmq': + return new RabbitMqTemplate(); + case 'service-bus': + return new ServiceBusTemplate(); + default: + throw new Error('Unknown service type: ' + serviceType); + } +} + +[`SIGINT`, `SIGUSR1`, `SIGUSR2`, `uncaughtException`, `SIGTERM`].forEach((eventType) => { + process.on(eventType, async () => { + logger.info(`${eventType} signal received`); + await exit(0); + }) +}) + +process.on('exit', (code: number) => { + logger.info(`ThingsBoard JavaScript Executor Microservice has been stopped. Exit code: ${code}.`); }); -process.on('exit', async () => { +async function exit(status: number) { + logger.info('Exiting with status: %d ...', status); if (httpServer) { - httpServer.stop(); + const _httpServer = httpServer; + httpServer = null; + await _httpServer.stop(); } if (queues) { - queues.exit(0); + const _queues = queues; + queues = null; + await _queues.destroy(); } - logger.info('JavaScript Executor Microservice has been stopped.'); -}); + process.exit(status); +} diff --git a/msa/js-executor/yarn.lock b/msa/js-executor/yarn.lock index ed6e7406f9..eb6443e53d 100644 --- a/msa/js-executor/yarn.lock +++ b/msa/js-executor/yarn.lock @@ -2670,10 +2670,10 @@ jws@^4.0.0: jwa "^2.0.0" safe-buffer "^5.0.1" -kafkajs@^2.0.2: - version "2.0.2" - resolved "https://registry.yarnpkg.com/kafkajs/-/kafkajs-2.0.2.tgz#cdfc8f57aa4fd69f6d9ca1cce4ee89bbc2a3a1f9" - integrity sha512-g6CM3fAenofOjR1bfOAqeZUEaSGhNtBscNokybSdW1rmIKYNwBPC9xQzwulFJm36u/xcxXUiCl/L/qfslapihA== +kafkajs@^2.1.0: + version "2.1.0" + resolved "https://registry.yarnpkg.com/kafkajs/-/kafkajs-2.1.0.tgz#32ede4e8080cc75586c5e4406eeb582fa73f7b1e" + integrity sha512-6IYiOdGWvFPbSbVB+AV3feT+A7vzw5sXm7Ze4QTfP7FRNdY8pGcpiNPvD2lfgYFD8Dm9KbMgBgTt2mf8KaIkzw== keyv@^3.0.0: version "3.1.0"