js-executor: Refactoring Kafka executor for use async/await

This commit is contained in:
Vladyslav_Prykhodko 2021-06-09 13:30:28 +03:00
parent ff7fa6237f
commit dd74824570

View File

@ -36,11 +36,10 @@ let producer;
const configEntries = [];
let batchMessages = [];
let batchResolvers = [];
let sendLoopInstance;
function KafkaProducer() {
this.send = (responseTopic, scriptId, rawResponse, headers) => {
this.send = async (responseTopic, scriptId, rawResponse, headers) => {
logger.debug('Pending queue response, scriptId: [%s]', scriptId);
const message = {
topic: responseTopic,
@ -51,60 +50,50 @@ function KafkaProducer() {
}]
};
return pushMessageToSendLater(message);
await pushMessageToSendLater(message);
}
}
function pushMessageToSendLater(message) {
let resolver;
const promise = new Promise((resolve, reject) => {
resolver = resolve;
});
async function pushMessageToSendLater(message) {
batchMessages.push(message);
batchResolvers.push(resolver);
if (batchMessages.length >= maxBatchSize) {
sendMessagesAsBatch();
sendLoopWithLinger(); //reset loop function and reschedule new linger
await sendMessagesAsBatch(true);
}
return promise;
}
function sendLoopWithLinger() {
if (sendLoopInstance) {
logger.debug("Clear sendLoop scheduler. Starting new send loop with linger [%s]", linger);
clearInterval(sendLoopInstance);
clearTimeout(sendLoopInstance);
} else {
logger.debug("Starting new send loop with linger [%s]", linger)
}
sendLoopInstance = setInterval(sendMessagesAsBatch, linger);
sendLoopInstance = setTimeout(sendMessagesAsBatch, linger);
}
function sendMessagesAsBatch() {
if (batchMessages.length > 0) {
logger.debug('sendMessagesAsBatch, length: [%s]', batchMessages.length);
const messagesToSend = batchMessages;
const resolvers = batchResolvers;
batchMessages = [];
batchResolvers = [];
producer.sendBatch({
topicMessages: messagesToSend,
acks: acks,
compression: compressionType
}).then(
() => {
logger.debug('Response batch sent to kafka, length: [%s]', messagesToSend.length);
for (let i = 0; i < resolvers.length; i++) {
resolvers[i]();
}
},
(err) => {
logger.error('Failed batch send to kafka, length: [%s], pending to reprocess msgs', messagesToSend.length);
logger.error(err.stack);
batchMessages = messagesToSend.concat(batchMessages);
batchResolvers = resolvers.concat(batchResolvers); //promises will never be rejected. Will retry forever
}
);
async function sendMessagesAsBatch(isImmediately) {
if (sendLoopInstance) {
logger.debug("sendMessagesAsBatch: Clear sendLoop scheduler. Starting new send loop with linger [%s]", linger);
clearTimeout(sendLoopInstance);
}
sendLoopInstance = null;
if (batchMessages.length > 0) {
logger.debug('sendMessagesAsBatch, length: [%s], %s', batchMessages.length, isImmediately ? 'immediately' : '');
const messagesToSend = batchMessages;
batchMessages = [];
try {
await producer.sendBatch({
topicMessages: messagesToSend,
acks: acks,
compression: compressionType
})
logger.debug('Response batch sent to kafka, length: [%s]', messagesToSend.length);
} catch(err) {
logger.error('Failed batch send to kafka, length: [%s], pending to reprocess msgs', messagesToSend.length);
logger.error(err.stack);
batchMessages = messagesToSend.concat(batchMessages);
}
}
sendLoopWithLinger();
}
(async () => {
@ -285,9 +274,8 @@ async function disconnectProducer() {
producer = null;
try {
logger.info('Stopping loop...');
//TODO: send handle msg
clearInterval(sendLoopInstance);
sendMessagesAsBatch();
clearTimeout(sendLoopInstance);
await sendMessagesAsBatch();
await _producer.disconnect();
logger.info('Kafka Producer stopped.');
} catch (e) {