Merge pull request #7403 from ViacheslavKlimov/feature/remote-js-executor-improvements
[3.4.2] Improvements for remote JS executor
This commit is contained in:
		
						commit
						9d23c91137
					
				@ -15,12 +15,14 @@
 | 
			
		||||
 */
 | 
			
		||||
package org.thingsboard.server.service.script;
 | 
			
		||||
 | 
			
		||||
import com.google.common.hash.Hashing;
 | 
			
		||||
import com.google.common.util.concurrent.Futures;
 | 
			
		||||
import com.google.common.util.concurrent.ListenableFuture;
 | 
			
		||||
import com.google.common.util.concurrent.MoreExecutors;
 | 
			
		||||
import lombok.Getter;
 | 
			
		||||
import lombok.extern.slf4j.Slf4j;
 | 
			
		||||
import org.springframework.beans.factory.annotation.Value;
 | 
			
		||||
import org.springframework.data.util.Pair;
 | 
			
		||||
import org.thingsboard.common.util.ThingsBoardThreadFactory;
 | 
			
		||||
import org.thingsboard.server.common.data.ApiUsageRecordKey;
 | 
			
		||||
import org.thingsboard.server.common.data.id.CustomerId;
 | 
			
		||||
@ -41,13 +43,15 @@ import static java.lang.String.format;
 | 
			
		||||
 * Created by ashvayka on 26.09.18.
 | 
			
		||||
 */
 | 
			
		||||
@Slf4j
 | 
			
		||||
@SuppressWarnings("UnstableApiUsage")
 | 
			
		||||
public abstract class AbstractJsInvokeService implements JsInvokeService {
 | 
			
		||||
 | 
			
		||||
    private final TbApiUsageStateService apiUsageStateService;
 | 
			
		||||
    private final TbApiUsageClient apiUsageClient;
 | 
			
		||||
    protected ScheduledExecutorService timeoutExecutorService;
 | 
			
		||||
    protected Map<UUID, String> scriptIdToNameMap = new ConcurrentHashMap<>();
 | 
			
		||||
    protected Map<UUID, DisableListInfo> disabledFunctions = new ConcurrentHashMap<>();
 | 
			
		||||
 | 
			
		||||
    protected final Map<UUID, Pair<String, String>> scriptIdToNameAndHashMap = new ConcurrentHashMap<>();
 | 
			
		||||
    protected final Map<UUID, DisableListInfo> disabledFunctions = new ConcurrentHashMap<>();
 | 
			
		||||
 | 
			
		||||
    @Getter
 | 
			
		||||
    @Value("${js.max_total_args_size:100000}")
 | 
			
		||||
@ -83,27 +87,34 @@ public abstract class AbstractJsInvokeService implements JsInvokeService {
 | 
			
		||||
                return error(format("Script body exceeds maximum allowed size of %s symbols", getMaxScriptBodySize()));
 | 
			
		||||
            }
 | 
			
		||||
            UUID scriptId = UUID.randomUUID();
 | 
			
		||||
            String functionName = "invokeInternal_" + scriptId.toString().replace('-', '_');
 | 
			
		||||
            String scriptHash = hash(tenantId, scriptBody);
 | 
			
		||||
            String functionName = constructFunctionName(scriptId, scriptHash);
 | 
			
		||||
            String jsScript = generateJsScript(scriptType, functionName, scriptBody, argNames);
 | 
			
		||||
            return doEval(scriptId, functionName, jsScript);
 | 
			
		||||
            return doEval(scriptId, scriptHash, functionName, jsScript);
 | 
			
