implement Transformation Nodes

This commit is contained in:
vparomskiy 2018-03-20 15:10:09 +02:00
parent 09c690634b
commit 1968b5d842
25 changed files with 856 additions and 187 deletions

View File

@ -30,7 +30,7 @@ import java.util.UUID;
* Created by ashvayka on 13.01.18. * Created by ashvayka on 13.01.18.
*/ */
@Data @Data
public final class TbMsg implements Serializable { public final class TbMsg implements Serializable, Cloneable {
private final UUID id; private final UUID id;
private final String type; private final String type;
@ -39,6 +39,11 @@ public final class TbMsg implements Serializable {
private final byte[] data; private final byte[] data;
@Override
public TbMsg clone() {
return fromBytes(toBytes(this));
}
public static ByteBuffer toBytes(TbMsg msg) { public static ByteBuffer toBytes(TbMsg msg) {
MsgProtos.TbMsgProto.Builder builder = MsgProtos.TbMsgProto.newBuilder(); MsgProtos.TbMsgProto.Builder builder = MsgProtos.TbMsgProto.newBuilder();
builder.setId(msg.getId().toString()); builder.setId(msg.getId().toString());
@ -77,4 +82,5 @@ public final class TbMsg implements Serializable {
throw new IllegalStateException("Could not parse protobuf for TbMsg", e); throw new IllegalStateException("Could not parse protobuf for TbMsg", e);
} }
} }
} }

View File

