diff --git a/msa/js-executor/api/jsInvokeMessageProcessor.js b/msa/js-executor/api/jsInvokeMessageProcessor.js index a760e21e47..867dd82b39 100644 --- a/msa/js-executor/api/jsInvokeMessageProcessor.js +++ b/msa/js-executor/api/jsInvokeMessageProcessor.js @@ -172,17 +172,20 @@ JsInvokeMessageProcessor.prototype.sendResponse = function (requestId, responseT var tStartSending = performance.now(); var remoteResponse = createRemoteResponse(requestId, compileResponse, invokeResponse, releaseResponse); var rawResponse = Buffer.from(JSON.stringify(remoteResponse), 'utf8'); - this.producer.send(responseTopic, scriptId, rawResponse, headers).then( - () => { - logger.debug('[%s] Response sent to queue, took [%s]ms, scriptId: [%s]', requestId, (performance.now() - tStartSending), scriptId); - }, - (err) => { - if (err) { - logger.error('[%s] Failed to send response to queue: %s', requestId, err.message); - logger.error(err.stack); - } - } - ); + logger.debug('[%s] Sending response to queue, scriptId: [%s]', requestId, scriptId); + this.producer.send(responseTopic, scriptId, rawResponse, headers); +//TODO put error msg for other queues implementation except Kafka +// .then( +// () => { +// logger.info('[%s] Response sent to queue, took [%s]ms, scriptId: [%s]', requestId, (performance.now() - tStartSending), scriptId); +// }, +// (err) => { +// if (err) { +// logger.error('[%s] Failed to send response to queue: %s', requestId, err.message); +// logger.error(err.stack); +// } +// } +// ); } JsInvokeMessageProcessor.prototype.getOrCompileScript = function(scriptId, scriptBody) { diff --git a/msa/js-executor/queue/kafkaTemplate.js b/msa/js-executor/queue/kafkaTemplate.js index 1fd21185d0..da46a45301 100644 --- a/msa/js-executor/queue/kafkaTemplate.js +++ b/msa/js-executor/queue/kafkaTemplate.js @@ -33,24 +33,54 @@ let producer; const configEntries = []; +let topicMessages = []; +let loopSend; + function KafkaProducer() { - this.send = async (responseTopic, scriptId, rawResponse, headers) => { - return producer.send( - { - topic: responseTopic, - acks: acks, - compression: compressionType, - messages: [ - { - key: scriptId, - value: rawResponse, - headers: headers.data - } - ] - }); + this.send = (responseTopic, scriptId, rawResponse, headers) => { + logger.debug('Pending queue response, scriptId: [%s]', scriptId); + const message = { + topic: responseTopic, + messages: [{ + key: scriptId, + value: rawResponse, + headers: headers.data + }] + }; + + topicMessages.push(message); + return {}; } } +function sendLoopFunction() { + loopSend = setInterval(sendProducerMsg, 200); +} + +function sendProducerMsg() { + if (topicMessages.length > 0) { + logger.info('sendProducerMsg from queue response, lenght: [%s]', topicMessages.length ); + const messagesToSend = topicMessages; + topicMessages = []; + producer.sendBatch({ + topicMessages: messagesToSend, + acks: acks, + compression: compressionType + }).then( + () => { + logger.info('Response sent to kafka, length: [%s]', messagesToSend.length ); + }, + (err) => { + if (err) { + logger.error('Failed to send kafka, length: [%s], pending to reprocess msgs', messagesToSend.length ); + topicMessages = messagesToSend.concat(topicMessages); + logger.error(err.stack); + } + } + ); + } +} + (async () => { try { logger.info('Starting ThingsBoard JavaScript Executor Microservice...'); @@ -141,6 +171,7 @@ function KafkaProducer() { const messageProcessor = new JsInvokeMessageProcessor(new KafkaProducer()); await consumer.connect(); await producer.connect(); + sendLoopFunction(); await consumer.subscribe({topic: requestTopic}); logger.info('Started ThingsBoard JavaScript Executor Microservice.'); @@ -221,6 +252,10 @@ async function disconnectProducer() { var _producer = producer; producer = null; try { + logger.info('Stopping loop...'); + //TODO: send handle msg + clearInterval(loopSend); + sendProducerMsg(); await _producer.disconnect(); logger.info('Kafka Producer stopped.'); } catch (e) {