diff --git a/application/src/test/java/org/thingsboard/server/service/script/ScriptInvokeServiceTest.java b/application/src/test/java/org/thingsboard/server/service/script/LocalJsInvokeServiceTest.java similarity index 98% rename from application/src/test/java/org/thingsboard/server/service/script/ScriptInvokeServiceTest.java rename to application/src/test/java/org/thingsboard/server/service/script/LocalJsInvokeServiceTest.java index 1760609c74..857a9b7902 100644 --- a/application/src/test/java/org/thingsboard/server/service/script/ScriptInvokeServiceTest.java +++ b/application/src/test/java/org/thingsboard/server/service/script/LocalJsInvokeServiceTest.java @@ -37,7 +37,7 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy; "js.max_result_size=50", "js.local.max_errors=2" }) -class ScriptInvokeServiceTest extends AbstractControllerTest { +class LocalJsInvokeServiceTest extends AbstractControllerTest { @Autowired private NashornJsInvokeService jsInvokeService; diff --git a/application/src/test/java/org/thingsboard/server/service/script/RemoteJsInvokeServiceTest.java b/application/src/test/java/org/thingsboard/server/service/script/RemoteJsInvokeServiceTest.java new file mode 100644 index 0000000000..636af5432e --- /dev/null +++ b/application/src/test/java/org/thingsboard/server/service/script/RemoteJsInvokeServiceTest.java @@ -0,0 +1,221 @@ +/** + * Copyright © 2016-2022 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.script; + +import com.google.common.util.concurrent.Futures; +import org.apache.commons.lang3.StringUtils; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.ArgumentCaptor; +import org.thingsboard.script.api.ScriptType; +import org.thingsboard.server.common.data.ApiUsageState; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.stats.TbApiUsageReportClient; +import org.thingsboard.server.common.stats.TbApiUsageStateClient; +import org.thingsboard.server.gen.js.JsInvokeProtos; +import org.thingsboard.server.gen.js.JsInvokeProtos.RemoteJsRequest; +import org.thingsboard.server.gen.js.JsInvokeProtos.RemoteJsResponse; +import org.thingsboard.server.queue.TbQueueRequestTemplate; +import org.thingsboard.server.queue.common.TbProtoJsQueueMsg; +import org.thingsboard.server.queue.common.TbProtoQueueMsg; + +import java.util.HashSet; +import java.util.List; +import java.util.Optional; +import java.util.Set; +import java.util.UUID; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.argThat; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.reset; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoInteractions; +import static org.mockito.Mockito.when; + +class RemoteJsInvokeServiceTest { + + private RemoteJsInvokeService remoteJsInvokeService; + private TbQueueRequestTemplate, TbProtoQueueMsg> jsRequestTemplate; + + + @BeforeEach + public void beforeEach() { + TbApiUsageStateClient apiUsageStateClient = mock(TbApiUsageStateClient.class); + ApiUsageState apiUsageState = mock(ApiUsageState.class); + when(apiUsageState.isJsExecEnabled()).thenReturn(true); + when(apiUsageStateClient.getApiUsageState(any())).thenReturn(apiUsageState); + TbApiUsageReportClient apiUsageReportClient = mock(TbApiUsageReportClient.class); + + remoteJsInvokeService = new RemoteJsInvokeService(Optional.of(apiUsageStateClient), Optional.of(apiUsageReportClient)); + jsRequestTemplate = mock(TbQueueRequestTemplate.class); + remoteJsInvokeService.requestTemplate = jsRequestTemplate; + } + + @AfterEach + public void afterEach() { + reset(jsRequestTemplate); + } + + @Test + public void whenInvokingFunction_thenDoNotSendScriptBody() throws Exception { + mockJsEvalResponse(); + String scriptBody = "return { a: 'b'};"; + UUID scriptId = remoteJsInvokeService.eval(TenantId.SYS_TENANT_ID, ScriptType.RULE_NODE_SCRIPT, scriptBody).get(); + reset(jsRequestTemplate); + + String expectedInvocationResult = "scriptInvocationResult"; + doReturn(Futures.immediateFuture(new TbProtoJsQueueMsg<>(UUID.randomUUID(), RemoteJsResponse.newBuilder() + .setInvokeResponse(JsInvokeProtos.JsInvokeResponse.newBuilder() + .setSuccess(true) + .setResult(expectedInvocationResult) + .build()) + .build()))) + .when(jsRequestTemplate).send(any()); + + ArgumentCaptor> jsRequestCaptor = ArgumentCaptor.forClass(TbProtoJsQueueMsg.class); + Object invocationResult = remoteJsInvokeService.invokeScript(TenantId.SYS_TENANT_ID, null, scriptId, "{}").get(); + verify(jsRequestTemplate).send(jsRequestCaptor.capture()); + + JsInvokeProtos.JsInvokeRequest jsInvokeRequestMade = jsRequestCaptor.getValue().getValue().getInvokeRequest(); + assertThat(jsInvokeRequestMade.getScriptBody()).isNullOrEmpty(); + assertThat(jsInvokeRequestMade.getScriptHash()).isEqualTo(getScriptHash(scriptId)); + assertThat(invocationResult).isEqualTo(expectedInvocationResult); + } + + @Test + public void whenInvokingFunctionAndRemoteJsExecutorRemovedScript_thenHandleNotFoundErrorAndMakeInvokeRequestWithScriptBody() throws Exception { + mockJsEvalResponse(); + String scriptBody = "return { a: 'b'};"; + UUID scriptId = remoteJsInvokeService.eval(TenantId.SYS_TENANT_ID, ScriptType.RULE_NODE_SCRIPT, scriptBody).get(); + reset(jsRequestTemplate); + + doReturn(Futures.immediateFuture(new TbProtoJsQueueMsg<>(UUID.randomUUID(), RemoteJsResponse.newBuilder() + .setInvokeResponse(JsInvokeProtos.JsInvokeResponse.newBuilder() + .setSuccess(false) + .setErrorCode(JsInvokeProtos.JsInvokeErrorCode.NOT_FOUND_ERROR) + .build()) + .build()))) + .when(jsRequestTemplate).send(argThat(jsQueueMsg -> { + return StringUtils.isEmpty(jsQueueMsg.getValue().getInvokeRequest().getScriptBody()); + })); + + String expectedInvocationResult = "invocationResult"; + doReturn(Futures.immediateFuture(new TbProtoJsQueueMsg<>(UUID.randomUUID(), RemoteJsResponse.newBuilder() + .setInvokeResponse(JsInvokeProtos.JsInvokeResponse.newBuilder() + .setSuccess(true) + .setResult(expectedInvocationResult) + .build()) + .build()))) + .when(jsRequestTemplate).send(argThat(jsQueueMsg -> { + return StringUtils.isNotEmpty(jsQueueMsg.getValue().getInvokeRequest().getScriptBody()); + })); + + ArgumentCaptor> jsRequestsCaptor = ArgumentCaptor.forClass(TbProtoJsQueueMsg.class); + Object invocationResult = remoteJsInvokeService.invokeScript(TenantId.SYS_TENANT_ID, null, scriptId, "{}").get(); + verify(jsRequestTemplate, times(2)).send(jsRequestsCaptor.capture()); + + List> jsInvokeRequestsMade = jsRequestsCaptor.getAllValues(); + + JsInvokeProtos.JsInvokeRequest firstRequestMade = jsInvokeRequestsMade.get(0).getValue().getInvokeRequest(); + assertThat(firstRequestMade.getScriptBody()).isNullOrEmpty(); + + JsInvokeProtos.JsInvokeRequest secondRequestMade = jsInvokeRequestsMade.get(1).getValue().getInvokeRequest(); + assertThat(secondRequestMade.getScriptBody()).contains(scriptBody); + + assertThat(jsInvokeRequestsMade.stream().map(TbProtoQueueMsg::getKey).distinct().count()).as("partition keys are same") + .isOne(); + + assertThat(invocationResult).isEqualTo(expectedInvocationResult); + } + + @Test + public void whenDoingEval_thenSaveScriptByHashOfTenantIdAndScriptBody() throws Exception { + mockJsEvalResponse(); + + TenantId tenantId1 = TenantId.fromUUID(UUID.randomUUID()); + String scriptBody1 = "var msg = { temp: 42, humidity: 77 };\n" + + "var metadata = { data: 40 };\n" + + "var msgType = \"POST_TELEMETRY_REQUEST\";\n" + + "\n" + + "return { msg: msg, metadata: metadata, msgType: msgType };"; + + Set scriptHashes = new HashSet<>(); + String tenant1Script1Hash = null; + for (int i = 0; i < 3; i++) { + UUID scriptUuid = remoteJsInvokeService.eval(tenantId1, ScriptType.RULE_NODE_SCRIPT, scriptBody1).get(); + tenant1Script1Hash = getScriptHash(scriptUuid); + scriptHashes.add(tenant1Script1Hash); + } + assertThat(scriptHashes).as("Unique scripts ids").size().isOne(); + + TenantId tenantId2 = TenantId.fromUUID(UUID.randomUUID()); + UUID scriptUuid = remoteJsInvokeService.eval(tenantId2, ScriptType.RULE_NODE_SCRIPT, scriptBody1).get(); + String tenant2Script1Id = getScriptHash(scriptUuid); + assertThat(tenant2Script1Id).isNotEqualTo(tenant1Script1Hash); + + String scriptBody2 = scriptBody1 + ";;"; + scriptUuid = remoteJsInvokeService.eval(tenantId2, ScriptType.RULE_NODE_SCRIPT, scriptBody2).get(); + String tenant2Script2Id = getScriptHash(scriptUuid); + assertThat(tenant2Script2Id).isNotEqualTo(tenant2Script1Id); + } + + @Test + public void whenReleasingScript_thenCheckForHashUsages() throws Exception { + mockJsEvalResponse(); + String scriptBody = "return { a: 'b'};"; + UUID scriptId1 = remoteJsInvokeService.eval(TenantId.SYS_TENANT_ID, ScriptType.RULE_NODE_SCRIPT, scriptBody).get(); + UUID scriptId2 = remoteJsInvokeService.eval(TenantId.SYS_TENANT_ID, ScriptType.RULE_NODE_SCRIPT, scriptBody).get(); + String scriptHash = getScriptHash(scriptId1); + assertThat(scriptHash).isEqualTo(getScriptHash(scriptId2)); + reset(jsRequestTemplate); + + doReturn(Futures.immediateFuture(new TbProtoQueueMsg<>(UUID.randomUUID(), RemoteJsResponse.newBuilder() + .setReleaseResponse(JsInvokeProtos.JsReleaseResponse.newBuilder() + .setSuccess(true) + .build()) + .build()))) + .when(jsRequestTemplate).send(any()); + + remoteJsInvokeService.release(scriptId1).get(); + verifyNoInteractions(jsRequestTemplate); + assertThat(remoteJsInvokeService.scriptHashToBodysMap).containsKey(scriptHash); + + remoteJsInvokeService.release(scriptId2).get(); + verify(jsRequestTemplate).send(any()); + assertThat(remoteJsInvokeService.scriptHashToBodysMap).isEmpty(); + } + + private String getScriptHash(UUID scriptUuid) { + return remoteJsInvokeService.getScriptHash(scriptUuid); + } + + private void mockJsEvalResponse() { + doAnswer(methodCall -> Futures.immediateFuture(new TbProtoJsQueueMsg<>(UUID.randomUUID(), RemoteJsResponse.newBuilder() + .setCompileResponse(JsInvokeProtos.JsCompileResponse.newBuilder() + .setSuccess(true) + .setScriptHash(methodCall.>getArgument(0).getValue().getCompileRequest().getScriptHash()) + .build()) + .build()))) + .when(jsRequestTemplate).send(argThat(jsQueueMsg -> jsQueueMsg.getValue().hasCompileRequest())); + } + +} diff --git a/common/cluster-api/src/main/proto/jsinvoke.proto b/common/cluster-api/src/main/proto/jsinvoke.proto index 8cae69f198..73f988af45 100644 --- a/common/cluster-api/src/main/proto/jsinvoke.proto +++ b/common/cluster-api/src/main/proto/jsinvoke.proto @@ -23,6 +23,7 @@ enum JsInvokeErrorCode { COMPILATION_ERROR = 0; RUNTIME_ERROR = 1; TIMEOUT_ERROR = 2; + NOT_FOUND_ERROR = 3; } message RemoteJsRequest { @@ -40,39 +41,34 @@ message RemoteJsResponse { } message JsCompileRequest { - int64 scriptIdMSB = 1; - int64 scriptIdLSB = 2; string functionName = 3; string scriptBody = 4; + string scriptHash = 5; } message JsReleaseRequest { - int64 scriptIdMSB = 1; - int64 scriptIdLSB = 2; string functionName = 3; + string scriptHash = 4; } message JsReleaseResponse { bool success = 1; - int64 scriptIdMSB = 2; - int64 scriptIdLSB = 3; + string scriptHash = 4; } message JsCompileResponse { bool success = 1; - int64 scriptIdMSB = 2; - int64 scriptIdLSB = 3; JsInvokeErrorCode errorCode = 4; string errorDetails = 5; + string scriptHash = 6; } message JsInvokeRequest { - int64 scriptIdMSB = 1; - int64 scriptIdLSB = 2; string functionName = 3; string scriptBody = 4; int32 timeout = 5; repeated string args = 6; + string scriptHash = 7; } message JsInvokeResponse { @@ -81,4 +77,3 @@ message JsInvokeResponse { JsInvokeErrorCode errorCode = 3; string errorDetails = 4; } - diff --git a/common/script/remote-js-client/src/main/java/org/thingsboard/server/service/script/RemoteJsInvokeService.java b/common/script/remote-js-client/src/main/java/org/thingsboard/server/service/script/RemoteJsInvokeService.java index 884fa2a4ee..bda8d6429a 100644 --- a/common/script/remote-js-client/src/main/java/org/thingsboard/server/service/script/RemoteJsInvokeService.java +++ b/common/script/remote-js-client/src/main/java/org/thingsboard/server/service/script/RemoteJsInvokeService.java @@ -17,8 +17,10 @@ package org.thingsboard.server.service.script; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; import lombok.Getter; import lombok.extern.slf4j.Slf4j; +import org.jetbrains.annotations.NotNull; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; @@ -28,6 +30,7 @@ import org.springframework.util.StopWatch; import org.thingsboard.common.util.ThingsBoardThreadFactory; import org.thingsboard.script.api.TbScriptException; import org.thingsboard.script.api.js.AbstractJsInvokeService; +import org.thingsboard.script.api.js.JsScriptInfo; import org.thingsboard.server.common.stats.TbApiUsageReportClient; import org.thingsboard.server.common.stats.TbApiUsageStateClient; import org.thingsboard.server.gen.js.JsInvokeProtos; @@ -46,6 +49,8 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; @Slf4j @ConditionalOnExpression("'${js.evaluator:null}'=='remote' && ('${service.type:null}'=='monolith' || '${service.type:null}'=='tb-core' || '${service.type:null}'=='tb-rule-engine')") @@ -98,9 +103,10 @@ public class RemoteJsInvokeService extends AbstractJsInvokeService { } @Autowired - private TbQueueRequestTemplate, TbProtoQueueMsg> requestTemplate; + protected TbQueueRequestTemplate, TbProtoQueueMsg> requestTemplate; - private Map scriptIdToBodysMap = new ConcurrentHashMap<>(); + protected final Map scriptHashToBodysMap = new ConcurrentHashMap<>(); + private final Lock scriptsLock = new ReentrantLock(); @PostConstruct public void init() { @@ -117,53 +123,46 @@ public class RemoteJsInvokeService extends AbstractJsInvokeService { } @Override - protected ListenableFuture doEval(UUID scriptId, String functionName, String scriptBody) { + protected ListenableFuture doEval(UUID scriptId, JsScriptInfo jsInfo, String scriptBody) { JsInvokeProtos.JsCompileRequest jsRequest = JsInvokeProtos.JsCompileRequest.newBuilder() - .setScriptIdMSB(scriptId.getMostSignificantBits()) - .setScriptIdLSB(scriptId.getLeastSignificantBits()) - .setFunctionName(functionName) + .setScriptHash(jsInfo.getHash()) + .setFunctionName(jsInfo.getFunctionName()) .setScriptBody(scriptBody).build(); JsInvokeProtos.RemoteJsRequest jsRequestWrapper = JsInvokeProtos.RemoteJsRequest.newBuilder() .setCompileRequest(jsRequest) .build(); - log.trace("Post compile request for scriptId [{}]", scriptId); + log.trace("Post compile request for scriptId [{}] (hash: {})", scriptId, jsInfo.getHash()); ListenableFuture> future = requestTemplate.send(new TbProtoJsQueueMsg<>(UUID.randomUUID(), jsRequestWrapper)); return Futures.transform(future, response -> { JsInvokeProtos.JsCompileResponse compilationResult = response.getValue().getCompileResponse(); - UUID compiledScriptId = new UUID(compilationResult.getScriptIdMSB(), compilationResult.getScriptIdLSB()); if (compilationResult.getSuccess()) { - scriptIdToNameMap.put(scriptId, functionName); - scriptIdToBodysMap.put(scriptId, scriptBody); - return compiledScriptId; + scriptsLock.lock(); + try { + scriptInfoMap.put(scriptId, jsInfo); + scriptHashToBodysMap.put(jsInfo.getHash(), scriptBody); + } finally { + scriptsLock.unlock(); + } + return scriptId; } else { - log.debug("[{}] Failed to compile script due to [{}]: {}", compiledScriptId, compilationResult.getErrorCode().name(), compilationResult.getErrorDetails()); + log.debug("[{}] (hash: {}) Failed to compile script due to [{}]: {}", scriptId, compilationResult.getScriptHash(), + compilationResult.getErrorCode().name(), compilationResult.getErrorDetails()); throw new TbScriptException(scriptId, TbScriptException.ErrorCode.COMPILATION, scriptBody, new RuntimeException(compilationResult.getErrorDetails())); } }, callbackExecutor); } @Override - protected ListenableFuture doInvokeFunction(UUID scriptId, String functionName, Object[] args) { - final String scriptBody = scriptIdToBodysMap.get(scriptId); + protected ListenableFuture doInvokeFunction(UUID scriptId, JsScriptInfo jsInfo, Object[] args) { + var scriptHash = jsInfo.getHash(); + String scriptBody = scriptHashToBodysMap.get(scriptHash); if (scriptBody == null) { - return Futures.immediateFailedFuture(new RuntimeException("No script body found for scriptId: [" + scriptId + "]!")); - } - JsInvokeProtos.JsInvokeRequest.Builder jsRequestBuilder = JsInvokeProtos.JsInvokeRequest.newBuilder() - .setScriptIdMSB(scriptId.getMostSignificantBits()) - .setScriptIdLSB(scriptId.getLeastSignificantBits()) - .setFunctionName(functionName) - .setTimeout((int) maxExecRequestsTimeout) - .setScriptBody(scriptBody); - - for (Object arg : args) { - jsRequestBuilder.addArgs(arg.toString()); + return Futures.immediateFailedFuture(new RuntimeException("No script body found for script hash [" + scriptHash + "] (script id: [" + scriptId + "])")); } - JsInvokeProtos.RemoteJsRequest jsRequestWrapper = JsInvokeProtos.RemoteJsRequest.newBuilder() - .setInvokeRequest(jsRequestBuilder.build()) - .build(); + JsInvokeProtos.RemoteJsRequest jsRequestWrapper = buildJsInvokeRequest(jsInfo, args, false, null); StopWatch stopWatch; if (log.isTraceEnabled()) { @@ -173,35 +172,80 @@ public class RemoteJsInvokeService extends AbstractJsInvokeService { stopWatch = null; } - ListenableFuture> future = requestTemplate.send(new TbProtoJsQueueMsg<>(UUID.randomUUID(), jsRequestWrapper)); - return Futures.transform(future, response -> { + UUID requestKey = UUID.randomUUID(); + ListenableFuture> future = requestTemplate.send(new TbProtoJsQueueMsg<>(requestKey, jsRequestWrapper)); + return Futures.transformAsync(future, response -> { if (log.isTraceEnabled()) { stopWatch.stop(); log.trace("doInvokeFunction js-response took {}ms for uuid {}", stopWatch.getTotalTimeMillis(), response.getKey()); } JsInvokeProtos.JsInvokeResponse invokeResult = response.getValue().getInvokeResponse(); if (invokeResult.getSuccess()) { - return invokeResult.getResult(); + return Futures.immediateFuture(invokeResult.getResult()); } else { - final RuntimeException e = new RuntimeException(invokeResult.getErrorDetails()); - log.debug("[{}] Failed to invoke function due to [{}]: {}", scriptId, invokeResult.getErrorCode().name(), invokeResult.getErrorDetails()); - if (JsInvokeProtos.JsInvokeErrorCode.TIMEOUT_ERROR.equals(invokeResult.getErrorCode())) { - throw new TbScriptException(scriptId, TbScriptException.ErrorCode.TIMEOUT, scriptBody, new TimeoutException()); - } else if (JsInvokeProtos.JsInvokeErrorCode.COMPILATION_ERROR.equals(invokeResult.getErrorCode())) { - throw new TbScriptException(scriptId, TbScriptException.ErrorCode.COMPILATION, scriptBody, e); - } else { - throw new TbScriptException(scriptId, TbScriptException.ErrorCode.RUNTIME, scriptBody, e); - } + return handleInvokeError(requestKey, scriptId, jsInfo, invokeResult.getErrorCode(), invokeResult.getErrorDetails(), scriptBody, args); } }, callbackExecutor); } + @NotNull + private JsInvokeProtos.RemoteJsRequest buildJsInvokeRequest(JsScriptInfo jsInfo, Object[] args, boolean includeScriptBody, String scriptBody) { + JsInvokeProtos.JsInvokeRequest.Builder jsRequestBuilder = JsInvokeProtos.JsInvokeRequest.newBuilder() + .setScriptHash(jsInfo.getHash()) + .setFunctionName(jsInfo.getFunctionName()) + .setTimeout((int) maxExecRequestsTimeout); + if (includeScriptBody) { + jsRequestBuilder.setScriptBody(scriptBody); + } + + for (Object arg : args) { + jsRequestBuilder.addArgs(arg.toString()); + } + + JsInvokeProtos.RemoteJsRequest jsRequestWrapper = JsInvokeProtos.RemoteJsRequest.newBuilder() + .setInvokeRequest(jsRequestBuilder.build()) + .build(); + return jsRequestWrapper; + } + + private ListenableFuture handleInvokeError(UUID requestKey, UUID scriptId, JsScriptInfo jsInfo, + JsInvokeProtos.JsInvokeErrorCode errorCode, String errorDetails, + String scriptBody, Object[] args) { + final RuntimeException e = new RuntimeException(errorDetails); + log.debug("[{}] Failed to invoke function due to [{}]: {}", scriptId, errorCode.name(), errorDetails); + if (JsInvokeProtos.JsInvokeErrorCode.TIMEOUT_ERROR.equals(errorCode)) { + throw new TbScriptException(scriptId, TbScriptException.ErrorCode.TIMEOUT, scriptBody, new TimeoutException()); + } else if (JsInvokeProtos.JsInvokeErrorCode.COMPILATION_ERROR.equals(errorCode)) { + throw new TbScriptException(scriptId, TbScriptException.ErrorCode.COMPILATION, scriptBody, e); + } else if (JsInvokeProtos.JsInvokeErrorCode.NOT_FOUND_ERROR.equals(errorCode)) { + log.debug("[{}] Remote JS executor couldn't find the script", scriptId); + if (scriptBody != null) { + JsInvokeProtos.RemoteJsRequest invokeRequestWithScriptBody = buildJsInvokeRequest(jsInfo, args, true, scriptBody); + log.debug("[{}] Sending invoke request again with script body", scriptId); + ListenableFuture> future = requestTemplate.send(new TbProtoJsQueueMsg<>(requestKey, invokeRequestWithScriptBody)); + return Futures.transformAsync(future, response -> { + JsInvokeProtos.JsInvokeResponse result = response.getValue().getInvokeResponse(); + if (result.getSuccess()) { + return Futures.immediateFuture(result.getResult()); + } else { + return handleInvokeError(requestKey, scriptId, jsInfo, result.getErrorCode(), result.getErrorDetails(), null, args); + } + }, MoreExecutors.directExecutor()); + } + } + throw new TbScriptException(scriptId, TbScriptException.ErrorCode.RUNTIME, scriptBody, e); + } + @Override - protected void doRelease(UUID scriptId, String functionName) throws Exception { + protected void doRelease(UUID scriptId, JsScriptInfo jsInfo) throws Exception { + String scriptHash = jsInfo.getHash(); + if (scriptInfoMap.values().stream().map(JsScriptInfo::getHash).anyMatch(hash -> hash.equals(scriptHash))) { + return; + } + JsInvokeProtos.JsReleaseRequest jsRequest = JsInvokeProtos.JsReleaseRequest.newBuilder() - .setScriptIdMSB(scriptId.getMostSignificantBits()) - .setScriptIdLSB(scriptId.getLeastSignificantBits()) - .setFunctionName(functionName).build(); + .setScriptHash(scriptHash) + .setFunctionName(jsInfo.getFunctionName()).build(); JsInvokeProtos.RemoteJsRequest jsRequestWrapper = JsInvokeProtos.RemoteJsRequest.newBuilder() .setReleaseRequest(jsRequest) @@ -213,13 +257,28 @@ public class RemoteJsInvokeService extends AbstractJsInvokeService { } JsInvokeProtos.RemoteJsResponse response = future.get().getValue(); - JsInvokeProtos.JsReleaseResponse compilationResult = response.getReleaseResponse(); - UUID compiledScriptId = new UUID(compilationResult.getScriptIdMSB(), compilationResult.getScriptIdLSB()); - if (compilationResult.getSuccess()) { - scriptIdToBodysMap.remove(scriptId); + JsInvokeProtos.JsReleaseResponse releaseResponse = response.getReleaseResponse(); + if (releaseResponse.getSuccess()) { + scriptsLock.lock(); + try { + if (scriptInfoMap.values().stream().map(JsScriptInfo::getHash).noneMatch(hash -> hash.equals(scriptHash))) { + scriptHashToBodysMap.remove(scriptHash); + } + } finally { + scriptsLock.unlock(); + } } else { - log.debug("[{}] Failed to release script due", compiledScriptId); + log.debug("[{}] Failed to release script", scriptHash); } } + protected String constructFunctionName(UUID scriptId, String scriptHash) { + return "invokeInternal_" + scriptHash; + } + + protected String getScriptHash(UUID scriptId) { + JsScriptInfo jsScriptInfo = scriptInfoMap.get(scriptId); + return jsScriptInfo != null ? jsScriptInfo.getHash() : null; + } + } diff --git a/common/script/script-api/src/main/java/org/thingsboard/script/api/AbstractScriptInvokeService.java b/common/script/script-api/src/main/java/org/thingsboard/script/api/AbstractScriptInvokeService.java index 1a7838d06d..db376f8581 100644 --- a/common/script/script-api/src/main/java/org/thingsboard/script/api/AbstractScriptInvokeService.java +++ b/common/script/script-api/src/main/java/org/thingsboard/script/api/AbstractScriptInvokeService.java @@ -15,13 +15,12 @@ */ package org.thingsboard.script.api; -import com.google.common.util.concurrent.AsyncFunction; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.MoreExecutors; import lombok.extern.slf4j.Slf4j; -import org.checkerframework.checker.nullness.qual.Nullable; +import org.springframework.data.util.Pair; import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.common.util.ThingsBoardThreadFactory; import org.thingsboard.server.common.data.ApiUsageRecordKey; @@ -46,7 +45,7 @@ import static java.lang.String.format; @Slf4j public abstract class AbstractScriptInvokeService implements ScriptInvokeService { - protected Map disabledScripts = new ConcurrentHashMap<>(); + protected final Map disabledScripts = new ConcurrentHashMap<>(); private final Optional apiUsageStateClient; private final Optional apiUsageReportClient; @@ -90,7 +89,7 @@ public abstract class AbstractScriptInvokeService implements ScriptInvokeService protected abstract boolean isScriptPresent(UUID scriptId); - protected abstract ListenableFuture doEvalScript(ScriptType scriptType, String scriptBody, UUID scriptId, String[] argNames); + protected abstract ListenableFuture doEvalScript(TenantId tenantId, ScriptType scriptType, String scriptBody, UUID scriptId, String[] argNames); protected abstract ListenableFuture doInvokeFunction(UUID scriptId, Object[] args); @@ -130,7 +129,7 @@ public abstract class AbstractScriptInvokeService implements ScriptInvokeService } UUID scriptId = UUID.randomUUID(); pushedMsgs.incrementAndGet(); - return withTimeoutAndStatsCallback(scriptId, doEvalScript(scriptType, scriptBody, scriptId, argNames), evalCallback, getMaxEvalRequestsTimeout()); + return withTimeoutAndStatsCallback(scriptId, doEvalScript(tenantId, scriptType, scriptBody, scriptId, argNames), evalCallback, getMaxEvalRequestsTimeout()); } else { return error("Script Execution is disabled due to API limits!"); } diff --git a/common/script/script-api/src/main/java/org/thingsboard/script/api/js/AbstractJsInvokeService.java b/common/script/script-api/src/main/java/org/thingsboard/script/api/js/AbstractJsInvokeService.java index cc7c3b7792..66a2e3aec2 100644 --- a/common/script/script-api/src/main/java/org/thingsboard/script/api/js/AbstractJsInvokeService.java +++ b/common/script/script-api/src/main/java/org/thingsboard/script/api/js/AbstractJsInvokeService.java @@ -15,14 +15,16 @@ */ package org.thingsboard.script.api.js; -import com.google.common.util.concurrent.Futures; +import com.google.common.hash.Hashing; import com.google.common.util.concurrent.ListenableFuture; import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; +import org.springframework.data.util.Pair; import org.thingsboard.script.api.AbstractScriptInvokeService; import org.thingsboard.script.api.RuleNodeScriptFactory; import org.thingsboard.script.api.ScriptType; +import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.stats.TbApiUsageReportClient; import org.thingsboard.server.common.stats.TbApiUsageStateClient; @@ -35,9 +37,9 @@ import java.util.concurrent.ConcurrentHashMap; * Created by ashvayka on 26.09.18. */ @Slf4j -public abstract class AbstractJsInvokeService extends AbstractScriptInvokeService implements JsInvokeService{ +public abstract class AbstractJsInvokeService extends AbstractScriptInvokeService implements JsInvokeService { - protected Map scriptIdToNameMap = new ConcurrentHashMap<>(); + protected final Map scriptInfoMap = new ConcurrentHashMap<>(); @Getter @Value("${js.max_total_args_size:100000}") @@ -55,47 +57,32 @@ public abstract class AbstractJsInvokeService extends AbstractScriptInvokeServic @Override protected boolean isScriptPresent(UUID scriptId) { - return scriptIdToNameMap.containsKey(scriptId); - } - - @Override - public ListenableFuture release(UUID scriptId) { - String functionName = scriptIdToNameMap.get(scriptId); - if (functionName != null) { - try { - scriptIdToNameMap.remove(scriptId); - disabledScripts.remove(scriptId); - doRelease(scriptId, functionName); - } catch (Exception e) { - return Futures.immediateFailedFuture(e); - } - } - return Futures.immediateFuture(null); + return scriptInfoMap.containsKey(scriptId); } @Override protected ListenableFuture doInvokeFunction(UUID scriptId, Object[] args) { - return doInvokeFunction(scriptId, scriptIdToNameMap.get(scriptId), args); + return doInvokeFunction(scriptId, scriptInfoMap.get(scriptId), args); } @Override - protected ListenableFuture doEvalScript(ScriptType scriptType, String scriptBody, UUID scriptId, String[] argNames) { - String functionName = "invokeInternal_" + scriptId.toString().replace('-', '_'); + protected ListenableFuture doEvalScript(TenantId tenantId, ScriptType scriptType, String scriptBody, UUID scriptId, String[] argNames) { + String scriptHash = hash(tenantId, scriptBody); + String functionName = constructFunctionName(scriptId, scriptHash); String jsScript = generateJsScript(scriptType, functionName, scriptBody, argNames); - return doEval(scriptId, functionName, jsScript); + return doEval(scriptId, new JsScriptInfo(scriptHash, functionName), jsScript); } @Override protected void doRelease(UUID scriptId) throws Exception { - String functionName = scriptIdToNameMap.remove(scriptId); - doRelease(scriptId, functionName); + doRelease(scriptId, scriptInfoMap.remove(scriptId)); } - protected abstract ListenableFuture doEval(UUID scriptId, String functionName, String scriptBody); + protected abstract ListenableFuture doEval(UUID scriptId, JsScriptInfo jsInfo, String scriptBody); - protected abstract ListenableFuture doInvokeFunction(UUID scriptId, String functionName, Object[] args); + protected abstract ListenableFuture doInvokeFunction(UUID scriptId, JsScriptInfo jsInfo, Object[] args); - protected abstract void doRelease(UUID scriptId, String functionName) throws Exception; + protected abstract void doRelease(UUID scriptId, JsScriptInfo scriptInfo) throws Exception; private String generateJsScript(ScriptType scriptType, String functionName, String scriptBody, String... argNames) { if (scriptType == ScriptType.RULE_NODE_SCRIPT) { @@ -104,4 +91,16 @@ public abstract class AbstractJsInvokeService extends AbstractScriptInvokeServic throw new RuntimeException("No script factory implemented for scriptType: " + scriptType); } + protected String constructFunctionName(UUID scriptId, String scriptHash) { + return "invokeInternal_" + scriptId.toString().replace('-', '_'); + } + + protected String hash(TenantId tenantId, String scriptBody) { + return Hashing.murmur3_128().newHasher() + .putLong(tenantId.getId().getMostSignificantBits()) + .putLong(tenantId.getId().getLeastSignificantBits()) + .putUnencodedChars(scriptBody) + .hash().toString(); + } + } diff --git a/common/script/script-api/src/main/java/org/thingsboard/script/api/js/JsScriptInfo.java b/common/script/script-api/src/main/java/org/thingsboard/script/api/js/JsScriptInfo.java new file mode 100644 index 0000000000..795394cb95 --- /dev/null +++ b/common/script/script-api/src/main/java/org/thingsboard/script/api/js/JsScriptInfo.java @@ -0,0 +1,26 @@ +/** + * Copyright © 2016-2022 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.script.api.js; + +import lombok.Data; + +@Data +public class JsScriptInfo { + + private final String hash; + private final String functionName; + +} diff --git a/common/script/script-api/src/main/java/org/thingsboard/script/api/js/NashornJsInvokeService.java b/common/script/script-api/src/main/java/org/thingsboard/script/api/js/NashornJsInvokeService.java index 559731c025..61e03d4077 100644 --- a/common/script/script-api/src/main/java/org/thingsboard/script/api/js/NashornJsInvokeService.java +++ b/common/script/script-api/src/main/java/org/thingsboard/script/api/js/NashornJsInvokeService.java @@ -130,7 +130,7 @@ public class NashornJsInvokeService extends AbstractJsInvokeService { } @Override - protected ListenableFuture doEval(UUID scriptId, String functionName, String jsScript) { + protected ListenableFuture doEval(UUID scriptId, JsScriptInfo scriptInfo, String jsScript) { return jsExecutor.submit(() -> { try { evalLock.lock(); @@ -143,7 +143,7 @@ public class NashornJsInvokeService extends AbstractJsInvokeService { } finally { evalLock.unlock(); } - scriptIdToNameMap.put(scriptId, functionName); + scriptInfoMap.put(scriptId, scriptInfo); return scriptId; } catch (Exception e) { throw new TbScriptException(scriptId, TbScriptException.ErrorCode.COMPILATION, jsScript, e); @@ -152,13 +152,13 @@ public class NashornJsInvokeService extends AbstractJsInvokeService { } @Override - protected ListenableFuture doInvokeFunction(UUID scriptId, String functionName, Object[] args) { + protected ListenableFuture doInvokeFunction(UUID scriptId, JsScriptInfo scriptInfo, Object[] args) { return jsExecutor.submit(() -> { try { if (useJsSandbox) { - return sandbox.getSandboxedInvocable().invokeFunction(functionName, args); + return sandbox.getSandboxedInvocable().invokeFunction(scriptInfo.getFunctionName(), args); } else { - return ((Invocable) engine).invokeFunction(functionName, args); + return ((Invocable) engine).invokeFunction(scriptInfo.getFunctionName(), args); } } catch (ScriptException e) { throw new TbScriptException(scriptId, TbScriptException.ErrorCode.RUNTIME, null, e); @@ -168,11 +168,11 @@ public class NashornJsInvokeService extends AbstractJsInvokeService { }); } - protected void doRelease(UUID scriptId, String functionName) throws ScriptException { + protected void doRelease(UUID scriptId, JsScriptInfo scriptInfo) throws ScriptException { if (useJsSandbox) { - sandbox.eval(functionName + " = undefined;"); + sandbox.eval(scriptInfo.getFunctionName() + " = undefined;"); } else { - engine.eval(functionName + " = undefined;"); + engine.eval(scriptInfo.getFunctionName() + " = undefined;"); } } diff --git a/common/script/script-api/src/main/java/org/thingsboard/script/api/mvel/DefaultMvelInvokeService.java b/common/script/script-api/src/main/java/org/thingsboard/script/api/mvel/DefaultMvelInvokeService.java index d1486200e9..06f225cb1d 100644 --- a/common/script/script-api/src/main/java/org/thingsboard/script/api/mvel/DefaultMvelInvokeService.java +++ b/common/script/script-api/src/main/java/org/thingsboard/script/api/mvel/DefaultMvelInvokeService.java @@ -35,6 +35,7 @@ import org.thingsboard.common.util.ThingsBoardExecutors; import org.thingsboard.script.api.AbstractScriptInvokeService; import org.thingsboard.script.api.ScriptType; import org.thingsboard.script.api.TbScriptException; +import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.stats.TbApiUsageReportClient; import org.thingsboard.server.common.stats.TbApiUsageStateClient; @@ -132,7 +133,7 @@ public class DefaultMvelInvokeService extends AbstractScriptInvokeService implem } @Override - protected ListenableFuture doEvalScript(ScriptType scriptType, String scriptBody, UUID scriptId, String[] argNames) { + protected ListenableFuture doEvalScript(TenantId tenantId, ScriptType scriptType, String scriptBody, UUID scriptId, String[] argNames) { if (NEW_KEYWORD_PATTERN.matcher(scriptBody).matches()) { //TODO: output line number and char pos. return Futures.immediateFailedFuture(new TbScriptException(scriptId, TbScriptException.ErrorCode.COMPILATION, scriptBody, diff --git a/msa/js-executor/api/jsExecutor.models.ts b/msa/js-executor/api/jsExecutor.models.ts index db2ced52c4..7a6b53cd8a 100644 --- a/msa/js-executor/api/jsExecutor.models.ts +++ b/msa/js-executor/api/jsExecutor.models.ts @@ -16,8 +16,9 @@ export interface TbMessage { - scriptIdMSB: string; - scriptIdLSB: string; + scriptIdMSB: string; // deprecated + scriptIdLSB: string; // deprecated + scriptHash: string; } export interface RemoteJsRequest { diff --git a/msa/js-executor/api/jsInvokeMessageProcessor.ts b/msa/js-executor/api/jsInvokeMessageProcessor.ts index f1b60b6e07..668cd61f50 100644 --- a/msa/js-executor/api/jsInvokeMessageProcessor.ts +++ b/msa/js-executor/api/jsInvokeMessageProcessor.ts @@ -18,7 +18,7 @@ import config from 'config'; import { _logger } from '../config/logger'; import { JsExecutor, TbScript } from './jsExecutor'; import { performance } from 'perf_hooks'; -import { isString, parseJsErrorDetails, toUUIDString, UUIDFromBuffer, UUIDToBits } from './utils'; +import { isString, parseJsErrorDetails, toUUIDString, UUIDFromBuffer, UUIDToBits, isNotUUID } from './utils'; import { IQueue } from '../queue/queue.models'; import { JsCompileRequest, @@ -36,6 +36,7 @@ import Long from 'long'; const COMPILATION_ERROR = 0; const RUNTIME_ERROR = 1; const TIMEOUT_ERROR = 2; +const NOT_FOUND_ERROR = 3; const statFrequency = Number(config.get('script.stat_print_frequency')); const scriptBodyTraceFrequency = Number(config.get('script.script_body_trace_frequency')); @@ -129,7 +130,12 @@ export class JsInvokeMessageProcessor { processCompileRequest(requestId: string, responseTopic: string, headers: any, compileRequest: JsCompileRequest) { const scriptId = JsInvokeMessageProcessor.getScriptId(compileRequest); this.logger.debug('[%s] Processing compile request, scriptId: [%s]', requestId, scriptId); - + if (this.scriptMap.has(scriptId)) { + const compileResponse = JsInvokeMessageProcessor.createCompileResponse(scriptId, true); + this.logger.debug('[%s] Script was already compiled, scriptId: [%s]', requestId, scriptId); + this.sendResponse(requestId, responseTopic, headers, scriptId, compileResponse); + return; + } this.executor.compileScript(compileRequest.scriptBody).then( (script) => { this.cacheScript(scriptId, script); @@ -170,7 +176,7 @@ export class JsInvokeMessageProcessor { this.logger.debug('[%s] Sending success invoke response, scriptId: [%s]', requestId, scriptId); this.sendResponse(requestId, responseTopic, headers, scriptId, undefined, invokeResponse); } else { - let err = { + const err = { name: 'Error', message: 'script invocation result exceeds maximum allowed size of ' + maxResultSize + ' symbols' } @@ -193,8 +199,12 @@ export class JsInvokeMessageProcessor { ) }, (err: any) => { - const invokeResponse = JsInvokeMessageProcessor.createInvokeResponse("", false, COMPILATION_ERROR, err); - this.logger.debug('[%s] Sending failed invoke response, scriptId: [%s], errorCode: [%s]', requestId, scriptId, COMPILATION_ERROR); + let errorCode = COMPILATION_ERROR; + if (err?.name === 'script body not found') { + errorCode = NOT_FOUND_ERROR; + } + const invokeResponse = JsInvokeMessageProcessor.createInvokeResponse("", false, errorCode, err); + this.logger.debug('[%s] Sending failed invoke response, scriptId: [%s], errorCode: [%s]', requestId, scriptId, errorCode); this.sendResponse(requestId, responseTopic, headers, scriptId, undefined, invokeResponse); } ); @@ -222,7 +232,7 @@ export class JsInvokeMessageProcessor { const remoteResponse = JsInvokeMessageProcessor.createRemoteResponse(requestId, compileResponse, invokeResponse, releaseResponse); const rawResponse = Buffer.from(JSON.stringify(remoteResponse), 'utf8'); this.logger.debug('[%s] Sending response to queue, scriptId: [%s]', requestId, scriptId); - this.producer.send(responseTopic, scriptId, rawResponse, headers).then( + this.producer.send(responseTopic, requestId, rawResponse, headers).then( () => { this.logger.debug('[%s] Response sent to queue, took [%s]ms, scriptId: [%s]', requestId, (performance.now() - tStartSending), scriptId); }, @@ -242,7 +252,7 @@ export class JsInvokeMessageProcessor { if (script) { self.incrementUseScriptId(scriptId); resolve(script); - } else { + } else if (scriptBody) { const startTime = performance.now(); self.executor.compileScript(scriptBody).then( (compiledScript) => { @@ -255,6 +265,12 @@ export class JsInvokeMessageProcessor { reject(err); } ); + } else { + const err = { + name: 'script body not found', + message: '' + } + reject(err); } }); } @@ -285,14 +301,26 @@ export class JsInvokeMessageProcessor { } private static createCompileResponse(scriptId: string, success: boolean, errorCode?: number, err?: any): JsCompileResponse { - const scriptIdBits = UUIDToBits(scriptId); - return { - errorCode: errorCode, - success: success, - errorDetails: parseJsErrorDetails(err), - scriptIdMSB: scriptIdBits[0], - scriptIdLSB: scriptIdBits[1] - }; + if (isNotUUID(scriptId)) { + return { + errorCode: errorCode, + success: success, + errorDetails: parseJsErrorDetails(err), + scriptIdMSB: "0", + scriptIdLSB: "0", + scriptHash: scriptId + }; + } else { // this is for backward compatibility (to be able to work with tb-node of previous version) - todo: remove in the next release + let scriptIdBits = UUIDToBits(scriptId); + return { + errorCode: errorCode, + success: success, + errorDetails: parseJsErrorDetails(err), + scriptIdMSB: scriptIdBits[0], + scriptIdLSB: scriptIdBits[1], + scriptHash: "" + }; + } } private static createInvokeResponse(result: string, success: boolean, errorCode?: number, err?: any): JsInvokeResponse { @@ -305,16 +333,26 @@ export class JsInvokeMessageProcessor { } private static createReleaseResponse(scriptId: string, success: boolean): JsReleaseResponse { - const scriptIdBits = UUIDToBits(scriptId); - return { - success: success, - scriptIdMSB: scriptIdBits[0], - scriptIdLSB: scriptIdBits[1] - }; + if (isNotUUID(scriptId)) { + return { + success: success, + scriptIdMSB: "0", + scriptIdLSB: "0", + scriptHash: scriptId, + }; + } else { // todo: remove in the next release + let scriptIdBits = UUIDToBits(scriptId); + return { + success: success, + scriptIdMSB: scriptIdBits[0], + scriptIdLSB: scriptIdBits[1], + scriptHash: "" + } + } } private static getScriptId(request: TbMessage): string { - return toUUIDString(request.scriptIdMSB, request.scriptIdLSB); + return request.scriptHash ? request.scriptHash : toUUIDString(request.scriptIdMSB, request.scriptIdLSB); } private incrementUseScriptId(scriptId: string) { diff --git a/msa/js-executor/api/utils.ts b/msa/js-executor/api/utils.ts index 361025f806..58fec28b0b 100644 --- a/msa/js-executor/api/utils.ts +++ b/msa/js-executor/api/utils.ts @@ -58,3 +58,7 @@ export function parseJsErrorDetails(err: any): string | undefined { } return details; } + +export function isNotUUID(candidate: string) { + return candidate.length != 36 || !candidate.includes('-'); +} diff --git a/msa/js-executor/queue/awsSqsTemplate.ts b/msa/js-executor/queue/awsSqsTemplate.ts index 259d285cf2..31de1ae73f 100644 --- a/msa/js-executor/queue/awsSqsTemplate.ts +++ b/msa/js-executor/queue/awsSqsTemplate.ts @@ -123,10 +123,10 @@ export class AwsSqsTemplate implements IQueue { this.timer = setTimeout(() => {this.getAndProcessMessage(messageProcessor, params)}, this.pollInterval); } - async send(responseTopic: string, scriptId: string, rawResponse: Buffer, headers: any): Promise { + async send(responseTopic: string, msgKey: string, rawResponse: Buffer, headers: any): Promise { let msgBody = JSON.stringify( { - key: scriptId, + key: msgKey, data: [...rawResponse], headers: headers }); diff --git a/msa/js-executor/queue/kafkaTemplate.ts b/msa/js-executor/queue/kafkaTemplate.ts index 51fa6e291b..7c34d99889 100644 --- a/msa/js-executor/queue/kafkaTemplate.ts +++ b/msa/js-executor/queue/kafkaTemplate.ts @@ -149,12 +149,11 @@ export class KafkaTemplate implements IQueue { }); } - async send(responseTopic: string, scriptId: string, rawResponse: Buffer, headers: any): Promise { - this.logger.debug('Pending queue response, scriptId: [%s]', scriptId); + async send(responseTopic: string, msgKey: string, rawResponse: Buffer, headers: any): Promise { const message = { topic: responseTopic, messages: [{ - key: scriptId, + key: msgKey, value: rawResponse, headers: headers.data }] diff --git a/msa/js-executor/queue/pubSubTemplate.ts b/msa/js-executor/queue/pubSubTemplate.ts index eff35017ba..9e8ee52b8b 100644 --- a/msa/js-executor/queue/pubSubTemplate.ts +++ b/msa/js-executor/queue/pubSubTemplate.ts @@ -80,7 +80,7 @@ export class PubSubTemplate implements IQueue { subscription.on('message', messageHandler); } - async send(responseTopic: string, scriptId: string, rawResponse: Buffer, headers: any): Promise { + async send(responseTopic: string, msgKey: string, rawResponse: Buffer, headers: any): Promise { if (!(this.subscriptions.includes(responseTopic) && this.topics.includes(this.requestTopic))) { await this.createTopic(this.requestTopic); await this.createSubscription(this.requestTopic); @@ -88,7 +88,7 @@ export class PubSubTemplate implements IQueue { let data = JSON.stringify( { - key: scriptId, + key: msgKey, data: [...rawResponse], headers: headers }); diff --git a/msa/js-executor/queue/queue.models.ts b/msa/js-executor/queue/queue.models.ts index a86dc8fd1d..36932a5ee5 100644 --- a/msa/js-executor/queue/queue.models.ts +++ b/msa/js-executor/queue/queue.models.ts @@ -17,6 +17,6 @@ export interface IQueue { name: string; init(): Promise; - send(responseTopic: string, scriptId: string, rawResponse: Buffer, headers: any): Promise; + send(responseTopic: string, msgKey: string, rawResponse: Buffer, headers: any): Promise; destroy(): Promise; } diff --git a/msa/js-executor/queue/rabbitmqTemplate.ts b/msa/js-executor/queue/rabbitmqTemplate.ts index ccd3cef54b..f4fe51a0ae 100644 --- a/msa/js-executor/queue/rabbitmqTemplate.ts +++ b/msa/js-executor/queue/rabbitmqTemplate.ts @@ -65,7 +65,7 @@ export class RabbitMqTemplate implements IQueue { }) } - async send(responseTopic: string, scriptId: string, rawResponse: Buffer, headers: any): Promise { + async send(responseTopic: string, msgKey: string, rawResponse: Buffer, headers: any): Promise { if (!this.topics.includes(responseTopic)) { await this.createQueue(responseTopic); @@ -74,7 +74,7 @@ export class RabbitMqTemplate implements IQueue { let data = JSON.stringify( { - key: scriptId, + key: msgKey, data: [...rawResponse], headers: headers }); diff --git a/msa/js-executor/queue/serviceBusTemplate.ts b/msa/js-executor/queue/serviceBusTemplate.ts index 76d87e8068..d2abacd289 100644 --- a/msa/js-executor/queue/serviceBusTemplate.ts +++ b/msa/js-executor/queue/serviceBusTemplate.ts @@ -82,7 +82,7 @@ export class ServiceBusTemplate implements IQueue { this.receiver.subscribe({processMessage: messageHandler, processError: errorHandler}) } - async send(responseTopic: string, scriptId: string, rawResponse: Buffer, headers: any): Promise { + async send(responseTopic: string, msgKey: string, rawResponse: Buffer, headers: any): Promise { if (!this.queues.includes(this.requestTopic)) { await this.createQueueIfNotExist(this.requestTopic); this.queues.push(this.requestTopic); @@ -96,7 +96,7 @@ export class ServiceBusTemplate implements IQueue { } let data = { - key: scriptId, + key: msgKey, data: [...rawResponse], headers: headers };