///
/// Copyright © 2016-2022 The Thingsboard Authors
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
///     http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///

import config from 'config';
import { _logger, KafkaJsWinstonLogCreator } from '../config/logger';
import { JsInvokeMessageProcessor } from '../api/jsInvokeMessageProcessor'
import { IQueue } from './queue.models';
import {
    Admin,
    CompressionTypes,
    Consumer,
    Kafka,
    KafkaConfig,
    logLevel,
    Partitioners,
    Producer,
    TopicMessages
} from 'kafkajs';

/**
 * Kafka-backed implementation of {@link IQueue}.
 *
 * Consumes messages from the configured request topic, hands them to a
 * {@link JsInvokeMessageProcessor}, and publishes responses in batches. A batch is
 * flushed either when it reaches `kafka.batch_size` messages or when the linger
 * timer (`kafka.linger_ms`) fires, whichever happens first.
 */
export class KafkaTemplate implements IQueue {

    private logger = _logger(`kafkaTemplate`);
    private replicationFactor = Number(config.get('kafka.replication_factor'));
    private topicProperties: string = config.get('kafka.topic_properties');
    private kafkaClientId: string = config.get('kafka.client_id');
    private acks = Number(config.get('kafka.acks'));
    private maxBatchSize = Number(config.get('kafka.batch_size'));
    private linger = Number(config.get('kafka.linger_ms'));
    private requestTimeout = Number(config.get('kafka.requestTimeout'));
    private compressionType = (config.get('kafka.compression') === "gzip") ? CompressionTypes.GZIP : CompressionTypes.None;
    private partitionsConsumedConcurrently = Number(config.get('kafka.partitions_consumed_concurrently'));

    private kafkaClient: Kafka;
    private kafkaAdmin: Admin;
    private consumer: Consumer;
    private producer: Producer;
    // Topic-level config entries parsed from `kafka.topic_properties` ("name:value;name:value").
    private configEntries: Array<{ name: string; value: string }> = [];
    // Responses waiting for the next batch flush.
    private batchMessages: TopicMessages[] = [];
    // Handle of the pending linger timer; unset until the first loop is started.
    private sendLoopInstance?: NodeJS.Timeout;

    constructor() {
    }

    /**
     * Connects admin, consumer and producer clients, creates the request topic when it
     * is missing, starts the linger send loop and begins consuming requests.
     *
     * Any failure is logged and followed by {@link destroy} with status `-1`, which
     * terminates the process.
     */
    async init(): Promise<void> {
        try {
            const kafkaBootstrapServers: string = config.get('kafka.bootstrap.servers');
            const requestTopic: string = config.get('request_topic');
            const useConfluent = config.get('kafka.use_confluent_cloud');

            this.logger.info('Kafka Bootstrap Servers: %s', kafkaBootstrapServers);
            this.logger.info('Kafka Requests Topic: %s', requestTopic);

            const kafkaConfig: KafkaConfig = {
                brokers: kafkaBootstrapServers.split(','),
                logLevel: logLevel.INFO,
                logCreator: KafkaJsWinstonLogCreator,
                requestTimeout: this.requestTimeout
            };

            if (this.kafkaClientId) {
                kafkaConfig.clientId = this.kafkaClientId;
            } else {
                this.logger.warn('KAFKA_CLIENT_ID is undefined. Consider to define the env variable KAFKA_CLIENT_ID');
            }

            if (useConfluent) {
                kafkaConfig.sasl = {
                    mechanism: config.get('kafka.confluent.sasl.mechanism') as any,
                    username: config.get('kafka.confluent.username'),
                    password: config.get('kafka.confluent.password')
                };
                kafkaConfig.ssl = true;
            }

            this.parseTopicProperties();

            this.kafkaClient = new Kafka(kafkaConfig);
            this.kafkaAdmin = this.kafkaClient.admin();
            await this.kafkaAdmin.connect();

            const partitions = this.extractPartitions();

            const topics = await this.kafkaAdmin.listTopics();
            if (!topics.includes(requestTopic)) {
                const createRequestTopicResult = await this.createTopic(requestTopic, partitions);
                if (createRequestTopicResult) {
                    this.logger.info('Created new topic: %s', requestTopic);
                }
            }

            this.consumer = this.kafkaClient.consumer({groupId: 'js-executor-group'});
            this.producer = this.kafkaClient.producer({createPartitioner: Partitioners.DefaultPartitioner});

            const {CRASH} = this.consumer.events;

            this.consumer.on(CRASH, async (e) => {
                this.logger.error(`Got consumer CRASH event, should restart: ${e.payload.restart}`);
                if (!e.payload.restart) {
                    this.logger.error('Going to exit due to not retryable error!');
                    await this.destroy(-1);
                }
            });

            const messageProcessor = new JsInvokeMessageProcessor(this);
            await this.consumer.connect();
            await this.producer.connect();
            this.sendLoopWithLinger();
            await this.consumer.subscribe({topic: requestTopic});

            await this.consumer.run({
                partitionsConsumedConcurrently: this.partitionsConsumedConcurrently,
                eachMessage: async ({message}) => {
                    // A missing key is treated as an empty string (previously done through
                    // the deprecated `new Buffer([])` constructor).
                    const key = message.key ? message.key.toString('utf8') : '';
                    const msg = {
                        key: key,
                        data: message.value,
                        headers: {
                            data: message.headers
                        }
                    };
                    // Not awaited: the processor is handed the message and the consumer moves on.
                    messageProcessor.onJsInvokeMessage(msg);
                },
            });

        } catch (e: any) {
            this.logger.error('Failed to start ThingsBoard JavaScript Executor Microservice: %s', e.message);
            this.logger.error(e.stack);
            await this.destroy(-1);
        }
    }

