thingsboard/msa/js-executor/api/jsInvokeMessageProcessor.js

263 lines
9.9 KiB
JavaScript
Raw Normal View History

2018-10-02 11:50:55 +03:00
/*
* Copyright © 2016-2019 The Thingsboard Authors
2018-10-02 11:50:55 +03:00
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
'use strict';
2019-12-03 14:45:59 +02:00
// Error codes placed in the `errorCode` field of compile/invoke responses.
const COMPILATION_ERROR = 0;
const RUNTIME_ERROR = 1;
const TIMEOUT_ERROR = 2;
// NOTE(review): not referenced by the code in this module as shown.
const UNRECOGNIZED = -1;

const config = require('config');
const logger = require('../config/logger')._logger('JsInvokeMessageProcessor');
const Utils = require('./utils');
const JsExecutor = require('./jsExecutor');

// Once this many invoke requests have been counted, the script body of the
// current request is written to the debug log and the counter restarts.
const scriptBodyTraceFrequency = Number(config.get('script.script_body_trace_frequency'));
// Enabled only when the configured value is exactly the string 'true'.
const useSandbox = config.get('script.use_sandbox') === 'true';
// Upper bound on the number of cached scripts; the oldest are evicted first.
const maxActiveScripts = Number(config.get('script.max_active_scripts'));
2018-10-02 11:50:55 +03:00
/**
 * Processes compile / invoke / release requests for JS scripts and publishes
 * the corresponding responses through the given producer.
 *
 * @param {Object} producer - object whose `send()` method is used to publish
 *        responses (see `sendResponse`, which expects it to return a promise).
 */
function JsInvokeMessageProcessor(producer) {
    this.producer = producer;
    this.executor = new JsExecutor(useSandbox);
    // scriptId -> compiled script.
    this.scriptMap = {};
    // Cached script ids in insertion order; the head is evicted first.
    this.scriptIds = [];
    // Invoke requests counted since the script body was last traced.
    this.executedScriptsCounter = 0;
}
/**
 * Entry point for a raw request message.
 *
 * Decodes the `requestId` and `responseTopic` headers, parses the JSON
 * payload and dispatches it to the compile, invoke or release handler,
 * depending on which of `compileRequest`, `invokeRequest` or
 * `releaseRequest` is present. Synchronous failures are logged and not
 * rethrown.
 *
 * @param {Object} message - message exposing `value` (whose
 *        `toString('utf8')` result is parsed as JSON) and `headers`
 *        (with `requestId` and `responseTopic` entries).
 */
JsInvokeMessageProcessor.prototype.onJsInvokeMessage = function(message) {
    let requestId;
    try {
        // Headers are decoded before the payload so that a malformed payload
        // is still logged together with its request id.
        requestId = Utils.UUIDFromBuffer(message.headers['requestId']);
        const responseTopic = message.headers['responseTopic'].toString('utf8');
        const request = JSON.parse(message.value.toString('utf8'));

        logger.debug('[%s] Received request, responseTopic: [%s]', requestId, responseTopic);

        if (request.compileRequest) {
            this.processCompileRequest(requestId, responseTopic, request.compileRequest);
        } else if (request.invokeRequest) {
            this.processInvokeRequest(requestId, responseTopic, request.invokeRequest);
        } else if (request.releaseRequest) {
            this.processReleaseRequest(requestId, responseTopic, request.releaseRequest);
        } else {
            logger.error('[%s] Unknown request received!', requestId);
        }
    } catch (err) {
        logger.error('[%s] Failed to process request: %s', requestId, err.message);
        logger.error(err.stack);
    }
};
/**
 * Compiles `compileRequest.scriptBody` and replies on `responseTopic`.
 *
 * On success the compiled script is cached under the request's script id and
 * a successful compile response is sent; on failure a compile response with
 * COMPILATION_ERROR and the error is sent instead.
 *
 * @param requestId - id echoed back in the response.
 * @param {string} responseTopic - topic the response is sent to.
 * @param {Object} compileRequest - request carrying `scriptBody` and the
 *        script id fields read by `getScriptId`.
 */
