Use hash of tenant id and body when resolving script for remote JS executor

This commit is contained in:
ViacheslavKlimov 2022-10-11 12:01:55 +03:00
parent 100868d8e2
commit 831332be7b
14 changed files with 237 additions and 114 deletions

View File

@ -15,12 +15,14 @@
*/ */
package org.thingsboard.server.service.script; package org.thingsboard.server.service.script;
import com.google.common.hash.Hashing;
import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.MoreExecutors;
import lombok.Getter; import lombok.Getter;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.util.Pair;
import org.thingsboard.common.util.ThingsBoardThreadFactory; import org.thingsboard.common.util.ThingsBoardThreadFactory;
import org.thingsboard.server.common.data.ApiUsageRecordKey; import org.thingsboard.server.common.data.ApiUsageRecordKey;
import org.thingsboard.server.common.data.id.CustomerId; import org.thingsboard.server.common.data.id.CustomerId;
@ -41,13 +43,15 @@ import static java.lang.String.format;
* Created by ashvayka on 26.09.18. * Created by ashvayka on 26.09.18.
*/ */
@Slf4j @Slf4j
@SuppressWarnings("UnstableApiUsage")
public abstract class AbstractJsInvokeService implements JsInvokeService { public abstract class AbstractJsInvokeService implements JsInvokeService {
private final TbApiUsageStateService apiUsageStateService; private final TbApiUsageStateService apiUsageStateService;
private final TbApiUsageClient apiUsageClient; private final TbApiUsageClient apiUsageClient;
protected ScheduledExecutorService timeoutExecutorService; protected ScheduledExecutorService timeoutExecutorService;
protected Map<UUID, String> scriptIdToNameMap = new ConcurrentHashMap<>();
protected Map<UUID, DisableListInfo> disabledFunctions = new ConcurrentHashMap<>(); protected final Map<UUID, Pair<String, String>> scriptIdToNameAndHashMap = new ConcurrentHashMap<>();
protected final Map<UUID, DisableListInfo> disabledFunctions = new ConcurrentHashMap<>();
@Getter @Getter
@Value("${js.max_total_args_size:100000}") @Value("${js.max_total_args_size:100000}")
@ -83,27 +87,34 @@ public abstract class AbstractJsInvokeService implements JsInvokeService {
return error(format("Script body exceeds maximum allowed size of %s symbols", getMaxScriptBodySize())); return error(format("Script body exceeds maximum allowed size of %s symbols", getMaxScriptBodySize()));
} }
UUID scriptId = UUID.randomUUID(); UUID scriptId = UUID.randomUUID();
String functionName = "invokeInternal_" + scriptId.toString().replace('-', '_'); String scriptHash = hash(tenantId, scriptBody);
String functionName = constructFunctionName(scriptId, scriptHash);
String jsScript = generateJsScript(scriptType, functionName, scriptBody, argNames); String jsScript = generateJsScript(scriptType, functionName, scriptBody, argNames);
return doEval(scriptId, functionName, jsScript); return doEval(scriptId, scriptHash, functionName, jsScript);
} else { } else {
return error("JS Execution is disabled due to API limits!"); return error("JS Execution is disabled due to API limits!");
} }
} }
protected String constructFunctionName(UUID scriptId, String scriptHash) {
return "invokeInternal_" + scriptId.toString().replace('-', '_');
}
@Override @Override
public ListenableFuture<String> invokeFunction(TenantId tenantId, CustomerId customerId, UUID scriptId, Object... args) { public ListenableFuture<String> invokeFunction(TenantId tenantId, CustomerId customerId, UUID scriptId, Object... args) {
if (apiUsageStateService.getApiUsageState(tenantId).isJsExecEnabled()) { if (apiUsageStateService.getApiUsageState(tenantId).isJsExecEnabled()) {
String functionName = scriptIdToNameMap.get(scriptId); Pair<String, String> nameAndHash = scriptIdToNameAndHashMap.get(scriptId);
if (functionName == null) { if (nameAndHash == null) {
return error("No compiled script found for scriptId: [" + scriptId + "]!"); return error("No compiled script found for scriptId: [" + scriptId + "]!");
} }
String functionName = nameAndHash.getFirst();
String scriptHash = nameAndHash.getSecond();
if (!isDisabled(scriptId)) { if (!isDisabled(scriptId)) {
if (argsSizeExceeded(args)) { if (argsSizeExceeded(args)) {
return scriptExecutionError(scriptId, format("Script input arguments exceed maximum allowed total args size of %s symbols", getMaxTotalArgsSize())); return scriptExecutionError(scriptId, format("Script input arguments exceed maximum allowed total args size of %s symbols", getMaxTotalArgsSize()));
} }
apiUsageClient.report(tenantId, customerId, ApiUsageRecordKey.JS_EXEC_COUNT, 1); apiUsageClient.report(tenantId, customerId, ApiUsageRecordKey.JS_EXEC_COUNT, 1);
return Futures.transformAsync(doInvokeFunction(scriptId, functionName, args), output -> { return Futures.transformAsync(doInvokeFunction(scriptId, scriptHash, functionName, args), output -> {
String result = output.toString(); String result = output.toString();
if (resultSizeExceeded(result)) { if (resultSizeExceeded(result)) {
return scriptExecutionError(scriptId, format("Script invocation result exceeds maximum allowed size of %s symbols", getMaxResultSize())); return scriptExecutionError(scriptId, format("Script invocation result exceeds maximum allowed size of %s symbols", getMaxResultSize()));
@ -123,12 +134,12 @@ public abstract class AbstractJsInvokeService implements JsInvokeService {
@Override @Override
public ListenableFuture<Void> release(UUID scriptId) { public ListenableFuture<Void> release(UUID scriptId) {
String functionName = scriptIdToNameMap.get(scriptId); Pair<String, String> nameAndHash = scriptIdToNameAndHashMap.get(scriptId);
if (functionName != null) { if (nameAndHash != null) {
try { try {
scriptIdToNameMap.remove(scriptId); scriptIdToNameAndHashMap.remove(scriptId);
disabledFunctions.remove(scriptId); disabledFunctions.remove(scriptId);
doRelease(scriptId, functionName); doRelease(scriptId, nameAndHash.getSecond(), nameAndHash.getFirst());
} catch (Exception e) { } catch (Exception e) {
return Futures.immediateFailedFuture(e); return Futures.immediateFailedFuture(e);
} }
@ -136,16 +147,24 @@ public abstract class AbstractJsInvokeService implements JsInvokeService {
return Futures.immediateFuture(null); return Futures.immediateFuture(null);
} }
protected abstract ListenableFuture<UUID> doEval(UUID scriptId, String functionName, String scriptBody); protected abstract ListenableFuture<UUID> doEval(UUID scriptId, String scriptHash, String functionName, String scriptBody);
protected abstract ListenableFuture<Object> doInvokeFunction(UUID scriptId, String functionName, Object[] args); protected abstract ListenableFuture<Object> doInvokeFunction(UUID scriptId, String scriptHash, String functionName, Object[] args);
protected abstract void doRelease(UUID scriptId, String functionName) throws Exception; protected abstract void doRelease(UUID scriptId, String scriptHash, String functionName) throws Exception;
protected abstract int getMaxErrors(); protected abstract int getMaxErrors();
protected abstract long getMaxBlacklistDuration(); protected abstract long getMaxBlacklistDuration();
protected String hash(TenantId tenantId, String scriptBody) {
return Hashing.murmur3_128().newHasher()
.putLong(tenantId.getId().getMostSignificantBits())
.putLong(tenantId.getId().getLeastSignificantBits())
.putUnencodedChars(scriptBody)
.hash().toString();
}
protected void onScriptExecutionError(UUID scriptId, Throwable t, String scriptBody) { protected void onScriptExecutionError(UUID scriptId, Throwable t, String scriptBody) {
DisableListInfo disableListInfo = disabledFunctions.computeIfAbsent(scriptId, key -> new DisableListInfo()); DisableListInfo disableListInfo = disabledFunctions.computeIfAbsent(scriptId, key -> new DisableListInfo());
log.warn("Script has exception and will increment counter {} on disabledFunctions for id {}, exception {}, cause {}, scriptBody {}", log.warn("Script has exception and will increment counter {} on disabledFunctions for id {}, exception {}, cause {}, scriptBody {}",

View File

@ -24,6 +24,7 @@ import delight.nashornsandbox.NashornSandboxes;
import lombok.Getter; import lombok.Getter;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.util.Pair;
import org.springframework.scheduling.annotation.Scheduled; import org.springframework.scheduling.annotation.Scheduled;
import org.thingsboard.common.util.ThingsBoardExecutors; import org.thingsboard.common.util.ThingsBoardExecutors;
import org.thingsboard.server.queue.usagestats.TbApiUsageClient; import org.thingsboard.server.queue.usagestats.TbApiUsageClient;
@ -38,7 +39,6 @@ import javax.script.ScriptException;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantLock;
@ -121,7 +121,7 @@ public abstract class AbstractNashornJsInvokeService extends AbstractJsInvokeSer
protected abstract long getMaxCpuTime(); protected abstract long getMaxCpuTime();
@Override @Override
protected ListenableFuture<UUID> doEval(UUID scriptId, String functionName, String jsScript) { protected ListenableFuture<UUID> doEval(UUID scriptId, String scriptHash, String functionName, String jsScript) {
jsPushedMsgs.incrementAndGet(); jsPushedMsgs.incrementAndGet();
ListenableFuture<UUID> result = jsExecutor.executeAsync(() -> { ListenableFuture<UUID> result = jsExecutor.executeAsync(() -> {
try { try {
@ -135,7 +135,7 @@ public abstract class AbstractNashornJsInvokeService extends AbstractJsInvokeSer
} finally { } finally {
evalLock.unlock(); evalLock.unlock();
} }
scriptIdToNameMap.put(scriptId, functionName); scriptIdToNameAndHashMap.put(scriptId, Pair.of(functionName, scriptHash));
return scriptId; return scriptId;
} catch (Exception e) { } catch (Exception e) {
log.debug("Failed to compile JS script: {}", e.getMessage(), e); log.debug("Failed to compile JS script: {}", e.getMessage(), e);
@ -150,7 +150,7 @@ public abstract class AbstractNashornJsInvokeService extends AbstractJsInvokeSer
} }
@Override @Override
protected ListenableFuture<Object> doInvokeFunction(UUID scriptId, String functionName, Object[] args) { protected ListenableFuture<Object> doInvokeFunction(UUID scriptId, String scriptHash, String functionName, Object[] args) {
jsPushedMsgs.incrementAndGet(); jsPushedMsgs.incrementAndGet();
ListenableFuture<Object> result = jsExecutor.executeAsync(() -> { ListenableFuture<Object> result = jsExecutor.executeAsync(() -> {
try { try {
@ -174,7 +174,7 @@ public abstract class AbstractNashornJsInvokeService extends AbstractJsInvokeSer
return result; return result;
} }
protected void doRelease(UUID scriptId, String functionName) throws ScriptException { protected void doRelease(UUID scriptId, String scriptHash, String functionName) throws ScriptException {
if (useJsSandbox()) { if (useJsSandbox()) {
sandbox.eval(functionName + " = undefined;"); sandbox.eval(functionName + " = undefined;");
} else { } else {

View File

@ -24,6 +24,7 @@ import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.data.util.Pair;
import org.springframework.scheduling.annotation.Scheduled; import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.springframework.util.StopWatch; import org.springframework.util.StopWatch;
@ -42,12 +43,13 @@ import javax.annotation.PreDestroy;
import java.util.Map; import java.util.Map;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@Slf4j @Slf4j
@ConditionalOnExpression("'${js.evaluator:null}'=='remote' && ('${service.type:null}'=='monolith' || '${service.type:null}'=='tb-core' || '${service.type:null}'=='tb-rule-engine')") @ConditionalOnExpression("'${js.evaluator:null}'=='remote' && ('${service.type:null}'=='monolith' || '${service.type:null}'=='tb-core' || '${service.type:null}'=='tb-rule-engine')")
@ -103,7 +105,8 @@ public class RemoteJsInvokeService extends AbstractJsInvokeService {
@Autowired @Autowired
protected TbQueueRequestTemplate<TbProtoJsQueueMsg<JsInvokeProtos.RemoteJsRequest>, TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse>> requestTemplate; protected TbQueueRequestTemplate<TbProtoJsQueueMsg<JsInvokeProtos.RemoteJsRequest>, TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse>> requestTemplate;
protected Map<UUID, String> scriptIdToBodysMap = new ConcurrentHashMap<>(); protected final Map<String, String> scriptHashToBodysMap = new ConcurrentHashMap<>();
private final Lock scriptsLock = new ReentrantLock();
@PostConstruct @PostConstruct
public void init() { public void init() {
@ -120,10 +123,9 @@ public class RemoteJsInvokeService extends AbstractJsInvokeService {
} }
@Override @Override
protected ListenableFuture<UUID> doEval(UUID scriptId, String functionName, String scriptBody) { protected ListenableFuture<UUID> doEval(UUID scriptId, String scriptHash, String functionName, String scriptBody) {
JsInvokeProtos.JsCompileRequest jsRequest = JsInvokeProtos.JsCompileRequest.newBuilder() JsInvokeProtos.JsCompileRequest jsRequest = JsInvokeProtos.JsCompileRequest.newBuilder()
.setScriptIdMSB(scriptId.getMostSignificantBits()) .setScriptHash(scriptHash)
.setScriptIdLSB(scriptId.getLeastSignificantBits())
.setFunctionName(functionName) .setFunctionName(functionName)
.setScriptBody(scriptBody).build(); .setScriptBody(scriptBody).build();
@ -131,35 +133,46 @@ public class RemoteJsInvokeService extends AbstractJsInvokeService {
.setCompileRequest(jsRequest) .setCompileRequest(jsRequest)
.build(); .build();
log.trace("Post compile request for scriptId [{}]", scriptId); log.trace("Post compile request for scriptId [{}] (hash: {})", scriptId, scriptHash);
ListenableFuture<TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse>> future = sendRequest(jsRequestWrapper, maxEvalRequestsTimeout, queueEvalMsgs); ListenableFuture<TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse>> future = sendJsRequest(UUID.randomUUID(), jsRequestWrapper, maxEvalRequestsTimeout, queueEvalMsgs);
return Futures.transform(future, response -> { return Futures.transform(future, response -> {
JsInvokeProtos.JsCompileResponse compilationResult = response.getValue().getCompileResponse(); JsInvokeProtos.JsCompileResponse compilationResult = response.getValue().getCompileResponse();
UUID compiledScriptId = new UUID(compilationResult.getScriptIdMSB(), compilationResult.getScriptIdLSB());
if (compilationResult.getSuccess()) { if (compilationResult.getSuccess()) {
scriptIdToNameMap.put(scriptId, functionName); scriptsLock.lock();
scriptIdToBodysMap.put(scriptId, scriptBody); try {
return compiledScriptId; scriptIdToNameAndHashMap.put(scriptId, Pair.of(functionName, scriptHash));
scriptHashToBodysMap.put(scriptHash, scriptBody);
} finally {
scriptsLock.unlock();
}
return scriptId;
} else { } else {
log.debug("[{}] Failed to compile script due to [{}]: {}", compiledScriptId, compilationResult.getErrorCode().name(), compilationResult.getErrorDetails()); log.debug("[{}] (hash: {}) Failed to compile script due to [{}]: {}", scriptId, compilationResult.getScriptHash(),
compilationResult.getErrorCode().name(), compilationResult.getErrorDetails());
throw new RuntimeException(compilationResult.getErrorDetails()); throw new RuntimeException(compilationResult.getErrorDetails());
} }
}, callbackExecutor); }, callbackExecutor);
} }
@Override @Override
protected ListenableFuture<Object> doInvokeFunction(UUID scriptId, String functionName, Object[] args) { protected String constructFunctionName(UUID scriptId, String scriptHash) {
log.trace("doInvokeFunction js-request for uuid {} with timeout {}ms", scriptId, maxRequestsTimeout); return "invokeInternal_" + scriptHash;
final String scriptBody = scriptIdToBodysMap.get(scriptId); }
@Override
protected ListenableFuture<Object> doInvokeFunction(UUID scriptId, String scriptHash, String functionName, Object[] args) {
log.trace("doInvokeFunction js-request for uuid {} with timeout {}ms", scriptHash, maxRequestsTimeout);
String scriptBody = scriptHashToBodysMap.get(scriptHash);
if (scriptBody == null) { if (scriptBody == null) {
return Futures.immediateFailedFuture(new RuntimeException("No script body found for scriptId: [" + scriptId + "]!")); return Futures.immediateFailedFuture(new RuntimeException("No script body found for script hash [" + scriptHash + "] (script id: [" + scriptId + "])"));
} }
JsInvokeProtos.RemoteJsRequest jsRequestWrapper = buildJsInvokeRequest(scriptId, functionName, args, false, null); JsInvokeProtos.RemoteJsRequest jsRequestWrapper = buildJsInvokeRequest(scriptHash, functionName, args, false, null);
StopWatch stopWatch = new StopWatch(); StopWatch stopWatch = new StopWatch();
stopWatch.start(); stopWatch.start();
ListenableFuture<TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse>> future = sendRequest(jsRequestWrapper, maxRequestsTimeout, queueInvokeMsgs); UUID requestKey = UUID.randomUUID();
ListenableFuture<TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse>> future = sendJsRequest(requestKey, jsRequestWrapper, maxRequestsTimeout, queueInvokeMsgs);
return Futures.transformAsync(future, response -> { return Futures.transformAsync(future, response -> {
stopWatch.stop(); stopWatch.stop();
log.trace("doInvokeFunction js-response took {}ms for uuid {}", stopWatch.getTotalTimeMillis(), response.getKey()); log.trace("doInvokeFunction js-response took {}ms for uuid {}", stopWatch.getTotalTimeMillis(), response.getKey());
@ -167,12 +180,14 @@ public class RemoteJsInvokeService extends AbstractJsInvokeService {
if (invokeResult.getSuccess()) { if (invokeResult.getSuccess()) {
return Futures.immediateFuture(invokeResult.getResult()); return Futures.immediateFuture(invokeResult.getResult());
} else { } else {
return handleInvokeError(scriptId, invokeResult.getErrorCode(), invokeResult.getErrorDetails(), functionName, args, scriptBody); return handleInvokeError(requestKey, scriptId, scriptHash, invokeResult.getErrorCode(),
invokeResult.getErrorDetails(), functionName, args, scriptBody);
} }
}, callbackExecutor); }, callbackExecutor);
} }
private ListenableFuture<Object> handleInvokeError(UUID scriptId, JsInvokeErrorCode errorCode, String errorDetails, private ListenableFuture<Object> handleInvokeError(UUID requestKey, UUID scriptId, String scriptHash,
JsInvokeErrorCode errorCode, String errorDetails,
String functionName, Object[] args, String scriptBody) { String functionName, Object[] args, String scriptBody) {
log.debug("[{}] Failed to invoke function due to [{}]: {}", scriptId, errorCode.name(), errorDetails); log.debug("[{}] Failed to invoke function due to [{}]: {}", scriptId, errorCode.name(), errorDetails);
RuntimeException e = new RuntimeException(errorDetails); RuntimeException e = new RuntimeException(errorDetails);
@ -184,14 +199,15 @@ public class RemoteJsInvokeService extends AbstractJsInvokeService {
} else if (JsInvokeErrorCode.NOT_FOUND_ERROR.equals(errorCode)) { } else if (JsInvokeErrorCode.NOT_FOUND_ERROR.equals(errorCode)) {
log.debug("[{}] Remote JS executor couldn't find the script", scriptId); log.debug("[{}] Remote JS executor couldn't find the script", scriptId);
if (scriptBody != null) { if (scriptBody != null) {
JsInvokeProtos.RemoteJsRequest invokeRequestWithScriptBody = buildJsInvokeRequest(scriptId, functionName, args, true, scriptBody); JsInvokeProtos.RemoteJsRequest invokeRequestWithScriptBody = buildJsInvokeRequest(scriptHash, functionName, args, true, scriptBody);
log.debug("[{}] Sending invoke request again with script body", scriptId); log.debug("[{}] Sending invoke request again with script body", scriptId);
return Futures.transformAsync(sendJsRequest(invokeRequestWithScriptBody, maxRequestsTimeout, queueInvokeMsgs, MoreExecutors.directExecutor()), r -> { return Futures.transformAsync(sendJsRequest(requestKey, invokeRequestWithScriptBody, maxRequestsTimeout, queueInvokeMsgs), r -> {
JsInvokeProtos.JsInvokeResponse result = r.getValue().getInvokeResponse(); JsInvokeProtos.JsInvokeResponse result = r.getValue().getInvokeResponse();
if (result.getSuccess()) { if (result.getSuccess()) {
return Futures.immediateFuture(result.getResult()); return Futures.immediateFuture(result.getResult());
} else { } else {
return handleInvokeError(scriptId, result.getErrorCode(), result.getErrorDetails(), functionName, args, null); return handleInvokeError(requestKey, scriptId, scriptHash, result.getErrorCode(),
result.getErrorDetails(), functionName, args, null);
} }
}, MoreExecutors.directExecutor()); }, MoreExecutors.directExecutor());
} }
@ -200,12 +216,9 @@ public class RemoteJsInvokeService extends AbstractJsInvokeService {
return Futures.immediateFailedFuture(e); return Futures.immediateFailedFuture(e);
} }
private ListenableFuture<TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse>> sendRequest(JsInvokeProtos.RemoteJsRequest jsRequestWrapper, long maxRequestsTimeout, AtomicInteger msgsCounter) { private ListenableFuture<TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse>> sendJsRequest(UUID requestKey, JsInvokeProtos.RemoteJsRequest jsRequestWrapper,
return sendJsRequest(jsRequestWrapper, maxRequestsTimeout, msgsCounter, callbackExecutor); long maxRequestsTimeout, AtomicInteger msgsCounter) {
} ListenableFuture<TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse>> future = requestTemplate.send(new TbProtoJsQueueMsg<>(requestKey, jsRequestWrapper));
private ListenableFuture<TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse>> sendJsRequest(JsInvokeProtos.RemoteJsRequest jsRequestWrapper, long maxRequestsTimeout, AtomicInteger msgsCounter, Executor callbackExecutor) {
ListenableFuture<TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse>> future = requestTemplate.send(new TbProtoJsQueueMsg<>(UUID.randomUUID(), jsRequestWrapper));
if (maxRequestsTimeout > 0) { if (maxRequestsTimeout > 0) {
future = Futures.withTimeout(future, maxRequestsTimeout, TimeUnit.MILLISECONDS, timeoutExecutorService); future = Futures.withTimeout(future, maxRequestsTimeout, TimeUnit.MILLISECONDS, timeoutExecutorService);
} }
@ -227,10 +240,9 @@ public class RemoteJsInvokeService extends AbstractJsInvokeService {
return future; return future;
} }
private JsInvokeProtos.RemoteJsRequest buildJsInvokeRequest(UUID scriptId, String functionName, Object[] args, boolean includeScriptBody, String scriptBody) { private JsInvokeProtos.RemoteJsRequest buildJsInvokeRequest(String scriptHash, String functionName, Object[] args, boolean includeScriptBody, String scriptBody) {
JsInvokeProtos.JsInvokeRequest.Builder jsRequestBuilder = JsInvokeProtos.JsInvokeRequest.newBuilder() JsInvokeProtos.JsInvokeRequest.Builder jsRequestBuilder = JsInvokeProtos.JsInvokeRequest.newBuilder()
.setScriptIdMSB(scriptId.getMostSignificantBits()) .setScriptHash(scriptHash)
.setScriptIdLSB(scriptId.getLeastSignificantBits())
.setFunctionName(functionName) .setFunctionName(functionName)
.setTimeout((int) maxExecRequestsTimeout); .setTimeout((int) maxExecRequestsTimeout);
if (includeScriptBody) jsRequestBuilder.setScriptBody(scriptBody); if (includeScriptBody) jsRequestBuilder.setScriptBody(scriptBody);
@ -245,10 +257,12 @@ public class RemoteJsInvokeService extends AbstractJsInvokeService {
} }
@Override @Override
protected void doRelease(UUID scriptId, String functionName) throws Exception { protected void doRelease(UUID scriptId, String scriptHash, String functionName) throws Exception {
if (scriptIdToNameAndHashMap.values().stream().map(Pair::getSecond).anyMatch(hash -> hash.equals(scriptHash))) {
return;
}
JsInvokeProtos.JsReleaseRequest jsRequest = JsInvokeProtos.JsReleaseRequest.newBuilder() JsInvokeProtos.JsReleaseRequest jsRequest = JsInvokeProtos.JsReleaseRequest.newBuilder()
.setScriptIdMSB(scriptId.getMostSignificantBits()) .setScriptHash(scriptHash)
.setScriptIdLSB(scriptId.getLeastSignificantBits())
.setFunctionName(functionName).build(); .setFunctionName(functionName).build();
JsInvokeProtos.RemoteJsRequest jsRequestWrapper = JsInvokeProtos.RemoteJsRequest.newBuilder() JsInvokeProtos.RemoteJsRequest jsRequestWrapper = JsInvokeProtos.RemoteJsRequest.newBuilder()
@ -261,12 +275,18 @@ public class RemoteJsInvokeService extends AbstractJsInvokeService {
} }
JsInvokeProtos.RemoteJsResponse response = future.get().getValue(); JsInvokeProtos.RemoteJsResponse response = future.get().getValue();
JsInvokeProtos.JsReleaseResponse compilationResult = response.getReleaseResponse(); JsInvokeProtos.JsReleaseResponse releaseResponse = response.getReleaseResponse();
UUID compiledScriptId = new UUID(compilationResult.getScriptIdMSB(), compilationResult.getScriptIdLSB()); if (releaseResponse.getSuccess()) {
if (compilationResult.getSuccess()) { scriptsLock.lock();
scriptIdToBodysMap.remove(scriptId); try {
if (scriptIdToNameAndHashMap.values().stream().map(Pair::getSecond).noneMatch(h -> h.equals(scriptHash))) {
scriptHashToBodysMap.remove(scriptHash);
}
} finally {
scriptsLock.unlock();
}
} else { } else {
log.debug("[{}] Failed to release script due", compiledScriptId); log.debug("[{}] Failed to release script", scriptHash);
} }
} }

View File

@ -35,7 +35,7 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy;
"js.max_result_size=50", "js.max_result_size=50",
"js.local.max_errors=2" "js.local.max_errors=2"
}) })
class JsInvokeServiceTest extends AbstractControllerTest { class LocalJsInvokeServiceTest extends AbstractControllerTest {
@Autowired @Autowired
private NashornJsInvokeService jsInvokeService; private NashornJsInvokeService jsInvokeService;

View File

@ -21,24 +21,33 @@ import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor; import org.mockito.ArgumentCaptor;
import org.thingsboard.server.common.data.ApiUsageState;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.gen.js.JsInvokeProtos; import org.thingsboard.server.gen.js.JsInvokeProtos;
import org.thingsboard.server.gen.js.JsInvokeProtos.RemoteJsRequest; import org.thingsboard.server.gen.js.JsInvokeProtos.RemoteJsRequest;
import org.thingsboard.server.gen.js.JsInvokeProtos.RemoteJsResponse; import org.thingsboard.server.gen.js.JsInvokeProtos.RemoteJsResponse;
import org.thingsboard.server.queue.TbQueueRequestTemplate; import org.thingsboard.server.queue.TbQueueRequestTemplate;
import org.thingsboard.server.queue.common.TbProtoJsQueueMsg; import org.thingsboard.server.queue.common.TbProtoJsQueueMsg;
import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import org.thingsboard.server.queue.usagestats.TbApiUsageClient;
import org.thingsboard.server.service.apiusage.TbApiUsageStateService;
import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Set;
import java.util.UUID; import java.util.UUID;
import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.argThat; import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.reset; import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.times; import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;
import static org.mockito.Mockito.when;
class RemoteJsInvokeServiceTest { class RemoteJsInvokeServiceTest {
@ -48,7 +57,13 @@ class RemoteJsInvokeServiceTest {
@BeforeEach @BeforeEach
public void beforeEach() { public void beforeEach() {
remoteJsInvokeService = new RemoteJsInvokeService(null, null); TbApiUsageStateService apiUsageStateService = mock(TbApiUsageStateService.class);
ApiUsageState apiUsageState = mock(ApiUsageState.class);
when(apiUsageState.isJsExecEnabled()).thenReturn(true);
when(apiUsageStateService.getApiUsageState(any())).thenReturn(apiUsageState);
TbApiUsageClient apiUsageClient = mock(TbApiUsageClient.class);
remoteJsInvokeService = new RemoteJsInvokeService(apiUsageStateService, apiUsageClient);
jsRequestTemplate = mock(TbQueueRequestTemplate.class); jsRequestTemplate = mock(TbQueueRequestTemplate.class);
remoteJsInvokeService.requestTemplate = jsRequestTemplate; remoteJsInvokeService.requestTemplate = jsRequestTemplate;
} }
@ -60,8 +75,10 @@ class RemoteJsInvokeServiceTest {
@Test @Test
public void whenInvokingFunction_thenDoNotSendScriptBody() throws Exception { public void whenInvokingFunction_thenDoNotSendScriptBody() throws Exception {
UUID scriptId = UUID.randomUUID(); mockJsEvalResponse();
remoteJsInvokeService.scriptIdToBodysMap.put(scriptId, "scriptscriptscript"); String scriptBody = "return { a: 'b'};";
UUID scriptId = remoteJsInvokeService.eval(TenantId.SYS_TENANT_ID, JsScriptType.RULE_NODE_SCRIPT, scriptBody).get();
reset(jsRequestTemplate);
String expectedInvocationResult = "scriptInvocationResult"; String expectedInvocationResult = "scriptInvocationResult";
doReturn(Futures.immediateFuture(new TbProtoJsQueueMsg<>(UUID.randomUUID(), RemoteJsResponse.newBuilder() doReturn(Futures.immediateFuture(new TbProtoJsQueueMsg<>(UUID.randomUUID(), RemoteJsResponse.newBuilder()
@ -73,21 +90,21 @@ class RemoteJsInvokeServiceTest {
.when(jsRequestTemplate).send(any()); .when(jsRequestTemplate).send(any());
ArgumentCaptor<TbProtoJsQueueMsg<RemoteJsRequest>> jsRequestCaptor = ArgumentCaptor.forClass(TbProtoJsQueueMsg.class); ArgumentCaptor<TbProtoJsQueueMsg<RemoteJsRequest>> jsRequestCaptor = ArgumentCaptor.forClass(TbProtoJsQueueMsg.class);
Object invocationResult = remoteJsInvokeService.doInvokeFunction(scriptId, "f", new Object[]{"a"}).get(); Object invocationResult = remoteJsInvokeService.invokeFunction(TenantId.SYS_TENANT_ID, null, scriptId, "{}").get();
verify(jsRequestTemplate).send(jsRequestCaptor.capture()); verify(jsRequestTemplate).send(jsRequestCaptor.capture());
JsInvokeProtos.JsInvokeRequest jsInvokeRequestMade = jsRequestCaptor.getValue().getValue().getInvokeRequest(); JsInvokeProtos.JsInvokeRequest jsInvokeRequestMade = jsRequestCaptor.getValue().getValue().getInvokeRequest();
assertThat(jsInvokeRequestMade.getScriptIdLSB()).isEqualTo(scriptId.getLeastSignificantBits());
assertThat(jsInvokeRequestMade.getScriptIdMSB()).isEqualTo(scriptId.getMostSignificantBits());
assertThat(jsInvokeRequestMade.getScriptBody()).isNullOrEmpty(); assertThat(jsInvokeRequestMade.getScriptBody()).isNullOrEmpty();
assertThat(jsInvokeRequestMade.getScriptHash()).isEqualTo(getScriptHash(scriptId));
assertThat(invocationResult).isEqualTo(expectedInvocationResult); assertThat(invocationResult).isEqualTo(expectedInvocationResult);
} }
@Test @Test
public void whenInvokingFunctionAndRemoteJsExecutorRemovedScript_thenHandleNotFoundErrorAndMakeInvokeRequestWithScriptBody() throws Exception { public void whenInvokingFunctionAndRemoteJsExecutorRemovedScript_thenHandleNotFoundErrorAndMakeInvokeRequestWithScriptBody() throws Exception {
UUID scriptId = UUID.randomUUID(); mockJsEvalResponse();
String scriptBody = "scriptscriptscript"; String scriptBody = "return { a: 'b'};";
remoteJsInvokeService.scriptIdToBodysMap.put(scriptId, scriptBody); UUID scriptId = remoteJsInvokeService.eval(TenantId.SYS_TENANT_ID, JsScriptType.RULE_NODE_SCRIPT, scriptBody).get();
reset(jsRequestTemplate);
doReturn(Futures.immediateFuture(new TbProtoJsQueueMsg<>(UUID.randomUUID(), RemoteJsResponse.newBuilder() doReturn(Futures.immediateFuture(new TbProtoJsQueueMsg<>(UUID.randomUUID(), RemoteJsResponse.newBuilder()
.setInvokeResponse(JsInvokeProtos.JsInvokeResponse.newBuilder() .setInvokeResponse(JsInvokeProtos.JsInvokeResponse.newBuilder()
@ -111,7 +128,7 @@ class RemoteJsInvokeServiceTest {
})); }));
ArgumentCaptor<TbProtoJsQueueMsg<RemoteJsRequest>> jsRequestsCaptor = ArgumentCaptor.forClass(TbProtoJsQueueMsg.class); ArgumentCaptor<TbProtoJsQueueMsg<RemoteJsRequest>> jsRequestsCaptor = ArgumentCaptor.forClass(TbProtoJsQueueMsg.class);
Object invocationResult = remoteJsInvokeService.doInvokeFunction(scriptId, "f", new Object[]{"a"}).get(); Object invocationResult = remoteJsInvokeService.invokeFunction(TenantId.SYS_TENANT_ID, null, scriptId, "{}").get();
verify(jsRequestTemplate, times(2)).send(jsRequestsCaptor.capture()); verify(jsRequestTemplate, times(2)).send(jsRequestsCaptor.capture());
List<TbProtoJsQueueMsg<RemoteJsRequest>> jsInvokeRequestsMade = jsRequestsCaptor.getAllValues(); List<TbProtoJsQueueMsg<RemoteJsRequest>> jsInvokeRequestsMade = jsRequestsCaptor.getAllValues();
@ -120,9 +137,83 @@ class RemoteJsInvokeServiceTest {
assertThat(firstRequestMade.getScriptBody()).isNullOrEmpty(); assertThat(firstRequestMade.getScriptBody()).isNullOrEmpty();
JsInvokeProtos.JsInvokeRequest secondRequestMade = jsInvokeRequestsMade.get(1).getValue().getInvokeRequest(); JsInvokeProtos.JsInvokeRequest secondRequestMade = jsInvokeRequestsMade.get(1).getValue().getInvokeRequest();
assertThat(secondRequestMade.getScriptBody()).isEqualTo(scriptBody); assertThat(secondRequestMade.getScriptBody()).contains(scriptBody);
assertThat(jsInvokeRequestsMade.stream().map(TbProtoQueueMsg::getKey).distinct().count()).as("partition keys are same")
.isOne();
assertThat(invocationResult).isEqualTo(expectedInvocationResult); assertThat(invocationResult).isEqualTo(expectedInvocationResult);
} }
@Test
public void whenDoingEval_thenSaveScriptByHashOfTenantIdAndScriptBody() throws Exception {
mockJsEvalResponse();
TenantId tenantId1 = TenantId.fromUUID(UUID.randomUUID());
String scriptBody1 = "var msg = { temp: 42, humidity: 77 };\n" +
"var metadata = { data: 40 };\n" +
"var msgType = \"POST_TELEMETRY_REQUEST\";\n" +
"\n" +
"return { msg: msg, metadata: metadata, msgType: msgType };";
Set<String> scriptHashes = new HashSet<>();
String tenant1Script1Hash = null;
for (int i = 0; i < 3; i++) {
UUID scriptUuid = remoteJsInvokeService.eval(tenantId1, JsScriptType.RULE_NODE_SCRIPT, scriptBody1).get();
tenant1Script1Hash = getScriptHash(scriptUuid);
scriptHashes.add(tenant1Script1Hash);
}
assertThat(scriptHashes).as("Unique scripts ids").size().isOne();
TenantId tenantId2 = TenantId.fromUUID(UUID.randomUUID());
UUID scriptUuid = remoteJsInvokeService.eval(tenantId2, JsScriptType.RULE_NODE_SCRIPT, scriptBody1).get();
String tenant2Script1Id = getScriptHash(scriptUuid);
assertThat(tenant2Script1Id).isNotEqualTo(tenant1Script1Hash);
String scriptBody2 = scriptBody1 + ";;";
scriptUuid = remoteJsInvokeService.eval(tenantId2, JsScriptType.RULE_NODE_SCRIPT, scriptBody2).get();
String tenant2Script2Id = getScriptHash(scriptUuid);
assertThat(tenant2Script2Id).isNotEqualTo(tenant2Script1Id);
}
@Test
public void whenReleasingScript_thenCheckForHashUsages() throws Exception {
mockJsEvalResponse();
String scriptBody = "return { a: 'b'};";
UUID scriptId1 = remoteJsInvokeService.eval(TenantId.SYS_TENANT_ID, JsScriptType.RULE_NODE_SCRIPT, scriptBody).get();
UUID scriptId2 = remoteJsInvokeService.eval(TenantId.SYS_TENANT_ID, JsScriptType.RULE_NODE_SCRIPT, scriptBody).get();
String scriptHash = getScriptHash(scriptId1);
assertThat(scriptHash).isEqualTo(getScriptHash(scriptId2));
reset(jsRequestTemplate);
doReturn(Futures.immediateFuture(new TbProtoQueueMsg<>(UUID.randomUUID(), RemoteJsResponse.newBuilder()
.setReleaseResponse(JsInvokeProtos.JsReleaseResponse.newBuilder()
.setSuccess(true)
.build())
.build())))
.when(jsRequestTemplate).send(any());
remoteJsInvokeService.release(scriptId1).get();
verifyNoInteractions(jsRequestTemplate);
assertThat(remoteJsInvokeService.scriptHashToBodysMap).containsKey(scriptHash);
remoteJsInvokeService.release(scriptId2).get();
verify(jsRequestTemplate).send(any());
assertThat(remoteJsInvokeService.scriptHashToBodysMap).isEmpty();
}
private String getScriptHash(UUID scriptUuid) {
return remoteJsInvokeService.scriptIdToNameAndHashMap.get(scriptUuid).getSecond();
}
private void mockJsEvalResponse() {
doAnswer(methodCall -> Futures.immediateFuture(new TbProtoJsQueueMsg<>(UUID.randomUUID(), RemoteJsResponse.newBuilder()
.setCompileResponse(JsInvokeProtos.JsCompileResponse.newBuilder()
.setSuccess(true)
.setScriptHash(methodCall.<TbProtoQueueMsg<RemoteJsRequest>>getArgument(0).getValue().getCompileRequest().getScriptHash())
.build())
.build())))
.when(jsRequestTemplate).send(argThat(jsQueueMsg -> jsQueueMsg.getValue().hasCompileRequest()));
}
} }

View File

@ -41,39 +41,34 @@ message RemoteJsResponse {
} }
message JsCompileRequest { message JsCompileRequest {
int64 scriptIdMSB = 1; string scriptHash = 1;
int64 scriptIdLSB = 2; string functionName = 2;
string functionName = 3; string scriptBody = 3;
string scriptBody = 4;
} }
message JsReleaseRequest { message JsReleaseRequest {
int64 scriptIdMSB = 1; string scriptHash = 1;
int64 scriptIdLSB = 2; string functionName = 2;
string functionName = 3;
} }
message JsReleaseResponse { message JsReleaseResponse {
bool success = 1; bool success = 1;
int64 scriptIdMSB = 2; string scriptHash = 2;
int64 scriptIdLSB = 3;
} }
message JsCompileResponse { message JsCompileResponse {
bool success = 1; bool success = 1;
int64 scriptIdMSB = 2; string scriptHash = 2;
int64 scriptIdLSB = 3; JsInvokeErrorCode errorCode = 3;
JsInvokeErrorCode errorCode = 4; string errorDetails = 4;
string errorDetails = 5;
} }
message JsInvokeRequest { message JsInvokeRequest {
int64 scriptIdMSB = 1; string scriptHash = 1;
int64 scriptIdLSB = 2; string functionName = 2;
string functionName = 3; string scriptBody = 3;
string scriptBody = 4; int32 timeout = 4;
int32 timeout = 5; repeated string args = 5;
repeated string args = 6;
} }
message JsInvokeResponse { message JsInvokeResponse {
@ -82,4 +77,3 @@ message JsInvokeResponse {
JsInvokeErrorCode errorCode = 3; JsInvokeErrorCode errorCode = 3;
string errorDetails = 4; string errorDetails = 4;
} }

View File

@ -16,8 +16,7 @@
export interface TbMessage { export interface TbMessage {
scriptIdMSB: string; scriptHash: string;
scriptIdLSB: string;
} }
export interface RemoteJsRequest { export interface RemoteJsRequest {

View File

@ -130,7 +130,12 @@ export class JsInvokeMessageProcessor {
processCompileRequest(requestId: string, responseTopic: string, headers: any, compileRequest: JsCompileRequest) { processCompileRequest(requestId: string, responseTopic: string, headers: any, compileRequest: JsCompileRequest) {
const scriptId = JsInvokeMessageProcessor.getScriptId(compileRequest); const scriptId = JsInvokeMessageProcessor.getScriptId(compileRequest);
this.logger.debug('[%s] Processing compile request, scriptId: [%s]', requestId, scriptId); this.logger.debug('[%s] Processing compile request, scriptId: [%s]', requestId, scriptId);
if (this.scriptMap.has(scriptId)) {
const compileResponse = JsInvokeMessageProcessor.createCompileResponse(scriptId, true);
this.logger.debug('[%s] Script was already compiled, scriptId: [%s]', requestId, scriptId);
this.sendResponse(requestId, responseTopic, headers, scriptId, compileResponse);
return;
}
this.executor.compileScript(compileRequest.scriptBody).then( this.executor.compileScript(compileRequest.scriptBody).then(
(script) => { (script) => {
this.cacheScript(scriptId, script); this.cacheScript(scriptId, script);
@ -227,7 +232,7 @@ export class JsInvokeMessageProcessor {
const remoteResponse = JsInvokeMessageProcessor.createRemoteResponse(requestId, compileResponse, invokeResponse, releaseResponse); const remoteResponse = JsInvokeMessageProcessor.createRemoteResponse(requestId, compileResponse, invokeResponse, releaseResponse);
const rawResponse = Buffer.from(JSON.stringify(remoteResponse), 'utf8'); const rawResponse = Buffer.from(JSON.stringify(remoteResponse), 'utf8');
this.logger.debug('[%s] Sending response to queue, scriptId: [%s]', requestId, scriptId); this.logger.debug('[%s] Sending response to queue, scriptId: [%s]', requestId, scriptId);
this.producer.send(responseTopic, scriptId, rawResponse, headers).then( this.producer.send(responseTopic, requestId, rawResponse, headers).then(
() => { () => {
this.logger.debug('[%s] Response sent to queue, took [%s]ms, scriptId: [%s]', requestId, (performance.now() - tStartSending), scriptId); this.logger.debug('[%s] Response sent to queue, took [%s]ms, scriptId: [%s]', requestId, (performance.now() - tStartSending), scriptId);
}, },
@ -296,13 +301,11 @@ export class JsInvokeMessageProcessor {
} }
private static createCompileResponse(scriptId: string, success: boolean, errorCode?: number, err?: any): JsCompileResponse { private static createCompileResponse(scriptId: string, success: boolean, errorCode?: number, err?: any): JsCompileResponse {
const scriptIdBits = UUIDToBits(scriptId);
return { return {
errorCode: errorCode, errorCode: errorCode,
success: success, success: success,
errorDetails: parseJsErrorDetails(err), errorDetails: parseJsErrorDetails(err),
scriptIdMSB: scriptIdBits[0], scriptHash: scriptId,
scriptIdLSB: scriptIdBits[1]
}; };
} }
@ -316,16 +319,14 @@ export class JsInvokeMessageProcessor {
} }
private static createReleaseResponse(scriptId: string, success: boolean): JsReleaseResponse { private static createReleaseResponse(scriptId: string, success: boolean): JsReleaseResponse {
const scriptIdBits = UUIDToBits(scriptId);
return { return {
success: success, success: success,
scriptIdMSB: scriptIdBits[0], scriptHash: scriptId,
scriptIdLSB: scriptIdBits[1]
}; };
} }
private static getScriptId(request: TbMessage): string { private static getScriptId(request: TbMessage): string {
return toUUIDString(request.scriptIdMSB, request.scriptIdLSB); return request.scriptHash;
} }
private incrementUseScriptId(scriptId: string) { private incrementUseScriptId(scriptId: string) {

View File

@ -123,10 +123,10 @@ export class AwsSqsTemplate implements IQueue {
this.timer = setTimeout(() => {this.getAndProcessMessage(messageProcessor, params)}, this.pollInterval); this.timer = setTimeout(() => {this.getAndProcessMessage(messageProcessor, params)}, this.pollInterval);
} }
async send(responseTopic: string, scriptId: string, rawResponse: Buffer, headers: any): Promise<any> { async send(responseTopic: string, msgKey: string, rawResponse: Buffer, headers: any): Promise<any> {
let msgBody = JSON.stringify( let msgBody = JSON.stringify(
{ {
key: scriptId, key: msgKey,
data: [...rawResponse], data: [...rawResponse],
headers: headers headers: headers
}); });

View File

@ -149,12 +149,11 @@ export class KafkaTemplate implements IQueue {
}); });
} }
async send(responseTopic: string, scriptId: string, rawResponse: Buffer, headers: any): Promise<any> { async send(responseTopic: string, msgKey: string, rawResponse: Buffer, headers: any): Promise<any> {
this.logger.debug('Pending queue response, scriptId: [%s]', scriptId);
const message = { const message = {
topic: responseTopic, topic: responseTopic,
messages: [{ messages: [{
key: scriptId, key: msgKey,
value: rawResponse, value: rawResponse,
headers: headers.data headers: headers.data
}] }]

View File

@ -80,7 +80,7 @@ export class PubSubTemplate implements IQueue {
subscription.on('message', messageHandler); subscription.on('message', messageHandler);
} }
async send(responseTopic: string, scriptId: string, rawResponse: Buffer, headers: any): Promise<any> { async send(responseTopic: string, msgKey: string, rawResponse: Buffer, headers: any): Promise<any> {
if (!(this.subscriptions.includes(responseTopic) && this.topics.includes(this.requestTopic))) { if (!(this.subscriptions.includes(responseTopic) && this.topics.includes(this.requestTopic))) {
await this.createTopic(this.requestTopic); await this.createTopic(this.requestTopic);
await this.createSubscription(this.requestTopic); await this.createSubscription(this.requestTopic);
@ -88,7 +88,7 @@ export class PubSubTemplate implements IQueue {
let data = JSON.stringify( let data = JSON.stringify(
{ {
key: scriptId, key: msgKey,
data: [...rawResponse], data: [...rawResponse],
headers: headers headers: headers
}); });

View File

@ -17,6 +17,6 @@
export interface IQueue { export interface IQueue {
name: string; name: string;
init(): Promise<void>; init(): Promise<void>;
send(responseTopic: string, scriptId: string, rawResponse: Buffer, headers: any): Promise<any>; send(responseTopic: string, msgKey: string, rawResponse: Buffer, headers: any): Promise<any>;
destroy(): Promise<void>; destroy(): Promise<void>;
} }

View File

@ -65,7 +65,7 @@ export class RabbitMqTemplate implements IQueue {
}) })
} }
async send(responseTopic: string, scriptId: string, rawResponse: Buffer, headers: any): Promise<any> { async send(responseTopic: string, msgKey: string, rawResponse: Buffer, headers: any): Promise<any> {
if (!this.topics.includes(responseTopic)) { if (!this.topics.includes(responseTopic)) {
await this.createQueue(responseTopic); await this.createQueue(responseTopic);
@ -74,7 +74,7 @@ export class RabbitMqTemplate implements IQueue {
let data = JSON.stringify( let data = JSON.stringify(
{ {
key: scriptId, key: msgKey,
data: [...rawResponse], data: [...rawResponse],
headers: headers headers: headers
}); });

View File

@ -82,7 +82,7 @@ export class ServiceBusTemplate implements IQueue {
this.receiver.subscribe({processMessage: messageHandler, processError: errorHandler}) this.receiver.subscribe({processMessage: messageHandler, processError: errorHandler})
} }
async send(responseTopic: string, scriptId: string, rawResponse: Buffer, headers: any): Promise<any> { async send(responseTopic: string, msgKey: string, rawResponse: Buffer, headers: any): Promise<any> {
if (!this.queues.includes(this.requestTopic)) { if (!this.queues.includes(this.requestTopic)) {
await this.createQueueIfNotExist(this.requestTopic); await this.createQueueIfNotExist(this.requestTopic);
this.queues.push(this.requestTopic); this.queues.push(this.requestTopic);
@ -96,7 +96,7 @@ export class ServiceBusTemplate implements IQueue {
} }
let data = { let data = {
key: scriptId, key: msgKey,
data: [...rawResponse], data: [...rawResponse],
headers: headers headers: headers
}; };