    /**
     * Queues a response for delivery in the next batch.
     *
     * @param responseTopic topic the response is published to
     * @param scriptId used as the Kafka message key
     * @param rawResponse serialized response used as the message value
     * @param headers wrapper whose `data` property holds the Kafka headers to attach
     */
    async send(responseTopic: string, scriptId: string, rawResponse: Buffer, headers: any): Promise<void> {
        this.logger.debug('Pending queue response, scriptId: [%s]', scriptId);
        const message: TopicMessages = {
            topic: responseTopic,
            messages: [{
                key: scriptId,
                value: rawResponse,
                headers: headers.data
            }]
        };

        await this.pushMessageToSendLater(message);
    }

    /** Appends a message to the pending batch and flushes at once when the batch is full. */
    private async pushMessageToSendLater(message: TopicMessages): Promise<void> {
        this.batchMessages.push(message);
        if (this.batchMessages.length >= this.maxBatchSize) {
            await this.sendMessagesAsBatch(true);
        }
    }

    /**
     * Sends every pending message as one batch and re-arms the linger timer.
     *
     * On a send failure the messages are put back at the front of the pending list so
     * the next flush retries them.
     *
     * @param isImmediately only affects the debug log; `true` when the flush was
     *        triggered by a full batch rather than by the timer
     */
    private async sendMessagesAsBatch(isImmediately = false): Promise<void> {
        if (this.sendLoopInstance) {
            clearTimeout(this.sendLoopInstance);
        }
        if (this.batchMessages.length > 0) {
            this.logger.debug('sendMessagesAsBatch, length: [%s], %s', this.batchMessages.length, isImmediately ? 'immediately' : '');
            // Swap the list before awaiting so messages queued during the send land in a fresh batch.
            const messagesToSend = this.batchMessages;
            this.batchMessages = [];
            try {
                await this.producer.sendBatch({
                    topicMessages: messagesToSend,
                    acks: this.acks,
                    compression: this.compressionType
                })
                this.logger.debug('Response batch sent to kafka, length: [%s]', messagesToSend.length);
            } catch (err: any) {
                this.logger.error('Failed batch send to kafka, length: [%s], pending to reprocess msgs', messagesToSend.length);
                this.logger.error(err.stack);
                this.batchMessages = messagesToSend.concat(this.batchMessages);
            }
        }
        this.sendLoopWithLinger();
    }

    /**
     * Parses `kafka.topic_properties` ("name:value;name:value") into `configEntries`.
     *
     * Blank segments (empty property string, trailing `;`) and segments without a
     * non-empty name before `:` are skipped instead of producing an empty-named entry.
     */
    private parseTopicProperties(): void {
        const props = (this.topicProperties || '').split(';');
        props.forEach(p => {
            if (p.trim().length === 0) {
                return;
            }
            const delimiterPosition = p.indexOf(':');
            if (delimiterPosition <= 0) {
                this.logger.warn('Ignoring malformed topic property: [%s]', p);
                return;
            }
            this.configEntries.push({
                name: p.substring(0, delimiterPosition),
                value: p.substring(delimiterPosition + 1)
            });
        });
    }