		||||
        } else {
 | 
			
		||||
            return error("JS Execution is disabled due to API limits!");
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    protected String constructFunctionName(UUID scriptId, String scriptHash) {
 | 
			
		||||
        return "invokeInternal_" + scriptId.toString().replace('-', '_');
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public ListenableFuture<String> invokeFunction(TenantId tenantId, CustomerId customerId, UUID scriptId, Object... args) {
 | 
			
		||||
        if (apiUsageStateService.getApiUsageState(tenantId).isJsExecEnabled()) {
 | 
			
		||||
            String functionName = scriptIdToNameMap.get(scriptId);
 | 
			
		||||
            if (functionName == null) {
 | 
			
		||||
            Pair<String, String> nameAndHash = scriptIdToNameAndHashMap.get(scriptId);
 | 
			
		||||
            if (nameAndHash == null) {
 | 
			
		||||
                return error("No compiled script found for scriptId: [" + scriptId + "]!");
 | 
			
		||||
            }
 | 
			
		||||
            String functionName = nameAndHash.getFirst();
 | 
			
		||||
            String scriptHash = nameAndHash.getSecond();
 | 
			
		||||
            if (!isDisabled(scriptId)) {
 | 
			
		||||
                if (argsSizeExceeded(args)) {
 | 
			
		||||
                    return scriptExecutionError(scriptId, format("Script input arguments exceed maximum allowed total args size of %s symbols", getMaxTotalArgsSize()));
 | 
			
		||||
                }
 | 
			
		||||
                apiUsageClient.report(tenantId, customerId, ApiUsageRecordKey.JS_EXEC_COUNT, 1);
 | 
			
		||||
                return Futures.transformAsync(doInvokeFunction(scriptId, functionName, args), output -> {
 | 
			
		||||
                return Futures.transformAsync(doInvokeFunction(scriptId, scriptHash, functionName, args), output -> {
 | 
			
		||||
                    String result = output.toString();
 | 
			
		||||
                    if (resultSizeExceeded(result)) {
 | 
			
		||||
                        return scriptExecutionError(scriptId, format("Script invocation result exceeds maximum allowed size of %s symbols", getMaxResultSize()));
 | 
			
		||||
@ -123,12 +134,12 @@ public abstract class AbstractJsInvokeService implements JsInvokeService {
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public ListenableFuture<Void> release(UUID scriptId) {
 | 
			
		||||
        String functionName = scriptIdToNameMap.get(scriptId);
 | 
			
		||||
        if (functionName != null) {
 | 
			
		||||
        Pair<String, String> nameAndHash = scriptIdToNameAndHashMap.get(scriptId);
 | 
			
		||||
        if (nameAndHash != null) {
 | 
			
		||||
            try {
 | 
			
		||||
                scriptIdToNameMap.remove(scriptId);
 | 
			
		||||
                scriptIdToNameAndHashMap.remove(scriptId);
 | 
			
		||||
                disabledFunctions.remove(scriptId);
 | 
			
		||||
                doRelease(scriptId, functionName);
 | 
			
		||||
                doRelease(scriptId, nameAndHash.getSecond(), nameAndHash.getFirst());
 | 
			
		||||
            } catch (Exception e) {
 | 
			
		||||
                return Futures.immediateFailedFuture(e);
 | 
			
		||||
            }
 | 
			
		||||
@ -136,16 +147,24 @@ public abstract class AbstractJsInvokeService implements JsInvokeService {
 | 
			
		||||
        return Futures.immediateFuture(null);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    protected abstract ListenableFuture<UUID> doEval(UUID scriptId, String functionName, String scriptBody);
 | 
			
		||||
    protected abstract ListenableFuture<UUID> doEval(UUID scriptId, String scriptHash, String functionName, String scriptBody);
 | 
			
		||||
 | 
			
		||||
    protected abstract ListenableFuture<Object> doInvokeFunction(UUID scriptId, String functionName, Object[] args);
 | 
			
		||||
    protected abstract ListenableFuture<Object> doInvokeFunction(UUID scriptId, String scriptHash, String functionName, Object[] args);
 | 
			
		||||
 | 
			
		||||
    protected abstract void doRelease(UUID scriptId, String functionName) throws Exception;
 | 
			
		||||
    protected abstract void doRelease(UUID scriptId, String scriptHash, String functionName) throws Exception;
 | 
			
		||||
 | 
			
		||||
    protected abstract int getMaxErrors();
 | 
			
		||||
 | 
			
		||||
    protected abstract long getMaxBlacklistDuration();
 | 
			
		||||
 | 
			
		||||
    protected String hash(TenantId tenantId, String scriptBody) {
 | 
			
		||||
        return Hashing.murmur3_128().newHasher()
 | 
			
		||||
                .putLong(tenantId.getId().getMostSignificantBits())
 | 
			
		||||
                .putLong(tenantId.getId().getLeastSignificantBits())
 | 
			
		||||
                .putUnencodedChars(scriptBody)
 | 
			
		||||
                .hash().toString();
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    protected void onScriptExecutionError(UUID scriptId, Throwable t, String scriptBody) {
 | 
			
		||||
        DisableListInfo disableListInfo = disabledFunctions.computeIfAbsent(scriptId, key -> new DisableListInfo());
 | 
			
		||||
        log.warn("Script has exception and will increment counter {} on disabledFunctions for id {}, exception {}, cause {}, scriptBody {}",
 | 
			
		||||
 | 
			
		||||
@ -24,6 +24,7 @@ import delight.nashornsandbox.NashornSandboxes;
 | 
			
		||||
import lombok.Getter;
 | 
			
		||||
import lombok.extern.slf4j.Slf4j;
 | 
			
		||||
import org.springframework.beans.factory.annotation.Value;
 | 
			
		||||
import org.springframework.data.util.Pair;
 | 
			
		||||
import org.springframework.scheduling.annotation.Scheduled;
 | 
			
		||||
import org.thingsboard.common.util.ThingsBoardExecutors;
 | 
			
		||||
import org.thingsboard.server.queue.usagestats.TbApiUsageClient;
 | 
			
		||||
@ -38,7 +39,6 @@ import javax.script.ScriptException;
 | 
			
		||||
import java.util.UUID;
 | 
			
		||||
import java.util.concurrent.ExecutionException;
 | 
			
		||||
import java.util.concurrent.ExecutorService;
 | 
			
		||||
import java.util.concurrent.Executors;
 | 
			
		||||
import java.util.concurrent.TimeUnit;
 | 
			
		||||
import java.util.concurrent.atomic.AtomicInteger;
 | 
			
		||||
import java.util.concurrent.locks.ReentrantLock;
 | 
			
		||||
@ -121,7 +121,7 @@ public abstract class AbstractNashornJsInvokeService extends AbstractJsInvokeSer
 | 
			
		||||
    protected abstract long getMaxCpuTime();
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    protected ListenableFuture<UUID> doEval(UUID scriptId, String functionName, String jsScript) {
 | 
			
		||||
    protected ListenableFuture<UUID> doEval(UUID scriptId, String scriptHash, String functionName, String jsScript) {
 | 
			
		||||
        jsPushedMsgs.incrementAndGet();
 | 
			
		||||
        ListenableFuture<UUID> result = jsExecutor.executeAsync(() -> {
 | 
			
		||||
            try {
 | 
			
		||||
@ -135,7 +135,7 @@ public abstract class AbstractNashornJsInvokeService extends AbstractJsInvokeSer
 | 
			
		||||
                } finally {
 | 
			
		||||
                    evalLock.unlock();
 | 
			
		||||
                }
 | 
			
		||||
                scriptIdToNameMap.put(scriptId, functionName);
 | 
			
		||||
                scriptIdToNameAndHashMap.put(scriptId, Pair.of(functionName, scriptHash));
 | 
			
		||||
                return scriptId;
 | 
			
		||||
            } catch (Exception e) {
 | 
			
		||||
                log.debug("Failed to compile JS script: {}", e.getMessage(), e);
 | 
			
		||||
@ -150,7 +150,7 @@ public abstract class AbstractNashornJsInvokeService extends AbstractJsInvokeSer
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    protected ListenableFuture<Object> doInvokeFunction(UUID scriptId, String functionName, Object[] args) {
 | 
			
		||||
    protected ListenableFuture<Object> doInvokeFunction(UUID scriptId, String scriptHash, String functionName, Object[] args) {
 | 
			
		||||
        jsPushedMsgs.incrementAndGet();
 | 
			
		||||
        ListenableFuture<Object> result = jsExecutor.executeAsync(() -> {
 | 
			
		||||
            try {
 | 
			
		||||
@ -174,7 +174,7 @@ public abstract class AbstractNashornJsInvokeService extends AbstractJsInvokeSer
 | 
			
		||||
        return result;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    protected void doRelease(UUID scriptId, String functionName) throws ScriptException {
 | 
			
		||||
    protected void doRelease(UUID scriptId, String scriptHash, String functionName) throws ScriptException {
 | 
			
		||||
        if (useJsSandbox()) {
 | 
			
		||||
            sandbox.eval(functionName + " = undefined;");
 | 
			
		||||
        } else {
 | 
			
		||||
 | 
			
		||||
@ -18,16 +18,19 @@ package org.thingsboard.server.service.script;
 | 
			
		||||
import com.google.common.util.concurrent.FutureCallback;
 | 
			
		||||
import com.google.common.util.concurrent.Futures;
 | 
			
		||||
import com.google.common.util.concurrent.ListenableFuture;
 | 
			
		||||
import com.google.common.util.concurrent.MoreExecutors;
 | 
			
		||||
import lombok.Getter;
 | 
			
		||||
import lombok.extern.slf4j.Slf4j;
 | 
			
		||||
import org.springframework.beans.factory.annotation.Autowired;
 | 
			
		||||
import org.springframework.beans.factory.annotation.Value;
 | 
			
		||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
 | 
			
		||||
import org.springframework.data.util.Pair;
 | 
			
		||||
import org.springframework.scheduling.annotation.Scheduled;
 | 
			
		||||
import org.springframework.stereotype.Service;
 | 
			
		||||
import org.springframework.util.StopWatch;
 | 
			
		||||
import org.thingsboard.common.util.ThingsBoardThreadFactory;
 | 
			
		||||
import org.thingsboard.server.gen.js.JsInvokeProtos;
 | 
			
		||||
import org.thingsboard.server.gen.js.JsInvokeProtos.JsInvokeErrorCode;
 | 
			
		||||
import org.thingsboard.server.queue.TbQueueRequestTemplate;
 | 
			
		||||
import org.thingsboard.server.queue.common.TbProtoJsQueueMsg;
 | 
			
		||||
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
 | 
			
		||||
@ -45,6 +48,8 @@ import java.util.concurrent.Executors;
 | 
			
		||||
import java.util.concurrent.TimeUnit;
 | 
			
		||||
import java.util.concurrent.TimeoutException;
 | 
			
		||||
import java.util.concurrent.atomic.AtomicInteger;
 | 
			
		||||
import java.util.concurrent.locks.Lock;
 | 
			
		||||
import java.util.concurrent.locks.ReentrantLock;
 | 
			
		||||
 | 
			
		||||
@Slf4j
 | 
			
		||||
@ConditionalOnExpression("'${js.evaluator:null}'=='remote' && ('${service.type:null}'=='monolith' || '${service.type:null}'=='tb-core' || '${service.type:null}'=='tb-rule-engine')")
 | 
			
		||||
@ -98,9 +103,10 @@ public class RemoteJsInvokeService extends AbstractJsInvokeService {
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Autowired
 | 
			
		||||
    private TbQueueRequestTemplate<TbProtoJsQueueMsg<JsInvokeProtos.RemoteJsRequest>, TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse>> requestTemplate;
 | 
			
		||||
    protected TbQueueRequestTemplate<TbProtoJsQueueMsg<JsInvokeProtos.RemoteJsRequest>, TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse>> requestTemplate;
 | 
			
		||||
 | 
			
		||||
    private Map<UUID, String> scriptIdToBodysMap = new ConcurrentHashMap<>();
 | 
			
		||||
    protected final Map<String, String> scriptHashToBodysMap = new ConcurrentHashMap<>();
 | 
			
		||||
    private final Lock scriptsLock = new ReentrantLock();
 | 
			
		||||
 | 
			
		||||
    @PostConstruct
 | 
			
		||||
    public void init() {
 | 
			
		||||
@ -117,10 +123,9 @@ public class RemoteJsInvokeService extends AbstractJsInvokeService {
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    protected ListenableFuture<UUID> doEval(UUID scriptId, String functionName, String scriptBody) {
 | 
			
		||||
    protected ListenableFuture<UUID> doEval(UUID scriptId, String scriptHash, String functionName, String scriptBody) {
 | 
			
		||||
        JsInvokeProtos.JsCompileRequest jsRequest = JsInvokeProtos.JsCompileRequest.newBuilder()
 | 
			
		||||
                .setScriptIdMSB(scriptId.getMostSignificantBits())
 | 
			
		||||
                .setScriptIdLSB(scriptId.getLeastSignificantBits())
 | 
			
		||||
                .setScriptHash(scriptHash)
 | 
			
		||||
                .setFunctionName(functionName)
 | 
			
		||||
                .setScriptBody(scriptBody).build();
 | 
			
		||||
 | 
			
		||||
@ -128,16 +133,100 @@ public class RemoteJsInvokeService extends AbstractJsInvokeService {
 | 
			
		||||
                .setCompileRequest(jsRequest)
 | 
			
		||||
                .build();
 | 
			
		||||
 | 
			
		||||
        log.trace("Post compile request for scriptId [{}]", scriptId);
 | 
			
		||||
        ListenableFuture<TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse>> future = requestTemplate.send(new TbProtoJsQueueMsg<>(UUID.randomUUID(), jsRequestWrapper));
 | 
			
		||||
        if (maxEvalRequestsTimeout > 0) {
 | 
			
		||||
            future = Futures.withTimeout(future, maxEvalRequestsTimeout, TimeUnit.MILLISECONDS, timeoutExecutorService);
 | 
			
		||||
        log.trace("Post compile request for scriptId [{}] (hash: {})", scriptId, scriptHash);
 | 
			
		||||
        ListenableFuture<TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse>> future = sendJsRequest(UUID.randomUUID(), jsRequestWrapper, maxEvalRequestsTimeout, queueEvalMsgs);
 | 
			
		||||
        return Futures.transform(future, response -> {
 | 
			
		||||
            JsInvokeProtos.JsCompileResponse compilationResult = response.getValue().getCompileResponse();
 | 
			
		||||
            if (compilationResult.getSuccess()) {
 | 
			
		||||
                scriptsLock.lock();
 | 
			
		||||
                try {
 | 
			
		||||
                    scriptIdToNameAndHashMap.put(scriptId, Pair.of(functionName, scriptHash));
 | 
			
		||||
                    scriptHashToBodysMap.put(scriptHash, scriptBody);
 | 
			
		||||
                } finally {
 | 
			
		||||
                    scriptsLock.unlock();
 | 
			
		||||
                }
 | 
			
		||||
                return scriptId;
 | 
			
		||||
            } else {
 | 
			
		||||
                log.debug("[{}] (hash: {}) Failed to compile script due to [{}]: {}", scriptId, compilationResult.getScriptHash(),
 | 
			
		||||
                        compilationResult.getErrorCode().name(), compilationResult.getErrorDetails());
 | 
			
		||||
                throw new RuntimeException(compilationResult.getErrorDetails());
 | 
			
		||||
            }
 | 
			
		||||
        }, callbackExecutor);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    protected String constructFunctionName(UUID scriptId, String scriptHash) {
 | 
			
		||||
        return "invokeInternal_" + scriptHash;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    protected ListenableFuture<Object> doInvokeFunction(UUID scriptId, String scriptHash, String functionName, Object[] args) {
 | 
			
		||||
        log.trace("doInvokeFunction js-request for uuid {} with timeout {}ms", scriptHash, maxRequestsTimeout);
 | 
			
		||||
        String scriptBody = scriptHashToBodysMap.get(scriptHash);
 | 
			
		||||
        if (scriptBody == null) {
 | 
			
		||||
            return Futures.immediateFailedFuture(new RuntimeException("No script body found for script hash [" + scriptHash + "] (script id: [" + scriptId + "])"));
 | 
			
		||||
        }
 | 
			
		||||
        JsInvokeProtos.RemoteJsRequest jsRequestWrapper = buildJsInvokeRequest(scriptHash, functionName, args, false, null);
 | 
			
		||||
 | 
			
		||||
        StopWatch stopWatch = new StopWatch();
 | 
			
		||||
        stopWatch.start();
 | 
			
		||||
 | 
			
		||||
        UUID requestKey = UUID.randomUUID();
 | 
			
		||||
        ListenableFuture<TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse>> future = sendJsRequest(requestKey, jsRequestWrapper, maxRequestsTimeout, queueInvokeMsgs);
 | 
			
		||||
        return Futures.transformAsync(future, response -> {
 | 
			
		||||
            stopWatch.stop();
 | 
			
		||||
            log.trace("doInvokeFunction js-response took {}ms for uuid {}", stopWatch.getTotalTimeMillis(), response.getKey());
 | 
			
		||||
            JsInvokeProtos.JsInvokeResponse invokeResult = response.getValue().getInvokeResponse();
 | 
			
		||||
            if (invokeResult.getSuccess()) {
 | 
			
		||||
                return Futures.immediateFuture(invokeResult.getResult());
 | 
			
		||||
            } else {
 | 
			
		||||
                return handleInvokeError(requestKey, scriptId, scriptHash, invokeResult.getErrorCode(),
 | 
			
		||||
                        invokeResult.getErrorDetails(), functionName, args, scriptBody);
 | 
			
		||||
            }
 | 
			
		||||
        }, callbackExecutor);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private ListenableFuture<Object> handleInvokeError(UUID requestKey, UUID scriptId, String scriptHash,
 | 
			
		||||
                                                       JsInvokeErrorCode errorCode, String errorDetails,
 | 
			
		||||
                                                       String functionName, Object[] args, String scriptBody) {
 | 
			
		||||
        log.debug("[{}] Failed to invoke function due to [{}]: {}", scriptId, errorCode.name(), errorDetails);
 | 
			
		||||
        RuntimeException e = new RuntimeException(errorDetails);
 | 
			
		||||
        if (JsInvokeErrorCode.TIMEOUT_ERROR.equals(errorCode)) {
 | 
			
		||||
            onScriptExecutionError(scriptId, e, scriptBody);
 | 
			
		||||
            queueTimeoutMsgs.incrementAndGet();
 | 
			
		||||
        } else if (JsInvokeErrorCode.COMPILATION_ERROR.equals(errorCode)) {
 | 
			
		||||
            onScriptExecutionError(scriptId, e, scriptBody);
 | 
			
		||||
        } else if (JsInvokeErrorCode.NOT_FOUND_ERROR.equals(errorCode)) {
 | 
			
		||||
            log.debug("[{}] Remote JS executor couldn't find the script", scriptId);
 | 
			
		||||
            if (scriptBody != null) {
 | 
			
		||||
                JsInvokeProtos.RemoteJsRequest invokeRequestWithScriptBody = buildJsInvokeRequest(scriptHash, functionName, args, true, scriptBody);
 | 
			
		||||
                log.debug("[{}] Sending invoke request again with script body", scriptId);
 | 
			
		||||
                return Futures.transformAsync(sendJsRequest(requestKey, invokeRequestWithScriptBody, maxRequestsTimeout, queueInvokeMsgs), r -> {
 | 
			
		||||
                    JsInvokeProtos.JsInvokeResponse result = r.getValue().getInvokeResponse();
 | 
			
		||||
                    if (result.getSuccess()) {
 | 
			
		||||
                        return Futures.immediateFuture(result.getResult());
 | 
			
		||||
                    } else {
 | 
			
		||||
                        return handleInvokeError(requestKey, scriptId, scriptHash, result.getErrorCode(),
 | 
			
		||||
                                result.getErrorDetails(), functionName, args, null);
 | 
			
		||||
                    }
 | 
			
		||||
                }, MoreExecutors.directExecutor());
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
        queueFailedMsgs.incrementAndGet();
 | 
			
		||||
        return Futures.immediateFailedFuture(e);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private ListenableFuture<TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse>> sendJsRequest(UUID requestKey, JsInvokeProtos.RemoteJsRequest jsRequestWrapper,
 | 
			
		||||
                                                                                             long maxRequestsTimeout, AtomicInteger msgsCounter) {
 | 
			
		||||
        ListenableFuture<TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse>> future = requestTemplate.send(new TbProtoJsQueueMsg<>(requestKey, jsRequestWrapper));
 | 
			
		||||
        if (maxRequestsTimeout > 0) {
 | 
			
		||||
            future = Futures.withTimeout(future, maxRequestsTimeout, TimeUnit.MILLISECONDS, timeoutExecutorService);
 | 
			
		||||
        }
 | 
			
		||||
        queuePushedMsgs.incrementAndGet();
 | 
			
		||||
        Futures.addCallback(future, new FutureCallback<TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse>>() {
 | 
			
		||||
        Futures.addCallback(future, new FutureCallback<>() {
 | 
			
		||||
            @Override
 | 
			
		||||
            public void onSuccess(@Nullable TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse> result) {
 | 
			
		||||
                queueEvalMsgs.incrementAndGet();
 | 
			
		||||
                msgsCounter.incrementAndGet();
 | 
			
		||||
            }
 | 
			
		||||
 | 
			
		||||
            @Override
 | 
			
		||||
@ -148,34 +237,15 @@ public class RemoteJsInvokeService extends AbstractJsInvokeService {
 | 
			
		||||
                queueFailedMsgs.incrementAndGet();
 | 
			
		||||
            }
 | 
			
		||||
        }, callbackExecutor);
 | 
			
		||||
        return Futures.transform(future, response -> {
 | 
			
		||||
            JsInvokeProtos.JsCompileResponse compilationResult = response.getValue().getCompileResponse();
 | 
			
		||||
            UUID compiledScriptId = new UUID(compilationResult.getScriptIdMSB(), compilationResult.getScriptIdLSB());
 | 
			
		||||
            if (compilationResult.getSuccess()) {
 | 
			
		||||
                scriptIdToNameMap.put(scriptId, functionName);
 | 
			
		||||
                scriptIdToBodysMap.put(scriptId, scriptBody);
 | 
			
		||||
                return compiledScriptId;
 | 
			
		||||
            } else {
 | 
			
		||||
                log.debug("[{}] Failed to compile script due to [{}]: {}", compiledScriptId, compilationResult.getErrorCode().name(), compilationResult.getErrorDetails());
 | 
			
		||||
                throw new RuntimeException(compilationResult.getErrorDetails());
 | 
			
		||||
            }
 | 
			
		||||
        }, callbackExecutor);
 | 
			
		||||
        return future;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    protected ListenableFuture<Object> doInvokeFunction(UUID scriptId, String functionName, Object[] args) {
 | 
			
		||||
        log.trace("doInvokeFunction js-request for uuid {} with timeout {}ms", scriptId, maxRequestsTimeout);
 | 
			
		||||
        final String scriptBody = scriptIdToBodysMap.get(scriptId);
 | 
			
		||||
        if (scriptBody == null) {
 | 
			
		||||
            return Futures.immediateFailedFuture(new RuntimeException("No script body found for scriptId: [" + scriptId + "]!"));
 | 
			
		||||
        }
 | 
			
		||||
    private JsInvokeProtos.RemoteJsRequest buildJsInvokeRequest(String scriptHash, String functionName, Object[] args, boolean includeScriptBody, String scriptBody) {
 | 
			
		||||
        JsInvokeProtos.JsInvokeRequest.Builder jsRequestBuilder = JsInvokeProtos.JsInvokeRequest.newBuilder()
 | 
			
		||||
                .setScriptIdMSB(scriptId.getMostSignificantBits())
 | 
			
		||||
                .setScriptIdLSB(scriptId.getLeastSignificantBits())
 | 
			
		||||
                .setScriptHash(scriptHash)
 | 
			
		||||
                .setFunctionName(functionName)
 | 
			
		||||
                .setTimeout((int) maxExecRequestsTimeout)
 | 
			
		||||
                .setScriptBody(scriptBody);
 | 
			
		||||
 | 
			
		||||
                .setTimeout((int) maxExecRequestsTimeout);
 | 
			
		||||
        if (includeScriptBody) jsRequestBuilder.setScriptBody(scriptBody);
 | 
			
		||||
        for (Object arg : args) {
 | 
			
		||||
            jsRequestBuilder.addArgs(arg.toString());
 | 
			
		||||
        }
 | 
			
		||||
@ -183,55 +253,16 @@ public class RemoteJsInvokeService extends AbstractJsInvokeService {
 | 
			
		||||
        JsInvokeProtos.RemoteJsRequest jsRequestWrapper = JsInvokeProtos.RemoteJsRequest.newBuilder()
 | 
			
		||||
                .setInvokeRequest(jsRequestBuilder.build())
 | 
			
		||||
                .build();
 | 
			
		||||
 | 
			
		||||
        StopWatch stopWatch = new StopWatch();
 | 
			
		||||
        stopWatch.start();
 | 
			
		||||
 | 
			
		||||
        ListenableFuture<TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse>> future = requestTemplate.send(new TbProtoJsQueueMsg<>(UUID.randomUUID(), jsRequestWrapper));
 | 
			
		||||
        if (maxRequestsTimeout > 0) {
 | 
			
		||||
            future = Futures.withTimeout(future, maxRequestsTimeout, TimeUnit.MILLISECONDS, timeoutExecutorService);
 | 
			
		||||
        }
 | 
			
		||||
        queuePushedMsgs.incrementAndGet();
 | 
			
		||||
        Futures.addCallback(future, new FutureCallback<TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse>>() {
 | 
			
		||||
            @Override
 | 
			
		||||
            public void onSuccess(@Nullable TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse> result) {
 | 
			
		||||
                queueInvokeMsgs.incrementAndGet();
 | 
			
		||||
            }
 | 
			
		||||
 | 
			
		||||
            @Override
 | 
			
		||||
            public void onFailure(Throwable t) {
 | 
			
		||||
                if (t instanceof TimeoutException || (t.getCause() != null && t.getCause() instanceof TimeoutException)) {
 | 
			
		||||
                    queueTimeoutMsgs.incrementAndGet();
 | 
			
		||||
                }
 | 
			
		||||
                queueFailedMsgs.incrementAndGet();
 | 
			
		||||
            }
 | 
			
		||||
        }, callbackExecutor);
 | 
			
		||||
        return Futures.transform(future, response -> {
 | 
			
		||||
            stopWatch.stop();
 | 
			
		||||
            log.trace("doInvokeFunction js-response took {}ms for uuid {}", stopWatch.getTotalTimeMillis(), response.getKey());
 | 
			
		||||
            JsInvokeProtos.JsInvokeResponse invokeResult = response.getValue().getInvokeResponse();
 | 
			
		||||
            if (invokeResult.getSuccess()) {
 | 
			
		||||
                return invokeResult.getResult();
 | 
			
		||||
            } else {
 | 
			
		||||
                final RuntimeException e = new RuntimeException(invokeResult.getErrorDetails());
 | 
			
		||||
                if (JsInvokeProtos.JsInvokeErrorCode.TIMEOUT_ERROR.equals(invokeResult.getErrorCode())) {
 | 
			
		||||
                    onScriptExecutionError(scriptId, e, scriptBody);
 | 
			
		||||
                    queueTimeoutMsgs.incrementAndGet();
 | 
			
		||||
                } else if (JsInvokeProtos.JsInvokeErrorCode.COMPILATION_ERROR.equals(invokeResult.getErrorCode())) {
 | 
			
		||||
                    onScriptExecutionError(scriptId, e, scriptBody);
 | 
			
		||||
                }
 | 
			
		||||
                queueFailedMsgs.incrementAndGet();
 | 
			
		||||
                log.debug("[{}] Failed to invoke function due to [{}]: {}", scriptId, invokeResult.getErrorCode().name(), invokeResult.getErrorDetails());
 | 
			
		||||
                throw e;
 | 
			
		||||
            }
 | 
			
		||||
        }, callbackExecutor);
 | 
			
		||||
        return jsRequestWrapper;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    protected void doRelease(UUID scriptId, String functionName) throws Exception {
 | 
			
		||||
    protected void doRelease(UUID scriptId, String scriptHash, String functionName) throws Exception {
 | 
			
		||||
        if (scriptIdToNameAndHashMap.values().stream().map(Pair::getSecond).anyMatch(hash -> hash.equals(scriptHash))) {
 | 
			
		||||
            return;
 | 
			
		||||
        }
 | 
			
		||||
        JsInvokeProtos.JsReleaseRequest jsRequest = JsInvokeProtos.JsReleaseRequest.newBuilder()
 | 
			
		||||
                .setScriptIdMSB(scriptId.getMostSignificantBits())
 | 
			
		||||
                .setScriptIdLSB(scriptId.getLeastSignificantBits())
 | 
			
		||||
                .setScriptHash(scriptHash)
 | 
			
		||||
                .setFunctionName(functionName).build();
 | 
			
		||||
 | 
			
		||||
        JsInvokeProtos.RemoteJsRequest jsRequestWrapper = JsInvokeProtos.RemoteJsRequest.newBuilder()
 | 
			
		||||
@ -244,12 +275,18 @@ public class RemoteJsInvokeService extends AbstractJsInvokeService {
 | 
			
		||||
        }
 | 
			
		||||
        JsInvokeProtos.RemoteJsResponse response = future.get().getValue();
 | 
			
		||||
 | 
			
		||||
        JsInvokeProtos.JsReleaseResponse compilationResult = response.getReleaseResponse();
 | 
			
		||||
        UUID compiledScriptId = new UUID(compilationResult.getScriptIdMSB(), compilationResult.getScriptIdLSB());
 | 
			
		||||
        if (compilationResult.getSuccess()) {
 | 
			
		||||
            scriptIdToBodysMap.remove(scriptId);
 | 
			
		||||
        JsInvokeProtos.JsReleaseResponse releaseResponse = response.getReleaseResponse();
 | 
			
		||||
        if (releaseResponse.getSuccess()) {
 | 
			
		||||
            scriptsLock.lock();
 | 
			
		||||
            try {
 | 
			
		||||
                if (scriptIdToNameAndHashMap.values().stream().map(Pair::getSecond).noneMatch(h -> h.equals(scriptHash))) {
 | 
			
		||||
                    scriptHashToBodysMap.remove(scriptHash);
 | 
			
		||||
                }
 | 
			
		||||
            } finally {
 | 
			
		||||
                scriptsLock.unlock();
 | 
			
		||||
            }
 | 
			
		||||
        } else {
 | 
			
		||||
            log.debug("[{}] Failed to release script due", compiledScriptId);
 | 
			
		||||
            log.debug("[{}] Failed to release script", scriptHash);
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -35,7 +35,7 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy;
 | 
			
		||||
        "js.max_result_size=50",
 | 
			
		||||
        "js.local.max_errors=2"
 | 
			
		||||
})
 | 
			
		||||
class JsInvokeServiceTest extends AbstractControllerTest {
 | 
			
		||||
class LocalJsInvokeServiceTest extends AbstractControllerTest {
 | 
			
		||||
 | 
			
		||||
    @Autowired
 | 
			
		||||
    private NashornJsInvokeService jsInvokeService;
 | 
			
		||||
@ -0,0 +1,219 @@
 | 
			
		||||
/**
 | 
			
		||||
 * Copyright © 2016-2022 The Thingsboard Authors
 | 
			
		||||
 *
 | 
			
		||||
 * Licensed under the Apache License, Version 2.0 (the "License");
 | 
			
		||||
 * you may not use this file except in compliance with the License.
 | 
			
		||||
 * You may obtain a copy of the License at
 | 
			
		||||
 *
 | 
			
		||||
 *     http://www.apache.org/licenses/LICENSE-2.0
 | 
			
		||||
 *
 | 
			
		||||
 * Unless required by applicable law or agreed to in writing, software
 | 
			
		||||
 * distributed under the License is distributed on an "AS IS" BASIS,
 | 
			
		||||
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
			
		||||
 * See the License for the specific language governing permissions and
 | 
			
		||||
 * limitations under the License.
 | 
			
		||||
 */
 | 
			
		||||
package org.thingsboard.server.service.script;
 | 
			
		||||
 | 
			
		||||
import com.google.common.util.concurrent.Futures;
 | 
			
		||||
import org.apache.commons.lang3.StringUtils;
 | 
			
		||||
import org.junit.jupiter.api.AfterEach;
 | 
			
		||||
import org.junit.jupiter.api.BeforeEach;
 | 
			
		||||
import org.junit.jupiter.api.Test;
 | 
			
		||||
import org.mockito.ArgumentCaptor;
 | 
			
		||||
import org.thingsboard.server.common.data.ApiUsageState;
 | 
			
		||||
import org.thingsboard.server.common.data.id.TenantId;
 | 
			
		||||
import org.thingsboard.server.gen.js.JsInvokeProtos;
 | 
			
		||||
import org.thingsboard.server.gen.js.JsInvokeProtos.RemoteJsRequest;
 | 
			
		||||
import org.thingsboard.server.gen.js.JsInvokeProtos.RemoteJsResponse;
 | 
			
		||||
import org.thingsboard.server.queue.TbQueueRequestTemplate;
 | 
			
		||||
import org.thingsboard.server.queue.common.TbProtoJsQueueMsg;
 | 
			
		||||
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
 | 
			
		||||
import org.thingsboard.server.queue.usagestats.TbApiUsageClient;
 | 
			
		||||
import org.thingsboard.server.service.apiusage.TbApiUsageStateService;
 | 
			
		||||
 | 
			
		||||
import java.util.HashSet;
 | 
			
		||||
import java.util.List;
 | 
			
		||||
import java.util.Set;
 | 
			
		||||
import java.util.UUID;
 | 
			
		||||
 | 
			
		||||
import static org.assertj.core.api.Assertions.assertThat;
 | 
			
		||||
import static org.mockito.ArgumentMatchers.any;
 | 
			
		||||
import static org.mockito.ArgumentMatchers.argThat;
 | 
			
		||||
import static org.mockito.Mockito.doAnswer;
 | 
			
		||||
import static org.mockito.Mockito.doReturn;
 | 
			
		||||
import static org.mockito.Mockito.mock;
 | 
			
		||||
import static org.mockito.Mockito.reset;
 | 
			
		||||
import static org.mockito.Mockito.times;
 | 
			
		||||
import static org.mockito.Mockito.verify;
 | 
			
		||||
import static org.mockito.Mockito.verifyNoInteractions;
 | 
			
		||||
import static org.mockito.Mockito.when;
 | 
			
		||||
 | 
			
		||||
class RemoteJsInvokeServiceTest {
 | 
			
		||||
 | 
			
		||||
    private RemoteJsInvokeService remoteJsInvokeService;
 | 
			
		||||
    private TbQueueRequestTemplate<TbProtoJsQueueMsg<RemoteJsRequest>, TbProtoQueueMsg<RemoteJsResponse>> jsRequestTemplate;
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
    @BeforeEach
 | 
			
		||||
    public void beforeEach() {
 | 
			
		||||
        TbApiUsageStateService apiUsageStateService = mock(TbApiUsageStateService.class);
 | 
			
		||||
        ApiUsageState apiUsageState = mock(ApiUsageState.class);
 | 
			
		||||
        when(apiUsageState.isJsExecEnabled()).thenReturn(true);
 | 
			
		||||
        when(apiUsageStateService.getApiUsageState(any())).thenReturn(apiUsageState);
 | 
			
		||||
        TbApiUsageClient apiUsageClient = mock(TbApiUsageClient.class);
 | 
			
		||||
 | 
			
		||||
        remoteJsInvokeService = new RemoteJsInvokeService(apiUsageStateService, apiUsageClient);
 | 
			
		||||
        jsRequestTemplate = mock(TbQueueRequestTemplate.class);
 | 
			
		||||
        remoteJsInvokeService.requestTemplate = jsRequestTemplate;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @AfterEach
 | 
			
		||||
    public void afterEach() {
 | 
			
		||||
        reset(jsRequestTemplate);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Test
 | 
			
		||||
    public void whenInvokingFunction_thenDoNotSendScriptBody() throws Exception {
 | 
			
		||||
        mockJsEvalResponse();
 | 
			
		||||
        String scriptBody = "return { a: 'b'};";
 | 
			
		||||
        UUID scriptId = remoteJsInvokeService.eval(TenantId.SYS_TENANT_ID, JsScriptType.RULE_NODE_SCRIPT, scriptBody).get();
 | 
			
		||||
        reset(jsRequestTemplate);
 | 
			
		||||
 | 
			
		||||
        String expectedInvocationResult = "scriptInvocationResult";
 | 
			
		||||
        doReturn(Futures.immediateFuture(new TbProtoJsQueueMsg<>(UUID.randomUUID(), RemoteJsResponse.newBuilder()
 | 
			
		||||
                .setInvokeResponse(JsInvokeProtos.JsInvokeResponse.newBuilder()
 | 
			
		||||
                        .setSuccess(true)
 | 
			
		||||
                        .setResult(expectedInvocationResult)
 | 
			
		||||
                        .build())
 | 
			
		||||
                .build())))
 | 
			
		||||
                .when(jsRequestTemplate).send(any());
 | 
			
		||||
 | 
			
		||||
        ArgumentCaptor<TbProtoJsQueueMsg<RemoteJsRequest>> jsRequestCaptor = ArgumentCaptor.forClass(TbProtoJsQueueMsg.class);
 | 
			
		||||
        Object invocationResult = remoteJsInvokeService.invokeFunction(TenantId.SYS_TENANT_ID, null, scriptId, "{}").get();
 | 
			
		||||
        verify(jsRequestTemplate).send(jsRequestCaptor.capture());
 | 
			
		||||
 | 
			
		||||
        JsInvokeProtos.JsInvokeRequest jsInvokeRequestMade = jsRequestCaptor.getValue().getValue().getInvokeRequest();
 | 
			
		||||
        assertThat(jsInvokeRequestMade.getScriptBody()).isNullOrEmpty();
 | 
			
		||||
        assertThat(jsInvokeRequestMade.getScriptHash()).isEqualTo(getScriptHash(scriptId));
 | 
			
		||||
        assertThat(invocationResult).isEqualTo(expectedInvocationResult);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Test
 | 
			
		||||
    public void whenInvokingFunctionAndRemoteJsExecutorRemovedScript_thenHandleNotFoundErrorAndMakeInvokeRequestWithScriptBody() throws Exception {
 | 
			
		||||
        mockJsEvalResponse();
 | 
			
		||||
        String scriptBody = "return { a: 'b'};";
 | 
			
		||||
        UUID scriptId = remoteJsInvokeService.eval(TenantId.SYS_TENANT_ID, JsScriptType.RULE_NODE_SCRIPT, scriptBody).get();
 | 
			
		||||
        reset(jsRequestTemplate);
 | 
			
		||||
 | 
			
		||||
        doReturn(Futures.immediateFuture(new TbProtoJsQueueMsg<>(UUID.randomUUID(), RemoteJsResponse.newBuilder()
 | 
			
		||||
                .setInvokeResponse(JsInvokeProtos.JsInvokeResponse.newBuilder()
 | 
			
		||||
                        .setSuccess(false)
 | 
			
		||||
                        .setErrorCode(JsInvokeProtos.JsInvokeErrorCode.NOT_FOUND_ERROR)
 | 
			
		||||
                        .build())
 | 
			
		||||
                .build())))
 | 
			
		||||
                .when(jsRequestTemplate).send(argThat(jsQueueMsg -> {
 | 
			
		||||
                    return StringUtils.isEmpty(jsQueueMsg.getValue().getInvokeRequest().getScriptBody());
 | 
			
		||||
                }));
 | 
			
		||||
 | 
			
		||||
        String expectedInvocationResult = "invocationResult";
 | 
			
		||||
        doReturn(Futures.immediateFuture(new TbProtoJsQueueMsg<>(UUID.randomUUID(), RemoteJsResponse.newBuilder()
 | 
			
		||||
                .setInvokeResponse(JsInvokeProtos.JsInvokeResponse.newBuilder()
 | 
			
		||||
                        .setSuccess(true)
 | 
			
		||||
                        .setResult(expectedInvocationResult)
 | 
			
		||||
                        .build())
 | 
			
		||||
                .build())))
 | 
			
		||||
                .when(jsRequestTemplate).send(argThat(jsQueueMsg -> {
 | 
			
		||||
                    return StringUtils.isNotEmpty(jsQueueMsg.getValue().getInvokeRequest().getScriptBody());
 | 
			
		||||
                }));
 | 
			
		||||
 | 
			
		||||
        ArgumentCaptor<TbProtoJsQueueMsg<RemoteJsRequest>> jsRequestsCaptor = ArgumentCaptor.forClass(TbProtoJsQueueMsg.class);
 | 
			
		||||
        Object invocationResult = remoteJsInvokeService.invokeFunction(TenantId.SYS_TENANT_ID, null, scriptId, "{}").get();
 | 
			
		||||
        verify(jsRequestTemplate, times(2)).send(jsRequestsCaptor.capture());
 | 
			
		||||
 | 
			
		||||
        List<TbProtoJsQueueMsg<RemoteJsRequest>> jsInvokeRequestsMade = jsRequestsCaptor.getAllValues();
 | 
			
		||||
 | 
			
		||||
        JsInvokeProtos.JsInvokeRequest firstRequestMade = jsInvokeRequestsMade.get(0).getValue().getInvokeRequest();
 | 
			
		||||
        assertThat(firstRequestMade.getScriptBody()).isNullOrEmpty();
 | 
			
		||||
 | 
			
		||||
        JsInvokeProtos.JsInvokeRequest secondRequestMade = jsInvokeRequestsMade.get(1).getValue().getInvokeRequest();
 | 
			
		||||
        assertThat(secondRequestMade.getScriptBody()).contains(scriptBody);
 | 
			
		||||
 | 
			
		||||
        assertThat(jsInvokeRequestsMade.stream().map(TbProtoQueueMsg::getKey).distinct().count()).as("partition keys are same")
 | 
			
		||||
                .isOne();
 | 
			
		||||
 | 
			
		||||
        assertThat(invocationResult).isEqualTo(expectedInvocationResult);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Test
 | 
			
		||||
    public void whenDoingEval_thenSaveScriptByHashOfTenantIdAndScriptBody() throws Exception {
 | 
			
		||||
        mockJsEvalResponse();
 | 
			
		||||
 | 
			
		||||
        TenantId tenantId1 = TenantId.fromUUID(UUID.randomUUID());
 | 
			
		||||
        String scriptBody1 = "var msg = { temp: 42, humidity: 77 };\n" +
 | 
			
		||||
                "var metadata = { data: 40 };\n" +
 | 
			
		||||
                "var msgType = \"POST_TELEMETRY_REQUEST\";\n" +
 | 
			
		||||
                "\n" +
 | 
			
		||||
                "return { msg: msg, metadata: metadata, msgType: msgType };";
 | 
			
		||||
 | 
			
		||||
        Set<String> scriptHashes = new HashSet<>();
 | 
			
		||||
        String tenant1Script1Hash = null;
 | 
			
		||||
        for (int i = 0; i < 3; i++) {
 | 
			
		||||
            UUID scriptUuid = remoteJsInvokeService.eval(tenantId1, JsScriptType.RULE_NODE_SCRIPT, scriptBody1).get();
 | 
			
		||||
            tenant1Script1Hash = getScriptHash(scriptUuid);
 | 
			
		||||
            scriptHashes.add(tenant1Script1Hash);
 | 
			
		||||
        }
 | 
			
		||||
        assertThat(scriptHashes).as("Unique scripts ids").size().isOne();
 | 
			
		||||
 | 
			
		||||
        TenantId tenantId2 = TenantId.fromUUID(UUID.randomUUID());
 | 
			
		||||
        UUID scriptUuid = remoteJsInvokeService.eval(tenantId2, JsScriptType.RULE_NODE_SCRIPT, scriptBody1).get();
 | 
			
		||||
        String tenant2Script1Id = getScriptHash(scriptUuid);
 | 
			
		||||
        assertThat(tenant2Script1Id).isNotEqualTo(tenant1Script1Hash);
 | 
			
		||||
 | 
			
		||||
        String scriptBody2 = scriptBody1 + ";;";
 | 
			
		||||
        scriptUuid = remoteJsInvokeService.eval(tenantId2, JsScriptType.RULE_NODE_SCRIPT, scriptBody2).get();
 | 
			
		||||
        String tenant2Script2Id = getScriptHash(scriptUuid);
 | 
			
		||||
        assertThat(tenant2Script2Id).isNotEqualTo(tenant2Script1Id);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Test
 | 
			
		||||
    public void whenReleasingScript_thenCheckForHashUsages() throws Exception {
 | 
			
		||||
        mockJsEvalResponse();
 | 
			
		||||
        String scriptBody = "return { a: 'b'};";
 | 
			
		||||
        UUID scriptId1 = remoteJsInvokeService.eval(TenantId.SYS_TENANT_ID, JsScriptType.RULE_NODE_SCRIPT, scriptBody).get();
 | 
			
		||||
        UUID scriptId2 = remoteJsInvokeService.eval(TenantId.SYS_TENANT_ID, JsScriptType.RULE_NODE_SCRIPT, scriptBody).get();
 | 
			
		||||
        String scriptHash = getScriptHash(scriptId1);
 | 
			
		||||
        assertThat(scriptHash).isEqualTo(getScriptHash(scriptId2));
 | 
			
		||||
        reset(jsRequestTemplate);
 | 
			
		||||
 | 
			
		||||
        doReturn(Futures.immediateFuture(new TbProtoQueueMsg<>(UUID.randomUUID(), RemoteJsResponse.newBuilder()
 | 
			
		||||
                .setReleaseResponse(JsInvokeProtos.JsReleaseResponse.newBuilder()
 | 
			
		||||
                        .setSuccess(true)
 | 
			
		||||
                        .build())
 | 
			
		||||
                .build())))
 | 
			
		||||
                .when(jsRequestTemplate).send(any());
 | 
			
		||||
 | 
			
		||||
        remoteJsInvokeService.release(scriptId1).get();
 | 
			
		||||
        verifyNoInteractions(jsRequestTemplate);
 | 
			
		||||
        assertThat(remoteJsInvokeService.scriptHashToBodysMap).containsKey(scriptHash);
 | 
			
		||||
 | 
			
		||||
        remoteJsInvokeService.release(scriptId2).get();
 | 
			
		||||
        verify(jsRequestTemplate).send(any());
 | 
			
		||||
        assertThat(remoteJsInvokeService.scriptHashToBodysMap).isEmpty();
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private String getScriptHash(UUID scriptUuid) {
 | 
			
		||||
        return remoteJsInvokeService.scriptIdToNameAndHashMap.get(scriptUuid).getSecond();
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private void mockJsEvalResponse() {
 | 
			
		||||
        doAnswer(methodCall -> Futures.immediateFuture(new TbProtoJsQueueMsg<>(UUID.randomUUID(), RemoteJsResponse.newBuilder()
 | 
			
		||||
                .setCompileResponse(JsInvokeProtos.JsCompileResponse.newBuilder()
 | 
			
		||||
                        .setSuccess(true)
 | 
			
		||||
                        .setScriptHash(methodCall.<TbProtoQueueMsg<RemoteJsRequest>>getArgument(0).getValue().getCompileRequest().getScriptHash())
 | 
			
		||||
                        .build())
 | 
			
		||||
                .build())))
 | 
			
		||||
                .when(jsRequestTemplate).send(argThat(jsQueueMsg -> jsQueueMsg.getValue().hasCompileRequest()));
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
@ -23,6 +23,7 @@ enum JsInvokeErrorCode {
 | 
			
		||||
  COMPILATION_ERROR = 0;
 | 
			
		||||
  RUNTIME_ERROR = 1;
 | 
			
		||||
  TIMEOUT_ERROR = 2;
 | 
			
		||||
  NOT_FOUND_ERROR = 3;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
message RemoteJsRequest {
 | 
			
		||||
@ -40,39 +41,34 @@ message RemoteJsResponse {
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
message JsCompileRequest {
 | 
			
		||||
  int64 scriptIdMSB = 1;
 | 
			
		||||
  int64 scriptIdLSB = 2;
 | 
			
		||||
  string functionName = 3;
 | 
			
		||||
  string scriptBody = 4;
 | 
			
		||||
  string scriptHash = 5;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
message JsReleaseRequest {
 | 
			
		||||
  int64 scriptIdMSB = 1;
 | 
			
		||||
  int64 scriptIdLSB = 2;
 | 
			
		||||
  string functionName = 3;
 | 
			
		||||
  string scriptHash = 4;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
message JsReleaseResponse {
 | 
			
		||||
  bool success = 1;
 | 
			
		||||
  int64 scriptIdMSB = 2;
 | 
			
		||||
  int64 scriptIdLSB = 3;
 | 
			
		||||
  string scriptHash = 4;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
message JsCompileResponse {
 | 
			
		||||
  bool success = 1;
 | 
			
		||||
  int64 scriptIdMSB = 2;
 | 
			
		||||
  int64 scriptIdLSB = 3;
 | 
			
		||||
  JsInvokeErrorCode errorCode = 4;
 | 
			
		||||
  string errorDetails = 5;
 | 
			
		||||
  string scriptHash = 6;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
message JsInvokeRequest {
 | 
			
		||||
  int64 scriptIdMSB = 1;
 | 
			
		||||
  int64 scriptIdLSB = 2;
 | 
			
		||||
  string functionName = 3;
 | 
			
		||||
  string scriptBody = 4;
 | 
			
		||||
  int32 timeout = 5;
 | 
			
		||||
  repeated string args = 6;
 | 
			
		||||
  string scriptHash = 7;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
message JsInvokeResponse {
 | 
			
		||||
@ -81,4 +77,3 @@ message JsInvokeResponse {
 | 
			
		||||
  JsInvokeErrorCode errorCode = 3;
 | 
			
		||||
  string errorDetails = 4;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -16,8 +16,9 @@
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
export interface TbMessage {
 | 
			
		||||
    scriptIdMSB: string;
 | 
			
		||||
    scriptIdLSB: string;
 | 
			
		||||
    scriptIdMSB: string; // deprecated
 | 
			
		||||
    scriptIdLSB: string; // deprecated
 | 
			
		||||
    scriptHash: string;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
export interface RemoteJsRequest {
 | 
			
		||||
 | 
			
		||||
@ -18,7 +18,7 @@ import config from 'config';
 | 
			
		||||
import { _logger } from '../config/logger';
 | 
			
		||||
import { JsExecutor, TbScript } from './jsExecutor';
 | 
			
		||||
import { performance } from 'perf_hooks';
 | 
			
		||||
import { isString, parseJsErrorDetails, toUUIDString, UUIDFromBuffer, UUIDToBits } from './utils';
 | 
			
		||||
import { isString, parseJsErrorDetails, toUUIDString, UUIDFromBuffer, UUIDToBits, isNotUUID } from './utils';
 | 
			
		||||
import { IQueue } from '../queue/queue.models';
 | 
			
		||||
import {
 | 
			
		||||
    JsCompileRequest,
 | 
			
		||||
@ -36,6 +36,7 @@ import Long from 'long';
 | 
			
		||||
const COMPILATION_ERROR = 0;
 | 
			
		||||
const RUNTIME_ERROR = 1;
 | 
			
		||||
const TIMEOUT_ERROR = 2;
 | 
			
		||||
const NOT_FOUND_ERROR = 3;
 | 
			
		||||
 | 
			
		||||
const statFrequency = Number(config.get('script.stat_print_frequency'));
 | 
			
		||||
const scriptBodyTraceFrequency = Number(config.get('script.script_body_trace_frequency'));
 | 
			
		||||
@ -129,7 +130,12 @@ export class JsInvokeMessageProcessor {
 | 
			
		||||
    processCompileRequest(requestId: string, responseTopic: string, headers: any, compileRequest: JsCompileRequest) {
 | 
			
		||||
        const scriptId = JsInvokeMessageProcessor.getScriptId(compileRequest);
 | 
			
		||||
        this.logger.debug('[%s] Processing compile request, scriptId: [%s]', requestId, scriptId);
 | 
			
		||||
 | 
			
		||||
        if (this.scriptMap.has(scriptId)) {
 | 
			
		||||
            const compileResponse = JsInvokeMessageProcessor.createCompileResponse(scriptId, true);
 | 
			
		||||
            this.logger.debug('[%s] Script was already compiled, scriptId: [%s]', requestId, scriptId);
 | 
			
		||||
            this.sendResponse(requestId, responseTopic, headers, scriptId, compileResponse);
 | 
			
		||||
            return;
 | 
			
		||||
        }
 | 
			
		||||
        this.executor.compileScript(compileRequest.scriptBody).then(
 | 
			
		||||
            (script) => {
 | 
			
		||||
                this.cacheScript(scriptId, script);
 | 
			
		||||
@ -170,7 +176,7 @@ export class JsInvokeMessageProcessor {
 | 
			
		||||
                            this.logger.debug('[%s] Sending success invoke response, scriptId: [%s]', requestId, scriptId);
 | 
			
		||||
                            this.sendResponse(requestId, responseTopic, headers, scriptId, undefined, invokeResponse);
 | 
			
		||||
                        } else {
 | 
			
		||||
                            let err = {
 | 
			
		||||
                            const err = {
 | 
			
		||||
                                name: 'Error',
 | 
			
		||||
                                message: 'script invocation result exceeds maximum allowed size of ' + maxResultSize + ' symbols'
 | 
			
		||||
                            }
 | 
			
		||||
@ -193,8 +199,12 @@ export class JsInvokeMessageProcessor {
 | 
			
		||||
                )
 | 
			
		||||
            },
 | 
			
		||||
            (err: any) => {
 | 
			
		||||
                const invokeResponse = JsInvokeMessageProcessor.createInvokeResponse("", false, COMPILATION_ERROR, err);
 | 
			
		||||
                this.logger.debug('[%s] Sending failed invoke response, scriptId: [%s], errorCode: [%s]', requestId, scriptId, COMPILATION_ERROR);
 | 
			
		||||
                let errorCode = COMPILATION_ERROR;
 | 
			
		||||
                if (err?.name === 'script body not found') {
 | 
			
		||||
                    errorCode = NOT_FOUND_ERROR;
 | 
			
		||||
                }
 | 
			
		||||
                const invokeResponse = JsInvokeMessageProcessor.createInvokeResponse("", false, errorCode, err);
 | 
			
		||||
                this.logger.debug('[%s] Sending failed invoke response, scriptId: [%s], errorCode: [%s]', requestId, scriptId, errorCode);
 | 
			
		||||
                this.sendResponse(requestId, responseTopic, headers, scriptId, undefined, invokeResponse);
 | 
			
		||||
            }
 | 
			
		||||
        );
 | 
			
		||||
@ -222,7 +232,7 @@ export class JsInvokeMessageProcessor {
 | 
			
		||||
        const remoteResponse = JsInvokeMessageProcessor.createRemoteResponse(requestId, compileResponse, invokeResponse, releaseResponse);
 | 
			
		||||
        const rawResponse = Buffer.from(JSON.stringify(remoteResponse), 'utf8');
 | 
			
		||||
        this.logger.debug('[%s] Sending response to queue, scriptId: [%s]', requestId, scriptId);
 | 
			
		||||
        this.producer.send(responseTopic, scriptId, rawResponse, headers).then(
 | 
			
		||||
        this.producer.send(responseTopic, requestId, rawResponse, headers).then(
 | 
			
		||||
            () => {
 | 
			
		||||
                this.logger.debug('[%s] Response sent to queue, took [%s]ms, scriptId: [%s]', requestId, (performance.now() - tStartSending), scriptId);
 | 
			
		||||
            },
 | 
			
		||||
@ -242,7 +252,7 @@ export class JsInvokeMessageProcessor {
 | 
			
		||||
            if (script) {
 | 
			
		||||
                self.incrementUseScriptId(scriptId);
 | 
			
		||||
                resolve(script);
 | 
			
		||||
            } else {
 | 
			
		||||
            } else if (scriptBody) {
 | 
			
		||||
                const startTime = performance.now();
 | 
			
		||||
                self.executor.compileScript(scriptBody).then(
 | 
			
		||||
                    (compiledScript) => {
 | 
			
		||||
@ -255,6 +265,12 @@ export class JsInvokeMessageProcessor {
 | 
			
		||||
                        reject(err);
 | 
			
		||||
                    }
 | 
			
		||||
                );
 | 
			
		||||
            } else {
 | 
			
		||||
                const err = {
 | 
			
		||||
                    name: 'script body not found',
 | 
			
		||||
                    message: ''
 | 
			
		||||
                }
 | 
			
		||||
                reject(err);
 | 
			
		||||
            }
 | 
			
		||||
        });
 | 
			
		||||
    }
 | 
			
		||||
@ -285,14 +301,26 @@ export class JsInvokeMessageProcessor {
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private static createCompileResponse(scriptId: string, success: boolean, errorCode?: number, err?: any): JsCompileResponse {
 | 
			
		||||
        const scriptIdBits = UUIDToBits(scriptId);
 | 
			
		||||
        return {
 | 
			
		||||
            errorCode: errorCode,
 | 
			
		||||
            success: success,
 | 
			
		||||
            errorDetails: parseJsErrorDetails(err),
 | 
			
		||||
            scriptIdMSB: scriptIdBits[0],
 | 
			
		||||
            scriptIdLSB: scriptIdBits[1]
 | 
			
		||||
        };
 | 
			
		||||
        if (isNotUUID(scriptId)) {
 | 
			
		||||
            return {
 | 
			
		||||
                errorCode: errorCode,
 | 
			
		||||
                success: success,
 | 
			
		||||
                errorDetails: parseJsErrorDetails(err),
 | 
			
		||||
                scriptIdMSB: "0",
 | 
			
		||||
                scriptIdLSB: "0",
 | 
			
		||||
                scriptHash: scriptId
 | 
			
		||||
            };
 | 
			
		||||
        } else { // this is for backward compatibility (to be able to work with tb-node of previous version) - todo: remove in the next release
 | 
			
		||||
            let scriptIdBits = UUIDToBits(scriptId);
 | 
			
		||||
            return {
 | 
			
		||||
                errorCode: errorCode,
 | 
			
		||||
                success: success,
 | 
			
		||||
                errorDetails: parseJsErrorDetails(err),
 | 
			
		||||
                scriptIdMSB: scriptIdBits[0],
 | 
			
		||||
                scriptIdLSB: scriptIdBits[1],
 | 
			
		||||
                scriptHash: ""
 | 
			
		||||
            };
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private static createInvokeResponse(result: string, success: boolean, errorCode?: number, err?: any): JsInvokeResponse {
 | 
			
		||||
@ -305,16 +333,26 @@ export class JsInvokeMessageProcessor {
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private static createReleaseResponse(scriptId: string, success: boolean): JsReleaseResponse {
 | 
			
		||||
        const scriptIdBits = UUIDToBits(scriptId);
 | 
			
		||||
        return {
 | 
			
		||||
            success: success,
 | 
			
		||||
            scriptIdMSB: scriptIdBits[0],
 | 
			
		||||
            scriptIdLSB: scriptIdBits[1]
 | 
			
		||||
        };
 | 
			
		||||
        if (isNotUUID(scriptId)) {
 | 
			
		||||
            return {
 | 
			
		||||
                success: success,
 | 
			
		||||
                scriptIdMSB: "0",
 | 
			
		||||
                scriptIdLSB: "0",
 | 
			
		||||
                scriptHash: scriptId,
 | 
			
		||||
            };
 | 
			
		||||
        } else { // todo: remove in the next release
 | 
			
		||||
            let scriptIdBits = UUIDToBits(scriptId);
 | 
			
		||||
            return {
 | 
			
		||||
                success: success,
 | 
			
		||||
                scriptIdMSB: scriptIdBits[0],
 | 
			
		||||
                scriptIdLSB: scriptIdBits[1],
 | 
			
		||||
                scriptHash: ""
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private static getScriptId(request: TbMessage): string {
 | 
			
		||||
        return toUUIDString(request.scriptIdMSB, request.scriptIdLSB);
 | 
			
		||||
        return request.scriptHash ? request.scriptHash : toUUIDString(request.scriptIdMSB, request.scriptIdLSB);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private incrementUseScriptId(scriptId: string) {
 | 
			
		||||
 | 
			
		||||
@ -58,3 +58,7 @@ export function parseJsErrorDetails(err: any): string | undefined {
 | 
			
		||||
    }
 | 
			
		||||
    return details;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
export function isNotUUID(candidate: string) {
 | 
			
		||||
    return candidate.length != 36 || !candidate.includes('-');
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -123,10 +123,10 @@ export class AwsSqsTemplate implements IQueue {
 | 
			
		||||
        this.timer = setTimeout(() => {this.getAndProcessMessage(messageProcessor, params)}, this.pollInterval);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    async send(responseTopic: string, scriptId: string, rawResponse: Buffer, headers: any): Promise<any> {
 | 
			
		||||
    async send(responseTopic: string, msgKey: string, rawResponse: Buffer, headers: any): Promise<any> {
 | 
			
		||||
        let msgBody = JSON.stringify(
 | 
			
		||||
            {
 | 
			
		||||
                key: scriptId,
 | 
			
		||||
                key: msgKey,
 | 
			
		||||
                data: [...rawResponse],
 | 
			
		||||
                headers: headers
 | 
			
		||||
            });
 | 
			
		||||
 | 
			
		||||
@ -149,12 +149,11 @@ export class KafkaTemplate implements IQueue {
 | 
			
		||||
        });
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
    async send(responseTopic: string, scriptId: string, rawResponse: Buffer, headers: any): Promise<any> {
 | 
			
		||||
        this.logger.debug('Pending queue response, scriptId: [%s]', scriptId);
 | 
			
		||||
    async send(responseTopic: string, msgKey: string, rawResponse: Buffer, headers: any): Promise<any> {
 | 
			
		||||
        const message = {
 | 
			
		||||
            topic: responseTopic,
 | 
			
		||||
            messages: [{
 | 
			
		||||
                key: scriptId,
 | 
			
		||||
                key: msgKey,
 | 
			
		||||
                value: rawResponse,
 | 
			
		||||
                headers: headers.data
 | 
			
		||||
            }]
 | 
			
		||||
 | 
			
		||||
@ -80,7 +80,7 @@ export class PubSubTemplate implements IQueue {
 | 
			
		||||
        subscription.on('message', messageHandler);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    async send(responseTopic: string, scriptId: string, rawResponse: Buffer, headers: any): Promise<any> {
 | 
			
		||||
    async send(responseTopic: string, msgKey: string, rawResponse: Buffer, headers: any): Promise<any> {
 | 
			
		||||
        if (!(this.subscriptions.includes(responseTopic) && this.topics.includes(this.requestTopic))) {
 | 
			
		||||
            await this.createTopic(this.requestTopic);
 | 
			
		||||
            await this.createSubscription(this.requestTopic);
 | 
			
		||||
@ -88,7 +88,7 @@ export class PubSubTemplate implements IQueue {
 | 
			
		||||
 | 
			
		||||
        let data = JSON.stringify(
 | 
			
		||||
            {
 | 
			
		||||
                key: scriptId,
 | 
			
		||||
                key: msgKey,
 | 
			
		||||
                data: [...rawResponse],
 | 
			
		||||
                headers: headers
 | 
			
		||||
            });
 | 
			
		||||
 | 
			
		||||
@ -17,6 +17,6 @@
 | 
			
		||||
export interface IQueue {
 | 
			
		||||
    name: string;
 | 
			
		||||
    init(): Promise<void>;
 | 
			
		||||
    send(responseTopic: string, scriptId: string, rawResponse: Buffer, headers: any): Promise<any>;
 | 
			
		||||
    send(responseTopic: string, msgKey: string, rawResponse: Buffer, headers: any): Promise<any>;
 | 
			
		||||
    destroy(): Promise<void>;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -65,7 +65,7 @@ export class RabbitMqTemplate implements IQueue {
 | 
			
		||||
        })
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    async send(responseTopic: string, scriptId: string, rawResponse: Buffer, headers: any): Promise<any> {
 | 
			
		||||
    async send(responseTopic: string, msgKey: string, rawResponse: Buffer, headers: any): Promise<any> {
 | 
			
		||||
 | 
			
		||||
        if (!this.topics.includes(responseTopic)) {
 | 
			
		||||
            await this.createQueue(responseTopic);
 | 
			
		||||
@ -74,7 +74,7 @@ export class RabbitMqTemplate implements IQueue {
 | 
			
		||||
 | 
			
		||||
        let data = JSON.stringify(
 | 
			
		||||
            {
 | 
			
		||||
                key: scriptId,
 | 
			
		||||
                key: msgKey,
 | 
			
		||||
                data: [...rawResponse],
 | 
			
		||||
                headers: headers
 | 
			
		||||
            });
 | 
			
		||||
 | 
			
		||||
@ -82,7 +82,7 @@ export class ServiceBusTemplate implements IQueue {
 | 
			
		||||
        this.receiver.subscribe({processMessage: messageHandler, processError: errorHandler})
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    async send(responseTopic: string, scriptId: string, rawResponse: Buffer, headers: any): Promise<any> {
 | 
			
		||||
    async send(responseTopic: string, msgKey: string, rawResponse: Buffer, headers: any): Promise<any> {
 | 
			
		||||
        if (!this.queues.includes(this.requestTopic)) {
 | 
			
		||||
            await this.createQueueIfNotExist(this.requestTopic);
 | 
			
		||||
            this.queues.push(this.requestTopic);
 | 
			
		||||
@ -96,7 +96,7 @@ export class ServiceBusTemplate implements IQueue {
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        let data = {
 | 
			
		||||
            key: scriptId,
 | 
			
		||||
            key: msgKey,
 | 
			
		||||
            data: [...rawResponse],
 | 
			
		||||
            headers: headers
 | 
			
		||||
        };
 | 
			
		||||
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user