thingsboard/msa/js-executor/queue/kafkaTemplate.js
2020-04-28 16:13:16 +03:00

131 lines
4.3 KiB
JavaScript

/*
* Copyright © 2016-2020 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
const {logLevel, Kafka} = require('kafkajs');
const config = require('config'),
JsInvokeMessageProcessor = require('../api/jsInvokeMessageProcessor'),
logger = require('../config/logger')._logger('kafkaTemplate'),
KafkaJsWinstonLogCreator = require('../config/logger').KafkaJsWinstonLogCreator;
let kafkaClient;
let consumer;
let producer;
function KafkaProducer() {
this.send = async (responseTopic, scriptId, rawResponse, headers) => {
let headersData = headers.data;
headersData = Object.fromEntries(Object.entries(headersData).map(([key, value]) => [key, Buffer.from(value)]));
return producer.send(
{
topic: responseTopic,
messages: [
{
key: scriptId,
value: rawResponse,
headers: headersData
}
]
});
}
}
(async () => {
try {
logger.info('Starting ThingsBoard JavaScript Executor Microservice...');
const kafkaBootstrapServers = config.get('kafka.bootstrap.servers');
const kafkaRequestTopic = config.get('request_topic');
logger.info('Kafka Bootstrap Servers: %s', kafkaBootstrapServers);
logger.info('Kafka Requests Topic: %s', kafkaRequestTopic);
kafkaClient = new Kafka({
brokers: kafkaBootstrapServers.split(','),
logLevel: logLevel.INFO,
logCreator: KafkaJsWinstonLogCreator
});
consumer = kafkaClient.consumer({groupId: 'js-executor-group'});
producer = kafkaClient.producer();
const messageProcessor = new JsInvokeMessageProcessor(new KafkaProducer());
await consumer.connect();
await producer.connect();
await consumer.subscribe({topic: kafkaRequestTopic});
logger.info('Started ThingsBoard JavaScript Executor Microservice.');
await consumer.run({
eachMessage: async ({topic, partition, message}) => {
let headers = message.headers;
let key = message.key;
let data = message.value;
let msg = {};
headers = Object.fromEntries(
Object.entries(headers).map(([key, value]) => [key, [...value]]));
msg.key = key.toString('utf8');
msg.data = [...data];
msg.headers = {data: headers}
messageProcessor.onJsInvokeMessage(JSON.stringify(msg));
},
});
} catch (e) {
logger.error('Failed to start ThingsBoard JavaScript Executor Microservice: %s', e.message);
logger.error(e.stack);
exit(-1);
}
})();
process.on('exit', () => {
exit(0);
});
async function exit(status) {
logger.info('Exiting with status: %d ...', status);
if (consumer) {
logger.info('Stopping Kafka Consumer...');
let _consumer = consumer;
consumer = null;
try {
await _consumer.disconnect();
logger.info('Kafka Consumer stopped.');
await disconnectProducer();
process.exit(status);
} catch (e) {
logger.info('Kafka Consumer stop error.');
await disconnectProducer();
process.exit(status);
}
} else {
process.exit(status);
}
}
async function disconnectProducer() {
if (producer) {
logger.info('Stopping Kafka Producer...');
var _producer = producer;
producer = null;
try {
await _producer.disconnect();
logger.info('Kafka Producer stopped.');
} catch (e) {
logger.info('Kafka Producer stop error.');
}
}
}