    /**
     * Removes the first `partitions` entry from `configEntries` (it is not a topic
     * config entry) and returns it as a number.
     *
     * @returns the configured partition count, or `1` when the entry is absent or is
     *          not a positive integer
     */
    private extractPartitions(): number {
        const partitionsEntry = this.configEntries.find(entry => entry.name === 'partitions');
        if (!partitionsEntry) {
            return 1;
        }
        this.configEntries = this.configEntries.filter(entry => entry !== partitionsEntry);
        // The value comes from string parsing; convert it before passing it on as numPartitions.
        const parsed = Number(partitionsEntry.value);
        if (Number.isInteger(parsed) && parsed > 0) {
            return parsed;
        }
        this.logger.warn('Invalid partitions value: [%s], using 1', partitionsEntry.value);
        return 1;
    }

    /**
     * Creates a topic with the configured replication factor and the parsed config entries.
     *
     * @returns the result of the admin client's `createTopics` call
     */
    private createTopic(topic: string, partitions: number): Promise<boolean> {
        return this.kafkaAdmin.createTopics({
            topics: [{
                topic: topic,
                numPartitions: partitions,
                replicationFactor: this.replicationFactor,
                configEntries: this.configEntries
            }]
        });
    }

    /** (Re)starts the linger timer that flushes the pending batch after `linger` milliseconds. */
    private sendLoopWithLinger(): void {
        if (this.sendLoopInstance) {
            clearTimeout(this.sendLoopInstance);
            // } else {
            //     this.logger.debug("Starting new send loop with linger [%s]", this.linger)
        }
        this.sendLoopInstance = setTimeout(async () => {
            await this.sendMessagesAsBatch()
        }, this.linger);
    }

    /** Creates a template and initializes it. */
    static async build(): Promise<KafkaTemplate> {
        const queue = new KafkaTemplate();
        await queue.init();
        return queue;
    }

    /**
     * Disconnects admin, consumer and producer (in that order) and exits the process.
     *
     * Each disconnect is guarded separately so that a failure in one client neither
     * skips the remaining clients nor prevents the final `process.exit`.
     *
     * @param status process exit code
     */
    async destroy(status: number): Promise<void> {
        this.logger.info('Exiting with status: %d ...', status);
        this.logger.info('Stopping Kafka resources...');

        if (this.kafkaAdmin) {
            this.logger.info('Stopping Kafka Admin...');
            const _kafkaAdmin = this.kafkaAdmin;
            // Drop the field first so a repeated destroy() call does not disconnect twice.
            // @ts-ignore
            delete this.kafkaAdmin;
            try {
                await _kafkaAdmin.disconnect();
                this.logger.info('Kafka Admin stopped.');
            } catch (e: any) {
                this.logger.info('Kafka Admin stop error.');
            }
        }

        if (this.consumer) {
            this.logger.info('Stopping Kafka Consumer...');
            const _consumer = this.consumer;
            // @ts-ignore
            delete this.consumer;
            try {
                await _consumer.disconnect();
                this.logger.info('Kafka Consumer stopped.');
            } catch (e: any) {
                this.logger.info('Kafka Consumer stop error.');
            }
        }

        // Runs even when no consumer exists; it is a no-op without a producer.
        await this.disconnectProducer();

        this.logger.info('Kafka resources stopped.');
        process.exit(status);
    }

    /** Flushes pending responses once, stops the linger loop and disconnects the producer. */
    private async disconnectProducer(): Promise<void> {
        if (this.producer) {
            this.logger.info('Stopping Kafka Producer...');
            try {
                this.logger.info('Stopping loop...');
                clearTimeout(this.sendLoopInstance);
                await this.sendMessagesAsBatch();
                // The flush re-arms the linger timer; cancel it so nothing fires after disconnect.
                clearTimeout(this.sendLoopInstance);
                const _producer = this.producer;
                // @ts-ignore
                delete this.producer;
                await _producer.disconnect();
                this.logger.info('Kafka Producer stopped.');
            } catch (e) {
                this.logger.info('Kafka Producer stop error.');
            }
        }
    }

}