Refactor JS Executor.

This commit is contained in:
Igor Kulikov 2018-04-04 11:41:51 +03:00
parent dc41e63091
commit 024dd99cb7
22 changed files with 207 additions and 165 deletions

View File

@ -321,7 +321,7 @@ public class ActorSystemContext {
.put("msgId", tbMsg.getId().toString())
.put("msgType", tbMsg.getType())
.put("dataType", tbMsg.getDataType().name())
.put("data", convertToString(tbMsg.getDataType(), tbMsg.getData()))
.put("data", tbMsg.getData())
.put("metadata", metadata);
if (error != null) {
@ -335,21 +335,6 @@ public class ActorSystemContext {
}
}
private String convertToString(TbMsgDataType messageType, byte[] data) {
if (data == null) {
return null;
}
switch (messageType) {
case JSON:
case TEXT:
return new String(data, StandardCharsets.UTF_8);
case BINARY:
return Base64Utils.encodeToString(data);
default:
throw new RuntimeException("Message type: " + messageType + " is not supported!");
}
}
public static Exception toException(Throwable error) {
return Exception.class.isInstance(error) ? (Exception) error : new Exception(error);
}

View File

@ -149,7 +149,7 @@ public abstract class AbstractRuleEngineFlowIntegrationTest extends AbstractRule
"CUSTOM",
device.getId(),
new TbMsgMetaData(),
new byte[]{});
"{}");
actorService.onMsg(new ServiceToRuleEngineMsg(savedTenant.getId(), tbMsg));
Thread.sleep(3000);

View File

@ -135,7 +135,7 @@ public abstract class AbstractRuleEngineLifecycleIntegrationTest extends Abstrac
"CUSTOM",
device.getId(),
new TbMsgMetaData(),
new byte[]{});
"{}");
actorService.onMsg(new ServiceToRuleEngineMsg(savedTenant.getId(), tbMsg));
Thread.sleep(3000);

View File

@ -39,9 +39,9 @@ public final class TbMsg implements Serializable {
private final EntityId originator;
private final TbMsgMetaData metaData;
private final TbMsgDataType dataType;
private final byte[] data;
private final String data;
public TbMsg(UUID id, String type, EntityId originator, TbMsgMetaData metaData, byte[] data) {
public TbMsg(UUID id, String type, EntityId originator, TbMsgMetaData metaData, String data) {
this.id = id;
this.type = type;
this.originator = originator;
@ -64,7 +64,7 @@ public final class TbMsg implements Serializable {
}
builder.setDataType(msg.getDataType().ordinal());
builder.setData(ByteString.copyFrom(msg.getData()));
builder.setData(msg.getData());
byte[] bytes = builder.build().toByteArray();
return ByteBuffer.wrap(bytes);
}
@ -75,16 +75,13 @@ public final class TbMsg implements Serializable {
TbMsgMetaData metaData = new TbMsgMetaData(proto.getMetaData().getDataMap());
EntityId entityId = EntityIdFactory.getByTypeAndId(proto.getEntityType(), proto.getEntityId());
TbMsgDataType dataType = TbMsgDataType.values()[proto.getDataType()];
return new TbMsg(UUID.fromString(proto.getId()), proto.getType(), entityId, metaData, dataType, proto.getData().toByteArray());
return new TbMsg(UUID.fromString(proto.getId()), proto.getType(), entityId, metaData, dataType, proto.getData());
} catch (InvalidProtocolBufferException e) {
throw new IllegalStateException("Could not parse protobuf for TbMsg", e);
}
}
public TbMsg copy() {
int dataSize = data.length;
byte[] dataCopy = new byte[dataSize];
System.arraycopy( data, 0, dataCopy, 0, data.length );
return new TbMsg(id, type, originator, metaData.copy(), dataType, dataCopy);
return new TbMsg(id, type, originator, metaData.copy(), dataType, data);
}
}

View File

