diff --git a/application/src/main/java/org/thingsboard/server/service/script/AbstractJsInvokeService.java b/application/src/main/java/org/thingsboard/server/service/script/AbstractJsInvokeService.java index 0af1f4846b..cc46f0a74f 100644 --- a/application/src/main/java/org/thingsboard/server/service/script/AbstractJsInvokeService.java +++ b/application/src/main/java/org/thingsboard/server/service/script/AbstractJsInvokeService.java @@ -15,12 +15,14 @@ */ package org.thingsboard.server.service.script; +import com.google.common.hash.Hashing; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.MoreExecutors; import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; +import org.springframework.data.util.Pair; import org.thingsboard.common.util.ThingsBoardThreadFactory; import org.thingsboard.server.common.data.ApiUsageRecordKey; import org.thingsboard.server.common.data.id.CustomerId; @@ -41,13 +43,15 @@ import static java.lang.String.format; * Created by ashvayka on 26.09.18. */ @Slf4j +@SuppressWarnings("UnstableApiUsage") public abstract class AbstractJsInvokeService implements JsInvokeService { private final TbApiUsageStateService apiUsageStateService; private final TbApiUsageClient apiUsageClient; protected ScheduledExecutorService timeoutExecutorService; - protected Map scriptIdToNameMap = new ConcurrentHashMap<>(); - protected Map disabledFunctions = new ConcurrentHashMap<>(); + + protected final Map> scriptIdToNameAndHashMap = new ConcurrentHashMap<>(); + protected final Map disabledFunctions = new ConcurrentHashMap<>(); @Getter @Value("${js.max_total_args_size:100000}") @@ -83,27 +87,34 @@ public abstract class AbstractJsInvokeService implements JsInvokeService { return error(format("Script body exceeds maximum allowed size of %s symbols", getMaxScriptBodySize())); } UUID scriptId = UUID.randomUUID(); - String functionName = "invokeInternal_" + scriptId.toString().replace('-', '_'); + String scriptHash = hash(tenantId, scriptBody); + String functionName = constructFunctionName(scriptId, scriptHash); String jsScript = generateJsScript(scriptType, functionName, scriptBody, argNames); - return doEval(scriptId, functionName, jsScript); + return doEval(scriptId, scriptHash, functionName, jsScript); } else { return error("JS Execution is disabled due to API limits!"); } } + protected String constructFunctionName(UUID scriptId, String scriptHash) { + return "invokeInternal_" + scriptId.toString().replace('-', '_'); + } + @Override public ListenableFuture invokeFunction(TenantId tenantId, CustomerId customerId, UUID scriptId, Object... args) { if (apiUsageStateService.getApiUsageState(tenantId).isJsExecEnabled()) { - String functionName = scriptIdToNameMap.get(scriptId); - if (functionName == null) { + Pair nameAndHash = scriptIdToNameAndHashMap.get(scriptId); + if (nameAndHash == null) { return error("No compiled script found for scriptId: [" + scriptId + "]!"); } + String functionName = nameAndHash.getFirst(); + String scriptHash = nameAndHash.getSecond(); if (!isDisabled(scriptId)) { if (argsSizeExceeded(args)) { return scriptExecutionError(scriptId, format("Script input arguments exceed maximum allowed total args size of %s symbols", getMaxTotalArgsSize())); } apiUsageClient.report(tenantId, customerId, ApiUsageRecordKey.JS_EXEC_COUNT, 1); - return Futures.transformAsync(doInvokeFunction(scriptId, functionName, args), output -> { + return Futures.transformAsync(doInvokeFunction(scriptId, scriptHash, functionName, args), output -> { String result = output.toString(); if (resultSizeExceeded(result)) { return scriptExecutionError(scriptId, format("Script invocation result exceeds maximum allowed size of %s symbols", getMaxResultSize())); @@ -123,12 +134,12 @@ public abstract class AbstractJsInvokeService implements JsInvokeService { @Override public ListenableFuture release(UUID scriptId) { - String functionName = scriptIdToNameMap.get(scriptId); - if (functionName != null) { + Pair nameAndHash = scriptIdToNameAndHashMap.get(scriptId); + if (nameAndHash != null) { try { - scriptIdToNameMap.remove(scriptId); + scriptIdToNameAndHashMap.remove(scriptId); disabledFunctions.remove(scriptId); - doRelease(scriptId, functionName); + doRelease(scriptId, nameAndHash.getSecond(), nameAndHash.getFirst()); } catch (Exception e) { return Futures.immediateFailedFuture(e); } @@ -136,16 +147,24 @@ public abstract class AbstractJsInvokeService implements JsInvokeService { return Futures.immediateFuture(null); } - protected abstract ListenableFuture doEval(UUID scriptId, String functionName, String scriptBody); + protected abstract ListenableFuture doEval(UUID scriptId, String scriptHash, String functionName, String scriptBody); - protected abstract ListenableFuture doInvokeFunction(UUID scriptId, String functionName, Object[] args); + protected abstract ListenableFuture doInvokeFunction(UUID scriptId, String scriptHash, String functionName, Object[] args); - protected abstract void doRelease(UUID scriptId, String functionName) throws Exception; + protected abstract void doRelease(UUID scriptId, String scriptHash, String functionName) throws Exception; protected abstract int getMaxErrors(); protected abstract long getMaxBlacklistDuration(); + protected String hash(TenantId tenantId, String scriptBody) { + return Hashing.murmur3_128().newHasher() + .putLong(tenantId.getId().getMostSignificantBits()) + .putLong(tenantId.getId().getLeastSignificantBits()) + .putUnencodedChars(scriptBody) + .hash().toString(); + } + protected void onScriptExecutionError(UUID scriptId, Throwable t, String scriptBody) { DisableListInfo disableListInfo = disabledFunctions.computeIfAbsent(scriptId, key -> new DisableListInfo()); log.warn("Script has exception and will increment counter {} on disabledFunctions for id {}, exception {}, cause {}, scriptBody {}", diff --git a/application/src/main/java/org/thingsboard/server/service/script/AbstractNashornJsInvokeService.java b/application/src/main/java/org/thingsboard/server/service/script/AbstractNashornJsInvokeService.java index d731ef1536..4b19bf9020 100644 --- a/application/src/main/java/org/thingsboard/server/service/script/AbstractNashornJsInvokeService.java +++ b/application/src/main/java/org/thingsboard/server/service/script/AbstractNashornJsInvokeService.java @@ -24,6 +24,7 @@ import delight.nashornsandbox.NashornSandboxes; import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; +import org.springframework.data.util.Pair; import org.springframework.scheduling.annotation.Scheduled; import org.thingsboard.common.util.ThingsBoardExecutors; import org.thingsboard.server.queue.usagestats.TbApiUsageClient; @@ -38,7 +39,6 @@ import javax.script.ScriptException; import java.util.UUID; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.ReentrantLock; @@ -121,7 +121,7 @@ public abstract class AbstractNashornJsInvokeService extends AbstractJsInvokeSer protected abstract long getMaxCpuTime(); @Override - protected ListenableFuture doEval(UUID scriptId, String functionName, String jsScript) { + protected ListenableFuture doEval(UUID scriptId, String scriptHash, String functionName, String jsScript) { jsPushedMsgs.incrementAndGet(); ListenableFuture result = jsExecutor.executeAsync(() -> { try { @@ -135,7 +135,7 @@ public abstract class AbstractNashornJsInvokeService extends AbstractJsInvokeSer } finally { evalLock.unlock(); } - scriptIdToNameMap.put(scriptId, functionName); + scriptIdToNameAndHashMap.put(scriptId, Pair.of(functionName, scriptHash)); return scriptId; } catch (Exception e) { log.debug("Failed to compile JS script: {}", e.getMessage(), e); @@ -150,7 +150,7 @@ public abstract class AbstractNashornJsInvokeService extends AbstractJsInvokeSer } @Override - protected ListenableFuture doInvokeFunction(UUID scriptId, String functionName, Object[] args) { + protected ListenableFuture doInvokeFunction(UUID scriptId, String scriptHash, String functionName, Object[] args) { jsPushedMsgs.incrementAndGet(); ListenableFuture result = jsExecutor.executeAsync(() -> { try { @@ -174,7 +174,7 @@ public abstract class AbstractNashornJsInvokeService extends AbstractJsInvokeSer return result; } - protected void doRelease(UUID scriptId, String functionName) throws ScriptException { + protected void doRelease(UUID scriptId, String scriptHash, String functionName) throws ScriptException { if (useJsSandbox()) { sandbox.eval(functionName + " = undefined;"); } else { diff --git a/application/src/main/java/org/thingsboard/server/service/script/RemoteJsInvokeService.java b/application/src/main/java/org/thingsboard/server/service/script/RemoteJsInvokeService.java index fb83783e7e..328fc5ebb4 100644 --- a/application/src/main/java/org/thingsboard/server/service/script/RemoteJsInvokeService.java +++ b/application/src/main/java/org/thingsboard/server/service/script/RemoteJsInvokeService.java @@ -24,6 +24,7 @@ import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; +import org.springframework.data.util.Pair; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Service; import org.springframework.util.StopWatch; @@ -42,12 +43,13 @@ import javax.annotation.PreDestroy; import java.util.Map; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; @Slf4j @ConditionalOnExpression("'${js.evaluator:null}'=='remote' && ('${service.type:null}'=='monolith' || '${service.type:null}'=='tb-core' || '${service.type:null}'=='tb-rule-engine')") @@ -103,7 +105,8 @@ public class RemoteJsInvokeService extends AbstractJsInvokeService { @Autowired protected TbQueueRequestTemplate, TbProtoQueueMsg> requestTemplate; - protected Map scriptIdToBodysMap = new ConcurrentHashMap<>(); + protected final Map scriptHashToBodysMap = new ConcurrentHashMap<>(); + private final Lock scriptsLock = new ReentrantLock(); @PostConstruct public void init() { @@ -120,10 +123,9 @@ public class RemoteJsInvokeService extends AbstractJsInvokeService { } @Override - protected ListenableFuture doEval(UUID scriptId, String functionName, String scriptBody) { + protected ListenableFuture doEval(UUID scriptId, String scriptHash, String functionName, String scriptBody) { JsInvokeProtos.JsCompileRequest jsRequest = JsInvokeProtos.JsCompileRequest.newBuilder() - .setScriptIdMSB(scriptId.getMostSignificantBits()) - .setScriptIdLSB(scriptId.getLeastSignificantBits()) + .setScriptHash(scriptHash) .setFunctionName(functionName) .setScriptBody(scriptBody).build(); @@ -131,35 +133,46 @@ public class RemoteJsInvokeService extends AbstractJsInvokeService { .setCompileRequest(jsRequest) .build(); - log.trace("Post compile request for scriptId [{}]", scriptId); - ListenableFuture> future = sendRequest(jsRequestWrapper, maxEvalRequestsTimeout, queueEvalMsgs); + log.trace("Post compile request for scriptId [{}] (hash: {})", scriptId, scriptHash); + ListenableFuture> future = sendJsRequest(UUID.randomUUID(), jsRequestWrapper, maxEvalRequestsTimeout, queueEvalMsgs); return Futures.transform(future, response -> { JsInvokeProtos.JsCompileResponse compilationResult = response.getValue().getCompileResponse(); - UUID compiledScriptId = new UUID(compilationResult.getScriptIdMSB(), compilationResult.getScriptIdLSB()); if (compilationResult.getSuccess()) { - scriptIdToNameMap.put(scriptId, functionName); - scriptIdToBodysMap.put(scriptId, scriptBody); - return compiledScriptId; + scriptsLock.lock(); + try { + scriptIdToNameAndHashMap.put(scriptId, Pair.of(functionName, scriptHash)); + scriptHashToBodysMap.put(scriptHash, scriptBody); + } finally { + scriptsLock.unlock(); + } + return scriptId; } else { - log.debug("[{}] Failed to compile script due to [{}]: {}", compiledScriptId, compilationResult.getErrorCode().name(), compilationResult.getErrorDetails()); + log.debug("[{}] (hash: {}) Failed to compile script due to [{}]: {}", scriptId, compilationResult.getScriptHash(), + compilationResult.getErrorCode().name(), compilationResult.getErrorDetails()); throw new RuntimeException(compilationResult.getErrorDetails()); } }, callbackExecutor); } @Override - protected ListenableFuture doInvokeFunction(UUID scriptId, String functionName, Object[] args) { - log.trace("doInvokeFunction js-request for uuid {} with timeout {}ms", scriptId, maxRequestsTimeout); - final String scriptBody = scriptIdToBodysMap.get(scriptId); + protected String constructFunctionName(UUID scriptId, String scriptHash) { + return "invokeInternal_" + scriptHash; + } + + @Override + protected ListenableFuture doInvokeFunction(UUID scriptId, String scriptHash, String functionName, Object[] args) { + log.trace("doInvokeFunction js-request for uuid {} with timeout {}ms", scriptHash, maxRequestsTimeout); + String scriptBody = scriptHashToBodysMap.get(scriptHash); if (scriptBody == null) { - return Futures.immediateFailedFuture(new RuntimeException("No script body found for scriptId: [" + scriptId + "]!")); + return Futures.immediateFailedFuture(new RuntimeException("No script body found for script hash [" + scriptHash + "] (script id: [" + scriptId + "])")); } - JsInvokeProtos.RemoteJsRequest jsRequestWrapper = buildJsInvokeRequest(scriptId, functionName, args, false, null); + JsInvokeProtos.RemoteJsRequest jsRequestWrapper = buildJsInvokeRequest(scriptHash, functionName, args, false, null); StopWatch stopWatch = new StopWatch(); stopWatch.start(); - ListenableFuture> future = sendRequest(jsRequestWrapper, maxRequestsTimeout, queueInvokeMsgs); + UUID requestKey = UUID.randomUUID(); + ListenableFuture> future = sendJsRequest(requestKey, jsRequestWrapper, maxRequestsTimeout, queueInvokeMsgs); return Futures.transformAsync(future, response -> { stopWatch.stop(); log.trace("doInvokeFunction js-response took {}ms for uuid {}", stopWatch.getTotalTimeMillis(), response.getKey()); @@ -167,12 +180,14 @@ public class RemoteJsInvokeService extends AbstractJsInvokeService { if (invokeResult.getSuccess()) { return Futures.immediateFuture(invokeResult.getResult()); } else { - return handleInvokeError(scriptId, invokeResult.getErrorCode(), invokeResult.getErrorDetails(), functionName, args, scriptBody); + return handleInvokeError(requestKey, scriptId, scriptHash, invokeResult.getErrorCode(), + invokeResult.getErrorDetails(), functionName, args, scriptBody); } }, callbackExecutor); } - private ListenableFuture handleInvokeError(UUID scriptId, JsInvokeErrorCode errorCode, String errorDetails, + private ListenableFuture handleInvokeError(UUID requestKey, UUID scriptId, String scriptHash, + JsInvokeErrorCode errorCode, String errorDetails, String functionName, Object[] args, String scriptBody) { log.debug("[{}] Failed to invoke function due to [{}]: {}", scriptId, errorCode.name(), errorDetails); RuntimeException e = new RuntimeException(errorDetails); @@ -184,14 +199,15 @@ public class RemoteJsInvokeService extends AbstractJsInvokeService { } else if (JsInvokeErrorCode.NOT_FOUND_ERROR.equals(errorCode)) { log.debug("[{}] Remote JS executor couldn't find the script", scriptId); if (scriptBody != null) { - JsInvokeProtos.RemoteJsRequest invokeRequestWithScriptBody = buildJsInvokeRequest(scriptId, functionName, args, true, scriptBody); + JsInvokeProtos.RemoteJsRequest invokeRequestWithScriptBody = buildJsInvokeRequest(scriptHash, functionName, args, true, scriptBody); log.debug("[{}] Sending invoke request again with script body", scriptId); - return Futures.transformAsync(sendJsRequest(invokeRequestWithScriptBody, maxRequestsTimeout, queueInvokeMsgs, MoreExecutors.directExecutor()), r -> { + return Futures.transformAsync(sendJsRequest(requestKey, invokeRequestWithScriptBody, maxRequestsTimeout, queueInvokeMsgs), r -> { JsInvokeProtos.JsInvokeResponse result = r.getValue().getInvokeResponse(); if (result.getSuccess()) { return Futures.immediateFuture(result.getResult()); } else { - return handleInvokeError(scriptId, result.getErrorCode(), result.getErrorDetails(), functionName, args, null); + return handleInvokeError(requestKey, scriptId, scriptHash, result.getErrorCode(), + result.getErrorDetails(), functionName, args, null); } }, MoreExecutors.directExecutor()); } @@ -200,12 +216,9 @@ public class RemoteJsInvokeService extends AbstractJsInvokeService { return Futures.immediateFailedFuture(e); } - private ListenableFuture> sendRequest(JsInvokeProtos.RemoteJsRequest jsRequestWrapper, long maxRequestsTimeout, AtomicInteger msgsCounter) { - return sendJsRequest(jsRequestWrapper, maxRequestsTimeout, msgsCounter, callbackExecutor); - } - - private ListenableFuture> sendJsRequest(JsInvokeProtos.RemoteJsRequest jsRequestWrapper, long maxRequestsTimeout, AtomicInteger msgsCounter, Executor callbackExecutor) { - ListenableFuture> future = requestTemplate.send(new TbProtoJsQueueMsg<>(UUID.randomUUID(), jsRequestWrapper)); + private ListenableFuture> sendJsRequest(UUID requestKey, JsInvokeProtos.RemoteJsRequest jsRequestWrapper, + long maxRequestsTimeout, AtomicInteger msgsCounter) { + ListenableFuture> future = requestTemplate.send(new TbProtoJsQueueMsg<>(requestKey, jsRequestWrapper)); if (maxRequestsTimeout > 0) { future = Futures.withTimeout(future, maxRequestsTimeout, TimeUnit.MILLISECONDS, timeoutExecutorService); } @@ -227,10 +240,9 @@ public class RemoteJsInvokeService extends AbstractJsInvokeService { return future; } - private JsInvokeProtos.RemoteJsRequest buildJsInvokeRequest(UUID scriptId, String functionName, Object[] args, boolean includeScriptBody, String scriptBody) { + private JsInvokeProtos.RemoteJsRequest buildJsInvokeRequest(String scriptHash, String functionName, Object[] args, boolean includeScriptBody, String scriptBody) { JsInvokeProtos.JsInvokeRequest.Builder jsRequestBuilder = JsInvokeProtos.JsInvokeRequest.newBuilder() - .setScriptIdMSB(scriptId.getMostSignificantBits()) - .setScriptIdLSB(scriptId.getLeastSignificantBits()) + .setScriptHash(scriptHash) .setFunctionName(functionName) .setTimeout((int) maxExecRequestsTimeout); if (includeScriptBody) jsRequestBuilder.setScriptBody(scriptBody); @@ -245,10 +257,12 @@ public class RemoteJsInvokeService extends AbstractJsInvokeService { } @Override - protected void doRelease(UUID scriptId, String functionName) throws Exception { + protected void doRelease(UUID scriptId, String scriptHash, String functionName) throws Exception { + if (scriptIdToNameAndHashMap.values().stream().map(Pair::getSecond).anyMatch(hash -> hash.equals(scriptHash))) { + return; + } JsInvokeProtos.JsReleaseRequest jsRequest = JsInvokeProtos.JsReleaseRequest.newBuilder() - .setScriptIdMSB(scriptId.getMostSignificantBits()) - .setScriptIdLSB(scriptId.getLeastSignificantBits()) + .setScriptHash(scriptHash) .setFunctionName(functionName).build(); JsInvokeProtos.RemoteJsRequest jsRequestWrapper = JsInvokeProtos.RemoteJsRequest.newBuilder() @@ -261,12 +275,18 @@ public class RemoteJsInvokeService extends AbstractJsInvokeService { } JsInvokeProtos.RemoteJsResponse response = future.get().getValue(); - JsInvokeProtos.JsReleaseResponse compilationResult = response.getReleaseResponse(); - UUID compiledScriptId = new UUID(compilationResult.getScriptIdMSB(), compilationResult.getScriptIdLSB()); - if (compilationResult.getSuccess()) { - scriptIdToBodysMap.remove(scriptId); + JsInvokeProtos.JsReleaseResponse releaseResponse = response.getReleaseResponse(); + if (releaseResponse.getSuccess()) { + scriptsLock.lock(); + try { + if (scriptIdToNameAndHashMap.values().stream().map(Pair::getSecond).noneMatch(h -> h.equals(scriptHash))) { + scriptHashToBodysMap.remove(scriptHash); + } + } finally { + scriptsLock.unlock(); + } } else { - log.debug("[{}] Failed to release script due", compiledScriptId); + log.debug("[{}] Failed to release script", scriptHash); } } diff --git a/application/src/test/java/org/thingsboard/server/service/script/JsInvokeServiceTest.java b/application/src/test/java/org/thingsboard/server/service/script/LocalJsInvokeServiceTest.java similarity index 98% rename from application/src/test/java/org/thingsboard/server/service/script/JsInvokeServiceTest.java rename to application/src/test/java/org/thingsboard/server/service/script/LocalJsInvokeServiceTest.java index 8f0edd71c6..039a69c01f 100644 --- a/application/src/test/java/org/thingsboard/server/service/script/JsInvokeServiceTest.java +++ b/application/src/test/java/org/thingsboard/server/service/script/LocalJsInvokeServiceTest.java @@ -35,7 +35,7 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy; "js.max_result_size=50", "js.local.max_errors=2" }) -class JsInvokeServiceTest extends AbstractControllerTest { +class LocalJsInvokeServiceTest extends AbstractControllerTest { @Autowired private NashornJsInvokeService jsInvokeService; diff --git a/application/src/test/java/org/thingsboard/server/service/script/RemoteJsInvokeServiceTest.java b/application/src/test/java/org/thingsboard/server/service/script/RemoteJsInvokeServiceTest.java index 83741eb2be..30b9b0b2c0 100644 --- a/application/src/test/java/org/thingsboard/server/service/script/RemoteJsInvokeServiceTest.java +++ b/application/src/test/java/org/thingsboard/server/service/script/RemoteJsInvokeServiceTest.java @@ -21,24 +21,33 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.mockito.ArgumentCaptor; +import org.thingsboard.server.common.data.ApiUsageState; +import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.gen.js.JsInvokeProtos; import org.thingsboard.server.gen.js.JsInvokeProtos.RemoteJsRequest; import org.thingsboard.server.gen.js.JsInvokeProtos.RemoteJsResponse; import org.thingsboard.server.queue.TbQueueRequestTemplate; import org.thingsboard.server.queue.common.TbProtoJsQueueMsg; import org.thingsboard.server.queue.common.TbProtoQueueMsg; +import org.thingsboard.server.queue.usagestats.TbApiUsageClient; +import org.thingsboard.server.service.apiusage.TbApiUsageStateService; +import java.util.HashSet; import java.util.List; +import java.util.Set; import java.util.UUID; import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.argThat; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.reset; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoInteractions; +import static org.mockito.Mockito.when; class RemoteJsInvokeServiceTest { @@ -48,7 +57,13 @@ class RemoteJsInvokeServiceTest { @BeforeEach public void beforeEach() { - remoteJsInvokeService = new RemoteJsInvokeService(null, null); + TbApiUsageStateService apiUsageStateService = mock(TbApiUsageStateService.class); + ApiUsageState apiUsageState = mock(ApiUsageState.class); + when(apiUsageState.isJsExecEnabled()).thenReturn(true); + when(apiUsageStateService.getApiUsageState(any())).thenReturn(apiUsageState); + TbApiUsageClient apiUsageClient = mock(TbApiUsageClient.class); + + remoteJsInvokeService = new RemoteJsInvokeService(apiUsageStateService, apiUsageClient); jsRequestTemplate = mock(TbQueueRequestTemplate.class); remoteJsInvokeService.requestTemplate = jsRequestTemplate; } @@ -60,8 +75,10 @@ class RemoteJsInvokeServiceTest { @Test public void whenInvokingFunction_thenDoNotSendScriptBody() throws Exception { - UUID scriptId = UUID.randomUUID(); - remoteJsInvokeService.scriptIdToBodysMap.put(scriptId, "scriptscriptscript"); + mockJsEvalResponse(); + String scriptBody = "return { a: 'b'};"; + UUID scriptId = remoteJsInvokeService.eval(TenantId.SYS_TENANT_ID, JsScriptType.RULE_NODE_SCRIPT, scriptBody).get(); + reset(jsRequestTemplate); String expectedInvocationResult = "scriptInvocationResult"; doReturn(Futures.immediateFuture(new TbProtoJsQueueMsg<>(UUID.randomUUID(), RemoteJsResponse.newBuilder() @@ -73,21 +90,21 @@ class RemoteJsInvokeServiceTest { .when(jsRequestTemplate).send(any()); ArgumentCaptor> jsRequestCaptor = ArgumentCaptor.forClass(TbProtoJsQueueMsg.class); - Object invocationResult = remoteJsInvokeService.doInvokeFunction(scriptId, "f", new Object[]{"a"}).get(); + Object invocationResult = remoteJsInvokeService.invokeFunction(TenantId.SYS_TENANT_ID, null, scriptId, "{}").get(); verify(jsRequestTemplate).send(jsRequestCaptor.capture()); JsInvokeProtos.JsInvokeRequest jsInvokeRequestMade = jsRequestCaptor.getValue().getValue().getInvokeRequest(); - assertThat(jsInvokeRequestMade.getScriptIdLSB()).isEqualTo(scriptId.getLeastSignificantBits()); - assertThat(jsInvokeRequestMade.getScriptIdMSB()).isEqualTo(scriptId.getMostSignificantBits()); assertThat(jsInvokeRequestMade.getScriptBody()).isNullOrEmpty(); + assertThat(jsInvokeRequestMade.getScriptHash()).isEqualTo(getScriptHash(scriptId)); assertThat(invocationResult).isEqualTo(expectedInvocationResult); } @Test public void whenInvokingFunctionAndRemoteJsExecutorRemovedScript_thenHandleNotFoundErrorAndMakeInvokeRequestWithScriptBody() throws Exception { - UUID scriptId = UUID.randomUUID(); - String scriptBody = "scriptscriptscript"; - remoteJsInvokeService.scriptIdToBodysMap.put(scriptId, scriptBody); + mockJsEvalResponse(); + String scriptBody = "return { a: 'b'};"; + UUID scriptId = remoteJsInvokeService.eval(TenantId.SYS_TENANT_ID, JsScriptType.RULE_NODE_SCRIPT, scriptBody).get(); + reset(jsRequestTemplate); doReturn(Futures.immediateFuture(new TbProtoJsQueueMsg<>(UUID.randomUUID(), RemoteJsResponse.newBuilder() .setInvokeResponse(JsInvokeProtos.JsInvokeResponse.newBuilder() @@ -111,7 +128,7 @@ class RemoteJsInvokeServiceTest { })); ArgumentCaptor> jsRequestsCaptor = ArgumentCaptor.forClass(TbProtoJsQueueMsg.class); - Object invocationResult = remoteJsInvokeService.doInvokeFunction(scriptId, "f", new Object[]{"a"}).get(); + Object invocationResult = remoteJsInvokeService.invokeFunction(TenantId.SYS_TENANT_ID, null, scriptId, "{}").get(); verify(jsRequestTemplate, times(2)).send(jsRequestsCaptor.capture()); List> jsInvokeRequestsMade = jsRequestsCaptor.getAllValues(); @@ -120,9 +137,83 @@ class RemoteJsInvokeServiceTest { assertThat(firstRequestMade.getScriptBody()).isNullOrEmpty(); JsInvokeProtos.JsInvokeRequest secondRequestMade = jsInvokeRequestsMade.get(1).getValue().getInvokeRequest(); - assertThat(secondRequestMade.getScriptBody()).isEqualTo(scriptBody); + assertThat(secondRequestMade.getScriptBody()).contains(scriptBody); + + assertThat(jsInvokeRequestsMade.stream().map(TbProtoQueueMsg::getKey).distinct().count()).as("partition keys are same") + .isOne(); assertThat(invocationResult).isEqualTo(expectedInvocationResult); } + @Test + public void whenDoingEval_thenSaveScriptByHashOfTenantIdAndScriptBody() throws Exception { + mockJsEvalResponse(); + + TenantId tenantId1 = TenantId.fromUUID(UUID.randomUUID()); + String scriptBody1 = "var msg = { temp: 42, humidity: 77 };\n" + + "var metadata = { data: 40 };\n" + + "var msgType = \"POST_TELEMETRY_REQUEST\";\n" + + "\n" + + "return { msg: msg, metadata: metadata, msgType: msgType };"; + + Set scriptHashes = new HashSet<>(); + String tenant1Script1Hash = null; + for (int i = 0; i < 3; i++) { + UUID scriptUuid = remoteJsInvokeService.eval(tenantId1, JsScriptType.RULE_NODE_SCRIPT, scriptBody1).get(); + tenant1Script1Hash = getScriptHash(scriptUuid); + scriptHashes.add(tenant1Script1Hash); + } + assertThat(scriptHashes).as("Unique scripts ids").size().isOne(); + + TenantId tenantId2 = TenantId.fromUUID(UUID.randomUUID()); + UUID scriptUuid = remoteJsInvokeService.eval(tenantId2, JsScriptType.RULE_NODE_SCRIPT, scriptBody1).get(); + String tenant2Script1Id = getScriptHash(scriptUuid); + assertThat(tenant2Script1Id).isNotEqualTo(tenant1Script1Hash); + + String scriptBody2 = scriptBody1 + ";;"; + scriptUuid = remoteJsInvokeService.eval(tenantId2, JsScriptType.RULE_NODE_SCRIPT, scriptBody2).get(); + String tenant2Script2Id = getScriptHash(scriptUuid); + assertThat(tenant2Script2Id).isNotEqualTo(tenant2Script1Id); + } + + @Test + public void whenReleasingScript_thenCheckForHashUsages() throws Exception { + mockJsEvalResponse(); + String scriptBody = "return { a: 'b'};"; + UUID scriptId1 = remoteJsInvokeService.eval(TenantId.SYS_TENANT_ID, JsScriptType.RULE_NODE_SCRIPT, scriptBody).get(); + UUID scriptId2 = remoteJsInvokeService.eval(TenantId.SYS_TENANT_ID, JsScriptType.RULE_NODE_SCRIPT, scriptBody).get(); + String scriptHash = getScriptHash(scriptId1); + assertThat(scriptHash).isEqualTo(getScriptHash(scriptId2)); + reset(jsRequestTemplate); + + doReturn(Futures.immediateFuture(new TbProtoQueueMsg<>(UUID.randomUUID(), RemoteJsResponse.newBuilder() + .setReleaseResponse(JsInvokeProtos.JsReleaseResponse.newBuilder() + .setSuccess(true) + .build()) + .build()))) + .when(jsRequestTemplate).send(any()); + + remoteJsInvokeService.release(scriptId1).get(); + verifyNoInteractions(jsRequestTemplate); + assertThat(remoteJsInvokeService.scriptHashToBodysMap).containsKey(scriptHash); + + remoteJsInvokeService.release(scriptId2).get(); + verify(jsRequestTemplate).send(any()); + assertThat(remoteJsInvokeService.scriptHashToBodysMap).isEmpty(); + } + + private String getScriptHash(UUID scriptUuid) { + return remoteJsInvokeService.scriptIdToNameAndHashMap.get(scriptUuid).getSecond(); + } + + private void mockJsEvalResponse() { + doAnswer(methodCall -> Futures.immediateFuture(new TbProtoJsQueueMsg<>(UUID.randomUUID(), RemoteJsResponse.newBuilder() + .setCompileResponse(JsInvokeProtos.JsCompileResponse.newBuilder() + .setSuccess(true) + .setScriptHash(methodCall.>getArgument(0).getValue().getCompileRequest().getScriptHash()) + .build()) + .build()))) + .when(jsRequestTemplate).send(argThat(jsQueueMsg -> jsQueueMsg.getValue().hasCompileRequest())); + } + } diff --git a/common/cluster-api/src/main/proto/jsinvoke.proto b/common/cluster-api/src/main/proto/jsinvoke.proto index e5bd4482b9..71912a9884 100644 --- a/common/cluster-api/src/main/proto/jsinvoke.proto +++ b/common/cluster-api/src/main/proto/jsinvoke.proto @@ -41,39 +41,34 @@ message RemoteJsResponse { } message JsCompileRequest { - int64 scriptIdMSB = 1; - int64 scriptIdLSB = 2; - string functionName = 3; - string scriptBody = 4; + string scriptHash = 1; + string functionName = 2; + string scriptBody = 3; } message JsReleaseRequest { - int64 scriptIdMSB = 1; - int64 scriptIdLSB = 2; - string functionName = 3; + string scriptHash = 1; + string functionName = 2; } message JsReleaseResponse { bool success = 1; - int64 scriptIdMSB = 2; - int64 scriptIdLSB = 3; + string scriptHash = 2; } message JsCompileResponse { bool success = 1; - int64 scriptIdMSB = 2; - int64 scriptIdLSB = 3; - JsInvokeErrorCode errorCode = 4; - string errorDetails = 5; + string scriptHash = 2; + JsInvokeErrorCode errorCode = 3; + string errorDetails = 4; } message JsInvokeRequest { - int64 scriptIdMSB = 1; - int64 scriptIdLSB = 2; - string functionName = 3; - string scriptBody = 4; - int32 timeout = 5; - repeated string args = 6; + string scriptHash = 1; + string functionName = 2; + string scriptBody = 3; + int32 timeout = 4; + repeated string args = 5; } message JsInvokeResponse { @@ -82,4 +77,3 @@ message JsInvokeResponse { JsInvokeErrorCode errorCode = 3; string errorDetails = 4; } - diff --git a/msa/js-executor/api/jsExecutor.models.ts b/msa/js-executor/api/jsExecutor.models.ts index db2ced52c4..ae88e132fe 100644 --- a/msa/js-executor/api/jsExecutor.models.ts +++ b/msa/js-executor/api/jsExecutor.models.ts @@ -16,8 +16,7 @@ export interface TbMessage { - scriptIdMSB: string; - scriptIdLSB: string; + scriptHash: string; } export interface RemoteJsRequest { diff --git a/msa/js-executor/api/jsInvokeMessageProcessor.ts b/msa/js-executor/api/jsInvokeMessageProcessor.ts index c5c34febbe..bb31666ddf 100644 --- a/msa/js-executor/api/jsInvokeMessageProcessor.ts +++ b/msa/js-executor/api/jsInvokeMessageProcessor.ts @@ -130,7 +130,12 @@ export class JsInvokeMessageProcessor { processCompileRequest(requestId: string, responseTopic: string, headers: any, compileRequest: JsCompileRequest) { const scriptId = JsInvokeMessageProcessor.getScriptId(compileRequest); this.logger.debug('[%s] Processing compile request, scriptId: [%s]', requestId, scriptId); - + if (this.scriptMap.has(scriptId)) { + const compileResponse = JsInvokeMessageProcessor.createCompileResponse(scriptId, true); + this.logger.debug('[%s] Script was already compiled, scriptId: [%s]', requestId, scriptId); + this.sendResponse(requestId, responseTopic, headers, scriptId, compileResponse); + return; + } this.executor.compileScript(compileRequest.scriptBody).then( (script) => { this.cacheScript(scriptId, script); @@ -227,7 +232,7 @@ export class JsInvokeMessageProcessor { const remoteResponse = JsInvokeMessageProcessor.createRemoteResponse(requestId, compileResponse, invokeResponse, releaseResponse); const rawResponse = Buffer.from(JSON.stringify(remoteResponse), 'utf8'); this.logger.debug('[%s] Sending response to queue, scriptId: [%s]', requestId, scriptId); - this.producer.send(responseTopic, scriptId, rawResponse, headers).then( + this.producer.send(responseTopic, requestId, rawResponse, headers).then( () => { this.logger.debug('[%s] Response sent to queue, took [%s]ms, scriptId: [%s]', requestId, (performance.now() - tStartSending), scriptId); }, @@ -296,13 +301,11 @@ export class JsInvokeMessageProcessor { } private static createCompileResponse(scriptId: string, success: boolean, errorCode?: number, err?: any): JsCompileResponse { - const scriptIdBits = UUIDToBits(scriptId); return { errorCode: errorCode, success: success, errorDetails: parseJsErrorDetails(err), - scriptIdMSB: scriptIdBits[0], - scriptIdLSB: scriptIdBits[1] + scriptHash: scriptId, }; } @@ -316,16 +319,14 @@ export class JsInvokeMessageProcessor { } private static createReleaseResponse(scriptId: string, success: boolean): JsReleaseResponse { - const scriptIdBits = UUIDToBits(scriptId); return { success: success, - scriptIdMSB: scriptIdBits[0], - scriptIdLSB: scriptIdBits[1] + scriptHash: scriptId, }; } private static getScriptId(request: TbMessage): string { - return toUUIDString(request.scriptIdMSB, request.scriptIdLSB); + return request.scriptHash; } private incrementUseScriptId(scriptId: string) { diff --git a/msa/js-executor/queue/awsSqsTemplate.ts b/msa/js-executor/queue/awsSqsTemplate.ts index 259d285cf2..31de1ae73f 100644 --- a/msa/js-executor/queue/awsSqsTemplate.ts +++ b/msa/js-executor/queue/awsSqsTemplate.ts @@ -123,10 +123,10 @@ export class AwsSqsTemplate implements IQueue { this.timer = setTimeout(() => {this.getAndProcessMessage(messageProcessor, params)}, this.pollInterval); } - async send(responseTopic: string, scriptId: string, rawResponse: Buffer, headers: any): Promise { + async send(responseTopic: string, msgKey: string, rawResponse: Buffer, headers: any): Promise { let msgBody = JSON.stringify( { - key: scriptId, + key: msgKey, data: [...rawResponse], headers: headers }); diff --git a/msa/js-executor/queue/kafkaTemplate.ts b/msa/js-executor/queue/kafkaTemplate.ts index 51fa6e291b..7c34d99889 100644 --- a/msa/js-executor/queue/kafkaTemplate.ts +++ b/msa/js-executor/queue/kafkaTemplate.ts @@ -149,12 +149,11 @@ export class KafkaTemplate implements IQueue { }); } - async send(responseTopic: string, scriptId: string, rawResponse: Buffer, headers: any): Promise { - this.logger.debug('Pending queue response, scriptId: [%s]', scriptId); + async send(responseTopic: string, msgKey: string, rawResponse: Buffer, headers: any): Promise { const message = { topic: responseTopic, messages: [{ - key: scriptId, + key: msgKey, value: rawResponse, headers: headers.data }] diff --git a/msa/js-executor/queue/pubSubTemplate.ts b/msa/js-executor/queue/pubSubTemplate.ts index eff35017ba..9e8ee52b8b 100644 --- a/msa/js-executor/queue/pubSubTemplate.ts +++ b/msa/js-executor/queue/pubSubTemplate.ts @@ -80,7 +80,7 @@ export class PubSubTemplate implements IQueue { subscription.on('message', messageHandler); } - async send(responseTopic: string, scriptId: string, rawResponse: Buffer, headers: any): Promise { + async send(responseTopic: string, msgKey: string, rawResponse: Buffer, headers: any): Promise { if (!(this.subscriptions.includes(responseTopic) && this.topics.includes(this.requestTopic))) { await this.createTopic(this.requestTopic); await this.createSubscription(this.requestTopic); @@ -88,7 +88,7 @@ export class PubSubTemplate implements IQueue { let data = JSON.stringify( { - key: scriptId, + key: msgKey, data: [...rawResponse], headers: headers }); diff --git a/msa/js-executor/queue/queue.models.ts b/msa/js-executor/queue/queue.models.ts index a86dc8fd1d..36932a5ee5 100644 --- a/msa/js-executor/queue/queue.models.ts +++ b/msa/js-executor/queue/queue.models.ts @@ -17,6 +17,6 @@ export interface IQueue { name: string; init(): Promise; - send(responseTopic: string, scriptId: string, rawResponse: Buffer, headers: any): Promise; + send(responseTopic: string, msgKey: string, rawResponse: Buffer, headers: any): Promise; destroy(): Promise; } diff --git a/msa/js-executor/queue/rabbitmqTemplate.ts b/msa/js-executor/queue/rabbitmqTemplate.ts index ccd3cef54b..f4fe51a0ae 100644 --- a/msa/js-executor/queue/rabbitmqTemplate.ts +++ b/msa/js-executor/queue/rabbitmqTemplate.ts @@ -65,7 +65,7 @@ export class RabbitMqTemplate implements IQueue { }) } - async send(responseTopic: string, scriptId: string, rawResponse: Buffer, headers: any): Promise { + async send(responseTopic: string, msgKey: string, rawResponse: Buffer, headers: any): Promise { if (!this.topics.includes(responseTopic)) { await this.createQueue(responseTopic); @@ -74,7 +74,7 @@ export class RabbitMqTemplate implements IQueue { let data = JSON.stringify( { - key: scriptId, + key: msgKey, data: [...rawResponse], headers: headers }); diff --git a/msa/js-executor/queue/serviceBusTemplate.ts b/msa/js-executor/queue/serviceBusTemplate.ts index 76d87e8068..d2abacd289 100644 --- a/msa/js-executor/queue/serviceBusTemplate.ts +++ b/msa/js-executor/queue/serviceBusTemplate.ts @@ -82,7 +82,7 @@ export class ServiceBusTemplate implements IQueue { this.receiver.subscribe({processMessage: messageHandler, processError: errorHandler}) } - async send(responseTopic: string, scriptId: string, rawResponse: Buffer, headers: any): Promise { + async send(responseTopic: string, msgKey: string, rawResponse: Buffer, headers: any): Promise { if (!this.queues.includes(this.requestTopic)) { await this.createQueueIfNotExist(this.requestTopic); this.queues.push(this.requestTopic); @@ -96,7 +96,7 @@ export class ServiceBusTemplate implements IQueue { } let data = { - key: scriptId, + key: msgKey, data: [...rawResponse], headers: headers };