Merge with master
This commit is contained in:
commit
5633aeaa6e
@ -37,7 +37,7 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy;
|
|||||||
"js.max_result_size=50",
|
"js.max_result_size=50",
|
||||||
"js.local.max_errors=2"
|
"js.local.max_errors=2"
|
||||||
})
|
})
|
||||||
class ScriptInvokeServiceTest extends AbstractControllerTest {
|
class LocalJsInvokeServiceTest extends AbstractControllerTest {
|
||||||
|
|
||||||
@Autowired
|
@Autowired
|
||||||
private NashornJsInvokeService jsInvokeService;
|
private NashornJsInvokeService jsInvokeService;
|
||||||
@ -0,0 +1,221 @@
|
|||||||
|
/**
|
||||||
|
* Copyright © 2016-2022 The Thingsboard Authors
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.thingsboard.server.service.script;
|
||||||
|
|
||||||
|
import com.google.common.util.concurrent.Futures;
|
||||||
|
import org.apache.commons.lang3.StringUtils;
|
||||||
|
import org.junit.jupiter.api.AfterEach;
|
||||||
|
import org.junit.jupiter.api.BeforeEach;
|
||||||
|
import org.junit.jupiter.api.Test;
|
||||||
|
import org.mockito.ArgumentCaptor;
|
||||||
|
import org.thingsboard.script.api.ScriptType;
|
||||||
|
import org.thingsboard.server.common.data.ApiUsageState;
|
||||||
|
import org.thingsboard.server.common.data.id.TenantId;
|
||||||
|
import org.thingsboard.server.common.stats.TbApiUsageReportClient;
|
||||||
|
import org.thingsboard.server.common.stats.TbApiUsageStateClient;
|
||||||
|
import org.thingsboard.server.gen.js.JsInvokeProtos;
|
||||||
|
import org.thingsboard.server.gen.js.JsInvokeProtos.RemoteJsRequest;
|
||||||
|
import org.thingsboard.server.gen.js.JsInvokeProtos.RemoteJsResponse;
|
||||||
|
import org.thingsboard.server.queue.TbQueueRequestTemplate;
|
||||||
|
import org.thingsboard.server.queue.common.TbProtoJsQueueMsg;
|
||||||
|
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
|
||||||
|
|
||||||
|
import java.util.HashSet;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Optional;
|
||||||
|
import java.util.Set;
|
||||||
|
import java.util.UUID;
|
||||||
|
|
||||||
|
import static org.assertj.core.api.Assertions.assertThat;
|
||||||
|
import static org.mockito.ArgumentMatchers.any;
|
||||||
|
import static org.mockito.ArgumentMatchers.argThat;
|
||||||
|
import static org.mockito.Mockito.doAnswer;
|
||||||
|
import static org.mockito.Mockito.doReturn;
|
||||||
|
import static org.mockito.Mockito.mock;
|
||||||
|
import static org.mockito.Mockito.reset;
|
||||||
|
import static org.mockito.Mockito.times;
|
||||||
|
import static org.mockito.Mockito.verify;
|
||||||
|
import static org.mockito.Mockito.verifyNoInteractions;
|
||||||
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
|
class RemoteJsInvokeServiceTest {
|
||||||
|
|
||||||
|
private RemoteJsInvokeService remoteJsInvokeService;
|
||||||
|
private TbQueueRequestTemplate<TbProtoJsQueueMsg<RemoteJsRequest>, TbProtoQueueMsg<RemoteJsResponse>> jsRequestTemplate;
|
||||||
|
|
||||||
|
|
||||||
|
@BeforeEach
|
||||||
|
public void beforeEach() {
|
||||||
|
TbApiUsageStateClient apiUsageStateClient = mock(TbApiUsageStateClient.class);
|
||||||
|
ApiUsageState apiUsageState = mock(ApiUsageState.class);
|
||||||
|
when(apiUsageState.isJsExecEnabled()).thenReturn(true);
|
||||||
|
when(apiUsageStateClient.getApiUsageState(any())).thenReturn(apiUsageState);
|
||||||
|
TbApiUsageReportClient apiUsageReportClient = mock(TbApiUsageReportClient.class);
|
||||||
|
|
||||||
|
remoteJsInvokeService = new RemoteJsInvokeService(Optional.of(apiUsageStateClient), Optional.of(apiUsageReportClient));
|
||||||
|
jsRequestTemplate = mock(TbQueueRequestTemplate.class);
|
||||||
|
remoteJsInvokeService.requestTemplate = jsRequestTemplate;
|
||||||
|
}
|
||||||
|
|
||||||
|
@AfterEach
|
||||||
|
public void afterEach() {
|
||||||
|
reset(jsRequestTemplate);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void whenInvokingFunction_thenDoNotSendScriptBody() throws Exception {
|
||||||
|
mockJsEvalResponse();
|
||||||
|
String scriptBody = "return { a: 'b'};";
|
||||||
|
UUID scriptId = remoteJsInvokeService.eval(TenantId.SYS_TENANT_ID, ScriptType.RULE_NODE_SCRIPT, scriptBody).get();
|
||||||
|
reset(jsRequestTemplate);
|
||||||
|
|
||||||
|
String expectedInvocationResult = "scriptInvocationResult";
|
||||||
|
doReturn(Futures.immediateFuture(new TbProtoJsQueueMsg<>(UUID.randomUUID(), RemoteJsResponse.newBuilder()
|
||||||
|
.setInvokeResponse(JsInvokeProtos.JsInvokeResponse.newBuilder()
|
||||||
|
.setSuccess(true)
|
||||||
|
.setResult(expectedInvocationResult)
|
||||||
|
.build())
|
||||||
|
.build())))
|
||||||
|
.when(jsRequestTemplate).send(any());
|
||||||
|
|
||||||
|
ArgumentCaptor<TbProtoJsQueueMsg<RemoteJsRequest>> jsRequestCaptor = ArgumentCaptor.forClass(TbProtoJsQueueMsg.class);
|
||||||
|
Object invocationResult = remoteJsInvokeService.invokeScript(TenantId.SYS_TENANT_ID, null, scriptId, "{}").get();
|
||||||
|
verify(jsRequestTemplate).send(jsRequestCaptor.capture());
|
||||||
|
|
||||||
|
JsInvokeProtos.JsInvokeRequest jsInvokeRequestMade = jsRequestCaptor.getValue().getValue().getInvokeRequest();
|
||||||
|
assertThat(jsInvokeRequestMade.getScriptBody()).isNullOrEmpty();
|
||||||
|
assertThat(jsInvokeRequestMade.getScriptHash()).isEqualTo(getScriptHash(scriptId));
|
||||||
|
assertThat(invocationResult).isEqualTo(expectedInvocationResult);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void whenInvokingFunctionAndRemoteJsExecutorRemovedScript_thenHandleNotFoundErrorAndMakeInvokeRequestWithScriptBody() throws Exception {
|
||||||
|
mockJsEvalResponse();
|
||||||
|
String scriptBody = "return { a: 'b'};";
|
||||||
|
UUID scriptId = remoteJsInvokeService.eval(TenantId.SYS_TENANT_ID, ScriptType.RULE_NODE_SCRIPT, scriptBody).get();
|
||||||
|
reset(jsRequestTemplate);
|
||||||
|
|
||||||
|
doReturn(Futures.immediateFuture(new TbProtoJsQueueMsg<>(UUID.randomUUID(), RemoteJsResponse.newBuilder()
|
||||||
|
.setInvokeResponse(JsInvokeProtos.JsInvokeResponse.newBuilder()
|
||||||
|
.setSuccess(false)
|
||||||
|
.setErrorCode(JsInvokeProtos.JsInvokeErrorCode.NOT_FOUND_ERROR)
|
||||||
|
.build())
|
||||||
|
.build())))
|
||||||
|
.when(jsRequestTemplate).send(argThat(jsQueueMsg -> {
|
||||||
|
return StringUtils.isEmpty(jsQueueMsg.getValue().getInvokeRequest().getScriptBody());
|
||||||
|
}));
|
||||||
|
|
||||||
|
String expectedInvocationResult = "invocationResult";
|
||||||
|
doReturn(Futures.immediateFuture(new TbProtoJsQueueMsg<>(UUID.randomUUID(), RemoteJsResponse.newBuilder()
|
||||||
|
.setInvokeResponse(JsInvokeProtos.JsInvokeResponse.newBuilder()
|
||||||
|
.setSuccess(true)
|
||||||
|
.setResult(expectedInvocationResult)
|
||||||
|
.build())
|
||||||
|
.build())))
|
||||||
|
.when(jsRequestTemplate).send(argThat(jsQueueMsg -> {
|
||||||
|
return StringUtils.isNotEmpty(jsQueueMsg.getValue().getInvokeRequest().getScriptBody());
|
||||||
|
}));
|
||||||
|
|
||||||
|
ArgumentCaptor<TbProtoJsQueueMsg<RemoteJsRequest>> jsRequestsCaptor = ArgumentCaptor.forClass(TbProtoJsQueueMsg.class);
|
||||||
|
Object invocationResult = remoteJsInvokeService.invokeScript(TenantId.SYS_TENANT_ID, null, scriptId, "{}").get();
|
||||||
|
verify(jsRequestTemplate, times(2)).send(jsRequestsCaptor.capture());
|
||||||
|
|
||||||
|
List<TbProtoJsQueueMsg<RemoteJsRequest>> jsInvokeRequestsMade = jsRequestsCaptor.getAllValues();
|
||||||
|
|
||||||
|
JsInvokeProtos.JsInvokeRequest firstRequestMade = jsInvokeRequestsMade.get(0).getValue().getInvokeRequest();
|
||||||
|
assertThat(firstRequestMade.getScriptBody()).isNullOrEmpty();
|
||||||
|
|
||||||
|
JsInvokeProtos.JsInvokeRequest secondRequestMade = jsInvokeRequestsMade.get(1).getValue().getInvokeRequest();
|
||||||
|
assertThat(secondRequestMade.getScriptBody()).contains(scriptBody);
|
||||||
|
|
||||||
|
assertThat(jsInvokeRequestsMade.stream().map(TbProtoQueueMsg::getKey).distinct().count()).as("partition keys are same")
|
||||||
|
.isOne();
|
||||||
|
|
||||||
|
assertThat(invocationResult).isEqualTo(expectedInvocationResult);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void whenDoingEval_thenSaveScriptByHashOfTenantIdAndScriptBody() throws Exception {
|
||||||
|
mockJsEvalResponse();
|
||||||
|
|
||||||
|
TenantId tenantId1 = TenantId.fromUUID(UUID.randomUUID());
|
||||||
|
String scriptBody1 = "var msg = { temp: 42, humidity: 77 };\n" +
|
||||||
|
"var metadata = { data: 40 };\n" +
|
||||||
|
"var msgType = \"POST_TELEMETRY_REQUEST\";\n" +
|
||||||
|
"\n" +
|
||||||
|
"return { msg: msg, metadata: metadata, msgType: msgType };";
|
||||||
|
|
||||||
|
Set<String> scriptHashes = new HashSet<>();
|
||||||
|
String tenant1Script1Hash = null;
|
||||||
|
for (int i = 0; i < 3; i++) {
|
||||||
|
UUID scriptUuid = remoteJsInvokeService.eval(tenantId1, ScriptType.RULE_NODE_SCRIPT, scriptBody1).get();
|
||||||
|
tenant1Script1Hash = getScriptHash(scriptUuid);
|
||||||
|
scriptHashes.add(tenant1Script1Hash);
|
||||||
|
}
|
||||||
|
assertThat(scriptHashes).as("Unique scripts ids").size().isOne();
|
||||||
|
|
||||||
|
TenantId tenantId2 = TenantId.fromUUID(UUID.randomUUID());
|
||||||
|
UUID scriptUuid = remoteJsInvokeService.eval(tenantId2, ScriptType.RULE_NODE_SCRIPT, scriptBody1).get();
|
||||||
|
String tenant2Script1Id = getScriptHash(scriptUuid);
|
||||||
|
assertThat(tenant2Script1Id).isNotEqualTo(tenant1Script1Hash);
|
||||||
|
|
||||||
|
String scriptBody2 = scriptBody1 + ";;";
|
||||||
|
scriptUuid = remoteJsInvokeService.eval(tenantId2, ScriptType.RULE_NODE_SCRIPT, scriptBody2).get();
|
||||||
|
String tenant2Script2Id = getScriptHash(scriptUuid);
|
||||||
|
assertThat(tenant2Script2Id).isNotEqualTo(tenant2Script1Id);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void whenReleasingScript_thenCheckForHashUsages() throws Exception {
|
||||||
|
mockJsEvalResponse();
|
||||||
|
String scriptBody = "return { a: 'b'};";
|
||||||
|
UUID scriptId1 = remoteJsInvokeService.eval(TenantId.SYS_TENANT_ID, ScriptType.RULE_NODE_SCRIPT, scriptBody).get();
|
||||||
|
UUID scriptId2 = remoteJsInvokeService.eval(TenantId.SYS_TENANT_ID, ScriptType.RULE_NODE_SCRIPT, scriptBody).get();
|
||||||
|
String scriptHash = getScriptHash(scriptId1);
|
||||||
|
assertThat(scriptHash).isEqualTo(getScriptHash(scriptId2));
|
||||||
|
reset(jsRequestTemplate);
|
||||||
|
|
||||||
|
doReturn(Futures.immediateFuture(new TbProtoQueueMsg<>(UUID.randomUUID(), RemoteJsResponse.newBuilder()
|
||||||
|
.setReleaseResponse(JsInvokeProtos.JsReleaseResponse.newBuilder()
|
||||||
|
.setSuccess(true)
|
||||||
|
.build())
|
||||||
|
.build())))
|
||||||
|
.when(jsRequestTemplate).send(any());
|
||||||
|
|
||||||
|
remoteJsInvokeService.release(scriptId1).get();
|
||||||
|
verifyNoInteractions(jsRequestTemplate);
|
||||||
|
assertThat(remoteJsInvokeService.scriptHashToBodysMap).containsKey(scriptHash);
|
||||||
|
|
||||||
|
remoteJsInvokeService.release(scriptId2).get();
|
||||||
|
verify(jsRequestTemplate).send(any());
|
||||||
|
assertThat(remoteJsInvokeService.scriptHashToBodysMap).isEmpty();
|
||||||
|
}
|
||||||
|
|
||||||
|
private String getScriptHash(UUID scriptUuid) {
|
||||||
|
return remoteJsInvokeService.getScriptHash(scriptUuid);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void mockJsEvalResponse() {
|
||||||
|
doAnswer(methodCall -> Futures.immediateFuture(new TbProtoJsQueueMsg<>(UUID.randomUUID(), RemoteJsResponse.newBuilder()
|
||||||
|
.setCompileResponse(JsInvokeProtos.JsCompileResponse.newBuilder()
|
||||||
|
.setSuccess(true)
|
||||||
|
.setScriptHash(methodCall.<TbProtoQueueMsg<RemoteJsRequest>>getArgument(0).getValue().getCompileRequest().getScriptHash())
|
||||||
|
.build())
|
||||||
|
.build())))
|
||||||
|
.when(jsRequestTemplate).send(argThat(jsQueueMsg -> jsQueueMsg.getValue().hasCompileRequest()));
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
@ -23,6 +23,7 @@ enum JsInvokeErrorCode {
|
|||||||
COMPILATION_ERROR = 0;
|
COMPILATION_ERROR = 0;
|
||||||
RUNTIME_ERROR = 1;
|
RUNTIME_ERROR = 1;
|
||||||
TIMEOUT_ERROR = 2;
|
TIMEOUT_ERROR = 2;
|
||||||
|
NOT_FOUND_ERROR = 3;
|
||||||
}
|
}
|
||||||
|
|
||||||
message RemoteJsRequest {
|
message RemoteJsRequest {
|
||||||
@ -40,39 +41,34 @@ message RemoteJsResponse {
|
|||||||
}
|
}
|
||||||
|
|
||||||
message JsCompileRequest {
|
message JsCompileRequest {
|
||||||
int64 scriptIdMSB = 1;
|
|
||||||
int64 scriptIdLSB = 2;
|
|
||||||
string functionName = 3;
|
string functionName = 3;
|
||||||
string scriptBody = 4;
|
string scriptBody = 4;
|
||||||
|
string scriptHash = 5;
|
||||||
}
|
}
|
||||||
|
|
||||||
message JsReleaseRequest {
|
message JsReleaseRequest {
|
||||||
int64 scriptIdMSB = 1;
|
|
||||||
int64 scriptIdLSB = 2;
|
|
||||||
string functionName = 3;
|
string functionName = 3;
|
||||||
|
string scriptHash = 4;
|
||||||
}
|
}
|
||||||
|
|
||||||
message JsReleaseResponse {
|
message JsReleaseResponse {
|
||||||
bool success = 1;
|
bool success = 1;
|
||||||
int64 scriptIdMSB = 2;
|
string scriptHash = 4;
|
||||||
int64 scriptIdLSB = 3;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
message JsCompileResponse {
|
message JsCompileResponse {
|
||||||
bool success = 1;
|
bool success = 1;
|
||||||
int64 scriptIdMSB = 2;
|
|
||||||
int64 scriptIdLSB = 3;
|
|
||||||
JsInvokeErrorCode errorCode = 4;
|
JsInvokeErrorCode errorCode = 4;
|
||||||
string errorDetails = 5;
|
string errorDetails = 5;
|
||||||
|
string scriptHash = 6;
|
||||||
}
|
}
|
||||||
|
|
||||||
message JsInvokeRequest {
|
message JsInvokeRequest {
|
||||||
int64 scriptIdMSB = 1;
|
|
||||||
int64 scriptIdLSB = 2;
|
|
||||||
string functionName = 3;
|
string functionName = 3;
|
||||||
string scriptBody = 4;
|
string scriptBody = 4;
|
||||||
int32 timeout = 5;
|
int32 timeout = 5;
|
||||||
repeated string args = 6;
|
repeated string args = 6;
|
||||||
|
string scriptHash = 7;
|
||||||
}
|
}
|
||||||
|
|
||||||
message JsInvokeResponse {
|
message JsInvokeResponse {
|
||||||
@ -81,4 +77,3 @@ message JsInvokeResponse {
|
|||||||
JsInvokeErrorCode errorCode = 3;
|
JsInvokeErrorCode errorCode = 3;
|
||||||
string errorDetails = 4;
|
string errorDetails = 4;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -17,8 +17,10 @@ package org.thingsboard.server.service.script;
|
|||||||
|
|
||||||
import com.google.common.util.concurrent.Futures;
|
import com.google.common.util.concurrent.Futures;
|
||||||
import com.google.common.util.concurrent.ListenableFuture;
|
import com.google.common.util.concurrent.ListenableFuture;
|
||||||
|
import com.google.common.util.concurrent.MoreExecutors;
|
||||||
import lombok.Getter;
|
import lombok.Getter;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.jetbrains.annotations.NotNull;
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
import org.springframework.beans.factory.annotation.Value;
|
import org.springframework.beans.factory.annotation.Value;
|
||||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
|
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
|
||||||
@ -28,6 +30,7 @@ import org.springframework.util.StopWatch;
|
|||||||
import org.thingsboard.common.util.ThingsBoardThreadFactory;
|
import org.thingsboard.common.util.ThingsBoardThreadFactory;
|
||||||
import org.thingsboard.script.api.TbScriptException;
|
import org.thingsboard.script.api.TbScriptException;
|
||||||
import org.thingsboard.script.api.js.AbstractJsInvokeService;
|
import org.thingsboard.script.api.js.AbstractJsInvokeService;
|
||||||
|
import org.thingsboard.script.api.js.JsScriptInfo;
|
||||||
import org.thingsboard.server.common.stats.TbApiUsageReportClient;
|
import org.thingsboard.server.common.stats.TbApiUsageReportClient;
|
||||||
import org.thingsboard.server.common.stats.TbApiUsageStateClient;
|
import org.thingsboard.server.common.stats.TbApiUsageStateClient;
|
||||||
import org.thingsboard.server.gen.js.JsInvokeProtos;
|
import org.thingsboard.server.gen.js.JsInvokeProtos;
|
||||||
@ -46,6 +49,8 @@ import java.util.concurrent.ExecutorService;
|
|||||||
import java.util.concurrent.Executors;
|
import java.util.concurrent.Executors;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.TimeoutException;
|
import java.util.concurrent.TimeoutException;
|
||||||
|
import java.util.concurrent.locks.Lock;
|
||||||
|
import java.util.concurrent.locks.ReentrantLock;
|
||||||
|
|
||||||
@Slf4j
|
@Slf4j
|
||||||
@ConditionalOnExpression("'${js.evaluator:null}'=='remote' && ('${service.type:null}'=='monolith' || '${service.type:null}'=='tb-core' || '${service.type:null}'=='tb-rule-engine')")
|
@ConditionalOnExpression("'${js.evaluator:null}'=='remote' && ('${service.type:null}'=='monolith' || '${service.type:null}'=='tb-core' || '${service.type:null}'=='tb-rule-engine')")
|
||||||
@ -98,9 +103,10 @@ public class RemoteJsInvokeService extends AbstractJsInvokeService {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Autowired
|
@Autowired
|
||||||
private TbQueueRequestTemplate<TbProtoJsQueueMsg<JsInvokeProtos.RemoteJsRequest>, TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse>> requestTemplate;
|
protected TbQueueRequestTemplate<TbProtoJsQueueMsg<JsInvokeProtos.RemoteJsRequest>, TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse>> requestTemplate;
|
||||||
|
|
||||||
private Map<UUID, String> scriptIdToBodysMap = new ConcurrentHashMap<>();
|
protected final Map<String, String> scriptHashToBodysMap = new ConcurrentHashMap<>();
|
||||||
|
private final Lock scriptsLock = new ReentrantLock();
|
||||||
|
|
||||||
@PostConstruct
|
@PostConstruct
|
||||||
public void init() {
|
public void init() {
|
||||||
@ -117,53 +123,46 @@ public class RemoteJsInvokeService extends AbstractJsInvokeService {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected ListenableFuture<UUID> doEval(UUID scriptId, String functionName, String scriptBody) {
|
protected ListenableFuture<UUID> doEval(UUID scriptId, JsScriptInfo jsInfo, String scriptBody) {
|
||||||
JsInvokeProtos.JsCompileRequest jsRequest = JsInvokeProtos.JsCompileRequest.newBuilder()
|
JsInvokeProtos.JsCompileRequest jsRequest = JsInvokeProtos.JsCompileRequest.newBuilder()
|
||||||
.setScriptIdMSB(scriptId.getMostSignificantBits())
|
.setScriptHash(jsInfo.getHash())
|
||||||
.setScriptIdLSB(scriptId.getLeastSignificantBits())
|
.setFunctionName(jsInfo.getFunctionName())
|
||||||
.setFunctionName(functionName)
|
|
||||||
.setScriptBody(scriptBody).build();
|
.setScriptBody(scriptBody).build();
|
||||||
|
|
||||||
JsInvokeProtos.RemoteJsRequest jsRequestWrapper = JsInvokeProtos.RemoteJsRequest.newBuilder()
|
JsInvokeProtos.RemoteJsRequest jsRequestWrapper = JsInvokeProtos.RemoteJsRequest.newBuilder()
|
||||||
.setCompileRequest(jsRequest)
|
.setCompileRequest(jsRequest)
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
log.trace("Post compile request for scriptId [{}]", scriptId);
|
log.trace("Post compile request for scriptId [{}] (hash: {})", scriptId, jsInfo.getHash());
|
||||||
ListenableFuture<TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse>> future = requestTemplate.send(new TbProtoJsQueueMsg<>(UUID.randomUUID(), jsRequestWrapper));
|
ListenableFuture<TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse>> future = requestTemplate.send(new TbProtoJsQueueMsg<>(UUID.randomUUID(), jsRequestWrapper));
|
||||||
return Futures.transform(future, response -> {
|
return Futures.transform(future, response -> {
|
||||||
JsInvokeProtos.JsCompileResponse compilationResult = response.getValue().getCompileResponse();
|
JsInvokeProtos.JsCompileResponse compilationResult = response.getValue().getCompileResponse();
|
||||||
UUID compiledScriptId = new UUID(compilationResult.getScriptIdMSB(), compilationResult.getScriptIdLSB());
|
|
||||||
if (compilationResult.getSuccess()) {
|
if (compilationResult.getSuccess()) {
|
||||||
scriptIdToNameMap.put(scriptId, functionName);
|
scriptsLock.lock();
|
||||||
scriptIdToBodysMap.put(scriptId, scriptBody);
|
try {
|
||||||
return compiledScriptId;
|
scriptInfoMap.put(scriptId, jsInfo);
|
||||||
|
scriptHashToBodysMap.put(jsInfo.getHash(), scriptBody);
|
||||||
|
} finally {
|
||||||
|
scriptsLock.unlock();
|
||||||
|
}
|
||||||
|
return scriptId;
|
||||||
} else {
|
} else {
|
||||||
log.debug("[{}] Failed to compile script due to [{}]: {}", compiledScriptId, compilationResult.getErrorCode().name(), compilationResult.getErrorDetails());
|
log.debug("[{}] (hash: {}) Failed to compile script due to [{}]: {}", scriptId, compilationResult.getScriptHash(),
|
||||||
|
compilationResult.getErrorCode().name(), compilationResult.getErrorDetails());
|
||||||
throw new TbScriptException(scriptId, TbScriptException.ErrorCode.COMPILATION, scriptBody, new RuntimeException(compilationResult.getErrorDetails()));
|
throw new TbScriptException(scriptId, TbScriptException.ErrorCode.COMPILATION, scriptBody, new RuntimeException(compilationResult.getErrorDetails()));
|
||||||
}
|
}
|
||||||
}, callbackExecutor);
|
}, callbackExecutor);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected ListenableFuture<Object> doInvokeFunction(UUID scriptId, String functionName, Object[] args) {
|
protected ListenableFuture<Object> doInvokeFunction(UUID scriptId, JsScriptInfo jsInfo, Object[] args) {
|
||||||
final String scriptBody = scriptIdToBodysMap.get(scriptId);
|
var scriptHash = jsInfo.getHash();
|
||||||
|
String scriptBody = scriptHashToBodysMap.get(scriptHash);
|
||||||
if (scriptBody == null) {
|
if (scriptBody == null) {
|
||||||
return Futures.immediateFailedFuture(new RuntimeException("No script body found for scriptId: [" + scriptId + "]!"));
|
return Futures.immediateFailedFuture(new RuntimeException("No script body found for script hash [" + scriptHash + "] (script id: [" + scriptId + "])"));
|
||||||
}
|
|
||||||
JsInvokeProtos.JsInvokeRequest.Builder jsRequestBuilder = JsInvokeProtos.JsInvokeRequest.newBuilder()
|
|
||||||
.setScriptIdMSB(scriptId.getMostSignificantBits())
|
|
||||||
.setScriptIdLSB(scriptId.getLeastSignificantBits())
|
|
||||||
.setFunctionName(functionName)
|
|
||||||
.setTimeout((int) maxExecRequestsTimeout)
|
|
||||||
.setScriptBody(scriptBody);
|
|
||||||
|
|
||||||
for (Object arg : args) {
|
|
||||||
jsRequestBuilder.addArgs(arg.toString());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
JsInvokeProtos.RemoteJsRequest jsRequestWrapper = JsInvokeProtos.RemoteJsRequest.newBuilder()
|
JsInvokeProtos.RemoteJsRequest jsRequestWrapper = buildJsInvokeRequest(jsInfo, args, false, null);
|
||||||
.setInvokeRequest(jsRequestBuilder.build())
|
|
||||||
.build();
|
|
||||||
|
|
||||||
StopWatch stopWatch;
|
StopWatch stopWatch;
|
||||||
if (log.isTraceEnabled()) {
|
if (log.isTraceEnabled()) {
|
||||||
@ -173,35 +172,80 @@ public class RemoteJsInvokeService extends AbstractJsInvokeService {
|
|||||||
stopWatch = null;
|
stopWatch = null;
|
||||||
}
|
}
|
||||||
|
|
||||||
ListenableFuture<TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse>> future = requestTemplate.send(new TbProtoJsQueueMsg<>(UUID.randomUUID(), jsRequestWrapper));
|
UUID requestKey = UUID.randomUUID();
|
||||||
return Futures.transform(future, response -> {
|
ListenableFuture<TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse>> future = requestTemplate.send(new TbProtoJsQueueMsg<>(requestKey, jsRequestWrapper));
|
||||||
|
return Futures.transformAsync(future, response -> {
|
||||||
if (log.isTraceEnabled()) {
|
if (log.isTraceEnabled()) {
|
||||||
stopWatch.stop();
|
stopWatch.stop();
|
||||||
log.trace("doInvokeFunction js-response took {}ms for uuid {}", stopWatch.getTotalTimeMillis(), response.getKey());
|
log.trace("doInvokeFunction js-response took {}ms for uuid {}", stopWatch.getTotalTimeMillis(), response.getKey());
|
||||||
}
|
}
|
||||||
JsInvokeProtos.JsInvokeResponse invokeResult = response.getValue().getInvokeResponse();
|
JsInvokeProtos.JsInvokeResponse invokeResult = response.getValue().getInvokeResponse();
|
||||||
if (invokeResult.getSuccess()) {
|
if (invokeResult.getSuccess()) {
|
||||||
return invokeResult.getResult();
|
return Futures.immediateFuture(invokeResult.getResult());
|
||||||
} else {
|
} else {
|
||||||
final RuntimeException e = new RuntimeException(invokeResult.getErrorDetails());
|
return handleInvokeError(requestKey, scriptId, jsInfo, invokeResult.getErrorCode(), invokeResult.getErrorDetails(), scriptBody, args);
|
||||||
log.debug("[{}] Failed to invoke function due to [{}]: {}", scriptId, invokeResult.getErrorCode().name(), invokeResult.getErrorDetails());
|
|
||||||
if (JsInvokeProtos.JsInvokeErrorCode.TIMEOUT_ERROR.equals(invokeResult.getErrorCode())) {
|
|
||||||
throw new TbScriptException(scriptId, TbScriptException.ErrorCode.TIMEOUT, scriptBody, new TimeoutException());
|
|
||||||
} else if (JsInvokeProtos.JsInvokeErrorCode.COMPILATION_ERROR.equals(invokeResult.getErrorCode())) {
|
|
||||||
throw new TbScriptException(scriptId, TbScriptException.ErrorCode.COMPILATION, scriptBody, e);
|
|
||||||
} else {
|
|
||||||
throw new TbScriptException(scriptId, TbScriptException.ErrorCode.RUNTIME, scriptBody, e);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}, callbackExecutor);
|
}, callbackExecutor);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@NotNull
|
||||||
|
private JsInvokeProtos.RemoteJsRequest buildJsInvokeRequest(JsScriptInfo jsInfo, Object[] args, boolean includeScriptBody, String scriptBody) {
|
||||||
|
JsInvokeProtos.JsInvokeRequest.Builder jsRequestBuilder = JsInvokeProtos.JsInvokeRequest.newBuilder()
|
||||||
|
.setScriptHash(jsInfo.getHash())
|
||||||
|
.setFunctionName(jsInfo.getFunctionName())
|
||||||
|
.setTimeout((int) maxExecRequestsTimeout);
|
||||||
|
if (includeScriptBody) {
|
||||||
|
jsRequestBuilder.setScriptBody(scriptBody);
|
||||||
|
}
|
||||||
|
|
||||||
|
for (Object arg : args) {
|
||||||
|
jsRequestBuilder.addArgs(arg.toString());
|
||||||
|
}
|
||||||
|
|
||||||
|
JsInvokeProtos.RemoteJsRequest jsRequestWrapper = JsInvokeProtos.RemoteJsRequest.newBuilder()
|
||||||
|
.setInvokeRequest(jsRequestBuilder.build())
|
||||||
|
.build();
|
||||||
|
return jsRequestWrapper;
|
||||||
|
}
|
||||||
|
|
||||||
|
private ListenableFuture<Object> handleInvokeError(UUID requestKey, UUID scriptId, JsScriptInfo jsInfo,
|
||||||
|
JsInvokeProtos.JsInvokeErrorCode errorCode, String errorDetails,
|
||||||
|
String scriptBody, Object[] args) {
|
||||||
|
final RuntimeException e = new RuntimeException(errorDetails);
|
||||||
|
log.debug("[{}] Failed to invoke function due to [{}]: {}", scriptId, errorCode.name(), errorDetails);
|
||||||
|
if (JsInvokeProtos.JsInvokeErrorCode.TIMEOUT_ERROR.equals(errorCode)) {
|
||||||
|
throw new TbScriptException(scriptId, TbScriptException.ErrorCode.TIMEOUT, scriptBody, new TimeoutException());
|
||||||
|
} else if (JsInvokeProtos.JsInvokeErrorCode.COMPILATION_ERROR.equals(errorCode)) {
|
||||||
|
throw new TbScriptException(scriptId, TbScriptException.ErrorCode.COMPILATION, scriptBody, e);
|
||||||
|
} else if (JsInvokeProtos.JsInvokeErrorCode.NOT_FOUND_ERROR.equals(errorCode)) {
|
||||||
|
log.debug("[{}] Remote JS executor couldn't find the script", scriptId);
|
||||||
|
if (scriptBody != null) {
|
||||||
|
JsInvokeProtos.RemoteJsRequest invokeRequestWithScriptBody = buildJsInvokeRequest(jsInfo, args, true, scriptBody);
|
||||||
|
log.debug("[{}] Sending invoke request again with script body", scriptId);
|
||||||
|
ListenableFuture<TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse>> future = requestTemplate.send(new TbProtoJsQueueMsg<>(requestKey, invokeRequestWithScriptBody));
|
||||||
|
return Futures.transformAsync(future, response -> {
|
||||||
|
JsInvokeProtos.JsInvokeResponse result = response.getValue().getInvokeResponse();
|
||||||
|
if (result.getSuccess()) {
|
||||||
|
return Futures.immediateFuture(result.getResult());
|
||||||
|
} else {
|
||||||
|
return handleInvokeError(requestKey, scriptId, jsInfo, result.getErrorCode(), result.getErrorDetails(), null, args);
|
||||||
|
}
|
||||||
|
}, MoreExecutors.directExecutor());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
throw new TbScriptException(scriptId, TbScriptException.ErrorCode.RUNTIME, scriptBody, e);
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void doRelease(UUID scriptId, String functionName) throws Exception {
|
protected void doRelease(UUID scriptId, JsScriptInfo jsInfo) throws Exception {
|
||||||
|
String scriptHash = jsInfo.getHash();
|
||||||
|
if (scriptInfoMap.values().stream().map(JsScriptInfo::getHash).anyMatch(hash -> hash.equals(scriptHash))) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
JsInvokeProtos.JsReleaseRequest jsRequest = JsInvokeProtos.JsReleaseRequest.newBuilder()
|
JsInvokeProtos.JsReleaseRequest jsRequest = JsInvokeProtos.JsReleaseRequest.newBuilder()
|
||||||
.setScriptIdMSB(scriptId.getMostSignificantBits())
|
.setScriptHash(scriptHash)
|
||||||
.setScriptIdLSB(scriptId.getLeastSignificantBits())
|
.setFunctionName(jsInfo.getFunctionName()).build();
|
||||||
.setFunctionName(functionName).build();
|
|
||||||
|
|
||||||
JsInvokeProtos.RemoteJsRequest jsRequestWrapper = JsInvokeProtos.RemoteJsRequest.newBuilder()
|
JsInvokeProtos.RemoteJsRequest jsRequestWrapper = JsInvokeProtos.RemoteJsRequest.newBuilder()
|
||||||
.setReleaseRequest(jsRequest)
|
.setReleaseRequest(jsRequest)
|
||||||
@ -213,13 +257,28 @@ public class RemoteJsInvokeService extends AbstractJsInvokeService {
|
|||||||
}
|
}
|
||||||
JsInvokeProtos.RemoteJsResponse response = future.get().getValue();
|
JsInvokeProtos.RemoteJsResponse response = future.get().getValue();
|
||||||
|
|
||||||
JsInvokeProtos.JsReleaseResponse compilationResult = response.getReleaseResponse();
|
JsInvokeProtos.JsReleaseResponse releaseResponse = response.getReleaseResponse();
|
||||||
UUID compiledScriptId = new UUID(compilationResult.getScriptIdMSB(), compilationResult.getScriptIdLSB());
|
if (releaseResponse.getSuccess()) {
|
||||||
if (compilationResult.getSuccess()) {
|
scriptsLock.lock();
|
||||||
scriptIdToBodysMap.remove(scriptId);
|
try {
|
||||||
|
if (scriptInfoMap.values().stream().map(JsScriptInfo::getHash).noneMatch(hash -> hash.equals(scriptHash))) {
|
||||||
|
scriptHashToBodysMap.remove(scriptHash);
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
scriptsLock.unlock();
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
log.debug("[{}] Failed to release script due", compiledScriptId);
|
log.debug("[{}] Failed to release script", scriptHash);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected String constructFunctionName(UUID scriptId, String scriptHash) {
|
||||||
|
return "invokeInternal_" + scriptHash;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected String getScriptHash(UUID scriptId) {
|
||||||
|
JsScriptInfo jsScriptInfo = scriptInfoMap.get(scriptId);
|
||||||
|
return jsScriptInfo != null ? jsScriptInfo.getHash() : null;
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@ -15,13 +15,12 @@
|
|||||||
*/
|
*/
|
||||||
package org.thingsboard.script.api;
|
package org.thingsboard.script.api;
|
||||||
|
|
||||||
import com.google.common.util.concurrent.AsyncFunction;
|
|
||||||
import com.google.common.util.concurrent.FutureCallback;
|
import com.google.common.util.concurrent.FutureCallback;
|
||||||
import com.google.common.util.concurrent.Futures;
|
import com.google.common.util.concurrent.Futures;
|
||||||
import com.google.common.util.concurrent.ListenableFuture;
|
import com.google.common.util.concurrent.ListenableFuture;
|
||||||
import com.google.common.util.concurrent.MoreExecutors;
|
import com.google.common.util.concurrent.MoreExecutors;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.checkerframework.checker.nullness.qual.Nullable;
|
import org.springframework.data.util.Pair;
|
||||||
import org.thingsboard.common.util.JacksonUtil;
|
import org.thingsboard.common.util.JacksonUtil;
|
||||||
import org.thingsboard.common.util.ThingsBoardThreadFactory;
|
import org.thingsboard.common.util.ThingsBoardThreadFactory;
|
||||||
import org.thingsboard.server.common.data.ApiUsageRecordKey;
|
import org.thingsboard.server.common.data.ApiUsageRecordKey;
|
||||||
@ -46,7 +45,7 @@ import static java.lang.String.format;
|
|||||||
@Slf4j
|
@Slf4j
|
||||||
public abstract class AbstractScriptInvokeService implements ScriptInvokeService {
|
public abstract class AbstractScriptInvokeService implements ScriptInvokeService {
|
||||||
|
|
||||||
protected Map<UUID, BlockedScriptInfo> disabledScripts = new ConcurrentHashMap<>();
|
protected final Map<UUID, BlockedScriptInfo> disabledScripts = new ConcurrentHashMap<>();
|
||||||
|
|
||||||
private final Optional<TbApiUsageStateClient> apiUsageStateClient;
|
private final Optional<TbApiUsageStateClient> apiUsageStateClient;
|
||||||
private final Optional<TbApiUsageReportClient> apiUsageReportClient;
|
private final Optional<TbApiUsageReportClient> apiUsageReportClient;
|
||||||
@ -90,7 +89,7 @@ public abstract class AbstractScriptInvokeService implements ScriptInvokeService
|
|||||||
|
|
||||||
protected abstract boolean isScriptPresent(UUID scriptId);
|
protected abstract boolean isScriptPresent(UUID scriptId);
|
||||||
|
|
||||||
protected abstract ListenableFuture<UUID> doEvalScript(ScriptType scriptType, String scriptBody, UUID scriptId, String[] argNames);
|
protected abstract ListenableFuture<UUID> doEvalScript(TenantId tenantId, ScriptType scriptType, String scriptBody, UUID scriptId, String[] argNames);
|
||||||
|
|
||||||
protected abstract ListenableFuture<Object> doInvokeFunction(UUID scriptId, Object[] args);
|
protected abstract ListenableFuture<Object> doInvokeFunction(UUID scriptId, Object[] args);
|
||||||
|
|
||||||
@ -130,7 +129,7 @@ public abstract class AbstractScriptInvokeService implements ScriptInvokeService
|
|||||||
}
|
}
|
||||||
UUID scriptId = UUID.randomUUID();
|
UUID scriptId = UUID.randomUUID();
|
||||||
pushedMsgs.incrementAndGet();
|
pushedMsgs.incrementAndGet();
|
||||||
return withTimeoutAndStatsCallback(scriptId, doEvalScript(scriptType, scriptBody, scriptId, argNames), evalCallback, getMaxEvalRequestsTimeout());
|
return withTimeoutAndStatsCallback(scriptId, doEvalScript(tenantId, scriptType, scriptBody, scriptId, argNames), evalCallback, getMaxEvalRequestsTimeout());
|
||||||
} else {
|
} else {
|
||||||
return error("Script Execution is disabled due to API limits!");
|
return error("Script Execution is disabled due to API limits!");
|
||||||
}
|
}
|
||||||
|
|||||||
@ -15,14 +15,16 @@
|
|||||||
*/
|
*/
|
||||||
package org.thingsboard.script.api.js;
|
package org.thingsboard.script.api.js;
|
||||||
|
|
||||||
import com.google.common.util.concurrent.Futures;
|
import com.google.common.hash.Hashing;
|
||||||
import com.google.common.util.concurrent.ListenableFuture;
|
import com.google.common.util.concurrent.ListenableFuture;
|
||||||
import lombok.Getter;
|
import lombok.Getter;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.springframework.beans.factory.annotation.Value;
|
import org.springframework.beans.factory.annotation.Value;
|
||||||
|
import org.springframework.data.util.Pair;
|
||||||
import org.thingsboard.script.api.AbstractScriptInvokeService;
|
import org.thingsboard.script.api.AbstractScriptInvokeService;
|
||||||
import org.thingsboard.script.api.RuleNodeScriptFactory;
|
import org.thingsboard.script.api.RuleNodeScriptFactory;
|
||||||
import org.thingsboard.script.api.ScriptType;
|
import org.thingsboard.script.api.ScriptType;
|
||||||
|
import org.thingsboard.server.common.data.id.TenantId;
|
||||||
import org.thingsboard.server.common.stats.TbApiUsageReportClient;
|
import org.thingsboard.server.common.stats.TbApiUsageReportClient;
|
||||||
import org.thingsboard.server.common.stats.TbApiUsageStateClient;
|
import org.thingsboard.server.common.stats.TbApiUsageStateClient;
|
||||||
|
|
||||||
@ -35,9 +37,9 @@ import java.util.concurrent.ConcurrentHashMap;
|
|||||||
* Created by ashvayka on 26.09.18.
|
* Created by ashvayka on 26.09.18.
|
||||||
*/
|
*/
|
||||||
@Slf4j
|
@Slf4j
|
||||||
public abstract class AbstractJsInvokeService extends AbstractScriptInvokeService implements JsInvokeService{
|
public abstract class AbstractJsInvokeService extends AbstractScriptInvokeService implements JsInvokeService {
|
||||||
|
|
||||||
protected Map<UUID, String> scriptIdToNameMap = new ConcurrentHashMap<>();
|
protected final Map<UUID, JsScriptInfo> scriptInfoMap = new ConcurrentHashMap<>();
|
||||||
|
|
||||||
@Getter
|
@Getter
|
||||||
@Value("${js.max_total_args_size:100000}")
|
@Value("${js.max_total_args_size:100000}")
|
||||||
@ -55,47 +57,32 @@ public abstract class AbstractJsInvokeService extends AbstractScriptInvokeServic
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected boolean isScriptPresent(UUID scriptId) {
|
protected boolean isScriptPresent(UUID scriptId) {
|
||||||
return scriptIdToNameMap.containsKey(scriptId);
|
return scriptInfoMap.containsKey(scriptId);
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public ListenableFuture<Void> release(UUID scriptId) {
|
|
||||||
String functionName = scriptIdToNameMap.get(scriptId);
|
|
||||||
if (functionName != null) {
|
|
||||||
try {
|
|
||||||
scriptIdToNameMap.remove(scriptId);
|
|
||||||
disabledScripts.remove(scriptId);
|
|
||||||
doRelease(scriptId, functionName);
|
|
||||||
} catch (Exception e) {
|
|
||||||
return Futures.immediateFailedFuture(e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return Futures.immediateFuture(null);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected ListenableFuture<Object> doInvokeFunction(UUID scriptId, Object[] args) {
|
protected ListenableFuture<Object> doInvokeFunction(UUID scriptId, Object[] args) {
|
||||||
return doInvokeFunction(scriptId, scriptIdToNameMap.get(scriptId), args);
|
return doInvokeFunction(scriptId, scriptInfoMap.get(scriptId), args);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected ListenableFuture<UUID> doEvalScript(ScriptType scriptType, String scriptBody, UUID scriptId, String[] argNames) {
|
protected ListenableFuture<UUID> doEvalScript(TenantId tenantId, ScriptType scriptType, String scriptBody, UUID scriptId, String[] argNames) {
|
||||||
String functionName = "invokeInternal_" + scriptId.toString().replace('-', '_');
|
String scriptHash = hash(tenantId, scriptBody);
|
||||||
|
String functionName = constructFunctionName(scriptId, scriptHash);
|
||||||
String jsScript = generateJsScript(scriptType, functionName, scriptBody, argNames);
|
String jsScript = generateJsScript(scriptType, functionName, scriptBody, argNames);
|
||||||
return doEval(scriptId, functionName, jsScript);
|
return doEval(scriptId, new JsScriptInfo(scriptHash, functionName), jsScript);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void doRelease(UUID scriptId) throws Exception {
|
protected void doRelease(UUID scriptId) throws Exception {
|
||||||
String functionName = scriptIdToNameMap.remove(scriptId);
|
doRelease(scriptId, scriptInfoMap.remove(scriptId));
|
||||||
doRelease(scriptId, functionName);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
protected abstract ListenableFuture<UUID> doEval(UUID scriptId, String functionName, String scriptBody);
|
protected abstract ListenableFuture<UUID> doEval(UUID scriptId, JsScriptInfo jsInfo, String scriptBody);
|
||||||
|
|
||||||
protected abstract ListenableFuture<Object> doInvokeFunction(UUID scriptId, String functionName, Object[] args);
|
protected abstract ListenableFuture<Object> doInvokeFunction(UUID scriptId, JsScriptInfo jsInfo, Object[] args);
|
||||||
|
|
||||||
protected abstract void doRelease(UUID scriptId, String functionName) throws Exception;
|
protected abstract void doRelease(UUID scriptId, JsScriptInfo scriptInfo) throws Exception;
|
||||||
|
|
||||||
private String generateJsScript(ScriptType scriptType, String functionName, String scriptBody, String... argNames) {
|
private String generateJsScript(ScriptType scriptType, String functionName, String scriptBody, String... argNames) {
|
||||||
if (scriptType == ScriptType.RULE_NODE_SCRIPT) {
|
if (scriptType == ScriptType.RULE_NODE_SCRIPT) {
|
||||||
@ -104,4 +91,16 @@ public abstract class AbstractJsInvokeService extends AbstractScriptInvokeServic
|
|||||||
throw new RuntimeException("No script factory implemented for scriptType: " + scriptType);
|
throw new RuntimeException("No script factory implemented for scriptType: " + scriptType);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected String constructFunctionName(UUID scriptId, String scriptHash) {
|
||||||
|
return "invokeInternal_" + scriptId.toString().replace('-', '_');
|
||||||
|
}
|
||||||
|
|
||||||
|
protected String hash(TenantId tenantId, String scriptBody) {
|
||||||
|
return Hashing.murmur3_128().newHasher()
|
||||||
|
.putLong(tenantId.getId().getMostSignificantBits())
|
||||||
|
.putLong(tenantId.getId().getLeastSignificantBits())
|
||||||
|
.putUnencodedChars(scriptBody)
|
||||||
|
.hash().toString();
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@ -0,0 +1,26 @@
|
|||||||
|
/**
|
||||||
|
* Copyright © 2016-2022 The Thingsboard Authors
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.thingsboard.script.api.js;
|
||||||
|
|
||||||
|
import lombok.Data;
|
||||||
|
|
||||||
|
@Data
|
||||||
|
public class JsScriptInfo {
|
||||||
|
|
||||||
|
private final String hash;
|
||||||
|
private final String functionName;
|
||||||
|
|
||||||
|
}
|
||||||
@ -130,7 +130,7 @@ public class NashornJsInvokeService extends AbstractJsInvokeService {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected ListenableFuture<UUID> doEval(UUID scriptId, String functionName, String jsScript) {
|
protected ListenableFuture<UUID> doEval(UUID scriptId, JsScriptInfo scriptInfo, String jsScript) {
|
||||||
return jsExecutor.submit(() -> {
|
return jsExecutor.submit(() -> {
|
||||||
try {
|
try {
|
||||||
evalLock.lock();
|
evalLock.lock();
|
||||||
@ -143,7 +143,7 @@ public class NashornJsInvokeService extends AbstractJsInvokeService {
|
|||||||
} finally {
|
} finally {
|
||||||
evalLock.unlock();
|
evalLock.unlock();
|
||||||
}
|
}
|
||||||
scriptIdToNameMap.put(scriptId, functionName);
|
scriptInfoMap.put(scriptId, scriptInfo);
|
||||||
return scriptId;
|
return scriptId;
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
throw new TbScriptException(scriptId, TbScriptException.ErrorCode.COMPILATION, jsScript, e);
|
throw new TbScriptException(scriptId, TbScriptException.ErrorCode.COMPILATION, jsScript, e);
|
||||||
@ -152,13 +152,13 @@ public class NashornJsInvokeService extends AbstractJsInvokeService {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected ListenableFuture<Object> doInvokeFunction(UUID scriptId, String functionName, Object[] args) {
|
protected ListenableFuture<Object> doInvokeFunction(UUID scriptId, JsScriptInfo scriptInfo, Object[] args) {
|
||||||
return jsExecutor.submit(() -> {
|
return jsExecutor.submit(() -> {
|
||||||
try {
|
try {
|
||||||
if (useJsSandbox) {
|
if (useJsSandbox) {
|
||||||
return sandbox.getSandboxedInvocable().invokeFunction(functionName, args);
|
return sandbox.getSandboxedInvocable().invokeFunction(scriptInfo.getFunctionName(), args);
|
||||||
} else {
|
} else {
|
||||||
return ((Invocable) engine).invokeFunction(functionName, args);
|
return ((Invocable) engine).invokeFunction(scriptInfo.getFunctionName(), args);
|
||||||
}
|
}
|
||||||
} catch (ScriptException e) {
|
} catch (ScriptException e) {
|
||||||
throw new TbScriptException(scriptId, TbScriptException.ErrorCode.RUNTIME, null, e);
|
throw new TbScriptException(scriptId, TbScriptException.ErrorCode.RUNTIME, null, e);
|
||||||
@ -168,11 +168,11 @@ public class NashornJsInvokeService extends AbstractJsInvokeService {
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
protected void doRelease(UUID scriptId, String functionName) throws ScriptException {
|
protected void doRelease(UUID scriptId, JsScriptInfo scriptInfo) throws ScriptException {
|
||||||
if (useJsSandbox) {
|
if (useJsSandbox) {
|
||||||
sandbox.eval(functionName + " = undefined;");
|
sandbox.eval(scriptInfo.getFunctionName() + " = undefined;");
|
||||||
} else {
|
} else {
|
||||||
engine.eval(functionName + " = undefined;");
|
engine.eval(scriptInfo.getFunctionName() + " = undefined;");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -35,6 +35,7 @@ import org.thingsboard.common.util.ThingsBoardExecutors;
|
|||||||
import org.thingsboard.script.api.AbstractScriptInvokeService;
|
import org.thingsboard.script.api.AbstractScriptInvokeService;
|
||||||
import org.thingsboard.script.api.ScriptType;
|
import org.thingsboard.script.api.ScriptType;
|
||||||
import org.thingsboard.script.api.TbScriptException;
|
import org.thingsboard.script.api.TbScriptException;
|
||||||
|
import org.thingsboard.server.common.data.id.TenantId;
|
||||||
import org.thingsboard.server.common.stats.TbApiUsageReportClient;
|
import org.thingsboard.server.common.stats.TbApiUsageReportClient;
|
||||||
import org.thingsboard.server.common.stats.TbApiUsageStateClient;
|
import org.thingsboard.server.common.stats.TbApiUsageStateClient;
|
||||||
|
|
||||||
@ -132,7 +133,7 @@ public class DefaultMvelInvokeService extends AbstractScriptInvokeService implem
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected ListenableFuture<UUID> doEvalScript(ScriptType scriptType, String scriptBody, UUID scriptId, String[] argNames) {
|
protected ListenableFuture<UUID> doEvalScript(TenantId tenantId, ScriptType scriptType, String scriptBody, UUID scriptId, String[] argNames) {
|
||||||
if (NEW_KEYWORD_PATTERN.matcher(scriptBody).matches()) {
|
if (NEW_KEYWORD_PATTERN.matcher(scriptBody).matches()) {
|
||||||
//TODO: output line number and char pos.
|
//TODO: output line number and char pos.
|
||||||
return Futures.immediateFailedFuture(new TbScriptException(scriptId, TbScriptException.ErrorCode.COMPILATION, scriptBody,
|
return Futures.immediateFailedFuture(new TbScriptException(scriptId, TbScriptException.ErrorCode.COMPILATION, scriptBody,
|
||||||
|
|||||||
@ -16,8 +16,9 @@
|
|||||||
|
|
||||||
|
|
||||||
export interface TbMessage {
|
export interface TbMessage {
|
||||||
scriptIdMSB: string;
|
scriptIdMSB: string; // deprecated
|
||||||
scriptIdLSB: string;
|
scriptIdLSB: string; // deprecated
|
||||||
|
scriptHash: string;
|
||||||
}
|
}
|
||||||
|
|
||||||
export interface RemoteJsRequest {
|
export interface RemoteJsRequest {
|
||||||
|
|||||||
@ -18,7 +18,7 @@ import config from 'config';
|
|||||||
import { _logger } from '../config/logger';
|
import { _logger } from '../config/logger';
|
||||||
import { JsExecutor, TbScript } from './jsExecutor';
|
import { JsExecutor, TbScript } from './jsExecutor';
|
||||||
import { performance } from 'perf_hooks';
|
import { performance } from 'perf_hooks';
|
||||||
import { isString, parseJsErrorDetails, toUUIDString, UUIDFromBuffer, UUIDToBits } from './utils';
|
import { isString, parseJsErrorDetails, toUUIDString, UUIDFromBuffer, UUIDToBits, isNotUUID } from './utils';
|
||||||
import { IQueue } from '../queue/queue.models';
|
import { IQueue } from '../queue/queue.models';
|
||||||
import {
|
import {
|
||||||
JsCompileRequest,
|
JsCompileRequest,
|
||||||
@ -36,6 +36,7 @@ import Long from 'long';
|
|||||||
const COMPILATION_ERROR = 0;
|
const COMPILATION_ERROR = 0;
|
||||||
const RUNTIME_ERROR = 1;
|
const RUNTIME_ERROR = 1;
|
||||||
const TIMEOUT_ERROR = 2;
|
const TIMEOUT_ERROR = 2;
|
||||||
|
const NOT_FOUND_ERROR = 3;
|
||||||
|
|
||||||
const statFrequency = Number(config.get('script.stat_print_frequency'));
|
const statFrequency = Number(config.get('script.stat_print_frequency'));
|
||||||
const scriptBodyTraceFrequency = Number(config.get('script.script_body_trace_frequency'));
|
const scriptBodyTraceFrequency = Number(config.get('script.script_body_trace_frequency'));
|
||||||
@ -129,7 +130,12 @@ export class JsInvokeMessageProcessor {
|
|||||||
processCompileRequest(requestId: string, responseTopic: string, headers: any, compileRequest: JsCompileRequest) {
|
processCompileRequest(requestId: string, responseTopic: string, headers: any, compileRequest: JsCompileRequest) {
|
||||||
const scriptId = JsInvokeMessageProcessor.getScriptId(compileRequest);
|
const scriptId = JsInvokeMessageProcessor.getScriptId(compileRequest);
|
||||||
this.logger.debug('[%s] Processing compile request, scriptId: [%s]', requestId, scriptId);
|
this.logger.debug('[%s] Processing compile request, scriptId: [%s]', requestId, scriptId);
|
||||||
|
if (this.scriptMap.has(scriptId)) {
|
||||||
|
const compileResponse = JsInvokeMessageProcessor.createCompileResponse(scriptId, true);
|
||||||
|
this.logger.debug('[%s] Script was already compiled, scriptId: [%s]', requestId, scriptId);
|
||||||
|
this.sendResponse(requestId, responseTopic, headers, scriptId, compileResponse);
|
||||||
|
return;
|
||||||
|
}
|
||||||
this.executor.compileScript(compileRequest.scriptBody).then(
|
this.executor.compileScript(compileRequest.scriptBody).then(
|
||||||
(script) => {
|
(script) => {
|
||||||
this.cacheScript(scriptId, script);
|
this.cacheScript(scriptId, script);
|
||||||
@ -170,7 +176,7 @@ export class JsInvokeMessageProcessor {
|
|||||||
this.logger.debug('[%s] Sending success invoke response, scriptId: [%s]', requestId, scriptId);
|
this.logger.debug('[%s] Sending success invoke response, scriptId: [%s]', requestId, scriptId);
|
||||||
this.sendResponse(requestId, responseTopic, headers, scriptId, undefined, invokeResponse);
|
this.sendResponse(requestId, responseTopic, headers, scriptId, undefined, invokeResponse);
|
||||||
} else {
|
} else {
|
||||||
let err = {
|
const err = {
|
||||||
name: 'Error',
|
name: 'Error',
|
||||||
message: 'script invocation result exceeds maximum allowed size of ' + maxResultSize + ' symbols'
|
message: 'script invocation result exceeds maximum allowed size of ' + maxResultSize + ' symbols'
|
||||||
}
|
}
|
||||||
@ -193,8 +199,12 @@ export class JsInvokeMessageProcessor {
|
|||||||
)
|
)
|
||||||
},
|
},
|
||||||
(err: any) => {
|
(err: any) => {
|
||||||
const invokeResponse = JsInvokeMessageProcessor.createInvokeResponse("", false, COMPILATION_ERROR, err);
|
let errorCode = COMPILATION_ERROR;
|
||||||
this.logger.debug('[%s] Sending failed invoke response, scriptId: [%s], errorCode: [%s]', requestId, scriptId, COMPILATION_ERROR);
|
if (err?.name === 'script body not found') {
|
||||||
|
errorCode = NOT_FOUND_ERROR;
|
||||||
|
}
|
||||||
|
const invokeResponse = JsInvokeMessageProcessor.createInvokeResponse("", false, errorCode, err);
|
||||||
|
this.logger.debug('[%s] Sending failed invoke response, scriptId: [%s], errorCode: [%s]', requestId, scriptId, errorCode);
|
||||||
this.sendResponse(requestId, responseTopic, headers, scriptId, undefined, invokeResponse);
|
this.sendResponse(requestId, responseTopic, headers, scriptId, undefined, invokeResponse);
|
||||||
}
|
}
|
||||||
);
|
);
|
||||||
@ -222,7 +232,7 @@ export class JsInvokeMessageProcessor {
|
|||||||
const remoteResponse = JsInvokeMessageProcessor.createRemoteResponse(requestId, compileResponse, invokeResponse, releaseResponse);
|
const remoteResponse = JsInvokeMessageProcessor.createRemoteResponse(requestId, compileResponse, invokeResponse, releaseResponse);
|
||||||
const rawResponse = Buffer.from(JSON.stringify(remoteResponse), 'utf8');
|
const rawResponse = Buffer.from(JSON.stringify(remoteResponse), 'utf8');
|
||||||
this.logger.debug('[%s] Sending response to queue, scriptId: [%s]', requestId, scriptId);
|
this.logger.debug('[%s] Sending response to queue, scriptId: [%s]', requestId, scriptId);
|
||||||
this.producer.send(responseTopic, scriptId, rawResponse, headers).then(
|
this.producer.send(responseTopic, requestId, rawResponse, headers).then(
|
||||||
() => {
|
() => {
|
||||||
this.logger.debug('[%s] Response sent to queue, took [%s]ms, scriptId: [%s]', requestId, (performance.now() - tStartSending), scriptId);
|
this.logger.debug('[%s] Response sent to queue, took [%s]ms, scriptId: [%s]', requestId, (performance.now() - tStartSending), scriptId);
|
||||||
},
|
},
|
||||||
@ -242,7 +252,7 @@ export class JsInvokeMessageProcessor {
|
|||||||
if (script) {
|
if (script) {
|
||||||
self.incrementUseScriptId(scriptId);
|
self.incrementUseScriptId(scriptId);
|
||||||
resolve(script);
|
resolve(script);
|
||||||
} else {
|
} else if (scriptBody) {
|
||||||
const startTime = performance.now();
|
const startTime = performance.now();
|
||||||
self.executor.compileScript(scriptBody).then(
|
self.executor.compileScript(scriptBody).then(
|
||||||
(compiledScript) => {
|
(compiledScript) => {
|
||||||
@ -255,6 +265,12 @@ export class JsInvokeMessageProcessor {
|
|||||||
reject(err);
|
reject(err);
|
||||||
}
|
}
|
||||||
);
|
);
|
||||||
|
} else {
|
||||||
|
const err = {
|
||||||
|
name: 'script body not found',
|
||||||
|
message: ''
|
||||||
|
}
|
||||||
|
reject(err);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
@ -285,14 +301,26 @@ export class JsInvokeMessageProcessor {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private static createCompileResponse(scriptId: string, success: boolean, errorCode?: number, err?: any): JsCompileResponse {
|
private static createCompileResponse(scriptId: string, success: boolean, errorCode?: number, err?: any): JsCompileResponse {
|
||||||
const scriptIdBits = UUIDToBits(scriptId);
|
if (isNotUUID(scriptId)) {
|
||||||
return {
|
return {
|
||||||
errorCode: errorCode,
|
errorCode: errorCode,
|
||||||
success: success,
|
success: success,
|
||||||
errorDetails: parseJsErrorDetails(err),
|
errorDetails: parseJsErrorDetails(err),
|
||||||
scriptIdMSB: scriptIdBits[0],
|
scriptIdMSB: "0",
|
||||||
scriptIdLSB: scriptIdBits[1]
|
scriptIdLSB: "0",
|
||||||
};
|
scriptHash: scriptId
|
||||||
|
};
|
||||||
|
} else { // this is for backward compatibility (to be able to work with tb-node of previous version) - todo: remove in the next release
|
||||||
|
let scriptIdBits = UUIDToBits(scriptId);
|
||||||
|
return {
|
||||||
|
errorCode: errorCode,
|
||||||
|
success: success,
|
||||||
|
errorDetails: parseJsErrorDetails(err),
|
||||||
|
scriptIdMSB: scriptIdBits[0],
|
||||||
|
scriptIdLSB: scriptIdBits[1],
|
||||||
|
scriptHash: ""
|
||||||
|
};
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private static createInvokeResponse(result: string, success: boolean, errorCode?: number, err?: any): JsInvokeResponse {
|
private static createInvokeResponse(result: string, success: boolean, errorCode?: number, err?: any): JsInvokeResponse {
|
||||||
@ -305,16 +333,26 @@ export class JsInvokeMessageProcessor {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private static createReleaseResponse(scriptId: string, success: boolean): JsReleaseResponse {
|
private static createReleaseResponse(scriptId: string, success: boolean): JsReleaseResponse {
|
||||||
const scriptIdBits = UUIDToBits(scriptId);
|
if (isNotUUID(scriptId)) {
|
||||||
return {
|
return {
|
||||||
success: success,
|
success: success,
|
||||||
scriptIdMSB: scriptIdBits[0],
|
scriptIdMSB: "0",
|
||||||
scriptIdLSB: scriptIdBits[1]
|
scriptIdLSB: "0",
|
||||||
};
|
scriptHash: scriptId,
|
||||||
|
};
|
||||||
|
} else { // todo: remove in the next release
|
||||||
|
let scriptIdBits = UUIDToBits(scriptId);
|
||||||
|
return {
|
||||||
|
success: success,
|
||||||
|
scriptIdMSB: scriptIdBits[0],
|
||||||
|
scriptIdLSB: scriptIdBits[1],
|
||||||
|
scriptHash: ""
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private static getScriptId(request: TbMessage): string {
|
private static getScriptId(request: TbMessage): string {
|
||||||
return toUUIDString(request.scriptIdMSB, request.scriptIdLSB);
|
return request.scriptHash ? request.scriptHash : toUUIDString(request.scriptIdMSB, request.scriptIdLSB);
|
||||||
}
|
}
|
||||||
|
|
||||||
private incrementUseScriptId(scriptId: string) {
|
private incrementUseScriptId(scriptId: string) {
|
||||||
|
|||||||
@ -58,3 +58,7 @@ export function parseJsErrorDetails(err: any): string | undefined {
|
|||||||
}
|
}
|
||||||
return details;
|
return details;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
export function isNotUUID(candidate: string) {
|
||||||
|
return candidate.length != 36 || !candidate.includes('-');
|
||||||
|
}
|
||||||
|
|||||||
@ -123,10 +123,10 @@ export class AwsSqsTemplate implements IQueue {
|
|||||||
this.timer = setTimeout(() => {this.getAndProcessMessage(messageProcessor, params)}, this.pollInterval);
|
this.timer = setTimeout(() => {this.getAndProcessMessage(messageProcessor, params)}, this.pollInterval);
|
||||||
}
|
}
|
||||||
|
|
||||||
async send(responseTopic: string, scriptId: string, rawResponse: Buffer, headers: any): Promise<any> {
|
async send(responseTopic: string, msgKey: string, rawResponse: Buffer, headers: any): Promise<any> {
|
||||||
let msgBody = JSON.stringify(
|
let msgBody = JSON.stringify(
|
||||||
{
|
{
|
||||||
key: scriptId,
|
key: msgKey,
|
||||||
data: [...rawResponse],
|
data: [...rawResponse],
|
||||||
headers: headers
|
headers: headers
|
||||||
});
|
});
|
||||||
|
|||||||
@ -149,12 +149,11 @@ export class KafkaTemplate implements IQueue {
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
async send(responseTopic: string, scriptId: string, rawResponse: Buffer, headers: any): Promise<any> {
|
async send(responseTopic: string, msgKey: string, rawResponse: Buffer, headers: any): Promise<any> {
|
||||||
this.logger.debug('Pending queue response, scriptId: [%s]', scriptId);
|
|
||||||
const message = {
|
const message = {
|
||||||
topic: responseTopic,
|
topic: responseTopic,
|
||||||
messages: [{
|
messages: [{
|
||||||
key: scriptId,
|
key: msgKey,
|
||||||
value: rawResponse,
|
value: rawResponse,
|
||||||
headers: headers.data
|
headers: headers.data
|
||||||
}]
|
}]
|
||||||
|
|||||||
@ -80,7 +80,7 @@ export class PubSubTemplate implements IQueue {
|
|||||||
subscription.on('message', messageHandler);
|
subscription.on('message', messageHandler);
|
||||||
}
|
}
|
||||||
|
|
||||||
async send(responseTopic: string, scriptId: string, rawResponse: Buffer, headers: any): Promise<any> {
|
async send(responseTopic: string, msgKey: string, rawResponse: Buffer, headers: any): Promise<any> {
|
||||||
if (!(this.subscriptions.includes(responseTopic) && this.topics.includes(this.requestTopic))) {
|
if (!(this.subscriptions.includes(responseTopic) && this.topics.includes(this.requestTopic))) {
|
||||||
await this.createTopic(this.requestTopic);
|
await this.createTopic(this.requestTopic);
|
||||||
await this.createSubscription(this.requestTopic);
|
await this.createSubscription(this.requestTopic);
|
||||||
@ -88,7 +88,7 @@ export class PubSubTemplate implements IQueue {
|
|||||||
|
|
||||||
let data = JSON.stringify(
|
let data = JSON.stringify(
|
||||||
{
|
{
|
||||||
key: scriptId,
|
key: msgKey,
|
||||||
data: [...rawResponse],
|
data: [...rawResponse],
|
||||||
headers: headers
|
headers: headers
|
||||||
});
|
});
|
||||||
|
|||||||
@ -17,6 +17,6 @@
|
|||||||
export interface IQueue {
|
export interface IQueue {
|
||||||
name: string;
|
name: string;
|
||||||
init(): Promise<void>;
|
init(): Promise<void>;
|
||||||
send(responseTopic: string, scriptId: string, rawResponse: Buffer, headers: any): Promise<any>;
|
send(responseTopic: string, msgKey: string, rawResponse: Buffer, headers: any): Promise<any>;
|
||||||
destroy(): Promise<void>;
|
destroy(): Promise<void>;
|
||||||
}
|
}
|
||||||
|
|||||||
@ -65,7 +65,7 @@ export class RabbitMqTemplate implements IQueue {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
async send(responseTopic: string, scriptId: string, rawResponse: Buffer, headers: any): Promise<any> {
|
async send(responseTopic: string, msgKey: string, rawResponse: Buffer, headers: any): Promise<any> {
|
||||||
|
|
||||||
if (!this.topics.includes(responseTopic)) {
|
if (!this.topics.includes(responseTopic)) {
|
||||||
await this.createQueue(responseTopic);
|
await this.createQueue(responseTopic);
|
||||||
@ -74,7 +74,7 @@ export class RabbitMqTemplate implements IQueue {
|
|||||||
|
|
||||||
let data = JSON.stringify(
|
let data = JSON.stringify(
|
||||||
{
|
{
|
||||||
key: scriptId,
|
key: msgKey,
|
||||||
data: [...rawResponse],
|
data: [...rawResponse],
|
||||||
headers: headers
|
headers: headers
|
||||||
});
|
});
|
||||||
|
|||||||
@ -82,7 +82,7 @@ export class ServiceBusTemplate implements IQueue {
|
|||||||
this.receiver.subscribe({processMessage: messageHandler, processError: errorHandler})
|
this.receiver.subscribe({processMessage: messageHandler, processError: errorHandler})
|
||||||
}
|
}
|
||||||
|
|
||||||
async send(responseTopic: string, scriptId: string, rawResponse: Buffer, headers: any): Promise<any> {
|
async send(responseTopic: string, msgKey: string, rawResponse: Buffer, headers: any): Promise<any> {
|
||||||
if (!this.queues.includes(this.requestTopic)) {
|
if (!this.queues.includes(this.requestTopic)) {
|
||||||
await this.createQueueIfNotExist(this.requestTopic);
|
await this.createQueueIfNotExist(this.requestTopic);
|
||||||
this.queues.push(this.requestTopic);
|
this.queues.push(this.requestTopic);
|
||||||
@ -96,7 +96,7 @@ export class ServiceBusTemplate implements IQueue {
|
|||||||
}
|
}
|
||||||
|
|
||||||
let data = {
|
let data = {
|
||||||
key: scriptId,
|
key: msgKey,
|
||||||
data: [...rawResponse],
|
data: [...rawResponse],
|
||||||
headers: headers
|
headers: headers
|
||||||
};
|
};
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user