diff --git a/application/src/main/java/org/thingsboard/server/service/script/RuleNodeJsScriptEngine.java b/application/src/main/java/org/thingsboard/server/service/script/RuleNodeJsScriptEngine.java index 8b996c6257..12150b806a 100644 --- a/application/src/main/java/org/thingsboard/server/service/script/RuleNodeJsScriptEngine.java +++ b/application/src/main/java/org/thingsboard/server/service/script/RuleNodeJsScriptEngine.java @@ -19,6 +19,8 @@ import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.Sets; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.thingsboard.server.common.data.id.EntityId; @@ -109,6 +111,19 @@ public class RuleNodeJsScriptEngine implements org.thingsboard.rule.engine.api.S return unbindMsg(result, msg); } + @Override + public ListenableFuture executeUpdateAsync(TbMsg msg) { + ListenableFuture result = executeScriptAsync(msg); + return Futures.transformAsync(result, json -> { + if (!json.isObject()) { + log.warn("Wrong result type: {}", json.getNodeType()); + return Futures.immediateFailedFuture(new ScriptException("Wrong result type: " + json.getNodeType())); + } else { + return Futures.immediateFuture(unbindMsg(json, msg)); + } + }); + } + @Override public TbMsg executeGenerate(TbMsg prevMsg) throws ScriptException { JsonNode result = executeScript(prevMsg); @@ -144,6 +159,19 @@ public class RuleNodeJsScriptEngine implements org.thingsboard.rule.engine.api.S return result.asBoolean(); } + @Override + public ListenableFuture executeFilterAsync(TbMsg msg) { + ListenableFuture result = executeScriptAsync(msg); + return Futures.transformAsync(result, json -> { + if (!json.isBoolean()) { + log.warn("Wrong result type: {}", json.getNodeType()); + return Futures.immediateFailedFuture(new ScriptException("Wrong result type: " + json.getNodeType())); + } else { + return Futures.immediateFuture(json.asBoolean()); + } + }); + } + @Override public Set executeSwitch(TbMsg msg) throws ScriptException { JsonNode result = executeScript(msg); @@ -173,7 +201,7 @@ public class RuleNodeJsScriptEngine implements org.thingsboard.rule.engine.api.S return mapper.readTree(eval); } catch (ExecutionException e) { if (e.getCause() instanceof ScriptException) { - throw (ScriptException)e.getCause(); + throw (ScriptException) e.getCause(); } else if (e.getCause() instanceof RuntimeException) { throw new ScriptException(e.getCause().getMessage()); } else { @@ -184,6 +212,24 @@ public class RuleNodeJsScriptEngine implements org.thingsboard.rule.engine.api.S } } + private ListenableFuture executeScriptAsync(TbMsg msg) { + String[] inArgs = prepareArgs(msg); + return Futures.transformAsync(sandboxService.invokeFunction(this.scriptId, inArgs[0], inArgs[1], inArgs[2]), + o -> { + try { + return Futures.immediateFuture(mapper.readTree(o.toString())); + } catch (Exception e) { + if (e.getCause() instanceof ScriptException) { + return Futures.immediateFailedFuture(e.getCause()); + } else if (e.getCause() instanceof RuntimeException) { + return Futures.immediateFailedFuture(new ScriptException(e.getCause().getMessage())); + } else { + return Futures.immediateFailedFuture(new ScriptException(e)); + } + } + }); + } + public void destroy() { sandboxService.release(this.scriptId); } diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/ScriptEngine.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/ScriptEngine.java index 4d9a4e612b..f72adf04ab 100644 --- a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/ScriptEngine.java +++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/ScriptEngine.java @@ -16,6 +16,7 @@ package org.thingsboard.rule.engine.api; import com.fasterxml.jackson.databind.JsonNode; +import com.google.common.util.concurrent.ListenableFuture; import org.thingsboard.server.common.msg.TbMsg; import javax.script.ScriptException; @@ -25,10 +26,14 @@ public interface ScriptEngine { TbMsg executeUpdate(TbMsg msg) throws ScriptException; + ListenableFuture executeUpdateAsync(TbMsg msg); + TbMsg executeGenerate(TbMsg prevMsg) throws ScriptException; boolean executeFilter(TbMsg msg) throws ScriptException; + ListenableFuture executeFilterAsync(TbMsg msg); + Set executeSwitch(TbMsg msg) throws ScriptException; JsonNode executeJson(TbMsg msg) throws ScriptException; diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbJsFilterNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbJsFilterNode.java index a9b287fa4c..2effb714b9 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbJsFilterNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbJsFilterNode.java @@ -51,9 +51,8 @@ public class TbJsFilterNode implements TbNode { @Override public void onMsg(TbContext ctx, TbMsg msg) { - ListeningExecutor jsExecutor = ctx.getJsExecutor(); ctx.logJsEvalRequest(); - withCallback(jsExecutor.executeAsync(() -> jsEngine.executeFilter(msg)), + withCallback(jsEngine.executeFilterAsync(msg), filterResult -> { ctx.logJsEvalResponse(); ctx.tellNext(msg, filterResult ? "True" : "False"); diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/TbTransformMsgNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/TbTransformMsgNode.java index a9239ae924..47e5b3c9c9 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/TbTransformMsgNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/TbTransformMsgNode.java @@ -53,7 +53,7 @@ public class TbTransformMsgNode extends TbAbstractTransformNode { @Override protected ListenableFuture transform(TbContext ctx, TbMsg msg) { ctx.logJsEvalRequest(); - return ctx.getJsExecutor().executeAsync(() -> jsEngine.executeUpdate(msg)); + return jsEngine.executeUpdateAsync(msg); } @Override