js-script-engine-api: js sync calls replaced with completely async calls (CE API only)

This commit is contained in:
Sergey Matvienko 2021-06-16 20:49:46 +03:00
parent f3757ad127
commit b397dfb518
7 changed files with 83 additions and 129 deletions

View File

@ -390,13 +390,13 @@ public class RuleChainController extends BaseController {
TbMsg inMsg = TbMsg.newMsg(msgType, null, new TbMsgMetaData(metadata), TbMsgDataType.JSON, data);
switch (scriptType) {
case "update":
output = msgToOutput(engine.executeUpdate(inMsg));
output = msgToOutput(engine.executeUpdateAsync(inMsg).get(TIMEOUT, TimeUnit.SECONDS));
break;
case "generate":
output = msgToOutput(engine.executeGenerateAsync(inMsg).get(TIMEOUT, TimeUnit.SECONDS));
break;
case "filter":
boolean result = engine.executeFilter(inMsg);
boolean result = engine.executeFilterAsync(inMsg).get(TIMEOUT, TimeUnit.SECONDS);
output = Boolean.toString(result);
break;
case "switch":
@ -404,11 +404,11 @@ public class RuleChainController extends BaseController {
output = objectMapper.writeValueAsString(states);
break;
case "json":
JsonNode json = engine.executeJson(inMsg);
JsonNode json = engine.executeJsonAsync(inMsg).get(TIMEOUT, TimeUnit.SECONDS);
output = objectMapper.writeValueAsString(json);
break;
case "string":
output = engine.executeToString(inMsg);
output = engine.executeToStringAsync(inMsg).get(TIMEOUT, TimeUnit.SECONDS);
break;
default:
throw new IllegalArgumentException("Unsupported script type: " + scriptType);

View File

@ -106,96 +106,77 @@ public class RuleNodeJsScriptEngine implements org.thingsboard.rule.engine.api.S
}
}
@Override
public List<TbMsg> executeUpdate(TbMsg msg) throws ScriptException {
JsonNode result = executeScript(msg);
if (result.isObject()) {
return Collections.singletonList(unbindMsg(result, msg));
} else if (result.isArray()){
List<TbMsg> res = new ArrayList<>(result.size());
result.forEach(jsonObject -> res.add(unbindMsg(jsonObject, msg)));
return res;
} else {
log.warn("Wrong result type: {}", result.getNodeType());
throw new ScriptException("Wrong result type: " + result.getNodeType());
}
}
@Override
public ListenableFuture<List<TbMsg>> executeUpdateAsync(TbMsg msg) {
ListenableFuture<JsonNode> result = executeScriptAsync(msg);
return Futures.transformAsync(result, json -> {
return Futures.transformAsync(result,
json -> executeUpdateTransform(msg, json),
MoreExecutors.directExecutor());
}
ListenableFuture<List<TbMsg>> executeUpdateTransform(TbMsg msg, JsonNode json) {
if (json.isObject()) {
return Futures.immediateFuture(Collections.singletonList(unbindMsg(json, msg)));
} else if (json.isArray()){
} else if (json.isArray()) {
List<TbMsg> res = new ArrayList<>(json.size());
json.forEach(jsonObject -> res.add(unbindMsg(jsonObject, msg)));
return Futures.immediateFuture(res);
}
else{
log.warn("Wrong result type: {}", json.getNodeType());
return Futures.immediateFailedFuture(new ScriptException("Wrong result type: " + json.getNodeType()));
}
}, MoreExecutors.directExecutor());
}
@Override
public ListenableFuture<TbMsg> executeGenerateAsync(TbMsg prevMsg) {
log.trace("execute generate async, prevMsg {}", prevMsg);
return Futures.transformAsync(executeScriptAsync(prevMsg), result -> {
return Futures.transformAsync(executeScriptAsync(prevMsg),
result -> executeGenerateTransform(prevMsg, result),
MoreExecutors.directExecutor());
}
ListenableFuture<TbMsg> executeGenerateTransform(TbMsg prevMsg, JsonNode result) {
if (!result.isObject()) {
log.warn("Wrong result type: {}", result.getNodeType());
Futures.immediateFailedFuture(new ScriptException("Wrong result type: " + result.getNodeType()));
}
return Futures.immediateFuture(unbindMsg(result, prevMsg));
}, MoreExecutors.directExecutor());
}
@Override
public JsonNode executeJson(TbMsg msg) throws ScriptException {
return executeScript(msg);
}
@Override
public ListenableFuture<JsonNode> executeJsonAsync(TbMsg msg) throws ScriptException {
public ListenableFuture<JsonNode> executeJsonAsync(TbMsg msg) {
return executeScriptAsync(msg);
}
@Override
public String executeToString(TbMsg msg) throws ScriptException {
JsonNode result = executeScript(msg);
if (!result.isTextual()) {
log.warn("Wrong result type: {}", result.getNodeType());
throw new ScriptException("Wrong result type: " + result.getNodeType());
}
return result.asText();
public ListenableFuture<String> executeToStringAsync(TbMsg msg) {
return Futures.transformAsync(executeScriptAsync(msg),
this::executeToStringTransform,
MoreExecutors.directExecutor());
}
@Override
public boolean executeFilter(TbMsg msg) throws ScriptException {
JsonNode result = executeScript(msg);
if (!result.isBoolean()) {
log.warn("Wrong result type: {}", result.getNodeType());
throw new ScriptException("Wrong result type: " + result.getNodeType());
ListenableFuture<String> executeToStringTransform(JsonNode result) {
if (result.isTextual()) {
return Futures.immediateFuture(result.asText());
}
return result.asBoolean();
log.warn("Wrong result type: {}", result.getNodeType());
return Futures.immediateFailedFuture(new ScriptException("Wrong result type: " + result.getNodeType()));
}
@Override
public ListenableFuture<Boolean> executeFilterAsync(TbMsg msg) {
ListenableFuture<JsonNode> result = executeScriptAsync(msg);
return Futures.transformAsync(result, json -> {
if (!json.isBoolean()) {
log.warn("Wrong result type: {}", json.getNodeType());
return Futures.immediateFailedFuture(new ScriptException("Wrong result type: " + json.getNodeType()));
} else {
return Futures.immediateFuture(json.asBoolean());
}
}, MoreExecutors.directExecutor());
return Futures.transformAsync(executeScriptAsync(msg),
this::executeFilterTransform,
MoreExecutors.directExecutor());
}
ListenableFuture<Set<String>> executeSwitchPostProcessAsyncFunction(JsonNode result) {
ListenableFuture<Boolean> executeFilterTransform(JsonNode json) {
if (json.isBoolean()) {
return Futures.immediateFuture(json.asBoolean());
}
log.warn("Wrong result type: {}", json.getNodeType());
return Futures.immediateFailedFuture(new ScriptException("Wrong result type: " + json.getNodeType()));
}
ListenableFuture<Set<String>> executeSwitchTransform(JsonNode result) {
if (result.isTextual()) {
return Futures.immediateFuture(Collections.singleton(result.asText()));
}
@ -217,34 +198,19 @@ public class RuleNodeJsScriptEngine implements org.thingsboard.rule.engine.api.S
@Override
public ListenableFuture<Set<String>> executeSwitchAsync(TbMsg msg) {
log.trace("execute switch async, msg {}", msg);
return Futures.transformAsync(executeScriptAsync(msg),
this::executeSwitchPostProcessAsyncFunction,
this::executeSwitchTransform,
MoreExecutors.directExecutor()); //usually runs in a callbackExecutor
}
private JsonNode executeScript(TbMsg msg) throws ScriptException {
try {
String[] inArgs = prepareArgs(msg);
String eval = sandboxService.invokeFunction(tenantId, msg.getCustomerId(), this.scriptId, inArgs[0], inArgs[1], inArgs[2]).get().toString();
return mapper.readTree(eval);
} catch (ExecutionException e) {
if (e.getCause() instanceof ScriptException) {
throw (ScriptException) e.getCause();
} else if (e.getCause() instanceof RuntimeException) {
throw new ScriptException(e.getCause().getMessage());
} else {
throw new ScriptException(e);
}
} catch (Exception e) {
throw new ScriptException(e);
}
}
private ListenableFuture<JsonNode> executeScriptAsync(TbMsg msg) {
ListenableFuture<JsonNode> executeScriptAsync(TbMsg msg) {
log.trace("execute script async, msg {}", msg);
String[] inArgs = prepareArgs(msg);
return Futures.transformAsync(sandboxService.invokeFunction(tenantId, msg.getCustomerId(), this.scriptId, inArgs[0], inArgs[1], inArgs[2]),
return executeScriptAsync(msg.getCustomerId(), inArgs[0], inArgs[1], inArgs[2]);
}
ListenableFuture<JsonNode> executeScriptAsync(CustomerId customerId, Object... args) {
return Futures.transformAsync(sandboxService.invokeFunction(tenantId, customerId, this.scriptId, args),
o -> {
try {
return Futures.immediateFuture(mapper.readTree(o.toString()));

View File

@ -19,14 +19,11 @@ import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.util.concurrent.ListenableFuture;
import org.thingsboard.server.common.msg.TbMsg;
import javax.script.ScriptException;
import java.util.List;
import java.util.Set;
public interface ScriptEngine {
List<TbMsg> executeUpdate(TbMsg msg) throws ScriptException;
ListenableFuture<List<TbMsg>> executeUpdateAsync(TbMsg msg);
ListenableFuture<TbMsg> executeGenerateAsync(TbMsg prevMsg);
@ -37,11 +34,9 @@ public interface ScriptEngine {
ListenableFuture<Set<String>> executeSwitchAsync(TbMsg msg);
JsonNode executeJson(TbMsg msg) throws ScriptException;
ListenableFuture<JsonNode> executeJsonAsync(TbMsg msg);
ListenableFuture<JsonNode> executeJsonAsync(TbMsg msg) throws ScriptException;
String executeToString(TbMsg msg) throws ScriptException;
ListenableFuture<String> executeToStringAsync(TbMsg msg);
void destroy();

View File

@ -214,6 +214,10 @@ public interface TbContext {
EdgeEventService getEdgeEventService();
/**
* Js script executors call are completely asynchronous
* */
@Deprecated
ListeningExecutor getJsExecutor();
ListeningExecutor getMailExecutor();

View File

@ -15,8 +15,11 @@
*/
package org.thingsboard.rule.engine.action;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.MoreExecutors;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.common.util.ListeningExecutor;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.thingsboard.rule.engine.api.RuleNode;
import org.thingsboard.rule.engine.api.ScriptEngine;
import org.thingsboard.rule.engine.api.TbContext;
@ -55,18 +58,21 @@ public class TbLogNode implements TbNode {
@Override
public void onMsg(TbContext ctx, TbMsg msg) {
ListeningExecutor jsExecutor = ctx.getJsExecutor();
ctx.logJsEvalRequest();
withCallback(jsExecutor.executeAsync(() -> jsEngine.executeToString(msg)),
toString -> {
Futures.addCallback(jsEngine.executeToStringAsync(msg), new FutureCallback<String>() {
@Override
public void onSuccess(@Nullable String result) {
ctx.logJsEvalResponse();
log.info(toString);
log.info(result);
ctx.tellSuccess(msg);
},
t -> {
}
@Override
public void onFailure(Throwable t) {
ctx.logJsEvalResponse();
ctx.tellFailure(msg, t);
});
}
}, MoreExecutors.directExecutor()); //usually js responses runs on js callback executor
}
@Override

View File

@ -33,8 +33,6 @@ import org.thingsboard.server.common.msg.TbMsg;
import java.util.Set;
import static org.thingsboard.common.util.DonAsynchron.withCallback;
@Slf4j
@RuleNode(
type = ComponentType.FILTER,

View File

@ -64,7 +64,7 @@ public class TbJsSwitchNodeTest {
private RuleNodeId ruleNodeId = new RuleNodeId(Uuids.timeBased());
@Test
public void multipleRoutesAreAllowed() throws TbNodeException, ScriptException {
public void multipleRoutesAreAllowed() throws TbNodeException {
initWithScript();
TbMsgMetaData metaData = new TbMsgMetaData();
metaData.putValue("temp", "10");
@ -72,11 +72,9 @@ public class TbJsSwitchNodeTest {
String rawJson = "{\"name\": \"Vit\", \"passed\": 5}";
TbMsg msg = TbMsg.newMsg( "USER", null, metaData, TbMsgDataType.JSON, rawJson, ruleChainId, ruleNodeId);
mockJsExecutor();
when(scriptEngine.executeSwitchAsync(msg)).thenReturn(Futures.immediateFuture(Sets.newHashSet("one", "three")));
node.onMsg(ctx, msg);
verify(ctx).getJsExecutor();
verify(ctx).tellNext(msg, Sets.newHashSet("one", "three"));
}
@ -92,19 +90,6 @@ public class TbJsSwitchNodeTest {
node.init(ctx, nodeConfiguration);
}
@SuppressWarnings("unchecked")
private void mockJsExecutor() {
when(ctx.getJsExecutor()).thenReturn(executor);
doAnswer((Answer<ListenableFuture<Set<String>>>) invocationOnMock -> {
try {
Callable task = (Callable) (invocationOnMock.getArguments())[0];
return Futures.immediateFuture((Set<String>) task.call());
} catch (Throwable th) {
return Futures.immediateFailedFuture(th);
}
}).when(executor).executeAsync(ArgumentMatchers.any(Callable.class));
}
private void verifyError(TbMsg msg, String message, Class expectedClass) {
ArgumentCaptor<Throwable> captor = ArgumentCaptor.forClass(Throwable.class);
verify(ctx).tellFailure(same(msg), captor.capture());