diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsg.java b/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsg.java index 5163b6c39d..261dda7670 100644 --- a/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsg.java +++ b/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsg.java @@ -30,7 +30,7 @@ import java.util.UUID; * Created by ashvayka on 13.01.18. */ @Data -public final class TbMsg implements Serializable { +public final class TbMsg implements Serializable, Cloneable { private final UUID id; private final String type; @@ -39,6 +39,11 @@ public final class TbMsg implements Serializable { private final byte[] data; + @Override + public TbMsg clone() { + return fromBytes(toBytes(this)); + } + public static ByteBuffer toBytes(TbMsg msg) { MsgProtos.TbMsgProto.Builder builder = MsgProtos.TbMsgProto.newBuilder(); builder.setId(msg.getId().toString()); @@ -77,4 +82,5 @@ public final class TbMsg implements Serializable { throw new IllegalStateException("Could not parse protobuf for TbMsg", e); } } + } diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java index c7e2bec28c..fdcf56aac9 100644 --- a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java +++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java @@ -29,6 +29,7 @@ import org.thingsboard.server.dao.rule.RuleService; import org.thingsboard.server.dao.timeseries.TimeseriesService; import org.thingsboard.server.dao.user.UserService; +import java.util.Set; import java.util.UUID; /** @@ -40,6 +41,8 @@ public interface TbContext { void tellNext(TbMsg msg, String relationType); + void tellNext(TbMsg msg, Set relationTypes); + void tellSelf(TbMsg msg, long delayMs); void tellOthers(TbMsg msg); diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbJsFilterNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbJsFilterNode.java index 4972bc6452..089c9b1919 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbJsFilterNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbJsFilterNode.java @@ -41,18 +41,10 @@ public class TbJsFilterNode implements TbNode { public void onMsg(TbContext ctx, TbMsg msg) { ListeningExecutor jsExecutor = ctx.getJsExecutor(); withCallback(jsExecutor.executeAsync(() -> jsEngine.executeFilter(toBindings(msg))), - result -> processFilter(ctx, msg, result), + filterResult -> ctx.tellNext(msg, Boolean.toString(filterResult)), t -> ctx.tellError(msg, t)); } - private void processFilter(TbContext ctx, TbMsg msg, Boolean filterResult) { - if (filterResult) { - ctx.tellNext(msg); - } else { - log.debug("Msg filtered out {}", msg.getId()); - } - } - private Bindings toBindings(TbMsg msg) { return NashornJsEngine.bindMsg(msg); } diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbJsSwitchNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbJsSwitchNode.java index 9dcb3325aa..afd81636a0 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbJsSwitchNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbJsSwitchNode.java @@ -22,6 +22,7 @@ import org.thingsboard.rule.engine.js.NashornJsEngine; import org.thingsboard.server.common.msg.TbMsg; import javax.script.Bindings; +import java.util.Set; import static org.thingsboard.rule.engine.DonAsynchron.withCallback; @@ -34,30 +35,40 @@ public class TbJsSwitchNode implements TbNode { @Override public void init(TbNodeConfiguration configuration, TbNodeState state) throws TbNodeException { this.config = TbNodeUtils.convert(configuration, TbJsSwitchNodeConfiguration.class); - this.jsEngine = new NashornJsEngine(config.getJsScript()); if (config.getAllowedRelations().size() < 1) { String message = "Switch node should have at least 1 relation"; log.error(message); throw new IllegalStateException(message); } + if (!config.isRouteToAllWithNoCheck()) { + this.jsEngine = new NashornJsEngine(config.getJsScript()); + } } @Override public void onMsg(TbContext ctx, TbMsg msg) { + if (config.isRouteToAllWithNoCheck()) { + ctx.tellNext(msg, config.getAllowedRelations()); + return; + } ListeningExecutor jsExecutor = ctx.getJsExecutor(); withCallback(jsExecutor.executeAsync(() -> jsEngine.executeSwitch(toBindings(msg))), result -> processSwitch(ctx, msg, result), t -> ctx.tellError(msg, t)); } - private void processSwitch(TbContext ctx, TbMsg msg, String nextRelation) { - if (config.getAllowedRelations().contains(nextRelation)) { - ctx.tellNext(msg, nextRelation); + private void processSwitch(TbContext ctx, TbMsg msg, Set nextRelations) { + if (validateRelations(nextRelations)) { + ctx.tellNext(msg, nextRelations); } else { - ctx.tellError(msg, new IllegalStateException("Unsupported relation for switch " + nextRelation)); + ctx.tellError(msg, new IllegalStateException("Unsupported relation for switch " + nextRelations)); } } + private boolean validateRelations(Set nextRelations) { + return config.getAllowedRelations().containsAll(nextRelations); + } + private Bindings toBindings(TbMsg msg) { return NashornJsEngine.bindMsg(msg); } diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbJsSwitchNodeConfiguration.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbJsSwitchNodeConfiguration.java index d0e2a51b2d..331302d542 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbJsSwitchNodeConfiguration.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbJsSwitchNodeConfiguration.java @@ -24,4 +24,5 @@ public class TbJsSwitchNodeConfiguration { private String jsScript; private Set allowedRelations; + private boolean routeToAllWithNoCheck; } diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/js/NashornJsEngine.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/js/NashornJsEngine.java index a40d4ecb0c..082535f0fe 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/js/NashornJsEngine.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/js/NashornJsEngine.java @@ -15,15 +15,20 @@ */ package org.thingsboard.rule.engine.js; +import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.Sets; import jdk.nashorn.api.scripting.NashornScriptEngineFactory; +import jdk.nashorn.api.scripting.ScriptObjectMirror; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.ArrayUtils; import org.thingsboard.server.common.msg.TbMsg; import javax.script.*; +import java.util.Collections; import java.util.Map; +import java.util.Set; @Slf4j @@ -68,6 +73,32 @@ public class NashornJsEngine { } } + private static TbMsg unbindMsg(Bindings bindings, TbMsg msg) throws JsonProcessingException { + for (Map.Entry entry : msg.getMetaData().getData().entrySet()) { + Object obj = entry.getValue(); + entry.setValue(obj.toString()); + } + + Object payload = bindings.get(DATA); + if (payload != null) { + ObjectMapper mapper = new ObjectMapper(); + byte[] bytes = mapper.writeValueAsBytes(payload); + return new TbMsg(msg.getId(), msg.getType(), msg.getOriginator(), msg.getMetaData(), bytes); + } + + return msg; + } + + public TbMsg executeUpdate(Bindings bindings, TbMsg msg) throws ScriptException { + try { + engine.eval(bindings); + return unbindMsg(bindings, msg); + } catch (Throwable th) { + th.printStackTrace(); + throw new IllegalArgumentException("Cannot unbind js args", th); + } + } + public boolean executeFilter(Bindings bindings) throws ScriptException { Object eval = engine.eval(bindings); if (eval instanceof Boolean) { @@ -78,14 +109,28 @@ public class NashornJsEngine { } } - public String executeSwitch(Bindings bindings) throws ScriptException, NoSuchMethodException { + public Set executeSwitch(Bindings bindings) throws ScriptException, NoSuchMethodException { Object eval = this.engine.eval(bindings); if (eval instanceof String) { - return (String) eval; - } else { - log.warn("Wrong result type: {}", eval); - throw new ScriptException("Wrong result type: " + eval); + return Collections.singleton((String) eval); + } else if (eval instanceof ScriptObjectMirror) { + ScriptObjectMirror mir = (ScriptObjectMirror) eval; + if (mir.isArray()) { + Set nextStates = Sets.newHashSet(); + for (Map.Entry entry : mir.entrySet()) { + if (entry.getValue() instanceof String) { + nextStates.add((String) entry.getValue()); + } else { + log.warn("Wrong result type: {}", eval); + throw new ScriptException("Wrong result type: " + eval); + } + } + return nextStates; + } } + + log.warn("Wrong result type: {}", eval); + throw new ScriptException("Wrong result type: " + eval); } public void destroy() { diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbEntityGetAttrNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbEntityGetAttrNode.java index 52850be9a6..269e40fc19 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbEntityGetAttrNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbEntityGetAttrNode.java @@ -70,7 +70,7 @@ public abstract class TbEntityGetAttrNode implements TbNode } - private void putAttributesAndTell(TbContext ctx, TbMsg msg, List attributes) { + private void putAttributesAndTell(TbContext ctx, TbMsg msg, List attributes) { attributes.forEach(r -> { String attrName = config.getAttrMapping().get(r.getKey()); msg.getMetaData().putValue(attrName, r.getValueAsString()); @@ -85,4 +85,8 @@ public abstract class TbEntityGetAttrNode implements TbNode protected abstract ListenableFuture findEntityAsync(TbContext ctx, EntityId originator); + public void setConfig(TbGetEntityAttrNodeConfiguration config) { + this.config = config; + } + } diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetCustomerAttributeNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetCustomerAttributeNode.java index 57f9b79e7b..b7b1fd73a3 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetCustomerAttributeNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetCustomerAttributeNode.java @@ -15,37 +15,17 @@ */ package org.thingsboard.rule.engine.metadata; -import com.google.common.util.concurrent.AsyncFunction; -import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import org.thingsboard.rule.engine.api.TbContext; -import org.thingsboard.rule.engine.api.TbNodeException; -import org.thingsboard.server.common.data.HasCustomerId; -import org.thingsboard.server.common.data.id.*; +import org.thingsboard.rule.engine.util.EntitiesCustomerIdAsyncLoader; +import org.thingsboard.server.common.data.id.CustomerId; +import org.thingsboard.server.common.data.id.EntityId; public class TbGetCustomerAttributeNode extends TbEntityGetAttrNode { @Override protected ListenableFuture findEntityAsync(TbContext ctx, EntityId originator) { - - switch (originator.getEntityType()) { - case CUSTOMER: - return Futures.immediateFuture((CustomerId) originator); - case USER: - return getCustomerAsync(ctx.getUserService().findUserByIdAsync((UserId) originator)); - case ASSET: - return getCustomerAsync(ctx.getAssetService().findAssetByIdAsync((AssetId) originator)); - case DEVICE: - return getCustomerAsync(ctx.getDeviceService().findDeviceByIdAsync((DeviceId) originator)); - default: - return Futures.immediateFailedFuture(new TbNodeException("Unexpected originator EntityType " + originator)); - } - } - - private ListenableFuture getCustomerAsync(ListenableFuture future) { - return Futures.transform(future, (AsyncFunction) in -> { - return in != null ? Futures.immediateFuture(in.getCustomerId()) - : Futures.immediateFailedFuture(new IllegalStateException("Customer not found"));}); + return EntitiesCustomerIdAsyncLoader.findEntityIdAsync(ctx, originator); } } diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetRelatedAttrNodeConfiguration.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetRelatedAttrNodeConfiguration.java index 75b0a6524b..ae0b662d48 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetRelatedAttrNodeConfiguration.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetRelatedAttrNodeConfiguration.java @@ -19,7 +19,7 @@ import lombok.Data; import org.thingsboard.server.common.data.relation.EntitySearchDirection; @Data -public class TbGetRelatedAttrNodeConfiguration { +public class TbGetRelatedAttrNodeConfiguration extends TbGetEntityAttrNodeConfiguration { private String relationType; private EntitySearchDirection direction; diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetRelatedAttributeNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetRelatedAttributeNode.java index 5823c181de..474fb5d6c0 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetRelatedAttributeNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetRelatedAttributeNode.java @@ -15,23 +15,14 @@ */ package org.thingsboard.rule.engine.metadata; -import com.google.common.util.concurrent.AsyncFunction; -import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; -import org.apache.commons.collections.CollectionUtils; import org.thingsboard.rule.engine.TbNodeUtils; import org.thingsboard.rule.engine.api.TbContext; import org.thingsboard.rule.engine.api.TbNodeConfiguration; import org.thingsboard.rule.engine.api.TbNodeException; import org.thingsboard.rule.engine.api.TbNodeState; +import org.thingsboard.rule.engine.util.EntitiesRelatedEntityIdAsyncLoader; import org.thingsboard.server.common.data.id.EntityId; -import org.thingsboard.server.common.data.relation.EntityRelation; -import org.thingsboard.server.common.data.relation.EntitySearchDirection; -import org.thingsboard.server.dao.relation.RelationService; - -import java.util.List; - -import static org.thingsboard.server.common.data.relation.RelationTypeGroup.COMMON; public class TbGetRelatedAttributeNode extends TbEntityGetAttrNode { @@ -40,23 +31,11 @@ public class TbGetRelatedAttributeNode extends TbEntityGetAttrNode { @Override public void init(TbNodeConfiguration configuration, TbNodeState state) throws TbNodeException { this.config = TbNodeUtils.convert(configuration, TbGetRelatedAttrNodeConfiguration.class); + setConfig(config); } @Override protected ListenableFuture findEntityAsync(TbContext ctx, EntityId originator) { - RelationService relationService = ctx.getRelationService(); - if (config.getDirection() == EntitySearchDirection.FROM) { - ListenableFuture> asyncRelation = relationService.findByFromAndTypeAsync(originator, config.getRelationType(), COMMON); - return Futures.transform(asyncRelation, (AsyncFunction, EntityId>) - r -> CollectionUtils.isNotEmpty(r) ? Futures.immediateFuture(r.get(0).getTo()) - : Futures.immediateFailedFuture(new IllegalStateException("Relation not found"))); - } else if (config.getDirection() == EntitySearchDirection.TO) { - ListenableFuture> asyncRelation = relationService.findByToAndTypeAsync(originator, config.getRelationType(), COMMON); - return Futures.transform(asyncRelation, (AsyncFunction, EntityId>) - r -> CollectionUtils.isNotEmpty(r) ? Futures.immediateFuture(r.get(0).getFrom()) - : Futures.immediateFailedFuture(new IllegalStateException("Relation not found"))); - } - - return Futures.immediateFailedFuture(new IllegalStateException("Unknown direction")); + return EntitiesRelatedEntityIdAsyncLoader.findEntityAsync(ctx, originator, config.getDirection(), config.getRelationType()); } } diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetTenantAttributeNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetTenantAttributeNode.java index 2cf9a97d20..b97e220c04 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetTenantAttributeNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetTenantAttributeNode.java @@ -15,50 +15,19 @@ */ package org.thingsboard.rule.engine.metadata; -import com.google.common.util.concurrent.AsyncFunction; -import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import lombok.extern.slf4j.Slf4j; import org.thingsboard.rule.engine.api.TbContext; -import org.thingsboard.rule.engine.api.TbNodeException; -import org.thingsboard.server.common.data.HasTenantId; -import org.thingsboard.server.common.data.alarm.AlarmId; -import org.thingsboard.server.common.data.id.*; +import org.thingsboard.rule.engine.util.EntitiesTenantIdAsyncLoader; +import org.thingsboard.server.common.data.id.EntityId; +import org.thingsboard.server.common.data.id.TenantId; @Slf4j public class TbGetTenantAttributeNode extends TbEntityGetAttrNode { @Override protected ListenableFuture findEntityAsync(TbContext ctx, EntityId originator) { - - switch (originator.getEntityType()) { - case TENANT: - return Futures.immediateFuture((TenantId) originator); - case CUSTOMER: - return getTenantAsync(ctx.getCustomerService().findCustomerByIdAsync((CustomerId) originator)); - case USER: - return getTenantAsync(ctx.getUserService().findUserByIdAsync((UserId) originator)); - case RULE: - return getTenantAsync(ctx.getRuleService().findRuleByIdAsync((RuleId) originator)); - case PLUGIN: - return getTenantAsync(ctx.getPluginService().findPluginByIdAsync((PluginId) originator)); - case ASSET: - return getTenantAsync(ctx.getAssetService().findAssetByIdAsync((AssetId) originator)); - case DEVICE: - return getTenantAsync(ctx.getDeviceService().findDeviceByIdAsync((DeviceId) originator)); - case ALARM: - return getTenantAsync(ctx.getAlarmService().findAlarmByIdAsync((AlarmId) originator)); - case RULE_CHAIN: - return getTenantAsync(ctx.getRuleChainService().findRuleChainByIdAsync((RuleChainId) originator)); - default: - return Futures.immediateFailedFuture(new TbNodeException("Unexpected originator EntityType " + originator)); - } - } - - private ListenableFuture getTenantAsync(ListenableFuture future) { - return Futures.transform(future, (AsyncFunction) in -> { - return in != null ? Futures.immediateFuture(in.getTenantId()) - : Futures.immediateFailedFuture(new IllegalStateException("Tenant not found"));}); + return EntitiesTenantIdAsyncLoader.findEntityIdAsync(ctx, originator); } } diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/TbAbstractTransformNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/TbAbstractTransformNode.java new file mode 100644 index 0000000000..426a3276a9 --- /dev/null +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/TbAbstractTransformNode.java @@ -0,0 +1,59 @@ +/** + * Copyright © 2016-2018 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.rule.engine.transform; + +import com.google.common.util.concurrent.ListenableFuture; +import lombok.extern.slf4j.Slf4j; +import org.thingsboard.rule.engine.TbNodeUtils; +import org.thingsboard.rule.engine.api.*; +import org.thingsboard.server.common.msg.TbMsg; + +import static org.thingsboard.rule.engine.DonAsynchron.withCallback; + +/** + * Created by ashvayka on 19.01.18. + */ +@Slf4j +public abstract class TbAbstractTransformNode implements TbNode { + + private TbTransformNodeConfiguration config; + + @Override + public void init(TbNodeConfiguration configuration, TbNodeState state) throws TbNodeException { + this.config = TbNodeUtils.convert(configuration, TbTransformNodeConfiguration.class); + } + + @Override + public void onMsg(TbContext ctx, TbMsg msg) { + withCallback(transform(ctx, msg), + m -> routeMsg(ctx, m), + t -> ctx.tellError(msg, t)); + } + + protected abstract ListenableFuture transform(TbContext ctx, TbMsg msg); + + private void routeMsg(TbContext ctx, TbMsg msg) { + if (config.isStartNewChain()) { + ctx.spawn(msg); + } else { + ctx.tellNext(msg); + } + } + + public void setConfig(TbTransformNodeConfiguration config) { + this.config = config; + } +} diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/TbChangeOriginatorNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/TbChangeOriginatorNode.java new file mode 100644 index 0000000000..2592af253e --- /dev/null +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/TbChangeOriginatorNode.java @@ -0,0 +1,93 @@ +/** + * Copyright © 2016-2018 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.rule.engine.transform; + +import com.google.common.base.Function; +import com.google.common.collect.Sets; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; +import org.thingsboard.rule.engine.TbNodeUtils; +import org.thingsboard.rule.engine.api.TbContext; +import org.thingsboard.rule.engine.api.TbNodeConfiguration; +import org.thingsboard.rule.engine.api.TbNodeException; +import org.thingsboard.rule.engine.api.TbNodeState; +import org.thingsboard.rule.engine.util.EntitiesCustomerIdAsyncLoader; +import org.thingsboard.rule.engine.util.EntitiesRelatedEntityIdAsyncLoader; +import org.thingsboard.rule.engine.util.EntitiesTenantIdAsyncLoader; +import org.thingsboard.server.common.data.id.EntityId; +import org.thingsboard.server.common.msg.TbMsg; + +import java.util.HashSet; + +@Slf4j +public class TbChangeOriginatorNode extends TbAbstractTransformNode { + + protected static final String CUSTOMER_SOURCE = "CUSTOMER"; + protected static final String TENANT_SOURCE = "TENANT"; + protected static final String RELATED_SOURCE = "RELATED"; + + private TbChangeOriginatorNodeConfiguration config; + + @Override + public void init(TbNodeConfiguration configuration, TbNodeState state) throws TbNodeException { + this.config = TbNodeUtils.convert(configuration, TbChangeOriginatorNodeConfiguration.class); + validateConfig(config); + setConfig(config); + } + + @Override + protected ListenableFuture transform(TbContext ctx, TbMsg msg) { + ListenableFuture newOriginator = getNewOriginator(ctx, msg.getOriginator()); + return Futures.transform(newOriginator, (Function) n -> new TbMsg(msg.getId(), msg.getType(), n, msg.getMetaData(), msg.getData())); + } + + private ListenableFuture getNewOriginator(TbContext ctx, EntityId original) { + switch (config.getOriginatorSource()) { + case CUSTOMER_SOURCE: + return EntitiesCustomerIdAsyncLoader.findEntityIdAsync(ctx, original); + case TENANT_SOURCE: + return EntitiesTenantIdAsyncLoader.findEntityIdAsync(ctx, original); + case RELATED_SOURCE: + return EntitiesRelatedEntityIdAsyncLoader.findEntityAsync(ctx, original, config.getDirection(), config.getRelationType()); + default: + return Futures.immediateFailedFuture(new IllegalStateException("Unexpected originator source " + config.getOriginatorSource())); + } + } + + private void validateConfig(TbChangeOriginatorNodeConfiguration conf) { + HashSet knownSources = Sets.newHashSet(CUSTOMER_SOURCE, TENANT_SOURCE, RELATED_SOURCE); + if (!knownSources.contains(conf.getOriginatorSource())) { + log.error("Unsupported source [{}] for TbChangeOriginatorNode", conf.getOriginatorSource()); + throw new IllegalArgumentException("Unsupported source TbChangeOriginatorNode" + conf.getOriginatorSource()); + } + + if (conf.getOriginatorSource().equals(RELATED_SOURCE)) { + if (conf.getDirection() == null || StringUtils.isBlank(conf.getRelationType())) { + log.error("Related source for TbChangeOriginatorNode should have direction and relationType. Actual [{}] [{}]", + conf.getDirection(), conf.getRelationType()); + throw new IllegalArgumentException("Wrong config for RElated Source in TbChangeOriginatorNode" + conf.getOriginatorSource()); + } + } + + } + + @Override + public void destroy() { + + } +} diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/TbChangeOriginatorNodeConfiguration.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/TbChangeOriginatorNodeConfiguration.java new file mode 100644 index 0000000000..cf036810c4 --- /dev/null +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/TbChangeOriginatorNodeConfiguration.java @@ -0,0 +1,27 @@ +/** + * Copyright © 2016-2018 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.rule.engine.transform; + +import lombok.Data; +import org.thingsboard.server.common.data.relation.EntitySearchDirection; + +@Data +public class TbChangeOriginatorNodeConfiguration extends TbTransformNodeConfiguration{ + + private String originatorSource; + private EntitySearchDirection direction; + private String relationType; +} diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/TbTransformMsgNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/TbTransformMsgNode.java new file mode 100644 index 0000000000..241fbfbf7e --- /dev/null +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/TbTransformMsgNode.java @@ -0,0 +1,56 @@ +/** + * Copyright © 2016-2018 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.rule.engine.transform; + +import com.google.common.util.concurrent.ListenableFuture; +import org.thingsboard.rule.engine.TbNodeUtils; +import org.thingsboard.rule.engine.api.TbContext; +import org.thingsboard.rule.engine.api.TbNodeConfiguration; +import org.thingsboard.rule.engine.api.TbNodeException; +import org.thingsboard.rule.engine.api.TbNodeState; +import org.thingsboard.rule.engine.js.NashornJsEngine; +import org.thingsboard.server.common.msg.TbMsg; + +import javax.script.Bindings; + +public class TbTransformMsgNode extends TbAbstractTransformNode { + + private TbTransformMsgNodeConfiguration config; + private NashornJsEngine jsEngine; + + @Override + public void init(TbNodeConfiguration configuration, TbNodeState state) throws TbNodeException { + this.config = TbNodeUtils.convert(configuration, TbTransformMsgNodeConfiguration.class); + this.jsEngine = new NashornJsEngine(config.getJsScript()); + setConfig(config); + } + + @Override + protected ListenableFuture transform(TbContext ctx, TbMsg msg) { + return ctx.getJsExecutor().executeAsync(() -> jsEngine.executeUpdate(toBindings(msg), msg)); + } + + private Bindings toBindings(TbMsg msg) { + return NashornJsEngine.bindMsg(msg); + } + + @Override + public void destroy() { + if (jsEngine != null) { + jsEngine.destroy(); + } + } +} diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/TbTransformMsgNodeConfiguration.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/TbTransformMsgNodeConfiguration.java new file mode 100644 index 0000000000..9cc926b54d --- /dev/null +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/TbTransformMsgNodeConfiguration.java @@ -0,0 +1,24 @@ +/** + * Copyright © 2016-2018 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.rule.engine.transform; + +import lombok.Data; + +@Data +public class TbTransformMsgNodeConfiguration extends TbTransformNodeConfiguration { + + private String jsScript; +} diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/TbTransformNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/TbTransformNode.java deleted file mode 100644 index 9edd14c953..0000000000 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/TbTransformNode.java +++ /dev/null @@ -1,68 +0,0 @@ -/** - * Copyright © 2016-2018 The Thingsboard Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.thingsboard.rule.engine.transform; - -import lombok.extern.slf4j.Slf4j; -import org.thingsboard.rule.engine.TbNodeUtils; -import org.thingsboard.rule.engine.api.*; -import org.thingsboard.rule.engine.metadata.TbGetAttributesNodeConfiguration; -import org.thingsboard.server.common.data.DataConstants; -import org.thingsboard.server.common.data.kv.AttributeKvEntry; -import org.thingsboard.server.common.msg.TbMsg; -import org.thingsboard.server.dao.attributes.AttributesService; - -import java.util.List; - -/** - * Created by ashvayka on 19.01.18. - */ -@Slf4j -public class TbTransformNode implements TbNode { - - TbGetAttributesNodeConfiguration config; - - @Override - public void init(TbNodeConfiguration configuration, TbNodeState state) throws TbNodeException { - this.config = TbNodeUtils.convert(configuration, TbGetAttributesNodeConfiguration.class); - } - - @Override - public void onMsg(TbContext ctx, TbMsg msg) throws TbNodeException { - try { - //TODO: refactor this to work async and fetch attributes from cache. - AttributesService service = ctx.getAttributesService(); - fetchAttributes(msg, service, config.getClientAttributeNames(), DataConstants.CLIENT_SCOPE, "cs."); - fetchAttributes(msg, service, config.getServerAttributeNames(), DataConstants.SERVER_SCOPE, "ss."); - fetchAttributes(msg, service, config.getSharedAttributeNames(), DataConstants.SHARED_SCOPE, "shared."); - ctx.tellNext(msg); - } catch (Exception e) { - log.warn("[{}][{}] Failed to fetch attributes", msg.getOriginator(), msg.getId(), e); - throw new TbNodeException(e); - } - } - - private void fetchAttributes(TbMsg msg, AttributesService service, List attributeNames, String scope, String prefix) throws InterruptedException, java.util.concurrent.ExecutionException { - if (attributeNames != null && attributeNames.isEmpty()) { - List attributes = service.find(msg.getOriginator(), scope, attributeNames).get(); - attributes.forEach(attr -> msg.getMetaData().putValue(prefix + attr.getKey(), attr.getValueAsString())); - } - } - - @Override - public void destroy() { - - } -} diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/TbTransformNodeConfiguration.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/TbTransformNodeConfiguration.java new file mode 100644 index 0000000000..d9f57806ae --- /dev/null +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/TbTransformNodeConfiguration.java @@ -0,0 +1,24 @@ +/** + * Copyright © 2016-2018 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.rule.engine.transform; + +import lombok.Data; + +@Data +public class TbTransformNodeConfiguration { + + private boolean startNewChain = false; +} diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/util/EntitiesCustomerIdAsyncLoader.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/util/EntitiesCustomerIdAsyncLoader.java new file mode 100644 index 0000000000..67eb808795 --- /dev/null +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/util/EntitiesCustomerIdAsyncLoader.java @@ -0,0 +1,50 @@ +/** + * Copyright © 2016-2018 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.rule.engine.util; + +import com.google.common.util.concurrent.AsyncFunction; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import org.thingsboard.rule.engine.api.TbContext; +import org.thingsboard.rule.engine.api.TbNodeException; +import org.thingsboard.server.common.data.HasCustomerId; +import org.thingsboard.server.common.data.id.*; + +public class EntitiesCustomerIdAsyncLoader { + + + public static ListenableFuture findEntityIdAsync(TbContext ctx, EntityId original) { + + switch (original.getEntityType()) { + case CUSTOMER: + return Futures.immediateFuture((CustomerId) original); + case USER: + return getCustomerAsync(ctx.getUserService().findUserByIdAsync((UserId) original)); + case ASSET: + return getCustomerAsync(ctx.getAssetService().findAssetByIdAsync((AssetId) original)); + case DEVICE: + return getCustomerAsync(ctx.getDeviceService().findDeviceByIdAsync((DeviceId) original)); + default: + return Futures.immediateFailedFuture(new TbNodeException("Unexpected original EntityType " + original)); + } + } + + private static ListenableFuture getCustomerAsync(ListenableFuture future) { + return Futures.transform(future, (AsyncFunction) in -> { + return in != null ? Futures.immediateFuture(in.getCustomerId()) + : Futures.immediateFailedFuture(new IllegalStateException("Customer not found"));}); + } +} diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/util/EntitiesRelatedEntityIdAsyncLoader.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/util/EntitiesRelatedEntityIdAsyncLoader.java new file mode 100644 index 0000000000..ac69c5dd59 --- /dev/null +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/util/EntitiesRelatedEntityIdAsyncLoader.java @@ -0,0 +1,51 @@ +/** + * Copyright © 2016-2018 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.rule.engine.util; + +import com.google.common.util.concurrent.AsyncFunction; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import org.apache.commons.collections.CollectionUtils; +import org.thingsboard.rule.engine.api.TbContext; +import org.thingsboard.server.common.data.id.EntityId; +import org.thingsboard.server.common.data.relation.EntityRelation; +import org.thingsboard.server.common.data.relation.EntitySearchDirection; +import org.thingsboard.server.dao.relation.RelationService; + +import java.util.List; + +import static org.thingsboard.server.common.data.relation.RelationTypeGroup.COMMON; + +public class EntitiesRelatedEntityIdAsyncLoader { + + public static ListenableFuture findEntityAsync(TbContext ctx, EntityId originator, + EntitySearchDirection direction, String relationType) { + RelationService relationService = ctx.getRelationService(); + if (direction == EntitySearchDirection.FROM) { + ListenableFuture> asyncRelation = relationService.findByFromAndTypeAsync(originator, relationType, COMMON); + return Futures.transform(asyncRelation, (AsyncFunction, EntityId>) + r -> CollectionUtils.isNotEmpty(r) ? Futures.immediateFuture(r.get(0).getTo()) + : Futures.immediateFailedFuture(new IllegalStateException("Relation not found"))); + } else if (direction == EntitySearchDirection.TO) { + ListenableFuture> asyncRelation = relationService.findByToAndTypeAsync(originator, relationType, COMMON); + return Futures.transform(asyncRelation, (AsyncFunction, EntityId>) + r -> CollectionUtils.isNotEmpty(r) ? Futures.immediateFuture(r.get(0).getFrom()) + : Futures.immediateFailedFuture(new IllegalStateException("Relation not found"))); + } + + return Futures.immediateFailedFuture(new IllegalStateException("Unknown direction")); + } +} diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/util/EntitiesTenantIdAsyncLoader.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/util/EntitiesTenantIdAsyncLoader.java new file mode 100644 index 0000000000..388881b84e --- /dev/null +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/util/EntitiesTenantIdAsyncLoader.java @@ -0,0 +1,60 @@ +/** + * Copyright © 2016-2018 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.rule.engine.util; + +import com.google.common.util.concurrent.AsyncFunction; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import org.thingsboard.rule.engine.api.TbContext; +import org.thingsboard.rule.engine.api.TbNodeException; +import org.thingsboard.server.common.data.HasTenantId; +import org.thingsboard.server.common.data.alarm.AlarmId; +import org.thingsboard.server.common.data.id.*; + +public class EntitiesTenantIdAsyncLoader { + + public static ListenableFuture findEntityIdAsync(TbContext ctx, EntityId original) { + + switch (original.getEntityType()) { + case TENANT: + return Futures.immediateFuture((TenantId) original); + case CUSTOMER: + return getTenantAsync(ctx.getCustomerService().findCustomerByIdAsync((CustomerId) original)); + case USER: + return getTenantAsync(ctx.getUserService().findUserByIdAsync((UserId) original)); + case RULE: + return getTenantAsync(ctx.getRuleService().findRuleByIdAsync((RuleId) original)); + case PLUGIN: + return getTenantAsync(ctx.getPluginService().findPluginByIdAsync((PluginId) original)); + case ASSET: + return getTenantAsync(ctx.getAssetService().findAssetByIdAsync((AssetId) original)); + case DEVICE: + return getTenantAsync(ctx.getDeviceService().findDeviceByIdAsync((DeviceId) original)); + case ALARM: + return getTenantAsync(ctx.getAlarmService().findAlarmByIdAsync((AlarmId) original)); + case RULE_CHAIN: + return getTenantAsync(ctx.getRuleChainService().findRuleChainByIdAsync((RuleChainId) original)); + default: + return Futures.immediateFailedFuture(new TbNodeException("Unexpected original EntityType " + original)); + } + } + + private static ListenableFuture getTenantAsync(ListenableFuture future) { + return Futures.transform(future, (AsyncFunction) in -> { + return in != null ? Futures.immediateFuture(in.getTenantId()) + : Futures.immediateFailedFuture(new IllegalStateException("Tenant not found"));}); + } +} diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/filter/TbJsFilterNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/filter/TbJsFilterNodeTest.java index 6a9146de98..2fd9f4e8ee 100644 --- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/filter/TbJsFilterNodeTest.java +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/filter/TbJsFilterNodeTest.java @@ -58,6 +58,7 @@ public class TbJsFilterNodeTest { node.onMsg(ctx, msg); verify(ctx).getJsExecutor(); + verify(ctx).tellNext(msg, "false"); verifyNoMoreInteractions(ctx); } @@ -102,6 +103,7 @@ public class TbJsFilterNodeTest { node.onMsg(ctx, msg); verify(ctx).getJsExecutor(); + verify(ctx).tellNext(msg, "false"); verifyNoMoreInteractions(ctx); } @@ -116,7 +118,7 @@ public class TbJsFilterNodeTest { node.onMsg(ctx, msg); verify(ctx).getJsExecutor(); - verify(ctx).tellNext(msg); + verify(ctx).tellNext(msg, "true"); } @Test @@ -132,7 +134,7 @@ public class TbJsFilterNodeTest { node.onMsg(ctx, msg); verify(ctx).getJsExecutor(); - verify(ctx).tellNext(msg); + verify(ctx).tellNext(msg, "true"); } private void initWithScript(String script) throws TbNodeException { diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/filter/TbJsSwitchNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/filter/TbJsSwitchNodeTest.java index 1b4cce9fc9..a2f5f7d6f2 100644 --- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/filter/TbJsSwitchNodeTest.java +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/filter/TbJsSwitchNodeTest.java @@ -34,14 +34,13 @@ import org.thingsboard.rule.engine.api.TbNodeException; import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.common.msg.TbMsgMetaData; +import java.util.HashSet; import java.util.Set; import java.util.concurrent.Callable; -import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; import static org.mockito.Matchers.same; -import static org.mockito.Mockito.doAnswer; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; +import static org.mockito.Mockito.*; @RunWith(MockitoJUnitRunner.class) public class TbJsSwitchNodeTest { @@ -53,6 +52,41 @@ public class TbJsSwitchNodeTest { @Mock private ListeningExecutor executor; + @Test + public void routeToAllDoNotEvaluatesJs() throws TbNodeException { + HashSet relations = Sets.newHashSet("one", "two"); + initWithScript("test qwerty", relations, true); + TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, new TbMsgMetaData(), "{}".getBytes()); + + node.onMsg(ctx, msg); + verify(ctx).tellNext(msg, relations); + verifyNoMoreInteractions(ctx, executor); + } + + @Test + public void multipleRoutesAreAllowed() throws TbNodeException { + String jsCode = "function nextRelation(meta, msg) {\n" + + " if(msg.passed == 5 && meta.temp == 10)\n" + + " return ['three', 'one']\n" + + " else\n" + + " return 'two';\n" + + "};\n" + + "\n" + + "nextRelation(meta, msg);"; + initWithScript(jsCode, Sets.newHashSet("one", "two", "three"), false); + TbMsgMetaData metaData = new TbMsgMetaData(); + metaData.putValue("temp", "10"); + metaData.putValue("humidity", "99"); + String rawJson = "{\"name\": \"Vit\", \"passed\": 5}"; + + TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, rawJson.getBytes()); + mockJsExecutor(); + + node.onMsg(ctx, msg); + verify(ctx).getJsExecutor(); + verify(ctx).tellNext(msg, Sets.newHashSet("one", "three")); + } + @Test public void allowedRelationPassed() throws TbNodeException { String jsCode = "function nextRelation(meta, msg) {\n" + @@ -63,7 +97,7 @@ public class TbJsSwitchNodeTest { "};\n" + "\n" + "nextRelation(meta, msg);"; - initWithScript(jsCode, Sets.newHashSet("one", "two")); + initWithScript(jsCode, Sets.newHashSet("one", "two"), false); TbMsgMetaData metaData = new TbMsgMetaData(); metaData.putValue("temp", "10"); metaData.putValue("humidity", "99"); @@ -74,17 +108,17 @@ public class TbJsSwitchNodeTest { node.onMsg(ctx, msg); verify(ctx).getJsExecutor(); - verify(ctx).tellNext(msg, "one"); + verify(ctx).tellNext(msg, Sets.newHashSet("one")); } @Test public void unknownRelationThrowsException() throws TbNodeException { String jsCode = "function nextRelation(meta, msg) {\n" + - " return 'nine';" + + " return ['one','nine'];" + "};\n" + "\n" + "nextRelation(meta, msg);"; - initWithScript(jsCode, Sets.newHashSet("one", "two")); + initWithScript(jsCode, Sets.newHashSet("one", "two"), false); TbMsgMetaData metaData = new TbMsgMetaData(); metaData.putValue("temp", "10"); metaData.putValue("humidity", "99"); @@ -95,13 +129,14 @@ public class TbJsSwitchNodeTest { node.onMsg(ctx, msg); verify(ctx).getJsExecutor(); - verifyError(msg, "Unsupported relation for switch nine", IllegalStateException.class); + verifyError(msg, "Unsupported relation for switch [nine, one]", IllegalStateException.class); } - private void initWithScript(String script, Set relations) throws TbNodeException { + private void initWithScript(String script, Set relations, boolean routeToAll) throws TbNodeException { TbJsSwitchNodeConfiguration config = new TbJsSwitchNodeConfiguration(); config.setJsScript(script); config.setAllowedRelations(relations); + config.setRouteToAllWithNoCheck(routeToAll); ObjectMapper mapper = new ObjectMapper(); TbNodeConfiguration nodeConfiguration = new TbNodeConfiguration(); nodeConfiguration.setData(mapper.valueToTree(config)); @@ -112,10 +147,10 @@ public class TbJsSwitchNodeTest { private void mockJsExecutor() { when(ctx.getJsExecutor()).thenReturn(executor); - doAnswer((Answer>) invocationOnMock -> { + doAnswer((Answer>>) invocationOnMock -> { try { Callable task = (Callable) (invocationOnMock.getArguments())[0]; - return Futures.immediateFuture((String) task.call()); + return Futures.immediateFuture((Set) task.call()); } catch (Throwable th) { return Futures.immediateFailedFuture(th); } diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/transform/TbChangeOriginatorNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/transform/TbChangeOriginatorNodeTest.java new file mode 100644 index 0000000000..77b00fb778 --- /dev/null +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/transform/TbChangeOriginatorNodeTest.java @@ -0,0 +1,125 @@ +/** + * Copyright © 2016-2018 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.rule.engine.transform; + +import com.datastax.driver.core.utils.UUIDs; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.util.concurrent.Futures; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.runners.MockitoJUnitRunner; +import org.thingsboard.rule.engine.api.TbContext; +import org.thingsboard.rule.engine.api.TbNodeConfiguration; +import org.thingsboard.rule.engine.api.TbNodeException; +import org.thingsboard.server.common.data.asset.Asset; +import org.thingsboard.server.common.data.id.AssetId; +import org.thingsboard.server.common.data.id.CustomerId; +import org.thingsboard.server.common.msg.TbMsg; +import org.thingsboard.server.common.msg.TbMsgMetaData; +import org.thingsboard.server.dao.asset.AssetService; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Matchers.same; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +@RunWith(MockitoJUnitRunner.class) +public class TbChangeOriginatorNodeTest { + + private TbChangeOriginatorNode node; + + @Mock + private TbContext ctx; + @Mock + private AssetService assetService; + + + @Test + public void originatorCanBeChangedToCustomerId() throws TbNodeException { + init(false); + AssetId assetId = new AssetId(UUIDs.timeBased()); + CustomerId customerId = new CustomerId(UUIDs.timeBased()); + Asset asset = new Asset(); + asset.setCustomerId(customerId); + + TbMsg msg = new TbMsg(UUIDs.timeBased(), "ASSET", assetId, new TbMsgMetaData(), new byte[4]); + + when(ctx.getAssetService()).thenReturn(assetService); + when(assetService.findAssetByIdAsync(assetId)).thenReturn(Futures.immediateFuture(asset)); + + node.onMsg(ctx, msg); + ArgumentCaptor captor = ArgumentCaptor.forClass(TbMsg.class); + verify(ctx).tellNext(captor.capture()); + TbMsg actualMsg = captor.getValue(); + assertEquals(customerId, actualMsg.getOriginator()); + assertEquals(msg.getId(), actualMsg.getId()); + } + + @Test + public void newChainCanBeStarted() throws TbNodeException { + init(true); + AssetId assetId = new AssetId(UUIDs.timeBased()); + CustomerId customerId = new CustomerId(UUIDs.timeBased()); + Asset asset = new Asset(); + asset.setCustomerId(customerId); + + TbMsg msg = new TbMsg(UUIDs.timeBased(), "ASSET", assetId, new TbMsgMetaData(), new byte[4]); + + when(ctx.getAssetService()).thenReturn(assetService); + when(assetService.findAssetByIdAsync(assetId)).thenReturn(Futures.immediateFuture(asset)); + + node.onMsg(ctx, msg); + ArgumentCaptor captor = ArgumentCaptor.forClass(TbMsg.class); + verify(ctx).spawn(captor.capture()); + TbMsg actualMsg = captor.getValue(); + assertEquals(customerId, actualMsg.getOriginator()); + assertEquals(msg.getId(), actualMsg.getId()); + } + + @Test + public void exceptionThrownIfCannotFindNewOriginator() throws TbNodeException { + init(true); + AssetId assetId = new AssetId(UUIDs.timeBased()); + CustomerId customerId = new CustomerId(UUIDs.timeBased()); + Asset asset = new Asset(); + asset.setCustomerId(customerId); + + TbMsg msg = new TbMsg(UUIDs.timeBased(), "ASSET", assetId, new TbMsgMetaData(), new byte[4]); + + when(ctx.getAssetService()).thenReturn(assetService); + when(assetService.findAssetByIdAsync(assetId)).thenReturn(Futures.immediateFailedFuture(new IllegalStateException("wrong"))); + + node.onMsg(ctx, msg); + ArgumentCaptor captor = ArgumentCaptor.forClass(Throwable.class); + verify(ctx).tellError(same(msg), captor.capture()); + Throwable value = captor.getValue(); + assertEquals("wrong", value.getMessage()); + } + + public void init(boolean startNewChain) throws TbNodeException { + TbChangeOriginatorNodeConfiguration config = new TbChangeOriginatorNodeConfiguration(); + config.setOriginatorSource(TbChangeOriginatorNode.CUSTOMER_SOURCE); + config.setStartNewChain(startNewChain); + ObjectMapper mapper = new ObjectMapper(); + TbNodeConfiguration nodeConfiguration = new TbNodeConfiguration(); + nodeConfiguration.setData(mapper.valueToTree(config)); + + node = new TbChangeOriginatorNode(); + node.init(nodeConfiguration, null); + } +} \ No newline at end of file diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/transform/TbTransformMsgNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/transform/TbTransformMsgNodeTest.java new file mode 100644 index 0000000000..876e70f9b8 --- /dev/null +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/transform/TbTransformMsgNodeTest.java @@ -0,0 +1,141 @@ +/** + * Copyright © 2016-2018 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.rule.engine.transform; + +import com.datastax.driver.core.utils.UUIDs; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.ArgumentCaptor; +import org.mockito.Matchers; +import org.mockito.Mock; +import org.mockito.runners.MockitoJUnitRunner; +import org.mockito.stubbing.Answer; +import org.thingsboard.rule.engine.api.ListeningExecutor; +import org.thingsboard.rule.engine.api.TbContext; +import org.thingsboard.rule.engine.api.TbNodeConfiguration; +import org.thingsboard.rule.engine.api.TbNodeException; +import org.thingsboard.server.common.msg.TbMsg; +import org.thingsboard.server.common.msg.TbMsgMetaData; + +import java.util.concurrent.Callable; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Matchers.same; +import static org.mockito.Mockito.*; + +@RunWith(MockitoJUnitRunner.class) +public class TbTransformMsgNodeTest { + + private TbTransformMsgNode node; + + @Mock + private TbContext ctx; + @Mock + private ListeningExecutor executor; + + @Test + public void metadataCanBeUpdated() throws TbNodeException { + initWithScript("meta.temp = meta.temp * 10;"); + TbMsgMetaData metaData = new TbMsgMetaData(); + metaData.putValue("temp", "7"); + metaData.putValue("humidity", "99"); + String rawJson = "{\"name\": \"Vit\", \"passed\": 5, \"bigObj\": {\"prop\":42}}"; + + TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, rawJson.getBytes()); + mockJsExecutor(); + + node.onMsg(ctx, msg); + verify(ctx).getJsExecutor(); + ArgumentCaptor captor = ArgumentCaptor.forClass(TbMsg.class); + verify(ctx).tellNext(captor.capture()); + TbMsg actualMsg = captor.getValue(); + assertEquals("70.0", actualMsg.getMetaData().getValue("temp")); + } + + @Test + public void metadataCanBeAdded() throws TbNodeException { + initWithScript("meta.newAttr = meta.humidity - msg.passed;"); + TbMsgMetaData metaData = new TbMsgMetaData(); + metaData.putValue("temp", "7"); + metaData.putValue("humidity", "99"); + String rawJson = "{\"name\": \"Vit\", \"passed\": 5, \"bigObj\": {\"prop\":42}}"; + + TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, rawJson.getBytes()); + mockJsExecutor(); + + node.onMsg(ctx, msg); + verify(ctx).getJsExecutor(); + ArgumentCaptor captor = ArgumentCaptor.forClass(TbMsg.class); + verify(ctx).tellNext(captor.capture()); + TbMsg actualMsg = captor.getValue(); + assertEquals("94.0", actualMsg.getMetaData().getValue("newAttr")); + } + + @Test + public void payloadCanBeUpdated() throws TbNodeException { + initWithScript("msg.passed = msg.passed * meta.temp; msg.bigObj.newProp = 'Ukraine' "); + TbMsgMetaData metaData = new TbMsgMetaData(); + metaData.putValue("temp", "7"); + metaData.putValue("humidity", "99"); + String rawJson = "{\"name\":\"Vit\",\"passed\": 5,\"bigObj\":{\"prop\":42}}"; + + TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, rawJson.getBytes()); + mockJsExecutor(); + + node.onMsg(ctx, msg); + verify(ctx).getJsExecutor(); + ArgumentCaptor captor = ArgumentCaptor.forClass(TbMsg.class); + verify(ctx).tellNext(captor.capture()); + TbMsg actualMsg = captor.getValue(); + String expectedJson = "{\"name\":\"Vit\",\"passed\":35.0,\"bigObj\":{\"prop\":42,\"newProp\":\"Ukraine\"}}"; + assertEquals(expectedJson, new String(actualMsg.getData())); + } + + private void initWithScript(String script) throws TbNodeException { + TbTransformMsgNodeConfiguration config = new TbTransformMsgNodeConfiguration(); + config.setJsScript(script); + ObjectMapper mapper = new ObjectMapper(); + TbNodeConfiguration nodeConfiguration = new TbNodeConfiguration(); + nodeConfiguration.setData(mapper.valueToTree(config)); + + node = new TbTransformMsgNode(); + node.init(nodeConfiguration, null); + } + + private void mockJsExecutor() { + when(ctx.getJsExecutor()).thenReturn(executor); + doAnswer((Answer>) invocationOnMock -> { + try { + Callable task = (Callable) (invocationOnMock.getArguments())[0]; + return Futures.immediateFuture((TbMsg) task.call()); + } catch (Throwable th) { + return Futures.immediateFailedFuture(th); + } + }).when(executor).executeAsync(Matchers.any(Callable.class)); + } + + private void verifyError(TbMsg msg, String message, Class expectedClass) { + ArgumentCaptor captor = ArgumentCaptor.forClass(Throwable.class); + verify(ctx).tellError(same(msg), captor.capture()); + + Throwable value = captor.getValue(); + assertEquals(expectedClass, value.getClass()); + assertEquals(message, value.getMessage()); + } +} \ No newline at end of file