diff --git a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java index bbe7771e61..a5a20b83fa 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java +++ b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java @@ -1,12 +1,12 @@ /** * Copyright © 2016-2018 The Thingsboard Authors - *

+ * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

+ * + * http://www.apache.org/licenses/LICENSE-2.0 + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -29,6 +29,8 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; import org.springframework.util.Base64Utils; +import org.thingsboard.rule.engine.api.ListeningExecutor; +import org.thingsboard.rule.engine.js.JsExecutorService; import org.thingsboard.server.actors.service.ActorService; import org.thingsboard.server.common.data.DataConstants; import org.thingsboard.server.common.data.Event; @@ -328,4 +330,9 @@ public class ActorSystemContext { return Exception.class.isInstance(error) ? (Exception) error : new Exception(error); } + public ListeningExecutor getExecutor() { + //TODO: take thread count from yml. + return new JsExecutorService(1); + } + } diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java index a1344aa8d4..012a09f84e 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java +++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java @@ -1,12 +1,12 @@ /** * Copyright © 2016-2018 The Thingsboard Authors - *

+ * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

+ * + * http://www.apache.org/licenses/LICENSE-2.0 + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -15,6 +15,7 @@ */ package org.thingsboard.server.actors.ruleChain; +import org.thingsboard.rule.engine.api.ListeningExecutor; import org.thingsboard.rule.engine.api.TbContext; import org.thingsboard.server.actors.ActorSystemContext; import org.thingsboard.server.common.msg.TbMsg; @@ -30,6 +31,8 @@ import org.thingsboard.server.dao.rule.RuleChainService; import org.thingsboard.server.dao.timeseries.TimeseriesService; import org.thingsboard.server.dao.user.UserService; +import java.util.Set; + /** * Created by ashvayka on 19.03.18. */ @@ -45,7 +48,7 @@ class DefaultTbContext implements TbContext { @Override public void tellNext(TbMsg msg) { - tellNext(msg, null); + tellNext(msg, (String) null); } @Override @@ -89,6 +92,16 @@ class DefaultTbContext implements TbContext { nodeCtx.getSelfActor().tell(new RuleNodeToSelfErrorMsg(msg, th), nodeCtx.getSelfActor()); } + @Override + public void tellNext(TbMsg msg, Set relationTypes) { + relationTypes.forEach(type -> tellNext(msg, type)); + } + + @Override + public ListeningExecutor getJsExecutor() { + return mainCtx.getExecutor(); + } + @Override public AttributesService getAttributesService() { return mainCtx.getAttributesService(); diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java index 3f6bed77ac..7636152d2d 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java @@ -1,12 +1,12 @@ /** * Copyright © 2016-2018 The Thingsboard Authors - *

+ * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

+ * + * http://www.apache.org/licenses/LICENSE-2.0 + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -153,7 +153,6 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor + * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

+ * + * http://www.apache.org/licenses/LICENSE-2.0 + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeActorMessageProcessor.java index 536437ae7b..f5c014e5c5 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeActorMessageProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeActorMessageProcessor.java @@ -1,12 +1,12 @@ /** * Copyright © 2016-2018 The Thingsboard Authors - *

+ * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

+ * + * http://www.apache.org/licenses/LICENSE-2.0 + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. diff --git a/application/src/test/java/org/thingsboard/server/controller/AbstractRuleEngineControllerTest.java b/application/src/test/java/org/thingsboard/server/controller/AbstractRuleEngineControllerTest.java index 9aa75745cf..7997024e32 100644 --- a/application/src/test/java/org/thingsboard/server/controller/AbstractRuleEngineControllerTest.java +++ b/application/src/test/java/org/thingsboard/server/controller/AbstractRuleEngineControllerTest.java @@ -1,3 +1,18 @@ +/** + * Copyright © 2016-2018 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.thingsboard.server.controller; import com.fasterxml.jackson.core.type.TypeReference; diff --git a/application/src/test/java/org/thingsboard/server/rules/flow/AbstractRuleEngineFlowIntegrationTest.java b/application/src/test/java/org/thingsboard/server/rules/flow/AbstractRuleEngineFlowIntegrationTest.java index 1f2f70997c..5b8216b026 100644 --- a/application/src/test/java/org/thingsboard/server/rules/flow/AbstractRuleEngineFlowIntegrationTest.java +++ b/application/src/test/java/org/thingsboard/server/rules/flow/AbstractRuleEngineFlowIntegrationTest.java @@ -1,12 +1,12 @@ /** * Copyright © 2016-2018 The Thingsboard Authors - *

+ * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

+ * + * http://www.apache.org/licenses/LICENSE-2.0 + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsgMetaData.java b/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsgMetaData.java index 74a2d3f9c7..eca153bdfe 100644 --- a/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsgMetaData.java +++ b/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsgMetaData.java @@ -1,12 +1,12 @@ /** * Copyright © 2016-2018 The Thingsboard Authors - *

+ * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

+ * + * http://www.apache.org/licenses/LICENSE-2.0 + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. diff --git a/dao/src/main/java/org/thingsboard/server/dao/model/nosql/RuleChainEntity.java b/dao/src/main/java/org/thingsboard/server/dao/model/nosql/RuleChainEntity.java index bfec3900a7..251a68901a 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/model/nosql/RuleChainEntity.java +++ b/dao/src/main/java/org/thingsboard/server/dao/model/nosql/RuleChainEntity.java @@ -1,12 +1,12 @@ /** * Copyright © 2016-2018 The Thingsboard Authors - *

+ * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

+ * + * http://www.apache.org/licenses/LICENSE-2.0 + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/ListeningExecutor.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/ListeningExecutor.java new file mode 100644 index 0000000000..fecf019425 --- /dev/null +++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/ListeningExecutor.java @@ -0,0 +1,27 @@ +/** + * Copyright © 2016-2018 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.rule.engine.api; + +import com.google.common.util.concurrent.ListenableFuture; + +import java.util.concurrent.Callable; + +public interface ListeningExecutor { + + ListenableFuture executeAsync(Callable task); + + void onDestroy(); +} diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java index a105d20ada..260a51ff17 100644 --- a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java +++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java @@ -29,6 +29,7 @@ import org.thingsboard.server.dao.rule.RuleService; import org.thingsboard.server.dao.timeseries.TimeseriesService; import org.thingsboard.server.dao.user.UserService; +import java.util.Set; import java.util.UUID; /** @@ -40,6 +41,8 @@ public interface TbContext { void tellNext(TbMsg msg, String relationType); + void tellNext(TbMsg msg, Set relationTypes); + void tellSelf(TbMsg msg, long delayMs); void tellOthers(TbMsg msg); @@ -72,4 +75,6 @@ public interface TbContext { RelationService getRelationService(); + ListeningExecutor getJsExecutor(); + } diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbJsFilterNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbJsFilterNode.java new file mode 100644 index 0000000000..089c9b1919 --- /dev/null +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbJsFilterNode.java @@ -0,0 +1,58 @@ +/** + * Copyright © 2016-2018 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.rule.engine.filter; + +import lombok.extern.slf4j.Slf4j; +import org.thingsboard.rule.engine.TbNodeUtils; +import org.thingsboard.rule.engine.api.*; +import org.thingsboard.rule.engine.js.NashornJsEngine; +import org.thingsboard.server.common.msg.TbMsg; + +import javax.script.Bindings; + +import static org.thingsboard.rule.engine.DonAsynchron.withCallback; + +@Slf4j +public class TbJsFilterNode implements TbNode { + + private TbJsFilterNodeConfiguration config; + private NashornJsEngine jsEngine; + + @Override + public void init(TbNodeConfiguration configuration, TbNodeState state) throws TbNodeException { + this.config = TbNodeUtils.convert(configuration, TbJsFilterNodeConfiguration.class); + this.jsEngine = new NashornJsEngine(config.getJsScript()); + } + + @Override + public void onMsg(TbContext ctx, TbMsg msg) { + ListeningExecutor jsExecutor = ctx.getJsExecutor(); + withCallback(jsExecutor.executeAsync(() -> jsEngine.executeFilter(toBindings(msg))), + filterResult -> ctx.tellNext(msg, Boolean.toString(filterResult)), + t -> ctx.tellError(msg, t)); + } + + private Bindings toBindings(TbMsg msg) { + return NashornJsEngine.bindMsg(msg); + } + + @Override + public void destroy() { + if (jsEngine != null) { + jsEngine.destroy(); + } + } +} diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbJsFilterNodeConfiguration.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbJsFilterNodeConfiguration.java new file mode 100644 index 0000000000..bf543e3276 --- /dev/null +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbJsFilterNodeConfiguration.java @@ -0,0 +1,24 @@ +/** + * Copyright © 2016-2018 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.rule.engine.filter; + +import lombok.Data; + +@Data +public class TbJsFilterNodeConfiguration { + + private String jsScript; +} diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbJsSwitchNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbJsSwitchNode.java new file mode 100644 index 0000000000..afd81636a0 --- /dev/null +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbJsSwitchNode.java @@ -0,0 +1,82 @@ +/** + * Copyright © 2016-2018 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.rule.engine.filter; + +import lombok.extern.slf4j.Slf4j; +import org.thingsboard.rule.engine.TbNodeUtils; +import org.thingsboard.rule.engine.api.*; +import org.thingsboard.rule.engine.js.NashornJsEngine; +import org.thingsboard.server.common.msg.TbMsg; + +import javax.script.Bindings; +import java.util.Set; + +import static org.thingsboard.rule.engine.DonAsynchron.withCallback; + +@Slf4j +public class TbJsSwitchNode implements TbNode { + + private TbJsSwitchNodeConfiguration config; + private NashornJsEngine jsEngine; + + @Override + public void init(TbNodeConfiguration configuration, TbNodeState state) throws TbNodeException { + this.config = TbNodeUtils.convert(configuration, TbJsSwitchNodeConfiguration.class); + if (config.getAllowedRelations().size() < 1) { + String message = "Switch node should have at least 1 relation"; + log.error(message); + throw new IllegalStateException(message); + } + if (!config.isRouteToAllWithNoCheck()) { + this.jsEngine = new NashornJsEngine(config.getJsScript()); + } + } + + @Override + public void onMsg(TbContext ctx, TbMsg msg) { + if (config.isRouteToAllWithNoCheck()) { + ctx.tellNext(msg, config.getAllowedRelations()); + return; + } + ListeningExecutor jsExecutor = ctx.getJsExecutor(); + withCallback(jsExecutor.executeAsync(() -> jsEngine.executeSwitch(toBindings(msg))), + result -> processSwitch(ctx, msg, result), + t -> ctx.tellError(msg, t)); + } + + private void processSwitch(TbContext ctx, TbMsg msg, Set nextRelations) { + if (validateRelations(nextRelations)) { + ctx.tellNext(msg, nextRelations); + } else { + ctx.tellError(msg, new IllegalStateException("Unsupported relation for switch " + nextRelations)); + } + } + + private boolean validateRelations(Set nextRelations) { + return config.getAllowedRelations().containsAll(nextRelations); + } + + private Bindings toBindings(TbMsg msg) { + return NashornJsEngine.bindMsg(msg); + } + + @Override + public void destroy() { + if (jsEngine != null) { + jsEngine.destroy(); + } + } +} diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbJsSwitchNodeConfiguration.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbJsSwitchNodeConfiguration.java new file mode 100644 index 0000000000..331302d542 --- /dev/null +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbJsSwitchNodeConfiguration.java @@ -0,0 +1,28 @@ +/** + * Copyright © 2016-2018 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.rule.engine.filter; + +import lombok.Data; + +import java.util.Set; + +@Data +public class TbJsSwitchNodeConfiguration { + + private String jsScript; + private Set allowedRelations; + private boolean routeToAllWithNoCheck; +} diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/js/JsExecutorService.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/js/JsExecutorService.java new file mode 100644 index 0000000000..19c2bb9405 --- /dev/null +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/js/JsExecutorService.java @@ -0,0 +1,45 @@ +/** + * Copyright © 2016-2018 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.rule.engine.js; + +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; +import org.thingsboard.rule.engine.api.ListeningExecutor; + +import javax.annotation.PreDestroy; +import java.util.concurrent.Callable; +import java.util.concurrent.Executors; + +public class JsExecutorService implements ListeningExecutor{ + + private final ListeningExecutorService service; + + public JsExecutorService(int threadCount) { + this.service = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(threadCount)); + } + + @Override + public ListenableFuture executeAsync(Callable task) { + return service.submit(task); + } + + @PreDestroy + @Override + public void onDestroy() { + service.shutdown(); + } +} diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/js/NashornJsEngine.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/js/NashornJsEngine.java new file mode 100644 index 0000000000..082535f0fe --- /dev/null +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/js/NashornJsEngine.java @@ -0,0 +1,139 @@ +/** + * Copyright © 2016-2018 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.rule.engine.js; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.Sets; +import jdk.nashorn.api.scripting.NashornScriptEngineFactory; +import jdk.nashorn.api.scripting.ScriptObjectMirror; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.ArrayUtils; +import org.thingsboard.server.common.msg.TbMsg; + +import javax.script.*; +import java.util.Collections; +import java.util.Map; +import java.util.Set; + + +@Slf4j +public class NashornJsEngine { + + public static final String METADATA = "meta"; + public static final String DATA = "msg"; + private static NashornScriptEngineFactory factory = new NashornScriptEngineFactory(); + + private CompiledScript engine; + + public NashornJsEngine(String script) { + engine = compileScript(script); + } + + private static CompiledScript compileScript(String script) { + ScriptEngine engine = factory.getScriptEngine(new String[]{"--no-java"}); + Compilable compEngine = (Compilable) engine; + try { + return compEngine.compile(script); + } catch (ScriptException e) { + log.warn("Failed to compile JS script: {}", e.getMessage(), e); + throw new IllegalArgumentException("Can't compile script: " + e.getMessage()); + } + } + + public static Bindings bindMsg(TbMsg msg) { + try { + Bindings bindings = new SimpleBindings(); + bindings.put(METADATA, msg.getMetaData().getData()); + + if (ArrayUtils.isNotEmpty(msg.getData())) { + ObjectMapper mapper = new ObjectMapper(); + JsonNode jsonNode = mapper.readTree(msg.getData()); + Map map = mapper.treeToValue(jsonNode, Map.class); + bindings.put(DATA, map); + } + + return bindings; + } catch (Throwable th) { + throw new IllegalArgumentException("Cannot bind js args", th); + } + } + + private static TbMsg unbindMsg(Bindings bindings, TbMsg msg) throws JsonProcessingException { + for (Map.Entry entry : msg.getMetaData().getData().entrySet()) { + Object obj = entry.getValue(); + entry.setValue(obj.toString()); + } + + Object payload = bindings.get(DATA); + if (payload != null) { + ObjectMapper mapper = new ObjectMapper(); + byte[] bytes = mapper.writeValueAsBytes(payload); + return new TbMsg(msg.getId(), msg.getType(), msg.getOriginator(), msg.getMetaData(), bytes); + } + + return msg; + } + + public TbMsg executeUpdate(Bindings bindings, TbMsg msg) throws ScriptException { + try { + engine.eval(bindings); + return unbindMsg(bindings, msg); + } catch (Throwable th) { + th.printStackTrace(); + throw new IllegalArgumentException("Cannot unbind js args", th); + } + } + + public boolean executeFilter(Bindings bindings) throws ScriptException { + Object eval = engine.eval(bindings); + if (eval instanceof Boolean) { + return (boolean) eval; + } else { + log.warn("Wrong result type: {}", eval); + throw new ScriptException("Wrong result type: " + eval); + } + } + + public Set executeSwitch(Bindings bindings) throws ScriptException, NoSuchMethodException { + Object eval = this.engine.eval(bindings); + if (eval instanceof String) { + return Collections.singleton((String) eval); + } else if (eval instanceof ScriptObjectMirror) { + ScriptObjectMirror mir = (ScriptObjectMirror) eval; + if (mir.isArray()) { + Set nextStates = Sets.newHashSet(); + for (Map.Entry entry : mir.entrySet()) { + if (entry.getValue() instanceof String) { + nextStates.add((String) entry.getValue()); + } else { + log.warn("Wrong result type: {}", eval); + throw new ScriptException("Wrong result type: " + eval); + } + } + return nextStates; + } + } + + log.warn("Wrong result type: {}", eval); + throw new ScriptException("Wrong result type: " + eval); + } + + public void destroy() { + engine = null; + } +} diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbEntityGetAttrNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbEntityGetAttrNode.java index 52850be9a6..269e40fc19 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbEntityGetAttrNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbEntityGetAttrNode.java @@ -70,7 +70,7 @@ public abstract class TbEntityGetAttrNode implements TbNode } - private void putAttributesAndTell(TbContext ctx, TbMsg msg, List attributes) { + private void putAttributesAndTell(TbContext ctx, TbMsg msg, List attributes) { attributes.forEach(r -> { String attrName = config.getAttrMapping().get(r.getKey()); msg.getMetaData().putValue(attrName, r.getValueAsString()); @@ -85,4 +85,8 @@ public abstract class TbEntityGetAttrNode implements TbNode protected abstract ListenableFuture findEntityAsync(TbContext ctx, EntityId originator); + public void setConfig(TbGetEntityAttrNodeConfiguration config) { + this.config = config; + } + } diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetAttributesNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetAttributesNode.java index ac12f95eab..90eadcb2c7 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetAttributesNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetAttributesNode.java @@ -1,12 +1,12 @@ /** * Copyright © 2016-2018 The Thingsboard Authors - *

+ * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

+ * + * http://www.apache.org/licenses/LICENSE-2.0 + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -16,13 +16,19 @@ package org.thingsboard.rule.engine.metadata; import com.google.common.base.Function; -import com.google.common.util.concurrent.AsyncFunction; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.collections.CollectionUtils; import org.thingsboard.rule.engine.TbNodeUtils; -import org.thingsboard.rule.engine.api.*; +import org.thingsboard.rule.engine.api.TbContext; +import org.thingsboard.rule.engine.api.TbNodeConfiguration; +import org.thingsboard.rule.engine.api.TbNodeException; +import org.thingsboard.rule.engine.api.TbNodeState; +import org.thingsboard.rule.engine.api.TbNode; +import org.thingsboard.rule.engine.api.EnrichmentNode; import org.thingsboard.server.common.data.kv.AttributeKvEntry; +import org.thingsboard.server.common.data.kv.TsKvEntry; import org.thingsboard.server.common.msg.TbMsg; import java.util.List; @@ -46,30 +52,41 @@ public class TbGetAttributesNode implements TbNode { @Override public void onMsg(TbContext ctx, TbMsg msg) throws TbNodeException { - ListenableFuture> future = Futures.allAsList( - putAttrAsync(ctx, msg, CLIENT_SCOPE, config.getClientAttributeNames(), "cs."), - putAttrAsync(ctx, msg, SHARED_SCOPE, config.getSharedAttributeNames(), "shared."), - putAttrAsync(ctx, msg, SERVER_SCOPE, config.getServerAttributeNames(), "ss.")); - - withCallback(future, i -> ctx.tellNext(msg), t -> ctx.tellError(msg, t)); - } - - private Void putAttributesAsync(TbMsg msg, List attributes, String prefix) { - if (attributes != null) { - attributes.forEach(r -> msg.getMetaData().putValue(prefix + r.getKey(), r.getValueAsString())); - } - return null; - } - - private ListenableFuture putAttrAsync(TbContext ctx, TbMsg msg, String scope, List attributes, String prefix) { - if (attributes != null) { - return Futures.transform(ctx.getAttributesService().find(msg.getOriginator(), scope, attributes), - (Function, Void>) i -> putAttributesAsync(msg, i, prefix)); + if (CollectionUtils.isNotEmpty(config.getLatestTsKeyNames())) { + withCallback(getLatestTelemetry(ctx, msg, config.getLatestTsKeyNames()), + i -> ctx.tellNext(msg), + t -> ctx.tellError(msg, t)); } else { + ListenableFuture> future = Futures.allAsList( + putAttrAsync(ctx, msg, CLIENT_SCOPE, config.getClientAttributeNames(), "cs."), + putAttrAsync(ctx, msg, SHARED_SCOPE, config.getSharedAttributeNames(), "shared."), + putAttrAsync(ctx, msg, SERVER_SCOPE, config.getServerAttributeNames(), "ss.")); + + withCallback(future, i -> ctx.tellNext(msg), t -> ctx.tellError(msg, t)); + } + } + + private ListenableFuture putAttrAsync(TbContext ctx, TbMsg msg, String scope, List keys, String prefix) { + if (keys == null) { return Futures.immediateFuture(null); } + ListenableFuture> latest = ctx.getAttributesService().find(msg.getOriginator(), scope, keys); + return Futures.transform(latest, (Function, Void>) l -> { + l.forEach(r -> msg.getMetaData().putValue(prefix + r.getKey(), r.getValueAsString())); + return null; + }); } + private ListenableFuture getLatestTelemetry(TbContext ctx, TbMsg msg, List keys) { + if (keys == null) { + return Futures.immediateFuture(null); + } + ListenableFuture> latest = ctx.getTimeseriesService().findLatest(msg.getOriginator(), keys); + return Futures.transform(latest, (Function, Void>) l -> { + l.forEach(r -> msg.getMetaData().putValue(r.getKey(), r.getValueAsString())); + return null; + }); + } @Override public void destroy() { diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetAttributesNodeConfiguration.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetAttributesNodeConfiguration.java index b54edef818..ad92314324 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetAttributesNodeConfiguration.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetAttributesNodeConfiguration.java @@ -29,4 +29,6 @@ public class TbGetAttributesNodeConfiguration { private List sharedAttributeNames; private List serverAttributeNames; + private List latestTsKeyNames; + } diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetCustomerAttributeNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetCustomerAttributeNode.java index 2e3d6172fa..18ddfcf6b9 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetCustomerAttributeNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetCustomerAttributeNode.java @@ -15,39 +15,19 @@ */ package org.thingsboard.rule.engine.metadata; -import com.google.common.util.concurrent.AsyncFunction; -import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import org.thingsboard.rule.engine.api.EnrichmentNode; import org.thingsboard.rule.engine.api.TbContext; -import org.thingsboard.rule.engine.api.TbNodeException; -import org.thingsboard.server.common.data.HasCustomerId; -import org.thingsboard.server.common.data.id.*; +import org.thingsboard.rule.engine.util.EntitiesCustomerIdAsyncLoader; +import org.thingsboard.server.common.data.id.CustomerId; +import org.thingsboard.server.common.data.id.EntityId; @EnrichmentNode(name="Get Customer Attributes Node") public class TbGetCustomerAttributeNode extends TbEntityGetAttrNode { @Override protected ListenableFuture findEntityAsync(TbContext ctx, EntityId originator) { - - switch (originator.getEntityType()) { - case CUSTOMER: - return Futures.immediateFuture((CustomerId) originator); - case USER: - return getCustomerAsync(ctx.getUserService().findUserByIdAsync((UserId) originator)); - case ASSET: - return getCustomerAsync(ctx.getAssetService().findAssetByIdAsync((AssetId) originator)); - case DEVICE: - return getCustomerAsync(ctx.getDeviceService().findDeviceByIdAsync((DeviceId) originator)); - default: - return Futures.immediateFailedFuture(new TbNodeException("Unexpected originator EntityType " + originator)); - } - } - - private ListenableFuture getCustomerAsync(ListenableFuture future) { - return Futures.transform(future, (AsyncFunction) in -> { - return in != null ? Futures.immediateFuture(in.getCustomerId()) - : Futures.immediateFailedFuture(new IllegalStateException("Customer not found"));}); + return EntitiesCustomerIdAsyncLoader.findEntityIdAsync(ctx, originator); } } diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetRelatedAttrNodeConfiguration.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetRelatedAttrNodeConfiguration.java index 75b0a6524b..ae0b662d48 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetRelatedAttrNodeConfiguration.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetRelatedAttrNodeConfiguration.java @@ -19,7 +19,7 @@ import lombok.Data; import org.thingsboard.server.common.data.relation.EntitySearchDirection; @Data -public class TbGetRelatedAttrNodeConfiguration { +public class TbGetRelatedAttrNodeConfiguration extends TbGetEntityAttrNodeConfiguration { private String relationType; private EntitySearchDirection direction; diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetRelatedAttributeNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetRelatedAttributeNode.java index 83aa4e2c17..3a0dce85a2 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetRelatedAttributeNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetRelatedAttributeNode.java @@ -15,20 +15,16 @@ */ package org.thingsboard.rule.engine.metadata; -import com.google.common.util.concurrent.AsyncFunction; -import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; -import org.apache.commons.collections.CollectionUtils; import org.thingsboard.rule.engine.TbNodeUtils; -import org.thingsboard.rule.engine.api.*; +import org.thingsboard.rule.engine.api.TbContext; +import org.thingsboard.rule.engine.api.TbNodeConfiguration; +import org.thingsboard.rule.engine.api.TbNodeException; +import org.thingsboard.rule.engine.api.TbNodeState; +import org.thingsboard.rule.engine.api.EnrichmentNode; +import org.thingsboard.rule.engine.util.EntitiesRelatedEntityIdAsyncLoader; + import org.thingsboard.server.common.data.id.EntityId; -import org.thingsboard.server.common.data.relation.EntityRelation; -import org.thingsboard.server.common.data.relation.EntitySearchDirection; -import org.thingsboard.server.dao.relation.RelationService; - -import java.util.List; - -import static org.thingsboard.server.common.data.relation.RelationTypeGroup.COMMON; @EnrichmentNode(name="Get Related Entity Attributes Node") public class TbGetRelatedAttributeNode extends TbEntityGetAttrNode { @@ -38,23 +34,11 @@ public class TbGetRelatedAttributeNode extends TbEntityGetAttrNode { @Override public void init(TbNodeConfiguration configuration, TbNodeState state) throws TbNodeException { this.config = TbNodeUtils.convert(configuration, TbGetRelatedAttrNodeConfiguration.class); + setConfig(config); } @Override protected ListenableFuture findEntityAsync(TbContext ctx, EntityId originator) { - RelationService relationService = ctx.getRelationService(); - if (config.getDirection() == EntitySearchDirection.FROM) { - ListenableFuture> asyncRelation = relationService.findByFromAndTypeAsync(originator, config.getRelationType(), COMMON); - return Futures.transform(asyncRelation, (AsyncFunction, EntityId>) - r -> CollectionUtils.isNotEmpty(r) ? Futures.immediateFuture(r.get(0).getTo()) - : Futures.immediateFailedFuture(new IllegalStateException("Relation not found"))); - } else if (config.getDirection() == EntitySearchDirection.TO) { - ListenableFuture> asyncRelation = relationService.findByToAndTypeAsync(originator, config.getRelationType(), COMMON); - return Futures.transform(asyncRelation, (AsyncFunction, EntityId>) - r -> CollectionUtils.isNotEmpty(r) ? Futures.immediateFuture(r.get(0).getFrom()) - : Futures.immediateFailedFuture(new IllegalStateException("Relation not found"))); - } - - return Futures.immediateFailedFuture(new IllegalStateException("Unknown direction")); + return EntitiesRelatedEntityIdAsyncLoader.findEntityAsync(ctx, originator, config.getDirection(), config.getRelationType()); } } diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetTenantAttributeNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetTenantAttributeNode.java index a1b05162dc..e51c053ca2 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetTenantAttributeNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetTenantAttributeNode.java @@ -15,16 +15,13 @@ */ package org.thingsboard.rule.engine.metadata; -import com.google.common.util.concurrent.AsyncFunction; -import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import lombok.extern.slf4j.Slf4j; import org.thingsboard.rule.engine.api.EnrichmentNode; import org.thingsboard.rule.engine.api.TbContext; -import org.thingsboard.rule.engine.api.TbNodeException; -import org.thingsboard.server.common.data.HasTenantId; -import org.thingsboard.server.common.data.alarm.AlarmId; -import org.thingsboard.server.common.data.id.*; +import org.thingsboard.rule.engine.util.EntitiesTenantIdAsyncLoader; +import org.thingsboard.server.common.data.id.EntityId; +import org.thingsboard.server.common.data.id.TenantId; @Slf4j @EnrichmentNode(name="Get Tenant Attributes Node") @@ -32,33 +29,7 @@ public class TbGetTenantAttributeNode extends TbEntityGetAttrNode { @Override protected ListenableFuture findEntityAsync(TbContext ctx, EntityId originator) { - - switch (originator.getEntityType()) { - case TENANT: - return Futures.immediateFuture((TenantId) originator); - case CUSTOMER: - return getTenantAsync(ctx.getCustomerService().findCustomerByIdAsync((CustomerId) originator)); - case USER: - return getTenantAsync(ctx.getUserService().findUserByIdAsync((UserId) originator)); - case PLUGIN: - return getTenantAsync(ctx.getPluginService().findPluginByIdAsync((PluginId) originator)); - case ASSET: - return getTenantAsync(ctx.getAssetService().findAssetByIdAsync((AssetId) originator)); - case DEVICE: - return getTenantAsync(ctx.getDeviceService().findDeviceByIdAsync((DeviceId) originator)); - case ALARM: - return getTenantAsync(ctx.getAlarmService().findAlarmByIdAsync((AlarmId) originator)); - case RULE_CHAIN: - return getTenantAsync(ctx.getRuleChainService().findRuleChainByIdAsync((RuleChainId) originator)); - default: - return Futures.immediateFailedFuture(new TbNodeException("Unexpected originator EntityType " + originator)); - } - } - - private ListenableFuture getTenantAsync(ListenableFuture future) { - return Futures.transform(future, (AsyncFunction) in -> { - return in != null ? Futures.immediateFuture(in.getTenantId()) - : Futures.immediateFailedFuture(new IllegalStateException("Tenant not found"));}); + return EntitiesTenantIdAsyncLoader.findEntityIdAsync(ctx, originator); } } diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/TbAbstractTransformNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/TbAbstractTransformNode.java new file mode 100644 index 0000000000..426a3276a9 --- /dev/null +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/TbAbstractTransformNode.java @@ -0,0 +1,59 @@ +/** + * Copyright © 2016-2018 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.rule.engine.transform; + +import com.google.common.util.concurrent.ListenableFuture; +import lombok.extern.slf4j.Slf4j; +import org.thingsboard.rule.engine.TbNodeUtils; +import org.thingsboard.rule.engine.api.*; +import org.thingsboard.server.common.msg.TbMsg; + +import static org.thingsboard.rule.engine.DonAsynchron.withCallback; + +/** + * Created by ashvayka on 19.01.18. + */ +@Slf4j +public abstract class TbAbstractTransformNode implements TbNode { + + private TbTransformNodeConfiguration config; + + @Override + public void init(TbNodeConfiguration configuration, TbNodeState state) throws TbNodeException { + this.config = TbNodeUtils.convert(configuration, TbTransformNodeConfiguration.class); + } + + @Override + public void onMsg(TbContext ctx, TbMsg msg) { + withCallback(transform(ctx, msg), + m -> routeMsg(ctx, m), + t -> ctx.tellError(msg, t)); + } + + protected abstract ListenableFuture transform(TbContext ctx, TbMsg msg); + + private void routeMsg(TbContext ctx, TbMsg msg) { + if (config.isStartNewChain()) { + ctx.spawn(msg); + } else { + ctx.tellNext(msg); + } + } + + public void setConfig(TbTransformNodeConfiguration config) { + this.config = config; + } +} diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/TbChangeOriginatorNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/TbChangeOriginatorNode.java new file mode 100644 index 0000000000..2592af253e --- /dev/null +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/TbChangeOriginatorNode.java @@ -0,0 +1,93 @@ +/** + * Copyright © 2016-2018 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.rule.engine.transform; + +import com.google.common.base.Function; +import com.google.common.collect.Sets; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; +import org.thingsboard.rule.engine.TbNodeUtils; +import org.thingsboard.rule.engine.api.TbContext; +import org.thingsboard.rule.engine.api.TbNodeConfiguration; +import org.thingsboard.rule.engine.api.TbNodeException; +import org.thingsboard.rule.engine.api.TbNodeState; +import org.thingsboard.rule.engine.util.EntitiesCustomerIdAsyncLoader; +import org.thingsboard.rule.engine.util.EntitiesRelatedEntityIdAsyncLoader; +import org.thingsboard.rule.engine.util.EntitiesTenantIdAsyncLoader; +import org.thingsboard.server.common.data.id.EntityId; +import org.thingsboard.server.common.msg.TbMsg; + +import java.util.HashSet; + +@Slf4j +public class TbChangeOriginatorNode extends TbAbstractTransformNode { + + protected static final String CUSTOMER_SOURCE = "CUSTOMER"; + protected static final String TENANT_SOURCE = "TENANT"; + protected static final String RELATED_SOURCE = "RELATED"; + + private TbChangeOriginatorNodeConfiguration config; + + @Override + public void init(TbNodeConfiguration configuration, TbNodeState state) throws TbNodeException { + this.config = TbNodeUtils.convert(configuration, TbChangeOriginatorNodeConfiguration.class); + validateConfig(config); + setConfig(config); + } + + @Override + protected ListenableFuture transform(TbContext ctx, TbMsg msg) { + ListenableFuture newOriginator = getNewOriginator(ctx, msg.getOriginator()); + return Futures.transform(newOriginator, (Function) n -> new TbMsg(msg.getId(), msg.getType(), n, msg.getMetaData(), msg.getData())); + } + + private ListenableFuture getNewOriginator(TbContext ctx, EntityId original) { + switch (config.getOriginatorSource()) { + case CUSTOMER_SOURCE: + return EntitiesCustomerIdAsyncLoader.findEntityIdAsync(ctx, original); + case TENANT_SOURCE: + return EntitiesTenantIdAsyncLoader.findEntityIdAsync(ctx, original); + case RELATED_SOURCE: + return EntitiesRelatedEntityIdAsyncLoader.findEntityAsync(ctx, original, config.getDirection(), config.getRelationType()); + default: + return Futures.immediateFailedFuture(new IllegalStateException("Unexpected originator source " + config.getOriginatorSource())); + } + } + + private void validateConfig(TbChangeOriginatorNodeConfiguration conf) { + HashSet knownSources = Sets.newHashSet(CUSTOMER_SOURCE, TENANT_SOURCE, RELATED_SOURCE); + if (!knownSources.contains(conf.getOriginatorSource())) { + log.error("Unsupported source [{}] for TbChangeOriginatorNode", conf.getOriginatorSource()); + throw new IllegalArgumentException("Unsupported source TbChangeOriginatorNode" + conf.getOriginatorSource()); + } + + if (conf.getOriginatorSource().equals(RELATED_SOURCE)) { + if (conf.getDirection() == null || StringUtils.isBlank(conf.getRelationType())) { + log.error("Related source for TbChangeOriginatorNode should have direction and relationType. Actual [{}] [{}]", + conf.getDirection(), conf.getRelationType()); + throw new IllegalArgumentException("Wrong config for RElated Source in TbChangeOriginatorNode" + conf.getOriginatorSource()); + } + } + + } + + @Override + public void destroy() { + + } +} diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/TbChangeOriginatorNodeConfiguration.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/TbChangeOriginatorNodeConfiguration.java new file mode 100644 index 0000000000..cf036810c4 --- /dev/null +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/TbChangeOriginatorNodeConfiguration.java @@ -0,0 +1,27 @@ +/** + * Copyright © 2016-2018 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.rule.engine.transform; + +import lombok.Data; +import org.thingsboard.server.common.data.relation.EntitySearchDirection; + +@Data +public class TbChangeOriginatorNodeConfiguration extends TbTransformNodeConfiguration{ + + private String originatorSource; + private EntitySearchDirection direction; + private String relationType; +} diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/TbTransformMsgNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/TbTransformMsgNode.java new file mode 100644 index 0000000000..241fbfbf7e --- /dev/null +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/TbTransformMsgNode.java @@ -0,0 +1,56 @@ +/** + * Copyright © 2016-2018 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.rule.engine.transform; + +import com.google.common.util.concurrent.ListenableFuture; +import org.thingsboard.rule.engine.TbNodeUtils; +import org.thingsboard.rule.engine.api.TbContext; +import org.thingsboard.rule.engine.api.TbNodeConfiguration; +import org.thingsboard.rule.engine.api.TbNodeException; +import org.thingsboard.rule.engine.api.TbNodeState; +import org.thingsboard.rule.engine.js.NashornJsEngine; +import org.thingsboard.server.common.msg.TbMsg; + +import javax.script.Bindings; + +public class TbTransformMsgNode extends TbAbstractTransformNode { + + private TbTransformMsgNodeConfiguration config; + private NashornJsEngine jsEngine; + + @Override + public void init(TbNodeConfiguration configuration, TbNodeState state) throws TbNodeException { + this.config = TbNodeUtils.convert(configuration, TbTransformMsgNodeConfiguration.class); + this.jsEngine = new NashornJsEngine(config.getJsScript()); + setConfig(config); + } + + @Override + protected ListenableFuture transform(TbContext ctx, TbMsg msg) { + return ctx.getJsExecutor().executeAsync(() -> jsEngine.executeUpdate(toBindings(msg), msg)); + } + + private Bindings toBindings(TbMsg msg) { + return NashornJsEngine.bindMsg(msg); + } + + @Override + public void destroy() { + if (jsEngine != null) { + jsEngine.destroy(); + } + } +} diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/TbTransformMsgNodeConfiguration.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/TbTransformMsgNodeConfiguration.java new file mode 100644 index 0000000000..9cc926b54d --- /dev/null +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/TbTransformMsgNodeConfiguration.java @@ -0,0 +1,24 @@ +/** + * Copyright © 2016-2018 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.rule.engine.transform; + +import lombok.Data; + +@Data +public class TbTransformMsgNodeConfiguration extends TbTransformNodeConfiguration { + + private String jsScript; +} diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/TbTransformNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/TbTransformNode.java deleted file mode 100644 index d1a4adc6b6..0000000000 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/TbTransformNode.java +++ /dev/null @@ -1,69 +0,0 @@ -/** - * Copyright © 2016-2018 The Thingsboard Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.thingsboard.rule.engine.transform; - -import lombok.extern.slf4j.Slf4j; -import org.thingsboard.rule.engine.TbNodeUtils; -import org.thingsboard.rule.engine.api.*; -import org.thingsboard.rule.engine.metadata.TbGetAttributesNodeConfiguration; -import org.thingsboard.server.common.data.DataConstants; -import org.thingsboard.server.common.data.kv.AttributeKvEntry; -import org.thingsboard.server.common.msg.TbMsg; -import org.thingsboard.server.dao.attributes.AttributesService; - -import java.util.List; - -/** - * Created by ashvayka on 19.01.18. - */ -@Slf4j -public class TbTransformNode implements TbNode { - - TbGetAttributesNodeConfiguration config; - - @Override - public void init(TbNodeConfiguration configuration, TbNodeState state) throws TbNodeException { - this.config = TbNodeUtils.convert(configuration, TbGetAttributesNodeConfiguration.class); - } - - @Override - public void onMsg(TbContext ctx, TbMsg msg) throws TbNodeException { - try { - //TODO: refactor this to work async and fetch attributes from cache. - AttributesService service = ctx.getAttributesService(); - - fetchAttributes(msg, service, config.getClientAttributeNames(), DataConstants.CLIENT_SCOPE, "cs."); - fetchAttributes(msg, service, config.getServerAttributeNames(), DataConstants.SERVER_SCOPE, "ss."); - fetchAttributes(msg, service, config.getSharedAttributeNames(), DataConstants.SHARED_SCOPE, "shared."); - ctx.tellNext(msg); - } catch (Exception e) { - log.warn("[{}][{}] Failed to fetch attributes", msg.getOriginator(), msg.getId(), e); - throw new TbNodeException(e); - } - } - - private void fetchAttributes(TbMsg msg, AttributesService service, List attributeNames, String scope, String prefix) throws InterruptedException, java.util.concurrent.ExecutionException { - if (attributeNames != null && attributeNames.isEmpty()) { - List attributes = service.find(msg.getOriginator(), scope, attributeNames).get(); - attributes.forEach(attr -> msg.getMetaData().putValue(prefix + attr.getKey(), attr.getValueAsString())); - } - } - - @Override - public void destroy() { - - } -} diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/TbTransformNodeConfiguration.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/TbTransformNodeConfiguration.java new file mode 100644 index 0000000000..d9f57806ae --- /dev/null +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/TbTransformNodeConfiguration.java @@ -0,0 +1,24 @@ +/** + * Copyright © 2016-2018 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.rule.engine.transform; + +import lombok.Data; + +@Data +public class TbTransformNodeConfiguration { + + private boolean startNewChain = false; +} diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/util/EntitiesCustomerIdAsyncLoader.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/util/EntitiesCustomerIdAsyncLoader.java new file mode 100644 index 0000000000..67eb808795 --- /dev/null +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/util/EntitiesCustomerIdAsyncLoader.java @@ -0,0 +1,50 @@ +/** + * Copyright © 2016-2018 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.rule.engine.util; + +import com.google.common.util.concurrent.AsyncFunction; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import org.thingsboard.rule.engine.api.TbContext; +import org.thingsboard.rule.engine.api.TbNodeException; +import org.thingsboard.server.common.data.HasCustomerId; +import org.thingsboard.server.common.data.id.*; + +public class EntitiesCustomerIdAsyncLoader { + + + public static ListenableFuture findEntityIdAsync(TbContext ctx, EntityId original) { + + switch (original.getEntityType()) { + case CUSTOMER: + return Futures.immediateFuture((CustomerId) original); + case USER: + return getCustomerAsync(ctx.getUserService().findUserByIdAsync((UserId) original)); + case ASSET: + return getCustomerAsync(ctx.getAssetService().findAssetByIdAsync((AssetId) original)); + case DEVICE: + return getCustomerAsync(ctx.getDeviceService().findDeviceByIdAsync((DeviceId) original)); + default: + return Futures.immediateFailedFuture(new TbNodeException("Unexpected original EntityType " + original)); + } + } + + private static ListenableFuture getCustomerAsync(ListenableFuture future) { + return Futures.transform(future, (AsyncFunction) in -> { + return in != null ? Futures.immediateFuture(in.getCustomerId()) + : Futures.immediateFailedFuture(new IllegalStateException("Customer not found"));}); + } +} diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/util/EntitiesRelatedEntityIdAsyncLoader.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/util/EntitiesRelatedEntityIdAsyncLoader.java new file mode 100644 index 0000000000..ac69c5dd59 --- /dev/null +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/util/EntitiesRelatedEntityIdAsyncLoader.java @@ -0,0 +1,51 @@ +/** + * Copyright © 2016-2018 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.rule.engine.util; + +import com.google.common.util.concurrent.AsyncFunction; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import org.apache.commons.collections.CollectionUtils; +import org.thingsboard.rule.engine.api.TbContext; +import org.thingsboard.server.common.data.id.EntityId; +import org.thingsboard.server.common.data.relation.EntityRelation; +import org.thingsboard.server.common.data.relation.EntitySearchDirection; +import org.thingsboard.server.dao.relation.RelationService; + +import java.util.List; + +import static org.thingsboard.server.common.data.relation.RelationTypeGroup.COMMON; + +public class EntitiesRelatedEntityIdAsyncLoader { + + public static ListenableFuture findEntityAsync(TbContext ctx, EntityId originator, + EntitySearchDirection direction, String relationType) { + RelationService relationService = ctx.getRelationService(); + if (direction == EntitySearchDirection.FROM) { + ListenableFuture> asyncRelation = relationService.findByFromAndTypeAsync(originator, relationType, COMMON); + return Futures.transform(asyncRelation, (AsyncFunction, EntityId>) + r -> CollectionUtils.isNotEmpty(r) ? Futures.immediateFuture(r.get(0).getTo()) + : Futures.immediateFailedFuture(new IllegalStateException("Relation not found"))); + } else if (direction == EntitySearchDirection.TO) { + ListenableFuture> asyncRelation = relationService.findByToAndTypeAsync(originator, relationType, COMMON); + return Futures.transform(asyncRelation, (AsyncFunction, EntityId>) + r -> CollectionUtils.isNotEmpty(r) ? Futures.immediateFuture(r.get(0).getFrom()) + : Futures.immediateFailedFuture(new IllegalStateException("Relation not found"))); + } + + return Futures.immediateFailedFuture(new IllegalStateException("Unknown direction")); + } +} diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/util/EntitiesTenantIdAsyncLoader.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/util/EntitiesTenantIdAsyncLoader.java new file mode 100644 index 0000000000..5d2aaa81d1 --- /dev/null +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/util/EntitiesTenantIdAsyncLoader.java @@ -0,0 +1,58 @@ +/** + * Copyright © 2016-2018 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.rule.engine.util; + +import com.google.common.util.concurrent.AsyncFunction; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import org.thingsboard.rule.engine.api.TbContext; +import org.thingsboard.rule.engine.api.TbNodeException; +import org.thingsboard.server.common.data.HasTenantId; +import org.thingsboard.server.common.data.alarm.AlarmId; +import org.thingsboard.server.common.data.id.*; + +public class EntitiesTenantIdAsyncLoader { + + public static ListenableFuture findEntityIdAsync(TbContext ctx, EntityId original) { + + switch (original.getEntityType()) { + case TENANT: + return Futures.immediateFuture((TenantId) original); + case CUSTOMER: + return getTenantAsync(ctx.getCustomerService().findCustomerByIdAsync((CustomerId) original)); + case USER: + return getTenantAsync(ctx.getUserService().findUserByIdAsync((UserId) original)); + case PLUGIN: + return getTenantAsync(ctx.getPluginService().findPluginByIdAsync((PluginId) original)); + case ASSET: + return getTenantAsync(ctx.getAssetService().findAssetByIdAsync((AssetId) original)); + case DEVICE: + return getTenantAsync(ctx.getDeviceService().findDeviceByIdAsync((DeviceId) original)); + case ALARM: + return getTenantAsync(ctx.getAlarmService().findAlarmByIdAsync((AlarmId) original)); + case RULE_CHAIN: + return getTenantAsync(ctx.getRuleChainService().findRuleChainByIdAsync((RuleChainId) original)); + default: + return Futures.immediateFailedFuture(new TbNodeException("Unexpected original EntityType " + original)); + } + } + + private static ListenableFuture getTenantAsync(ListenableFuture future) { + return Futures.transform(future, (AsyncFunction) in -> { + return in != null ? Futures.immediateFuture(in.getTenantId()) + : Futures.immediateFailedFuture(new IllegalStateException("Tenant not found"));}); + } +} diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/filter/TbJsFilterNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/filter/TbJsFilterNodeTest.java new file mode 100644 index 0000000000..96f7032e5e --- /dev/null +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/filter/TbJsFilterNodeTest.java @@ -0,0 +1,170 @@ +/** + * Copyright © 2016-2018 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.rule.engine.filter; + +import com.datastax.driver.core.utils.UUIDs; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.ArgumentCaptor; +import org.mockito.Matchers; +import org.mockito.Mock; +import org.mockito.runners.MockitoJUnitRunner; +import org.mockito.stubbing.Answer; +import org.thingsboard.rule.engine.api.ListeningExecutor; +import org.thingsboard.rule.engine.api.TbContext; +import org.thingsboard.rule.engine.api.TbNodeConfiguration; +import org.thingsboard.rule.engine.api.TbNodeException; +import org.thingsboard.server.common.msg.TbMsg; +import org.thingsboard.server.common.msg.TbMsgMetaData; + +import javax.script.ScriptException; +import java.util.concurrent.Callable; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.*; + +@RunWith(MockitoJUnitRunner.class) +public class TbJsFilterNodeTest { + + private TbJsFilterNode node; + + @Mock + private TbContext ctx; + @Mock + private ListeningExecutor executor; + + @Test + public void falseEvaluationDoNotSendMsg() throws TbNodeException { + initWithScript("10 > 15;"); + TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, new TbMsgMetaData(), "{}".getBytes()); + + mockJsExecutor(); + + node.onMsg(ctx, msg); + verify(ctx).getJsExecutor(); + verify(ctx).tellNext(msg, "false"); + verifyNoMoreInteractions(ctx); + } + + @Test + public void notValidMsgDataThrowsException() throws TbNodeException { + initWithScript("10 > 15;"); + TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, new TbMsgMetaData(), new byte[4]); + + when(ctx.getJsExecutor()).thenReturn(executor); + + mockJsExecutor(); + + node.onMsg(ctx, msg); + verifyError(msg, "Cannot bind js args", IllegalArgumentException.class); + } + + @Test + public void exceptionInJsThrowsException() throws TbNodeException { + initWithScript("meta.temp.curr < 15;"); + TbMsgMetaData metaData = new TbMsgMetaData(); + TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, "{}".getBytes()); + mockJsExecutor(); + + node.onMsg(ctx, msg); + String expectedMessage = "TypeError: Cannot get property \"curr\" of null in at line number 1"; + verifyError(msg, expectedMessage, ScriptException.class); + } + + @Test(expected = IllegalArgumentException.class) + public void notValidScriptThrowsException() throws TbNodeException { + initWithScript("10 > 15 asdq out"); + } + + @Test + public void metadataConditionCanBeFalse() throws TbNodeException { + initWithScript("meta.humidity < 15;"); + TbMsgMetaData metaData = new TbMsgMetaData(); + metaData.putValue("temp", "10"); + metaData.putValue("humidity", "99"); + TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, "{}".getBytes()); + mockJsExecutor(); + + node.onMsg(ctx, msg); + verify(ctx).getJsExecutor(); + verify(ctx).tellNext(msg, "false"); + verifyNoMoreInteractions(ctx); + } + + @Test + public void metadataConditionCanBeTrue() throws TbNodeException { + initWithScript("meta.temp < 15;"); + TbMsgMetaData metaData = new TbMsgMetaData(); + metaData.putValue("temp", "10"); + metaData.putValue("humidity", "99"); + TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, "{}".getBytes()); + mockJsExecutor(); + + node.onMsg(ctx, msg); + verify(ctx).getJsExecutor(); + verify(ctx).tellNext(msg, "true"); + } + + @Test + public void msgJsonParsedAndBinded() throws TbNodeException { + initWithScript("msg.passed < 15 && msg.name === 'Vit' && meta.temp == 10 && msg.bigObj.prop == 42;"); + TbMsgMetaData metaData = new TbMsgMetaData(); + metaData.putValue("temp", "10"); + metaData.putValue("humidity", "99"); + String rawJson = "{\"name\": \"Vit\", \"passed\": 5, \"bigObj\": {\"prop\":42}}"; + + TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, rawJson.getBytes()); + mockJsExecutor(); + + node.onMsg(ctx, msg); + verify(ctx).getJsExecutor(); + verify(ctx).tellNext(msg, "true"); + } + + private void initWithScript(String script) throws TbNodeException { + TbJsFilterNodeConfiguration config = new TbJsFilterNodeConfiguration(); + config.setJsScript(script); + ObjectMapper mapper = new ObjectMapper(); + TbNodeConfiguration nodeConfiguration = new TbNodeConfiguration(mapper.valueToTree(config)); + + node = new TbJsFilterNode(); + node.init(nodeConfiguration, null); + } + + private void mockJsExecutor() { + when(ctx.getJsExecutor()).thenReturn(executor); + doAnswer((Answer>) invocationOnMock -> { + try { + Callable task = (Callable) (invocationOnMock.getArguments())[0]; + return Futures.immediateFuture((Boolean) task.call()); + } catch (Throwable th) { + return Futures.immediateFailedFuture(th); + } + }).when(executor).executeAsync(Matchers.any(Callable.class)); + } + + private void verifyError(TbMsg msg, String message, Class expectedClass) { + ArgumentCaptor captor = ArgumentCaptor.forClass(Throwable.class); + verify(ctx).tellError(same(msg), captor.capture()); + + Throwable value = captor.getValue(); + assertEquals(expectedClass, value.getClass()); + assertEquals(message, value.getMessage()); + } +} \ No newline at end of file diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/filter/TbJsSwitchNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/filter/TbJsSwitchNodeTest.java new file mode 100644 index 0000000000..e70d4e16f4 --- /dev/null +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/filter/TbJsSwitchNodeTest.java @@ -0,0 +1,167 @@ +/** + * Copyright © 2016-2018 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.rule.engine.filter; + +import com.datastax.driver.core.utils.UUIDs; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.Sets; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.ArgumentCaptor; +import org.mockito.Matchers; +import org.mockito.Mock; +import org.mockito.runners.MockitoJUnitRunner; +import org.mockito.stubbing.Answer; +import org.thingsboard.rule.engine.api.ListeningExecutor; +import org.thingsboard.rule.engine.api.TbContext; +import org.thingsboard.rule.engine.api.TbNodeConfiguration; +import org.thingsboard.rule.engine.api.TbNodeException; +import org.thingsboard.server.common.msg.TbMsg; +import org.thingsboard.server.common.msg.TbMsgMetaData; + +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.Callable; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Matchers.same; +import static org.mockito.Mockito.*; + +@RunWith(MockitoJUnitRunner.class) +public class TbJsSwitchNodeTest { + + private TbJsSwitchNode node; + + @Mock + private TbContext ctx; + @Mock + private ListeningExecutor executor; + + @Test + public void routeToAllDoNotEvaluatesJs() throws TbNodeException { + HashSet relations = Sets.newHashSet("one", "two"); + initWithScript("test qwerty", relations, true); + TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, new TbMsgMetaData(), "{}".getBytes()); + + node.onMsg(ctx, msg); + verify(ctx).tellNext(msg, relations); + verifyNoMoreInteractions(ctx, executor); + } + + @Test + public void multipleRoutesAreAllowed() throws TbNodeException { + String jsCode = "function nextRelation(meta, msg) {\n" + + " if(msg.passed == 5 && meta.temp == 10)\n" + + " return ['three', 'one']\n" + + " else\n" + + " return 'two';\n" + + "};\n" + + "\n" + + "nextRelation(meta, msg);"; + initWithScript(jsCode, Sets.newHashSet("one", "two", "three"), false); + TbMsgMetaData metaData = new TbMsgMetaData(); + metaData.putValue("temp", "10"); + metaData.putValue("humidity", "99"); + String rawJson = "{\"name\": \"Vit\", \"passed\": 5}"; + + TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, rawJson.getBytes()); + mockJsExecutor(); + + node.onMsg(ctx, msg); + verify(ctx).getJsExecutor(); + verify(ctx).tellNext(msg, Sets.newHashSet("one", "three")); + } + + @Test + public void allowedRelationPassed() throws TbNodeException { + String jsCode = "function nextRelation(meta, msg) {\n" + + " if(msg.passed == 5 && meta.temp == 10)\n" + + " return 'one'\n" + + " else\n" + + " return 'two';\n" + + "};\n" + + "\n" + + "nextRelation(meta, msg);"; + initWithScript(jsCode, Sets.newHashSet("one", "two"), false); + TbMsgMetaData metaData = new TbMsgMetaData(); + metaData.putValue("temp", "10"); + metaData.putValue("humidity", "99"); + String rawJson = "{\"name\": \"Vit\", \"passed\": 5}"; + + TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, rawJson.getBytes()); + mockJsExecutor(); + + node.onMsg(ctx, msg); + verify(ctx).getJsExecutor(); + verify(ctx).tellNext(msg, Sets.newHashSet("one")); + } + + @Test + public void unknownRelationThrowsException() throws TbNodeException { + String jsCode = "function nextRelation(meta, msg) {\n" + + " return ['one','nine'];" + + "};\n" + + "\n" + + "nextRelation(meta, msg);"; + initWithScript(jsCode, Sets.newHashSet("one", "two"), false); + TbMsgMetaData metaData = new TbMsgMetaData(); + metaData.putValue("temp", "10"); + metaData.putValue("humidity", "99"); + String rawJson = "{\"name\": \"Vit\", \"passed\": 5}"; + + TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, rawJson.getBytes()); + mockJsExecutor(); + + node.onMsg(ctx, msg); + verify(ctx).getJsExecutor(); + verifyError(msg, "Unsupported relation for switch [nine, one]", IllegalStateException.class); + } + + private void initWithScript(String script, Set relations, boolean routeToAll) throws TbNodeException { + TbJsSwitchNodeConfiguration config = new TbJsSwitchNodeConfiguration(); + config.setJsScript(script); + config.setAllowedRelations(relations); + config.setRouteToAllWithNoCheck(routeToAll); + ObjectMapper mapper = new ObjectMapper(); + TbNodeConfiguration nodeConfiguration = new TbNodeConfiguration(mapper.valueToTree(config)); + + node = new TbJsSwitchNode(); + node.init(nodeConfiguration, null); + } + + private void mockJsExecutor() { + when(ctx.getJsExecutor()).thenReturn(executor); + doAnswer((Answer>>) invocationOnMock -> { + try { + Callable task = (Callable) (invocationOnMock.getArguments())[0]; + return Futures.immediateFuture((Set) task.call()); + } catch (Throwable th) { + return Futures.immediateFailedFuture(th); + } + }).when(executor).executeAsync(Matchers.any(Callable.class)); + } + + private void verifyError(TbMsg msg, String message, Class expectedClass) { + ArgumentCaptor captor = ArgumentCaptor.forClass(Throwable.class); + verify(ctx).tellError(same(msg), captor.capture()); + + Throwable value = captor.getValue(); + assertEquals(expectedClass, value.getClass()); + assertEquals(message, value.getMessage()); + } +} \ No newline at end of file diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/transform/TbChangeOriginatorNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/transform/TbChangeOriginatorNodeTest.java new file mode 100644 index 0000000000..190692c4e5 --- /dev/null +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/transform/TbChangeOriginatorNodeTest.java @@ -0,0 +1,124 @@ +/** + * Copyright © 2016-2018 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.rule.engine.transform; + +import com.datastax.driver.core.utils.UUIDs; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.util.concurrent.Futures; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.runners.MockitoJUnitRunner; +import org.thingsboard.rule.engine.api.TbContext; +import org.thingsboard.rule.engine.api.TbNodeConfiguration; +import org.thingsboard.rule.engine.api.TbNodeException; +import org.thingsboard.server.common.data.asset.Asset; +import org.thingsboard.server.common.data.id.AssetId; +import org.thingsboard.server.common.data.id.CustomerId; +import org.thingsboard.server.common.msg.TbMsg; +import org.thingsboard.server.common.msg.TbMsgMetaData; +import org.thingsboard.server.dao.asset.AssetService; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Matchers.same; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +@RunWith(MockitoJUnitRunner.class) +public class TbChangeOriginatorNodeTest { + + private TbChangeOriginatorNode node; + + @Mock + private TbContext ctx; + @Mock + private AssetService assetService; + + + @Test + public void originatorCanBeChangedToCustomerId() throws TbNodeException { + init(false); + AssetId assetId = new AssetId(UUIDs.timeBased()); + CustomerId customerId = new CustomerId(UUIDs.timeBased()); + Asset asset = new Asset(); + asset.setCustomerId(customerId); + + TbMsg msg = new TbMsg(UUIDs.timeBased(), "ASSET", assetId, new TbMsgMetaData(), new byte[4]); + + when(ctx.getAssetService()).thenReturn(assetService); + when(assetService.findAssetByIdAsync(assetId)).thenReturn(Futures.immediateFuture(asset)); + + node.onMsg(ctx, msg); + ArgumentCaptor captor = ArgumentCaptor.forClass(TbMsg.class); + verify(ctx).tellNext(captor.capture()); + TbMsg actualMsg = captor.getValue(); + assertEquals(customerId, actualMsg.getOriginator()); + assertEquals(msg.getId(), actualMsg.getId()); + } + + @Test + public void newChainCanBeStarted() throws TbNodeException { + init(true); + AssetId assetId = new AssetId(UUIDs.timeBased()); + CustomerId customerId = new CustomerId(UUIDs.timeBased()); + Asset asset = new Asset(); + asset.setCustomerId(customerId); + + TbMsg msg = new TbMsg(UUIDs.timeBased(), "ASSET", assetId, new TbMsgMetaData(), new byte[4]); + + when(ctx.getAssetService()).thenReturn(assetService); + when(assetService.findAssetByIdAsync(assetId)).thenReturn(Futures.immediateFuture(asset)); + + node.onMsg(ctx, msg); + ArgumentCaptor captor = ArgumentCaptor.forClass(TbMsg.class); + verify(ctx).spawn(captor.capture()); + TbMsg actualMsg = captor.getValue(); + assertEquals(customerId, actualMsg.getOriginator()); + assertEquals(msg.getId(), actualMsg.getId()); + } + + @Test + public void exceptionThrownIfCannotFindNewOriginator() throws TbNodeException { + init(true); + AssetId assetId = new AssetId(UUIDs.timeBased()); + CustomerId customerId = new CustomerId(UUIDs.timeBased()); + Asset asset = new Asset(); + asset.setCustomerId(customerId); + + TbMsg msg = new TbMsg(UUIDs.timeBased(), "ASSET", assetId, new TbMsgMetaData(), new byte[4]); + + when(ctx.getAssetService()).thenReturn(assetService); + when(assetService.findAssetByIdAsync(assetId)).thenReturn(Futures.immediateFailedFuture(new IllegalStateException("wrong"))); + + node.onMsg(ctx, msg); + ArgumentCaptor captor = ArgumentCaptor.forClass(Throwable.class); + verify(ctx).tellError(same(msg), captor.capture()); + Throwable value = captor.getValue(); + assertEquals("wrong", value.getMessage()); + } + + public void init(boolean startNewChain) throws TbNodeException { + TbChangeOriginatorNodeConfiguration config = new TbChangeOriginatorNodeConfiguration(); + config.setOriginatorSource(TbChangeOriginatorNode.CUSTOMER_SOURCE); + config.setStartNewChain(startNewChain); + ObjectMapper mapper = new ObjectMapper(); + TbNodeConfiguration nodeConfiguration = new TbNodeConfiguration(mapper.valueToTree(config)); + + node = new TbChangeOriginatorNode(); + node.init(nodeConfiguration, null); + } +} \ No newline at end of file diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/transform/TbTransformMsgNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/transform/TbTransformMsgNodeTest.java new file mode 100644 index 0000000000..d69bad8864 --- /dev/null +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/transform/TbTransformMsgNodeTest.java @@ -0,0 +1,140 @@ +/** + * Copyright © 2016-2018 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.rule.engine.transform; + +import com.datastax.driver.core.utils.UUIDs; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.ArgumentCaptor; +import org.mockito.Matchers; +import org.mockito.Mock; +import org.mockito.runners.MockitoJUnitRunner; +import org.mockito.stubbing.Answer; +import org.thingsboard.rule.engine.api.ListeningExecutor; +import org.thingsboard.rule.engine.api.TbContext; +import org.thingsboard.rule.engine.api.TbNodeConfiguration; +import org.thingsboard.rule.engine.api.TbNodeException; +import org.thingsboard.server.common.msg.TbMsg; +import org.thingsboard.server.common.msg.TbMsgMetaData; + +import java.util.concurrent.Callable; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Matchers.same; +import static org.mockito.Mockito.*; + +@RunWith(MockitoJUnitRunner.class) +public class TbTransformMsgNodeTest { + + private TbTransformMsgNode node; + + @Mock + private TbContext ctx; + @Mock + private ListeningExecutor executor; + + @Test + public void metadataCanBeUpdated() throws TbNodeException { + initWithScript("meta.temp = meta.temp * 10;"); + TbMsgMetaData metaData = new TbMsgMetaData(); + metaData.putValue("temp", "7"); + metaData.putValue("humidity", "99"); + String rawJson = "{\"name\": \"Vit\", \"passed\": 5, \"bigObj\": {\"prop\":42}}"; + + TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, rawJson.getBytes()); + mockJsExecutor(); + + node.onMsg(ctx, msg); + verify(ctx).getJsExecutor(); + ArgumentCaptor captor = ArgumentCaptor.forClass(TbMsg.class); + verify(ctx).tellNext(captor.capture()); + TbMsg actualMsg = captor.getValue(); + assertEquals("70.0", actualMsg.getMetaData().getValue("temp")); + } + + @Test + public void metadataCanBeAdded() throws TbNodeException { + initWithScript("meta.newAttr = meta.humidity - msg.passed;"); + TbMsgMetaData metaData = new TbMsgMetaData(); + metaData.putValue("temp", "7"); + metaData.putValue("humidity", "99"); + String rawJson = "{\"name\": \"Vit\", \"passed\": 5, \"bigObj\": {\"prop\":42}}"; + + TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, rawJson.getBytes()); + mockJsExecutor(); + + node.onMsg(ctx, msg); + verify(ctx).getJsExecutor(); + ArgumentCaptor captor = ArgumentCaptor.forClass(TbMsg.class); + verify(ctx).tellNext(captor.capture()); + TbMsg actualMsg = captor.getValue(); + assertEquals("94.0", actualMsg.getMetaData().getValue("newAttr")); + } + + @Test + public void payloadCanBeUpdated() throws TbNodeException { + initWithScript("msg.passed = msg.passed * meta.temp; msg.bigObj.newProp = 'Ukraine' "); + TbMsgMetaData metaData = new TbMsgMetaData(); + metaData.putValue("temp", "7"); + metaData.putValue("humidity", "99"); + String rawJson = "{\"name\":\"Vit\",\"passed\": 5,\"bigObj\":{\"prop\":42}}"; + + TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, rawJson.getBytes()); + mockJsExecutor(); + + node.onMsg(ctx, msg); + verify(ctx).getJsExecutor(); + ArgumentCaptor captor = ArgumentCaptor.forClass(TbMsg.class); + verify(ctx).tellNext(captor.capture()); + TbMsg actualMsg = captor.getValue(); + String expectedJson = "{\"name\":\"Vit\",\"passed\":35.0,\"bigObj\":{\"prop\":42,\"newProp\":\"Ukraine\"}}"; + assertEquals(expectedJson, new String(actualMsg.getData())); + } + + private void initWithScript(String script) throws TbNodeException { + TbTransformMsgNodeConfiguration config = new TbTransformMsgNodeConfiguration(); + config.setJsScript(script); + ObjectMapper mapper = new ObjectMapper(); + TbNodeConfiguration nodeConfiguration = new TbNodeConfiguration(mapper.valueToTree(config)); + + node = new TbTransformMsgNode(); + node.init(nodeConfiguration, null); + } + + private void mockJsExecutor() { + when(ctx.getJsExecutor()).thenReturn(executor); + doAnswer((Answer>) invocationOnMock -> { + try { + Callable task = (Callable) (invocationOnMock.getArguments())[0]; + return Futures.immediateFuture((TbMsg) task.call()); + } catch (Throwable th) { + return Futures.immediateFailedFuture(th); + } + }).when(executor).executeAsync(Matchers.any(Callable.class)); + } + + private void verifyError(TbMsg msg, String message, Class expectedClass) { + ArgumentCaptor captor = ArgumentCaptor.forClass(Throwable.class); + verify(ctx).tellError(same(msg), captor.capture()); + + Throwable value = captor.getValue(); + assertEquals(expectedClass, value.getClass()); + assertEquals(message, value.getMessage()); + } +} \ No newline at end of file