JsInvokeMessageProcessor.prototype.processCompileRequest = function(requestId, responseTopic, compileRequest) {
    const scriptId = getScriptId(compileRequest);
    logger.debug('[%s] Processing compile request, scriptId: [%s]', requestId, scriptId);

    this.executor.compileScript(compileRequest.scriptBody).then(
        (script) => {
            this.cacheScript(scriptId, script);
            const compileResponse = createCompileResponse(scriptId, true);
            logger.debug('[%s] Sending success compile response, scriptId: [%s]', requestId, scriptId);
            this.sendResponse(requestId, responseTopic, scriptId, compileResponse);
        },
        (err) => {
            const compileResponse = createCompileResponse(scriptId, false, COMPILATION_ERROR, err);
            logger.debug('[%s] Sending failed compile response, scriptId: [%s]', requestId, scriptId);
            this.sendResponse(requestId, responseTopic, scriptId, compileResponse);
        }
    );
};
/**
 * Executes the script referenced by `invokeRequest` and replies on
 * `responseTopic`.
 *
 * The script is taken from the cache or compiled from
 * `invokeRequest.scriptBody`, then executed with `invokeRequest.args` and
 * `invokeRequest.timeout`. The invoke response carries either the result or
 * one of COMPILATION_ERROR, TIMEOUT_ERROR, RUNTIME_ERROR.
 *
 * @param requestId - id echoed back in the response.
 * @param {string} responseTopic - topic the response is sent to.
 * @param {Object} invokeRequest - request carrying `scriptBody`, `args`,
 *        `timeout` and the script id fields read by `getScriptId`.
 */
JsInvokeMessageProcessor.prototype.processInvokeRequest = function(requestId, responseTopic, invokeRequest) {
    const scriptId = getScriptId(invokeRequest);
    logger.debug('[%s] Processing invoke request, scriptId: [%s]', requestId, scriptId);

    // Periodically trace the script body, but only when debug logging is on.
    this.executedScriptsCounter++;
    if (this.executedScriptsCounter >= scriptBodyTraceFrequency) {
        this.executedScriptsCounter = 0;
        if (logger.levels[logger.level] >= logger.levels['debug']) {
            logger.debug('[%s] Executing script body: [%s]', scriptId, invokeRequest.scriptBody);
        }
    }

    this.getOrCompileScript(scriptId, invokeRequest.scriptBody).then(
        (script) => {
            this.executor.executeScript(script, invokeRequest.args, invokeRequest.timeout).then(
                (result) => {
                    const invokeResponse = createInvokeResponse(result, true);
                    logger.debug('[%s] Sending success invoke response, scriptId: [%s]', requestId, scriptId);
                    this.sendResponse(requestId, responseTopic, scriptId, null, invokeResponse);
                },
                (err) => {
                    // A script may throw a non-Error value (e.g. `throw 'x'`), in
                    // which case there is no `message` to inspect; fall back to the
                    // value's string form so a response is still sent.
                    const errorText = (err && typeof err.message === 'string') ? err.message : String(err);
                    const errorCode = errorText.includes('Script execution timed out')
                        ? TIMEOUT_ERROR
                        : RUNTIME_ERROR;
                    const invokeResponse = createInvokeResponse("", false, errorCode, err);
                    logger.debug('[%s] Sending failed invoke response, scriptId: [%s], errorCode: [%s]', requestId, scriptId, errorCode);
                    this.sendResponse(requestId, responseTopic, scriptId, null, invokeResponse);
                }
            );
        },
        (err) => {
            const invokeResponse = createInvokeResponse("", false, COMPILATION_ERROR, err);
            logger.debug('[%s] Sending failed invoke response, scriptId: [%s], errorCode: [%s]', requestId, scriptId, COMPILATION_ERROR);
            this.sendResponse(requestId, responseTopic, scriptId, null, invokeResponse);
        }
    );
};
/**
 * Drops the script referenced by `releaseRequest` from the cache (if it is
 * cached) and always replies with a successful release response.
 *
 * @param requestId - id echoed back in the response.
 * @param {string} responseTopic - topic the response is sent to.
 * @param {Object} releaseRequest - request carrying the script id fields
 *        read by `getScriptId`.
 */
