Merge branch 'vvlladd28-bugs/js-executor/greceful-shutdows' into develop/3.4

This commit is contained in:
Igor Kulikov 2022-07-07 19:38:01 +03:00
commit 74bb6d77bf
11 changed files with 369 additions and 395 deletions

View File

@ -16,12 +16,15 @@
import express from 'express';
import { _logger} from '../config/logger';
import http from 'http';
import { Socket } from 'net';
export class HttpServer {
private logger = _logger('httpServer');
private app = express();
private server;
private server: http.Server | null;
private connections: Socket[] = [];
constructor(httpPort: number) {
this.app.get('/livenessProbe', async (req, res) => {
@ -32,15 +35,31 @@ export class HttpServer {
})
this.server = this.app.listen(httpPort, () => {
this.logger.info('Started http endpoint on port %s. Please, use /livenessProbe !', httpPort);
this.logger.info('Started HTTP endpoint on port %s. Please, use /livenessProbe !', httpPort);
}).on('error', (error) => {
this.logger.error(error);
});
}
stop() {
this.server.close(() => {
this.logger.info('Http server stop');
this.server.on('connection', connection => {
this.connections.push(connection);
connection.on('close', () => this.connections = this.connections.filter(curr => curr !== connection));
});
}
async stop() {
if (this.server) {
this.logger.info('Stopping HTTP Server...');
const _server = this.server;
this.server = null;
this.connections.forEach(curr => curr.end(() => curr.destroy()));
await new Promise<void>(
(resolve, reject) => {
_server.close((err) => {
this.logger.info('HTTP Server stopped.');
resolve();
});
}
);
}
}
}

View File

@ -29,6 +29,7 @@ COPY package/linux/conf ./conf
COPY package/linux/conf ./config
COPY src/api ./api
COPY src/queue ./queue
COPY src/config ./config
COPY src/server.js ./
RUN chmod a+x /tmp/*.sh \

View File

@ -20,7 +20,7 @@
"config": "^3.3.7",
"express": "^4.18.1",
"js-yaml": "^4.1.0",
"kafkajs": "^2.0.2",
"kafkajs": "^2.1.0",
"long": "^5.2.0",
"uuid-parse": "^1.1.0",
"uuid-random": "^1.3.2",

View File

@ -54,82 +54,76 @@ export class AwsSqsTemplate implements IQueue {
FifoQueue: 'true'
};
name = 'AWS SQS';
constructor() {
}
async init() {
try {
this.logger.info('Starting ThingsBoard JavaScript Executor Microservice...');
this.sqsClient = new SQSClient({
apiVersion: '2012-11-05',
credentials: {
accessKeyId: this.accessKeyId,
secretAccessKey: this.secretAccessKey
},
region: this.region
});
this.sqsClient = new SQSClient({
apiVersion: '2012-11-05',
credentials: {
accessKeyId: this.accessKeyId,
secretAccessKey: this.secretAccessKey
},
region: this.region
const queues = await this.getQueues();
if (queues.QueueUrls) {
queues.QueueUrls.forEach(queueUrl => {
const delimiterPosition = queueUrl.lastIndexOf('/');
const queueName = queueUrl.substring(delimiterPosition + 1);
this.queueUrls.set(queueName, queueUrl);
});
}
const queues = await this.getQueues();
this.parseQueueProperties();
if (queues.QueueUrls) {
queues.QueueUrls.forEach(queueUrl => {
const delimiterPosition = queueUrl.lastIndexOf('/');
const queueName = queueUrl.substring(delimiterPosition + 1);
this.queueUrls.set(queueName, queueUrl);
});
}
this.requestQueueURL = this.queueUrls.get(AwsSqsTemplate.topicToSqsQueueName(this.requestTopic)) || '';
if (!this.requestQueueURL) {
this.requestQueueURL = await this.createQueue(this.requestTopic);
}
this.parseQueueProperties();
const messageProcessor = new JsInvokeMessageProcessor(this);
this.requestQueueURL = this.queueUrls.get(AwsSqsTemplate.topicToSqsQueueName(this.requestTopic)) || '';
if (!this.requestQueueURL) {
this.requestQueueURL = await this.createQueue(this.requestTopic);
}
const params: ReceiveMessageRequest = {
MaxNumberOfMessages: 10,
QueueUrl: this.requestQueueURL,
WaitTimeSeconds: this.pollInterval / 1000
};
while (!this.stopped) {
let pollStartTs = new Date().getTime();
const messagesResponse: ReceiveMessageResult = await this.sqsClient.send(new ReceiveMessageCommand(params));
const messages = messagesResponse.Messages;
const messageProcessor = new JsInvokeMessageProcessor(this);
if (messages && messages.length > 0) {
const entries: DeleteMessageBatchRequestEntry[] = [];
const params: ReceiveMessageRequest = {
MaxNumberOfMessages: 10,
QueueUrl: this.requestQueueURL,
WaitTimeSeconds: this.pollInterval / 1000
};
while (!this.stopped) {
let pollStartTs = new Date().getTime();
const messagesResponse: ReceiveMessageResult = await this.sqsClient.send(new ReceiveMessageCommand(params));
const messages = messagesResponse.Messages;
if (messages && messages.length > 0) {
const entries: DeleteMessageBatchRequestEntry[] = [];
messages.forEach(message => {
entries.push({
Id: message.MessageId,
ReceiptHandle: message.ReceiptHandle
});
messageProcessor.onJsInvokeMessage(JSON.parse(message.Body || ''));
messages.forEach(message => {
entries.push({
Id: message.MessageId,
ReceiptHandle: message.ReceiptHandle
});
messageProcessor.onJsInvokeMessage(JSON.parse(message.Body || ''));
});
const deleteBatch: DeleteMessageBatchRequest = {
QueueUrl: this.requestQueueURL,
Entries: entries
};
try {
await this.sqsClient.send(new DeleteMessageBatchCommand(deleteBatch))
} catch (err: any) {
this.logger.error("Failed to delete messages from queue.", err.message);
}
} else {
let pollDuration = new Date().getTime() - pollStartTs;
if (pollDuration < this.pollInterval) {
await sleep(this.pollInterval - pollDuration);
}
const deleteBatch: DeleteMessageBatchRequest = {
QueueUrl: this.requestQueueURL,
Entries: entries
};
try {
await this.sqsClient.send(new DeleteMessageBatchCommand(deleteBatch))
} catch (err: any) {
this.logger.error("Failed to delete messages from queue.", err.message);
}
} else {
let pollDuration = new Date().getTime() - pollStartTs;
if (pollDuration < this.pollInterval) {
await sleep(this.pollInterval - pollDuration);
}
}
} catch (e: any) {
this.logger.error('Failed to start ThingsBoard JavaScript Executor Microservice: %s', e.message);
this.logger.error(e.stack);
await this.exit(-1);
}
}
@ -187,29 +181,21 @@ export class AwsSqsTemplate implements IQueue {
return result.QueueUrl || '';
}
static async build(): Promise<AwsSqsTemplate> {
const queue = new AwsSqsTemplate();
await queue.init();
return queue;
}
async exit(status: number) {
async destroy(): Promise<void> {
this.stopped = true;
this.logger.info('Exiting with status: %d ...', status);
this.logger.info('Stopping AWS SQS resources...');
if (this.sqsClient) {
this.logger.info('Stopping Aws Sqs client.')
this.logger.info('Stopping AWS SQS client...');
try {
this.sqsClient.destroy();
const _sqsClient = this.sqsClient;
// @ts-ignore
delete this.sqsClient;
this.logger.info('Aws Sqs client stopped.')
process.exit(status);
_sqsClient.destroy();
this.logger.info('AWS SQS client stopped.');
} catch (e: any) {
this.logger.info('Aws Sqs client stop error.');
process.exit(status);
this.logger.info('AWS SQS client stop error.');
}
} else {
process.exit(status);
}
this.logger.info('AWS SQS resources stopped.')
}
}

View File

@ -51,111 +51,103 @@ export class KafkaTemplate implements IQueue {
private batchMessages: TopicMessages[] = [];
private sendLoopInstance: NodeJS.Timeout;
name = 'Kafka';
constructor() {
}
async init(): Promise<void> {
try {
this.logger.info('Starting ThingsBoard JavaScript Executor Microservice...');
const kafkaBootstrapServers: string = config.get('kafka.bootstrap.servers');
const requestTopic: string = config.get('request_topic');
const useConfluent = config.get('kafka.use_confluent_cloud');
const kafkaBootstrapServers: string = config.get('kafka.bootstrap.servers');
const requestTopic: string = config.get('request_topic');
const useConfluent = config.get('kafka.use_confluent_cloud');
this.logger.info('Kafka Bootstrap Servers: %s', kafkaBootstrapServers);
this.logger.info('Kafka Requests Topic: %s', requestTopic);
this.logger.info('Kafka Bootstrap Servers: %s', kafkaBootstrapServers);
this.logger.info('Kafka Requests Topic: %s', requestTopic);
let kafkaConfig: KafkaConfig = {
brokers: kafkaBootstrapServers.split(','),
logLevel: logLevel.INFO,
logCreator: KafkaJsWinstonLogCreator
};
let kafkaConfig: KafkaConfig = {
brokers: kafkaBootstrapServers.split(','),
logLevel: logLevel.INFO,
logCreator: KafkaJsWinstonLogCreator
};
if (this.kafkaClientId) {
kafkaConfig['clientId'] = this.kafkaClientId;
} else {
this.logger.warn('KAFKA_CLIENT_ID is undefined. Consider to define the env variable KAFKA_CLIENT_ID');
}
kafkaConfig['requestTimeout'] = this.requestTimeout;
if (useConfluent) {
kafkaConfig['sasl'] = {
mechanism: config.get('kafka.confluent.sasl.mechanism') as any,
username: config.get('kafka.confluent.username'),
password: config.get('kafka.confluent.password')
};
kafkaConfig['ssl'] = true;
}
this.parseTopicProperties();
this.kafkaClient = new Kafka(kafkaConfig);
this.kafkaAdmin = this.kafkaClient.admin();
await this.kafkaAdmin.connect();
let partitions = 1;
for (let i = 0; i < this.configEntries.length; i++) {
let param = this.configEntries[i];
if (param.name === 'partitions') {
partitions = param.value;
this.configEntries.splice(i, 1);
break;
}
}
let topics = await this.kafkaAdmin.listTopics();
if (!topics.includes(requestTopic)) {
let createRequestTopicResult = await this.createTopic(requestTopic, partitions);
if (createRequestTopicResult) {
this.logger.info('Created new topic: %s', requestTopic);
}
}
this.consumer = this.kafkaClient.consumer({groupId: 'js-executor-group'});
this.producer = this.kafkaClient.producer({createPartitioner: Partitioners.DefaultPartitioner});
const {CRASH} = this.consumer.events;
this.consumer.on(CRASH, e => {
this.logger.error(`Got consumer CRASH event, should restart: ${e.payload.restart}`);
if (!e.payload.restart) {
this.logger.error('Going to exit due to not retryable error!');
this.exit(-1);
}
});
const messageProcessor = new JsInvokeMessageProcessor(this);
await this.consumer.connect();
await this.producer.connect();
this.sendLoopWithLinger();
await this.consumer.subscribe({topic: requestTopic});
this.logger.info('Started ThingsBoard JavaScript Executor Microservice.');
await this.consumer.run({
partitionsConsumedConcurrently: this.partitionsConsumedConcurrently,
eachMessage: async ({topic, partition, message}) => {
let headers = message.headers;
let key = message.key || new Buffer([]);
let msg = {
key: key.toString('utf8'),
data: message.value,
headers: {
data: headers
}
};
messageProcessor.onJsInvokeMessage(msg);
},
});
} catch (e: any) {
this.logger.error('Failed to start ThingsBoard JavaScript Executor Microservice: %s', e.message);
this.logger.error(e.stack);
await this.exit(-1);
if (this.kafkaClientId) {
kafkaConfig['clientId'] = this.kafkaClientId;
} else {
this.logger.warn('KAFKA_CLIENT_ID is undefined. Consider to define the env variable KAFKA_CLIENT_ID');
}
}
kafkaConfig['requestTimeout'] = this.requestTimeout;
if (useConfluent) {
kafkaConfig['sasl'] = {
mechanism: config.get('kafka.confluent.sasl.mechanism') as any,
username: config.get('kafka.confluent.username'),
password: config.get('kafka.confluent.password')
};
kafkaConfig['ssl'] = true;
}
this.parseTopicProperties();
this.kafkaClient = new Kafka(kafkaConfig);
this.kafkaAdmin = this.kafkaClient.admin();
await this.kafkaAdmin.connect();
let partitions = 1;
for (let i = 0; i < this.configEntries.length; i++) {
let param = this.configEntries[i];
if (param.name === 'partitions') {
partitions = param.value;
this.configEntries.splice(i, 1);
break;
}
}
let topics = await this.kafkaAdmin.listTopics();
if (!topics.includes(requestTopic)) {
let createRequestTopicResult = await this.createTopic(requestTopic, partitions);
if (createRequestTopicResult) {
this.logger.info('Created new topic: %s', requestTopic);
}
}
this.consumer = this.kafkaClient.consumer({groupId: 'js-executor-group'});
this.producer = this.kafkaClient.producer({createPartitioner: Partitioners.DefaultPartitioner});
const {CRASH} = this.consumer.events;
this.consumer.on(CRASH, async (e) => {
this.logger.error(`Got consumer CRASH event, should restart: ${e.payload.restart}`);
if (!e.payload.restart) {
this.logger.error('Going to exit due to not retryable error!');
await this.destroy();
}
});
const messageProcessor = new JsInvokeMessageProcessor(this);
await this.consumer.connect();
await this.producer.connect();
this.sendLoopWithLinger();
await this.consumer.subscribe({topic: requestTopic});
await this.consumer.run({
partitionsConsumedConcurrently: this.partitionsConsumedConcurrently,
eachMessage: async ({topic, partition, message}) => {
let headers = message.headers;
let key = message.key || new Buffer([]);
let msg = {
key: key.toString('utf8'),
data: message.value,
headers: {
data: headers
}
};
messageProcessor.onJsInvokeMessage(msg);
},
});
}
async send(responseTopic: string, scriptId: string, rawResponse: Buffer, headers: any): Promise<any> {
this.logger.debug('Pending queue response, scriptId: [%s]', scriptId);
@ -235,41 +227,33 @@ export class KafkaTemplate implements IQueue {
}, this.linger);
}
static async build(): Promise<KafkaTemplate> {
const queue = new KafkaTemplate();
await queue.init();
return queue;
}
async exit(status: number): Promise<void> {
this.logger.info('Exiting with status: %d ...', status);
async destroy(): Promise<void> {
this.logger.info('Stopping Kafka resources...');
if (this.kafkaAdmin) {
this.logger.info('Stopping Kafka Admin...');
await this.kafkaAdmin.disconnect();
const _kafkaAdmin = this.kafkaAdmin;
// @ts-ignore
delete this.kafkaAdmin;
await _kafkaAdmin.disconnect();
this.logger.info('Kafka Admin stopped.');
}
if (this.consumer) {
this.logger.info('Stopping Kafka Consumer...');
try {
await this.consumer.disconnect();
const _consumer = this.consumer;
// @ts-ignore
delete this.consumer;
await _consumer.disconnect();
this.logger.info('Kafka Consumer stopped.');
await this.disconnectProducer();
process.exit(status);
} catch (e: any) {
this.logger.info('Kafka Consumer stop error.');
await this.disconnectProducer();
process.exit(status);
}
} else {
process.exit(status);
}
this.logger.info('Kafka resources stopped.');
}
private async disconnectProducer(): Promise<void> {
@ -279,13 +263,15 @@ export class KafkaTemplate implements IQueue {
this.logger.info('Stopping loop...');
clearTimeout(this.sendLoopInstance);
await this.sendMessagesAsBatch();
await this.producer.disconnect();
const _producer = this.producer;
// @ts-ignore
delete this.producer;
await _producer.disconnect();
this.logger.info('Kafka Producer stopped.');
} catch (e) {
this.logger.info('Kafka Producer stop error.');
}
}
}
}

View File

@ -34,56 +34,50 @@ export class PubSubTemplate implements IQueue {
private topics: string[] = [];
private subscriptions: string[] = [];
name = 'Pub/Sub';
constructor() {
}
async init() {
try {
this.logger.info('Starting ThingsBoard JavaScript Executor Microservice...');
this.pubSubClient = new PubSub({
projectId: this.projectId,
credentials: this.credentials
this.pubSubClient = new PubSub({
projectId: this.projectId,
credentials: this.credentials
});
this.parseQueueProperties();
const topicList = await this.pubSubClient.getTopics();
if (topicList) {
topicList[0].forEach(topic => {
this.topics.push(PubSubTemplate.getName(topic.name));
});
this.parseQueueProperties();
const topicList = await this.pubSubClient.getTopics();
if (topicList) {
topicList[0].forEach(topic => {
this.topics.push(PubSubTemplate.getName(topic.name));
});
}
const subscriptionList = await this.pubSubClient.getSubscriptions();
if (subscriptionList) {
topicList[0].forEach(sub => {
this.subscriptions.push(PubSubTemplate.getName(sub.name));
});
}
if (!(this.subscriptions.includes(this.requestTopic) && this.topics.includes(this.requestTopic))) {
await this.createTopic(this.requestTopic);
await this.createSubscription(this.requestTopic);
}
const subscription = this.pubSubClient.subscription(this.requestTopic);
const messageProcessor = new JsInvokeMessageProcessor(this);
const messageHandler = (message: Message) => {
messageProcessor.onJsInvokeMessage(JSON.parse(message.data.toString('utf8')));
message.ack();
};
subscription.on('message', messageHandler);
} catch (e: any) {
this.logger.error('Failed to start ThingsBoard JavaScript Executor Microservice: %s', e.message);
this.logger.error(e.stack);
await this.exit(-1);
}
const subscriptionList = await this.pubSubClient.getSubscriptions();
if (subscriptionList) {
topicList[0].forEach(sub => {
this.subscriptions.push(PubSubTemplate.getName(sub.name));
});
}
if (!(this.subscriptions.includes(this.requestTopic) && this.topics.includes(this.requestTopic))) {
await this.createTopic(this.requestTopic);
await this.createSubscription(this.requestTopic);
}
const subscription = this.pubSubClient.subscription(this.requestTopic);
const messageProcessor = new JsInvokeMessageProcessor(this);
const messageHandler = (message: Message) => {
messageProcessor.onJsInvokeMessage(JSON.parse(message.data.toString('utf8')));
message.ack();
};
subscription.on('message', messageHandler);
}
async send(responseTopic: string, scriptId: string, rawResponse: Buffer, headers: any): Promise<any> {
@ -147,29 +141,21 @@ export class PubSubTemplate implements IQueue {
}
}
static async build(): Promise<PubSubTemplate> {
const queue = new PubSubTemplate();
await queue.init();
return queue;
}
async exit(status: number): Promise<void> {
this.logger.info('Exiting with status: %d ...', status);
async destroy(): Promise<void> {
this.logger.info('Stopping Pub/Sub resources...');
if (this.pubSubClient) {
this.logger.info('Stopping Pub/Sub client.')
this.logger.info('Stopping Pub/Sub client...');
try {
await this.pubSubClient.close();
const _pubSubClient = this.pubSubClient;
// @ts-ignore
delete this.pubSubClient;
this.logger.info('Pub/Sub client stopped.')
process.exit(status);
await _pubSubClient.close();
this.logger.info('Pub/Sub client stopped.');
} catch (e) {
this.logger.info('Pub/Sub client stop error.');
process.exit(status);
}
} else {
process.exit(status);
}
this.logger.info('Pub/Sub resources stopped.');
}
}

View File

@ -15,7 +15,8 @@
///
export interface IQueue {
name: string;
init(): Promise<void>;
send(responseTopic: string, scriptId: string, rawResponse: Buffer, headers: any): Promise<any>;
exit(status: number): Promise<void>;
destroy(): Promise<void>;
}

View File

@ -44,41 +44,35 @@ export class RabbitMqTemplate implements IQueue {
private stopped = false;
private topics: string[] = [];
name = 'RabbitMQ';
constructor() {
}
async init(): Promise<void> {
try {
this.logger.info('Starting ThingsBoard JavaScript Executor Microservice...');
const url = `amqp://${this.username}:${this.password}@${this.host}:${this.port}${this.vhost}`;
this.connection = await amqp.connect(url);
this.channel = await this.connection.createConfirmChannel();
const url = `amqp://${this.username}:${this.password}@${this.host}:${this.port}${this.vhost}`;
this.connection = await amqp.connect(url);
this.channel = await this.connection.createConfirmChannel();
this.parseQueueProperties();
this.parseQueueProperties();
await this.createQueue(this.requestTopic);
await this.createQueue(this.requestTopic);
const messageProcessor = new JsInvokeMessageProcessor(this);
const messageProcessor = new JsInvokeMessageProcessor(this);
while (!this.stopped) {
let pollStartTs = new Date().getTime();
let message = await this.channel.get(this.requestTopic);
while (!this.stopped) {
let pollStartTs = new Date().getTime();
let message = await this.channel.get(this.requestTopic);
if (message) {
messageProcessor.onJsInvokeMessage(JSON.parse(message.content.toString('utf8')));
this.channel.ack(message);
} else {
let pollDuration = new Date().getTime() - pollStartTs;
if (pollDuration < this.pollInterval) {
await sleep(this.pollInterval - pollDuration);
}
if (message) {
messageProcessor.onJsInvokeMessage(JSON.parse(message.content.toString('utf8')));
this.channel.ack(message);
} else {
let pollDuration = new Date().getTime() - pollStartTs;
if (pollDuration < this.pollInterval) {
await sleep(this.pollInterval - pollDuration);
}
}
} catch (e: any) {
this.logger.error('Failed to start ThingsBoard JavaScript Executor Microservice: %s', e.message);
this.logger.error(e.stack);
await this.exit(-1);
}
}
@ -114,38 +108,31 @@ export class RabbitMqTemplate implements IQueue {
return this.channel.assertQueue(topic, this.queueOptions);
}
static async build(): Promise<RabbitMqTemplate> {
const queue = new RabbitMqTemplate();
await queue.init();
return queue;
}
async exit(status: number) {
this.logger.info('Exiting with status: %d ...', status);
async destroy() {
this.logger.info('Stopping RabbitMQ resources...');
if (this.channel) {
this.logger.info('Stopping RabbitMq chanel.')
await this.channel.close();
this.logger.info('Stopping RabbitMQ chanel...');
const _channel = this.channel;
// @ts-ignore
delete this.channel;
this.logger.info('RabbitMq chanel stopped');
await _channel.close();
this.logger.info('RabbitMQ chanel stopped');
}
if (this.connection) {
this.logger.info('Stopping RabbitMq connection.')
this.logger.info('Stopping RabbitMQ connection...')
try {
await this.connection.close();
const _connection = this.connection;
// @ts-ignore
delete this.connection;
this.logger.info('RabbitMq client connection.')
process.exit(status);
await _connection.close();
this.logger.info('RabbitMQ client connection.');
} catch (e) {
this.logger.info('RabbitMq connection stop error.');
process.exit(status);
this.logger.info('RabbitMQ connection stop error.');
}
} else {
process.exit(status);
}
this.logger.info('RabbitMQ resources stopped.')
}
}

View File

@ -44,48 +44,42 @@ export class ServiceBusTemplate implements IQueue {
private receiver: ServiceBusReceiver;
private senderMap = new Map<string, ServiceBusSender>();
name = 'Azure Service Bus';
constructor() {
}
async init() {
try {
this.logger.info('Starting ThingsBoard JavaScript Executor Microservice...');
const connectionString = `Endpoint=sb://${this.namespaceName}.servicebus.windows.net/;SharedAccessKeyName=${this.sasKeyName};SharedAccessKey=${this.sasKey}`;
this.sbClient = new ServiceBusClient(connectionString)
this.serviceBusService = new ServiceBusAdministrationClient(connectionString);
const connectionString = `Endpoint=sb://${this.namespaceName}.servicebus.windows.net/;SharedAccessKeyName=${this.sasKeyName};SharedAccessKey=${this.sasKey}`;
this.sbClient = new ServiceBusClient(connectionString)
this.serviceBusService = new ServiceBusAdministrationClient(connectionString);
this.parseQueueProperties();
this.parseQueueProperties();
const listQueues = await this.serviceBusService.listQueues();
for await (const queue of listQueues) {
this.queues.push(queue.name);
}
if (!this.queues.includes(this.requestTopic)) {
await this.createQueueIfNotExist(this.requestTopic);
this.queues.push(this.requestTopic);
}
this.receiver = this.sbClient.createReceiver(this.requestTopic, {receiveMode: 'peekLock'});
const messageProcessor = new JsInvokeMessageProcessor(this);
const messageHandler = async (message: ServiceBusReceivedMessage) => {
if (message) {
messageProcessor.onJsInvokeMessage(message.body);
await this.receiver.completeMessage(message);
}
};
const errorHandler = async (error: ProcessErrorArgs) => {
this.logger.error('Failed to receive message from queue.', error);
};
this.receiver.subscribe({processMessage: messageHandler, processError: errorHandler})
} catch (e: any) {
this.logger.error('Failed to start ThingsBoard JavaScript Executor Microservice: %s', e.message);
this.logger.error(e.stack);
await this.exit(-1);
const listQueues = await this.serviceBusService.listQueues();
for await (const queue of listQueues) {
this.queues.push(queue.name);
}
if (!this.queues.includes(this.requestTopic)) {
await this.createQueueIfNotExist(this.requestTopic);
this.queues.push(this.requestTopic);
}
this.receiver = this.sbClient.createReceiver(this.requestTopic, {receiveMode: 'peekLock'});
const messageProcessor = new JsInvokeMessageProcessor(this);
const messageHandler = async (message: ServiceBusReceivedMessage) => {
if (message) {
messageProcessor.onJsInvokeMessage(message.body);
await this.receiver.completeMessage(message);
}
};
const errorHandler = async (error: ProcessErrorArgs) => {
this.logger.error('Failed to receive message from queue.', error);
};
this.receiver.subscribe({processMessage: messageHandler, processError: errorHandler})
}
async send(responseTopic: string, scriptId: string, rawResponse: Buffer, headers: any): Promise<any> {
@ -135,41 +129,46 @@ export class ServiceBusTemplate implements IQueue {
}
}
static async build(): Promise<ServiceBusTemplate> {
const queue = new ServiceBusTemplate();
await queue.init();
return queue;
}
async exit(status: number) {
this.logger.info('Exiting with status: %d ...', status);
async destroy() {
this.logger.info('Stopping Azure Service Bus resources...')
if (this.receiver) {
this.logger.info('Stopping Service Bus Receiver...');
try {
await this.receiver.close();
const _receiver = this.receiver;
// @ts-ignore
delete this.receiver;
await _receiver.close();
this.logger.info('Service Bus Receiver stopped.');
} catch (e) {
this.logger.info('Service Bus Receiver stop error.');
}
}
this.senderMap.forEach(k => {
try {
k.close();
} catch (e) {
}
this.logger.info('Stopping Service Bus Senders...');
const senders: Promise<void>[] = [];
this.senderMap.forEach((sender) => {
senders.push(sender.close());
});
this.senderMap.clear();
try {
await Promise.all(senders);
this.logger.info('Service Bus Senders stopped.');
} catch (e) {
this.logger.info('Service Bus Senders stop error.');
}
if (this.sbClient) {
this.logger.info('Stopping Service Bus Client...');
try {
await this.sbClient.close();
const _sbClient = this.sbClient;
// @ts-ignore
delete this.sbClient;
await _sbClient.close();
this.logger.info('Service Bus Client stopped.');
} catch (e) {
this.logger.info('Service Bus Client stop error.');
}
}
this.logger.info('Azure Service Bus resources stopped.')
process.exit(status);
}
}

View File

@ -30,57 +30,66 @@ logger.info('===CONFIG BEGIN===');
logger.info(JSON.stringify(config, null, 4));
logger.info('===CONFIG END===');
const serviceType = config.get('queue_type');
const serviceType: string = config.get('queue_type');
const httpPort = Number(config.get('http_port'));
let queues: IQueue;
let httpServer: HttpServer;
let queues: IQueue | null;
let httpServer: HttpServer | null;
(async () => {
switch (serviceType) {
case 'kafka':
logger.info('Starting kafka template.');
queues = await KafkaTemplate.build();
logger.info('kafka template started.');
break;
case 'pubsub':
logger.info('Starting Pub/Sub template.')
queues = await PubSubTemplate.build();
logger.info('Pub/Sub template started.')
break;
case 'aws-sqs':
logger.info('Starting Aws Sqs template.')
queues = await AwsSqsTemplate.build();
logger.info('Aws Sqs template started.')
break;
case 'rabbitmq':
logger.info('Starting RabbitMq template.')
queues = await RabbitMqTemplate.build();
logger.info('RabbitMq template started.')
break;
case 'service-bus':
logger.info('Starting Azure Service Bus template.')
queues = await ServiceBusTemplate.build();
logger.info('Azure Service Bus template started.')
break;
default:
logger.error('Unknown service type: ', serviceType);
process.exit(-1);
logger.info('Starting ThingsBoard JavaScript Executor Microservice...');
try {
queues = await createQueue(serviceType);
logger.info(`Starting ${queues.name} template...`);
await queues.init();
logger.info(`${queues.name} template started.`);
httpServer = new HttpServer(httpPort);
} catch (e: any) {
logger.error('Failed to start ThingsBoard JavaScript Executor Microservice: %s', e.message);
logger.error(e.stack);
await exit(-1);
}
httpServer = new HttpServer(httpPort);
})();
process.on('SIGTERM', () => {
logger.info('SIGTERM signal received');
process.exit(0);
async function createQueue(serviceType: string): Promise<IQueue> {
switch (serviceType) {
case 'kafka':
return new KafkaTemplate();
case 'pubsub':
return new PubSubTemplate();
case 'aws-sqs':
return new AwsSqsTemplate();
case 'rabbitmq':
return new RabbitMqTemplate();
case 'service-bus':
return new ServiceBusTemplate();
default:
throw new Error('Unknown service type: ' + serviceType);
}
}
[`SIGINT`, `SIGUSR1`, `SIGUSR2`, `uncaughtException`, `SIGTERM`].forEach((eventType) => {
process.on(eventType, async () => {
logger.info(`${eventType} signal received`);
await exit(0);
})
})
process.on('exit', (code: number) => {
logger.info(`ThingsBoard JavaScript Executor Microservice has been stopped. Exit code: ${code}.`);
});
process.on('exit', async () => {
async function exit(status: number) {
logger.info('Exiting with status: %d ...', status);
if (httpServer) {
httpServer.stop();
const _httpServer = httpServer;
httpServer = null;
await _httpServer.stop();
}
if (queues) {
queues.exit(0);
const _queues = queues;
queues = null;
await _queues.destroy();
}
logger.info('JavaScript Executor Microservice has been stopped.');
});
process.exit(status);
}

View File

@ -2670,10 +2670,10 @@ jws@^4.0.0:
jwa "^2.0.0"
safe-buffer "^5.0.1"
kafkajs@^2.0.2:
version "2.0.2"
resolved "https://registry.yarnpkg.com/kafkajs/-/kafkajs-2.0.2.tgz#cdfc8f57aa4fd69f6d9ca1cce4ee89bbc2a3a1f9"
integrity sha512-g6CM3fAenofOjR1bfOAqeZUEaSGhNtBscNokybSdW1rmIKYNwBPC9xQzwulFJm36u/xcxXUiCl/L/qfslapihA==
kafkajs@^2.1.0:
version "2.1.0"
resolved "https://registry.yarnpkg.com/kafkajs/-/kafkajs-2.1.0.tgz#32ede4e8080cc75586c5e4406eeb582fa73f7b1e"
integrity sha512-6IYiOdGWvFPbSbVB+AV3feT+A7vzw5sXm7Ze4QTfP7FRNdY8pGcpiNPvD2lfgYFD8Dm9KbMgBgTt2mf8KaIkzw==
keyv@^3.0.0:
version "3.1.0"