From f51be6b2122bf2241e4577f7a18bd3a6a456dd38 Mon Sep 17 00:00:00 2001 From: vparomskiy Date: Tue, 20 Mar 2018 11:29:47 +0200 Subject: [PATCH] add filter & switch Nodes. add self get latest Telemetry. --- .../rule/engine/api/ListeningExecutor.java | 12 ++ .../rule/engine/api/TbContext.java | 2 + .../rule/engine/filter/TbJsFilterNode.java | 51 ++++++ .../filter/TbJsFilterNodeConfiguration.java | 9 + .../rule/engine/filter/TbJsSwitchNode.java | 56 +++++++ .../filter/TbJsSwitchNodeConfiguration.java | 12 ++ .../rule/engine/js/JsExecutorService.java | 30 ++++ .../rule/engine/js/NashornJsEngine.java | 79 +++++++++ .../engine/metadata/TbGetAttributesNode.java | 45 +++-- .../TbGetAttributesNodeConfiguration.java | 2 + .../engine/filter/TbJsFilterNodeTest.java | 154 ++++++++++++++++++ .../engine/filter/TbJsSwitchNodeTest.java | 118 ++++++++++++++ 12 files changed, 557 insertions(+), 13 deletions(-) create mode 100644 rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/ListeningExecutor.java create mode 100644 rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbJsFilterNode.java create mode 100644 rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbJsFilterNodeConfiguration.java create mode 100644 rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbJsSwitchNode.java create mode 100644 rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbJsSwitchNodeConfiguration.java create mode 100644 rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/js/JsExecutorService.java create mode 100644 rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/js/NashornJsEngine.java create mode 100644 rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/filter/TbJsFilterNodeTest.java create mode 100644 rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/filter/TbJsSwitchNodeTest.java diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/ListeningExecutor.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/ListeningExecutor.java new file mode 100644 index 0000000000..a233cb36fc --- /dev/null +++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/ListeningExecutor.java @@ -0,0 +1,12 @@ +package org.thingsboard.rule.engine.api; + +import com.google.common.util.concurrent.ListenableFuture; + +import java.util.concurrent.Callable; + +public interface ListeningExecutor { + + ListenableFuture executeAsync(Callable task); + + void onDestroy(); +} diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java index 541d35b899..c7e2bec28c 100644 --- a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java +++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java @@ -74,4 +74,6 @@ public interface TbContext { RelationService getRelationService(); + ListeningExecutor getJsExecutor(); + } diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbJsFilterNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbJsFilterNode.java new file mode 100644 index 0000000000..476695eb20 --- /dev/null +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbJsFilterNode.java @@ -0,0 +1,51 @@ +package org.thingsboard.rule.engine.filter; + +import lombok.extern.slf4j.Slf4j; +import org.thingsboard.rule.engine.TbNodeUtils; +import org.thingsboard.rule.engine.api.*; +import org.thingsboard.rule.engine.js.NashornJsEngine; +import org.thingsboard.server.common.msg.TbMsg; + +import javax.script.Bindings; + +import static org.thingsboard.rule.engine.DonAsynchron.withCallback; + +@Slf4j +public class TbJsFilterNode implements TbNode { + + private TbJsFilterNodeConfiguration config; + private NashornJsEngine jsEngine; + + @Override + public void init(TbNodeConfiguration configuration, TbNodeState state) throws TbNodeException { + this.config = TbNodeUtils.convert(configuration, TbJsFilterNodeConfiguration.class); + this.jsEngine = new NashornJsEngine(config.getJsScript()); + } + + @Override + public void onMsg(TbContext ctx, TbMsg msg) { + ListeningExecutor jsExecutor = ctx.getJsExecutor(); + withCallback(jsExecutor.executeAsync(() -> jsEngine.executeFilter(toBindings(msg))), + result -> processFilter(ctx, msg, result), + t -> ctx.tellError(msg, t)); + } + + private void processFilter(TbContext ctx, TbMsg msg, Boolean filterResult) { + if (filterResult) { + ctx.tellNext(msg); + } else { + log.debug("Msg filtered out {}", msg.getId()); + } + } + + private Bindings toBindings(TbMsg msg) { + return NashornJsEngine.bindMsg(msg); + } + + @Override + public void destroy() { + if (jsEngine != null) { + jsEngine.destroy(); + } + } +} diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbJsFilterNodeConfiguration.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbJsFilterNodeConfiguration.java new file mode 100644 index 0000000000..488fa444d1 --- /dev/null +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbJsFilterNodeConfiguration.java @@ -0,0 +1,9 @@ +package org.thingsboard.rule.engine.filter; + +import lombok.Data; + +@Data +public class TbJsFilterNodeConfiguration { + + private String jsScript; +} diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbJsSwitchNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbJsSwitchNode.java new file mode 100644 index 0000000000..5d1514a325 --- /dev/null +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbJsSwitchNode.java @@ -0,0 +1,56 @@ +package org.thingsboard.rule.engine.filter; + +import lombok.extern.slf4j.Slf4j; +import org.thingsboard.rule.engine.TbNodeUtils; +import org.thingsboard.rule.engine.api.*; +import org.thingsboard.rule.engine.js.NashornJsEngine; +import org.thingsboard.server.common.msg.TbMsg; + +import javax.script.Bindings; + +import static org.thingsboard.rule.engine.DonAsynchron.withCallback; + +@Slf4j +public class TbJsSwitchNode implements TbNode { + + private TbJsSwitchNodeConfiguration config; + private NashornJsEngine jsEngine; + + @Override + public void init(TbNodeConfiguration configuration, TbNodeState state) throws TbNodeException { + this.config = TbNodeUtils.convert(configuration, TbJsSwitchNodeConfiguration.class); + this.jsEngine = new NashornJsEngine(config.getJsScript()); + if (config.getAllowedRelations().size() < 1) { + String message = "Switch node should have at least 1 relation"; + log.error(message); + throw new IllegalStateException(message); + } + } + + @Override + public void onMsg(TbContext ctx, TbMsg msg) { + ListeningExecutor jsExecutor = ctx.getJsExecutor(); + withCallback(jsExecutor.executeAsync(() -> jsEngine.executeSwitch(toBindings(msg))), + result -> processSwitch(ctx, msg, result), + t -> ctx.tellError(msg, t)); + } + + private void processSwitch(TbContext ctx, TbMsg msg, String nextRelation) { + if (config.getAllowedRelations().contains(nextRelation)) { + ctx.tellNext(msg, nextRelation); + } else { + ctx.tellError(msg, new IllegalStateException("Unsupported relation for switch " + nextRelation)); + } + } + + private Bindings toBindings(TbMsg msg) { + return NashornJsEngine.bindMsg(msg); + } + + @Override + public void destroy() { + if (jsEngine != null) { + jsEngine.destroy(); + } + } +} diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbJsSwitchNodeConfiguration.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbJsSwitchNodeConfiguration.java new file mode 100644 index 0000000000..a179c96a1f --- /dev/null +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbJsSwitchNodeConfiguration.java @@ -0,0 +1,12 @@ +package org.thingsboard.rule.engine.filter; + +import lombok.Data; + +import java.util.Set; + +@Data +public class TbJsSwitchNodeConfiguration { + + private String jsScript; + private Set allowedRelations; +} diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/js/JsExecutorService.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/js/JsExecutorService.java new file mode 100644 index 0000000000..899188551f --- /dev/null +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/js/JsExecutorService.java @@ -0,0 +1,30 @@ +package org.thingsboard.rule.engine.js; + +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; +import org.thingsboard.rule.engine.api.ListeningExecutor; + +import javax.annotation.PreDestroy; +import java.util.concurrent.Callable; +import java.util.concurrent.Executors; + +public class JsExecutorService implements ListeningExecutor{ + + private final ListeningExecutorService service; + + public JsExecutorService(int threadCount) { + this.service = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(threadCount)); + } + + @Override + public ListenableFuture executeAsync(Callable task) { + return service.submit(task); + } + + @PreDestroy + @Override + public void onDestroy() { + service.shutdown(); + } +} diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/js/NashornJsEngine.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/js/NashornJsEngine.java new file mode 100644 index 0000000000..c64cf7202f --- /dev/null +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/js/NashornJsEngine.java @@ -0,0 +1,79 @@ +package org.thingsboard.rule.engine.js; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import jdk.nashorn.api.scripting.NashornScriptEngineFactory; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.ArrayUtils; +import org.thingsboard.server.common.msg.TbMsg; + +import javax.script.*; +import java.util.Map; + + +@Slf4j +public class NashornJsEngine { + + public static final String METADATA = "meta"; + public static final String DATA = "msg"; + private static NashornScriptEngineFactory factory = new NashornScriptEngineFactory(); + + private CompiledScript engine; + + public NashornJsEngine(String script) { + engine = compileScript(script); + } + + private static CompiledScript compileScript(String script) { + ScriptEngine engine = factory.getScriptEngine(new String[]{"--no-java"}); + Compilable compEngine = (Compilable) engine; + try { + return compEngine.compile(script); + } catch (ScriptException e) { + log.warn("Failed to compile JS script: {}", e.getMessage(), e); + throw new IllegalArgumentException("Can't compile script: " + e.getMessage()); + } + } + + public static Bindings bindMsg(TbMsg msg) { + try { + Bindings bindings = new SimpleBindings(); + bindings.put(METADATA, msg.getMetaData().getData()); + + if (ArrayUtils.isNotEmpty(msg.getData())) { + ObjectMapper mapper = new ObjectMapper(); + JsonNode jsonNode = mapper.readTree(msg.getData()); + Map map = mapper.treeToValue(jsonNode, Map.class); + bindings.put(DATA, map); + } + + return bindings; + } catch (Throwable th) { + throw new IllegalArgumentException("Cannot bind js args", th); + } + } + + public boolean executeFilter(Bindings bindings) throws ScriptException { + Object eval = engine.eval(bindings); + if (eval instanceof Boolean) { + return (boolean) eval; + } else { + log.warn("Wrong result type: {}", eval); + throw new ScriptException("Wrong result type: " + eval); + } + } + + public String executeSwitch(Bindings bindings) throws ScriptException, NoSuchMethodException { + Object eval = this.engine.eval(bindings); + if (eval instanceof String) { + return (String) eval; + } else { + log.warn("Wrong result type: {}", eval); + throw new ScriptException("Wrong result type: " + eval); + } + } + + public void destroy() { + engine = null; + } +} diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetAttributesNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetAttributesNode.java index 4d41921740..bc6bc4a03c 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetAttributesNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetAttributesNode.java @@ -1,12 +1,12 @@ /** * Copyright © 2016-2018 The Thingsboard Authors - * + *

* Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

* Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -15,13 +15,16 @@ */ package org.thingsboard.rule.engine.metadata; -import com.google.common.util.concurrent.AsyncFunction; +import com.google.common.base.Function; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.collections.CollectionUtils; import org.thingsboard.rule.engine.TbNodeUtils; import org.thingsboard.rule.engine.api.*; import org.thingsboard.server.common.data.kv.AttributeKvEntry; +import org.thingsboard.server.common.data.kv.KvEntry; +import org.thingsboard.server.common.data.kv.TsKvEntry; import org.thingsboard.server.common.msg.TbMsg; import java.util.List; @@ -44,24 +47,40 @@ public class TbGetAttributesNode implements TbNode { @Override public void onMsg(TbContext ctx, TbMsg msg) throws TbNodeException { - ListenableFuture> future = Futures.allAsList( - putAttrAsync(ctx, msg, CLIENT_SCOPE, config.getClientAttributeNames(), "cs."), - putAttrAsync(ctx, msg, SHARED_SCOPE, config.getSharedAttributeNames(), "shared."), - putAttrAsync(ctx, msg, SERVER_SCOPE, config.getServerAttributeNames(), "ss.")); + if (CollectionUtils.isNotEmpty(config.getLatestTsKeyNames())) { + withCallback(getLatestTelemetry(ctx, msg, config.getLatestTsKeyNames()), + i -> ctx.tellNext(msg), + t -> ctx.tellError(msg, t)); + } else { + ListenableFuture> future = Futures.allAsList( + putAttrAsync(ctx, msg, CLIENT_SCOPE, config.getClientAttributeNames(), "cs."), + putAttrAsync(ctx, msg, SHARED_SCOPE, config.getSharedAttributeNames(), "shared."), + putAttrAsync(ctx, msg, SERVER_SCOPE, config.getServerAttributeNames(), "ss.")); - withCallback(future, i -> ctx.tellNext(msg), t -> ctx.tellError(msg, t)); + withCallback(future, i -> ctx.tellNext(msg), t -> ctx.tellError(msg, t)); + } } - private ListenableFuture putAttributesAsync(TbMsg msg, List attributes, String prefix) { + private ListenableFuture putAttr(TbMsg msg, List attributes, String prefix) { attributes.forEach(r -> msg.getMetaData().putValue(prefix + r.getKey(), r.getValueAsString())); return Futures.immediateFuture(null); } private ListenableFuture putAttrAsync(TbContext ctx, TbMsg msg, String scope, List attributes, String prefix) { - return Futures.transform(ctx.getAttributesService().find(msg.getOriginator(), scope, attributes), - (AsyncFunction, Void>) i -> putAttributesAsync(msg, i, prefix)); + ListenableFuture> latest = ctx.getAttributesService().find(msg.getOriginator(), scope, attributes); + return Futures.transform(latest, (Function, Void>) l -> { + l.forEach(r -> msg.getMetaData().putValue(prefix + r.getKey(), r.getValueAsString())); + return null; + }); } + private ListenableFuture getLatestTelemetry(TbContext ctx, TbMsg msg, List attributes) { + ListenableFuture> latest = ctx.getTimeseriesService().findLatest(msg.getOriginator(), attributes); + return Futures.transform(latest, (Function, Void>) l -> { + l.forEach(r -> msg.getMetaData().putValue(r.getKey(), r.getValueAsString())); + return null; + }); + } @Override public void destroy() { diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetAttributesNodeConfiguration.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetAttributesNodeConfiguration.java index b54edef818..ad92314324 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetAttributesNodeConfiguration.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetAttributesNodeConfiguration.java @@ -29,4 +29,6 @@ public class TbGetAttributesNodeConfiguration { private List sharedAttributeNames; private List serverAttributeNames; + private List latestTsKeyNames; + } diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/filter/TbJsFilterNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/filter/TbJsFilterNodeTest.java new file mode 100644 index 0000000000..6a4bec2d66 --- /dev/null +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/filter/TbJsFilterNodeTest.java @@ -0,0 +1,154 @@ +package org.thingsboard.rule.engine.filter; + +import com.datastax.driver.core.utils.UUIDs; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.ArgumentCaptor; +import org.mockito.Matchers; +import org.mockito.Mock; +import org.mockito.runners.MockitoJUnitRunner; +import org.mockito.stubbing.Answer; +import org.thingsboard.rule.engine.api.ListeningExecutor; +import org.thingsboard.rule.engine.api.TbContext; +import org.thingsboard.rule.engine.api.TbNodeConfiguration; +import org.thingsboard.rule.engine.api.TbNodeException; +import org.thingsboard.server.common.msg.TbMsg; +import org.thingsboard.server.common.msg.TbMsgMetaData; + +import javax.script.ScriptException; +import java.util.concurrent.Callable; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.*; + +@RunWith(MockitoJUnitRunner.class) +public class TbJsFilterNodeTest { + + private TbJsFilterNode node; + + @Mock + private TbContext ctx; + @Mock + private ListeningExecutor executor; + + @Test + public void falseEvaluationDoNotSendMsg() throws TbNodeException { + initWithScript("10 > 15;"); + TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, new TbMsgMetaData(), "{}".getBytes()); + + mockJsExecutor(); + + node.onMsg(ctx, msg); + verify(ctx).getJsExecutor(); + verifyNoMoreInteractions(ctx); + } + + @Test + public void notValidMsgDataThrowsException() throws TbNodeException { + initWithScript("10 > 15;"); + TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, new TbMsgMetaData(), new byte[4]); + + when(ctx.getJsExecutor()).thenReturn(executor); + + mockJsExecutor(); + + node.onMsg(ctx, msg); + verifyError(msg, "Cannot bind js args", IllegalArgumentException.class); + } + + @Test + public void exceptionInJsThrowsException() throws TbNodeException { + initWithScript("meta.temp.curr < 15;"); + TbMsgMetaData metaData = new TbMsgMetaData(); + TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, "{}".getBytes()); + mockJsExecutor(); + + node.onMsg(ctx, msg); + String expectedMessage = "TypeError: Cannot get property \"curr\" of null in at line number 1"; + verifyError(msg, expectedMessage, ScriptException.class); + } + + @Test(expected = IllegalArgumentException.class) + public void notValidScriptThrowsException() throws TbNodeException { + initWithScript("10 > 15 asdq out"); + } + + @Test + public void metadataConditionCanBeFalse() throws TbNodeException { + initWithScript("meta.humidity < 15;"); + TbMsgMetaData metaData = new TbMsgMetaData(); + metaData.putValue("temp", "10"); + metaData.putValue("humidity", "99"); + TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, "{}".getBytes()); + mockJsExecutor(); + + node.onMsg(ctx, msg); + verify(ctx).getJsExecutor(); + verifyNoMoreInteractions(ctx); + } + + @Test + public void metadataConditionCanBeTrue() throws TbNodeException { + initWithScript("meta.temp < 15;"); + TbMsgMetaData metaData = new TbMsgMetaData(); + metaData.putValue("temp", "10"); + metaData.putValue("humidity", "99"); + TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, "{}".getBytes()); + mockJsExecutor(); + + node.onMsg(ctx, msg); + verify(ctx).getJsExecutor(); + verify(ctx).tellNext(msg); + } + + @Test + public void msgJsonParsedAndBinded() throws TbNodeException { + initWithScript("msg.passed < 15 && msg.name === 'Vit' && meta.temp == 10 && msg.bigObj.prop == 42;"); + TbMsgMetaData metaData = new TbMsgMetaData(); + metaData.putValue("temp", "10"); + metaData.putValue("humidity", "99"); + String rawJson = "{\"name\": \"Vit\", \"passed\": 5, \"bigObj\": {\"prop\":42}}"; + + TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, rawJson.getBytes()); + mockJsExecutor(); + + node.onMsg(ctx, msg); + verify(ctx).getJsExecutor(); + verify(ctx).tellNext(msg); + } + + private void initWithScript(String script) throws TbNodeException { + TbJsFilterNodeConfiguration config = new TbJsFilterNodeConfiguration(); + config.setJsScript(script); + ObjectMapper mapper = new ObjectMapper(); + TbNodeConfiguration nodeConfiguration = new TbNodeConfiguration(); + nodeConfiguration.setData(mapper.valueToTree(config)); + + node = new TbJsFilterNode(); + node.init(nodeConfiguration, null); + } + + private void mockJsExecutor() { + when(ctx.getJsExecutor()).thenReturn(executor); + doAnswer((Answer>) invocationOnMock -> { + try { + Callable task = (Callable) (invocationOnMock.getArguments())[0]; + return Futures.immediateFuture((Boolean) task.call()); + } catch (Throwable th) { + return Futures.immediateFailedFuture(th); + } + }).when(executor).executeAsync(Matchers.any(Callable.class)); + } + + private void verifyError(TbMsg msg, String message, Class expectedClass) { + ArgumentCaptor captor = ArgumentCaptor.forClass(Throwable.class); + verify(ctx).tellError(same(msg), captor.capture()); + + Throwable value = captor.getValue(); + assertEquals(expectedClass, value.getClass()); + assertEquals(message, value.getMessage()); + } +} \ No newline at end of file diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/filter/TbJsSwitchNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/filter/TbJsSwitchNodeTest.java new file mode 100644 index 0000000000..6ffa83bcc4 --- /dev/null +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/filter/TbJsSwitchNodeTest.java @@ -0,0 +1,118 @@ +package org.thingsboard.rule.engine.filter; + +import com.datastax.driver.core.utils.UUIDs; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.Sets; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.ArgumentCaptor; +import org.mockito.Matchers; +import org.mockito.Mock; +import org.mockito.runners.MockitoJUnitRunner; +import org.mockito.stubbing.Answer; +import org.thingsboard.rule.engine.api.ListeningExecutor; +import org.thingsboard.rule.engine.api.TbContext; +import org.thingsboard.rule.engine.api.TbNodeConfiguration; +import org.thingsboard.rule.engine.api.TbNodeException; +import org.thingsboard.server.common.msg.TbMsg; +import org.thingsboard.server.common.msg.TbMsgMetaData; + +import java.util.Set; +import java.util.concurrent.Callable; + +import static org.junit.Assert.*; +import static org.mockito.Matchers.same; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +@RunWith(MockitoJUnitRunner.class) +public class TbJsSwitchNodeTest { + + private TbJsSwitchNode node; + + @Mock + private TbContext ctx; + @Mock + private ListeningExecutor executor; + + @Test + public void allowedRelationPassed() throws TbNodeException { + String jsCode = "function nextRelation(meta, msg) {\n" + + " if(msg.passed == 5 && meta.temp == 10)\n" + + " return 'one'\n" + + " else\n" + + " return 'two';\n" + + "};\n" + + "\n" + + "nextRelation(meta, msg);"; + initWithScript(jsCode, Sets.newHashSet("one", "two")); + TbMsgMetaData metaData = new TbMsgMetaData(); + metaData.putValue("temp", "10"); + metaData.putValue("humidity", "99"); + String rawJson = "{\"name\": \"Vit\", \"passed\": 5}"; + + TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, rawJson.getBytes()); + mockJsExecutor(); + + node.onMsg(ctx, msg); + verify(ctx).getJsExecutor(); + verify(ctx).tellNext(msg, "one"); + } + + @Test + public void unknownRelationThrowsException() throws TbNodeException { + String jsCode = "function nextRelation(meta, msg) {\n" + + " return 'nine';" + + "};\n" + + "\n" + + "nextRelation(meta, msg);"; + initWithScript(jsCode, Sets.newHashSet("one", "two")); + TbMsgMetaData metaData = new TbMsgMetaData(); + metaData.putValue("temp", "10"); + metaData.putValue("humidity", "99"); + String rawJson = "{\"name\": \"Vit\", \"passed\": 5}"; + + TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, rawJson.getBytes()); + mockJsExecutor(); + + node.onMsg(ctx, msg); + verify(ctx).getJsExecutor(); + verifyError(msg, "Unsupported relation for switch nine", IllegalStateException.class); + } + + private void initWithScript(String script, Set relations) throws TbNodeException { + TbJsSwitchNodeConfiguration config = new TbJsSwitchNodeConfiguration(); + config.setJsScript(script); + config.setAllowedRelations(relations); + ObjectMapper mapper = new ObjectMapper(); + TbNodeConfiguration nodeConfiguration = new TbNodeConfiguration(); + nodeConfiguration.setData(mapper.valueToTree(config)); + + node = new TbJsSwitchNode(); + node.init(nodeConfiguration, null); + } + + private void mockJsExecutor() { + when(ctx.getJsExecutor()).thenReturn(executor); + doAnswer((Answer>) invocationOnMock -> { + try { + Callable task = (Callable) (invocationOnMock.getArguments())[0]; + return Futures.immediateFuture((String) task.call()); + } catch (Throwable th) { + return Futures.immediateFailedFuture(th); + } + }).when(executor).executeAsync(Matchers.any(Callable.class)); + } + + private void verifyError(TbMsg msg, String message, Class expectedClass) { + ArgumentCaptor captor = ArgumentCaptor.forClass(Throwable.class); + verify(ctx).tellError(same(msg), captor.capture()); + + Throwable value = captor.getValue(); + assertEquals(expectedClass, value.getClass()); + assertEquals(message, value.getMessage()); + } +} \ No newline at end of file