diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/common/AbstractTbQueueTemplate.java b/common/queue/src/main/java/org/thingsboard/server/queue/common/AbstractTbQueueTemplate.java index 1b193b7416..5204de42e7 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/common/AbstractTbQueueTemplate.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/common/AbstractTbQueueTemplate.java @@ -22,7 +22,7 @@ import java.util.UUID; public class AbstractTbQueueTemplate { protected static final String REQUEST_ID_HEADER = "requestId"; protected static final String RESPONSE_TOPIC_HEADER = "responseTopic"; - protected static final String REQUEST_TIME = "requestTime"; + protected static final String EXPIRE_TS_HEADER = "expireTs"; protected byte[] uuidToBytes(UUID uuid) { ByteBuffer buf = ByteBuffer.allocate(16); diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/common/DefaultTbQueueRequestTemplate.java b/common/queue/src/main/java/org/thingsboard/server/queue/common/DefaultTbQueueRequestTemplate.java index 9fbe9b1104..1ba1422162 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/common/DefaultTbQueueRequestTemplate.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/common/DefaultTbQueueRequestTemplate.java @@ -56,6 +56,7 @@ public class DefaultTbQueueRequestTemplate expectedResponse = pendingRequests.remove(requestId); if (expectedResponse == null) { log.debug("[{}] Invalid or stale request, response: {}", requestId, String.valueOf(response).replace("\n", " ")); @@ -216,7 +218,7 @@ public class DefaultTbQueueRequestTemplate future = SettableFuture.create(); ResponseMetaData responseMetaData = new ResponseMetaData<>(currentClockNs + requestTimeoutNs, future, currentClockNs, requestTimeoutNs); diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/common/DefaultTbQueueResponseTemplate.java b/common/queue/src/main/java/org/thingsboard/server/queue/common/DefaultTbQueueResponseTemplate.java index 4859b2953b..ed3c3b8cc5 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/common/DefaultTbQueueResponseTemplate.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/common/DefaultTbQueueResponseTemplate.java @@ -97,8 +97,8 @@ public class DefaultTbQueueResponseTemplate { long currentTime = System.currentTimeMillis(); - long requestTime = bytesToLong(request.getHeaders().get(REQUEST_TIME)); - if (requestTime + requestTimeout >= currentTime) { + long expireTs = bytesToLong(request.getHeaders().get(EXPIRE_TS_HEADER)); + if (expireTs >= currentTime) { byte[] requestIdHeader = request.getHeaders().get(REQUEST_ID_HEADER); if (requestIdHeader == null) { log.error("[{}] Missing requestId in header", request); diff --git a/msa/js-executor/api/jsInvokeMessageProcessor.js b/msa/js-executor/api/jsInvokeMessageProcessor.js index 86f441a5b3..f3c68cb819 100644 --- a/msa/js-executor/api/jsInvokeMessageProcessor.js +++ b/msa/js-executor/api/jsInvokeMessageProcessor.js @@ -21,6 +21,7 @@ const TIMEOUT_ERROR = 2; const UNRECOGNIZED = -1; const config = require('config'), + Long = require('long'), logger = require('../config/logger')._logger('JsInvokeMessageProcessor'), Utils = require('./utils'), JsExecutor = require('./jsExecutor'); @@ -49,6 +50,7 @@ JsInvokeMessageProcessor.prototype.onJsInvokeMessage = function (message) { var tStart = performance.now(); let requestId; let responseTopic; + let expireTs; let headers; let request; let buf; @@ -59,6 +61,20 @@ JsInvokeMessageProcessor.prototype.onJsInvokeMessage = function (message) { requestId = Utils.UUIDFromBuffer(buf); buf = Buffer.from(headers.data['responseTopic']); responseTopic = buf.toString('utf8'); + buf = Buffer.from(headers.data['expireTs']); + expireTs = Long.fromBytes(buf, false, false).toNumber(); + + const now = Date.now(); + // if (logger.isDebugEnabled()) { + // logger.debug('expireTs is %s, buf is %s. Now is %s, ms to expire left %s', expireTs, buf.toString('hex'), now, expireTs - now) + // } + + if (expireTs && expireTs <= now) { + if (logger.isDebugEnabled()) { + logger.debug('Message expired! expireTs is %s, buf is %s. Now is %s, ms to expire left %s', expireTs, buf.toString('hex'), now, expireTs - now) + } + return; + } logger.debug('[%s] Received request, responseTopic: [%s]', requestId, responseTopic); @@ -140,7 +156,7 @@ JsInvokeMessageProcessor.prototype.processInvokeRequest = function (requestId, r (result) => { var invokeResponse = createInvokeResponse(result, true); logger.debug('[%s] Sending success invoke response, scriptId: [%s]', requestId, scriptId); - this.sendResponse(requestId, responseTopic, headers, scriptId, null, invokeResponse); + this.sendResponse(requestId, responseTopic, headers, scriptId, undefined, invokeResponse); }, (err) => { var errorCode; @@ -151,14 +167,14 @@ JsInvokeMessageProcessor.prototype.processInvokeRequest = function (requestId, r } var invokeResponse = createInvokeResponse("", false, errorCode, err); logger.debug('[%s] Sending failed invoke response, scriptId: [%s], errorCode: [%s]', requestId, scriptId, errorCode); - this.sendResponse(requestId, responseTopic, headers, scriptId, null, invokeResponse); + this.sendResponse(requestId, responseTopic, headers, scriptId, undefined, invokeResponse); } ) }, (err) => { var invokeResponse = createInvokeResponse("", false, COMPILATION_ERROR, err); logger.debug('[%s] Sending failed invoke response, scriptId: [%s], errorCode: [%s]', requestId, scriptId, COMPILATION_ERROR); - this.sendResponse(requestId, responseTopic, headers, scriptId, null, invokeResponse); + this.sendResponse(requestId, responseTopic, headers, scriptId, undefined, invokeResponse); } ); } @@ -176,7 +192,7 @@ JsInvokeMessageProcessor.prototype.processReleaseRequest = function (requestId, } var releaseResponse = createReleaseResponse(scriptId, true); logger.debug('[%s] Sending success release response, scriptId: [%s]', requestId, scriptId); - this.sendResponse(requestId, responseTopic, headers, scriptId, null, null, releaseResponse); + this.sendResponse(requestId, responseTopic, headers, scriptId, undefined, undefined, releaseResponse); } JsInvokeMessageProcessor.prototype.sendResponse = function (requestId, responseTopic, headers, scriptId, compileResponse, invokeResponse, releaseResponse) { @@ -276,7 +292,7 @@ function createReleaseResponse(scriptId, success) { function parseJsErrorDetails(err) { if (!err) { - return ''; + return undefined; } var details = err.name + ': ' + err.message; if (err.stack) {