JS Invoke become async

This commit is contained in:
Andrew Shvayka 2019-11-29 18:31:24 +02:00
parent da9b2b4960
commit 28e2c74ce3
4 changed files with 54 additions and 4 deletions

View File

@ -19,6 +19,8 @@ import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Sets; import com.google.common.collect.Sets;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.EntityId;
@ -109,6 +111,19 @@ public class RuleNodeJsScriptEngine implements org.thingsboard.rule.engine.api.S
return unbindMsg(result, msg); return unbindMsg(result, msg);
} }
@Override
public ListenableFuture<TbMsg> executeUpdateAsync(TbMsg msg) {
ListenableFuture<JsonNode> result = executeScriptAsync(msg);
return Futures.transformAsync(result, json -> {
if (!json.isObject()) {
log.warn("Wrong result type: {}", json.getNodeType());
return Futures.immediateFailedFuture(new ScriptException("Wrong result type: " + json.getNodeType()));
} else {
return Futures.immediateFuture(unbindMsg(json, msg));
}
});
}
@Override @Override
public TbMsg executeGenerate(TbMsg prevMsg) throws ScriptException { public TbMsg executeGenerate(TbMsg prevMsg) throws ScriptException {
JsonNode result = executeScript(prevMsg); JsonNode result = executeScript(prevMsg);
@ -144,6 +159,19 @@ public class RuleNodeJsScriptEngine implements org.thingsboard.rule.engine.api.S
return result.asBoolean(); return result.asBoolean();
} }
@Override
public ListenableFuture<Boolean> executeFilterAsync(TbMsg msg) {
ListenableFuture<JsonNode> result = executeScriptAsync(msg);
return Futures.transformAsync(result, json -> {
if (!json.isBoolean()) {
log.warn("Wrong result type: {}", json.getNodeType());
return Futures.immediateFailedFuture(new ScriptException("Wrong result type: " + json.getNodeType()));
} else {
return Futures.immediateFuture(json.asBoolean());
}
});
}
@Override @Override
public Set<String> executeSwitch(TbMsg msg) throws ScriptException { public Set<String> executeSwitch(TbMsg msg) throws ScriptException {
JsonNode result = executeScript(msg); JsonNode result = executeScript(msg);
@ -184,6 +212,24 @@ public class RuleNodeJsScriptEngine implements org.thingsboard.rule.engine.api.S
} }
} }
private ListenableFuture<JsonNode> executeScriptAsync(TbMsg msg) {
String[] inArgs = prepareArgs(msg);
return Futures.transformAsync(sandboxService.invokeFunction(this.scriptId, inArgs[0], inArgs[1], inArgs[2]),
o -> {
try {
return Futures.immediateFuture(mapper.readTree(o.toString()));
} catch (Exception e) {
if (e.getCause() instanceof ScriptException) {
return Futures.immediateFailedFuture(e.getCause());
} else if (e.getCause() instanceof RuntimeException) {
return Futures.immediateFailedFuture(new ScriptException(e.getCause().getMessage()));
} else {
return Futures.immediateFailedFuture(new ScriptException(e));
}
}
});
}
public void destroy() { public void destroy() {
sandboxService.release(this.scriptId); sandboxService.release(this.scriptId);
} }

View File

@ -16,6 +16,7 @@
package org.thingsboard.rule.engine.api; package org.thingsboard.rule.engine.api;
import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.util.concurrent.ListenableFuture;
import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.common.msg.TbMsg;
import javax.script.ScriptException; import javax.script.ScriptException;
@ -25,10 +26,14 @@ public interface ScriptEngine {
TbMsg executeUpdate(TbMsg msg) throws ScriptException; TbMsg executeUpdate(TbMsg msg) throws ScriptException;
ListenableFuture<TbMsg> executeUpdateAsync(TbMsg msg);
TbMsg executeGenerate(TbMsg prevMsg) throws ScriptException; TbMsg executeGenerate(TbMsg prevMsg) throws ScriptException;
boolean executeFilter(TbMsg msg) throws ScriptException; boolean executeFilter(TbMsg msg) throws ScriptException;
ListenableFuture<Boolean> executeFilterAsync(TbMsg msg);
Set<String> executeSwitch(TbMsg msg) throws ScriptException; Set<String> executeSwitch(TbMsg msg) throws ScriptException;
JsonNode executeJson(TbMsg msg) throws ScriptException; JsonNode executeJson(TbMsg msg) throws ScriptException;

View File

@ -51,9 +51,8 @@ public class TbJsFilterNode implements TbNode {
@Override @Override
public void onMsg(TbContext ctx, TbMsg msg) { public void onMsg(TbContext ctx, TbMsg msg) {
ListeningExecutor jsExecutor = ctx.getJsExecutor();
ctx.logJsEvalRequest(); ctx.logJsEvalRequest();
withCallback(jsExecutor.executeAsync(() -> jsEngine.executeFilter(msg)), withCallback(jsEngine.executeFilterAsync(msg),
filterResult -> { filterResult -> {
ctx.logJsEvalResponse(); ctx.logJsEvalResponse();
ctx.tellNext(msg, filterResult ? "True" : "False"); ctx.tellNext(msg, filterResult ? "True" : "False");

View File

@ -53,7 +53,7 @@ public class TbTransformMsgNode extends TbAbstractTransformNode {
@Override @Override
protected ListenableFuture<TbMsg> transform(TbContext ctx, TbMsg msg) { protected ListenableFuture<TbMsg> transform(TbContext ctx, TbMsg msg) {
ctx.logJsEvalRequest(); ctx.logJsEvalRequest();
return ctx.getJsExecutor().executeAsync(() -> jsEngine.executeUpdate(msg)); return jsEngine.executeUpdateAsync(msg);
} }
@Override @Override