@ -33,7 +33,7 @@ public final class TbMsgMetaData implements Serializable {
private Map<String, String> data = new ConcurrentHashMap<>();
TbMsgMetaData(Map<String, String> data) {
public TbMsgMetaData(Map<String, String> data) {
this.data = data;
}

View File

@ -32,5 +32,5 @@ message TbMsgProto {
TbMsgMetaDataProto metaData = 5;
int32 dataType = 6;
bytes data = 7;
string data = 7;
}

View File

@ -126,7 +126,7 @@ public class QueueBenchmark implements CommandLineRunner {
TbMsgMetaData metaData = new TbMsgMetaData();
metaData.putValue("key", "value");
String dataStr = "someContent";
return new TbMsg(UUIDs.timeBased(), "type", null, metaData, TbMsgDataType.JSON, dataStr.getBytes());
return new TbMsg(UUIDs.timeBased(), "type", null, metaData, TbMsgDataType.JSON, dataStr);
}
@Bean

View File

@ -45,7 +45,7 @@ public class CassandraMsgRepositoryTest extends AbstractServiceTest {
@Test
public void msgCanBeSavedAndRead() throws ExecutionException, InterruptedException {
TbMsg msg = new TbMsg(UUIDs.timeBased(), "type", new DeviceId(UUIDs.timeBased()), null, TbMsgDataType.JSON, new byte[4]);
TbMsg msg = new TbMsg(UUIDs.timeBased(), "type", new DeviceId(UUIDs.timeBased()), null, TbMsgDataType.JSON, "0000");
UUID nodeId = UUIDs.timeBased();
ListenableFuture<Void> future = msgRepository.save(msg, nodeId, 1L, 1L, 1L);
future.get();
@ -55,7 +55,7 @@ public class CassandraMsgRepositoryTest extends AbstractServiceTest {
@Test
public void expiredMsgsAreNotReturned() throws ExecutionException, InterruptedException {
TbMsg msg = new TbMsg(UUIDs.timeBased(), "type", new DeviceId(UUIDs.timeBased()), null, TbMsgDataType.JSON, new byte[4]);
TbMsg msg = new TbMsg(UUIDs.timeBased(), "type", new DeviceId(UUIDs.timeBased()), null, TbMsgDataType.JSON, "0000");
UUID nodeId = UUIDs.timeBased();
ListenableFuture<Void> future = msgRepository.save(msg, nodeId, 2L, 2L, 2L);
future.get();
@ -68,7 +68,7 @@ public class CassandraMsgRepositoryTest extends AbstractServiceTest {
TbMsgMetaData metaData = new TbMsgMetaData();
metaData.putValue("key", "value");
String dataStr = "someContent";
TbMsg msg = new TbMsg(UUIDs.timeBased(), "type", new DeviceId(UUIDs.timeBased()), metaData, TbMsgDataType.JSON, dataStr.getBytes());
TbMsg msg = new TbMsg(UUIDs.timeBased(), "type", new DeviceId(UUIDs.timeBased()), metaData, TbMsgDataType.JSON, dataStr);
UUID nodeId = UUIDs.timeBased();
ListenableFuture<Void> future = msgRepository.save(msg, nodeId, 1L, 1L, 1L);
future.get();

View File

@ -16,6 +16,7 @@
package org.thingsboard.rule.engine.debug;
import com.datastax.driver.core.utils.UUIDs;
import com.google.common.util.concurrent.ListenableFuture;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.StringUtils;
import org.thingsboard.rule.engine.TbNodeUtils;
@ -58,9 +59,11 @@ public class TbMsgGeneratorNode implements TbNode {
public static final String TB_MSG_GENERATOR_NODE_MSG = "TbMsgGeneratorNodeMsg";
private TbMsgGeneratorNodeConfiguration config;
private NashornJsEngine jsEngine;
private long delay;
private EntityId originatorId;
private UUID nextTickId;
private TbMsg prevMsg;
@Override
public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException {
@ -71,29 +74,41 @@ public class TbMsgGeneratorNode implements TbNode {
} else {
originatorId = ctx.getSelfId();
}
this.jsEngine = new NashornJsEngine(config.getJsScript(), "Generate", "prevMsg", "prevMetadata", "prevMsgType");
sentTickMsg(ctx);
}
@Override
public void onMsg(TbContext ctx, TbMsg msg) {
if (msg.getType().equals(TB_MSG_GENERATOR_NODE_MSG) && msg.getId().equals(nextTickId)) {
TbMsgMetaData metaData = new TbMsgMetaData();
if (config.getMsgMetaData() != null) {
config.getMsgMetaData().forEach(metaData::putValue);
}
ctx.tellNext(new TbMsg(UUIDs.timeBased(), config.getMsgType(), originatorId, metaData, config.getMsgBody().getBytes(StandardCharsets.UTF_8)));
sentTickMsg(ctx);
withCallback(generate(ctx),
m -> {ctx.tellNext(m); sentTickMsg(ctx);},
t -> {ctx.tellError(msg, t); sentTickMsg(ctx);});
}
}
private void sentTickMsg(TbContext ctx) {
TbMsg tickMsg = new TbMsg(UUIDs.timeBased(), TB_MSG_GENERATOR_NODE_MSG, ctx.getSelfId(), new TbMsgMetaData(), new byte[]{});
TbMsg tickMsg = new TbMsg(UUIDs.timeBased(), TB_MSG_GENERATOR_NODE_MSG, ctx.getSelfId(), new TbMsgMetaData(), "");
nextTickId = tickMsg.getId();
ctx.tellSelf(tickMsg, delay);
}
protected ListenableFuture<TbMsg> generate(TbContext ctx) {
return ctx.getJsExecutor().executeAsync(() -> {
if (prevMsg == null) {
prevMsg = new TbMsg(UUIDs.timeBased(), "", originatorId, new TbMsgMetaData(), "{}");
}
TbMsg generated = jsEngine.executeGenerate(prevMsg);
prevMsg = new TbMsg(UUIDs.timeBased(), generated.getType(), originatorId, generated.getMetaData(), generated.getData());
return prevMsg;
});
}
@Override
public void destroy() {
prevMsg = null;
if (jsEngine != null) {
jsEngine.destroy();
}
}
}

View File

@ -28,17 +28,17 @@ public class TbMsgGeneratorNodeConfiguration implements NodeConfiguration<TbMsgG
private int periodInSeconds;
private String originatorId;
private EntityType originatorType;
private String msgType;
private String msgBody;
private Map<String, String> msgMetaData;
private String jsScript;
@Override
public TbMsgGeneratorNodeConfiguration defaultConfiguration() {
TbMsgGeneratorNodeConfiguration configuration = new TbMsgGeneratorNodeConfiguration();
configuration.setMsgCount(0);
configuration.setPeriodInSeconds(1);
configuration.setMsgType("DebugMsg");
configuration.setMsgBody("{}");
configuration.setJsScript("var msg = { temp: 42, humidity: 77 };\n" +
"var metadata = { data: 40 };\n" +
"var msgType = \"DebugMsg\";\n\n" +
"return { msg: msg, metadata: metadata, msgType: msgType };");
return configuration;
}
}

View File

@ -35,7 +35,8 @@ import static org.thingsboard.rule.engine.DonAsynchron.withCallback;
nodeDetails = "Evaluate incoming Message with configured JS condition. " +
"If <b>True</b> - send Message via <b>True</b> chain, otherwise <b>False</b> chain is used." +
"Message payload can be accessed via <code>msg</code> property. For example <code>msg.temperature < 10;</code>" +
"Message metadata can be accessed via <code>metadata</code> property. For example <code>metadata.customerName === 'John';</code>",
"Message metadata can be accessed via <code>metadata</code> property. For example <code>metadata.customerName === 'John';</code>" +
"Message type can be accessed via <code>msgType</code> property.",
uiResources = {"static/rulenode/rulenode-core-config.js"},
configDirective = "tbFilterNodeScriptConfig")
@ -53,15 +54,11 @@ public class TbJsFilterNode implements TbNode {
@Override
public void onMsg(TbContext ctx, TbMsg msg) {
ListeningExecutor jsExecutor = ctx.getJsExecutor();
withCallback(jsExecutor.executeAsync(() -> jsEngine.executeFilter(toBindings(msg))),
withCallback(jsExecutor.executeAsync(() -> jsEngine.executeFilter(msg)),
filterResult -> ctx.tellNext(msg, Boolean.toString(filterResult)),
t -> ctx.tellError(msg, t));
}
private Bindings toBindings(TbMsg msg) {
return NashornJsEngine.bindMsg(msg);
}
@Override
public void destroy() {
if (jsEngine != null) {

View File

@ -36,7 +36,8 @@ import static org.thingsboard.rule.engine.DonAsynchron.withCallback;
nodeDetails = "Node executes configured JS script. Script should return array of next Chain names where Message should be routed. " +
"If Array is empty - message not routed to next Node. " +
"Message payload can be accessed via <code>msg</code> property. For example <code>msg.temperature < 10;</code> " +
"Message metadata can be accessed via <code>metadata</code> property. For example <code>metadata.customerName === 'John';</code>",
"Message metadata can be accessed via <code>metadata</code> property. For example <code>metadata.customerName === 'John';</code>" +
"Message type can be accessed via <code>msgType</code> property.",
uiResources = {"static/rulenode/rulenode-core-config.js"},
configDirective = "tbFilterNodeSwitchConfig")
public class TbJsSwitchNode implements TbNode {
@ -53,7 +54,7 @@ public class TbJsSwitchNode implements TbNode {
@Override
public void onMsg(TbContext ctx, TbMsg msg) {
ListeningExecutor jsExecutor = ctx.getJsExecutor();
withCallback(jsExecutor.executeAsync(() -> jsEngine.executeSwitch(toBindings(msg))),
withCallback(jsExecutor.executeAsync(() -> jsEngine.executeSwitch(msg)),
result -> processSwitch(ctx, msg, result),
t -> ctx.tellError(msg, t));
}
@ -62,10 +63,6 @@ public class TbJsSwitchNode implements TbNode {
ctx.tellNext(msg, nextRelations);
}
private Bindings toBindings(TbMsg msg) {
return NashornJsEngine.bindMsg(msg);
}
@Override
public void destroy() {
if (jsEngine != null) {

View File

@ -16,6 +16,7 @@
package org.thingsboard.rule.engine.js;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Sets;
@ -23,9 +24,13 @@ import jdk.nashorn.api.scripting.NashornScriptEngineFactory;
import jdk.nashorn.api.scripting.ScriptObjectMirror;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.StringUtils;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.TbMsgMetaData;
import javax.script.*;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
@ -34,112 +39,158 @@ import java.util.Set;
@Slf4j
public class NashornJsEngine {
public static final String MSG = "msg";
public static final String METADATA = "metadata";
public static final String DATA = "msg";
public static final String MSG_TYPE = "msgType";
private static final String JS_WRAPPER_PREFIX_TEMPLATE = "function %s(msg, metadata) { ";
private static final String JS_WRAPPER_SUFFIX_TEMPLATE = "}\n %s(msg, metadata);";
private static final String JS_WRAPPER_PREFIX_TEMPLATE = "function %s(msgStr, metadataStr, msgType) { " +
" var msg = JSON.parse(msgStr); " +
" var metadata = JSON.parse(metadataStr); " +
" return JSON.stringify(%s(msg, metadata, msgType));" +
" function %s(%s, %s, %s) {";
private static final String JS_WRAPPER_SUFFIX = "}" +
"\n}";
private static final ObjectMapper mapper = new ObjectMapper();
private static NashornScriptEngineFactory factory = new NashornScriptEngineFactory();
private static ScriptEngine engine = factory.getScriptEngine(new String[]{"--no-java"});
private CompiledScript engine;
private final String invokeFunctionName;
public NashornJsEngine(String script, String functionName) {
String jsWrapperPrefix = String.format(JS_WRAPPER_PREFIX_TEMPLATE, functionName);
String jsWrapperSuffix = String.format(JS_WRAPPER_SUFFIX_TEMPLATE, functionName);
engine = compileScript(jsWrapperPrefix + script + jsWrapperSuffix);
public NashornJsEngine(String script, String functionName, String... argNames) {
this.invokeFunctionName = "invokeInternal" + this.hashCode();
String msgArg;
String metadataArg;
String msgTypeArg;
if (argNames != null && argNames.length == 3) {
msgArg = argNames[0];
metadataArg = argNames[1];
msgTypeArg = argNames[2];
} else {
msgArg = MSG;
metadataArg = METADATA;
msgTypeArg = MSG_TYPE;
}
String jsWrapperPrefix = String.format(JS_WRAPPER_PREFIX_TEMPLATE, this.invokeFunctionName,
functionName, functionName, msgArg, metadataArg, msgTypeArg);
compileScript(jsWrapperPrefix + script + JS_WRAPPER_SUFFIX);
}
private static CompiledScript compileScript(String script) {
ScriptEngine engine = factory.getScriptEngine(new String[]{"--no-java"});
Compilable compEngine = (Compilable) engine;
private static void compileScript(String script) {
try {
return compEngine.compile(script);
engine.eval(script);
} catch (ScriptException e) {
log.warn("Failed to compile JS script: {}", e.getMessage(), e);
throw new IllegalArgumentException("Can't compile script: " + e.getMessage());
}
}
public static Bindings bindMsg(TbMsg msg) {
private static String[] prepareArgs(TbMsg msg) {
try {
Bindings bindings = new SimpleBindings();
if (ArrayUtils.isNotEmpty(msg.getData())) {
ObjectMapper mapper = new ObjectMapper();
JsonNode jsonNode = mapper.readTree(msg.getData());
Map map = mapper.treeToValue(jsonNode, Map.class);
bindings.put(DATA, map);
String[] args = new String[3];
if (msg.getData() != null) {
args[0] = msg.getData();
} else {
bindings.put(DATA, Collections.emptyMap());
args[0] = "";
}
bindings.put(METADATA, msg.getMetaData().getData());
return bindings;
args[1] = mapper.writeValueAsString(msg.getMetaData().getData());
args[2] = msg.getType();
return args;
} catch (Throwable th) {
throw new IllegalArgumentException("Cannot bind js args", th);
}
}
private static TbMsg unbindMsg(Bindings bindings, TbMsg msg) throws JsonProcessingException {
for (Map.Entry<String, String> entry : msg.getMetaData().getData().entrySet()) {
Object obj = entry.getValue();
entry.setValue(obj.toString());
}
Object payload = bindings.get(DATA);
if (payload != null) {
ObjectMapper mapper = new ObjectMapper();
byte[] bytes = mapper.writeValueAsBytes(payload);
return new TbMsg(msg.getId(), msg.getType(), msg.getOriginator(), msg.getMetaData(), bytes);
}
return msg;
}
public TbMsg executeUpdate(Bindings bindings, TbMsg msg) throws ScriptException {
private static TbMsg unbindMsg(JsonNode msgData, TbMsg msg) {
try {
engine.eval(bindings);
return unbindMsg(bindings, msg);
String data = null;
Map<String, String> metadata = null;
String messageType = null;
if (msgData.has(MSG)) {
JsonNode msgPayload = msgData.get(MSG);
data = mapper.writeValueAsString(msgPayload);
}
if (msgData.has(METADATA)) {
JsonNode msgMetadata = msgData.get(METADATA);
metadata = mapper.convertValue(msgMetadata, new TypeReference<Map<String, String>>() {
});
}
if (msgData.has(MSG_TYPE)) {
messageType = msgData.get(MSG_TYPE).asText();
}
String newData = data != null ? data : msg.getData();
TbMsgMetaData newMetadata = metadata != null ? new TbMsgMetaData(metadata) : msg.getMetaData();
String newMessageType = !StringUtils.isEmpty(messageType) ? messageType : msg.getType();
return new TbMsg(msg.getId(), newMessageType, msg.getOriginator(), newMetadata, newData);
} catch (Throwable th) {
th.printStackTrace();
throw new IllegalArgumentException("Cannot unbind js args", th);
throw new RuntimeException("Failed to unbind message data from javascript result", th);
}
}
public boolean executeFilter(Bindings bindings) throws ScriptException {
Object eval = engine.eval(bindings);
if (eval instanceof Boolean) {
return (boolean) eval;
} else {
log.warn("Wrong result type: {}", eval);
throw new ScriptException("Wrong result type: " + eval);
public TbMsg executeUpdate(TbMsg msg) throws ScriptException {
JsonNode result = executeScript(msg);
if (!result.isObject()) {
log.warn("Wrong result type: {}", result.getNodeType());
throw new ScriptException("Wrong result type: " + result.getNodeType());
}
return unbindMsg(result, msg);
}
public Set<String> executeSwitch(Bindings bindings) throws ScriptException, NoSuchMethodException {
Object eval = this.engine.eval(bindings);
if (eval instanceof String) {
return Collections.singleton((String) eval);
} else if (eval instanceof ScriptObjectMirror) {
ScriptObjectMirror mir = (ScriptObjectMirror) eval;
if (mir.isArray()) {
public TbMsg executeGenerate(TbMsg prevMsg) throws ScriptException {
JsonNode result = executeScript(prevMsg);
if (!result.isObject()) {
log.warn("Wrong result type: {}", result.getNodeType());
throw new ScriptException("Wrong result type: " + result.getNodeType());
}
return unbindMsg(result, prevMsg);
}
public boolean executeFilter(TbMsg msg) throws ScriptException {
JsonNode result = executeScript(msg);
if (!result.isBoolean()) {
log.warn("Wrong result type: {}", result.getNodeType());
throw new ScriptException("Wrong result type: " + result.getNodeType());
}
return result.asBoolean();
}
public Set<String> executeSwitch(TbMsg msg) throws ScriptException, NoSuchMethodException {
JsonNode result = executeScript(msg);
if (result.isTextual()) {
return Collections.singleton(result.asText());
} else if (result.isArray()) {
Set<String> nextStates = Sets.newHashSet();
for (Map.Entry<String, Object> entry : mir.entrySet()) {
if (entry.getValue() instanceof String) {
nextStates.add((String) entry.getValue());
for (JsonNode val : result) {
if (!val.isTextual()) {
log.warn("Wrong result type: {}", val.getNodeType());
throw new ScriptException("Wrong result type: " + val.getNodeType());
} else {
log.warn("Wrong result type: {}", eval);
throw new ScriptException("Wrong result type: " + eval);
nextStates.add(val.asText());
}
}
return nextStates;
} else {
log.warn("Wrong result type: {}", result.getNodeType());
throw new ScriptException("Wrong result type: " + result.getNodeType());
}
}
log.warn("Wrong result type: {}", eval);
throw new ScriptException("Wrong result type: " + eval);
private JsonNode executeScript(TbMsg msg) throws ScriptException {
try {
String[] inArgs = prepareArgs(msg);
String eval = ((Invocable)engine).invokeFunction(this.invokeFunctionName, inArgs[0], inArgs[1], inArgs[2]).toString();
return mapper.readTree(eval);
} catch (ScriptException | IllegalArgumentException th) {
throw th;
} catch (Throwable th) {
th.printStackTrace();
throw new RuntimeException("Failed to execute js script", th);
}
}
public void destroy() {
engine = null;
//engine = null;
}
}

View File

@ -64,7 +64,7 @@ public class TbMsgTelemetryNode implements TbNode {
return;
}
String src = new String(msg.getData(), StandardCharsets.UTF_8);
String src = msg.getData();
TelemetryUploadRequest telemetryUploadRequest = JsonConverter.convertToTelemetry(new JsonParser().parse(src));
Map<Long, List<KvEntry>> tsKvMap = telemetryUploadRequest.getData();
if (tsKvMap == null) {

View File

@ -28,10 +28,14 @@ import javax.script.Bindings;
type = ComponentType.TRANSFORMATION,
name = "script",
configClazz = TbTransformMsgNodeConfiguration.class,
nodeDescription = "Change Message payload and Metadata using JavaScript",
nodeDetails = "JavaScript function recieve 2 input parameters that can be changed inside.<br/> " +
nodeDescription = "Change Message payload, Metadata or Message type using JavaScript",
nodeDetails = "JavaScript function receive 3 input parameters.<br/> " +
"<code>metadata</code> - is a Message metadata.<br/>" +
"<code>msg</code> - is a Message payload.<br/>Any properties can be changed/removed/added in those objects.",
"<code>msg</code> - is a Message payload.<br/>" +
"<code>msgType</code> - is a Message type.<br/>" +
"Should return the following structure:<br/>" +
"<code>{ msg: <new payload>, metadata: <new metadata>, msgType: <new msgType> }</code>" +
"All fields in resulting object are optional and will be taken from original message if not specified.",
uiResources = {"static/rulenode/rulenode-core-config.js", "static/rulenode/rulenode-core-config.css"},
configDirective = "tbTransformationNodeScriptConfig")
public class TbTransformMsgNode extends TbAbstractTransformNode {
@ -48,11 +52,7 @@ public class TbTransformMsgNode extends TbAbstractTransformNode {
@Override
protected ListenableFuture<TbMsg> transform(TbContext ctx, TbMsg msg) {
return ctx.getJsExecutor().executeAsync(() -> jsEngine.executeUpdate(toBindings(msg), msg));
}
private Bindings toBindings(TbMsg msg) {
return NashornJsEngine.bindMsg(msg);
return ctx.getJsExecutor().executeAsync(() -> jsEngine.executeUpdate(msg));
}
@Override

View File

@ -27,7 +27,7 @@ public class TbTransformMsgNodeConfiguration extends TbTransformNodeConfiguratio
public TbTransformMsgNodeConfiguration defaultConfiguration() {
TbTransformMsgNodeConfiguration configuration = new TbTransformMsgNodeConfiguration();
configuration.setStartNewChain(false);
configuration.setJsScript("return msg.passed = msg.passed * metadata.temp; msg.bigObj.newProp = 'Ukraine' ");
configuration.setJsScript("return {msg: msg, metadata: metadata, msgType: msgType};");
return configuration;
}
}

View File

@ -52,7 +52,7 @@ public class TbJsFilterNodeTest {
@Test
public void falseEvaluationDoNotSendMsg() throws TbNodeException {
initWithScript("return 10 > 15;");
TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, new TbMsgMetaData(), "{}".getBytes());
TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, new TbMsgMetaData(), "{}");
mockJsExecutor();
@ -65,7 +65,7 @@ public class TbJsFilterNodeTest {
@Test
public void notValidMsgDataThrowsException() throws TbNodeException {
initWithScript("return 10 > 15;");
TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, new TbMsgMetaData(), new byte[4]);
TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, null, "{}");
when(ctx.getJsExecutor()).thenReturn(executor);
@ -79,11 +79,11 @@ public class TbJsFilterNodeTest {
public void exceptionInJsThrowsException() throws TbNodeException {
initWithScript("return metadata.temp.curr < 15;");
TbMsgMetaData metaData = new TbMsgMetaData();
TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, "{}".getBytes());
TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, "{}");
mockJsExecutor();
node.onMsg(ctx, msg);
String expectedMessage = "TypeError: Cannot get property \"curr\" of null in <eval> at line number 1";
String expectedMessage = "TypeError: Cannot read property \"curr\" from undefined in <eval> at line number 1";
verifyError(msg, expectedMessage, ScriptException.class);
}
@ -98,7 +98,7 @@ public class TbJsFilterNodeTest {
TbMsgMetaData metaData = new TbMsgMetaData();
metaData.putValue("temp", "10");
metaData.putValue("humidity", "99");
TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, "{}".getBytes());
TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, "{}");
mockJsExecutor();
node.onMsg(ctx, msg);
@ -113,7 +113,7 @@ public class TbJsFilterNodeTest {
TbMsgMetaData metaData = new TbMsgMetaData();
metaData.putValue("temp", "10");
metaData.putValue("humidity", "99");
TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, "{}".getBytes());
TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, "{}");
mockJsExecutor();
node.onMsg(ctx, msg);
@ -129,7 +129,7 @@ public class TbJsFilterNodeTest {
metaData.putValue("humidity", "99");
String rawJson = "{\"name\": \"Vit\", \"passed\": 5, \"bigObj\": {\"prop\":42}}";
TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, rawJson.getBytes());
TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, rawJson);
mockJsExecutor();
node.onMsg(ctx, msg);

View File

@ -68,7 +68,7 @@ public class TbJsSwitchNodeTest {
metaData.putValue("humidity", "99");
String rawJson = "{\"name\": \"Vit\", \"passed\": 5}";
TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, rawJson.getBytes());
TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, rawJson);
mockJsExecutor();
node.onMsg(ctx, msg);
@ -92,7 +92,7 @@ public class TbJsSwitchNodeTest {
metaData.putValue("humidity", "99");
String rawJson = "{\"name\": \"Vit\", \"passed\": 5}";
TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, rawJson.getBytes());
TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, rawJson);
mockJsExecutor();
node.onMsg(ctx, msg);

View File

@ -98,7 +98,7 @@ public class TbGetCustomerAttributeNodeTest {
User user = new User();
user.setCustomerId(customerId);
msg = new TbMsg(UUIDs.timeBased(), "USER", userId, new TbMsgMetaData(), new byte[4]);
msg = new TbMsg(UUIDs.timeBased(), "USER", userId, new TbMsgMetaData(), "{}");
when(ctx.getUserService()).thenReturn(userService);
when(userService.findUserByIdAsync(userId)).thenReturn(Futures.immediateFuture(user));
@ -123,7 +123,7 @@ public class TbGetCustomerAttributeNodeTest {
User user = new User();
user.setCustomerId(customerId);
msg = new TbMsg(UUIDs.timeBased(), "USER", userId, new TbMsgMetaData(), new byte[4]);
msg = new TbMsg(UUIDs.timeBased(), "USER", userId, new TbMsgMetaData(), "{}");
when(ctx.getUserService()).thenReturn(userService);
when(userService.findUserByIdAsync(userId)).thenReturn(Futures.immediateFuture(user));
@ -148,7 +148,7 @@ public class TbGetCustomerAttributeNodeTest {
User user = new User();
user.setCustomerId(customerId);
msg = new TbMsg(UUIDs.timeBased(), "USER", userId, new TbMsgMetaData(), new byte[4]);
msg = new TbMsg(UUIDs.timeBased(), "USER", userId, new TbMsgMetaData(), "{}");
when(ctx.getUserService()).thenReturn(userService);
when(userService.findUserByIdAsync(userId)).thenReturn(Futures.immediateFuture(null));
@ -166,7 +166,7 @@ public class TbGetCustomerAttributeNodeTest {
@Test
public void customerAttributeAddedInMetadata() {
CustomerId customerId = new CustomerId(UUIDs.timeBased());
msg = new TbMsg(UUIDs.timeBased(), "CUSTOMER", customerId, new TbMsgMetaData(), new byte[4]);
msg = new TbMsg(UUIDs.timeBased(), "CUSTOMER", customerId, new TbMsgMetaData(), "{}");
entityAttributeFetched(customerId);
}
@ -177,7 +177,7 @@ public class TbGetCustomerAttributeNodeTest {
User user = new User();
user.setCustomerId(customerId);
msg = new TbMsg(UUIDs.timeBased(), "USER", userId, new TbMsgMetaData(), new byte[4]);
msg = new TbMsg(UUIDs.timeBased(), "USER", userId, new TbMsgMetaData(), "{}");
when(ctx.getUserService()).thenReturn(userService);
when(userService.findUserByIdAsync(userId)).thenReturn(Futures.immediateFuture(user));
@ -192,7 +192,7 @@ public class TbGetCustomerAttributeNodeTest {
Asset asset = new Asset();
asset.setCustomerId(customerId);
msg = new TbMsg(UUIDs.timeBased(), "USER", assetId, new TbMsgMetaData(), new byte[4]);
msg = new TbMsg(UUIDs.timeBased(), "USER", assetId, new TbMsgMetaData(), "{}");
when(ctx.getAssetService()).thenReturn(assetService);
when(assetService.findAssetByIdAsync(assetId)).thenReturn(Futures.immediateFuture(asset));
@ -207,7 +207,7 @@ public class TbGetCustomerAttributeNodeTest {
Device device = new Device();
device.setCustomerId(customerId);
msg = new TbMsg(UUIDs.timeBased(), "USER", deviceId, new TbMsgMetaData(), new byte[4]);
msg = new TbMsg(UUIDs.timeBased(), "USER", deviceId, new TbMsgMetaData(), "{}");
when(ctx.getDeviceService()).thenReturn(deviceService);
when(deviceService.findDeviceByIdAsync(deviceId)).thenReturn(Futures.immediateFuture(device));
@ -234,7 +234,7 @@ public class TbGetCustomerAttributeNodeTest {
Device device = new Device();
device.setCustomerId(customerId);
msg = new TbMsg(UUIDs.timeBased(), "USER", deviceId, new TbMsgMetaData(), new byte[4]);
msg = new TbMsg(UUIDs.timeBased(), "USER", deviceId, new TbMsgMetaData(), "{}");
when(ctx.getDeviceService()).thenReturn(deviceService);
when(deviceService.findDeviceByIdAsync(deviceId)).thenReturn(Futures.immediateFuture(device));

View File

@ -57,7 +57,7 @@ public class TbChangeOriginatorNodeTest {
Asset asset = new Asset();
asset.setCustomerId(customerId);
TbMsg msg = new TbMsg(UUIDs.timeBased(), "ASSET", assetId, new TbMsgMetaData(), new byte[4]);
TbMsg msg = new TbMsg(UUIDs.timeBased(), "ASSET", assetId, new TbMsgMetaData(), "{}");
when(ctx.getAssetService()).thenReturn(assetService);
when(assetService.findAssetByIdAsync(assetId)).thenReturn(Futures.immediateFuture(asset));
@ -78,7 +78,7 @@ public class TbChangeOriginatorNodeTest {
Asset asset = new Asset();
asset.setCustomerId(customerId);
TbMsg msg = new TbMsg(UUIDs.timeBased(), "ASSET", assetId, new TbMsgMetaData(), new byte[4]);
TbMsg msg = new TbMsg(UUIDs.timeBased(), "ASSET", assetId, new TbMsgMetaData(), "{}");
when(ctx.getAssetService()).thenReturn(assetService);
when(assetService.findAssetByIdAsync(assetId)).thenReturn(Futures.immediateFuture(asset));
@ -99,7 +99,7 @@ public class TbChangeOriginatorNodeTest {
Asset asset = new Asset();
asset.setCustomerId(customerId);
TbMsg msg = new TbMsg(UUIDs.timeBased(), "ASSET", assetId, new TbMsgMetaData(), new byte[4]);
TbMsg msg = new TbMsg(UUIDs.timeBased(), "ASSET", assetId, new TbMsgMetaData(), "{}");
when(ctx.getAssetService()).thenReturn(assetService);
when(assetService.findAssetByIdAsync(assetId)).thenReturn(Futures.immediateFailedFuture(new IllegalStateException("wrong")));

View File

@ -51,13 +51,13 @@ public class TbTransformMsgNodeTest {
@Test
public void metadataCanBeUpdated() throws TbNodeException {
initWithScript("return metadata.temp = metadata.temp * 10;");
initWithScript("metadata.temp = metadata.temp * 10; return {metadata: metadata};");
TbMsgMetaData metaData = new TbMsgMetaData();
metaData.putValue("temp", "7");
metaData.putValue("humidity", "99");
String rawJson = "{\"name\": \"Vit\", \"passed\": 5, \"bigObj\": {\"prop\":42}}";
TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, rawJson.getBytes());
TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, rawJson);
mockJsExecutor();
node.onMsg(ctx, msg);
@ -65,18 +65,18 @@ public class TbTransformMsgNodeTest {
ArgumentCaptor<TbMsg> captor = ArgumentCaptor.forClass(TbMsg.class);
verify(ctx).tellNext(captor.capture());
TbMsg actualMsg = captor.getValue();
assertEquals("70.0", actualMsg.getMetaData().getValue("temp"));
assertEquals("70", actualMsg.getMetaData().getValue("temp"));
}
@Test
public void metadataCanBeAdded() throws TbNodeException {
initWithScript("return metadata.newAttr = metadata.humidity - msg.passed;");
initWithScript("metadata.newAttr = metadata.humidity - msg.passed; return {metadata: metadata};");
TbMsgMetaData metaData = new TbMsgMetaData();
metaData.putValue("temp", "7");
metaData.putValue("humidity", "99");
String rawJson = "{\"name\": \"Vit\", \"passed\": 5, \"bigObj\": {\"prop\":42}}";
TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, rawJson.getBytes());
TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, rawJson);
mockJsExecutor();
node.onMsg(ctx, msg);
@ -84,18 +84,18 @@ public class TbTransformMsgNodeTest {
ArgumentCaptor<TbMsg> captor = ArgumentCaptor.forClass(TbMsg.class);
verify(ctx).tellNext(captor.capture());
TbMsg actualMsg = captor.getValue();
assertEquals("94.0", actualMsg.getMetaData().getValue("newAttr"));
assertEquals("94", actualMsg.getMetaData().getValue("newAttr"));
}
@Test
public void payloadCanBeUpdated() throws TbNodeException {
initWithScript("msg.passed = msg.passed * metadata.temp; return msg.bigObj.newProp = 'Ukraine' ");
initWithScript("msg.passed = msg.passed * metadata.temp; msg.bigObj.newProp = 'Ukraine'; return {msg: msg};");
TbMsgMetaData metaData = new TbMsgMetaData();
metaData.putValue("temp", "7");
metaData.putValue("humidity", "99");
String rawJson = "{\"name\":\"Vit\",\"passed\": 5,\"bigObj\":{\"prop\":42}}";
TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, rawJson.getBytes());
TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, rawJson);
mockJsExecutor();
node.onMsg(ctx, msg);
@ -103,7 +103,7 @@ public class TbTransformMsgNodeTest {
ArgumentCaptor<TbMsg> captor = ArgumentCaptor.forClass(TbMsg.class);
verify(ctx).tellNext(captor.capture());
TbMsg actualMsg = captor.getValue();
String expectedJson = "{\"name\":\"Vit\",\"passed\":35.0,\"bigObj\":{\"prop\":42,\"newProp\":\"Ukraine\"}}";
String expectedJson = "{\"name\":\"Vit\",\"passed\":35,\"bigObj\":{\"prop\":42,\"newProp\":\"Ukraine\"}}";
assertEquals(expectedJson, new String(actualMsg.getData()));
}