Do not send script body to remote JS executor on each invoke request

This commit is contained in:
ViacheslavKlimov 2022-10-06 19:29:26 +03:00
parent 1323edf5aa
commit 7aab188d0c
4 changed files with 216 additions and 59 deletions

View File

@ -18,6 +18,7 @@ package org.thingsboard.server.service.script;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
@ -28,6 +29,7 @@ import org.springframework.stereotype.Service;
import org.springframework.util.StopWatch;
import org.thingsboard.common.util.ThingsBoardThreadFactory;
import org.thingsboard.server.gen.js.JsInvokeProtos;
import org.thingsboard.server.gen.js.JsInvokeProtos.JsInvokeErrorCode;
import org.thingsboard.server.queue.TbQueueRequestTemplate;
import org.thingsboard.server.queue.common.TbProtoJsQueueMsg;
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
@ -40,6 +42,7 @@ import javax.annotation.PreDestroy;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
@ -98,9 +101,9 @@ public class RemoteJsInvokeService extends AbstractJsInvokeService {
}
@Autowired
private TbQueueRequestTemplate<TbProtoJsQueueMsg<JsInvokeProtos.RemoteJsRequest>, TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse>> requestTemplate;
protected TbQueueRequestTemplate<TbProtoJsQueueMsg<JsInvokeProtos.RemoteJsRequest>, TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse>> requestTemplate;
private Map<UUID, String> scriptIdToBodysMap = new ConcurrentHashMap<>();
protected Map<UUID, String> scriptIdToBodysMap = new ConcurrentHashMap<>();
@PostConstruct
public void init() {
@ -129,25 +132,7 @@ public class RemoteJsInvokeService extends AbstractJsInvokeService {
.build();
log.trace("Post compile request for scriptId [{}]", scriptId);
ListenableFuture<TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse>> future = requestTemplate.send(new TbProtoJsQueueMsg<>(UUID.randomUUID(), jsRequestWrapper));
if (maxEvalRequestsTimeout > 0) {
future = Futures.withTimeout(future, maxEvalRequestsTimeout, TimeUnit.MILLISECONDS, timeoutExecutorService);
}
queuePushedMsgs.incrementAndGet();
Futures.addCallback(future, new FutureCallback<TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse>>() {
@Override
public void onSuccess(@Nullable TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse> result) {
queueEvalMsgs.incrementAndGet();
}
@Override
public void onFailure(Throwable t) {
if (t instanceof TimeoutException || (t.getCause() != null && t.getCause() instanceof TimeoutException)) {
queueTimeoutMsgs.incrementAndGet();
}
queueFailedMsgs.incrementAndGet();
}
}, callbackExecutor);
ListenableFuture<TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse>> future = sendRequest(jsRequestWrapper, maxEvalRequestsTimeout, queueEvalMsgs);
return Futures.transform(future, response -> {
JsInvokeProtos.JsCompileResponse compilationResult = response.getValue().getCompileResponse();
UUID compiledScriptId = new UUID(compilationResult.getScriptIdMSB(), compilationResult.getScriptIdLSB());
@ -169,33 +154,66 @@ public class RemoteJsInvokeService extends AbstractJsInvokeService {
if (scriptBody == null) {
return Futures.immediateFailedFuture(new RuntimeException("No script body found for scriptId: [" + scriptId + "]!"));
}
JsInvokeProtos.JsInvokeRequest.Builder jsRequestBuilder = JsInvokeProtos.JsInvokeRequest.newBuilder()
.setScriptIdMSB(scriptId.getMostSignificantBits())
.setScriptIdLSB(scriptId.getLeastSignificantBits())
.setFunctionName(functionName)
.setTimeout((int) maxExecRequestsTimeout)
.setScriptBody(scriptBody);
for (Object arg : args) {
jsRequestBuilder.addArgs(arg.toString());
}
JsInvokeProtos.RemoteJsRequest jsRequestWrapper = JsInvokeProtos.RemoteJsRequest.newBuilder()
.setInvokeRequest(jsRequestBuilder.build())
.build();
JsInvokeProtos.RemoteJsRequest jsRequestWrapper = buildJsInvokeRequest(scriptId, functionName, args, false, null);
StopWatch stopWatch = new StopWatch();
stopWatch.start();
ListenableFuture<TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse>> future = sendRequest(jsRequestWrapper, maxRequestsTimeout, queueInvokeMsgs);
return Futures.transformAsync(future, response -> {
stopWatch.stop();
log.trace("doInvokeFunction js-response took {}ms for uuid {}", stopWatch.getTotalTimeMillis(), response.getKey());
JsInvokeProtos.JsInvokeResponse invokeResult = response.getValue().getInvokeResponse();
if (invokeResult.getSuccess()) {
return Futures.immediateFuture(invokeResult.getResult());
} else {
return handleInvokeError(scriptId, invokeResult.getErrorCode(), invokeResult.getErrorDetails(), functionName, args, scriptBody);
}
}, callbackExecutor);
}
private ListenableFuture<Object> handleInvokeError(UUID scriptId, JsInvokeErrorCode errorCode, String errorDetails,
String functionName, Object[] args, String scriptBody) {
log.debug("[{}] Failed to invoke function due to [{}]: {}", scriptId, errorCode.name(), errorDetails);
RuntimeException e = new RuntimeException(errorDetails);
if (JsInvokeErrorCode.TIMEOUT_ERROR.equals(errorCode)) {
onScriptExecutionError(scriptId, e, scriptBody);
queueTimeoutMsgs.incrementAndGet();
} else if (JsInvokeErrorCode.COMPILATION_ERROR.equals(errorCode)) {
onScriptExecutionError(scriptId, e, scriptBody);
} else if (JsInvokeErrorCode.NOT_FOUND_ERROR.equals(errorCode)) {
log.debug("[{}] Remote JS executor couldn't find the script", scriptId);
if (scriptBody != null) {
JsInvokeProtos.RemoteJsRequest invokeRequestWithScriptBody = buildJsInvokeRequest(scriptId, functionName, args, true, scriptBody);
log.debug("[{}] Sending invoke request again with script body", scriptId);
return Futures.transformAsync(sendJsRequest(invokeRequestWithScriptBody, maxRequestsTimeout, queueInvokeMsgs, MoreExecutors.directExecutor()), r -> {
JsInvokeProtos.JsInvokeResponse result = r.getValue().getInvokeResponse();
if (result.getSuccess()) {
return Futures.immediateFuture(result.getResult());
} else {
return handleInvokeError(scriptId, result.getErrorCode(), result.getErrorDetails(), functionName, args, null);
}
}, MoreExecutors.directExecutor());
}
}
queueFailedMsgs.incrementAndGet();
return Futures.immediateFailedFuture(e);
}
private ListenableFuture<TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse>> sendRequest(JsInvokeProtos.RemoteJsRequest jsRequestWrapper, long maxRequestsTimeout, AtomicInteger msgsCounter) {
return sendJsRequest(jsRequestWrapper, maxRequestsTimeout, msgsCounter, callbackExecutor);
}
private ListenableFuture<TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse>> sendJsRequest(JsInvokeProtos.RemoteJsRequest jsRequestWrapper, long maxRequestsTimeout, AtomicInteger msgsCounter, Executor callbackExecutor) {
ListenableFuture<TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse>> future = requestTemplate.send(new TbProtoJsQueueMsg<>(UUID.randomUUID(), jsRequestWrapper));
if (maxRequestsTimeout > 0) {
future = Futures.withTimeout(future, maxRequestsTimeout, TimeUnit.MILLISECONDS, timeoutExecutorService);
}
queuePushedMsgs.incrementAndGet();
Futures.addCallback(future, new FutureCallback<TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse>>() {
Futures.addCallback(future, new FutureCallback<>() {
@Override
public void onSuccess(@Nullable TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse> result) {
queueInvokeMsgs.incrementAndGet();
msgsCounter.incrementAndGet();
}
@Override
@ -206,25 +224,24 @@ public class RemoteJsInvokeService extends AbstractJsInvokeService {
queueFailedMsgs.incrementAndGet();
}
}, callbackExecutor);
return Futures.transform(future, response -> {
stopWatch.stop();
log.trace("doInvokeFunction js-response took {}ms for uuid {}", stopWatch.getTotalTimeMillis(), response.getKey());
JsInvokeProtos.JsInvokeResponse invokeResult = response.getValue().getInvokeResponse();
if (invokeResult.getSuccess()) {
return invokeResult.getResult();
} else {
final RuntimeException e = new RuntimeException(invokeResult.getErrorDetails());
if (JsInvokeProtos.JsInvokeErrorCode.TIMEOUT_ERROR.equals(invokeResult.getErrorCode())) {
onScriptExecutionError(scriptId, e, scriptBody);
queueTimeoutMsgs.incrementAndGet();
} else if (JsInvokeProtos.JsInvokeErrorCode.COMPILATION_ERROR.equals(invokeResult.getErrorCode())) {
onScriptExecutionError(scriptId, e, scriptBody);
return future;
}
queueFailedMsgs.incrementAndGet();
log.debug("[{}] Failed to invoke function due to [{}]: {}", scriptId, invokeResult.getErrorCode().name(), invokeResult.getErrorDetails());
throw e;
private JsInvokeProtos.RemoteJsRequest buildJsInvokeRequest(UUID scriptId, String functionName, Object[] args, boolean includeScriptBody, String scriptBody) {
JsInvokeProtos.JsInvokeRequest.Builder jsRequestBuilder = JsInvokeProtos.JsInvokeRequest.newBuilder()
.setScriptIdMSB(scriptId.getMostSignificantBits())
.setScriptIdLSB(scriptId.getLeastSignificantBits())
.setFunctionName(functionName)
.setTimeout((int) maxExecRequestsTimeout);
if (includeScriptBody) jsRequestBuilder.setScriptBody(scriptBody);
for (Object arg : args) {
jsRequestBuilder.addArgs(arg.toString());
}
}, callbackExecutor);
JsInvokeProtos.RemoteJsRequest jsRequestWrapper = JsInvokeProtos.RemoteJsRequest.newBuilder()
.setInvokeRequest(jsRequestBuilder.build())
.build();
return jsRequestWrapper;
}
@Override

View File

@ -0,0 +1,128 @@
/**
* Copyright © 2016-2022 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.service.script;
import com.google.common.util.concurrent.Futures;
import org.apache.commons.lang3.StringUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
import org.thingsboard.server.gen.js.JsInvokeProtos;
import org.thingsboard.server.gen.js.JsInvokeProtos.RemoteJsRequest;
import org.thingsboard.server.gen.js.JsInvokeProtos.RemoteJsResponse;
import org.thingsboard.server.queue.TbQueueRequestTemplate;
import org.thingsboard.server.queue.common.TbProtoJsQueueMsg;
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import java.util.List;
import java.util.UUID;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
class RemoteJsInvokeServiceTest {
private RemoteJsInvokeService remoteJsInvokeService;
private TbQueueRequestTemplate<TbProtoJsQueueMsg<RemoteJsRequest>, TbProtoQueueMsg<RemoteJsResponse>> jsRequestTemplate;
@BeforeEach
public void beforeEach() {
remoteJsInvokeService = new RemoteJsInvokeService(null, null);
jsRequestTemplate = mock(TbQueueRequestTemplate.class);
remoteJsInvokeService.requestTemplate = jsRequestTemplate;
}
@AfterEach
public void afterEach() {
reset(jsRequestTemplate);
}
@Test
public void whenInvokingFunction_thenDoNotSendScriptBody() throws Exception {
UUID scriptId = UUID.randomUUID();
remoteJsInvokeService.scriptIdToBodysMap.put(scriptId, "scriptscriptscript");
String expectedInvocationResult = "scriptInvocationResult";
doReturn(Futures.immediateFuture(new TbProtoJsQueueMsg<>(UUID.randomUUID(), RemoteJsResponse.newBuilder()
.setInvokeResponse(JsInvokeProtos.JsInvokeResponse.newBuilder()
.setSuccess(true)
.setResult(expectedInvocationResult)
.build())
.build())))
.when(jsRequestTemplate).send(any());
ArgumentCaptor<TbProtoJsQueueMsg<RemoteJsRequest>> jsRequestCaptor = ArgumentCaptor.forClass(TbProtoJsQueueMsg.class);
Object invocationResult = remoteJsInvokeService.doInvokeFunction(scriptId, "f", new Object[]{"a"}).get();
verify(jsRequestTemplate).send(jsRequestCaptor.capture());
JsInvokeProtos.JsInvokeRequest jsInvokeRequestMade = jsRequestCaptor.getValue().getValue().getInvokeRequest();
assertThat(jsInvokeRequestMade.getScriptIdLSB()).isEqualTo(scriptId.getLeastSignificantBits());
assertThat(jsInvokeRequestMade.getScriptIdMSB()).isEqualTo(scriptId.getMostSignificantBits());
assertThat(jsInvokeRequestMade.getScriptBody()).isNullOrEmpty();
assertThat(invocationResult).isEqualTo(expectedInvocationResult);
}
@Test
public void whenInvokingFunctionAndRemoteJsExecutorRemovedScript_thenHandleNotFoundErrorAndMakeInvokeRequestWithScriptBody() throws Exception {
UUID scriptId = UUID.randomUUID();
String scriptBody = "scriptscriptscript";
remoteJsInvokeService.scriptIdToBodysMap.put(scriptId, scriptBody);
doReturn(Futures.immediateFuture(new TbProtoJsQueueMsg<>(UUID.randomUUID(), RemoteJsResponse.newBuilder()
.setInvokeResponse(JsInvokeProtos.JsInvokeResponse.newBuilder()
.setSuccess(false)
.setErrorCode(JsInvokeProtos.JsInvokeErrorCode.NOT_FOUND_ERROR)
.build())
.build())))
.when(jsRequestTemplate).send(argThat(jsQueueMsg -> {
return StringUtils.isEmpty(jsQueueMsg.getValue().getInvokeRequest().getScriptBody());
}));
String expectedInvocationResult = "invocationResult";
doReturn(Futures.immediateFuture(new TbProtoJsQueueMsg<>(UUID.randomUUID(), RemoteJsResponse.newBuilder()
.setInvokeResponse(JsInvokeProtos.JsInvokeResponse.newBuilder()
.setSuccess(true)
.setResult(expectedInvocationResult)
.build())
.build())))
.when(jsRequestTemplate).send(argThat(jsQueueMsg -> {
return StringUtils.isNotEmpty(jsQueueMsg.getValue().getInvokeRequest().getScriptBody());
}));
ArgumentCaptor<TbProtoJsQueueMsg<RemoteJsRequest>> jsRequestsCaptor = ArgumentCaptor.forClass(TbProtoJsQueueMsg.class);
Object invocationResult = remoteJsInvokeService.doInvokeFunction(scriptId, "f", new Object[]{"a"}).get();
verify(jsRequestTemplate, times(2)).send(jsRequestsCaptor.capture());
List<TbProtoJsQueueMsg<RemoteJsRequest>> jsInvokeRequestsMade = jsRequestsCaptor.getAllValues();
JsInvokeProtos.JsInvokeRequest firstRequestMade = jsInvokeRequestsMade.get(0).getValue().getInvokeRequest();
assertThat(firstRequestMade.getScriptBody()).isNullOrEmpty();
JsInvokeProtos.JsInvokeRequest secondRequestMade = jsInvokeRequestsMade.get(1).getValue().getInvokeRequest();
assertThat(secondRequestMade.getScriptBody()).isEqualTo(scriptBody);
assertThat(invocationResult).isEqualTo(expectedInvocationResult);
}
}

View File

@ -23,6 +23,7 @@ enum JsInvokeErrorCode {
COMPILATION_ERROR = 0;
RUNTIME_ERROR = 1;
TIMEOUT_ERROR = 2;
NOT_FOUND_ERROR = 3;
}
message RemoteJsRequest {

View File

@ -36,6 +36,7 @@ import Long from 'long';
const COMPILATION_ERROR = 0;
const RUNTIME_ERROR = 1;
const TIMEOUT_ERROR = 2;
const NOT_FOUND_ERROR = 3;
const statFrequency = Number(config.get('script.stat_print_frequency'));
const scriptBodyTraceFrequency = Number(config.get('script.script_body_trace_frequency'));
@ -182,8 +183,12 @@ export class JsInvokeMessageProcessor {
)
},
(err: any) => {
const invokeResponse = JsInvokeMessageProcessor.createInvokeResponse("", false, COMPILATION_ERROR, err);
this.logger.debug('[%s] Sending failed invoke response, scriptId: [%s], errorCode: [%s]', requestId, scriptId, COMPILATION_ERROR);
let errorCode = COMPILATION_ERROR;
if (err && isString(err.name) && err.name.includes('script body not found')) {
errorCode = NOT_FOUND_ERROR;
}
const invokeResponse = JsInvokeMessageProcessor.createInvokeResponse("", false, errorCode, err);
this.logger.debug('[%s] Sending failed invoke response, scriptId: [%s], errorCode: [%s]', requestId, scriptId, errorCode);
this.sendResponse(requestId, responseTopic, headers, scriptId, undefined, invokeResponse);
}
);
@ -231,7 +236,7 @@ export class JsInvokeMessageProcessor {
if (script) {
self.incrementUseScriptId(scriptId);
resolve(script);
} else {
} else if (scriptBody) {
const startTime = performance.now();
self.executor.compileScript(scriptBody).then(
(compiledScript) => {
@ -244,6 +249,12 @@ export class JsInvokeMessageProcessor {
reject(err);
}
);
} else {
const err = {
name: 'script body not found',
message: ''
}
reject(err);
}
});
}