diff --git a/application/src/main/java/org/thingsboard/server/service/script/AbstractJsInvokeService.java b/application/src/main/java/org/thingsboard/server/service/script/AbstractJsInvokeService.java index 0af1f4846b..cc46f0a74f 100644 --- a/application/src/main/java/org/thingsboard/server/service/script/AbstractJsInvokeService.java +++ b/application/src/main/java/org/thingsboard/server/service/script/AbstractJsInvokeService.java @@ -15,12 +15,14 @@ */ package org.thingsboard.server.service.script; +import com.google.common.hash.Hashing; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.MoreExecutors; import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; +import org.springframework.data.util.Pair; import org.thingsboard.common.util.ThingsBoardThreadFactory; import org.thingsboard.server.common.data.ApiUsageRecordKey; import org.thingsboard.server.common.data.id.CustomerId; @@ -41,13 +43,15 @@ import static java.lang.String.format; * Created by ashvayka on 26.09.18. */ @Slf4j +@SuppressWarnings("UnstableApiUsage") public abstract class AbstractJsInvokeService implements JsInvokeService { private final TbApiUsageStateService apiUsageStateService; private final TbApiUsageClient apiUsageClient; protected ScheduledExecutorService timeoutExecutorService; - protected Map scriptIdToNameMap = new ConcurrentHashMap<>(); - protected Map disabledFunctions = new ConcurrentHashMap<>(); + + protected final Map> scriptIdToNameAndHashMap = new ConcurrentHashMap<>(); + protected final Map disabledFunctions = new ConcurrentHashMap<>(); @Getter @Value("${js.max_total_args_size:100000}") @@ -83,27 +87,34 @@ public abstract class AbstractJsInvokeService implements JsInvokeService { return error(format("Script body exceeds maximum allowed size of %s symbols", getMaxScriptBodySize())); } UUID scriptId = UUID.randomUUID(); - String functionName = "invokeInternal_" + scriptId.toString().replace('-', '_'); + String scriptHash = hash(tenantId, scriptBody); + String functionName = constructFunctionName(scriptId, scriptHash); String jsScript = generateJsScript(scriptType, functionName, scriptBody, argNames); - return doEval(scriptId, functionName, jsScript); + return doEval(scriptId, scriptHash, functionName, jsScript); } else { return error("JS Execution is disabled due to API limits!"); } } + protected String constructFunctionName(UUID scriptId, String scriptHash) { + return "invokeInternal_" + scriptId.toString().replace('-', '_'); + } + @Override public ListenableFuture invokeFunction(TenantId tenantId, CustomerId customerId, UUID scriptId, Object... args) { if (apiUsageStateService.getApiUsageState(tenantId).isJsExecEnabled()) { - String functionName = scriptIdToNameMap.get(scriptId); - if (functionName == null) { + Pair nameAndHash = scriptIdToNameAndHashMap.get(scriptId); + if (nameAndHash == null) { return error("No compiled script found for scriptId: [" + scriptId + "]!"); } + String functionName = nameAndHash.getFirst(); + String scriptHash = nameAndHash.getSecond(); if (!isDisabled(scriptId)) { if (argsSizeExceeded(args)) { return scriptExecutionError(scriptId, format("Script input arguments exceed maximum allowed total args size of %s symbols", getMaxTotalArgsSize())); } apiUsageClient.report(tenantId, customerId, ApiUsageRecordKey.JS_EXEC_COUNT, 1); - return Futures.transformAsync(doInvokeFunction(scriptId, functionName, args), output -> { + return Futures.transformAsync(doInvokeFunction(scriptId, scriptHash, functionName, args), output -> { String result = output.toString(); if (resultSizeExceeded(result)) { return scriptExecutionError(scriptId, format("Script invocation result exceeds maximum allowed size of %s symbols", getMaxResultSize())); @@ -123,12 +134,12 @@ public abstract class AbstractJsInvokeService implements JsInvokeService { @Override public ListenableFuture release(UUID scriptId) { - String functionName = scriptIdToNameMap.get(scriptId); - if (functionName != null) { + Pair nameAndHash = scriptIdToNameAndHashMap.get(scriptId); + if (nameAndHash != null) { try { - scriptIdToNameMap.remove(scriptId); + scriptIdToNameAndHashMap.remove(scriptId); disabledFunctions.remove(scriptId); - doRelease(scriptId, functionName); + doRelease(scriptId, nameAndHash.getSecond(), nameAndHash.getFirst()); } catch (Exception e) { return Futures.immediateFailedFuture(e); } @@ -136,16 +147,24 @@ public abstract class AbstractJsInvokeService implements JsInvokeService { return Futures.immediateFuture(null); } - protected abstract ListenableFuture doEval(UUID scriptId, String functionName, String scriptBody); + protected abstract ListenableFuture doEval(UUID scriptId, String scriptHash, String functionName, String scriptBody); - protected abstract ListenableFuture doInvokeFunction(UUID scriptId, String functionName, Object[] args); + protected abstract ListenableFuture doInvokeFunction(UUID scriptId, String scriptHash, String functionName, Object[] args); - protected abstract void doRelease(UUID scriptId, String functionName) throws Exception; + protected abstract void doRelease(UUID scriptId, String scriptHash, String functionName) throws Exception; protected abstract int getMaxErrors(); protected abstract long getMaxBlacklistDuration(); + protected String hash(TenantId tenantId, String scriptBody) { + return Hashing.murmur3_128().newHasher() + .putLong(tenantId.getId().getMostSignificantBits()) + .putLong(tenantId.getId().getLeastSignificantBits()) + .putUnencodedChars(scriptBody) + .hash().toString(); + } + protected void onScriptExecutionError(UUID scriptId, Throwable t, String scriptBody) { DisableListInfo disableListInfo = disabledFunctions.computeIfAbsent(scriptId, key -> new DisableListInfo()); log.warn("Script has exception and will increment counter {} on disabledFunctions for id {}, exception {}, cause {}, scriptBody {}", diff --git a/application/src/main/java/org/thingsboard/server/service/script/AbstractNashornJsInvokeService.java b/application/src/main/java/org/thingsboard/server/service/script/AbstractNashornJsInvokeService.java index d731ef1536..4b19bf9020 100644 --- a/application/src/main/java/org/thingsboard/server/service/script/AbstractNashornJsInvokeService.java +++ b/application/src/main/java/org/thingsboard/server/service/script/AbstractNashornJsInvokeService.java @@ -24,6 +24,7 @@ import delight.nashornsandbox.NashornSandboxes; import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; +import org.springframework.data.util.Pair; import org.springframework.scheduling.annotation.Scheduled; import org.thingsboard.common.util.ThingsBoardExecutors; import org.thingsboard.server.queue.usagestats.TbApiUsageClient; @@ -38,7 +39,6 @@ import javax.script.ScriptException; import java.util.UUID; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.ReentrantLock; @@ -121,7 +121,7 @@ public abstract class AbstractNashornJsInvokeService extends AbstractJsInvokeSer protected abstract long getMaxCpuTime(); @Override - protected ListenableFuture doEval(UUID scriptId, String functionName, String jsScript) { + protected ListenableFuture doEval(UUID scriptId, String scriptHash, String functionName, String jsScript) { jsPushedMsgs.incrementAndGet(); ListenableFuture result = jsExecutor.executeAsync(() -> { try { @@ -135,7 +135,7 @@ public abstract class AbstractNashornJsInvokeService extends AbstractJsInvokeSer } finally { evalLock.unlock(); } - scriptIdToNameMap.put(scriptId, functionName); + scriptIdToNameAndHashMap.put(scriptId, Pair.of(functionName, scriptHash)); return scriptId; } catch (Exception e) { log.debug("Failed to compile JS script: {}", e.getMessage(), e); @@ -150,7 +150,7 @@ public abstract class AbstractNashornJsInvokeService extends AbstractJsInvokeSer } @Override - protected ListenableFuture doInvokeFunction(UUID scriptId, String functionName, Object[] args) { + protected ListenableFuture doInvokeFunction(UUID scriptId, String scriptHash, String functionName, Object[] args) { jsPushedMsgs.incrementAndGet(); ListenableFuture result = jsExecutor.executeAsync(() -> { try { @@ -174,7 +174,7 @@ public abstract class AbstractNashornJsInvokeService extends AbstractJsInvokeSer return result; } - protected void doRelease(UUID scriptId, String functionName) throws ScriptException { + protected void doRelease(UUID scriptId, String scriptHash, String functionName) throws ScriptException { if (useJsSandbox()) { sandbox.eval(functionName + " = undefined;"); } else { diff --git a/application/src/main/java/org/thingsboard/server/service/script/RemoteJsInvokeService.java b/application/src/main/java/org/thingsboard/server/service/script/RemoteJsInvokeService.java index 86b364afd4..328fc5ebb4 100644 --- a/application/src/main/java/org/thingsboard/server/service/script/RemoteJsInvokeService.java +++ b/application/src/main/java/org/thingsboard/server/service/script/RemoteJsInvokeService.java @@ -18,16 +18,19 @@ package org.thingsboard.server.service.script; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; +import org.springframework.data.util.Pair; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Service; import org.springframework.util.StopWatch; import org.thingsboard.common.util.ThingsBoardThreadFactory; import org.thingsboard.server.gen.js.JsInvokeProtos; +import org.thingsboard.server.gen.js.JsInvokeProtos.JsInvokeErrorCode; import org.thingsboard.server.queue.TbQueueRequestTemplate; import org.thingsboard.server.queue.common.TbProtoJsQueueMsg; import org.thingsboard.server.queue.common.TbProtoQueueMsg; @@ -45,6 +48,8 @@ import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; @Slf4j @ConditionalOnExpression("'${js.evaluator:null}'=='remote' && ('${service.type:null}'=='monolith' || '${service.type:null}'=='tb-core' || '${service.type:null}'=='tb-rule-engine')") @@ -98,9 +103,10 @@ public class RemoteJsInvokeService extends AbstractJsInvokeService { } @Autowired - private TbQueueRequestTemplate, TbProtoQueueMsg> requestTemplate; + protected TbQueueRequestTemplate, TbProtoQueueMsg> requestTemplate; - private Map scriptIdToBodysMap = new ConcurrentHashMap<>(); + protected final Map scriptHashToBodysMap = new ConcurrentHashMap<>(); + private final Lock scriptsLock = new ReentrantLock(); @PostConstruct public void init() { @@ -117,10 +123,9 @@ public class RemoteJsInvokeService extends AbstractJsInvokeService { } @Override - protected ListenableFuture doEval(UUID scriptId, String functionName, String scriptBody) { + protected ListenableFuture doEval(UUID scriptId, String scriptHash, String functionName, String scriptBody) { JsInvokeProtos.JsCompileRequest jsRequest = JsInvokeProtos.JsCompileRequest.newBuilder() - .setScriptIdMSB(scriptId.getMostSignificantBits()) - .setScriptIdLSB(scriptId.getLeastSignificantBits()) + .setScriptHash(scriptHash) .setFunctionName(functionName) .setScriptBody(scriptBody).build(); @@ -128,16 +133,100 @@ public class RemoteJsInvokeService extends AbstractJsInvokeService { .setCompileRequest(jsRequest) .build(); - log.trace("Post compile request for scriptId [{}]", scriptId); - ListenableFuture> future = requestTemplate.send(new TbProtoJsQueueMsg<>(UUID.randomUUID(), jsRequestWrapper)); - if (maxEvalRequestsTimeout > 0) { - future = Futures.withTimeout(future, maxEvalRequestsTimeout, TimeUnit.MILLISECONDS, timeoutExecutorService); + log.trace("Post compile request for scriptId [{}] (hash: {})", scriptId, scriptHash); + ListenableFuture> future = sendJsRequest(UUID.randomUUID(), jsRequestWrapper, maxEvalRequestsTimeout, queueEvalMsgs); + return Futures.transform(future, response -> { + JsInvokeProtos.JsCompileResponse compilationResult = response.getValue().getCompileResponse(); + if (compilationResult.getSuccess()) { + scriptsLock.lock(); + try { + scriptIdToNameAndHashMap.put(scriptId, Pair.of(functionName, scriptHash)); + scriptHashToBodysMap.put(scriptHash, scriptBody); + } finally { + scriptsLock.unlock(); + } + return scriptId; + } else { + log.debug("[{}] (hash: {}) Failed to compile script due to [{}]: {}", scriptId, compilationResult.getScriptHash(), + compilationResult.getErrorCode().name(), compilationResult.getErrorDetails()); + throw new RuntimeException(compilationResult.getErrorDetails()); + } + }, callbackExecutor); + } + + @Override + protected String constructFunctionName(UUID scriptId, String scriptHash) { + return "invokeInternal_" + scriptHash; + } + + @Override + protected ListenableFuture doInvokeFunction(UUID scriptId, String scriptHash, String functionName, Object[] args) { + log.trace("doInvokeFunction js-request for uuid {} with timeout {}ms", scriptHash, maxRequestsTimeout); + String scriptBody = scriptHashToBodysMap.get(scriptHash); + if (scriptBody == null) { + return Futures.immediateFailedFuture(new RuntimeException("No script body found for script hash [" + scriptHash + "] (script id: [" + scriptId + "])")); + } + JsInvokeProtos.RemoteJsRequest jsRequestWrapper = buildJsInvokeRequest(scriptHash, functionName, args, false, null); + + StopWatch stopWatch = new StopWatch(); + stopWatch.start(); + + UUID requestKey = UUID.randomUUID(); + ListenableFuture> future = sendJsRequest(requestKey, jsRequestWrapper, maxRequestsTimeout, queueInvokeMsgs); + return Futures.transformAsync(future, response -> { + stopWatch.stop(); + log.trace("doInvokeFunction js-response took {}ms for uuid {}", stopWatch.getTotalTimeMillis(), response.getKey()); + JsInvokeProtos.JsInvokeResponse invokeResult = response.getValue().getInvokeResponse(); + if (invokeResult.getSuccess()) { + return Futures.immediateFuture(invokeResult.getResult()); + } else { + return handleInvokeError(requestKey, scriptId, scriptHash, invokeResult.getErrorCode(), + invokeResult.getErrorDetails(), functionName, args, scriptBody); + } + }, callbackExecutor); + } + + private ListenableFuture handleInvokeError(UUID requestKey, UUID scriptId, String scriptHash, + JsInvokeErrorCode errorCode, String errorDetails, + String functionName, Object[] args, String scriptBody) { + log.debug("[{}] Failed to invoke function due to [{}]: {}", scriptId, errorCode.name(), errorDetails); + RuntimeException e = new RuntimeException(errorDetails); + if (JsInvokeErrorCode.TIMEOUT_ERROR.equals(errorCode)) { + onScriptExecutionError(scriptId, e, scriptBody); + queueTimeoutMsgs.incrementAndGet(); + } else if (JsInvokeErrorCode.COMPILATION_ERROR.equals(errorCode)) { + onScriptExecutionError(scriptId, e, scriptBody); + } else if (JsInvokeErrorCode.NOT_FOUND_ERROR.equals(errorCode)) { + log.debug("[{}] Remote JS executor couldn't find the script", scriptId); + if (scriptBody != null) { + JsInvokeProtos.RemoteJsRequest invokeRequestWithScriptBody = buildJsInvokeRequest(scriptHash, functionName, args, true, scriptBody); + log.debug("[{}] Sending invoke request again with script body", scriptId); + return Futures.transformAsync(sendJsRequest(requestKey, invokeRequestWithScriptBody, maxRequestsTimeout, queueInvokeMsgs), r -> { + JsInvokeProtos.JsInvokeResponse result = r.getValue().getInvokeResponse(); + if (result.getSuccess()) { + return Futures.immediateFuture(result.getResult()); + } else { + return handleInvokeError(requestKey, scriptId, scriptHash, result.getErrorCode(), + result.getErrorDetails(), functionName, args, null); + } + }, MoreExecutors.directExecutor()); + } + } + queueFailedMsgs.incrementAndGet(); + return Futures.immediateFailedFuture(e); + } + + private ListenableFuture> sendJsRequest(UUID requestKey, JsInvokeProtos.RemoteJsRequest jsRequestWrapper, + long maxRequestsTimeout, AtomicInteger msgsCounter) { + ListenableFuture> future = requestTemplate.send(new TbProtoJsQueueMsg<>(requestKey, jsRequestWrapper)); + if (maxRequestsTimeout > 0) { + future = Futures.withTimeout(future, maxRequestsTimeout, TimeUnit.MILLISECONDS, timeoutExecutorService); } queuePushedMsgs.incrementAndGet(); - Futures.addCallback(future, new FutureCallback>() { + Futures.addCallback(future, new FutureCallback<>() { @Override public void onSuccess(@Nullable TbProtoQueueMsg result) { - queueEvalMsgs.incrementAndGet(); + msgsCounter.incrementAndGet(); } @Override @@ -148,34 +237,15 @@ public class RemoteJsInvokeService extends AbstractJsInvokeService { queueFailedMsgs.incrementAndGet(); } }, callbackExecutor); - return Futures.transform(future, response -> { - JsInvokeProtos.JsCompileResponse compilationResult = response.getValue().getCompileResponse(); - UUID compiledScriptId = new UUID(compilationResult.getScriptIdMSB(), compilationResult.getScriptIdLSB()); - if (compilationResult.getSuccess()) { - scriptIdToNameMap.put(scriptId, functionName); - scriptIdToBodysMap.put(scriptId, scriptBody); - return compiledScriptId; - } else { - log.debug("[{}] Failed to compile script due to [{}]: {}", compiledScriptId, compilationResult.getErrorCode().name(), compilationResult.getErrorDetails()); - throw new RuntimeException(compilationResult.getErrorDetails()); - } - }, callbackExecutor); + return future; } - @Override - protected ListenableFuture doInvokeFunction(UUID scriptId, String functionName, Object[] args) { - log.trace("doInvokeFunction js-request for uuid {} with timeout {}ms", scriptId, maxRequestsTimeout); - final String scriptBody = scriptIdToBodysMap.get(scriptId); - if (scriptBody == null) { - return Futures.immediateFailedFuture(new RuntimeException("No script body found for scriptId: [" + scriptId + "]!")); - } + private JsInvokeProtos.RemoteJsRequest buildJsInvokeRequest(String scriptHash, String functionName, Object[] args, boolean includeScriptBody, String scriptBody) { JsInvokeProtos.JsInvokeRequest.Builder jsRequestBuilder = JsInvokeProtos.JsInvokeRequest.newBuilder() - .setScriptIdMSB(scriptId.getMostSignificantBits()) - .setScriptIdLSB(scriptId.getLeastSignificantBits()) + .setScriptHash(scriptHash) .setFunctionName(functionName) - .setTimeout((int) maxExecRequestsTimeout) - .setScriptBody(scriptBody); - + .setTimeout((int) maxExecRequestsTimeout); + if (includeScriptBody) jsRequestBuilder.setScriptBody(scriptBody); for (Object arg : args) { jsRequestBuilder.addArgs(arg.toString()); } @@ -183,55 +253,16 @@ public class RemoteJsInvokeService extends AbstractJsInvokeService { JsInvokeProtos.RemoteJsRequest jsRequestWrapper = JsInvokeProtos.RemoteJsRequest.newBuilder() .setInvokeRequest(jsRequestBuilder.build()) .build(); - - StopWatch stopWatch = new StopWatch(); - stopWatch.start(); - - ListenableFuture> future = requestTemplate.send(new TbProtoJsQueueMsg<>(UUID.randomUUID(), jsRequestWrapper)); - if (maxRequestsTimeout > 0) { - future = Futures.withTimeout(future, maxRequestsTimeout, TimeUnit.MILLISECONDS, timeoutExecutorService); - } - queuePushedMsgs.incrementAndGet(); - Futures.addCallback(future, new FutureCallback>() { - @Override - public void onSuccess(@Nullable TbProtoQueueMsg result) { - queueInvokeMsgs.incrementAndGet(); - } - - @Override - public void onFailure(Throwable t) { - if (t instanceof TimeoutException || (t.getCause() != null && t.getCause() instanceof TimeoutException)) { - queueTimeoutMsgs.incrementAndGet(); - } - queueFailedMsgs.incrementAndGet(); - } - }, callbackExecutor); - return Futures.transform(future, response -> { - stopWatch.stop(); - log.trace("doInvokeFunction js-response took {}ms for uuid {}", stopWatch.getTotalTimeMillis(), response.getKey()); - JsInvokeProtos.JsInvokeResponse invokeResult = response.getValue().getInvokeResponse(); - if (invokeResult.getSuccess()) { - return invokeResult.getResult(); - } else { - final RuntimeException e = new RuntimeException(invokeResult.getErrorDetails()); - if (JsInvokeProtos.JsInvokeErrorCode.TIMEOUT_ERROR.equals(invokeResult.getErrorCode())) { - onScriptExecutionError(scriptId, e, scriptBody); - queueTimeoutMsgs.incrementAndGet(); - } else if (JsInvokeProtos.JsInvokeErrorCode.COMPILATION_ERROR.equals(invokeResult.getErrorCode())) { - onScriptExecutionError(scriptId, e, scriptBody); - } - queueFailedMsgs.incrementAndGet(); - log.debug("[{}] Failed to invoke function due to [{}]: {}", scriptId, invokeResult.getErrorCode().name(), invokeResult.getErrorDetails()); - throw e; - } - }, callbackExecutor); + return jsRequestWrapper; } @Override - protected void doRelease(UUID scriptId, String functionName) throws Exception { + protected void doRelease(UUID scriptId, String scriptHash, String functionName) throws Exception { + if (scriptIdToNameAndHashMap.values().stream().map(Pair::getSecond).anyMatch(hash -> hash.equals(scriptHash))) { + return; + } JsInvokeProtos.JsReleaseRequest jsRequest = JsInvokeProtos.JsReleaseRequest.newBuilder() - .setScriptIdMSB(scriptId.getMostSignificantBits()) - .setScriptIdLSB(scriptId.getLeastSignificantBits()) + .setScriptHash(scriptHash) .setFunctionName(functionName).build(); JsInvokeProtos.RemoteJsRequest jsRequestWrapper = JsInvokeProtos.RemoteJsRequest.newBuilder() @@ -244,12 +275,18 @@ public class RemoteJsInvokeService extends AbstractJsInvokeService { } JsInvokeProtos.RemoteJsResponse response = future.get().getValue(); - JsInvokeProtos.JsReleaseResponse compilationResult = response.getReleaseResponse(); - UUID compiledScriptId = new UUID(compilationResult.getScriptIdMSB(), compilationResult.getScriptIdLSB()); - if (compilationResult.getSuccess()) { - scriptIdToBodysMap.remove(scriptId); + JsInvokeProtos.JsReleaseResponse releaseResponse = response.getReleaseResponse(); + if (releaseResponse.getSuccess()) { + scriptsLock.lock(); + try { + if (scriptIdToNameAndHashMap.values().stream().map(Pair::getSecond).noneMatch(h -> h.equals(scriptHash))) { + scriptHashToBodysMap.remove(scriptHash); + } + } finally { + scriptsLock.unlock(); + } } else { - log.debug("[{}] Failed to release script due", compiledScriptId); + log.debug("[{}] Failed to release script", scriptHash); } } diff --git a/application/src/test/java/org/thingsboard/server/service/script/JsInvokeServiceTest.java b/application/src/test/java/org/thingsboard/server/service/script/LocalJsInvokeServiceTest.java similarity index 98% rename from application/src/test/java/org/thingsboard/server/service/script/JsInvokeServiceTest.java rename to application/src/test/java/org/thingsboard/server/service/script/LocalJsInvokeServiceTest.java index 8f0edd71c6..039a69c01f 100644 --- a/application/src/test/java/org/thingsboard/server/service/script/JsInvokeServiceTest.java +++ b/application/src/test/java/org/thingsboard/server/service/script/LocalJsInvokeServiceTest.java @@ -35,7 +35,7 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy; "js.max_result_size=50", "js.local.max_errors=2" }) -class JsInvokeServiceTest extends AbstractControllerTest { +class LocalJsInvokeServiceTest extends AbstractControllerTest { @Autowired private NashornJsInvokeService jsInvokeService; diff --git a/application/src/test/java/org/thingsboard/server/service/script/RemoteJsInvokeServiceTest.java b/application/src/test/java/org/thingsboard/server/service/script/RemoteJsInvokeServiceTest.java new file mode 100644 index 0000000000..30b9b0b2c0 --- /dev/null +++ b/application/src/test/java/org/thingsboard/server/service/script/RemoteJsInvokeServiceTest.java @@ -0,0 +1,219 @@ +/** + * Copyright © 2016-2022 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.script; + +import com.google.common.util.concurrent.Futures; +import org.apache.commons.lang3.StringUtils; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.ArgumentCaptor; +import org.thingsboard.server.common.data.ApiUsageState; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.gen.js.JsInvokeProtos; +import org.thingsboard.server.gen.js.JsInvokeProtos.RemoteJsRequest; +import org.thingsboard.server.gen.js.JsInvokeProtos.RemoteJsResponse; +import org.thingsboard.server.queue.TbQueueRequestTemplate; +import org.thingsboard.server.queue.common.TbProtoJsQueueMsg; +import org.thingsboard.server.queue.common.TbProtoQueueMsg; +import org.thingsboard.server.queue.usagestats.TbApiUsageClient; +import org.thingsboard.server.service.apiusage.TbApiUsageStateService; + +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.UUID; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.argThat; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.reset; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoInteractions; +import static org.mockito.Mockito.when; + +class RemoteJsInvokeServiceTest { + + private RemoteJsInvokeService remoteJsInvokeService; + private TbQueueRequestTemplate, TbProtoQueueMsg> jsRequestTemplate; + + + @BeforeEach + public void beforeEach() { + TbApiUsageStateService apiUsageStateService = mock(TbApiUsageStateService.class); + ApiUsageState apiUsageState = mock(ApiUsageState.class); + when(apiUsageState.isJsExecEnabled()).thenReturn(true); + when(apiUsageStateService.getApiUsageState(any())).thenReturn(apiUsageState); + TbApiUsageClient apiUsageClient = mock(TbApiUsageClient.class); + + remoteJsInvokeService = new RemoteJsInvokeService(apiUsageStateService, apiUsageClient); + jsRequestTemplate = mock(TbQueueRequestTemplate.class); + remoteJsInvokeService.requestTemplate = jsRequestTemplate; + } + + @AfterEach + public void afterEach() { + reset(jsRequestTemplate); + } + + @Test + public void whenInvokingFunction_thenDoNotSendScriptBody() throws Exception { + mockJsEvalResponse(); + String scriptBody = "return { a: 'b'};"; + UUID scriptId = remoteJsInvokeService.eval(TenantId.SYS_TENANT_ID, JsScriptType.RULE_NODE_SCRIPT, scriptBody).get(); + reset(jsRequestTemplate); + + String expectedInvocationResult = "scriptInvocationResult"; + doReturn(Futures.immediateFuture(new TbProtoJsQueueMsg<>(UUID.randomUUID(), RemoteJsResponse.newBuilder() + .setInvokeResponse(JsInvokeProtos.JsInvokeResponse.newBuilder() + .setSuccess(true) + .setResult(expectedInvocationResult) + .build()) + .build()))) + .when(jsRequestTemplate).send(any()); + + ArgumentCaptor> jsRequestCaptor = ArgumentCaptor.forClass(TbProtoJsQueueMsg.class); + Object invocationResult = remoteJsInvokeService.invokeFunction(TenantId.SYS_TENANT_ID, null, scriptId, "{}").get(); + verify(jsRequestTemplate).send(jsRequestCaptor.capture()); + + JsInvokeProtos.JsInvokeRequest jsInvokeRequestMade = jsRequestCaptor.getValue().getValue().getInvokeRequest(); + assertThat(jsInvokeRequestMade.getScriptBody()).isNullOrEmpty(); + assertThat(jsInvokeRequestMade.getScriptHash()).isEqualTo(getScriptHash(scriptId)); + assertThat(invocationResult).isEqualTo(expectedInvocationResult); + } + + @Test + public void whenInvokingFunctionAndRemoteJsExecutorRemovedScript_thenHandleNotFoundErrorAndMakeInvokeRequestWithScriptBody() throws Exception { + mockJsEvalResponse(); + String scriptBody = "return { a: 'b'};"; + UUID scriptId = remoteJsInvokeService.eval(TenantId.SYS_TENANT_ID, JsScriptType.RULE_NODE_SCRIPT, scriptBody).get(); + reset(jsRequestTemplate); + + doReturn(Futures.immediateFuture(new TbProtoJsQueueMsg<>(UUID.randomUUID(), RemoteJsResponse.newBuilder() + .setInvokeResponse(JsInvokeProtos.JsInvokeResponse.newBuilder() + .setSuccess(false) + .setErrorCode(JsInvokeProtos.JsInvokeErrorCode.NOT_FOUND_ERROR) + .build()) + .build()))) + .when(jsRequestTemplate).send(argThat(jsQueueMsg -> { + return StringUtils.isEmpty(jsQueueMsg.getValue().getInvokeRequest().getScriptBody()); + })); + + String expectedInvocationResult = "invocationResult"; + doReturn(Futures.immediateFuture(new TbProtoJsQueueMsg<>(UUID.randomUUID(), RemoteJsResponse.newBuilder() + .setInvokeResponse(JsInvokeProtos.JsInvokeResponse.newBuilder() + .setSuccess(true) + .setResult(expectedInvocationResult) + .build()) + .build()))) + .when(jsRequestTemplate).send(argThat(jsQueueMsg -> { + return StringUtils.isNotEmpty(jsQueueMsg.getValue().getInvokeRequest().getScriptBody()); + })); + + ArgumentCaptor> jsRequestsCaptor = ArgumentCaptor.forClass(TbProtoJsQueueMsg.class); + Object invocationResult = remoteJsInvokeService.invokeFunction(TenantId.SYS_TENANT_ID, null, scriptId, "{}").get(); + verify(jsRequestTemplate, times(2)).send(jsRequestsCaptor.capture()); + + List> jsInvokeRequestsMade = jsRequestsCaptor.getAllValues(); + + JsInvokeProtos.JsInvokeRequest firstRequestMade = jsInvokeRequestsMade.get(0).getValue().getInvokeRequest(); + assertThat(firstRequestMade.getScriptBody()).isNullOrEmpty(); + + JsInvokeProtos.JsInvokeRequest secondRequestMade = jsInvokeRequestsMade.get(1).getValue().getInvokeRequest(); + assertThat(secondRequestMade.getScriptBody()).contains(scriptBody); + + assertThat(jsInvokeRequestsMade.stream().map(TbProtoQueueMsg::getKey).distinct().count()).as("partition keys are same") + .isOne(); + + assertThat(invocationResult).isEqualTo(expectedInvocationResult); + } + + @Test + public void whenDoingEval_thenSaveScriptByHashOfTenantIdAndScriptBody() throws Exception { + mockJsEvalResponse(); + + TenantId tenantId1 = TenantId.fromUUID(UUID.randomUUID()); + String scriptBody1 = "var msg = { temp: 42, humidity: 77 };\n" + + "var metadata = { data: 40 };\n" + + "var msgType = \"POST_TELEMETRY_REQUEST\";\n" + + "\n" + + "return { msg: msg, metadata: metadata, msgType: msgType };"; + + Set scriptHashes = new HashSet<>(); + String tenant1Script1Hash = null; + for (int i = 0; i < 3; i++) { + UUID scriptUuid = remoteJsInvokeService.eval(tenantId1, JsScriptType.RULE_NODE_SCRIPT, scriptBody1).get(); + tenant1Script1Hash = getScriptHash(scriptUuid); + scriptHashes.add(tenant1Script1Hash); + } + assertThat(scriptHashes).as("Unique scripts ids").size().isOne(); + + TenantId tenantId2 = TenantId.fromUUID(UUID.randomUUID()); + UUID scriptUuid = remoteJsInvokeService.eval(tenantId2, JsScriptType.RULE_NODE_SCRIPT, scriptBody1).get(); + String tenant2Script1Id = getScriptHash(scriptUuid); + assertThat(tenant2Script1Id).isNotEqualTo(tenant1Script1Hash); + + String scriptBody2 = scriptBody1 + ";;"; + scriptUuid = remoteJsInvokeService.eval(tenantId2, JsScriptType.RULE_NODE_SCRIPT, scriptBody2).get(); + String tenant2Script2Id = getScriptHash(scriptUuid); + assertThat(tenant2Script2Id).isNotEqualTo(tenant2Script1Id); + } + + @Test + public void whenReleasingScript_thenCheckForHashUsages() throws Exception { + mockJsEvalResponse(); + String scriptBody = "return { a: 'b'};"; + UUID scriptId1 = remoteJsInvokeService.eval(TenantId.SYS_TENANT_ID, JsScriptType.RULE_NODE_SCRIPT, scriptBody).get(); + UUID scriptId2 = remoteJsInvokeService.eval(TenantId.SYS_TENANT_ID, JsScriptType.RULE_NODE_SCRIPT, scriptBody).get(); + String scriptHash = getScriptHash(scriptId1); + assertThat(scriptHash).isEqualTo(getScriptHash(scriptId2)); + reset(jsRequestTemplate); + + doReturn(Futures.immediateFuture(new TbProtoQueueMsg<>(UUID.randomUUID(), RemoteJsResponse.newBuilder() + .setReleaseResponse(JsInvokeProtos.JsReleaseResponse.newBuilder() + .setSuccess(true) + .build()) + .build()))) + .when(jsRequestTemplate).send(any()); + + remoteJsInvokeService.release(scriptId1).get(); + verifyNoInteractions(jsRequestTemplate); + assertThat(remoteJsInvokeService.scriptHashToBodysMap).containsKey(scriptHash); + + remoteJsInvokeService.release(scriptId2).get(); + verify(jsRequestTemplate).send(any()); + assertThat(remoteJsInvokeService.scriptHashToBodysMap).isEmpty(); + } + + private String getScriptHash(UUID scriptUuid) { + return remoteJsInvokeService.scriptIdToNameAndHashMap.get(scriptUuid).getSecond(); + } + + private void mockJsEvalResponse() { + doAnswer(methodCall -> Futures.immediateFuture(new TbProtoJsQueueMsg<>(UUID.randomUUID(), RemoteJsResponse.newBuilder() + .setCompileResponse(JsInvokeProtos.JsCompileResponse.newBuilder() + .setSuccess(true) + .setScriptHash(methodCall.>getArgument(0).getValue().getCompileRequest().getScriptHash()) + .build()) + .build()))) + .when(jsRequestTemplate).send(argThat(jsQueueMsg -> jsQueueMsg.getValue().hasCompileRequest())); + } + +} diff --git a/common/cluster-api/src/main/proto/jsinvoke.proto b/common/cluster-api/src/main/proto/jsinvoke.proto index 8cae69f198..73f988af45 100644 --- a/common/cluster-api/src/main/proto/jsinvoke.proto +++ b/common/cluster-api/src/main/proto/jsinvoke.proto @@ -23,6 +23,7 @@ enum JsInvokeErrorCode { COMPILATION_ERROR = 0; RUNTIME_ERROR = 1; TIMEOUT_ERROR = 2; + NOT_FOUND_ERROR = 3; } message RemoteJsRequest { @@ -40,39 +41,34 @@ message RemoteJsResponse { } message JsCompileRequest { - int64 scriptIdMSB = 1; - int64 scriptIdLSB = 2; string functionName = 3; string scriptBody = 4; + string scriptHash = 5; } message JsReleaseRequest { - int64 scriptIdMSB = 1; - int64 scriptIdLSB = 2; string functionName = 3; + string scriptHash = 4; } message JsReleaseResponse { bool success = 1; - int64 scriptIdMSB = 2; - int64 scriptIdLSB = 3; + string scriptHash = 4; } message JsCompileResponse { bool success = 1; - int64 scriptIdMSB = 2; - int64 scriptIdLSB = 3; JsInvokeErrorCode errorCode = 4; string errorDetails = 5; + string scriptHash = 6; } message JsInvokeRequest { - int64 scriptIdMSB = 1; - int64 scriptIdLSB = 2; string functionName = 3; string scriptBody = 4; int32 timeout = 5; repeated string args = 6; + string scriptHash = 7; } message JsInvokeResponse { @@ -81,4 +77,3 @@ message JsInvokeResponse { JsInvokeErrorCode errorCode = 3; string errorDetails = 4; } - diff --git a/msa/js-executor/api/jsExecutor.models.ts b/msa/js-executor/api/jsExecutor.models.ts index db2ced52c4..7a6b53cd8a 100644 --- a/msa/js-executor/api/jsExecutor.models.ts +++ b/msa/js-executor/api/jsExecutor.models.ts @@ -16,8 +16,9 @@ export interface TbMessage { - scriptIdMSB: string; - scriptIdLSB: string; + scriptIdMSB: string; // deprecated + scriptIdLSB: string; // deprecated + scriptHash: string; } export interface RemoteJsRequest { diff --git a/msa/js-executor/api/jsInvokeMessageProcessor.ts b/msa/js-executor/api/jsInvokeMessageProcessor.ts index f1b60b6e07..668cd61f50 100644 --- a/msa/js-executor/api/jsInvokeMessageProcessor.ts +++ b/msa/js-executor/api/jsInvokeMessageProcessor.ts @@ -18,7 +18,7 @@ import config from 'config'; import { _logger } from '../config/logger'; import { JsExecutor, TbScript } from './jsExecutor'; import { performance } from 'perf_hooks'; -import { isString, parseJsErrorDetails, toUUIDString, UUIDFromBuffer, UUIDToBits } from './utils'; +import { isString, parseJsErrorDetails, toUUIDString, UUIDFromBuffer, UUIDToBits, isNotUUID } from './utils'; import { IQueue } from '../queue/queue.models'; import { JsCompileRequest, @@ -36,6 +36,7 @@ import Long from 'long'; const COMPILATION_ERROR = 0; const RUNTIME_ERROR = 1; const TIMEOUT_ERROR = 2; +const NOT_FOUND_ERROR = 3; const statFrequency = Number(config.get('script.stat_print_frequency')); const scriptBodyTraceFrequency = Number(config.get('script.script_body_trace_frequency')); @@ -129,7 +130,12 @@ export class JsInvokeMessageProcessor { processCompileRequest(requestId: string, responseTopic: string, headers: any, compileRequest: JsCompileRequest) { const scriptId = JsInvokeMessageProcessor.getScriptId(compileRequest); this.logger.debug('[%s] Processing compile request, scriptId: [%s]', requestId, scriptId); - + if (this.scriptMap.has(scriptId)) { + const compileResponse = JsInvokeMessageProcessor.createCompileResponse(scriptId, true); + this.logger.debug('[%s] Script was already compiled, scriptId: [%s]', requestId, scriptId); + this.sendResponse(requestId, responseTopic, headers, scriptId, compileResponse); + return; + } this.executor.compileScript(compileRequest.scriptBody).then( (script) => { this.cacheScript(scriptId, script); @@ -170,7 +176,7 @@ export class JsInvokeMessageProcessor { this.logger.debug('[%s] Sending success invoke response, scriptId: [%s]', requestId, scriptId); this.sendResponse(requestId, responseTopic, headers, scriptId, undefined, invokeResponse); } else { - let err = { + const err = { name: 'Error', message: 'script invocation result exceeds maximum allowed size of ' + maxResultSize + ' symbols' } @@ -193,8 +199,12 @@ export class JsInvokeMessageProcessor { ) }, (err: any) => { - const invokeResponse = JsInvokeMessageProcessor.createInvokeResponse("", false, COMPILATION_ERROR, err); - this.logger.debug('[%s] Sending failed invoke response, scriptId: [%s], errorCode: [%s]', requestId, scriptId, COMPILATION_ERROR); + let errorCode = COMPILATION_ERROR; + if (err?.name === 'script body not found') { + errorCode = NOT_FOUND_ERROR; + } + const invokeResponse = JsInvokeMessageProcessor.createInvokeResponse("", false, errorCode, err); + this.logger.debug('[%s] Sending failed invoke response, scriptId: [%s], errorCode: [%s]', requestId, scriptId, errorCode); this.sendResponse(requestId, responseTopic, headers, scriptId, undefined, invokeResponse); } ); @@ -222,7 +232,7 @@ export class JsInvokeMessageProcessor { const remoteResponse = JsInvokeMessageProcessor.createRemoteResponse(requestId, compileResponse, invokeResponse, releaseResponse); const rawResponse = Buffer.from(JSON.stringify(remoteResponse), 'utf8'); this.logger.debug('[%s] Sending response to queue, scriptId: [%s]', requestId, scriptId); - this.producer.send(responseTopic, scriptId, rawResponse, headers).then( + this.producer.send(responseTopic, requestId, rawResponse, headers).then( () => { this.logger.debug('[%s] Response sent to queue, took [%s]ms, scriptId: [%s]', requestId, (performance.now() - tStartSending), scriptId); }, @@ -242,7 +252,7 @@ export class JsInvokeMessageProcessor { if (script) { self.incrementUseScriptId(scriptId); resolve(script); - } else { + } else if (scriptBody) { const startTime = performance.now(); self.executor.compileScript(scriptBody).then( (compiledScript) => { @@ -255,6 +265,12 @@ export class JsInvokeMessageProcessor { reject(err); } ); + } else { + const err = { + name: 'script body not found', + message: '' + } + reject(err); } }); } @@ -285,14 +301,26 @@ export class JsInvokeMessageProcessor { } private static createCompileResponse(scriptId: string, success: boolean, errorCode?: number, err?: any): JsCompileResponse { - const scriptIdBits = UUIDToBits(scriptId); - return { - errorCode: errorCode, - success: success, - errorDetails: parseJsErrorDetails(err), - scriptIdMSB: scriptIdBits[0], - scriptIdLSB: scriptIdBits[1] - }; + if (isNotUUID(scriptId)) { + return { + errorCode: errorCode, + success: success, + errorDetails: parseJsErrorDetails(err), + scriptIdMSB: "0", + scriptIdLSB: "0", + scriptHash: scriptId + }; + } else { // this is for backward compatibility (to be able to work with tb-node of previous version) - todo: remove in the next release + let scriptIdBits = UUIDToBits(scriptId); + return { + errorCode: errorCode, + success: success, + errorDetails: parseJsErrorDetails(err), + scriptIdMSB: scriptIdBits[0], + scriptIdLSB: scriptIdBits[1], + scriptHash: "" + }; + } } private static createInvokeResponse(result: string, success: boolean, errorCode?: number, err?: any): JsInvokeResponse { @@ -305,16 +333,26 @@ export class JsInvokeMessageProcessor { } private static createReleaseResponse(scriptId: string, success: boolean): JsReleaseResponse { - const scriptIdBits = UUIDToBits(scriptId); - return { - success: success, - scriptIdMSB: scriptIdBits[0], - scriptIdLSB: scriptIdBits[1] - }; + if (isNotUUID(scriptId)) { + return { + success: success, + scriptIdMSB: "0", + scriptIdLSB: "0", + scriptHash: scriptId, + }; + } else { // todo: remove in the next release + let scriptIdBits = UUIDToBits(scriptId); + return { + success: success, + scriptIdMSB: scriptIdBits[0], + scriptIdLSB: scriptIdBits[1], + scriptHash: "" + } + } } private static getScriptId(request: TbMessage): string { - return toUUIDString(request.scriptIdMSB, request.scriptIdLSB); + return request.scriptHash ? request.scriptHash : toUUIDString(request.scriptIdMSB, request.scriptIdLSB); } private incrementUseScriptId(scriptId: string) { diff --git a/msa/js-executor/api/utils.ts b/msa/js-executor/api/utils.ts index 361025f806..58fec28b0b 100644 --- a/msa/js-executor/api/utils.ts +++ b/msa/js-executor/api/utils.ts @@ -58,3 +58,7 @@ export function parseJsErrorDetails(err: any): string | undefined { } return details; } + +export function isNotUUID(candidate: string) { + return candidate.length != 36 || !candidate.includes('-'); +} diff --git a/msa/js-executor/queue/awsSqsTemplate.ts b/msa/js-executor/queue/awsSqsTemplate.ts index 259d285cf2..31de1ae73f 100644 --- a/msa/js-executor/queue/awsSqsTemplate.ts +++ b/msa/js-executor/queue/awsSqsTemplate.ts @@ -123,10 +123,10 @@ export class AwsSqsTemplate implements IQueue { this.timer = setTimeout(() => {this.getAndProcessMessage(messageProcessor, params)}, this.pollInterval); } - async send(responseTopic: string, scriptId: string, rawResponse: Buffer, headers: any): Promise { + async send(responseTopic: string, msgKey: string, rawResponse: Buffer, headers: any): Promise { let msgBody = JSON.stringify( { - key: scriptId, + key: msgKey, data: [...rawResponse], headers: headers }); diff --git a/msa/js-executor/queue/kafkaTemplate.ts b/msa/js-executor/queue/kafkaTemplate.ts index 51fa6e291b..7c34d99889 100644 --- a/msa/js-executor/queue/kafkaTemplate.ts +++ b/msa/js-executor/queue/kafkaTemplate.ts @@ -149,12 +149,11 @@ export class KafkaTemplate implements IQueue { }); } - async send(responseTopic: string, scriptId: string, rawResponse: Buffer, headers: any): Promise { - this.logger.debug('Pending queue response, scriptId: [%s]', scriptId); + async send(responseTopic: string, msgKey: string, rawResponse: Buffer, headers: any): Promise { const message = { topic: responseTopic, messages: [{ - key: scriptId, + key: msgKey, value: rawResponse, headers: headers.data }] diff --git a/msa/js-executor/queue/pubSubTemplate.ts b/msa/js-executor/queue/pubSubTemplate.ts index eff35017ba..9e8ee52b8b 100644 --- a/msa/js-executor/queue/pubSubTemplate.ts +++ b/msa/js-executor/queue/pubSubTemplate.ts @@ -80,7 +80,7 @@ export class PubSubTemplate implements IQueue { subscription.on('message', messageHandler); } - async send(responseTopic: string, scriptId: string, rawResponse: Buffer, headers: any): Promise { + async send(responseTopic: string, msgKey: string, rawResponse: Buffer, headers: any): Promise { if (!(this.subscriptions.includes(responseTopic) && this.topics.includes(this.requestTopic))) { await this.createTopic(this.requestTopic); await this.createSubscription(this.requestTopic); @@ -88,7 +88,7 @@ export class PubSubTemplate implements IQueue { let data = JSON.stringify( { - key: scriptId, + key: msgKey, data: [...rawResponse], headers: headers }); diff --git a/msa/js-executor/queue/queue.models.ts b/msa/js-executor/queue/queue.models.ts index a86dc8fd1d..36932a5ee5 100644 --- a/msa/js-executor/queue/queue.models.ts +++ b/msa/js-executor/queue/queue.models.ts @@ -17,6 +17,6 @@ export interface IQueue { name: string; init(): Promise; - send(responseTopic: string, scriptId: string, rawResponse: Buffer, headers: any): Promise; + send(responseTopic: string, msgKey: string, rawResponse: Buffer, headers: any): Promise; destroy(): Promise; } diff --git a/msa/js-executor/queue/rabbitmqTemplate.ts b/msa/js-executor/queue/rabbitmqTemplate.ts index ccd3cef54b..f4fe51a0ae 100644 --- a/msa/js-executor/queue/rabbitmqTemplate.ts +++ b/msa/js-executor/queue/rabbitmqTemplate.ts @@ -65,7 +65,7 @@ export class RabbitMqTemplate implements IQueue { }) } - async send(responseTopic: string, scriptId: string, rawResponse: Buffer, headers: any): Promise { + async send(responseTopic: string, msgKey: string, rawResponse: Buffer, headers: any): Promise { if (!this.topics.includes(responseTopic)) { await this.createQueue(responseTopic); @@ -74,7 +74,7 @@ export class RabbitMqTemplate implements IQueue { let data = JSON.stringify( { - key: scriptId, + key: msgKey, data: [...rawResponse], headers: headers }); diff --git a/msa/js-executor/queue/serviceBusTemplate.ts b/msa/js-executor/queue/serviceBusTemplate.ts index 76d87e8068..d2abacd289 100644 --- a/msa/js-executor/queue/serviceBusTemplate.ts +++ b/msa/js-executor/queue/serviceBusTemplate.ts @@ -82,7 +82,7 @@ export class ServiceBusTemplate implements IQueue { this.receiver.subscribe({processMessage: messageHandler, processError: errorHandler}) } - async send(responseTopic: string, scriptId: string, rawResponse: Buffer, headers: any): Promise { + async send(responseTopic: string, msgKey: string, rawResponse: Buffer, headers: any): Promise { if (!this.queues.includes(this.requestTopic)) { await this.createQueueIfNotExist(this.requestTopic); this.queues.push(this.requestTopic); @@ -96,7 +96,7 @@ export class ServiceBusTemplate implements IQueue { } let data = { - key: scriptId, + key: msgKey, data: [...rawResponse], headers: headers };