diff --git a/msa/js-executor/api/httpServer.ts b/msa/js-executor/api/httpServer.ts index f3671a1b89..e1c294fdff 100644 --- a/msa/js-executor/api/httpServer.ts +++ b/msa/js-executor/api/httpServer.ts @@ -16,12 +16,15 @@ import express from 'express'; import { _logger} from '../config/logger'; +import http from 'http'; +import { Socket } from 'net'; export class HttpServer { private logger = _logger('httpServer'); private app = express(); - private server; + private server: http.Server | null; + private connections: Socket[] = []; constructor(httpPort: number) { this.app.get('/livenessProbe', async (req, res) => { @@ -32,15 +35,31 @@ export class HttpServer { }) this.server = this.app.listen(httpPort, () => { - this.logger.info('Started http endpoint on port %s. Please, use /livenessProbe !', httpPort); + this.logger.info('Started HTTP endpoint on port %s. Please, use /livenessProbe !', httpPort); }).on('error', (error) => { this.logger.error(error); }); - } - stop() { - this.server.close(() => { - this.logger.info('Http server stop'); + this.server.on('connection', connection => { + this.connections.push(connection); + connection.on('close', () => this.connections = this.connections.filter(curr => curr !== connection)); }); } + + async stop() { + if (this.server) { + this.logger.info('Stopping HTTP Server...'); + const _server = this.server; + this.server = null; + this.connections.forEach(curr => curr.end(() => curr.destroy())); + await new Promise( + (resolve, reject) => { + _server.close((err) => { + this.logger.info('HTTP Server stopped.'); + resolve(); + }); + } + ); + } + } } diff --git a/msa/js-executor/queue/awsSqsTemplate.ts b/msa/js-executor/queue/awsSqsTemplate.ts index 7bbf2b28fc..28d421269b 100644 --- a/msa/js-executor/queue/awsSqsTemplate.ts +++ b/msa/js-executor/queue/awsSqsTemplate.ts @@ -59,8 +59,6 @@ export class AwsSqsTemplate implements IQueue { async init() { try { - this.logger.info('Starting ThingsBoard JavaScript Executor Microservice...'); - this.sqsClient = new SQSClient({ apiVersion: '2012-11-05', credentials: { @@ -129,7 +127,7 @@ export class AwsSqsTemplate implements IQueue { } catch (e: any) { this.logger.error('Failed to start ThingsBoard JavaScript Executor Microservice: %s', e.message); this.logger.error(e.stack); - await this.exit(-1); + await this.destroy(-1); } } @@ -193,23 +191,23 @@ export class AwsSqsTemplate implements IQueue { return queue; } - async exit(status: number) { + async destroy(status: number): Promise { this.stopped = true; this.logger.info('Exiting with status: %d ...', status); + this.logger.info('Stopping AWS SQS resources...'); if (this.sqsClient) { - this.logger.info('Stopping Aws Sqs client.') + this.logger.info('Stopping AWS SQS client...'); try { - this.sqsClient.destroy(); + const _sqsClient = this.sqsClient; // @ts-ignore delete this.sqsClient; - this.logger.info('Aws Sqs client stopped.') - process.exit(status); + _sqsClient.destroy(); + this.logger.info('AWS SQS client stopped.'); } catch (e: any) { - this.logger.info('Aws Sqs client stop error.'); - process.exit(status); + this.logger.info('AWS SQS client stop error.'); } - } else { - process.exit(status); } + this.logger.info('AWS SQS resources stopped.') + process.exit(status); } } diff --git a/msa/js-executor/queue/kafkaTemplate.ts b/msa/js-executor/queue/kafkaTemplate.ts index 8c7e7736c8..2b3e947b5e 100644 --- a/msa/js-executor/queue/kafkaTemplate.ts +++ b/msa/js-executor/queue/kafkaTemplate.ts @@ -56,8 +56,6 @@ export class KafkaTemplate implements IQueue { async init(): Promise { try { - this.logger.info('Starting ThingsBoard JavaScript Executor Microservice...'); - const kafkaBootstrapServers: string = config.get('kafka.bootstrap.servers'); const requestTopic: string = config.get('request_topic'); const useConfluent = config.get('kafka.use_confluent_cloud'); @@ -119,11 +117,11 @@ export class KafkaTemplate implements IQueue { const {CRASH} = this.consumer.events; - this.consumer.on(CRASH, e => { + this.consumer.on(CRASH, async (e) => { this.logger.error(`Got consumer CRASH event, should restart: ${e.payload.restart}`); if (!e.payload.restart) { this.logger.error('Going to exit due to not retryable error!'); - this.exit(-1); + await this.destroy(-1); } }); @@ -133,7 +131,6 @@ export class KafkaTemplate implements IQueue { this.sendLoopWithLinger(); await this.consumer.subscribe({topic: requestTopic}); - this.logger.info('Started ThingsBoard JavaScript Executor Microservice.'); await this.consumer.run({ partitionsConsumedConcurrently: this.partitionsConsumedConcurrently, eachMessage: async ({topic, partition, message}) => { @@ -153,7 +150,7 @@ export class KafkaTemplate implements IQueue { } catch (e: any) { this.logger.error('Failed to start ThingsBoard JavaScript Executor Microservice: %s', e.message); this.logger.error(e.stack); - await this.exit(-1); + await this.destroy(-1); } } @@ -242,34 +239,35 @@ export class KafkaTemplate implements IQueue { } - async exit(status: number): Promise { + async destroy(status: number): Promise { this.logger.info('Exiting with status: %d ...', status); + this.logger.info('Stopping Kafka resources...'); if (this.kafkaAdmin) { this.logger.info('Stopping Kafka Admin...'); - await this.kafkaAdmin.disconnect(); + const _kafkaAdmin = this.kafkaAdmin; // @ts-ignore delete this.kafkaAdmin; + await _kafkaAdmin.disconnect(); this.logger.info('Kafka Admin stopped.'); } if (this.consumer) { this.logger.info('Stopping Kafka Consumer...'); try { - await this.consumer.disconnect(); + const _consumer = this.consumer; // @ts-ignore delete this.consumer; + await _consumer.disconnect(); this.logger.info('Kafka Consumer stopped.'); await this.disconnectProducer(); - process.exit(status); } catch (e: any) { this.logger.info('Kafka Consumer stop error.'); await this.disconnectProducer(); - process.exit(status); } - } else { - process.exit(status); } + this.logger.info('Kafka resources stopped.'); + process.exit(status); } private async disconnectProducer(): Promise { @@ -279,9 +277,10 @@ export class KafkaTemplate implements IQueue { this.logger.info('Stopping loop...'); clearTimeout(this.sendLoopInstance); await this.sendMessagesAsBatch(); - await this.producer.disconnect(); + const _producer = this.producer; // @ts-ignore delete this.producer; + await _producer.disconnect(); this.logger.info('Kafka Producer stopped.'); } catch (e) { this.logger.info('Kafka Producer stop error.'); diff --git a/msa/js-executor/queue/pubSubTemplate.ts b/msa/js-executor/queue/pubSubTemplate.ts index f14aaf5771..4e8990a105 100644 --- a/msa/js-executor/queue/pubSubTemplate.ts +++ b/msa/js-executor/queue/pubSubTemplate.ts @@ -39,7 +39,6 @@ export class PubSubTemplate implements IQueue { async init() { try { - this.logger.info('Starting ThingsBoard JavaScript Executor Microservice...'); this.pubSubClient = new PubSub({ projectId: this.projectId, credentials: this.credentials @@ -82,7 +81,7 @@ export class PubSubTemplate implements IQueue { } catch (e: any) { this.logger.error('Failed to start ThingsBoard JavaScript Executor Microservice: %s', e.message); this.logger.error(e.stack); - await this.exit(-1); + await this.destroy(-1); } } @@ -153,23 +152,23 @@ export class PubSubTemplate implements IQueue { return queue; } - async exit(status: number): Promise { + async destroy(status: number): Promise { this.logger.info('Exiting with status: %d ...', status); + this.logger.info('Stopping Pub/Sub resources...'); if (this.pubSubClient) { - this.logger.info('Stopping Pub/Sub client.') + this.logger.info('Stopping Pub/Sub client...'); try { - await this.pubSubClient.close(); + const _pubSubClient = this.pubSubClient; // @ts-ignore delete this.pubSubClient; - this.logger.info('Pub/Sub client stopped.') - process.exit(status); + await _pubSubClient.close(); + this.logger.info('Pub/Sub client stopped.'); } catch (e) { this.logger.info('Pub/Sub client stop error.'); - process.exit(status); } - } else { - process.exit(status); } + this.logger.info('Pub/Sub resources stopped.'); + process.exit(status); } } diff --git a/msa/js-executor/queue/queue.models.ts b/msa/js-executor/queue/queue.models.ts index 18ce1f06b6..59ec68896d 100644 --- a/msa/js-executor/queue/queue.models.ts +++ b/msa/js-executor/queue/queue.models.ts @@ -17,5 +17,5 @@ export interface IQueue { init(): Promise; send(responseTopic: string, scriptId: string, rawResponse: Buffer, headers: any): Promise; - exit(status: number): Promise; + destroy(status: number): Promise; } diff --git a/msa/js-executor/queue/rabbitmqTemplate.ts b/msa/js-executor/queue/rabbitmqTemplate.ts index d18ba80be7..372024a4f3 100644 --- a/msa/js-executor/queue/rabbitmqTemplate.ts +++ b/msa/js-executor/queue/rabbitmqTemplate.ts @@ -49,8 +49,6 @@ export class RabbitMqTemplate implements IQueue { async init(): Promise { try { - this.logger.info('Starting ThingsBoard JavaScript Executor Microservice...'); - const url = `amqp://${this.username}:${this.password}@${this.host}:${this.port}${this.vhost}`; this.connection = await amqp.connect(url); this.channel = await this.connection.createConfirmChannel(); @@ -78,7 +76,7 @@ export class RabbitMqTemplate implements IQueue { } catch (e: any) { this.logger.error('Failed to start ThingsBoard JavaScript Executor Microservice: %s', e.message); this.logger.error(e.stack); - await this.exit(-1); + await this.destroy(-1); } } @@ -120,32 +118,33 @@ export class RabbitMqTemplate implements IQueue { return queue; } - async exit(status: number) { + async destroy(status: number) { this.logger.info('Exiting with status: %d ...', status); + this.logger.info('Stopping RabbitMQ resources...'); if (this.channel) { - this.logger.info('Stopping RabbitMq chanel.') - await this.channel.close(); + this.logger.info('Stopping RabbitMQ chanel...'); + const _channel = this.channel; // @ts-ignore delete this.channel; - this.logger.info('RabbitMq chanel stopped'); + await _channel.close(); + this.logger.info('RabbitMQ chanel stopped'); } if (this.connection) { - this.logger.info('Stopping RabbitMq connection.') + this.logger.info('Stopping RabbitMQ connection...') try { - await this.connection.close(); + const _connection = this.connection; // @ts-ignore delete this.connection; - this.logger.info('RabbitMq client connection.') - process.exit(status); + await _connection.close(); + this.logger.info('RabbitMQ client connection.'); } catch (e) { - this.logger.info('RabbitMq connection stop error.'); - process.exit(status); + this.logger.info('RabbitMQ connection stop error.'); } - } else { - process.exit(status); } + this.logger.info('RabbitMQ resources stopped.') + process.exit(status); } } diff --git a/msa/js-executor/queue/serviceBusTemplate.ts b/msa/js-executor/queue/serviceBusTemplate.ts index 3cfaf3e10c..b2750672e5 100644 --- a/msa/js-executor/queue/serviceBusTemplate.ts +++ b/msa/js-executor/queue/serviceBusTemplate.ts @@ -49,8 +49,6 @@ export class ServiceBusTemplate implements IQueue { async init() { try { - this.logger.info('Starting ThingsBoard JavaScript Executor Microservice...'); - const connectionString = `Endpoint=sb://${this.namespaceName}.servicebus.windows.net/;SharedAccessKeyName=${this.sasKeyName};SharedAccessKey=${this.sasKey}`; this.sbClient = new ServiceBusClient(connectionString) this.serviceBusService = new ServiceBusAdministrationClient(connectionString); @@ -84,7 +82,7 @@ export class ServiceBusTemplate implements IQueue { } catch (e: any) { this.logger.error('Failed to start ThingsBoard JavaScript Executor Microservice: %s', e.message); this.logger.error(e.stack); - await this.exit(-1); + await this.destroy(-1); } } @@ -141,32 +139,45 @@ export class ServiceBusTemplate implements IQueue { return queue; } - async exit(status: number) { + async destroy(status: number) { this.logger.info('Exiting with status: %d ...', status); this.logger.info('Stopping Azure Service Bus resources...') if (this.receiver) { + this.logger.info('Stopping Service Bus Receiver...'); try { - await this.receiver.close(); + const _receiver = this.receiver; // @ts-ignore delete this.receiver; + await _receiver.close(); + this.logger.info('Service Bus Receiver stopped.'); } catch (e) { + this.logger.info('Service Bus Receiver stop error.'); } } - this.senderMap.forEach(k => { - try { - k.close(); - } catch (e) { - } + this.logger.info('Stopping Service Bus Senders...'); + const senders: Promise[] = []; + this.senderMap.forEach((sender) => { + senders.push(sender.close()); }); this.senderMap.clear(); + try { + await Promise.all(senders); + this.logger.info('Service Bus Senders stopped.'); + } catch (e) { + this.logger.info('Service Bus Senders stop error.'); + } if (this.sbClient) { + this.logger.info('Stopping Service Bus Client...'); try { - await this.sbClient.close(); + const _sbClient = this.sbClient; // @ts-ignore delete this.sbClient; + await _sbClient.close(); + this.logger.info('Service Bus Client stopped.'); } catch (e) { + this.logger.info('Service Bus Client stop error.'); } } this.logger.info('Azure Service Bus resources stopped.') diff --git a/msa/js-executor/server.ts b/msa/js-executor/server.ts index 3d59d7fe00..708a87fec9 100644 --- a/msa/js-executor/server.ts +++ b/msa/js-executor/server.ts @@ -32,33 +32,34 @@ logger.info('===CONFIG END==='); const serviceType = config.get('queue_type'); const httpPort = Number(config.get('http_port')); -let queues: IQueue; -let httpServer: HttpServer; +let queues: IQueue | null; +let httpServer: HttpServer | null; (async () => { + logger.info('Starting ThingsBoard JavaScript Executor Microservice...'); switch (serviceType) { case 'kafka': - logger.info('Starting kafka template.'); + logger.info('Starting Kafka template...'); queues = await KafkaTemplate.build(); - logger.info('kafka template started.'); + logger.info('Kafka template started.'); break; case 'pubsub': - logger.info('Starting Pub/Sub template.') + logger.info('Starting Pub/Sub template...') queues = await PubSubTemplate.build(); logger.info('Pub/Sub template started.') break; case 'aws-sqs': - logger.info('Starting Aws Sqs template.') + logger.info('Starting AWS SQS template...') queues = await AwsSqsTemplate.build(); - logger.info('Aws Sqs template started.') + logger.info('AWS SQS template started.') break; case 'rabbitmq': - logger.info('Starting RabbitMq template.') + logger.info('Starting RabbitMQ template...') queues = await RabbitMqTemplate.build(); - logger.info('RabbitMq template started.') + logger.info('RabbitMQ template started.') break; case 'service-bus': - logger.info('Starting Azure Service Bus template.') + logger.info('Starting Azure Service Bus template...') queues = await ServiceBusTemplate.build(); logger.info('Azure Service Bus template started.') break; @@ -70,17 +71,22 @@ let httpServer: HttpServer; httpServer = new HttpServer(httpPort); })(); -process.on('SIGTERM', () => { - logger.info('SIGTERM signal received'); - process.exit(0); -}); +[`SIGINT`, `SIGUSR1`, `SIGUSR2`, `uncaughtException`, `SIGTERM`].forEach((eventType) => { + process.on(eventType, async () => { + logger.info(`${eventType} signal received`); + if (httpServer) { + const _httpServer = httpServer; + httpServer = null; + await _httpServer.stop(); + } + if (queues) { + const _queues = queues; + queues = null; + await _queues.destroy(0); + } + }) +}) -process.on('exit', async () => { - if (httpServer) { - httpServer.stop(); - } - if (queues) { - queues.exit(0); - } - logger.info('JavaScript Executor Microservice has been stopped.'); +process.on('exit', (code: number) => { + logger.info(`JavaScript Executor Microservice has been stopped. Exit code: ${code}.`); });