diff --git a/msa/js-executor/queue/kafkaTemplate.js b/msa/js-executor/queue/kafkaTemplate.js index 7fce429d23..b34727c977 100644 --- a/msa/js-executor/queue/kafkaTemplate.js +++ b/msa/js-executor/queue/kafkaTemplate.js @@ -36,11 +36,10 @@ let producer; const configEntries = []; let batchMessages = []; -let batchResolvers = []; let sendLoopInstance; function KafkaProducer() { - this.send = (responseTopic, scriptId, rawResponse, headers) => { + this.send = async (responseTopic, scriptId, rawResponse, headers) => { logger.debug('Pending queue response, scriptId: [%s]', scriptId); const message = { topic: responseTopic, @@ -51,60 +50,50 @@ function KafkaProducer() { }] }; - return pushMessageToSendLater(message); + await pushMessageToSendLater(message); } } -function pushMessageToSendLater(message) { - let resolver; - const promise = new Promise((resolve, reject) => { - resolver = resolve; - }); +async function pushMessageToSendLater(message) { batchMessages.push(message); - batchResolvers.push(resolver); if (batchMessages.length >= maxBatchSize) { - sendMessagesAsBatch(); - sendLoopWithLinger(); //reset loop function and reschedule new linger + await sendMessagesAsBatch(true); } - return promise; } function sendLoopWithLinger() { if (sendLoopInstance) { - logger.debug("Clear sendLoop scheduler. Starting new send loop with linger [%s]", linger); - clearInterval(sendLoopInstance); + clearTimeout(sendLoopInstance); } else { logger.debug("Starting new send loop with linger [%s]", linger) } - sendLoopInstance = setInterval(sendMessagesAsBatch, linger); + sendLoopInstance = setTimeout(sendMessagesAsBatch, linger); } -function sendMessagesAsBatch() { - if (batchMessages.length > 0) { - logger.debug('sendMessagesAsBatch, length: [%s]', batchMessages.length); - const messagesToSend = batchMessages; - const resolvers = batchResolvers; - batchMessages = []; - batchResolvers = []; - producer.sendBatch({ - topicMessages: messagesToSend, - acks: acks, - compression: compressionType - }).then( - () => { - logger.debug('Response batch sent to kafka, length: [%s]', messagesToSend.length); - for (let i = 0; i < resolvers.length; i++) { - resolvers[i](); - } - }, - (err) => { - logger.error('Failed batch send to kafka, length: [%s], pending to reprocess msgs', messagesToSend.length); - logger.error(err.stack); - batchMessages = messagesToSend.concat(batchMessages); - batchResolvers = resolvers.concat(batchResolvers); //promises will never be rejected. Will retry forever - } - ); +async function sendMessagesAsBatch(isImmediately) { + if (sendLoopInstance) { + logger.debug("sendMessagesAsBatch: Clear sendLoop scheduler. Starting new send loop with linger [%s]", linger); + clearTimeout(sendLoopInstance); } + sendLoopInstance = null; + if (batchMessages.length > 0) { + logger.debug('sendMessagesAsBatch, length: [%s], %s', batchMessages.length, isImmediately ? 'immediately' : ''); + const messagesToSend = batchMessages; + batchMessages = []; + try { + await producer.sendBatch({ + topicMessages: messagesToSend, + acks: acks, + compression: compressionType + }) + logger.debug('Response batch sent to kafka, length: [%s]', messagesToSend.length); + } catch(err) { + logger.error('Failed batch send to kafka, length: [%s], pending to reprocess msgs', messagesToSend.length); + logger.error(err.stack); + batchMessages = messagesToSend.concat(batchMessages); + } + } + sendLoopWithLinger(); } (async () => { @@ -285,9 +274,8 @@ async function disconnectProducer() { producer = null; try { logger.info('Stopping loop...'); - //TODO: send handle msg - clearInterval(sendLoopInstance); - sendMessagesAsBatch(); + clearTimeout(sendLoopInstance); + await sendMessagesAsBatch(); await _producer.disconnect(); logger.info('Kafka Producer stopped.'); } catch (e) {