thingsboard/msa/js-executor/queue/kafkaTemplate.js

/*
 * Copyright © 2016-2022 The ThingsBoard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
const {logLevel, Kafka, CompressionTypes, Partitioners} = require('kafkajs');
const config = require('config');
const JsInvokeMessageProcessor = require('../api/jsInvokeMessageProcessor');
const logger = require('../config/logger')._logger('kafkaTemplate');
const KafkaJsWinstonLogCreator = require('../config/logger').KafkaJsWinstonLogCreator;
const replicationFactor = Number(config.get('kafka.replication_factor'));
const topicProperties = config.get('kafka.topic_properties');
const kafkaClientId = config.get('kafka.client_id');
const acks = Number(config.get('kafka.acks'));
const maxBatchSize = Number(config.get('kafka.batch_size'));
const linger = Number(config.get('kafka.linger_ms'));
const requestTimeout = Number(config.get('kafka.requestTimeout'));
const compressionType = (config.get('kafka.compression') === "gzip") ? CompressionTypes.GZIP : CompressionTypes.None;
const partitionsConsumedConcurrently = Number(config.get('kafka.partitions_consumed_concurrently'));
let kafkaClient;
let kafkaAdmin;
let consumer;
let producer;
const configEntries = [];
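
// Buffer of response messages awaiting a batched send, plus the timer handle
// for the linger-based send loop.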
let batchMessages = [];
let sendLoopInstance;
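
/**
 * Producer facade handed to JsInvokeMessageProcessor: instead of sending each
 * response individually, it queues messages into the shared batch buffer so
 * they can be flushed to Kafka in bulk.
 */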
function KafkaProducer() {
    this.send = async (responseTopic, scriptId, rawResponse, headers) => {
        logger.debug('Pending queue response, scriptId: [%s]', scriptId);
        const message = {
            topic: responseTopic,
            messages: [{
                key: scriptId,
                value: rawResponse,
                headers: headers.data
            }]
        };
        await pushMessageToSendLater(message);
    }
}
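
/**
 * Adds a message to the batch buffer and flushes immediately once the buffer
 * reaches the configured batch size; smaller batches are left for the linger loop.
 */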
async function pushMessageToSendLater(message) {
    batchMessages.push(message);
    if (batchMessages.length >= maxBatchSize) {
        await sendMessagesAsBatch(true);
    }
}
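
/**
 * (Re)arms the linger timer: cancels any pending flush and schedules
 * sendMessagesAsBatch() to run after `linger` milliseconds, mirroring the
 * Kafka producer's linger.ms behaviour.
 */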
function sendLoopWithLinger() {
    if (sendLoopInstance) {
        clearTimeout(sendLoopInstance);
    }
    sendLoopInstance = setTimeout(sendMessagesAsBatch, linger);
}
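
/**
 * Flushes the batch buffer: swaps it for an empty array and sends everything in
 * a single producer.sendBatch() call. On failure the unsent messages are put
 * back at the head of the buffer for reprocessing. Always re-arms the linger loop.
 */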
async function sendMessagesAsBatch(isImmediately) {
    if (sendLoopInstance) {
        clearTimeout(sendLoopInstance);
    }
    sendLoopInstance = null;
    if (batchMessages.length > 0) {
        logger.debug('sendMessagesAsBatch, length: [%s], %s', batchMessages.length, isImmediately ? 'immediately' : '');
        const messagesToSend = batchMessages;
        batchMessages = [];
        try {
            await producer.sendBatch({
                topicMessages: messagesToSend,
                acks: acks,
                compression: compressionType
            });
            logger.debug('Response batch sent to kafka, length: [%s]', messagesToSend.length);
        } catch (err) {
            logger.error('Failed batch send to kafka, length: [%s], pending messages will be reprocessed', messagesToSend.length);
            logger.error(err.stack);
            // Put the failed batch back at the head of the buffer so it is retried first.
            batchMessages = messagesToSend.concat(batchMessages);
        }
    }
    sendLoopWithLinger();
}
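
// Startup: build the kafkajs client from configuration, ensure the request topic
// exists, then connect the consumer and producer and feed each incoming message
// to JsInvokeMessageProcessor.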
(async () => {
try {
logger.info('Starting ThingsBoard JavaScript Executor Microservice...');
const kafkaBootstrapServers = config.get('kafka.bootstrap.servers');
const requestTopic = config.get('request_topic');
const useConfluent = config.get('kafka.use_confluent_cloud');
logger.info('Kafka Bootstrap Servers: %s', kafkaBootstrapServers);
logger.info('Kafka Requests Topic: %s', requestTopic);
let kafkaConfig = {
brokers: kafkaBootstrapServers.split(','),
logLevel: logLevel.INFO,
logCreator: KafkaJsWinstonLogCreator
};
if (kafkaClientId) {
kafkaConfig['clientId'] = kafkaClientId;
} else {
            logger.warn('KAFKA_CLIENT_ID is undefined. Consider defining the environment variable KAFKA_CLIENT_ID');
}
kafkaConfig['requestTimeout'] = requestTimeout;
if (useConfluent) {
kafkaConfig['sasl'] = {
mechanism: config.get('kafka.confluent.sasl.mechanism'),
username: config.get('kafka.confluent.username'),
password: config.get('kafka.confluent.password')
};
kafkaConfig['ssl'] = true;
}
kafkaClient = new Kafka(kafkaConfig);
parseTopicProperties();
kafkaAdmin = kafkaClient.admin();
await kafkaAdmin.connect();
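
        // The 'partitions' entry in topic_properties sets the partition count used
        // when creating the request topic; it is removed here so it is not passed
        // to Kafka as a per-topic config entry.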
let partitions = 1;
for (let i = 0; i < configEntries.length; i++) {
let param = configEntries[i];
if (param.name === 'partitions') {
                partitions = Number(param.value); // topic property values are parsed as strings
configEntries.splice(i, 1);
break;
}
}
let topics = await kafkaAdmin.listTopics();
if (!topics.includes(requestTopic)) {
let createRequestTopicResult = await createTopic(requestTopic, partitions);
if (createRequestTopicResult) {
logger.info('Created new topic: %s', requestTopic);
}
}
consumer = kafkaClient.consumer({groupId: 'js-executor-group'});
producer = kafkaClient.producer({ createPartitioner: Partitioners.DefaultPartitioner });
/*
//producer event instrumentation to debug
const { CONNECT } = producer.events;
const removeListenerC = producer.on(CONNECT, e => logger.info(`producer CONNECT`));
const { DISCONNECT } = producer.events;
const removeListenerD = producer.on(DISCONNECT, e => logger.info(`producer DISCONNECT`));
const { REQUEST } = producer.events;
const removeListenerR = producer.on(REQUEST, e => logger.info(`producer REQUEST ${e.payload.broker}`));
const { REQUEST_TIMEOUT } = producer.events;
const removeListenerRT = producer.on(REQUEST_TIMEOUT, e => logger.info(`producer REQUEST_TIMEOUT ${e.payload.broker}`));
const { REQUEST_QUEUE_SIZE } = producer.events;
const removeListenerRQS = producer.on(REQUEST_QUEUE_SIZE, e => logger.info(`producer REQUEST_QUEUE_SIZE ${e.payload.broker} size ${e.queueSize}`));
*/
/*
//consumer event instrumentation to debug
const removeListeners = {}
const { FETCH_START } = consumer.events;
removeListeners[FETCH_START] = consumer.on(FETCH_START, e => logger.info(`consumer FETCH_START`));
const { FETCH } = consumer.events;
removeListeners[FETCH] = consumer.on(FETCH, e => logger.info(`consumer FETCH numberOfBatches ${e.payload.numberOfBatches} duration ${e.payload.duration}`));
const { START_BATCH_PROCESS } = consumer.events;
removeListeners[START_BATCH_PROCESS] = consumer.on(START_BATCH_PROCESS, e => logger.info(`consumer START_BATCH_PROCESS topic ${e.payload.topic} batchSize ${e.payload.batchSize}`));
const { END_BATCH_PROCESS } = consumer.events;
removeListeners[END_BATCH_PROCESS] = consumer.on(END_BATCH_PROCESS, e => logger.info(`consumer END_BATCH_PROCESS topic ${e.payload.topic} batchSize ${e.payload.batchSize}`));
const { COMMIT_OFFSETS } = consumer.events;
removeListeners[COMMIT_OFFSETS] = consumer.on(COMMIT_OFFSETS, e => logger.info(`consumer COMMIT_OFFSETS topics ${e.payload.topics}`));
*/
const { CRASH } = consumer.events;
consumer.on(CRASH, e => {
logger.error(`Got consumer CRASH event, should restart: ${e.payload.restart}`);
if (!e.payload.restart) {
                logger.error('Going to exit due to non-retryable error!');
exit(-1);
}
});
const messageProcessor = new JsInvokeMessageProcessor(new KafkaProducer());
await consumer.connect();
await producer.connect();
sendLoopWithLinger();
await consumer.subscribe({topic: requestTopic});
logger.info('Started ThingsBoard JavaScript Executor Microservice.');
await consumer.run({
partitionsConsumedConcurrently: partitionsConsumedConcurrently,
            eachMessage: async ({topic, partition, message}) => {
                // Convert the Kafka record into the structure JsInvokeMessageProcessor expects.
                const msg = {
                    key: message.key.toString('utf8'),
                    data: message.value,
                    headers: {data: message.headers}
                };
                messageProcessor.onJsInvokeMessage(msg);
            },
});
} catch (e) {
logger.error('Failed to start ThingsBoard JavaScript Executor Microservice: %s', e.message);
logger.error(e.stack);
exit(-1);
}
})();
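
/**
 * Creates a topic with the configured replication factor and the remaining
 * topic property overrides. Resolves to true when a new topic was created,
 * false when it already existed.
 */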
function createTopic(topic, partitions) {
    return kafkaAdmin.createTopics({
        topics: [{
            topic: topic,
            numPartitions: partitions,
            replicationFactor: replicationFactor,
            configEntries: configEntries
        }]
    });
}
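
/**
 * Parses kafka.topic_properties ("name:value;name:value") into the
 * configEntries array used when creating topics.
 */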
function parseTopicProperties() {
const props = topicProperties.split(';');
props.forEach(p => {
const delimiterPosition = p.indexOf(':');
configEntries.push({name: p.substring(0, delimiterPosition), value: p.substring(delimiterPosition + 1)});
});
}
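
// Fallback handler: attempt a clean Kafka shutdown whenever the process exits.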
process.on('exit', () => {
exit(0);
});
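
/**
 * Graceful shutdown: disconnects the admin client and consumer, flushes and
 * disconnects the producer, then terminates the process with the given status.
 */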
async function exit(status) {
    logger.info('Exiting with status: %d ...', status);
    if (kafkaAdmin) {
        logger.info('Stopping Kafka Admin...');
        await kafkaAdmin.disconnect();
        logger.info('Kafka Admin stopped.');
    }
    if (consumer) {
        logger.info('Stopping Kafka Consumer...');
        const _consumer = consumer;
        consumer = null;
        try {
            await _consumer.disconnect();
            logger.info('Kafka Consumer stopped.');
        } catch (e) {
            logger.info('Kafka Consumer stop error.');
        } finally {
            await disconnectProducer();
            process.exit(status);
        }
    } else {
        process.exit(status);
    }
}
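
/**
 * Flushes any responses still waiting in the batch buffer, then disconnects
 * the Kafka producer.
 */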
async function disconnectProducer() {
    if (producer) {
        logger.info('Stopping Kafka Producer...');
        try {
            logger.info('Stopping send loop...');
            clearTimeout(sendLoopInstance);
            // Flush any remaining batched responses while `producer` is still set;
            // sendMessagesAsBatch() would fail if it ran after the reference is
            // cleared below.
            await sendMessagesAsBatch();
            clearTimeout(sendLoopInstance); // the flush re-arms the linger timer
            const _producer = producer;
            producer = null;
            await _producer.disconnect();
            logger.info('Kafka Producer stopped.');
        } catch (e) {
            logger.info('Kafka Producer stop error.');
        }
    }
}