js-executor: code format
This commit is contained in:
parent
1d9fc4a322
commit
41391dbef8
@ -43,7 +43,7 @@ function JsInvokeMessageProcessor(producer) {
|
|||||||
this.lastStatTime = performance.now();
|
this.lastStatTime = performance.now();
|
||||||
}
|
}
|
||||||
|
|
||||||
JsInvokeMessageProcessor.prototype.onJsInvokeMessage = function(message) {
|
JsInvokeMessageProcessor.prototype.onJsInvokeMessage = function (message) {
|
||||||
var tStart = performance.now();
|
var tStart = performance.now();
|
||||||
let requestId;
|
let requestId;
|
||||||
let responseTopic;
|
let responseTopic;
|
||||||
@ -78,13 +78,13 @@ JsInvokeMessageProcessor.prototype.onJsInvokeMessage = function(message) {
|
|||||||
var tFinish = performance.now();
|
var tFinish = performance.now();
|
||||||
var tTook = tFinish - tStart;
|
var tTook = tFinish - tStart;
|
||||||
|
|
||||||
if ( tTook > slowQueryLogMs ) {
|
if (tTook > slowQueryLogMs) {
|
||||||
let functionName;
|
let functionName;
|
||||||
if (request.invokeRequest) {
|
if (request.invokeRequest) {
|
||||||
try {
|
try {
|
||||||
buf = Buffer.from(request.invokeRequest['functionName']);
|
buf = Buffer.from(request.invokeRequest['functionName']);
|
||||||
functionName = buf.toString('utf8');
|
functionName = buf.toString('utf8');
|
||||||
} catch (err){
|
} catch (err) {
|
||||||
logger.error('[%s] Failed to read functionName from message header: %s', requestId, err.message);
|
logger.error('[%s] Failed to read functionName from message header: %s', requestId, err.message);
|
||||||
logger.error(err.stack);
|
logger.error(err.stack);
|
||||||
}
|
}
|
||||||
@ -97,7 +97,7 @@ JsInvokeMessageProcessor.prototype.onJsInvokeMessage = function(message) {
|
|||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
JsInvokeMessageProcessor.prototype.processCompileRequest = function(requestId, responseTopic, headers, compileRequest) {
|
JsInvokeMessageProcessor.prototype.processCompileRequest = function (requestId, responseTopic, headers, compileRequest) {
|
||||||
var scriptId = getScriptId(compileRequest);
|
var scriptId = getScriptId(compileRequest);
|
||||||
logger.debug('[%s] Processing compile request, scriptId: [%s]', requestId, scriptId);
|
logger.debug('[%s] Processing compile request, scriptId: [%s]', requestId, scriptId);
|
||||||
|
|
||||||
@ -116,7 +116,7 @@ JsInvokeMessageProcessor.prototype.processCompileRequest = function(requestId, r
|
|||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
JsInvokeMessageProcessor.prototype.processInvokeRequest = function(requestId, responseTopic, headers, invokeRequest) {
|
JsInvokeMessageProcessor.prototype.processInvokeRequest = function (requestId, responseTopic, headers, invokeRequest) {
|
||||||
var scriptId = getScriptId(invokeRequest);
|
var scriptId = getScriptId(invokeRequest);
|
||||||
logger.debug('[%s] Processing invoke request, scriptId: [%s]', requestId, scriptId);
|
logger.debug('[%s] Processing invoke request, scriptId: [%s]', requestId, scriptId);
|
||||||
this.executedScriptsCounter++;
|
this.executedScriptsCounter++;
|
||||||
@ -160,7 +160,7 @@ JsInvokeMessageProcessor.prototype.processInvokeRequest = function(requestId, re
|
|||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
JsInvokeMessageProcessor.prototype.processReleaseRequest = function(requestId, responseTopic, headers, releaseRequest) {
|
JsInvokeMessageProcessor.prototype.processReleaseRequest = function (requestId, responseTopic, headers, releaseRequest) {
|
||||||
var scriptId = getScriptId(releaseRequest);
|
var scriptId = getScriptId(releaseRequest);
|
||||||
logger.debug('[%s] Processing release request, scriptId: [%s]', requestId, scriptId);
|
logger.debug('[%s] Processing release request, scriptId: [%s]', requestId, scriptId);
|
||||||
if (this.scriptMap.has(scriptId)) {
|
if (this.scriptMap.has(scriptId)) {
|
||||||
@ -193,9 +193,9 @@ JsInvokeMessageProcessor.prototype.sendResponse = function (requestId, responseT
|
|||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
JsInvokeMessageProcessor.prototype.getOrCompileScript = function(scriptId, scriptBody) {
|
JsInvokeMessageProcessor.prototype.getOrCompileScript = function (scriptId, scriptBody) {
|
||||||
var self = this;
|
var self = this;
|
||||||
return new Promise(function(resolve, reject) {
|
return new Promise(function (resolve, reject) {
|
||||||
if (self.scriptMap.has(scriptId)) {
|
if (self.scriptMap.has(scriptId)) {
|
||||||
resolve(self.scriptMap.get(scriptId));
|
resolve(self.scriptMap.get(scriptId));
|
||||||
} else {
|
} else {
|
||||||
@ -212,7 +212,7 @@ JsInvokeMessageProcessor.prototype.getOrCompileScript = function(scriptId, scrip
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
JsInvokeMessageProcessor.prototype.cacheScript = function(scriptId, script) {
|
JsInvokeMessageProcessor.prototype.cacheScript = function (scriptId, script) {
|
||||||
if (!this.scriptMap.has(scriptId)) {
|
if (!this.scriptMap.has(scriptId)) {
|
||||||
this.scriptIds.push(scriptId);
|
this.scriptIds.push(scriptId);
|
||||||
while (this.scriptIds.length > maxActiveScripts) {
|
while (this.scriptIds.length > maxActiveScripts) {
|
||||||
@ -229,40 +229,40 @@ JsInvokeMessageProcessor.prototype.cacheScript = function(scriptId, script) {
|
|||||||
function createRemoteResponse(requestId, compileResponse, invokeResponse, releaseResponse) {
|
function createRemoteResponse(requestId, compileResponse, invokeResponse, releaseResponse) {
|
||||||
const requestIdBits = Utils.UUIDToBits(requestId);
|
const requestIdBits = Utils.UUIDToBits(requestId);
|
||||||
return {
|
return {
|
||||||
requestIdMSB: requestIdBits[0],
|
requestIdMSB: requestIdBits[0],
|
||||||
requestIdLSB: requestIdBits[1],
|
requestIdLSB: requestIdBits[1],
|
||||||
compileResponse: compileResponse,
|
compileResponse: compileResponse,
|
||||||
invokeResponse: invokeResponse,
|
invokeResponse: invokeResponse,
|
||||||
releaseResponse: releaseResponse
|
releaseResponse: releaseResponse
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
function createCompileResponse(scriptId, success, errorCode, err) {
|
function createCompileResponse(scriptId, success, errorCode, err) {
|
||||||
const scriptIdBits = Utils.UUIDToBits(scriptId);
|
const scriptIdBits = Utils.UUIDToBits(scriptId);
|
||||||
return {
|
return {
|
||||||
errorCode: errorCode,
|
errorCode: errorCode,
|
||||||
success: success,
|
success: success,
|
||||||
errorDetails: parseJsErrorDetails(err),
|
errorDetails: parseJsErrorDetails(err),
|
||||||
scriptIdMSB: scriptIdBits[0],
|
scriptIdMSB: scriptIdBits[0],
|
||||||
scriptIdLSB: scriptIdBits[1]
|
scriptIdLSB: scriptIdBits[1]
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
function createInvokeResponse(result, success, errorCode, err) {
|
function createInvokeResponse(result, success, errorCode, err) {
|
||||||
return {
|
return {
|
||||||
errorCode: errorCode,
|
errorCode: errorCode,
|
||||||
success: success,
|
success: success,
|
||||||
errorDetails: parseJsErrorDetails(err),
|
errorDetails: parseJsErrorDetails(err),
|
||||||
result: result
|
result: result
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
function createReleaseResponse(scriptId, success) {
|
function createReleaseResponse(scriptId, success) {
|
||||||
const scriptIdBits = Utils.UUIDToBits(scriptId);
|
const scriptIdBits = Utils.UUIDToBits(scriptId);
|
||||||
return {
|
return {
|
||||||
success: success,
|
success: success,
|
||||||
scriptIdMSB: scriptIdBits[0],
|
scriptIdMSB: scriptIdBits[0],
|
||||||
scriptIdLSB: scriptIdBits[1]
|
scriptIdLSB: scriptIdBits[1]
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -45,9 +45,9 @@ function KafkaProducer() {
|
|||||||
const message = {
|
const message = {
|
||||||
topic: responseTopic,
|
topic: responseTopic,
|
||||||
messages: [{
|
messages: [{
|
||||||
key: scriptId,
|
key: scriptId,
|
||||||
value: rawResponse,
|
value: rawResponse,
|
||||||
headers: headers.data
|
headers: headers.data
|
||||||
}]
|
}]
|
||||||
};
|
};
|
||||||
|
|
||||||
@ -70,28 +70,28 @@ function pushMessageToSendLater(message) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
function sendLoopWithLinger() {
|
function sendLoopWithLinger() {
|
||||||
if (sendLoopInstance) {
|
if (sendLoopInstance) {
|
||||||
logger.debug("Clear sendLoop scheduler. Starting new send loop with linger [%s]", linger);
|
logger.debug("Clear sendLoop scheduler. Starting new send loop with linger [%s]", linger);
|
||||||
clearInterval(sendLoopInstance);
|
clearInterval(sendLoopInstance);
|
||||||
} else {
|
} else {
|
||||||
logger.debug("Starting new send loop with linger [%s]", linger)
|
logger.debug("Starting new send loop with linger [%s]", linger)
|
||||||
}
|
}
|
||||||
sendLoopInstance = setInterval(sendMessagesAsBatch, linger);
|
sendLoopInstance = setInterval(sendMessagesAsBatch, linger);
|
||||||
}
|
}
|
||||||
|
|
||||||
function sendMessagesAsBatch() {
|
function sendMessagesAsBatch() {
|
||||||
if (batchMessages.length > 0) {
|
if (batchMessages.length > 0) {
|
||||||
logger.debug('sendMessagesAsBatch, length: [%s]', batchMessages.length);
|
logger.debug('sendMessagesAsBatch, length: [%s]', batchMessages.length);
|
||||||
const messagesToSend = batchMessages;
|
const messagesToSend = batchMessages;
|
||||||
const resolvers = batchResolvers;
|
const resolvers = batchResolvers;
|
||||||
batchMessages = [];
|
batchMessages = [];
|
||||||
batchResolvers = [];
|
batchResolvers = [];
|
||||||
producer.sendBatch({
|
producer.sendBatch({
|
||||||
topicMessages: messagesToSend,
|
topicMessages: messagesToSend,
|
||||||
acks: acks,
|
acks: acks,
|
||||||
compression: compressionType
|
compression: compressionType
|
||||||
}).then(
|
}).then(
|
||||||
() => {
|
() => {
|
||||||
logger.debug('Response batch sent to kafka, length: [%s]', messagesToSend.length);
|
logger.debug('Response batch sent to kafka, length: [%s]', messagesToSend.length);
|
||||||
for (let i = 0; i < resolvers.length; i++) {
|
for (let i = 0; i < resolvers.length; i++) {
|
||||||
resolvers[i]();
|
resolvers[i]();
|
||||||
@ -103,8 +103,8 @@ function sendMessagesAsBatch() {
|
|||||||
batchMessages = messagesToSend.concat(batchMessages);
|
batchMessages = messagesToSend.concat(batchMessages);
|
||||||
batchResolvers = resolvers.concat(batchResolvers); //promises will never be rejected. Will retry forever
|
batchResolvers = resolvers.concat(batchResolvers); //promises will never be rejected. Will retry forever
|
||||||
}
|
}
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
(async () => {
|
(async () => {
|
||||||
@ -120,8 +120,8 @@ function sendMessagesAsBatch() {
|
|||||||
|
|
||||||
let kafkaConfig = {
|
let kafkaConfig = {
|
||||||
brokers: kafkaBootstrapServers.split(','),
|
brokers: kafkaBootstrapServers.split(','),
|
||||||
logLevel: logLevel.INFO,
|
logLevel: logLevel.INFO,
|
||||||
logCreator: KafkaJsWinstonLogCreator
|
logCreator: KafkaJsWinstonLogCreator
|
||||||
};
|
};
|
||||||
|
|
||||||
if (kafkaClientId) {
|
if (kafkaClientId) {
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user