diff --git a/msa/js-executor/package.json b/msa/js-executor/package.json
index 1d1059aa90..6bc7668e15 100644
--- a/msa/js-executor/package.json
+++ b/msa/js-executor/package.json
@@ -20,7 +20,7 @@
     "config": "^3.3.7",
     "express": "^4.18.1",
     "js-yaml": "^4.1.0",
-    "kafkajs": "^2.0.2",
+    "kafkajs": "^2.1.0",
     "long": "^5.2.0",
     "uuid-parse": "^1.1.0",
     "uuid-random": "^1.3.2",
diff --git a/msa/js-executor/queue/awsSqsTemplate.ts b/msa/js-executor/queue/awsSqsTemplate.ts
index 28d421269b..36ec9b5270 100644
--- a/msa/js-executor/queue/awsSqsTemplate.ts
+++ b/msa/js-executor/queue/awsSqsTemplate.ts
@@ -54,80 +54,76 @@ export class AwsSqsTemplate implements IQueue {
         FifoQueue: 'true'
     };
 
+    name = 'AWS SQS';
+
     constructor() {
     }
 
     async init() {
-        try {
-            this.sqsClient = new SQSClient({
-                apiVersion: '2012-11-05',
-                credentials: {
-                    accessKeyId: this.accessKeyId,
-                    secretAccessKey: this.secretAccessKey
-                },
-                region: this.region
+        this.sqsClient = new SQSClient({
+            apiVersion: '2012-11-05',
+            credentials: {
+                accessKeyId: this.accessKeyId,
+                secretAccessKey: this.secretAccessKey
+            },
+            region: this.region
+        });
+
+        const queues = await this.getQueues();
+
+        if (queues.QueueUrls) {
+            queues.QueueUrls.forEach(queueUrl => {
+                const delimiterPosition = queueUrl.lastIndexOf('/');
+                const queueName = queueUrl.substring(delimiterPosition + 1);
+                this.queueUrls.set(queueName, queueUrl);
             });
+        }
 
-            const queues = await this.getQueues();
+        this.parseQueueProperties();
 
-            if (queues.QueueUrls) {
-                queues.QueueUrls.forEach(queueUrl => {
-                    const delimiterPosition = queueUrl.lastIndexOf('/');
-                    const queueName = queueUrl.substring(delimiterPosition + 1);
-                    this.queueUrls.set(queueName, queueUrl);
-                });
-            }
+        this.requestQueueURL = this.queueUrls.get(AwsSqsTemplate.topicToSqsQueueName(this.requestTopic)) || '';
+        if (!this.requestQueueURL) {
+            this.requestQueueURL = await this.createQueue(this.requestTopic);
+        }
 
-            this.parseQueueProperties();
+        const messageProcessor = new JsInvokeMessageProcessor(this);
 
-            this.requestQueueURL = this.queueUrls.get(AwsSqsTemplate.topicToSqsQueueName(this.requestTopic)) || '';
-            if (!this.requestQueueURL) {
-                this.requestQueueURL = await this.createQueue(this.requestTopic);
-            }
+        const params: ReceiveMessageRequest = {
+            MaxNumberOfMessages: 10,
+            QueueUrl: this.requestQueueURL,
+            WaitTimeSeconds: this.pollInterval / 1000
+        };
+        while (!this.stopped) {
+            let pollStartTs = new Date().getTime();
+            const messagesResponse: ReceiveMessageResult = await this.sqsClient.send(new ReceiveMessageCommand(params));
+            const messages = messagesResponse.Messages;
 
-            const messageProcessor = new JsInvokeMessageProcessor(this);
+            if (messages && messages.length > 0) {
+                const entries: DeleteMessageBatchRequestEntry[] = [];
 
-            const params: ReceiveMessageRequest = {
-                MaxNumberOfMessages: 10,
-                QueueUrl: this.requestQueueURL,
-                WaitTimeSeconds: this.pollInterval / 1000
-            };
-            while (!this.stopped) {
-                let pollStartTs = new Date().getTime();
-                const messagesResponse: ReceiveMessageResult = await this.sqsClient.send(new ReceiveMessageCommand(params));
-                const messages = messagesResponse.Messages;
-
-                if (messages && messages.length > 0) {
-                    const entries: DeleteMessageBatchRequestEntry[] = [];
-
-                    messages.forEach(message => {
-                        entries.push({
-                            Id: message.MessageId,
-                            ReceiptHandle: message.ReceiptHandle
-                        });
-                        messageProcessor.onJsInvokeMessage(JSON.parse(message.Body || ''));
+                messages.forEach(message => {
+                    entries.push({
+                        Id: message.MessageId,
+                        ReceiptHandle: message.ReceiptHandle
                     });
+                    messageProcessor.onJsInvokeMessage(JSON.parse(message.Body || ''));
+                });
 
-                    const deleteBatch: DeleteMessageBatchRequest = {
-                        QueueUrl: this.requestQueueURL,
-                        Entries: entries
-                    };
-                    try {
-                        await this.sqsClient.send(new DeleteMessageBatchCommand(deleteBatch))
-                    } catch (err: any) {
-                        this.logger.error("Failed to delete messages from queue.", err.message);
-                    }
-                } else {
-                    let pollDuration = new Date().getTime() - pollStartTs;
-                    if (pollDuration < this.pollInterval) {
-                        await sleep(this.pollInterval - pollDuration);
-                    }
+                const deleteBatch: DeleteMessageBatchRequest = {
+                    QueueUrl: this.requestQueueURL,
+                    Entries: entries
+                };
+                try {
+                    await this.sqsClient.send(new DeleteMessageBatchCommand(deleteBatch))
+                } catch (err: any) {
+                    this.logger.error("Failed to delete messages from queue.", err.message);
+                }
+            } else {
+                let pollDuration = new Date().getTime() - pollStartTs;
+                if (pollDuration < this.pollInterval) {
+                    await sleep(this.pollInterval - pollDuration);
                 }
             }
-        } catch (e: any) {
-            this.logger.error('Failed to start ThingsBoard JavaScript Executor Microservice: %s', e.message);
-            this.logger.error(e.stack);
-            await this.destroy(-1);
         }
     }
 
@@ -185,15 +181,8 @@ export class AwsSqsTemplate implements IQueue {
         return result.QueueUrl || '';
     }
 
-    static async build(): Promise<AwsSqsTemplate> {
-        const queue = new AwsSqsTemplate();
-        await queue.init();
-        return queue;
-    }
-
-    async destroy(status: number): Promise<void> {
+    async destroy(): Promise<void> {
         this.stopped = true;
-        this.logger.info('Exiting with status: %d ...', status);
         this.logger.info('Stopping AWS SQS resources...');
         if (this.sqsClient) {
             this.logger.info('Stopping AWS SQS client...');
@@ -208,6 +197,5 @@ export class AwsSqsTemplate implements IQueue {
             }
         }
         this.logger.info('AWS SQS resources stopped.')
-        process.exit(status);
     }
 }
diff --git a/msa/js-executor/queue/kafkaTemplate.ts b/msa/js-executor/queue/kafkaTemplate.ts
index 2b3e947b5e..51fa6e291b 100644
--- a/msa/js-executor/queue/kafkaTemplate.ts
+++ b/msa/js-executor/queue/kafkaTemplate.ts
@@ -51,108 +51,103 @@ export class KafkaTemplate implements IQueue {
     private batchMessages: TopicMessages[] = [];
     private sendLoopInstance: NodeJS.Timeout;
 
+    name = 'Kafka';
+
     constructor() {
     }
 
     async init(): Promise<void> {
-        try {
-            const kafkaBootstrapServers: string = config.get('kafka.bootstrap.servers');
-            const requestTopic: string = config.get('request_topic');
-            const useConfluent = config.get('kafka.use_confluent_cloud');
+        const kafkaBootstrapServers: string = config.get('kafka.bootstrap.servers');
+        const requestTopic: string = config.get('request_topic');
+        const useConfluent = config.get('kafka.use_confluent_cloud');
 
-            this.logger.info('Kafka Bootstrap Servers: %s', kafkaBootstrapServers);
-            this.logger.info('Kafka Requests Topic: %s', requestTopic);
+        this.logger.info('Kafka Bootstrap Servers: %s', kafkaBootstrapServers);
+        this.logger.info('Kafka Requests Topic: %s', requestTopic);
 
-            let kafkaConfig: KafkaConfig = {
-                brokers: kafkaBootstrapServers.split(','),
-                logLevel: logLevel.INFO,
-                logCreator: KafkaJsWinstonLogCreator
-            };
+        let kafkaConfig: KafkaConfig = {
+            brokers: kafkaBootstrapServers.split(','),
+            logLevel: logLevel.INFO,
+            logCreator: KafkaJsWinstonLogCreator
+        };
 
-            if (this.kafkaClientId) {
-                kafkaConfig['clientId'] = this.kafkaClientId;
-            } else {
-                this.logger.warn('KAFKA_CLIENT_ID is undefined. Consider to define the env variable KAFKA_CLIENT_ID');
-            }
-
-            kafkaConfig['requestTimeout'] = this.requestTimeout;
-
-            if (useConfluent) {
-                kafkaConfig['sasl'] = {
-                    mechanism: config.get('kafka.confluent.sasl.mechanism') as any,
-                    username: config.get('kafka.confluent.username'),
-                    password: config.get('kafka.confluent.password')
-                };
-                kafkaConfig['ssl'] = true;
-            }
-
-            this.parseTopicProperties();
-
-            this.kafkaClient = new Kafka(kafkaConfig);
-            this.kafkaAdmin = this.kafkaClient.admin();
-            await this.kafkaAdmin.connect();
-
-            let partitions = 1;
-
-            for (let i = 0; i < this.configEntries.length; i++) {
-                let param = this.configEntries[i];
-                if (param.name === 'partitions') {
-                    partitions = param.value;
-                    this.configEntries.splice(i, 1);
-                    break;
-                }
-            }
-
-            let topics = await this.kafkaAdmin.listTopics();
-
-            if (!topics.includes(requestTopic)) {
-                let createRequestTopicResult = await this.createTopic(requestTopic, partitions);
-                if (createRequestTopicResult) {
-                    this.logger.info('Created new topic: %s', requestTopic);
-                }
-            }
-
-            this.consumer = this.kafkaClient.consumer({groupId: 'js-executor-group'});
-            this.producer = this.kafkaClient.producer({createPartitioner: Partitioners.DefaultPartitioner});
-
-            const {CRASH} = this.consumer.events;
-
-            this.consumer.on(CRASH, async (e) => {
-                this.logger.error(`Got consumer CRASH event, should restart: ${e.payload.restart}`);
-                if (!e.payload.restart) {
-                    this.logger.error('Going to exit due to not retryable error!');
-                    await this.destroy(-1);
-                }
-            });
-
-            const messageProcessor = new JsInvokeMessageProcessor(this);
-            await this.consumer.connect();
-            await this.producer.connect();
-            this.sendLoopWithLinger();
-            await this.consumer.subscribe({topic: requestTopic});
-
-            await this.consumer.run({
-                partitionsConsumedConcurrently: this.partitionsConsumedConcurrently,
-                eachMessage: async ({topic, partition, message}) => {
-                    let headers = message.headers;
-                    let key = message.key || new Buffer([]);
-                    let msg = {
-                        key: key.toString('utf8'),
-                        data: message.value,
-                        headers: {
-                            data: headers
-                        }
-                    };
-                    messageProcessor.onJsInvokeMessage(msg);
-                },
-            });
-
-        } catch (e: any) {
-            this.logger.error('Failed to start ThingsBoard JavaScript Executor Microservice: %s', e.message);
-            this.logger.error(e.stack);
-            await this.destroy(-1);
+        if (this.kafkaClientId) {
+            kafkaConfig['clientId'] = this.kafkaClientId;
+        } else {
+            this.logger.warn('KAFKA_CLIENT_ID is undefined. Consider to define the env variable KAFKA_CLIENT_ID');
         }
-    }
+
+        kafkaConfig['requestTimeout'] = this.requestTimeout;
+
+        if (useConfluent) {
+            kafkaConfig['sasl'] = {
+                mechanism: config.get('kafka.confluent.sasl.mechanism') as any,
+                username: config.get('kafka.confluent.username'),
+                password: config.get('kafka.confluent.password')
+            };
+            kafkaConfig['ssl'] = true;
+        }
+
+        this.parseTopicProperties();
+
+        this.kafkaClient = new Kafka(kafkaConfig);
+        this.kafkaAdmin = this.kafkaClient.admin();
+        await this.kafkaAdmin.connect();
+
+        let partitions = 1;
+
+        for (let i = 0; i < this.configEntries.length; i++) {
+            let param = this.configEntries[i];
+            if (param.name === 'partitions') {
+                partitions = param.value;
+                this.configEntries.splice(i, 1);
+                break;
+            }
+        }
+
+        let topics = await this.kafkaAdmin.listTopics();
+
+        if (!topics.includes(requestTopic)) {
+            let createRequestTopicResult = await this.createTopic(requestTopic, partitions);
+            if (createRequestTopicResult) {
+                this.logger.info('Created new topic: %s', requestTopic);
+            }
+        }
+
+        this.consumer = this.kafkaClient.consumer({groupId: 'js-executor-group'});
+        this.producer = this.kafkaClient.producer({createPartitioner: Partitioners.DefaultPartitioner});
+
+        const {CRASH} = this.consumer.events;
+
+        this.consumer.on(CRASH, async (e) => {
+            this.logger.error(`Got consumer CRASH event, should restart: ${e.payload.restart}`);
+            if (!e.payload.restart) {
+                this.logger.error('Going to exit due to not retryable error!');
+                await this.destroy();
+            }
+        });
+
+        const messageProcessor = new JsInvokeMessageProcessor(this);
+        await this.consumer.connect();
+        await this.producer.connect();
+        this.sendLoopWithLinger();
+        await this.consumer.subscribe({topic: requestTopic});
+
+        await this.consumer.run({
+            partitionsConsumedConcurrently: this.partitionsConsumedConcurrently,
+            eachMessage: async ({topic, partition, message}) => {
+                let headers = message.headers;
+                let key = message.key || new Buffer([]);
+                let msg = {
+                    key: key.toString('utf8'),
+                    data: message.value,
+                    headers: {
+                        data: headers
+                    }
+                };
+                messageProcessor.onJsInvokeMessage(msg);
+            },
+        });
+}
 
     async send(responseTopic: string, scriptId: string, rawResponse: Buffer, headers: any): Promise<any> {
         this.logger.debug('Pending queue response, scriptId: [%s]', scriptId);
@@ -232,15 +227,7 @@ export class KafkaTemplate implements IQueue {
         }, this.linger);
     }
 
-    static async build(): Promise<KafkaTemplate> {
-        const queue = new KafkaTemplate();
-        await queue.init();
-        return queue;
-    }
-
-
-    async destroy(status: number): Promise<void> {
-        this.logger.info('Exiting with status: %d ...', status);
+    async destroy(): Promise<void> {
 
         this.logger.info('Stopping Kafka resources...');
         if (this.kafkaAdmin) {
@@ -267,7 +254,6 @@
             }
         }
         this.logger.info('Kafka resources stopped.');
-        process.exit(status);
     }
 
     private async disconnectProducer(): Promise<void> {
@@ -287,4 +273,5 @@
             }
         }
     }
+
 }
diff --git a/msa/js-executor/queue/pubSubTemplate.ts b/msa/js-executor/queue/pubSubTemplate.ts
index 4e8990a105..eff35017ba 100644
--- a/msa/js-executor/queue/pubSubTemplate.ts
+++ b/msa/js-executor/queue/pubSubTemplate.ts
@@ -34,55 +34,50 @@ export class PubSubTemplate implements IQueue {
     private topics: string[] = [];
     private subscriptions: string[] = [];
 
+    name = 'Pub/Sub';
+
     constructor() {
     }
 
     async init() {
-        try {
-            this.pubSubClient = new PubSub({
-                projectId: this.projectId,
-                credentials: this.credentials
+        this.pubSubClient = new PubSub({
+            projectId: this.projectId,
+            credentials: this.credentials
+        });
+
+        this.parseQueueProperties();
+
+        const topicList = await this.pubSubClient.getTopics();
+
+        if (topicList) {
+            topicList[0].forEach(topic => {
+                this.topics.push(PubSubTemplate.getName(topic.name));
             });
-
-            this.parseQueueProperties();
-
-            const topicList = await this.pubSubClient.getTopics();
-
-            if (topicList) {
-                topicList[0].forEach(topic => {
-                    this.topics.push(PubSubTemplate.getName(topic.name));
-                });
-            }
-
-            const subscriptionList = await this.pubSubClient.getSubscriptions();
-
-            if (subscriptionList) {
-                topicList[0].forEach(sub => {
-                    this.subscriptions.push(PubSubTemplate.getName(sub.name));
-                });
-            }
-
-            if (!(this.subscriptions.includes(this.requestTopic) && this.topics.includes(this.requestTopic))) {
-                await this.createTopic(this.requestTopic);
-                await this.createSubscription(this.requestTopic);
-            }
-
-            const subscription = this.pubSubClient.subscription(this.requestTopic);
-
-            const messageProcessor = new JsInvokeMessageProcessor(this);
-
-            const messageHandler = (message: Message) => {
-                messageProcessor.onJsInvokeMessage(JSON.parse(message.data.toString('utf8')));
-                message.ack();
-            };
-
-            subscription.on('message', messageHandler);
-
-        } catch (e: any) {
-            this.logger.error('Failed to start ThingsBoard JavaScript Executor Microservice: %s', e.message);
-            this.logger.error(e.stack);
-            await this.destroy(-1);
         }
+
+        const subscriptionList = await this.pubSubClient.getSubscriptions();
+
+        if (subscriptionList) {
+            topicList[0].forEach(sub => {
+                this.subscriptions.push(PubSubTemplate.getName(sub.name));
+            });
+        }
+
+        if (!(this.subscriptions.includes(this.requestTopic) && this.topics.includes(this.requestTopic))) {
+            await this.createTopic(this.requestTopic);
+            await this.createSubscription(this.requestTopic);
+        }
+
+        const subscription = this.pubSubClient.subscription(this.requestTopic);
+
+        const messageProcessor = new JsInvokeMessageProcessor(this);
+
+        const messageHandler = (message: Message) => {
+            messageProcessor.onJsInvokeMessage(JSON.parse(message.data.toString('utf8')));
+            message.ack();
+        };
+
+        subscription.on('message', messageHandler);
     }
 
     async send(responseTopic: string, scriptId: string, rawResponse: Buffer, headers: any): Promise<any> {
@@ -146,14 +141,7 @@
         }
     }
 
-    static async build(): Promise<PubSubTemplate> {
-        const queue = new PubSubTemplate();
-        await queue.init();
-        return queue;
-    }
-
-    async destroy(status: number): Promise<void> {
-        this.logger.info('Exiting with status: %d ...', status);
+    async destroy(): Promise<void> {
         this.logger.info('Stopping Pub/Sub resources...');
         if (this.pubSubClient) {
             this.logger.info('Stopping Pub/Sub client...');
@@ -168,7 +156,6 @@
             }
         }
         this.logger.info('Pub/Sub resources stopped.');
-        process.exit(status);
     }
 
 }
diff --git a/msa/js-executor/queue/queue.models.ts b/msa/js-executor/queue/queue.models.ts
index 59ec68896d..a86dc8fd1d 100644
--- a/msa/js-executor/queue/queue.models.ts
+++ b/msa/js-executor/queue/queue.models.ts
@@ -15,7 +15,8 @@
 ///
 
 export interface IQueue {
+    name: string;
     init(): Promise<void>;
     send(responseTopic: string, scriptId: string, rawResponse: Buffer, headers: any): Promise<any>;
-    destroy(status: number): Promise<void>;
+    destroy(): Promise<void>;
 }
diff --git a/msa/js-executor/queue/rabbitmqTemplate.ts b/msa/js-executor/queue/rabbitmqTemplate.ts
index 372024a4f3..9369f170f5 100644
--- a/msa/js-executor/queue/rabbitmqTemplate.ts
+++ b/msa/js-executor/queue/rabbitmqTemplate.ts
@@ -44,39 +44,35 @@ export class RabbitMqTemplate implements IQueue {
     private stopped = false;
     private topics: string[] = [];
 
+    name = 'RabbitMQ';
+
     constructor() {
     }
 
     async init(): Promise<void> {
-        try {
-            const url = `amqp://${this.username}:${this.password}@${this.host}:${this.port}${this.vhost}`;
-            this.connection = await amqp.connect(url);
-            this.channel = await this.connection.createConfirmChannel();
+        const url = `amqp://${this.username}:${this.password}@${this.host}:${this.port}${this.vhost}`;
+        this.connection = await amqp.connect(url);
+        this.channel = await this.connection.createConfirmChannel();
 
-            this.parseQueueProperties();
+        this.parseQueueProperties();
 
-            await this.createQueue(this.requestTopic);
+        await this.createQueue(this.requestTopic);
 
-            const messageProcessor = new JsInvokeMessageProcessor(this);
+        const messageProcessor = new JsInvokeMessageProcessor(this);
 
-            while (!this.stopped) {
-                let pollStartTs = new Date().getTime();
-                let message = await this.channel.get(this.requestTopic);
+        while (!this.stopped) {
+            let pollStartTs = new Date().getTime();
+            let message = await this.channel.get(this.requestTopic);
 
-                if (message) {
-                    messageProcessor.onJsInvokeMessage(JSON.parse(message.content.toString('utf8')));
-                    this.channel.ack(message);
-                } else {
-                    let pollDuration = new Date().getTime() - pollStartTs;
-                    if (pollDuration < this.pollInterval) {
-                        await sleep(this.pollInterval - pollDuration);
-                    }
+            if (message) {
+                messageProcessor.onJsInvokeMessage(JSON.parse(message.content.toString('utf8')));
+                this.channel.ack(message);
+            } else {
+                let pollDuration = new Date().getTime() - pollStartTs;
+                if (pollDuration < this.pollInterval) {
+                    await sleep(this.pollInterval - pollDuration);
                 }
             }
-        } catch (e: any) {
-            this.logger.error('Failed to start ThingsBoard JavaScript Executor Microservice: %s', e.message);
-            this.logger.error(e.stack);
-            await this.destroy(-1);
         }
     }
 
@@ -112,14 +108,7 @@ export class RabbitMqTemplate implements IQueue {
         return this.channel.assertQueue(topic, this.queueOptions);
     }
 
-    static async build(): Promise<RabbitMqTemplate> {
-        const queue = new RabbitMqTemplate();
-        await queue.init();
-        return queue;
-    }
-
-    async destroy(status: number) {
-        this.logger.info('Exiting with status: %d ...', status);
+    async destroy() {
         this.logger.info('Stopping RabbitMQ resources...');
 
         if (this.channel) {
@@ -144,7 +133,6 @@
             }
         }
         this.logger.info('RabbitMQ resources stopped.')
-        process.exit(status);
     }
 
 }
diff --git a/msa/js-executor/queue/serviceBusTemplate.ts b/msa/js-executor/queue/serviceBusTemplate.ts
index b2750672e5..76d87e8068 100644
--- a/msa/js-executor/queue/serviceBusTemplate.ts
+++ b/msa/js-executor/queue/serviceBusTemplate.ts
@@ -44,46 +44,42 @@ export class ServiceBusTemplate implements IQueue {
     private receiver: ServiceBusReceiver;
     private senderMap = new Map();
 
+    name = 'Azure Service Bus';
+
     constructor() {
     }
 
     async init() {
-        try {
-            const connectionString = `Endpoint=sb://${this.namespaceName}.servicebus.windows.net/;SharedAccessKeyName=${this.sasKeyName};SharedAccessKey=${this.sasKey}`;
-            this.sbClient = new ServiceBusClient(connectionString)
-            this.serviceBusService = new ServiceBusAdministrationClient(connectionString);
+        const connectionString = `Endpoint=sb://${this.namespaceName}.servicebus.windows.net/;SharedAccessKeyName=${this.sasKeyName};SharedAccessKey=${this.sasKey}`;
+        this.sbClient = new ServiceBusClient(connectionString)
+        this.serviceBusService = new ServiceBusAdministrationClient(connectionString);
 
-            this.parseQueueProperties();
+        this.parseQueueProperties();
 
-            const listQueues = await this.serviceBusService.listQueues();
-            for await (const queue of listQueues) {
-                this.queues.push(queue.name);
-            }
-
-            if (!this.queues.includes(this.requestTopic)) {
-                await this.createQueueIfNotExist(this.requestTopic);
-                this.queues.push(this.requestTopic);
-            }
-
-            this.receiver = this.sbClient.createReceiver(this.requestTopic, {receiveMode: 'peekLock'});
-
-            const messageProcessor = new JsInvokeMessageProcessor(this);
-
-            const messageHandler = async (message: ServiceBusReceivedMessage) => {
-                if (message) {
-                    messageProcessor.onJsInvokeMessage(message.body);
-                    await this.receiver.completeMessage(message);
-                }
-            };
-            const errorHandler = async (error: ProcessErrorArgs) => {
-                this.logger.error('Failed to receive message from queue.', error);
-            };
-            this.receiver.subscribe({processMessage: messageHandler, processError: errorHandler})
-        } catch (e: any) {
-            this.logger.error('Failed to start ThingsBoard JavaScript Executor Microservice: %s', e.message);
-            this.logger.error(e.stack);
-            await this.destroy(-1);
+        const listQueues = await this.serviceBusService.listQueues();
+        for await (const queue of listQueues) {
+            this.queues.push(queue.name);
         }
+
+        if (!this.queues.includes(this.requestTopic)) {
+            await this.createQueueIfNotExist(this.requestTopic);
+            this.queues.push(this.requestTopic);
+        }
+
+        this.receiver = this.sbClient.createReceiver(this.requestTopic, {receiveMode: 'peekLock'});
+
+        const messageProcessor = new JsInvokeMessageProcessor(this);
+
+        const messageHandler = async (message: ServiceBusReceivedMessage) => {
+            if (message) {
+                messageProcessor.onJsInvokeMessage(message.body);
+                await this.receiver.completeMessage(message);
+            }
+        };
+        const errorHandler = async (error: ProcessErrorArgs) => {
+            this.logger.error('Failed to receive message from queue.', error);
+        };
+        this.receiver.subscribe({processMessage: messageHandler, processError: errorHandler})
     }
 
     async send(responseTopic: string, scriptId: string, rawResponse: Buffer, headers: any): Promise<any> {
@@ -133,14 +129,7 @@
         }
     }
 
-    static async build(): Promise<ServiceBusTemplate> {
-        const queue = new ServiceBusTemplate();
-        await queue.init();
-        return queue;
-    }
-
-    async destroy(status: number) {
-        this.logger.info('Exiting with status: %d ...', status);
+    async destroy() {
         this.logger.info('Stopping Azure Service Bus resources...')
         if (this.receiver) {
             this.logger.info('Stopping Service Bus Receiver...');
@@ -181,6 +170,5 @@
             }
         }
         this.logger.info('Azure Service Bus resources stopped.')
-        process.exit(status);
     }
 }
diff --git a/msa/js-executor/server.ts b/msa/js-executor/server.ts
index 708a87fec9..446f9d34aa 100644
--- a/msa/js-executor/server.ts
+++ b/msa/js-executor/server.ts
@@ -30,63 +30,66 @@
 logger.info('===CONFIG BEGIN===');
 logger.info(JSON.stringify(config, null, 4));
 logger.info('===CONFIG END===');
 
-const serviceType = config.get('queue_type');
+const serviceType: string = config.get('queue_type');
 const httpPort = Number(config.get('http_port'));
 let queues: IQueue | null;
 let httpServer: HttpServer | null;
 
 (async () => {
     logger.info('Starting ThingsBoard JavaScript Executor Microservice...');
-    switch (serviceType) {
-        case 'kafka':
-            logger.info('Starting Kafka template...');
-            queues = await KafkaTemplate.build();
-            logger.info('Kafka template started.');
-            break;
-        case 'pubsub':
-            logger.info('Starting Pub/Sub template...')
-            queues = await PubSubTemplate.build();
-            logger.info('Pub/Sub template started.')
-            break;
-        case 'aws-sqs':
-            logger.info('Starting AWS SQS template...')
-            queues = await AwsSqsTemplate.build();
-            logger.info('AWS SQS template started.')
-            break;
-        case 'rabbitmq':
-            logger.info('Starting RabbitMQ template...')
-            queues = await RabbitMqTemplate.build();
-            logger.info('RabbitMQ template started.')
-            break;
-        case 'service-bus':
-            logger.info('Starting Azure Service Bus template...')
-            queues = await ServiceBusTemplate.build();
-            logger.info('Azure Service Bus template started.')
-            break;
-        default:
-            logger.error('Unknown service type: ', serviceType);
-            process.exit(-1);
+    try {
+        queues = await createQueue(serviceType);
+        logger.info(`Starting ${queues.name} template...`);
+        await queues.init();
+        logger.info(`${queues.name} template started.`);
+        httpServer = new HttpServer(httpPort);
+    } catch (e: any) {
+        logger.error('Failed to start ThingsBoard JavaScript Executor Microservice: %s', e.message);
+        logger.error(e.stack);
+        await exit(-1);
     }
-    httpServer = new HttpServer(httpPort);
 })();
 
+async function createQueue(serviceType: string): Promise<IQueue> {
+    switch (serviceType) {
+        case 'kafka':
+            return new KafkaTemplate();
+        case 'pubsub':
+            return new PubSubTemplate();
+        case 'aws-sqs':
+            return new AwsSqsTemplate();
+        case 'rabbitmq':
+            return new RabbitMqTemplate();
+        case 'service-bus':
+            return new ServiceBusTemplate();
+        default:
+            throw new Error('Unknown service type: ' + serviceType);
+    }
+}
+
 [`SIGINT`, `SIGUSR1`, `SIGUSR2`, `uncaughtException`, `SIGTERM`].forEach((eventType) => {
     process.on(eventType, async () => {
         logger.info(`${eventType} signal received`);
-        if (httpServer) {
-            const _httpServer = httpServer;
-            httpServer = null;
-            await _httpServer.stop();
-        }
-        if (queues) {
-            const _queues = queues;
-            queues = null;
-            await _queues.destroy(0);
-        }
+        await exit(0);
     })
 })
 
 process.on('exit', (code: number) => {
-    logger.info(`JavaScript Executor Microservice has been stopped. Exit code: ${code}.`);
+    logger.info(`ThingsBoard JavaScript Executor Microservice has been stopped. Exit code: ${code}.`);
 });
+
+async function exit(status: number) {
+    logger.info('Exiting with status: %d ...', status);
+    if (httpServer) {
+        const _httpServer = httpServer;
+        httpServer = null;
+        await _httpServer.stop();
+    }
+    if (queues) {
+        const _queues = queues;
+        queues = null;
+        await _queues.destroy();
+    }
+    process.exit(status);
+}
diff --git a/msa/js-executor/yarn.lock b/msa/js-executor/yarn.lock
index ed6e7406f9..eb6443e53d 100644
--- a/msa/js-executor/yarn.lock
+++ b/msa/js-executor/yarn.lock
@@ -2670,10 +2670,10 @@ jws@^4.0.0:
     jwa "^2.0.0"
     safe-buffer "^5.0.1"
 
-kafkajs@^2.0.2:
-  version "2.0.2"
-  resolved "https://registry.yarnpkg.com/kafkajs/-/kafkajs-2.0.2.tgz#cdfc8f57aa4fd69f6d9ca1cce4ee89bbc2a3a1f9"
-  integrity sha512-g6CM3fAenofOjR1bfOAqeZUEaSGhNtBscNokybSdW1rmIKYNwBPC9xQzwulFJm36u/xcxXUiCl/L/qfslapihA==
+kafkajs@^2.1.0:
+  version "2.1.0"
+  resolved "https://registry.yarnpkg.com/kafkajs/-/kafkajs-2.1.0.tgz#32ede4e8080cc75586c5e4406eeb582fa73f7b1e"
+  integrity sha512-6IYiOdGWvFPbSbVB+AV3feT+A7vzw5sXm7Ze4QTfP7FRNdY8pGcpiNPvD2lfgYFD8Dm9KbMgBgTt2mf8KaIkzw==
 
 keyv@^3.0.0:
   version "3.1.0"