Cleanup script engine classes
This commit is contained in:
parent
69964a2413
commit
2055dc83be
@ -17,18 +17,16 @@ package org.thingsboard.server.service.script;
|
|||||||
|
|
||||||
import com.fasterxml.jackson.core.type.TypeReference;
|
import com.fasterxml.jackson.core.type.TypeReference;
|
||||||
import com.fasterxml.jackson.databind.JsonNode;
|
import com.fasterxml.jackson.databind.JsonNode;
|
||||||
import com.google.common.util.concurrent.Futures;
|
|
||||||
import com.google.common.util.concurrent.ListenableFuture;
|
import com.google.common.util.concurrent.ListenableFuture;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
|
||||||
import org.thingsboard.common.util.JacksonUtil;
|
import org.thingsboard.common.util.JacksonUtil;
|
||||||
import org.thingsboard.script.api.RuleNodeScriptFactory;
|
import org.thingsboard.script.api.RuleNodeScriptFactory;
|
||||||
|
import org.thingsboard.script.api.TbScriptException;
|
||||||
import org.thingsboard.script.api.js.JsInvokeService;
|
import org.thingsboard.script.api.js.JsInvokeService;
|
||||||
import org.thingsboard.server.common.data.StringUtils;
|
import org.thingsboard.server.common.data.StringUtils;
|
||||||
import org.thingsboard.server.common.data.id.TenantId;
|
import org.thingsboard.server.common.data.id.TenantId;
|
||||||
import org.thingsboard.server.common.msg.TbMsg;
|
import org.thingsboard.server.common.msg.TbMsg;
|
||||||
import org.thingsboard.server.common.msg.TbMsgMetaData;
|
import org.thingsboard.server.common.msg.TbMsgMetaData;
|
||||||
|
|
||||||
import javax.script.ScriptException;
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
@ -36,85 +34,12 @@ import java.util.List;
|
|||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
|
||||||
|
|
||||||
@Slf4j
|
|
||||||
public class RuleNodeJsScriptEngine extends RuleNodeScriptEngine<JsInvokeService, JsonNode> {
|
public class RuleNodeJsScriptEngine extends RuleNodeScriptEngine<JsInvokeService, JsonNode> {
|
||||||
|
|
||||||
public RuleNodeJsScriptEngine(TenantId tenantId, JsInvokeService scriptInvokeService, String script, String... argNames) {
|
public RuleNodeJsScriptEngine(TenantId tenantId, JsInvokeService scriptInvokeService, String script, String... argNames) {
|
||||||
super(tenantId, scriptInvokeService, script, argNames);
|
super(tenantId, scriptInvokeService, script, argNames);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public ListenableFuture<JsonNode> executeJsonAsync(TbMsg msg) {
|
|
||||||
return executeScriptAsync(msg);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected ListenableFuture<List<TbMsg>> executeUpdateTransform(TbMsg msg, JsonNode json) {
|
|
||||||
if (json.isObject()) {
|
|
||||||
return Futures.immediateFuture(Collections.singletonList(unbindMsg(json, msg)));
|
|
||||||
} else if (json.isArray()) {
|
|
||||||
List<TbMsg> res = new ArrayList<>(json.size());
|
|
||||||
json.forEach(jsonObject -> res.add(unbindMsg(jsonObject, msg)));
|
|
||||||
return Futures.immediateFuture(res);
|
|
||||||
}
|
|
||||||
log.warn("Wrong result type: {}", json.getNodeType());
|
|
||||||
return Futures.immediateFailedFuture(new ScriptException("Wrong result type: " + json.getNodeType()));
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected ListenableFuture<TbMsg> executeGenerateTransform(TbMsg prevMsg, JsonNode result) {
|
|
||||||
if (!result.isObject()) {
|
|
||||||
log.warn("Wrong result type: {}", result.getNodeType());
|
|
||||||
Futures.immediateFailedFuture(new ScriptException("Wrong result type: " + result.getNodeType()));
|
|
||||||
}
|
|
||||||
return Futures.immediateFuture(unbindMsg(result, prevMsg));
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected JsonNode convertResult(Object result) {
|
|
||||||
return JacksonUtil.toJsonNode(result != null ? result.toString() : null);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected ListenableFuture<String> executeToStringTransform(JsonNode result) {
|
|
||||||
if (result.isTextual()) {
|
|
||||||
return Futures.immediateFuture(result.asText());
|
|
||||||
}
|
|
||||||
log.warn("Wrong result type: {}", result.getNodeType());
|
|
||||||
return Futures.immediateFailedFuture(new ScriptException("Wrong result type: " + result.getNodeType()));
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected ListenableFuture<Boolean> executeFilterTransform(JsonNode json) {
|
|
||||||
if (json.isBoolean()) {
|
|
||||||
return Futures.immediateFuture(json.asBoolean());
|
|
||||||
}
|
|
||||||
log.warn("Wrong result type: {}", json.getNodeType());
|
|
||||||
return Futures.immediateFailedFuture(new ScriptException("Wrong result type: " + json.getNodeType()));
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected ListenableFuture<Set<String>> executeSwitchTransform(JsonNode result) {
|
|
||||||
if (result.isTextual()) {
|
|
||||||
return Futures.immediateFuture(Collections.singleton(result.asText()));
|
|
||||||
}
|
|
||||||
if (result.isArray()) {
|
|
||||||
Set<String> nextStates = new HashSet<>();
|
|
||||||
for (JsonNode val : result) {
|
|
||||||
if (!val.isTextual()) {
|
|
||||||
log.warn("Wrong result type: {}", val.getNodeType());
|
|
||||||
return Futures.immediateFailedFuture(new ScriptException("Wrong result type: " + val.getNodeType()));
|
|
||||||
} else {
|
|
||||||
nextStates.add(val.asText());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return Futures.immediateFuture(nextStates);
|
|
||||||
}
|
|
||||||
log.warn("Wrong result type: {}", result.getNodeType());
|
|
||||||
return Futures.immediateFailedFuture(new ScriptException("Wrong result type: " + result.getNodeType()));
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected Object[] prepareArgs(TbMsg msg) {
|
protected Object[] prepareArgs(TbMsg msg) {
|
||||||
String[] args = new String[3];
|
String[] args = new String[3];
|
||||||
@ -128,6 +53,71 @@ public class RuleNodeJsScriptEngine extends RuleNodeScriptEngine<JsInvokeService
|
|||||||
return args;
|
return args;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected List<TbMsg> executeUpdateTransform(TbMsg msg, JsonNode json) {
|
||||||
|
if (json.isObject()) {
|
||||||
|
return Collections.singletonList(unbindMsg(json, msg));
|
||||||
|
} else if (json.isArray()) {
|
||||||
|
List<TbMsg> res = new ArrayList<>(json.size());
|
||||||
|
json.forEach(jsonObject -> res.add(unbindMsg(jsonObject, msg)));
|
||||||
|
return res;
|
||||||
|
}
|
||||||
|
throw wrongResultType(json);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected TbMsg executeGenerateTransform(TbMsg prevMsg, JsonNode result) {
|
||||||
|
if (!result.isObject()) {
|
||||||
|
throw wrongResultType(result);
|
||||||
|
}
|
||||||
|
return unbindMsg(result, prevMsg);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected boolean executeFilterTransform(JsonNode json) {
|
||||||
|
if (json.isBoolean()) {
|
||||||
|
return json.asBoolean();
|
||||||
|
}
|
||||||
|
throw wrongResultType(json);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected Set<String> executeSwitchTransform(JsonNode result) {
|
||||||
|
if (result.isTextual()) {
|
||||||
|
return Collections.singleton(result.asText());
|
||||||
|
}
|
||||||
|
if (result.isArray()) {
|
||||||
|
Set<String> nextStates = new HashSet<>();
|
||||||
|
for (JsonNode val : result) {
|
||||||
|
if (!val.isTextual()) {
|
||||||
|
throw wrongResultType(val);
|
||||||
|
} else {
|
||||||
|
nextStates.add(val.asText());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nextStates;
|
||||||
|
}
|
||||||
|
throw wrongResultType(result);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ListenableFuture<JsonNode> executeJsonAsync(TbMsg msg) {
|
||||||
|
return executeScriptAsync(msg);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected String executeToStringTransform(JsonNode result) {
|
||||||
|
if (result.isTextual()) {
|
||||||
|
return result.asText();
|
||||||
|
}
|
||||||
|
throw wrongResultType(result);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected JsonNode convertResult(Object result) {
|
||||||
|
return JacksonUtil.toJsonNode(result != null ? result.toString() : null);
|
||||||
|
}
|
||||||
|
|
||||||
private static TbMsg unbindMsg(JsonNode msgData, TbMsg msg) {
|
private static TbMsg unbindMsg(JsonNode msgData, TbMsg msg) {
|
||||||
String data = null;
|
String data = null;
|
||||||
Map<String, String> metadata = null;
|
Map<String, String> metadata = null;
|
||||||
@ -138,19 +128,23 @@ public class RuleNodeJsScriptEngine extends RuleNodeScriptEngine<JsInvokeService
|
|||||||
}
|
}
|
||||||
if (msgData.has(RuleNodeScriptFactory.METADATA)) {
|
if (msgData.has(RuleNodeScriptFactory.METADATA)) {
|
||||||
JsonNode msgMetadata = msgData.get(RuleNodeScriptFactory.METADATA);
|
JsonNode msgMetadata = msgData.get(RuleNodeScriptFactory.METADATA);
|
||||||
metadata = JacksonUtil.convertValue(msgMetadata, new TypeReference<>() {
|
metadata = JacksonUtil.convertValue(msgMetadata, new TypeReference<>() {});
|
||||||
});
|
|
||||||
}
|
}
|
||||||
if (msgData.has(RuleNodeScriptFactory.MSG_TYPE)) {
|
if (msgData.has(RuleNodeScriptFactory.MSG_TYPE)) {
|
||||||
messageType = msgData.get(RuleNodeScriptFactory.MSG_TYPE).asText();
|
messageType = msgData.get(RuleNodeScriptFactory.MSG_TYPE).asText();
|
||||||
}
|
}
|
||||||
String newData = data != null ? data : msg.getData();
|
String newData = data != null ? data : msg.getData();
|
||||||
TbMsgMetaData newMetadata = metadata != null ? new TbMsgMetaData(metadata) : msg.getMetaData().copy();
|
TbMsgMetaData newMetadata = metadata != null ? new TbMsgMetaData(metadata) : msg.getMetaData().copy();
|
||||||
String newMessageType = !StringUtils.isEmpty(messageType) ? messageType : msg.getType();
|
String newMessageType = StringUtils.isNotEmpty(messageType) ? messageType : msg.getType();
|
||||||
return msg.transform()
|
return msg.transform()
|
||||||
.type(newMessageType)
|
.type(newMessageType)
|
||||||
.metaData(newMetadata)
|
.metaData(newMetadata)
|
||||||
.data(newData)
|
.data(newData)
|
||||||
.build();
|
.build();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private TbScriptException wrongResultType(JsonNode result) {
|
||||||
|
return new TbScriptException(scriptId, TbScriptException.ErrorCode.RUNTIME, null, new ClassCastException("Wrong result type: " + result.getNodeType()));
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@ -17,7 +17,6 @@ package org.thingsboard.server.service.script;
|
|||||||
|
|
||||||
import com.google.common.util.concurrent.Futures;
|
import com.google.common.util.concurrent.Futures;
|
||||||
import com.google.common.util.concurrent.ListenableFuture;
|
import com.google.common.util.concurrent.ListenableFuture;
|
||||||
import com.google.common.util.concurrent.MoreExecutors;
|
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.thingsboard.rule.engine.api.ScriptEngine;
|
import org.thingsboard.rule.engine.api.ScriptEngine;
|
||||||
import org.thingsboard.script.api.ScriptInvokeService;
|
import org.thingsboard.script.api.ScriptInvokeService;
|
||||||
@ -27,25 +26,26 @@ import org.thingsboard.server.common.data.id.CustomerId;
|
|||||||
import org.thingsboard.server.common.data.id.TenantId;
|
import org.thingsboard.server.common.data.id.TenantId;
|
||||||
import org.thingsboard.server.common.msg.TbMsg;
|
import org.thingsboard.server.common.msg.TbMsg;
|
||||||
|
|
||||||
import javax.script.ScriptException;
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.UUID;
|
import java.util.UUID;
|
||||||
import java.util.concurrent.ExecutionException;
|
import java.util.concurrent.ExecutionException;
|
||||||
|
|
||||||
|
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
|
||||||
|
|
||||||
@Slf4j
|
@Slf4j
|
||||||
public abstract class RuleNodeScriptEngine<T extends ScriptInvokeService, R> implements ScriptEngine {
|
public abstract class RuleNodeScriptEngine<T extends ScriptInvokeService, R> implements ScriptEngine {
|
||||||
|
|
||||||
private final T scriptInvokeService;
|
private final T scriptInvokeService;
|
||||||
|
|
||||||
private final UUID scriptId;
|
protected final UUID scriptId;
|
||||||
private final TenantId tenantId;
|
private final TenantId tenantId;
|
||||||
|
|
||||||
public RuleNodeScriptEngine(TenantId tenantId, T scriptInvokeService, String script, String... argNames) {
|
public RuleNodeScriptEngine(TenantId tenantId, T scriptInvokeService, String script, String... argNames) {
|
||||||
this.tenantId = tenantId;
|
this.tenantId = tenantId;
|
||||||
this.scriptInvokeService = scriptInvokeService;
|
this.scriptInvokeService = scriptInvokeService;
|
||||||
try {
|
try {
|
||||||
this.scriptId = this.scriptInvokeService.eval(tenantId, ScriptType.RULE_NODE_SCRIPT, script, argNames).get();
|
scriptId = this.scriptInvokeService.eval(tenantId, ScriptType.RULE_NODE_SCRIPT, script, argNames).get();
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
Throwable t = e;
|
Throwable t = e;
|
||||||
if (e instanceof ExecutionException) {
|
if (e instanceof ExecutionException) {
|
||||||
@ -63,73 +63,53 @@ public abstract class RuleNodeScriptEngine<T extends ScriptInvokeService, R> imp
|
|||||||
@Override
|
@Override
|
||||||
public ListenableFuture<List<TbMsg>> executeUpdateAsync(TbMsg msg) {
|
public ListenableFuture<List<TbMsg>> executeUpdateAsync(TbMsg msg) {
|
||||||
ListenableFuture<R> result = executeScriptAsync(msg);
|
ListenableFuture<R> result = executeScriptAsync(msg);
|
||||||
return Futures.transformAsync(result,
|
return Futures.transform(result, json -> executeUpdateTransform(msg, json), directExecutor());
|
||||||
json -> executeUpdateTransform(msg, json),
|
|
||||||
MoreExecutors.directExecutor());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
protected abstract ListenableFuture<List<TbMsg>> executeUpdateTransform(TbMsg msg, R result);
|
protected abstract List<TbMsg> executeUpdateTransform(TbMsg msg, R result);
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ListenableFuture<TbMsg> executeGenerateAsync(TbMsg prevMsg) {
|
public ListenableFuture<TbMsg> executeGenerateAsync(TbMsg prevMsg) {
|
||||||
return Futures.transformAsync(executeScriptAsync(prevMsg),
|
return Futures.transform(executeScriptAsync(prevMsg), result -> executeGenerateTransform(prevMsg, result), directExecutor());
|
||||||
result -> executeGenerateTransform(prevMsg, result),
|
|
||||||
MoreExecutors.directExecutor());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
protected abstract ListenableFuture<TbMsg> executeGenerateTransform(TbMsg prevMsg, R result);
|
protected abstract TbMsg executeGenerateTransform(TbMsg prevMsg, R result);
|
||||||
|
|
||||||
@Override
|
|
||||||
public ListenableFuture<String> executeToStringAsync(TbMsg msg) {
|
|
||||||
return Futures.transformAsync(executeScriptAsync(msg), this::executeToStringTransform, MoreExecutors.directExecutor());
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ListenableFuture<Boolean> executeFilterAsync(TbMsg msg) {
|
public ListenableFuture<Boolean> executeFilterAsync(TbMsg msg) {
|
||||||
return Futures.transformAsync(executeScriptAsync(msg),
|
return Futures.transform(executeScriptAsync(msg), this::executeFilterTransform, directExecutor());
|
||||||
this::executeFilterTransform,
|
|
||||||
MoreExecutors.directExecutor());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
protected abstract ListenableFuture<String> executeToStringTransform(R result);
|
protected abstract boolean executeFilterTransform(R result);
|
||||||
|
|
||||||
protected abstract ListenableFuture<Boolean> executeFilterTransform(R result);
|
|
||||||
|
|
||||||
protected abstract ListenableFuture<Set<String>> executeSwitchTransform(R result);
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ListenableFuture<Set<String>> executeSwitchAsync(TbMsg msg) {
|
public ListenableFuture<Set<String>> executeSwitchAsync(TbMsg msg) {
|
||||||
return Futures.transformAsync(executeScriptAsync(msg),
|
return Futures.transform(executeScriptAsync(msg), this::executeSwitchTransform, directExecutor()); // usually runs on a callbackExecutor
|
||||||
this::executeSwitchTransform,
|
|
||||||
MoreExecutors.directExecutor()); //usually runs in a callbackExecutor
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected abstract Set<String> executeSwitchTransform(R result);
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ListenableFuture<String> executeToStringAsync(TbMsg msg) {
|
||||||
|
return Futures.transform(executeScriptAsync(msg), this::executeToStringTransform, directExecutor());
|
||||||
|
}
|
||||||
|
|
||||||
|
protected abstract String executeToStringTransform(R result);
|
||||||
|
|
||||||
ListenableFuture<R> executeScriptAsync(TbMsg msg) {
|
ListenableFuture<R> executeScriptAsync(TbMsg msg) {
|
||||||
log.trace("execute script async, msg {}", msg);
|
log.trace("execute script async, msg {}", msg);
|
||||||
Object[] inArgs = prepareArgs(msg);
|
Object[] inArgs = prepareArgs(msg);
|
||||||
return executeScriptAsync(msg.getCustomerId(), inArgs[0], inArgs[1], inArgs[2]);
|
return executeScriptAsync(msg.getCustomerId(), inArgs[0], inArgs[1], inArgs[2]);
|
||||||
}
|
}
|
||||||
|
|
||||||
ListenableFuture<R> executeScriptAsync(CustomerId customerId, Object... args) {
|
private ListenableFuture<R> executeScriptAsync(CustomerId customerId, Object... args) {
|
||||||
return Futures.transformAsync(scriptInvokeService.invokeScript(tenantId, customerId, this.scriptId, args),
|
return Futures.transform(scriptInvokeService.invokeScript(tenantId, customerId, scriptId, args), this::convertResult, directExecutor());
|
||||||
o -> {
|
|
||||||
try {
|
|
||||||
return Futures.immediateFuture(convertResult(o));
|
|
||||||
} catch (Exception e) {
|
|
||||||
if (e.getCause() instanceof ScriptException) {
|
|
||||||
return Futures.immediateFailedFuture(e.getCause());
|
|
||||||
} else if (e.getCause() instanceof RuntimeException) {
|
|
||||||
return Futures.immediateFailedFuture(new ScriptException(e.getCause().getMessage()));
|
|
||||||
} else {
|
|
||||||
return Futures.immediateFailedFuture(new ScriptException(e));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}, MoreExecutors.directExecutor());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public void destroy() {
|
public void destroy() {
|
||||||
scriptInvokeService.release(this.scriptId);
|
scriptInvokeService.release(scriptId);
|
||||||
}
|
}
|
||||||
|
|
||||||
protected abstract R convertResult(Object result);
|
protected abstract R convertResult(Object result);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@ -19,17 +19,15 @@ import com.fasterxml.jackson.core.type.TypeReference;
|
|||||||
import com.fasterxml.jackson.databind.JsonNode;
|
import com.fasterxml.jackson.databind.JsonNode;
|
||||||
import com.google.common.util.concurrent.Futures;
|
import com.google.common.util.concurrent.Futures;
|
||||||
import com.google.common.util.concurrent.ListenableFuture;
|
import com.google.common.util.concurrent.ListenableFuture;
|
||||||
import com.google.common.util.concurrent.MoreExecutors;
|
|
||||||
import lombok.extern.slf4j.Slf4j;
|
|
||||||
import org.thingsboard.common.util.JacksonUtil;
|
import org.thingsboard.common.util.JacksonUtil;
|
||||||
import org.thingsboard.script.api.RuleNodeScriptFactory;
|
import org.thingsboard.script.api.RuleNodeScriptFactory;
|
||||||
|
import org.thingsboard.script.api.TbScriptException;
|
||||||
import org.thingsboard.script.api.tbel.TbelInvokeService;
|
import org.thingsboard.script.api.tbel.TbelInvokeService;
|
||||||
import org.thingsboard.server.common.data.StringUtils;
|
import org.thingsboard.server.common.data.StringUtils;
|
||||||
import org.thingsboard.server.common.data.id.TenantId;
|
import org.thingsboard.server.common.data.id.TenantId;
|
||||||
import org.thingsboard.server.common.msg.TbMsg;
|
import org.thingsboard.server.common.msg.TbMsg;
|
||||||
import org.thingsboard.server.common.msg.TbMsgMetaData;
|
import org.thingsboard.server.common.msg.TbMsgMetaData;
|
||||||
|
|
||||||
import javax.script.ScriptException;
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
@ -40,86 +38,14 @@ import java.util.Map;
|
|||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
|
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
|
||||||
|
|
||||||
@Slf4j
|
|
||||||
public class RuleNodeTbelScriptEngine extends RuleNodeScriptEngine<TbelInvokeService, Object> {
|
public class RuleNodeTbelScriptEngine extends RuleNodeScriptEngine<TbelInvokeService, Object> {
|
||||||
|
|
||||||
public RuleNodeTbelScriptEngine(TenantId tenantId, TbelInvokeService scriptInvokeService, String script, String... argNames) {
|
public RuleNodeTbelScriptEngine(TenantId tenantId, TbelInvokeService scriptInvokeService, String script, String... argNames) {
|
||||||
super(tenantId, scriptInvokeService, script, argNames);
|
super(tenantId, scriptInvokeService, script, argNames);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
protected ListenableFuture<Boolean> executeFilterTransform(Object result) {
|
|
||||||
if (result instanceof Boolean) {
|
|
||||||
return Futures.immediateFuture((Boolean) result);
|
|
||||||
}
|
|
||||||
return wrongResultType(result);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected ListenableFuture<List<TbMsg>> executeUpdateTransform(TbMsg msg, Object result) {
|
|
||||||
if (result instanceof Map) {
|
|
||||||
return Futures.immediateFuture(Collections.singletonList(unbindMsg((Map) result, msg)));
|
|
||||||
} else if (result instanceof Collection) {
|
|
||||||
List<TbMsg> res = new ArrayList<>();
|
|
||||||
for (Object resObject : (Collection) result) {
|
|
||||||
if (resObject instanceof Map) {
|
|
||||||
res.add(unbindMsg((Map) resObject, msg));
|
|
||||||
} else {
|
|
||||||
return wrongResultType(resObject);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return Futures.immediateFuture(res);
|
|
||||||
}
|
|
||||||
return wrongResultType(result);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected ListenableFuture<TbMsg> executeGenerateTransform(TbMsg prevMsg, Object result) {
|
|
||||||
if (result instanceof Map) {
|
|
||||||
return Futures.immediateFuture(unbindMsg((Map) result, prevMsg));
|
|
||||||
}
|
|
||||||
return wrongResultType(result);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected ListenableFuture<String> executeToStringTransform(Object result) {
|
|
||||||
if (result instanceof String) {
|
|
||||||
return Futures.immediateFuture((String) result);
|
|
||||||
} else {
|
|
||||||
return Futures.immediateFuture(JacksonUtil.toString(result));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected ListenableFuture<Set<String>> executeSwitchTransform(Object result) {
|
|
||||||
if (result instanceof String) {
|
|
||||||
return Futures.immediateFuture(Collections.singleton((String) result));
|
|
||||||
} else if (result instanceof Collection) {
|
|
||||||
Set<String> res = new HashSet<>();
|
|
||||||
for (Object resObject : (Collection) result) {
|
|
||||||
if (resObject instanceof String) {
|
|
||||||
res.add((String) resObject);
|
|
||||||
} else {
|
|
||||||
return wrongResultType(resObject);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return Futures.immediateFuture(res);
|
|
||||||
}
|
|
||||||
return wrongResultType(result);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public ListenableFuture<JsonNode> executeJsonAsync(TbMsg msg) {
|
|
||||||
return Futures.transform(executeScriptAsync(msg), JacksonUtil::valueToTree, MoreExecutors.directExecutor());
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected Object convertResult(Object result) {
|
|
||||||
return result;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected Object[] prepareArgs(TbMsg msg) {
|
protected Object[] prepareArgs(TbMsg msg) {
|
||||||
Object[] args = new Object[3];
|
Object[] args = new Object[3];
|
||||||
@ -133,6 +59,74 @@ public class RuleNodeTbelScriptEngine extends RuleNodeScriptEngine<TbelInvokeSer
|
|||||||
return args;
|
return args;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected List<TbMsg> executeUpdateTransform(TbMsg msg, Object result) {
|
||||||
|
if (result instanceof Map msgData) {
|
||||||
|
return Collections.singletonList(unbindMsg(msgData, msg));
|
||||||
|
} else if (result instanceof Collection resultCollection) {
|
||||||
|
List<TbMsg> res = new ArrayList<>(resultCollection.size());
|
||||||
|
for (Object resObject : resultCollection) {
|
||||||
|
if (resObject instanceof Map msgData) {
|
||||||
|
res.add(unbindMsg(msgData, msg));
|
||||||
|
} else {
|
||||||
|
throw wrongResultType(resObject);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return res;
|
||||||
|
}
|
||||||
|
throw wrongResultType(result);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected TbMsg executeGenerateTransform(TbMsg prevMsg, Object result) {
|
||||||
|
if (result instanceof Map msgData) {
|
||||||
|
return unbindMsg(msgData, prevMsg);
|
||||||
|
}
|
||||||
|
throw wrongResultType(result);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected boolean executeFilterTransform(Object result) {
|
||||||
|
if (result instanceof Boolean b) {
|
||||||
|
return b;
|
||||||
|
}
|
||||||
|
throw wrongResultType(result);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected Set<String> executeSwitchTransform(Object result) {
|
||||||
|
if (result instanceof String str) {
|
||||||
|
return Collections.singleton(str);
|
||||||
|
}
|
||||||
|
if (result instanceof Collection<?> resultCollection) {
|
||||||
|
Set<String> res = new HashSet<>(resultCollection.size());
|
||||||
|
for (Object resObject : resultCollection) {
|
||||||
|
if (resObject instanceof String str) {
|
||||||
|
res.add(str);
|
||||||
|
} else {
|
||||||
|
throw wrongResultType(resObject);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return res;
|
||||||
|
}
|
||||||
|
throw wrongResultType(result);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ListenableFuture<JsonNode> executeJsonAsync(TbMsg msg) {
|
||||||
|
return Futures.transform(executeScriptAsync(msg), JacksonUtil::valueToTree, directExecutor());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected Object convertResult(Object result) {
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected String executeToStringTransform(Object result) {
|
||||||
|
return result instanceof String str ? str : JacksonUtil.toString(result);
|
||||||
|
}
|
||||||
|
|
||||||
private static TbMsg unbindMsg(Map msgData, TbMsg msg) {
|
private static TbMsg unbindMsg(Map msgData, TbMsg msg) {
|
||||||
String data = null;
|
String data = null;
|
||||||
Map<String, String> metadata = null;
|
Map<String, String> metadata = null;
|
||||||
@ -142,12 +136,12 @@ public class RuleNodeTbelScriptEngine extends RuleNodeScriptEngine<TbelInvokeSer
|
|||||||
}
|
}
|
||||||
if (msgData.containsKey(RuleNodeScriptFactory.METADATA)) {
|
if (msgData.containsKey(RuleNodeScriptFactory.METADATA)) {
|
||||||
Object msgMetadataObj = msgData.get(RuleNodeScriptFactory.METADATA);
|
Object msgMetadataObj = msgData.get(RuleNodeScriptFactory.METADATA);
|
||||||
if (msgMetadataObj instanceof Map) {
|
if (msgMetadataObj instanceof Map<?, ?> msgMetadataObjAsMap) {
|
||||||
metadata = ((Map<?, ?>) msgMetadataObj).entrySet().stream().filter(e -> e.getValue() != null)
|
metadata = msgMetadataObjAsMap.entrySet().stream()
|
||||||
|
.filter(e -> e.getValue() != null)
|
||||||
.collect(Collectors.toMap(e -> e.getKey().toString(), e -> e.getValue().toString()));
|
.collect(Collectors.toMap(e -> e.getKey().toString(), e -> e.getValue().toString()));
|
||||||
} else {
|
} else {
|
||||||
metadata = JacksonUtil.convertValue(msgMetadataObj, new TypeReference<>() {
|
metadata = JacksonUtil.convertValue(msgMetadataObj, new TypeReference<>() {});
|
||||||
});
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (msgData.containsKey(RuleNodeScriptFactory.MSG_TYPE)) {
|
if (msgData.containsKey(RuleNodeScriptFactory.MSG_TYPE)) {
|
||||||
@ -155,7 +149,7 @@ public class RuleNodeTbelScriptEngine extends RuleNodeScriptEngine<TbelInvokeSer
|
|||||||
}
|
}
|
||||||
String newData = data != null ? data : msg.getData();
|
String newData = data != null ? data : msg.getData();
|
||||||
TbMsgMetaData newMetadata = metadata != null ? new TbMsgMetaData(metadata) : msg.getMetaData().copy();
|
TbMsgMetaData newMetadata = metadata != null ? new TbMsgMetaData(metadata) : msg.getMetaData().copy();
|
||||||
String newMessageType = !StringUtils.isEmpty(messageType) ? messageType : msg.getType();
|
String newMessageType = StringUtils.isNotEmpty(messageType) ? messageType : msg.getType();
|
||||||
return msg.transform()
|
return msg.transform()
|
||||||
.type(newMessageType)
|
.type(newMessageType)
|
||||||
.metaData(newMetadata)
|
.metaData(newMetadata)
|
||||||
@ -163,13 +157,13 @@ public class RuleNodeTbelScriptEngine extends RuleNodeScriptEngine<TbelInvokeSer
|
|||||||
.build();
|
.build();
|
||||||
}
|
}
|
||||||
|
|
||||||
private static <T> ListenableFuture<T> wrongResultType(Object result) {
|
private TbScriptException wrongResultType(Object result) {
|
||||||
String className = toClassName(result);
|
String className = toClassName(result);
|
||||||
log.warn("Wrong result type: {}", className);
|
return new TbScriptException(scriptId, TbScriptException.ErrorCode.RUNTIME, null, new ClassCastException("Wrong result type: " + className));
|
||||||
return Futures.immediateFailedFuture(new ScriptException("Wrong result type: " + className));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private static String toClassName(Object result) {
|
private static String toClassName(Object result) {
|
||||||
return result != null ? result.getClass().getSimpleName() : "null";
|
return result != null ? result.getClass().getSimpleName() : "null";
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user