JsInvokeMessageProcessor.prototype.processReleaseRequest = function(requestId, responseTopic, releaseRequest) {
    const scriptId = getScriptId(releaseRequest);
    logger.debug('[%s] Processing release request, scriptId: [%s]', requestId, scriptId);

    if (this.scriptMap[scriptId]) {
        // Keep the eviction order list in sync with the map.
        const index = this.scriptIds.indexOf(scriptId);
        if (index > -1) {
            this.scriptIds.splice(index, 1);
        }
        delete this.scriptMap[scriptId];
    }

    const releaseResponse = createReleaseResponse(scriptId, true);
    logger.debug('[%s] Sending success release response, scriptId: [%s]', requestId, scriptId);
    this.sendResponse(requestId, responseTopic, scriptId, null, null, releaseResponse);
};
/**
 * Wraps the given compile / invoke / release response into a remote response,
 * serializes it as UTF-8 JSON and sends it to `responseTopic`, keyed by
 * `scriptId`. Send failures are logged, not rethrown.
 *
 * @param requestId - id of the request being answered.
 * @param {string} responseTopic - topic to send to.
 * @param {string} scriptId - used as the message key.
 * @param {Object} [compileResponse]
 * @param {Object} [invokeResponse]
 * @param {Object} [releaseResponse]
 */
JsInvokeMessageProcessor.prototype.sendResponse = function (requestId, responseTopic, scriptId, compileResponse, invokeResponse, releaseResponse) {
    const remoteResponse = createRemoteResponse(requestId, compileResponse, invokeResponse, releaseResponse);
    const rawResponse = Buffer.from(JSON.stringify(remoteResponse), 'utf8');
    const record = {
        topic: responseTopic,
        messages: [
            {
                key: scriptId,
                value: rawResponse
            }
        ]
    };

    this.producer.send(record).then(
        () => {},
        (err) => {
            if (err) {
                logger.error('[%s] Failed to send response to kafka: %s', requestId, err.message);
                logger.error(err.stack);
            }
        }
    );
};
/**
 * Returns the cached script for `scriptId`, compiling and caching
 * `scriptBody` first when nothing is cached under that id.
 *
 * @param {string} scriptId - cache key.
 * @param {string} scriptBody - source compiled on a cache miss.
 * @returns {Promise} resolves with the script; rejects when compilation or
 *          caching fails.
 */
JsInvokeMessageProcessor.prototype.getOrCompileScript = function(scriptId, scriptBody) {
    const cachedScript = this.scriptMap[scriptId];
    if (cachedScript) {
        return Promise.resolve(cachedScript);
    }
    return new Promise((resolve) => {
        // Resolving with the compile promise adopts its outcome, and a
        // synchronous throw from compileScript() becomes a rejection too.
        resolve(this.executor.compileScript(scriptBody));
    }).then((script) => {
        // Chained (rather than resolved by hand) so that a failure while
        // caching rejects the returned promise instead of leaving it pending.
        this.cacheScript(scriptId, script);
        return script;
    });
};
/**
 * Stores `script` under `scriptId`. A previously unseen id is appended to the
 * insertion-order list, after which the oldest entries are evicted until the
 * list no longer exceeds `maxActiveScripts`. An id that is already cached
 * only has its script replaced.
 *
 * @param {string} scriptId - cache key.
 * @param script - compiled script to store.
 */
JsInvokeMessageProcessor.prototype.cacheScript = function(scriptId, script) {
    const alreadyCached = Boolean(this.scriptMap[scriptId]);
    if (!alreadyCached) {
        this.scriptIds.push(scriptId);
        for (let activeCount = this.scriptIds.length;
            activeCount > maxActiveScripts;
            activeCount = this.scriptIds.length) {
            logger.info('Active scripts count [%s] exceeds maximum limit [%s]', activeCount, maxActiveScripts);
            const evictedId = this.scriptIds.shift();
            logger.info('Removing active script with id [%s]', evictedId);
            delete this.scriptMap[evictedId];
        }
    }
    this.scriptMap[scriptId] = script;
};
2018-10-02 11:50:55 +03:00
/**
 * Builds the envelope that is serialized and sent back for a request.
 *
 * @param requestId - request id; split into two parts via `Utils.UUIDToBits`
 *        (index 0 is stored as MSB, index 1 as LSB).
 * @param {Object} [compileResponse]
 * @param {Object} [invokeResponse]
 * @param {Object} [releaseResponse]
 * @returns {Object} envelope with `requestIdMSB`, `requestIdLSB` and the
 *          three response slots as passed in.
 */
