thingsboard/msa/js-executor/queue/kafkaTemplate.js
2021-06-01 16:20:40 +03:00

284 lines
10 KiB
JavaScript

/*
* Copyright © 2016-2021 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
const {logLevel, Kafka, CompressionTypes} = require('kafkajs');
const config = require('config'),
JsInvokeMessageProcessor = require('../api/jsInvokeMessageProcessor'),
logger = require('../config/logger')._logger('kafkaTemplate'),
KafkaJsWinstonLogCreator = require('../config/logger').KafkaJsWinstonLogCreator;
const replicationFactor = Number(config.get('kafka.replication_factor'));
const topicProperties = config.get('kafka.topic_properties');
const kafkaClientId = config.get('kafka.client_id');
const acks = Number(config.get('kafka.acks'));
const requestTimeout = Number(config.get('kafka.requestTimeout'));
const compressionType = (config.get('kafka.requestTimeout') === "gzip") ? CompressionTypes.GZIP : CompressionTypes.None;
const linger = 5; //milliseconds //TODO move to the config
const maxBatchSize = 10; //max messages in batch //TODO move to the config
let kafkaClient;
let kafkaAdmin;
let consumer;
let producer;
const configEntries = [];
let batchMessages = [];
let sendLoopInstance;
function KafkaProducer() {
this.send = (responseTopic, scriptId, rawResponse, headers) => {
logger.debug('Pending queue response, scriptId: [%s]', scriptId);
const message = {
topic: responseTopic,
messages: [{
key: scriptId,
value: rawResponse,
headers: headers.data
}]
};
pushMessageToSendLater(message);
return {};
}
}
function pushMessageToSendLater(message) {
batchMessages.push(message);
if (batchMessages.length >= maxBatchSize) {
sendMessagesAsBatch();
sendLoopWithLinger(); //reset loop function and reschedule new linger
}
}
function sendLoopWithLinger() {
if (sendLoopInstance) {
logger.debug("Clear sendLoop scheduler. Starting new send loop with linger [%s]", linger);
clearInterval(sendLoopInstance);
} else {
logger.debug("Starting new send loop with linger [%s]", linger)
}
sendLoopInstance = setInterval(sendMessagesAsBatch, linger);
}
function sendMessagesAsBatch() {
if (batchMessages.length > 0) {
logger.info('sendMessagesAsBatch, lenght: [%s]', batchMessages.length );
const messagesToSend = batchMessages;
batchMessages = [];
producer.sendBatch({
topicMessages: messagesToSend,
acks: acks,
compression: compressionType
}).then(
() => {
logger.info('Response sent to kafka, length: [%s]', messagesToSend.length );
},
(err) => {
logger.error('Failed to send kafka, length: [%s], pending to reprocess msgs', messagesToSend.length );
batchMessages = messagesToSend.concat(batchMessages);
logger.error(err.stack);
}
);
} else {
//logger.debug("nothing to send");
}
}
(async () => {
try {
logger.info('Starting ThingsBoard JavaScript Executor Microservice...');
const kafkaBootstrapServers = config.get('kafka.bootstrap.servers');
const requestTopic = config.get('request_topic');
const useConfluent = config.get('kafka.use_confluent_cloud');
logger.info('Kafka Bootstrap Servers: %s', kafkaBootstrapServers);
logger.info('Kafka Requests Topic: %s', requestTopic);
let kafkaConfig = {
brokers: kafkaBootstrapServers.split(','),
logLevel: logLevel.INFO,
logCreator: KafkaJsWinstonLogCreator
};
if (kafkaClientId) {
kafkaConfig['clientId'] = kafkaClientId;
} else {
logger.warn('KAFKA_CLIENT_ID is undefined. Consider to define the env variable KAFKA_CLIENT_ID');
}
kafkaConfig['requestTimeout'] = requestTimeout;
if (useConfluent) {
kafkaConfig['sasl'] = {
mechanism: config.get('kafka.confluent.sasl.mechanism'),
username: config.get('kafka.confluent.username'),
password: config.get('kafka.confluent.password')
};
kafkaConfig['ssl'] = true;
}
kafkaClient = new Kafka(kafkaConfig);
parseTopicProperties();
kafkaAdmin = kafkaClient.admin();
await kafkaAdmin.connect();
let partitions = 1;
for (let i = 0; i < configEntries.length; i++) {
let param = configEntries[i];
if (param.name === 'partitions') {
partitions = param.value;
configEntries.splice(i, 1);
break;
}
}
let topics = await kafkaAdmin.listTopics();
if (!topics.includes(requestTopic)) {
let createRequestTopicResult = await createTopic(requestTopic, partitions);
if (createRequestTopicResult) {
logger.info('Created new topic: %s', requestTopic);
}
}
consumer = kafkaClient.consumer({groupId: 'js-executor-group'});
producer = kafkaClient.producer();
const { CONNECT } = producer.events;
const removeListenerC = producer.on(CONNECT, e => logger.info(`producer CONNECT`));
const { DISCONNECT } = producer.events;
const removeListenerD = producer.on(DISCONNECT, e => logger.info(`producer DISCONNECT`));
const { REQUEST } = producer.events;
const removeListenerR = producer.on(REQUEST, e => logger.info(`producer REQUEST ${e.payload.broker}`));
const { REQUEST_TIMEOUT } = producer.events;
const removeListenerRT = producer.on(REQUEST_TIMEOUT, e => logger.info(`producer REQUEST_TIMEOUT ${e.payload.broker}`));
const { REQUEST_QUEUE_SIZE } = producer.events;
const removeListenerRQS = producer.on(REQUEST_QUEUE_SIZE, e => logger.info(`producer REQUEST_QUEUE_SIZE ${e.payload.broker} size ${e.queueSize}`));
// const removeListeners = {}
// const { FETCH_START } = consumer.events;
// removeListeners[FETCH_START] = consumer.on(FETCH_START, e => logger.info(`consumer FETCH_START`));
// const { FETCH } = consumer.events;
// removeListeners[FETCH] = consumer.on(FETCH, e => logger.info(`consumer FETCH numberOfBatches ${e.payload.numberOfBatches} duration ${e.payload.duration}`));
// const { START_BATCH_PROCESS } = consumer.events;
// removeListeners[START_BATCH_PROCESS] = consumer.on(START_BATCH_PROCESS, e => logger.info(`consumer START_BATCH_PROCESS topic ${e.payload.topic} batchSize ${e.payload.batchSize}`));
// const { END_BATCH_PROCESS } = consumer.events;
// removeListeners[END_BATCH_PROCESS] = consumer.on(END_BATCH_PROCESS, e => logger.info(`consumer END_BATCH_PROCESS topic ${e.payload.topic} batchSize ${e.payload.batchSize}`));
// const { COMMIT_OFFSETS } = consumer.events;
// removeListeners[COMMIT_OFFSETS] = consumer.on(COMMIT_OFFSETS, e => logger.info(`consumer COMMIT_OFFSETS topics ${e.payload.topics}`));
const messageProcessor = new JsInvokeMessageProcessor(new KafkaProducer());
await consumer.connect();
await producer.connect();
sendLoopWithLinger();
await consumer.subscribe({topic: requestTopic});
logger.info('Started ThingsBoard JavaScript Executor Microservice.');
await consumer.run({
//partitionsConsumedConcurrently: 1, // Default: 1
eachMessage: async ({topic, partition, message}) => {
let headers = message.headers;
let key = message.key;
let msg = {};
msg.key = key.toString('utf8');
msg.data = message.value;
msg.headers = {data: headers};
messageProcessor.onJsInvokeMessage(msg);
},
});
} catch (e) {
logger.error('Failed to start ThingsBoard JavaScript Executor Microservice: %s', e.message);
logger.error(e.stack);
exit(-1);
}
})();
function createTopic(topic, partitions) {
return kafkaAdmin.createTopics({
topics: [{
topic: topic,
numPartitions: partitions,
replicationFactor: replicationFactor,
configEntries: configEntries
}]
});
}
function parseTopicProperties() {
const props = topicProperties.split(';');
props.forEach(p => {
const delimiterPosition = p.indexOf(':');
configEntries.push({name: p.substring(0, delimiterPosition), value: p.substring(delimiterPosition + 1)});
});
}
process.on('exit', () => {
exit(0);
});
async function exit(status) {
logger.info('Exiting with status: %d ...', status);
if (kafkaAdmin) {
logger.info('Stopping Kafka Admin...');
await kafkaAdmin.disconnect();
logger.info('Kafka Admin stopped.');
}
if (consumer) {
logger.info('Stopping Kafka Consumer...');
let _consumer = consumer;
consumer = null;
try {
await _consumer.disconnect();
logger.info('Kafka Consumer stopped.');
await disconnectProducer();
process.exit(status);
} catch (e) {
logger.info('Kafka Consumer stop error.');
await disconnectProducer();
process.exit(status);
}
} else {
process.exit(status);
}
}
async function disconnectProducer() {
if (producer) {
logger.info('Stopping Kafka Producer...');
var _producer = producer;
producer = null;
try {
logger.info('Stopping loop...');
//TODO: send handle msg
clearInterval(sendLoopInstance);
sendMessagesAsBatch();
await _producer.disconnect();
logger.info('Kafka Producer stopped.');
} catch (e) {
logger.info('Kafka Producer stop error.');
}
}
}