@ -29,6 +29,7 @@ import org.thingsboard.server.dao.rule.RuleService;
import org.thingsboard.server.dao.timeseries.TimeseriesService; import org.thingsboard.server.dao.timeseries.TimeseriesService;
import org.thingsboard.server.dao.user.UserService; import org.thingsboard.server.dao.user.UserService;
import java.util.Set;
import java.util.UUID; import java.util.UUID;
/** /**
@ -40,6 +41,8 @@ public interface TbContext {
void tellNext(TbMsg msg, String relationType); void tellNext(TbMsg msg, String relationType);
void tellNext(TbMsg msg, Set<String> relationTypes);
void tellSelf(TbMsg msg, long delayMs); void tellSelf(TbMsg msg, long delayMs);
void tellOthers(TbMsg msg); void tellOthers(TbMsg msg);

View File

@ -41,18 +41,10 @@ public class TbJsFilterNode implements TbNode {
public void onMsg(TbContext ctx, TbMsg msg) { public void onMsg(TbContext ctx, TbMsg msg) {
ListeningExecutor jsExecutor = ctx.getJsExecutor(); ListeningExecutor jsExecutor = ctx.getJsExecutor();
withCallback(jsExecutor.executeAsync(() -> jsEngine.executeFilter(toBindings(msg))), withCallback(jsExecutor.executeAsync(() -> jsEngine.executeFilter(toBindings(msg))),
result -> processFilter(ctx, msg, result), filterResult -> ctx.tellNext(msg, Boolean.toString(filterResult)),
t -> ctx.tellError(msg, t)); t -> ctx.tellError(msg, t));
} }
private void processFilter(TbContext ctx, TbMsg msg, Boolean filterResult) {
if (filterResult) {
ctx.tellNext(msg);
} else {
log.debug("Msg filtered out {}", msg.getId());
}
}
private Bindings toBindings(TbMsg msg) { private Bindings toBindings(TbMsg msg) {
return NashornJsEngine.bindMsg(msg); return NashornJsEngine.bindMsg(msg);
} }

View File

@ -22,6 +22,7 @@ import org.thingsboard.rule.engine.js.NashornJsEngine;
import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.common.msg.TbMsg;
import javax.script.Bindings; import javax.script.Bindings;
import java.util.Set;
import static org.thingsboard.rule.engine.DonAsynchron.withCallback; import static org.thingsboard.rule.engine.DonAsynchron.withCallback;
@ -34,30 +35,40 @@ public class TbJsSwitchNode implements TbNode {
@Override @Override
public void init(TbNodeConfiguration configuration, TbNodeState state) throws TbNodeException { public void init(TbNodeConfiguration configuration, TbNodeState state) throws TbNodeException {
this.config = TbNodeUtils.convert(configuration, TbJsSwitchNodeConfiguration.class); this.config = TbNodeUtils.convert(configuration, TbJsSwitchNodeConfiguration.class);
this.jsEngine = new NashornJsEngine(config.getJsScript());
if (config.getAllowedRelations().size() < 1) { if (config.getAllowedRelations().size() < 1) {
String message = "Switch node should have at least 1 relation"; String message = "Switch node should have at least 1 relation";
log.error(message); log.error(message);
throw new IllegalStateException(message); throw new IllegalStateException(message);
} }
if (!config.isRouteToAllWithNoCheck()) {
this.jsEngine = new NashornJsEngine(config.getJsScript());
}
} }
@Override @Override
public void onMsg(TbContext ctx, TbMsg msg) { public void onMsg(TbContext ctx, TbMsg msg) {
if (config.isRouteToAllWithNoCheck()) {
ctx.tellNext(msg, config.getAllowedRelations());
return;
}
ListeningExecutor jsExecutor = ctx.getJsExecutor(); ListeningExecutor jsExecutor = ctx.getJsExecutor();
withCallback(jsExecutor.executeAsync(() -> jsEngine.executeSwitch(toBindings(msg))), withCallback(jsExecutor.executeAsync(() -> jsEngine.executeSwitch(toBindings(msg))),
result -> processSwitch(ctx, msg, result), result -> processSwitch(ctx, msg, result),
t -> ctx.tellError(msg, t)); t -> ctx.tellError(msg, t));
} }
private void processSwitch(TbContext ctx, TbMsg msg, String nextRelation) { private void processSwitch(TbContext ctx, TbMsg msg, Set<String> nextRelations) {
if (config.getAllowedRelations().contains(nextRelation)) { if (validateRelations(nextRelations)) {
ctx.tellNext(msg, nextRelation); ctx.tellNext(msg, nextRelations);
} else { } else {
ctx.tellError(msg, new IllegalStateException("Unsupported relation for switch " + nextRelation)); ctx.tellError(msg, new IllegalStateException("Unsupported relation for switch " + nextRelations));
} }
} }
private boolean validateRelations(Set<String> nextRelations) {
return config.getAllowedRelations().containsAll(nextRelations);
}
private Bindings toBindings(TbMsg msg) { private Bindings toBindings(TbMsg msg) {
return NashornJsEngine.bindMsg(msg); return NashornJsEngine.bindMsg(msg);
} }

View File

@ -24,4 +24,5 @@ public class TbJsSwitchNodeConfiguration {
private String jsScript; private String jsScript;
private Set<String> allowedRelations; private Set<String> allowedRelations;
private boolean routeToAllWithNoCheck;
} }

View File

@ -15,15 +15,20 @@
*/ */
package org.thingsboard.rule.engine.js; package org.thingsboard.rule.engine.js;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Sets;
import jdk.nashorn.api.scripting.NashornScriptEngineFactory; import jdk.nashorn.api.scripting.NashornScriptEngineFactory;
import jdk.nashorn.api.scripting.ScriptObjectMirror;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.ArrayUtils; import org.apache.commons.lang3.ArrayUtils;
import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.common.msg.TbMsg;
import javax.script.*; import javax.script.*;
import java.util.Collections;
import java.util.Map; import java.util.Map;
import java.util.Set;
@Slf4j @Slf4j
@ -68,6 +73,32 @@ public class NashornJsEngine {
} }
} }
private static TbMsg unbindMsg(Bindings bindings, TbMsg msg) throws JsonProcessingException {
for (Map.Entry<String, String> entry : msg.getMetaData().getData().entrySet()) {
Object obj = entry.getValue();
entry.setValue(obj.toString());
}
Object payload = bindings.get(DATA);
if (payload != null) {
ObjectMapper mapper = new ObjectMapper();
byte[] bytes = mapper.writeValueAsBytes(payload);
return new TbMsg(msg.getId(), msg.getType(), msg.getOriginator(), msg.getMetaData(), bytes);
}
return msg;
}
public TbMsg executeUpdate(Bindings bindings, TbMsg msg) throws ScriptException {
try {
engine.eval(bindings);
return unbindMsg(bindings, msg);
} catch (Throwable th) {
th.printStackTrace();
throw new IllegalArgumentException("Cannot unbind js args", th);
}
}
public boolean executeFilter(Bindings bindings) throws ScriptException { public boolean executeFilter(Bindings bindings) throws ScriptException {
Object eval = engine.eval(bindings); Object eval = engine.eval(bindings);
if (eval instanceof Boolean) { if (eval instanceof Boolean) {
@ -78,14 +109,28 @@ public class NashornJsEngine {
} }
} }
public String executeSwitch(Bindings bindings) throws ScriptException, NoSuchMethodException { public Set<String> executeSwitch(Bindings bindings) throws ScriptException, NoSuchMethodException {
Object eval = this.engine.eval(bindings); Object eval = this.engine.eval(bindings);
if (eval instanceof String) { if (eval instanceof String) {
return (String) eval; return Collections.singleton((String) eval);
} else { } else if (eval instanceof ScriptObjectMirror) {
log.warn("Wrong result type: {}", eval); ScriptObjectMirror mir = (ScriptObjectMirror) eval;
throw new ScriptException("Wrong result type: " + eval); if (mir.isArray()) {
Set<String> nextStates = Sets.newHashSet();
for (Map.Entry<String, Object> entry : mir.entrySet()) {
if (entry.getValue() instanceof String) {
nextStates.add((String) entry.getValue());
} else {
log.warn("Wrong result type: {}", eval);
throw new ScriptException("Wrong result type: " + eval);
}
}
return nextStates;
}
} }
log.warn("Wrong result type: {}", eval);
throw new ScriptException("Wrong result type: " + eval);
} }
public void destroy() { public void destroy() {

View File

@ -70,7 +70,7 @@ public abstract class TbEntityGetAttrNode<T extends EntityId> implements TbNode
} }
private void putAttributesAndTell(TbContext ctx, TbMsg msg, List<KvEntry> attributes) { private void putAttributesAndTell(TbContext ctx, TbMsg msg, List<? extends KvEntry> attributes) {
attributes.forEach(r -> { attributes.forEach(r -> {
String attrName = config.getAttrMapping().get(r.getKey()); String attrName = config.getAttrMapping().get(r.getKey());
msg.getMetaData().putValue(attrName, r.getValueAsString()); msg.getMetaData().putValue(attrName, r.getValueAsString());
@ -85,4 +85,8 @@ public abstract class TbEntityGetAttrNode<T extends EntityId> implements TbNode
protected abstract ListenableFuture<T> findEntityAsync(TbContext ctx, EntityId originator); protected abstract ListenableFuture<T> findEntityAsync(TbContext ctx, EntityId originator);
public void setConfig(TbGetEntityAttrNodeConfiguration config) {
this.config = config;
}
} }

View File

@ -15,37 +15,17 @@
*/ */
package org.thingsboard.rule.engine.metadata; package org.thingsboard.rule.engine.metadata;
import com.google.common.util.concurrent.AsyncFunction;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListenableFuture;
import org.thingsboard.rule.engine.api.TbContext; import org.thingsboard.rule.engine.api.TbContext;
import org.thingsboard.rule.engine.api.TbNodeException; import org.thingsboard.rule.engine.util.EntitiesCustomerIdAsyncLoader;
import org.thingsboard.server.common.data.HasCustomerId; import org.thingsboard.server.common.data.id.CustomerId;
import org.thingsboard.server.common.data.id.*; import org.thingsboard.server.common.data.id.EntityId;
public class TbGetCustomerAttributeNode extends TbEntityGetAttrNode<CustomerId> { public class TbGetCustomerAttributeNode extends TbEntityGetAttrNode<CustomerId> {
@Override @Override
protected ListenableFuture<CustomerId> findEntityAsync(TbContext ctx, EntityId originator) { protected ListenableFuture<CustomerId> findEntityAsync(TbContext ctx, EntityId originator) {
return EntitiesCustomerIdAsyncLoader.findEntityIdAsync(ctx, originator);
switch (originator.getEntityType()) {
case CUSTOMER:
return Futures.immediateFuture((CustomerId) originator);
case USER:
return getCustomerAsync(ctx.getUserService().findUserByIdAsync((UserId) originator));
case ASSET:
return getCustomerAsync(ctx.getAssetService().findAssetByIdAsync((AssetId) originator));
case DEVICE:
return getCustomerAsync(ctx.getDeviceService().findDeviceByIdAsync((DeviceId) originator));
default:
return Futures.immediateFailedFuture(new TbNodeException("Unexpected originator EntityType " + originator));
}
}
private <T extends HasCustomerId> ListenableFuture<CustomerId> getCustomerAsync(ListenableFuture<T> future) {
return Futures.transform(future, (AsyncFunction<HasCustomerId, CustomerId>) in -> {
return in != null ? Futures.immediateFuture(in.getCustomerId())
: Futures.immediateFailedFuture(new IllegalStateException("Customer not found"));});
} }
} }

View File

@ -19,7 +19,7 @@ import lombok.Data;
import org.thingsboard.server.common.data.relation.EntitySearchDirection; import org.thingsboard.server.common.data.relation.EntitySearchDirection;
@Data @Data
public class TbGetRelatedAttrNodeConfiguration { public class TbGetRelatedAttrNodeConfiguration extends TbGetEntityAttrNodeConfiguration {
private String relationType; private String relationType;
private EntitySearchDirection direction; private EntitySearchDirection direction;

View File

@ -15,23 +15,14 @@
*/ */
package org.thingsboard.rule.engine.metadata; package org.thingsboard.rule.engine.metadata;
import com.google.common.util.concurrent.AsyncFunction;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListenableFuture;
import org.apache.commons.collections.CollectionUtils;
import org.thingsboard.rule.engine.TbNodeUtils; import org.thingsboard.rule.engine.TbNodeUtils;
import org.thingsboard.rule.engine.api.TbContext; import org.thingsboard.rule.engine.api.TbContext;
import org.thingsboard.rule.engine.api.TbNodeConfiguration; import org.thingsboard.rule.engine.api.TbNodeConfiguration;
import org.thingsboard.rule.engine.api.TbNodeException; import org.thingsboard.rule.engine.api.TbNodeException;
import org.thingsboard.rule.engine.api.TbNodeState; import org.thingsboard.rule.engine.api.TbNodeState;
import org.thingsboard.rule.engine.util.EntitiesRelatedEntityIdAsyncLoader;
import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.relation.EntityRelation;
import org.thingsboard.server.common.data.relation.EntitySearchDirection;
import org.thingsboard.server.dao.relation.RelationService;
import java.util.List;
import static org.thingsboard.server.common.data.relation.RelationTypeGroup.COMMON;
public class TbGetRelatedAttributeNode extends TbEntityGetAttrNode<EntityId> { public class TbGetRelatedAttributeNode extends TbEntityGetAttrNode<EntityId> {
@ -40,23 +31,11 @@ public class TbGetRelatedAttributeNode extends TbEntityGetAttrNode<EntityId> {
@Override @Override
public void init(TbNodeConfiguration configuration, TbNodeState state) throws TbNodeException { public void init(TbNodeConfiguration configuration, TbNodeState state) throws TbNodeException {
this.config = TbNodeUtils.convert(configuration, TbGetRelatedAttrNodeConfiguration.class); this.config = TbNodeUtils.convert(configuration, TbGetRelatedAttrNodeConfiguration.class);
setConfig(config);
} }
@Override @Override
protected ListenableFuture<EntityId> findEntityAsync(TbContext ctx, EntityId originator) { protected ListenableFuture<EntityId> findEntityAsync(TbContext ctx, EntityId originator) {
RelationService relationService = ctx.getRelationService(); return EntitiesRelatedEntityIdAsyncLoader.findEntityAsync(ctx, originator, config.getDirection(), config.getRelationType());
if (config.getDirection() == EntitySearchDirection.FROM) {
ListenableFuture<List<EntityRelation>> asyncRelation = relationService.findByFromAndTypeAsync(originator, config.getRelationType(), COMMON);
return Futures.transform(asyncRelation, (AsyncFunction<? super List<EntityRelation>, EntityId>)
r -> CollectionUtils.isNotEmpty(r) ? Futures.immediateFuture(r.get(0).getTo())
: Futures.immediateFailedFuture(new IllegalStateException("Relation not found")));
} else if (config.getDirection() == EntitySearchDirection.TO) {
ListenableFuture<List<EntityRelation>> asyncRelation = relationService.findByToAndTypeAsync(originator, config.getRelationType(), COMMON);
return Futures.transform(asyncRelation, (AsyncFunction<? super List<EntityRelation>, EntityId>)
r -> CollectionUtils.isNotEmpty(r) ? Futures.immediateFuture(r.get(0).getFrom())
: Futures.immediateFailedFuture(new IllegalStateException("Relation not found")));
}
return Futures.immediateFailedFuture(new IllegalStateException("Unknown direction"));
} }
} }

View File

@ -15,50 +15,19 @@
*/ */
package org.thingsboard.rule.engine.metadata; package org.thingsboard.rule.engine.metadata;
import com.google.common.util.concurrent.AsyncFunction;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListenableFuture;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.thingsboard.rule.engine.api.TbContext; import org.thingsboard.rule.engine.api.TbContext;
import org.thingsboard.rule.engine.api.TbNodeException; import org.thingsboard.rule.engine.util.EntitiesTenantIdAsyncLoader;
import org.thingsboard.server.common.data.HasTenantId; import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.alarm.AlarmId; import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.id.*;
@Slf4j @Slf4j
public class TbGetTenantAttributeNode extends TbEntityGetAttrNode<TenantId> { public class TbGetTenantAttributeNode extends TbEntityGetAttrNode<TenantId> {
@Override @Override
protected ListenableFuture<TenantId> findEntityAsync(TbContext ctx, EntityId originator) { protected ListenableFuture<TenantId> findEntityAsync(TbContext ctx, EntityId originator) {
return EntitiesTenantIdAsyncLoader.findEntityIdAsync(ctx, originator);
switch (originator.getEntityType()) {
case TENANT:
return Futures.immediateFuture((TenantId) originator);
case CUSTOMER:
return getTenantAsync(ctx.getCustomerService().findCustomerByIdAsync((CustomerId) originator));
case USER:
return getTenantAsync(ctx.getUserService().findUserByIdAsync((UserId) originator));
case RULE:
return getTenantAsync(ctx.getRuleService().findRuleByIdAsync((RuleId) originator));
case PLUGIN:
return getTenantAsync(ctx.getPluginService().findPluginByIdAsync((PluginId) originator));
case ASSET:
return getTenantAsync(ctx.getAssetService().findAssetByIdAsync((AssetId) originator));
case DEVICE:
return getTenantAsync(ctx.getDeviceService().findDeviceByIdAsync((DeviceId) originator));
case ALARM:
return getTenantAsync(ctx.getAlarmService().findAlarmByIdAsync((AlarmId) originator));
case RULE_CHAIN:
return getTenantAsync(ctx.getRuleChainService().findRuleChainByIdAsync((RuleChainId) originator));
default:
return Futures.immediateFailedFuture(new TbNodeException("Unexpected originator EntityType " + originator));
}
}
private <T extends HasTenantId> ListenableFuture<TenantId> getTenantAsync(ListenableFuture<T> future) {
return Futures.transform(future, (AsyncFunction<HasTenantId, TenantId>) in -> {
return in != null ? Futures.immediateFuture(in.getTenantId())
: Futures.immediateFailedFuture(new IllegalStateException("Tenant not found"));});
} }
} }

View File

@ -0,0 +1,59 @@
/**
* Copyright © 2016-2018 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.rule.engine.transform;
import com.google.common.util.concurrent.ListenableFuture;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.rule.engine.TbNodeUtils;
import org.thingsboard.rule.engine.api.*;
import org.thingsboard.server.common.msg.TbMsg;
import static org.thingsboard.rule.engine.DonAsynchron.withCallback;
/**
* Created by ashvayka on 19.01.18.
*/
@Slf4j
public abstract class TbAbstractTransformNode implements TbNode {
private TbTransformNodeConfiguration config;
@Override
public void init(TbNodeConfiguration configuration, TbNodeState state) throws TbNodeException {
this.config = TbNodeUtils.convert(configuration, TbTransformNodeConfiguration.class);
}
@Override
public void onMsg(TbContext ctx, TbMsg msg) {
withCallback(transform(ctx, msg),
m -> routeMsg(ctx, m),
t -> ctx.tellError(msg, t));
}
protected abstract ListenableFuture<TbMsg> transform(TbContext ctx, TbMsg msg);
private void routeMsg(TbContext ctx, TbMsg msg) {
if (config.isStartNewChain()) {
ctx.spawn(msg);
} else {
ctx.tellNext(msg);
}
}
public void setConfig(TbTransformNodeConfiguration config) {
this.config = config;
}
}

View File

@ -0,0 +1,93 @@
/**
* Copyright © 2016-2018 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.rule.engine.transform;
import com.google.common.base.Function;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.thingsboard.rule.engine.TbNodeUtils;
import org.thingsboard.rule.engine.api.TbContext;
import org.thingsboard.rule.engine.api.TbNodeConfiguration;
import org.thingsboard.rule.engine.api.TbNodeException;
import org.thingsboard.rule.engine.api.TbNodeState;
import org.thingsboard.rule.engine.util.EntitiesCustomerIdAsyncLoader;
import org.thingsboard.rule.engine.util.EntitiesRelatedEntityIdAsyncLoader;
import org.thingsboard.rule.engine.util.EntitiesTenantIdAsyncLoader;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.msg.TbMsg;
import java.util.HashSet;
@Slf4j
public class TbChangeOriginatorNode extends TbAbstractTransformNode {
protected static final String CUSTOMER_SOURCE = "CUSTOMER";
protected static final String TENANT_SOURCE = "TENANT";
protected static final String RELATED_SOURCE = "RELATED";
private TbChangeOriginatorNodeConfiguration config;
@Override
public void init(TbNodeConfiguration configuration, TbNodeState state) throws TbNodeException {
this.config = TbNodeUtils.convert(configuration, TbChangeOriginatorNodeConfiguration.class);
validateConfig(config);
setConfig(config);
}
@Override
protected ListenableFuture<TbMsg> transform(TbContext ctx, TbMsg msg) {
ListenableFuture<? extends EntityId> newOriginator = getNewOriginator(ctx, msg.getOriginator());
return Futures.transform(newOriginator, (Function<EntityId, TbMsg>) n -> new TbMsg(msg.getId(), msg.getType(), n, msg.getMetaData(), msg.getData()));
}
private ListenableFuture<? extends EntityId> getNewOriginator(TbContext ctx, EntityId original) {
switch (config.getOriginatorSource()) {
case CUSTOMER_SOURCE:
return EntitiesCustomerIdAsyncLoader.findEntityIdAsync(ctx, original);
case TENANT_SOURCE:
return EntitiesTenantIdAsyncLoader.findEntityIdAsync(ctx, original);
case RELATED_SOURCE:
return EntitiesRelatedEntityIdAsyncLoader.findEntityAsync(ctx, original, config.getDirection(), config.getRelationType());
default:
return Futures.immediateFailedFuture(new IllegalStateException("Unexpected originator source " + config.getOriginatorSource()));
}
}
private void validateConfig(TbChangeOriginatorNodeConfiguration conf) {
HashSet<String> knownSources = Sets.newHashSet(CUSTOMER_SOURCE, TENANT_SOURCE, RELATED_SOURCE);
if (!knownSources.contains(conf.getOriginatorSource())) {
log.error("Unsupported source [{}] for TbChangeOriginatorNode", conf.getOriginatorSource());
throw new IllegalArgumentException("Unsupported source TbChangeOriginatorNode" + conf.getOriginatorSource());
}
if (conf.getOriginatorSource().equals(RELATED_SOURCE)) {
if (conf.getDirection() == null || StringUtils.isBlank(conf.getRelationType())) {
log.error("Related source for TbChangeOriginatorNode should have direction and relationType. Actual [{}] [{}]",
conf.getDirection(), conf.getRelationType());
throw new IllegalArgumentException("Wrong config for RElated Source in TbChangeOriginatorNode" + conf.getOriginatorSource());
}
}
}
@Override
public void destroy() {
}
}

View File

@ -0,0 +1,27 @@
/**
* Copyright © 2016-2018 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.rule.engine.transform;
import lombok.Data;
import org.thingsboard.server.common.data.relation.EntitySearchDirection;
@Data
public class TbChangeOriginatorNodeConfiguration extends TbTransformNodeConfiguration{
private String originatorSource;
private EntitySearchDirection direction;
private String relationType;
}

View File

@ -0,0 +1,56 @@
/**
* Copyright © 2016-2018 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.rule.engine.transform;
import com.google.common.util.concurrent.ListenableFuture;
import org.thingsboard.rule.engine.TbNodeUtils;
import org.thingsboard.rule.engine.api.TbContext;
import org.thingsboard.rule.engine.api.TbNodeConfiguration;
import org.thingsboard.rule.engine.api.TbNodeException;
import org.thingsboard.rule.engine.api.TbNodeState;
import org.thingsboard.rule.engine.js.NashornJsEngine;
import org.thingsboard.server.common.msg.TbMsg;
import javax.script.Bindings;
public class TbTransformMsgNode extends TbAbstractTransformNode {
private TbTransformMsgNodeConfiguration config;
private NashornJsEngine jsEngine;
@Override
public void init(TbNodeConfiguration configuration, TbNodeState state) throws TbNodeException {
this.config = TbNodeUtils.convert(configuration, TbTransformMsgNodeConfiguration.class);
this.jsEngine = new NashornJsEngine(config.getJsScript());
setConfig(config);
}
@Override
protected ListenableFuture<TbMsg> transform(TbContext ctx, TbMsg msg) {
return ctx.getJsExecutor().executeAsync(() -> jsEngine.executeUpdate(toBindings(msg), msg));
}
private Bindings toBindings(TbMsg msg) {
return NashornJsEngine.bindMsg(msg);
}
@Override
public void destroy() {
if (jsEngine != null) {
jsEngine.destroy();
}
}
}

View File

@ -0,0 +1,24 @@
/**
* Copyright © 2016-2018 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.rule.engine.transform;
import lombok.Data;
@Data
public class TbTransformMsgNodeConfiguration extends TbTransformNodeConfiguration {
private String jsScript;
}

View File

@ -1,68 +0,0 @@
/**
* Copyright © 2016-2018 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.rule.engine.transform;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.rule.engine.TbNodeUtils;
import org.thingsboard.rule.engine.api.*;
import org.thingsboard.rule.engine.metadata.TbGetAttributesNodeConfiguration;
import org.thingsboard.server.common.data.DataConstants;
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.dao.attributes.AttributesService;
import java.util.List;
/**
* Created by ashvayka on 19.01.18.
*/
@Slf4j
public class TbTransformNode implements TbNode {
TbGetAttributesNodeConfiguration config;
@Override
public void init(TbNodeConfiguration configuration, TbNodeState state) throws TbNodeException {
this.config = TbNodeUtils.convert(configuration, TbGetAttributesNodeConfiguration.class);
}
@Override
public void onMsg(TbContext ctx, TbMsg msg) throws TbNodeException {
try {
//TODO: refactor this to work async and fetch attributes from cache.
AttributesService service = ctx.getAttributesService();
fetchAttributes(msg, service, config.getClientAttributeNames(), DataConstants.CLIENT_SCOPE, "cs.");
fetchAttributes(msg, service, config.getServerAttributeNames(), DataConstants.SERVER_SCOPE, "ss.");
fetchAttributes(msg, service, config.getSharedAttributeNames(), DataConstants.SHARED_SCOPE, "shared.");
ctx.tellNext(msg);
} catch (Exception e) {
log.warn("[{}][{}] Failed to fetch attributes", msg.getOriginator(), msg.getId(), e);
throw new TbNodeException(e);
}
}
private void fetchAttributes(TbMsg msg, AttributesService service, List<String> attributeNames, String scope, String prefix) throws InterruptedException, java.util.concurrent.ExecutionException {
if (attributeNames != null && attributeNames.isEmpty()) {
List<AttributeKvEntry> attributes = service.find(msg.getOriginator(), scope, attributeNames).get();
attributes.forEach(attr -> msg.getMetaData().putValue(prefix + attr.getKey(), attr.getValueAsString()));
}
}
@Override
public void destroy() {
}
}

View File

@ -0,0 +1,24 @@
/**
* Copyright © 2016-2018 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.rule.engine.transform;
import lombok.Data;
@Data
public class TbTransformNodeConfiguration {
private boolean startNewChain = false;
}

View File

@ -0,0 +1,50 @@
/**
* Copyright © 2016-2018 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.rule.engine.util;
import com.google.common.util.concurrent.AsyncFunction;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import org.thingsboard.rule.engine.api.TbContext;
import org.thingsboard.rule.engine.api.TbNodeException;
import org.thingsboard.server.common.data.HasCustomerId;
import org.thingsboard.server.common.data.id.*;
public class EntitiesCustomerIdAsyncLoader {
public static ListenableFuture<CustomerId> findEntityIdAsync(TbContext ctx, EntityId original) {
switch (original.getEntityType()) {
case CUSTOMER:
return Futures.immediateFuture((CustomerId) original);
case USER:
return getCustomerAsync(ctx.getUserService().findUserByIdAsync((UserId) original));
case ASSET:
return getCustomerAsync(ctx.getAssetService().findAssetByIdAsync((AssetId) original));
case DEVICE:
return getCustomerAsync(ctx.getDeviceService().findDeviceByIdAsync((DeviceId) original));
default:
return Futures.immediateFailedFuture(new TbNodeException("Unexpected original EntityType " + original));
}
}
private static <T extends HasCustomerId> ListenableFuture<CustomerId> getCustomerAsync(ListenableFuture<T> future) {
return Futures.transform(future, (AsyncFunction<HasCustomerId, CustomerId>) in -> {
return in != null ? Futures.immediateFuture(in.getCustomerId())
: Futures.immediateFailedFuture(new IllegalStateException("Customer not found"));});
}
}

View File

@ -0,0 +1,51 @@
/**
* Copyright © 2016-2018 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.rule.engine.util;
import com.google.common.util.concurrent.AsyncFunction;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.commons.collections.CollectionUtils;
import org.thingsboard.rule.engine.api.TbContext;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.relation.EntityRelation;
import org.thingsboard.server.common.data.relation.EntitySearchDirection;
import org.thingsboard.server.dao.relation.RelationService;
import java.util.List;
import static org.thingsboard.server.common.data.relation.RelationTypeGroup.COMMON;
public class EntitiesRelatedEntityIdAsyncLoader {
public static ListenableFuture<EntityId> findEntityAsync(TbContext ctx, EntityId originator,
EntitySearchDirection direction, String relationType) {
RelationService relationService = ctx.getRelationService();
if (direction == EntitySearchDirection.FROM) {
ListenableFuture<List<EntityRelation>> asyncRelation = relationService.findByFromAndTypeAsync(originator, relationType, COMMON);
return Futures.transform(asyncRelation, (AsyncFunction<? super List<EntityRelation>, EntityId>)
r -> CollectionUtils.isNotEmpty(r) ? Futures.immediateFuture(r.get(0).getTo())
: Futures.immediateFailedFuture(new IllegalStateException("Relation not found")));
} else if (direction == EntitySearchDirection.TO) {
ListenableFuture<List<EntityRelation>> asyncRelation = relationService.findByToAndTypeAsync(originator, relationType, COMMON);
return Futures.transform(asyncRelation, (AsyncFunction<? super List<EntityRelation>, EntityId>)
r -> CollectionUtils.isNotEmpty(r) ? Futures.immediateFuture(r.get(0).getFrom())
: Futures.immediateFailedFuture(new IllegalStateException("Relation not found")));
}
return Futures.immediateFailedFuture(new IllegalStateException("Unknown direction"));
}
}

View File

@ -0,0 +1,60 @@
/**
* Copyright © 2016-2018 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.rule.engine.util;
import com.google.common.util.concurrent.AsyncFunction;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import org.thingsboard.rule.engine.api.TbContext;
import org.thingsboard.rule.engine.api.TbNodeException;
import org.thingsboard.server.common.data.HasTenantId;
import org.thingsboard.server.common.data.alarm.AlarmId;
import org.thingsboard.server.common.data.id.*;
public class EntitiesTenantIdAsyncLoader {
public static ListenableFuture<TenantId> findEntityIdAsync(TbContext ctx, EntityId original) {
switch (original.getEntityType()) {
case TENANT:
return Futures.immediateFuture((TenantId) original);
case CUSTOMER:
return getTenantAsync(ctx.getCustomerService().findCustomerByIdAsync((CustomerId) original));
case USER:
return getTenantAsync(ctx.getUserService().findUserByIdAsync((UserId) original));
case RULE:
return getTenantAsync(ctx.getRuleService().findRuleByIdAsync((RuleId) original));
case PLUGIN:
return getTenantAsync(ctx.getPluginService().findPluginByIdAsync((PluginId) original));
case ASSET:
return getTenantAsync(ctx.getAssetService().findAssetByIdAsync((AssetId) original));
case DEVICE:
return getTenantAsync(ctx.getDeviceService().findDeviceByIdAsync((DeviceId) original));
case ALARM:
return getTenantAsync(ctx.getAlarmService().findAlarmByIdAsync((AlarmId) original));
case RULE_CHAIN:
return getTenantAsync(ctx.getRuleChainService().findRuleChainByIdAsync((RuleChainId) original));
default:
return Futures.immediateFailedFuture(new TbNodeException("Unexpected original EntityType " + original));
}
}
private static <T extends HasTenantId> ListenableFuture<TenantId> getTenantAsync(ListenableFuture<T> future) {
return Futures.transform(future, (AsyncFunction<HasTenantId, TenantId>) in -> {
return in != null ? Futures.immediateFuture(in.getTenantId())
: Futures.immediateFailedFuture(new IllegalStateException("Tenant not found"));});
}
}

View File

@ -58,6 +58,7 @@ public class TbJsFilterNodeTest {
node.onMsg(ctx, msg); node.onMsg(ctx, msg);
verify(ctx).getJsExecutor(); verify(ctx).getJsExecutor();
verify(ctx).tellNext(msg, "false");
verifyNoMoreInteractions(ctx); verifyNoMoreInteractions(ctx);
} }
@ -102,6 +103,7 @@ public class TbJsFilterNodeTest {
node.onMsg(ctx, msg); node.onMsg(ctx, msg);
verify(ctx).getJsExecutor(); verify(ctx).getJsExecutor();
verify(ctx).tellNext(msg, "false");
verifyNoMoreInteractions(ctx); verifyNoMoreInteractions(ctx);
} }
@ -116,7 +118,7 @@ public class TbJsFilterNodeTest {
node.onMsg(ctx, msg); node.onMsg(ctx, msg);
verify(ctx).getJsExecutor(); verify(ctx).getJsExecutor();
verify(ctx).tellNext(msg); verify(ctx).tellNext(msg, "true");
} }
@Test @Test
@ -132,7 +134,7 @@ public class TbJsFilterNodeTest {
node.onMsg(ctx, msg); node.onMsg(ctx, msg);
verify(ctx).getJsExecutor(); verify(ctx).getJsExecutor();
verify(ctx).tellNext(msg); verify(ctx).tellNext(msg, "true");
} }
private void initWithScript(String script) throws TbNodeException { private void initWithScript(String script) throws TbNodeException {

View File

@ -34,14 +34,13 @@ import org.thingsboard.rule.engine.api.TbNodeException;
import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.TbMsgMetaData; import org.thingsboard.server.common.msg.TbMsgMetaData;
import java.util.HashSet;
import java.util.Set; import java.util.Set;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
import static org.junit.Assert.*; import static org.junit.Assert.assertEquals;
import static org.mockito.Matchers.same; import static org.mockito.Matchers.same;
import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.*;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class) @RunWith(MockitoJUnitRunner.class)
public class TbJsSwitchNodeTest { public class TbJsSwitchNodeTest {
@ -53,6 +52,41 @@ public class TbJsSwitchNodeTest {
@Mock @Mock
private ListeningExecutor executor; private ListeningExecutor executor;
@Test
public void routeToAllDoNotEvaluatesJs() throws TbNodeException {
HashSet<String> relations = Sets.newHashSet("one", "two");
initWithScript("test qwerty", relations, true);
TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, new TbMsgMetaData(), "{}".getBytes());
node.onMsg(ctx, msg);
verify(ctx).tellNext(msg, relations);
verifyNoMoreInteractions(ctx, executor);
}
@Test
public void multipleRoutesAreAllowed() throws TbNodeException {
String jsCode = "function nextRelation(meta, msg) {\n" +
" if(msg.passed == 5 && meta.temp == 10)\n" +
" return ['three', 'one']\n" +
" else\n" +
" return 'two';\n" +
"};\n" +
"\n" +
"nextRelation(meta, msg);";
initWithScript(jsCode, Sets.newHashSet("one", "two", "three"), false);
TbMsgMetaData metaData = new TbMsgMetaData();
metaData.putValue("temp", "10");
metaData.putValue("humidity", "99");
String rawJson = "{\"name\": \"Vit\", \"passed\": 5}";
TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, rawJson.getBytes());
mockJsExecutor();
node.onMsg(ctx, msg);
verify(ctx).getJsExecutor();
verify(ctx).tellNext(msg, Sets.newHashSet("one", "three"));
}
@Test @Test
public void allowedRelationPassed() throws TbNodeException { public void allowedRelationPassed() throws TbNodeException {
String jsCode = "function nextRelation(meta, msg) {\n" + String jsCode = "function nextRelation(meta, msg) {\n" +
@ -63,7 +97,7 @@ public class TbJsSwitchNodeTest {
"};\n" + "};\n" +
"\n" + "\n" +
"nextRelation(meta, msg);"; "nextRelation(meta, msg);";
initWithScript(jsCode, Sets.newHashSet("one", "two")); initWithScript(jsCode, Sets.newHashSet("one", "two"), false);
TbMsgMetaData metaData = new TbMsgMetaData(); TbMsgMetaData metaData = new TbMsgMetaData();
metaData.putValue("temp", "10"); metaData.putValue("temp", "10");
metaData.putValue("humidity", "99"); metaData.putValue("humidity", "99");
@ -74,17 +108,17 @@ public class TbJsSwitchNodeTest {
node.onMsg(ctx, msg); node.onMsg(ctx, msg);
verify(ctx).getJsExecutor(); verify(ctx).getJsExecutor();
verify(ctx).tellNext(msg, "one"); verify(ctx).tellNext(msg, Sets.newHashSet("one"));
} }
@Test @Test
public void unknownRelationThrowsException() throws TbNodeException { public void unknownRelationThrowsException() throws TbNodeException {
String jsCode = "function nextRelation(meta, msg) {\n" + String jsCode = "function nextRelation(meta, msg) {\n" +
" return 'nine';" + " return ['one','nine'];" +
"};\n" + "};\n" +
"\n" + "\n" +
"nextRelation(meta, msg);"; "nextRelation(meta, msg);";
initWithScript(jsCode, Sets.newHashSet("one", "two")); initWithScript(jsCode, Sets.newHashSet("one", "two"), false);
TbMsgMetaData metaData = new TbMsgMetaData(); TbMsgMetaData metaData = new TbMsgMetaData();
metaData.putValue("temp", "10"); metaData.putValue("temp", "10");
metaData.putValue("humidity", "99"); metaData.putValue("humidity", "99");
@ -95,13 +129,14 @@ public class TbJsSwitchNodeTest {
node.onMsg(ctx, msg); node.onMsg(ctx, msg);
verify(ctx).getJsExecutor(); verify(ctx).getJsExecutor();
verifyError(msg, "Unsupported relation for switch nine", IllegalStateException.class); verifyError(msg, "Unsupported relation for switch [nine, one]", IllegalStateException.class);
} }
private void initWithScript(String script, Set<String> relations) throws TbNodeException { private void initWithScript(String script, Set<String> relations, boolean routeToAll) throws TbNodeException {
TbJsSwitchNodeConfiguration config = new TbJsSwitchNodeConfiguration(); TbJsSwitchNodeConfiguration config = new TbJsSwitchNodeConfiguration();
config.setJsScript(script); config.setJsScript(script);
config.setAllowedRelations(relations); config.setAllowedRelations(relations);
config.setRouteToAllWithNoCheck(routeToAll);
ObjectMapper mapper = new ObjectMapper(); ObjectMapper mapper = new ObjectMapper();
TbNodeConfiguration nodeConfiguration = new TbNodeConfiguration(); TbNodeConfiguration nodeConfiguration = new TbNodeConfiguration();
nodeConfiguration.setData(mapper.valueToTree(config)); nodeConfiguration.setData(mapper.valueToTree(config));
@ -112,10 +147,10 @@ public class TbJsSwitchNodeTest {
private void mockJsExecutor() { private void mockJsExecutor() {
when(ctx.getJsExecutor()).thenReturn(executor); when(ctx.getJsExecutor()).thenReturn(executor);
doAnswer((Answer<ListenableFuture<String>>) invocationOnMock -> { doAnswer((Answer<ListenableFuture<Set<String>>>) invocationOnMock -> {
try { try {
Callable task = (Callable) (invocationOnMock.getArguments())[0]; Callable task = (Callable) (invocationOnMock.getArguments())[0];
return Futures.immediateFuture((String) task.call()); return Futures.immediateFuture((Set<String>) task.call());
} catch (Throwable th) { } catch (Throwable th) {
return Futures.immediateFailedFuture(th); return Futures.immediateFailedFuture(th);
} }

View File

@ -0,0 +1,125 @@
/**
* Copyright © 2016-2018 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.rule.engine.transform;
import com.datastax.driver.core.utils.UUIDs;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.util.concurrent.Futures;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner;
import org.thingsboard.rule.engine.api.TbContext;
import org.thingsboard.rule.engine.api.TbNodeConfiguration;
import org.thingsboard.rule.engine.api.TbNodeException;
import org.thingsboard.server.common.data.asset.Asset;
import org.thingsboard.server.common.data.id.AssetId;
import org.thingsboard.server.common.data.id.CustomerId;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.TbMsgMetaData;
import org.thingsboard.server.dao.asset.AssetService;
import static org.junit.Assert.assertEquals;
import static org.mockito.Matchers.same;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
public class TbChangeOriginatorNodeTest {
private TbChangeOriginatorNode node;
@Mock
private TbContext ctx;
@Mock
private AssetService assetService;
@Test
public void originatorCanBeChangedToCustomerId() throws TbNodeException {
init(false);
AssetId assetId = new AssetId(UUIDs.timeBased());
CustomerId customerId = new CustomerId(UUIDs.timeBased());
Asset asset = new Asset();
asset.setCustomerId(customerId);
TbMsg msg = new TbMsg(UUIDs.timeBased(), "ASSET", assetId, new TbMsgMetaData(), new byte[4]);
when(ctx.getAssetService()).thenReturn(assetService);
when(assetService.findAssetByIdAsync(assetId)).thenReturn(Futures.immediateFuture(asset));
node.onMsg(ctx, msg);
ArgumentCaptor<TbMsg> captor = ArgumentCaptor.forClass(TbMsg.class);
verify(ctx).tellNext(captor.capture());
TbMsg actualMsg = captor.getValue();
assertEquals(customerId, actualMsg.getOriginator());
assertEquals(msg.getId(), actualMsg.getId());
}
@Test
public void newChainCanBeStarted() throws TbNodeException {
init(true);
AssetId assetId = new AssetId(UUIDs.timeBased());
CustomerId customerId = new CustomerId(UUIDs.timeBased());
Asset asset = new Asset();
asset.setCustomerId(customerId);
TbMsg msg = new TbMsg(UUIDs.timeBased(), "ASSET", assetId, new TbMsgMetaData(), new byte[4]);
when(ctx.getAssetService()).thenReturn(assetService);
when(assetService.findAssetByIdAsync(assetId)).thenReturn(Futures.immediateFuture(asset));
node.onMsg(ctx, msg);
ArgumentCaptor<TbMsg> captor = ArgumentCaptor.forClass(TbMsg.class);
verify(ctx).spawn(captor.capture());
TbMsg actualMsg = captor.getValue();
assertEquals(customerId, actualMsg.getOriginator());
assertEquals(msg.getId(), actualMsg.getId());
}
@Test
public void exceptionThrownIfCannotFindNewOriginator() throws TbNodeException {
init(true);
AssetId assetId = new AssetId(UUIDs.timeBased());
CustomerId customerId = new CustomerId(UUIDs.timeBased());
Asset asset = new Asset();
asset.setCustomerId(customerId);
TbMsg msg = new TbMsg(UUIDs.timeBased(), "ASSET", assetId, new TbMsgMetaData(), new byte[4]);
when(ctx.getAssetService()).thenReturn(assetService);
when(assetService.findAssetByIdAsync(assetId)).thenReturn(Futures.immediateFailedFuture(new IllegalStateException("wrong")));
node.onMsg(ctx, msg);
ArgumentCaptor<Throwable> captor = ArgumentCaptor.forClass(Throwable.class);
verify(ctx).tellError(same(msg), captor.capture());
Throwable value = captor.getValue();
assertEquals("wrong", value.getMessage());
}
public void init(boolean startNewChain) throws TbNodeException {
TbChangeOriginatorNodeConfiguration config = new TbChangeOriginatorNodeConfiguration();
config.setOriginatorSource(TbChangeOriginatorNode.CUSTOMER_SOURCE);
config.setStartNewChain(startNewChain);
ObjectMapper mapper = new ObjectMapper();
TbNodeConfiguration nodeConfiguration = new TbNodeConfiguration();
nodeConfiguration.setData(mapper.valueToTree(config));
node = new TbChangeOriginatorNode();
node.init(nodeConfiguration, null);
}
}

View File

@ -0,0 +1,141 @@
/**
* Copyright © 2016-2018 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.rule.engine.transform;
import com.datastax.driver.core.utils.UUIDs;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.ArgumentCaptor;
import org.mockito.Matchers;
import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner;
import org.mockito.stubbing.Answer;
import org.thingsboard.rule.engine.api.ListeningExecutor;
import org.thingsboard.rule.engine.api.TbContext;
import org.thingsboard.rule.engine.api.TbNodeConfiguration;
import org.thingsboard.rule.engine.api.TbNodeException;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.TbMsgMetaData;
import java.util.concurrent.Callable;
import static org.junit.Assert.assertEquals;
import static org.mockito.Matchers.same;
import static org.mockito.Mockito.*;
@RunWith(MockitoJUnitRunner.class)
public class TbTransformMsgNodeTest {
private TbTransformMsgNode node;
@Mock
private TbContext ctx;
@Mock
private ListeningExecutor executor;
@Test
public void metadataCanBeUpdated() throws TbNodeException {
initWithScript("meta.temp = meta.temp * 10;");
TbMsgMetaData metaData = new TbMsgMetaData();
metaData.putValue("temp", "7");
metaData.putValue("humidity", "99");
String rawJson = "{\"name\": \"Vit\", \"passed\": 5, \"bigObj\": {\"prop\":42}}";
TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, rawJson.getBytes());
mockJsExecutor();
node.onMsg(ctx, msg);
verify(ctx).getJsExecutor();
ArgumentCaptor<TbMsg> captor = ArgumentCaptor.forClass(TbMsg.class);
verify(ctx).tellNext(captor.capture());
TbMsg actualMsg = captor.getValue();
assertEquals("70.0", actualMsg.getMetaData().getValue("temp"));
}
@Test
public void metadataCanBeAdded() throws TbNodeException {
initWithScript("meta.newAttr = meta.humidity - msg.passed;");
TbMsgMetaData metaData = new TbMsgMetaData();
metaData.putValue("temp", "7");
metaData.putValue("humidity", "99");
String rawJson = "{\"name\": \"Vit\", \"passed\": 5, \"bigObj\": {\"prop\":42}}";
TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, rawJson.getBytes());
mockJsExecutor();
node.onMsg(ctx, msg);
verify(ctx).getJsExecutor();
ArgumentCaptor<TbMsg> captor = ArgumentCaptor.forClass(TbMsg.class);
verify(ctx).tellNext(captor.capture());
TbMsg actualMsg = captor.getValue();
assertEquals("94.0", actualMsg.getMetaData().getValue("newAttr"));
}
@Test
public void payloadCanBeUpdated() throws TbNodeException {
initWithScript("msg.passed = msg.passed * meta.temp; msg.bigObj.newProp = 'Ukraine' ");
TbMsgMetaData metaData = new TbMsgMetaData();
metaData.putValue("temp", "7");
metaData.putValue("humidity", "99");
String rawJson = "{\"name\":\"Vit\",\"passed\": 5,\"bigObj\":{\"prop\":42}}";
TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, rawJson.getBytes());
mockJsExecutor();
node.onMsg(ctx, msg);
verify(ctx).getJsExecutor();
ArgumentCaptor<TbMsg> captor = ArgumentCaptor.forClass(TbMsg.class);
verify(ctx).tellNext(captor.capture());
TbMsg actualMsg = captor.getValue();
String expectedJson = "{\"name\":\"Vit\",\"passed\":35.0,\"bigObj\":{\"prop\":42,\"newProp\":\"Ukraine\"}}";
assertEquals(expectedJson, new String(actualMsg.getData()));
}
private void initWithScript(String script) throws TbNodeException {
TbTransformMsgNodeConfiguration config = new TbTransformMsgNodeConfiguration();
config.setJsScript(script);
ObjectMapper mapper = new ObjectMapper();
TbNodeConfiguration nodeConfiguration = new TbNodeConfiguration();
nodeConfiguration.setData(mapper.valueToTree(config));
node = new TbTransformMsgNode();
node.init(nodeConfiguration, null);
}
private void mockJsExecutor() {
when(ctx.getJsExecutor()).thenReturn(executor);
doAnswer((Answer<ListenableFuture<TbMsg>>) invocationOnMock -> {
try {
Callable task = (Callable) (invocationOnMock.getArguments())[0];
return Futures.immediateFuture((TbMsg) task.call());
} catch (Throwable th) {
return Futures.immediateFailedFuture(th);
}
}).when(executor).executeAsync(Matchers.any(Callable.class));
}
private void verifyError(TbMsg msg, String message, Class expectedClass) {
ArgumentCaptor<Throwable> captor = ArgumentCaptor.forClass(Throwable.class);
verify(ctx).tellError(same(msg), captor.capture());
Throwable value = captor.getValue();
assertEquals(expectedClass, value.getClass());
assertEquals(message, value.getMessage());
}
}