From 4aa55bccf84519fd7923ffd38cb5eed8541dda0c Mon Sep 17 00:00:00 2001 From: Illia Barkov Date: Wed, 6 Jan 2021 14:41:01 +0200 Subject: [PATCH] Added ability to return arrays in transformation script node (#3910) * Added ability to return arrays in transformation script node * fix typo * Refactoring * Improvements * Improvements --- .../script/RuleNodeJsScriptEngine.java | 15 +++++-- .../rule/engine/api/ScriptEngine.java | 3 +- .../MultipleTbMsgsCallbackWrapper.java | 45 +++++++++++++++++++ .../transform/TbAbstractTransformNode.java | 32 +++++++++++-- .../transform/TbChangeOriginatorNode.java | 11 ++--- .../transform/TbMsgCallbackWrapper.java | 23 ++++++++++ .../engine/transform/TbTransformMsgNode.java | 11 +++-- .../transform/TbTransformMsgNodeTest.java | 3 +- 8 files changed, 125 insertions(+), 18 deletions(-) create mode 100644 rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/MultipleTbMsgsCallbackWrapper.java create mode 100644 rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/TbMsgCallbackWrapper.java diff --git a/application/src/main/java/org/thingsboard/server/service/script/RuleNodeJsScriptEngine.java b/application/src/main/java/org/thingsboard/server/service/script/RuleNodeJsScriptEngine.java index 275e645ce2..e49e87339d 100644 --- a/application/src/main/java/org/thingsboard/server/service/script/RuleNodeJsScriptEngine.java +++ b/application/src/main/java/org/thingsboard/server/service/script/RuleNodeJsScriptEngine.java @@ -30,7 +30,9 @@ import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.common.msg.TbMsgMetaData; import javax.script.ScriptException; +import java.util.ArrayList; import java.util.Collections; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.UUID; @@ -116,14 +118,19 @@ public class RuleNodeJsScriptEngine implements org.thingsboard.rule.engine.api.S } @Override - public ListenableFuture executeUpdateAsync(TbMsg msg) { + public ListenableFuture> executeUpdateAsync(TbMsg msg) { ListenableFuture result = executeScriptAsync(msg); return Futures.transformAsync(result, json -> { - if (!json.isObject()) { + if (json.isObject()) { + return Futures.immediateFuture(Collections.singletonList(unbindMsg(json, msg))); + } else if (json.isArray()){ + List res = new ArrayList<>(json.size()); + json.forEach(jsonObject -> res.add(unbindMsg(jsonObject, msg))); + return Futures.immediateFuture(res); + } + else{ log.warn("Wrong result type: {}", json.getNodeType()); return Futures.immediateFailedFuture(new ScriptException("Wrong result type: " + json.getNodeType())); - } else { - return Futures.immediateFuture(unbindMsg(json, msg)); } }, MoreExecutors.directExecutor()); } diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/ScriptEngine.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/ScriptEngine.java index a85f519337..8b645f6d93 100644 --- a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/ScriptEngine.java +++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/ScriptEngine.java @@ -20,13 +20,14 @@ import com.google.common.util.concurrent.ListenableFuture; import org.thingsboard.server.common.msg.TbMsg; import javax.script.ScriptException; +import java.util.List; import java.util.Set; public interface ScriptEngine { TbMsg executeUpdate(TbMsg msg) throws ScriptException; - ListenableFuture executeUpdateAsync(TbMsg msg); + ListenableFuture> executeUpdateAsync(TbMsg msg); TbMsg executeGenerate(TbMsg prevMsg) throws ScriptException; diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/MultipleTbMsgsCallbackWrapper.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/MultipleTbMsgsCallbackWrapper.java new file mode 100644 index 0000000000..c841a5a3cc --- /dev/null +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/MultipleTbMsgsCallbackWrapper.java @@ -0,0 +1,45 @@ +/** + * Copyright © 2016-2020 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.rule.engine.transform; + +import org.thingsboard.server.common.msg.queue.RuleEngineException; +import org.thingsboard.server.common.msg.queue.TbMsgCallback; + +import java.util.concurrent.atomic.AtomicInteger; + +public class MultipleTbMsgsCallbackWrapper implements TbMsgCallbackWrapper { + + private final AtomicInteger tbMsgsCallbackCount; + private final TbMsgCallback callback; + + public MultipleTbMsgsCallbackWrapper(int tbMsgsCallbackCount, TbMsgCallback callback) { + this.tbMsgsCallbackCount = new AtomicInteger(tbMsgsCallbackCount); + this.callback = callback; + } + + @Override + public void onSuccess() { + if (tbMsgsCallbackCount.decrementAndGet() <= 0) { + callback.onSuccess(); + } + } + + @Override + public void onFailure(Throwable t) { + callback.onFailure(new RuleEngineException(t.getMessage())); + } +} + diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/TbAbstractTransformNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/TbAbstractTransformNode.java index 84a6b76920..0b9139a307 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/TbAbstractTransformNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/TbAbstractTransformNode.java @@ -17,16 +17,19 @@ package org.thingsboard.rule.engine.transform; import com.google.common.util.concurrent.ListenableFuture; import lombok.extern.slf4j.Slf4j; -import org.thingsboard.rule.engine.api.util.TbNodeUtils; import org.thingsboard.rule.engine.api.TbContext; import org.thingsboard.rule.engine.api.TbNode; import org.thingsboard.rule.engine.api.TbNodeConfiguration; import org.thingsboard.rule.engine.api.TbNodeException; +import org.thingsboard.rule.engine.api.util.TbNodeUtils; import org.thingsboard.server.common.msg.TbMsg; +import org.thingsboard.server.common.msg.queue.RuleEngineException; +import org.thingsboard.server.common.msg.queue.TbMsgCallback; + +import java.util.List; import static org.thingsboard.common.util.DonAsynchron.withCallback; import static org.thingsboard.rule.engine.api.TbRelationTypes.FAILURE; -import static org.thingsboard.rule.engine.api.TbRelationTypes.SUCCESS; /** * Created by ashvayka on 19.01.18. @@ -61,7 +64,30 @@ public abstract class TbAbstractTransformNode implements TbNode { } } - protected abstract ListenableFuture transform(TbContext ctx, TbMsg msg); + protected void transformSuccess(TbContext ctx, TbMsg msg, List msgs) { + if (msgs != null && !msgs.isEmpty()) { + if (msgs.size() == 1) { + ctx.tellSuccess(msgs.get(0)); + } else { + TbMsgCallbackWrapper wrapper = new MultipleTbMsgsCallbackWrapper(msgs.size(), new TbMsgCallback() { + @Override + public void onSuccess() { + ctx.ack(msg); + } + + @Override + public void onFailure(RuleEngineException e) { + ctx.tellFailure(msg, e); + } + }); + msgs.forEach(newMsg -> ctx.enqueueForTellNext(newMsg, "Success", wrapper::onSuccess, wrapper::onFailure)); + } + } else { + ctx.tellNext(msg, FAILURE); + } + } + + protected abstract ListenableFuture> transform(TbContext ctx, TbMsg msg); public void setConfig(TbTransformNodeConfiguration config) { this.config = config; diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/TbChangeOriginatorNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/TbChangeOriginatorNode.java index 5d505f51b2..566b9bef08 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/TbChangeOriginatorNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/TbChangeOriginatorNode.java @@ -15,16 +15,15 @@ */ package org.thingsboard.rule.engine.transform; -import com.google.common.base.Function; import com.google.common.collect.Sets; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import lombok.extern.slf4j.Slf4j; -import org.thingsboard.rule.engine.api.util.TbNodeUtils; import org.thingsboard.rule.engine.api.RuleNode; import org.thingsboard.rule.engine.api.TbContext; import org.thingsboard.rule.engine.api.TbNodeConfiguration; import org.thingsboard.rule.engine.api.TbNodeException; +import org.thingsboard.rule.engine.api.util.TbNodeUtils; import org.thingsboard.rule.engine.util.EntitiesAlarmOriginatorIdAsyncLoader; import org.thingsboard.rule.engine.util.EntitiesCustomerIdAsyncLoader; import org.thingsboard.rule.engine.util.EntitiesRelatedEntityIdAsyncLoader; @@ -33,7 +32,9 @@ import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.plugin.ComponentType; import org.thingsboard.server.common.msg.TbMsg; +import java.util.Collections; import java.util.HashSet; +import java.util.List; @Slf4j @RuleNode( @@ -65,13 +66,13 @@ public class TbChangeOriginatorNode extends TbAbstractTransformNode { } @Override - protected ListenableFuture transform(TbContext ctx, TbMsg msg) { + protected ListenableFuture> transform(TbContext ctx, TbMsg msg) { ListenableFuture newOriginator = getNewOriginator(ctx, msg.getOriginator()); - return Futures.transform(newOriginator, (Function) n -> { + return Futures.transform(newOriginator, n -> { if (n == null || n.isNullUid()) { return null; } - return ctx.transformMsg(msg, msg.getType(), n, msg.getMetaData(), msg.getData()); + return Collections.singletonList((ctx.transformMsg(msg, msg.getType(), n, msg.getMetaData(), msg.getData()))); }, ctx.getDbCallbackExecutor()); } diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/TbMsgCallbackWrapper.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/TbMsgCallbackWrapper.java new file mode 100644 index 0000000000..d05da7ce5d --- /dev/null +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/TbMsgCallbackWrapper.java @@ -0,0 +1,23 @@ +/** + * Copyright © 2016-2020 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.rule.engine.transform; + +public interface TbMsgCallbackWrapper { + + void onSuccess(); + + void onFailure(Throwable t); +} diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/TbTransformMsgNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/TbTransformMsgNode.java index 56f15f42de..94b39af045 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/TbTransformMsgNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/TbTransformMsgNode.java @@ -16,13 +16,16 @@ package org.thingsboard.rule.engine.transform; import com.google.common.util.concurrent.ListenableFuture; +import org.thingsboard.rule.engine.api.RuleNode; +import org.thingsboard.rule.engine.api.ScriptEngine; +import org.thingsboard.rule.engine.api.TbContext; +import org.thingsboard.rule.engine.api.TbNodeConfiguration; +import org.thingsboard.rule.engine.api.TbNodeException; import org.thingsboard.rule.engine.api.util.TbNodeUtils; -import org.thingsboard.rule.engine.api.*; import org.thingsboard.server.common.data.plugin.ComponentType; import org.thingsboard.server.common.msg.TbMsg; -import static org.thingsboard.rule.engine.api.TbRelationTypes.FAILURE; -import static org.thingsboard.rule.engine.api.TbRelationTypes.SUCCESS; +import java.util.List; @RuleNode( type = ComponentType.TRANSFORMATION, @@ -51,7 +54,7 @@ public class TbTransformMsgNode extends TbAbstractTransformNode { } @Override - protected ListenableFuture transform(TbContext ctx, TbMsg msg) { + protected ListenableFuture> transform(TbContext ctx, TbMsg msg) { ctx.logJsEvalRequest(); return jsEngine.executeUpdateAsync(msg); } diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/transform/TbTransformMsgNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/transform/TbTransformMsgNodeTest.java index bc56815468..e2087fd607 100644 --- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/transform/TbTransformMsgNodeTest.java +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/transform/TbTransformMsgNodeTest.java @@ -38,6 +38,7 @@ import org.thingsboard.server.common.msg.TbMsgDataType; import org.thingsboard.server.common.msg.TbMsgMetaData; import javax.script.ScriptException; +import java.util.Collections; import java.util.concurrent.Callable; import static org.junit.Assert.assertEquals; @@ -72,7 +73,7 @@ public class TbTransformMsgNodeTest { TbMsg msg = TbMsg.newMsg( "USER", null, metaData, TbMsgDataType.JSON,rawJson, ruleChainId, ruleNodeId); TbMsg transformedMsg = TbMsg.newMsg( "USER", null, metaData, TbMsgDataType.JSON, "{new}", ruleChainId, ruleNodeId); mockJsExecutor(); - when(scriptEngine.executeUpdateAsync(msg)).thenReturn(Futures.immediateFuture(transformedMsg)); + when(scriptEngine.executeUpdateAsync(msg)).thenReturn(Futures.immediateFuture(Collections.singletonList(transformedMsg))); node.onMsg(ctx, msg); verify(ctx).getDbCallbackExecutor();