From 0970ce65b448f622056c070831aa1625f1e81f36 Mon Sep 17 00:00:00 2001 From: Sergey Matvienko Date: Thu, 27 May 2021 18:45:29 +0300 Subject: [PATCH] js-executor: maxBatchSize --- msa/js-executor/queue/kafkaTemplate.js | 88 ++++++++++++++++---------- 1 file changed, 53 insertions(+), 35 deletions(-) diff --git a/msa/js-executor/queue/kafkaTemplate.js b/msa/js-executor/queue/kafkaTemplate.js index da46a45301..a061215b28 100644 --- a/msa/js-executor/queue/kafkaTemplate.js +++ b/msa/js-executor/queue/kafkaTemplate.js @@ -26,6 +26,9 @@ const acks = Number(config.get('kafka.acks')); const requestTimeout = Number(config.get('kafka.requestTimeout')); const compressionType = (config.get('kafka.requestTimeout') === "gzip") ? CompressionTypes.GZIP : CompressionTypes.None; +const linger = 5; //milliseconds //TODO move to the config +const maxBatchSize = 10; //max messages in batch //TODO move to the config + let kafkaClient; let kafkaAdmin; let consumer; @@ -33,8 +36,8 @@ let producer; const configEntries = []; -let topicMessages = []; -let loopSend; +let batchMessages = []; +let sendLoopInstance; function KafkaProducer() { this.send = (responseTopic, scriptId, rawResponse, headers) => { @@ -48,36 +51,51 @@ function KafkaProducer() { }] }; - topicMessages.push(message); + pushMessageToSendLater(message); return {}; } } -function sendLoopFunction() { - loopSend = setInterval(sendProducerMsg, 200); +function pushMessageToSendLater(message) { + batchMessages.push(message); + if (batchMessages.length >= maxBatchSize) { + sendMessagesAsBatch(); + sendLoopWithLinger(); //reset loop function and reschedule new linger + } } -function sendProducerMsg() { - if (topicMessages.length > 0) { - logger.info('sendProducerMsg from queue response, lenght: [%s]', topicMessages.length ); - const messagesToSend = topicMessages; - topicMessages = []; +function sendLoopWithLinger() { + if (sendLoopInstance) { + logger.debug("Clear sendLoop scheduler. Starting new send loop with linger [%s]", linger); + clearInterval(sendLoopInstance); + } else { + logger.debug("Starting new send loop with linger [%s]", linger) + } + sendLoopInstance = setInterval(sendMessagesAsBatch, linger); +} + +function sendMessagesAsBatch() { + if (batchMessages.length > 0) { + logger.info('sendMessagesAsBatch, lenght: [%s]', batchMessages.length ); + const messagesToSend = batchMessages; + batchMessages = []; producer.sendBatch({ topicMessages: messagesToSend, acks: acks, compression: compressionType }).then( - () => { - logger.info('Response sent to kafka, length: [%s]', messagesToSend.length ); - }, - (err) => { - if (err) { - logger.error('Failed to send kafka, length: [%s], pending to reprocess msgs', messagesToSend.length ); - topicMessages = messagesToSend.concat(topicMessages); - logger.error(err.stack); - } - } - ); + () => { + logger.info('Response sent to kafka, length: [%s]', messagesToSend.length ); + }, + (err) => { + logger.error('Failed to send kafka, length: [%s], pending to reprocess msgs', messagesToSend.length ); + batchMessages = messagesToSend.concat(batchMessages); + logger.error(err.stack); + } + ); + + } else { + //logger.debug("nothing to send"); } } @@ -156,22 +174,22 @@ function sendProducerMsg() { const { REQUEST_QUEUE_SIZE } = producer.events; const removeListenerRQS = producer.on(REQUEST_QUEUE_SIZE, e => logger.info(`producer REQUEST_QUEUE_SIZE ${e.payload.broker} size ${e.queueSize}`)); - const removeListeners = {} - const { FETCH_START } = consumer.events; - removeListeners[FETCH_START] = consumer.on(FETCH_START, e => logger.info(`consumer FETCH_START`)); - const { FETCH } = consumer.events; - removeListeners[FETCH] = consumer.on(FETCH, e => logger.info(`consumer FETCH numberOfBatches ${e.payload.numberOfBatches} duration ${e.payload.duration}`)); - const { START_BATCH_PROCESS } = consumer.events; - removeListeners[START_BATCH_PROCESS] = consumer.on(START_BATCH_PROCESS, e => logger.info(`consumer START_BATCH_PROCESS topic ${e.payload.topic} batchSize ${e.payload.batchSize}`)); - const { END_BATCH_PROCESS } = consumer.events; - removeListeners[END_BATCH_PROCESS] = consumer.on(END_BATCH_PROCESS, e => logger.info(`consumer END_BATCH_PROCESS topic ${e.payload.topic} batchSize ${e.payload.batchSize}`)); - const { COMMIT_OFFSETS } = consumer.events; - removeListeners[COMMIT_OFFSETS] = consumer.on(COMMIT_OFFSETS, e => logger.info(`consumer COMMIT_OFFSETS topics ${e.payload.topics}`)); +// const removeListeners = {} +// const { FETCH_START } = consumer.events; +// removeListeners[FETCH_START] = consumer.on(FETCH_START, e => logger.info(`consumer FETCH_START`)); +// const { FETCH } = consumer.events; +// removeListeners[FETCH] = consumer.on(FETCH, e => logger.info(`consumer FETCH numberOfBatches ${e.payload.numberOfBatches} duration ${e.payload.duration}`)); +// const { START_BATCH_PROCESS } = consumer.events; +// removeListeners[START_BATCH_PROCESS] = consumer.on(START_BATCH_PROCESS, e => logger.info(`consumer START_BATCH_PROCESS topic ${e.payload.topic} batchSize ${e.payload.batchSize}`)); +// const { END_BATCH_PROCESS } = consumer.events; +// removeListeners[END_BATCH_PROCESS] = consumer.on(END_BATCH_PROCESS, e => logger.info(`consumer END_BATCH_PROCESS topic ${e.payload.topic} batchSize ${e.payload.batchSize}`)); +// const { COMMIT_OFFSETS } = consumer.events; +// removeListeners[COMMIT_OFFSETS] = consumer.on(COMMIT_OFFSETS, e => logger.info(`consumer COMMIT_OFFSETS topics ${e.payload.topics}`)); const messageProcessor = new JsInvokeMessageProcessor(new KafkaProducer()); await consumer.connect(); await producer.connect(); - sendLoopFunction(); + sendLoopWithLinger(); await consumer.subscribe({topic: requestTopic}); logger.info('Started ThingsBoard JavaScript Executor Microservice.'); @@ -254,8 +272,8 @@ async function disconnectProducer() { try { logger.info('Stopping loop...'); //TODO: send handle msg - clearInterval(loopSend); - sendProducerMsg(); + clearInterval(sendLoopInstance); + sendMessagesAsBatch(); await _producer.disconnect(); logger.info('Kafka Producer stopped.'); } catch (e) {