function createRemoteResponse(requestId, compileResponse, invokeResponse, releaseResponse) {
    const requestIdBits = Utils.UUIDToBits(requestId);
    return {
        requestIdMSB: requestIdBits[0],
        requestIdLSB: requestIdBits[1],
        compileResponse: compileResponse,
        invokeResponse: invokeResponse,
        releaseResponse: releaseResponse
    };
}
/**
 * Builds the payload of a compile response.
 *
 * @param {string} scriptId - script id; split via `Utils.UUIDToBits`
 *        (index 0 is stored as MSB, index 1 as LSB).
 * @param {boolean} success - whether compilation succeeded.
 * @param {number} [errorCode] - error code, passed through unchanged.
 * @param [err] - compilation error, rendered by `parseJsErrorDetails`.
 * @returns {Object} compile response object.
 */
function createCompileResponse(scriptId, success, errorCode, err) {
    const scriptIdBits = Utils.UUIDToBits(scriptId);
    return {
        errorCode: errorCode,
        success: success,
        errorDetails: parseJsErrorDetails(err),
        scriptIdMSB: scriptIdBits[0],
        scriptIdLSB: scriptIdBits[1]
    };
}
/**
 * Builds the payload of an invoke response.
 *
 * @param result - script result, passed through unchanged.
 * @param {boolean} success - whether the invocation succeeded.
 * @param {number} [errorCode] - error code, passed through unchanged.
 * @param [err] - failure cause, rendered by `parseJsErrorDetails`.
 * @returns {Object} invoke response object.
 */
function createInvokeResponse(result, success, errorCode, err) {
    return {
        errorCode: errorCode,
        success: success,
        errorDetails: parseJsErrorDetails(err),
        result: result
    };
}
/**
 * Builds the payload of a release response.
 *
 * @param {string} scriptId - script id; split via `Utils.UUIDToBits`
 *        (index 0 is stored as MSB, index 1 as LSB).
 * @param {boolean} success - whether the release succeeded.
 * @returns {Object} release response object.
 */
function createReleaseResponse(scriptId, success) {
    const scriptIdBits = Utils.UUIDToBits(scriptId);
    return {
        success: success,
        scriptIdMSB: scriptIdBits[0],
        scriptIdLSB: scriptIdBits[1]
    };
}
/**
 * Renders an error as a short human-readable string for a response.
 *
 * For error objects the result is "name: message"; when the first line of the
 * stack has the form "location:N" with a numeric N, " in at line number N" is
 * appended. Non-object values (a script may `throw 'text'`) are returned in
 * their string form. Falsy input yields an empty string.
 *
 * @param [err] - error or thrown value.
 * @returns {string} error details, possibly empty.
 */
function parseJsErrorDetails(err) {
    if (!err) {
        return '';
    }
    // Thrown primitives carry no name/message/stack; report them verbatim
    // instead of producing "undefined: undefined".
    if (typeof err !== 'object' && typeof err !== 'function') {
        return String(err);
    }

    const name = err.name == null ? 'Error' : String(err.name);
    const message = err.message == null ? '' : String(err.message);
    let details = name + ': ' + message;

    if (typeof err.stack === 'string') {
        const firstLine = err.stack.split('\n')[0];
        // A stack that starts with the "name: message" header has no location
        // line; without this check a message such as "5" would be mistaken for
        // a line number.
        const isHeaderLine = details.startsWith(firstLine);
        const parts = firstLine.split(':');
        if (!isHeaderLine && parts.length === 2) {
            const lineNumber = parts[1].trim();
            // Digits only: the coercing isNaN() also accepted an empty string.
            if (/^\d+$/.test(lineNumber)) {
                details += ' in at line number ' + lineNumber;
            }
        }
    }
    return details;
}
/**
 * Derives the script id string of a request from its `scriptIdMSB` and
 * `scriptIdLSB` fields by delegating to `Utils.toUUIDString`.
 *
 * @param {Object} request - compile, invoke or release request.
 * @returns the value returned by `Utils.toUUIDString`.
 */
function getScriptId(request) {
    const { scriptIdMSB, scriptIdLSB } = request;
    return Utils.toUUIDString(scriptIdMSB, scriptIdLSB);
}
2019-12-03 14:45:59 +02:00
// The processor constructor is the value exported by this module.
module.exports = JsInvokeMessageProcessor;