je-executors: expireTs implemented to drop any expired message and prevent congestion on js eval requests queue

This commit is contained in:
Sergey Matvienko 2022-07-06 19:07:10 +03:00
parent 04d962c18c
commit 4968d0101c
4 changed files with 28 additions and 10 deletions

View File

@ -22,7 +22,7 @@ import java.util.UUID;
public class AbstractTbQueueTemplate {
protected static final String REQUEST_ID_HEADER = "requestId";
protected static final String RESPONSE_TOPIC_HEADER = "responseTopic";
protected static final String REQUEST_TIME = "requestTime";
protected static final String EXPIRE_TS_HEADER = "expireTs";
protected byte[] uuidToBytes(UUID uuid) {
ByteBuffer buf = ByteBuffer.allocate(16);

View File

@ -56,6 +56,7 @@ public class DefaultTbQueueRequestTemplate<Request extends TbQueueMsg, Response
final boolean internalExecutor;
final ExecutorService executor;
final long maxRequestTimeoutNs;
final long maxRequestTimeout;
final long maxPendingRequests;
final long pollInterval;
volatile boolean stopped = false;
@ -76,6 +77,7 @@ public class DefaultTbQueueRequestTemplate<Request extends TbQueueMsg, Response
this.requestTemplate = requestTemplate;
this.responseTemplate = responseTemplate;
this.maxRequestTimeoutNs = TimeUnit.MILLISECONDS.toNanos(maxRequestTimeout);
this.maxRequestTimeout = maxRequestTimeout;
this.maxPendingRequests = maxPendingRequests;
this.pollInterval = pollInterval;
this.internalExecutor = (executor == null);
@ -170,7 +172,7 @@ public class DefaultTbQueueRequestTemplate<Request extends TbQueueMsg, Response
log.error("[{}] Missing requestId in header and body", response);
} else {
requestId = bytesToUuid(requestIdHeader);
log.trace("[{}] Response received: {}", requestId, String.valueOf(response).replace("\n", " ")); //TODO remove overhead
log.trace("[{}] Response received: {}", requestId, response);
ResponseMetaData<Response> expectedResponse = pendingRequests.remove(requestId);
if (expectedResponse == null) {
log.debug("[{}] Invalid or stale request, response: {}", requestId, String.valueOf(response).replace("\n", " "));
@ -216,7 +218,7 @@ public class DefaultTbQueueRequestTemplate<Request extends TbQueueMsg, Response
UUID requestId = UUID.randomUUID();
request.getHeaders().put(REQUEST_ID_HEADER, uuidToBytes(requestId));
request.getHeaders().put(RESPONSE_TOPIC_HEADER, stringToBytes(responseTemplate.getTopic()));
request.getHeaders().put(REQUEST_TIME, longToBytes(getCurrentTimeMs()));
request.getHeaders().put(EXPIRE_TS_HEADER, longToBytes(getCurrentTimeMs() + maxRequestTimeout));
long currentClockNs = getCurrentClockNs();
SettableFuture<Response> future = SettableFuture.create();
ResponseMetaData<Response> responseMetaData = new ResponseMetaData<>(currentClockNs + requestTimeoutNs, future, currentClockNs, requestTimeoutNs);

View File

@ -97,8 +97,8 @@ public class DefaultTbQueueResponseTemplate<Request extends TbQueueMsg, Response
requests.forEach(request -> {
long currentTime = System.currentTimeMillis();
long requestTime = bytesToLong(request.getHeaders().get(REQUEST_TIME));
if (requestTime + requestTimeout >= currentTime) {
long expireTs = bytesToLong(request.getHeaders().get(EXPIRE_TS_HEADER));
if (expireTs >= currentTime) {
byte[] requestIdHeader = request.getHeaders().get(REQUEST_ID_HEADER);
if (requestIdHeader == null) {
log.error("[{}] Missing requestId in header", request);

View File

@ -21,6 +21,7 @@ const TIMEOUT_ERROR = 2;
const UNRECOGNIZED = -1;
const config = require('config'),
Long = require('long'),
logger = require('../config/logger')._logger('JsInvokeMessageProcessor'),
Utils = require('./utils'),
JsExecutor = require('./jsExecutor');
@ -49,6 +50,7 @@ JsInvokeMessageProcessor.prototype.onJsInvokeMessage = function (message) {
var tStart = performance.now();
let requestId;
let responseTopic;
let expireTs;
let headers;
let request;
let buf;
@ -59,6 +61,20 @@ JsInvokeMessageProcessor.prototype.onJsInvokeMessage = function (message) {
requestId = Utils.UUIDFromBuffer(buf);
buf = Buffer.from(headers.data['responseTopic']);
responseTopic = buf.toString('utf8');
buf = Buffer.from(headers.data['expireTs']);
expireTs = Long.fromBytes(buf, false, false).toNumber();
const now = Date.now();
// if (logger.isDebugEnabled()) {
// logger.debug('expireTs is %s, buf is %s. Now is %s, ms to expire left %s', expireTs, buf.toString('hex'), now, expireTs - now)
// }
if (expireTs && expireTs <= now) {
if (logger.isDebugEnabled()) {
logger.debug('Message expired! expireTs is %s, buf is %s. Now is %s, ms to expire left %s', expireTs, buf.toString('hex'), now, expireTs - now)
}
return;
}
logger.debug('[%s] Received request, responseTopic: [%s]', requestId, responseTopic);
@ -140,7 +156,7 @@ JsInvokeMessageProcessor.prototype.processInvokeRequest = function (requestId, r
(result) => {
var invokeResponse = createInvokeResponse(result, true);
logger.debug('[%s] Sending success invoke response, scriptId: [%s]', requestId, scriptId);
this.sendResponse(requestId, responseTopic, headers, scriptId, null, invokeResponse);
this.sendResponse(requestId, responseTopic, headers, scriptId, undefined, invokeResponse);
},
(err) => {
var errorCode;
@ -151,14 +167,14 @@ JsInvokeMessageProcessor.prototype.processInvokeRequest = function (requestId, r
}
var invokeResponse = createInvokeResponse("", false, errorCode, err);
logger.debug('[%s] Sending failed invoke response, scriptId: [%s], errorCode: [%s]', requestId, scriptId, errorCode);
this.sendResponse(requestId, responseTopic, headers, scriptId, null, invokeResponse);
this.sendResponse(requestId, responseTopic, headers, scriptId, undefined, invokeResponse);
}
)
},
(err) => {
var invokeResponse = createInvokeResponse("", false, COMPILATION_ERROR, err);
logger.debug('[%s] Sending failed invoke response, scriptId: [%s], errorCode: [%s]', requestId, scriptId, COMPILATION_ERROR);
this.sendResponse(requestId, responseTopic, headers, scriptId, null, invokeResponse);
this.sendResponse(requestId, responseTopic, headers, scriptId, undefined, invokeResponse);
}
);
}
@ -176,7 +192,7 @@ JsInvokeMessageProcessor.prototype.processReleaseRequest = function (requestId,
}
var releaseResponse = createReleaseResponse(scriptId, true);
logger.debug('[%s] Sending success release response, scriptId: [%s]', requestId, scriptId);
this.sendResponse(requestId, responseTopic, headers, scriptId, null, null, releaseResponse);
this.sendResponse(requestId, responseTopic, headers, scriptId, undefined, undefined, releaseResponse);
}
JsInvokeMessageProcessor.prototype.sendResponse = function (requestId, responseTopic, headers, scriptId, compileResponse, invokeResponse, releaseResponse) {
@ -276,7 +292,7 @@ function createReleaseResponse(scriptId, success) {
function parseJsErrorDetails(err) {
if (!err) {
return '';
return undefined;
}
var details = err.name + ': ' + err.message;
if (err.stack) {