diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/common/AbstractTbQueueTemplate.java b/common/queue/src/main/java/org/thingsboard/server/queue/common/AbstractTbQueueTemplate.java index 1b193b7416..5204de42e7 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/common/AbstractTbQueueTemplate.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/common/AbstractTbQueueTemplate.java @@ -22,7 +22,7 @@ import java.util.UUID; public class AbstractTbQueueTemplate { protected static final String REQUEST_ID_HEADER = "requestId"; protected static final String RESPONSE_TOPIC_HEADER = "responseTopic"; - protected static final String REQUEST_TIME = "requestTime"; + protected static final String EXPIRE_TS_HEADER = "expireTs"; protected byte[] uuidToBytes(UUID uuid) { ByteBuffer buf = ByteBuffer.allocate(16); diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/common/DefaultTbQueueRequestTemplate.java b/common/queue/src/main/java/org/thingsboard/server/queue/common/DefaultTbQueueRequestTemplate.java index 9fbe9b1104..1ba1422162 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/common/DefaultTbQueueRequestTemplate.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/common/DefaultTbQueueRequestTemplate.java @@ -56,6 +56,7 @@ public class DefaultTbQueueRequestTemplate expectedResponse = pendingRequests.remove(requestId); if (expectedResponse == null) { log.debug("[{}] Invalid or stale request, response: {}", requestId, String.valueOf(response).replace("\n", " ")); @@ -216,7 +218,7 @@ public class DefaultTbQueueRequestTemplate future = SettableFuture.create(); ResponseMetaData responseMetaData = new ResponseMetaData<>(currentClockNs + requestTimeoutNs, future, currentClockNs, requestTimeoutNs); diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/common/DefaultTbQueueResponseTemplate.java b/common/queue/src/main/java/org/thingsboard/server/queue/common/DefaultTbQueueResponseTemplate.java index 4859b2953b..ed3c3b8cc5 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/common/DefaultTbQueueResponseTemplate.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/common/DefaultTbQueueResponseTemplate.java @@ -97,8 +97,8 @@ public class DefaultTbQueueResponseTemplate { long currentTime = System.currentTimeMillis(); - long requestTime = bytesToLong(request.getHeaders().get(REQUEST_TIME)); - if (requestTime + requestTimeout >= currentTime) { + long expireTs = bytesToLong(request.getHeaders().get(EXPIRE_TS_HEADER)); + if (expireTs >= currentTime) { byte[] requestIdHeader = request.getHeaders().get(REQUEST_ID_HEADER); if (requestIdHeader == null) { log.error("[{}] Missing requestId in header", request); diff --git a/msa/js-executor/api/jsInvokeMessageProcessor.js b/msa/js-executor/api/jsInvokeMessageProcessor.js index 86f441a5b3..f3c68cb819 100644 --- a/msa/js-executor/api/jsInvokeMessageProcessor.js +++ b/msa/js-executor/api/jsInvokeMessageProcessor.js @@ -21,6 +21,7 @@ const TIMEOUT_ERROR = 2; const UNRECOGNIZED = -1; const config = require('config'), + Long = require('long'), logger = require('../config/logger')._logger('JsInvokeMessageProcessor'), Utils = require('./utils'), JsExecutor = require('./jsExecutor'); @@ -49,6 +50,7 @@ JsInvokeMessageProcessor.prototype.onJsInvokeMessage = function (message) { var tStart = performance.now(); let requestId; let responseTopic; + let expireTs; let headers; let request; let buf; @@ -59,6 +61,20 @@ JsInvokeMessageProcessor.prototype.onJsInvokeMessage = function (message) { requestId = Utils.UUIDFromBuffer(buf); buf = Buffer.from(headers.data['responseTopic']); responseTopic = buf.toString('utf8'); + buf = Buffer.from(headers.data['expireTs']); + expireTs = Long.fromBytes(buf, false, false).toNumber(); + + const now = Date.now(); + // if (logger.isDebugEnabled()) { + // logger.debug('expireTs is %s, buf is %s. Now is %s, ms to expire left %s', expireTs, buf.toString('hex'), now, expireTs - now) + // } + + if (expireTs && expireTs <= now) { + if (logger.isDebugEnabled()) { + logger.debug('Message expired! expireTs is %s, buf is %s. Now is %s, ms to expire left %s', expireTs, buf.toString('hex'), now, expireTs - now) + } + return; + } logger.debug('[%s] Received request, responseTopic: [%s]', requestId, responseTopic); @@ -140,7 +156,7 @@ JsInvokeMessageProcessor.prototype.processInvokeRequest = function (requestId, r (result) => { var invokeResponse = createInvokeResponse(result, true); logger.debug('[%s] Sending success invoke response, scriptId: [%s]', requestId, scriptId); - this.sendResponse(requestId, responseTopic, headers, scriptId, null, invokeResponse); + this.sendResponse(requestId, responseTopic, headers, scriptId, undefined, invokeResponse); }, (err) => { var errorCode; @@ -151,14 +167,14 @@ JsInvokeMessageProcessor.prototype.processInvokeRequest = function (requestId, r } var invokeResponse = createInvokeResponse("", false, errorCode, err); logger.debug('[%s] Sending failed invoke response, scriptId: [%s], errorCode: [%s]', requestId, scriptId, errorCode); - this.sendResponse(requestId, responseTopic, headers, scriptId, null, invokeResponse); + this.sendResponse(requestId, responseTopic, headers, scriptId, undefined, invokeResponse); } ) }, (err) => { var invokeResponse = createInvokeResponse("", false, COMPILATION_ERROR, err); logger.debug('[%s] Sending failed invoke response, scriptId: [%s], errorCode: [%s]', requestId, scriptId, COMPILATION_ERROR); - this.sendResponse(requestId, responseTopic, headers, scriptId, null, invokeResponse); + this.sendResponse(requestId, responseTopic, headers, scriptId, undefined, invokeResponse); } ); } @@ -176,7 +192,7 @@ JsInvokeMessageProcessor.prototype.processReleaseRequest = function (requestId, } var releaseResponse = createReleaseResponse(scriptId, true); logger.debug('[%s] Sending success release response, scriptId: [%s]', requestId, scriptId); - this.sendResponse(requestId, responseTopic, headers, scriptId, null, null, releaseResponse); + this.sendResponse(requestId, responseTopic, headers, scriptId, undefined, undefined, releaseResponse); } JsInvokeMessageProcessor.prototype.sendResponse = function (requestId, responseTopic, headers, scriptId, compileResponse, invokeResponse, releaseResponse) { @@ -276,7 +292,7 @@ function createReleaseResponse(scriptId, success) { function parseJsErrorDetails(err) { if (!err) { - return ''; + return undefined; } var details = err.name + ': ' + err.message; if (err.stack) { diff --git a/msa/js-executor/queue/kafkaTemplate.js b/msa/js-executor/queue/kafkaTemplate.js index 1d1ad75318..b9961be709 100644 --- a/msa/js-executor/queue/kafkaTemplate.js +++ b/msa/js-executor/queue/kafkaTemplate.js @@ -65,15 +65,15 @@ async function pushMessageToSendLater(message) { function sendLoopWithLinger() { if (sendLoopInstance) { clearTimeout(sendLoopInstance); - } else { - logger.debug("Starting new send loop with linger [%s]", linger) + // } else { + // logger.debug("Starting new send loop with linger [%s]", linger) } sendLoopInstance = setTimeout(sendMessagesAsBatch, linger); } async function sendMessagesAsBatch(isImmediately) { if (sendLoopInstance) { - logger.debug("sendMessagesAsBatch: Clear sendLoop scheduler. Starting new send loop with linger [%s]", linger); + // logger.debug("sendMessagesAsBatch: Clear sendLoop scheduler. Starting new send loop with linger [%s]", linger); clearTimeout(sendLoopInstance); } sendLoopInstance = null; diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/TbAbstractTransformNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/TbAbstractTransformNode.java index ea503fbb34..29c61b81c7 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/TbAbstractTransformNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/TbAbstractTransformNode.java @@ -16,6 +16,7 @@ package org.thingsboard.rule.engine.transform; import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; import lombok.extern.slf4j.Slf4j; import org.thingsboard.rule.engine.api.TbContext; import org.thingsboard.rule.engine.api.TbNode; @@ -49,7 +50,7 @@ public abstract class TbAbstractTransformNode implements TbNode { withCallback(transform(ctx, msg), m -> transformSuccess(ctx, msg, m), t -> transformFailure(ctx, msg, t), - ctx.getDbCallbackExecutor()); + MoreExecutors.directExecutor()); } protected void transformFailure(TbContext ctx, TbMsg msg, Throwable t) {