js-executor: send messages as batch

This commit is contained in:
Sergey Matvienko 2021-05-27 14:55:10 +03:00
parent d729d9ee95
commit 35e2ff99c3
2 changed files with 63 additions and 25 deletions

View File

@ -172,17 +172,20 @@ JsInvokeMessageProcessor.prototype.sendResponse = function (requestId, responseT
var tStartSending = performance.now(); var tStartSending = performance.now();
var remoteResponse = createRemoteResponse(requestId, compileResponse, invokeResponse, releaseResponse); var remoteResponse = createRemoteResponse(requestId, compileResponse, invokeResponse, releaseResponse);
var rawResponse = Buffer.from(JSON.stringify(remoteResponse), 'utf8'); var rawResponse = Buffer.from(JSON.stringify(remoteResponse), 'utf8');
this.producer.send(responseTopic, scriptId, rawResponse, headers).then( logger.debug('[%s] Sending response to queue, scriptId: [%s]', requestId, scriptId);
() => { this.producer.send(responseTopic, scriptId, rawResponse, headers);
logger.debug('[%s] Response sent to queue, took [%s]ms, scriptId: [%s]', requestId, (performance.now() - tStartSending), scriptId); //TODO put error msg for other queues implementation except Kafka
}, // .then(
(err) => { // () => {
if (err) { // logger.info('[%s] Response sent to queue, took [%s]ms, scriptId: [%s]', requestId, (performance.now() - tStartSending), scriptId);
logger.error('[%s] Failed to send response to queue: %s', requestId, err.message); // },
logger.error(err.stack); // (err) => {
} // if (err) {
} // logger.error('[%s] Failed to send response to queue: %s', requestId, err.message);
); // logger.error(err.stack);
// }
// }
// );
} }
JsInvokeMessageProcessor.prototype.getOrCompileScript = function(scriptId, scriptBody) { JsInvokeMessageProcessor.prototype.getOrCompileScript = function(scriptId, scriptBody) {

View File

@ -33,21 +33,51 @@ let producer;
const configEntries = []; const configEntries = [];
let topicMessages = [];
let loopSend;
function KafkaProducer() { function KafkaProducer() {
this.send = async (responseTopic, scriptId, rawResponse, headers) => { this.send = (responseTopic, scriptId, rawResponse, headers) => {
return producer.send( logger.debug('Pending queue response, scriptId: [%s]', scriptId);
{ const message = {
topic: responseTopic, topic: responseTopic,
acks: acks, messages: [{
compression: compressionType,
messages: [
{
key: scriptId, key: scriptId,
value: rawResponse, value: rawResponse,
headers: headers.data headers: headers.data
}]
};
topicMessages.push(message);
return {};
} }
] }
});
function sendLoopFunction() {
loopSend = setInterval(sendProducerMsg, 200);
}
function sendProducerMsg() {
if (topicMessages.length > 0) {
logger.info('sendProducerMsg from queue response, lenght: [%s]', topicMessages.length );
const messagesToSend = topicMessages;
topicMessages = [];
producer.sendBatch({
topicMessages: messagesToSend,
acks: acks,
compression: compressionType
}).then(
() => {
logger.info('Response sent to kafka, length: [%s]', messagesToSend.length );
},
(err) => {
if (err) {
logger.error('Failed to send kafka, length: [%s], pending to reprocess msgs', messagesToSend.length );
topicMessages = messagesToSend.concat(topicMessages);
logger.error(err.stack);
}
}
);
} }
} }
@ -141,6 +171,7 @@ function KafkaProducer() {
const messageProcessor = new JsInvokeMessageProcessor(new KafkaProducer()); const messageProcessor = new JsInvokeMessageProcessor(new KafkaProducer());
await consumer.connect(); await consumer.connect();
await producer.connect(); await producer.connect();
sendLoopFunction();
await consumer.subscribe({topic: requestTopic}); await consumer.subscribe({topic: requestTopic});
logger.info('Started ThingsBoard JavaScript Executor Microservice.'); logger.info('Started ThingsBoard JavaScript Executor Microservice.');
@ -221,6 +252,10 @@ async function disconnectProducer() {
var _producer = producer; var _producer = producer;
producer = null; producer = null;
try { try {
logger.info('Stopping loop...');
//TODO: send handle msg
clearInterval(loopSend);
sendProducerMsg();
await _producer.disconnect(); await _producer.disconnect();
logger.info('Kafka Producer stopped.'); logger.info('Kafka Producer stopped.');
} catch (e) { } catch (e) {