add filter & switch Nodes.
add self get latest Telemetry.
This commit is contained in:
		
							parent
							
								
									46e79bf66c
								
							
						
					
					
						commit
						f51be6b212
					
				@ -0,0 +1,12 @@
 | 
			
		||||
package org.thingsboard.rule.engine.api;
 | 
			
		||||
 | 
			
		||||
import com.google.common.util.concurrent.ListenableFuture;
 | 
			
		||||
 | 
			
		||||
import java.util.concurrent.Callable;
 | 
			
		||||
 | 
			
		||||
public interface ListeningExecutor {
 | 
			
		||||
 | 
			
		||||
    <T> ListenableFuture<T> executeAsync(Callable<T> task);
 | 
			
		||||
 | 
			
		||||
    void onDestroy();
 | 
			
		||||
}
 | 
			
		||||
@ -74,4 +74,6 @@ public interface TbContext {
 | 
			
		||||
 | 
			
		||||
    RelationService getRelationService();
 | 
			
		||||
 | 
			
		||||
    ListeningExecutor getJsExecutor();
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -0,0 +1,51 @@
 | 
			
		||||
package org.thingsboard.rule.engine.filter;
 | 
			
		||||
 | 
			
		||||
import lombok.extern.slf4j.Slf4j;
 | 
			
		||||
import org.thingsboard.rule.engine.TbNodeUtils;
 | 
			
		||||
import org.thingsboard.rule.engine.api.*;
 | 
			
		||||
import org.thingsboard.rule.engine.js.NashornJsEngine;
 | 
			
		||||
import org.thingsboard.server.common.msg.TbMsg;
 | 
			
		||||
 | 
			
		||||
import javax.script.Bindings;
 | 
			
		||||
 | 
			
		||||
import static org.thingsboard.rule.engine.DonAsynchron.withCallback;
 | 
			
		||||
 | 
			
		||||
@Slf4j
 | 
			
		||||
public class TbJsFilterNode implements TbNode {
 | 
			
		||||
 | 
			
		||||
    private TbJsFilterNodeConfiguration config;
 | 
			
		||||
    private NashornJsEngine jsEngine;
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public void init(TbNodeConfiguration configuration, TbNodeState state) throws TbNodeException {
 | 
			
		||||
        this.config = TbNodeUtils.convert(configuration, TbJsFilterNodeConfiguration.class);
 | 
			
		||||
        this.jsEngine = new NashornJsEngine(config.getJsScript());
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public void onMsg(TbContext ctx, TbMsg msg) {
 | 
			
		||||
        ListeningExecutor jsExecutor = ctx.getJsExecutor();
 | 
			
		||||
        withCallback(jsExecutor.executeAsync(() -> jsEngine.executeFilter(toBindings(msg))),
 | 
			
		||||
                result -> processFilter(ctx, msg, result),
 | 
			
		||||
                t -> ctx.tellError(msg, t));
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private void processFilter(TbContext ctx, TbMsg msg, Boolean filterResult) {
 | 
			
		||||
        if (filterResult) {
 | 
			
		||||
            ctx.tellNext(msg);
 | 
			
		||||
        } else {
 | 
			
		||||
            log.debug("Msg filtered out {}", msg.getId());
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private Bindings toBindings(TbMsg msg) {
 | 
			
		||||
        return NashornJsEngine.bindMsg(msg);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public void destroy() {
 | 
			
		||||
        if (jsEngine != null) {
 | 
			
		||||
            jsEngine.destroy();
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
@ -0,0 +1,9 @@
 | 
			
		||||
package org.thingsboard.rule.engine.filter;
 | 
			
		||||
 | 
			
		||||
import lombok.Data;
 | 
			
		||||
 | 
			
		||||
@Data
 | 
			
		||||
public class TbJsFilterNodeConfiguration {
 | 
			
		||||
 | 
			
		||||
    private String jsScript;
 | 
			
		||||
}
 | 
			
		||||
@ -0,0 +1,56 @@
 | 
			
		||||
package org.thingsboard.rule.engine.filter;
 | 
			
		||||
 | 
			
		||||
import lombok.extern.slf4j.Slf4j;
 | 
			
		||||
import org.thingsboard.rule.engine.TbNodeUtils;
 | 
			
		||||
import org.thingsboard.rule.engine.api.*;
 | 
			
		||||
import org.thingsboard.rule.engine.js.NashornJsEngine;
 | 
			
		||||
import org.thingsboard.server.common.msg.TbMsg;
 | 
			
		||||
 | 
			
		||||
import javax.script.Bindings;
 | 
			
		||||
 | 
			
		||||
import static org.thingsboard.rule.engine.DonAsynchron.withCallback;
 | 
			
		||||
 | 
			
		||||
@Slf4j
 | 
			
		||||
public class TbJsSwitchNode implements TbNode {
 | 
			
		||||
 | 
			
		||||
    private TbJsSwitchNodeConfiguration config;
 | 
			
		||||
    private NashornJsEngine jsEngine;
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public void init(TbNodeConfiguration configuration, TbNodeState state) throws TbNodeException {
 | 
			
		||||
        this.config = TbNodeUtils.convert(configuration, TbJsSwitchNodeConfiguration.class);
 | 
			
		||||
        this.jsEngine = new NashornJsEngine(config.getJsScript());
 | 
			
		||||
        if (config.getAllowedRelations().size() < 1) {
 | 
			
		||||
            String message = "Switch node should have at least 1 relation";
 | 
			
		||||
            log.error(message);
 | 
			
		||||
            throw new IllegalStateException(message);
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public void onMsg(TbContext ctx, TbMsg msg) {
 | 
			
		||||
        ListeningExecutor jsExecutor = ctx.getJsExecutor();
 | 
			
		||||
        withCallback(jsExecutor.executeAsync(() -> jsEngine.executeSwitch(toBindings(msg))),
 | 
			
		||||
                result -> processSwitch(ctx, msg, result),
 | 
			
		||||
                t -> ctx.tellError(msg, t));
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private void processSwitch(TbContext ctx, TbMsg msg, String nextRelation) {
 | 
			
		||||
        if (config.getAllowedRelations().contains(nextRelation)) {
 | 
			
		||||
            ctx.tellNext(msg, nextRelation);
 | 
			
		||||
        } else {
 | 
			
		||||
            ctx.tellError(msg, new IllegalStateException("Unsupported relation for switch " + nextRelation));
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private Bindings toBindings(TbMsg msg) {
 | 
			
		||||
        return NashornJsEngine.bindMsg(msg);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public void destroy() {
 | 
			
		||||
        if (jsEngine != null) {
 | 
			
		||||
            jsEngine.destroy();
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
@ -0,0 +1,12 @@
 | 
			
		||||
package org.thingsboard.rule.engine.filter;
 | 
			
		||||
 | 
			
		||||
import lombok.Data;
 | 
			
		||||
 | 
			
		||||
import java.util.Set;
 | 
			
		||||
 | 
			
		||||
@Data
 | 
			
		||||
public class TbJsSwitchNodeConfiguration {
 | 
			
		||||
 | 
			
		||||
    private String jsScript;
 | 
			
		||||
    private Set<String> allowedRelations;
 | 
			
		||||
}
 | 
			
		||||
@ -0,0 +1,30 @@
 | 
			
		||||
package org.thingsboard.rule.engine.js;
 | 
			
		||||
 | 
			
		||||
import com.google.common.util.concurrent.ListenableFuture;
 | 
			
		||||
import com.google.common.util.concurrent.ListeningExecutorService;
 | 
			
		||||
import com.google.common.util.concurrent.MoreExecutors;
 | 
			
		||||
import org.thingsboard.rule.engine.api.ListeningExecutor;
 | 
			
		||||
 | 
			
		||||
import javax.annotation.PreDestroy;
 | 
			
		||||
import java.util.concurrent.Callable;
 | 
			
		||||
import java.util.concurrent.Executors;
 | 
			
		||||
 | 
			
		||||
public class JsExecutorService implements ListeningExecutor{
 | 
			
		||||
 | 
			
		||||
    private final ListeningExecutorService service;
 | 
			
		||||
 | 
			
		||||
    public JsExecutorService(int threadCount) {
 | 
			
		||||
        this.service = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(threadCount));
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public <T> ListenableFuture<T> executeAsync(Callable<T> task) {
 | 
			
		||||
        return service.submit(task);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @PreDestroy
 | 
			
		||||
    @Override
 | 
			
		||||
    public void onDestroy() {
 | 
			
		||||
        service.shutdown();
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
@ -0,0 +1,79 @@
 | 
			
		||||
package org.thingsboard.rule.engine.js;
 | 
			
		||||
 | 
			
		||||
import com.fasterxml.jackson.databind.JsonNode;
 | 
			
		||||
import com.fasterxml.jackson.databind.ObjectMapper;
 | 
			
		||||
import jdk.nashorn.api.scripting.NashornScriptEngineFactory;
 | 
			
		||||
import lombok.extern.slf4j.Slf4j;
 | 
			
		||||
import org.apache.commons.lang3.ArrayUtils;
 | 
			
		||||
import org.thingsboard.server.common.msg.TbMsg;
 | 
			
		||||
 | 
			
		||||
import javax.script.*;
 | 
			
		||||
import java.util.Map;
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@Slf4j
 | 
			
		||||
public class NashornJsEngine {
 | 
			
		||||
 | 
			
		||||
    public static final String METADATA = "meta";
 | 
			
		||||
    public static final String DATA = "msg";
 | 
			
		||||
    private static NashornScriptEngineFactory factory = new NashornScriptEngineFactory();
 | 
			
		||||
 | 
			
		||||
    private CompiledScript engine;
 | 
			
		||||
 | 
			
		||||
    public NashornJsEngine(String script) {
 | 
			
		||||
        engine = compileScript(script);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private static CompiledScript compileScript(String script) {
 | 
			
		||||
        ScriptEngine engine = factory.getScriptEngine(new String[]{"--no-java"});
 | 
			
		||||
        Compilable compEngine = (Compilable) engine;
 | 
			
		||||
        try {
 | 
			
		||||
            return compEngine.compile(script);
 | 
			
		||||
        } catch (ScriptException e) {
 | 
			
		||||
            log.warn("Failed to compile JS script: {}", e.getMessage(), e);
 | 
			
		||||
            throw new IllegalArgumentException("Can't compile script: " + e.getMessage());
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    public static Bindings bindMsg(TbMsg msg) {
 | 
			
		||||
        try {
 | 
			
		||||
            Bindings bindings = new SimpleBindings();
 | 
			
		||||
            bindings.put(METADATA, msg.getMetaData().getData());
 | 
			
		||||
 | 
			
		||||
            if (ArrayUtils.isNotEmpty(msg.getData())) {
 | 
			
		||||
                ObjectMapper mapper = new ObjectMapper();
 | 
			
		||||
                JsonNode jsonNode = mapper.readTree(msg.getData());
 | 
			
		||||
                Map map = mapper.treeToValue(jsonNode, Map.class);
 | 
			
		||||
                bindings.put(DATA, map);
 | 
			
		||||
            }
 | 
			
		||||
 | 
			
		||||
            return bindings;
 | 
			
		||||
        } catch (Throwable th) {
 | 
			
		||||
            throw new IllegalArgumentException("Cannot bind js args", th);
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    public boolean executeFilter(Bindings bindings) throws ScriptException {
 | 
			
		||||
        Object eval = engine.eval(bindings);
 | 
			
		||||
        if (eval instanceof Boolean) {
 | 
			
		||||
            return (boolean) eval;
 | 
			
		||||
        } else {
 | 
			
		||||
            log.warn("Wrong result type: {}", eval);
 | 
			
		||||
            throw new ScriptException("Wrong result type: " + eval);
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    public String executeSwitch(Bindings bindings) throws ScriptException, NoSuchMethodException {
 | 
			
		||||
        Object eval = this.engine.eval(bindings);
 | 
			
		||||
        if (eval instanceof String) {
 | 
			
		||||
            return (String) eval;
 | 
			
		||||
        } else {
 | 
			
		||||
            log.warn("Wrong result type: {}", eval);
 | 
			
		||||
            throw new ScriptException("Wrong result type: " + eval);
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    public void destroy() {
 | 
			
		||||
        engine = null;
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
@ -1,12 +1,12 @@
 | 
			
		||||
/**
 | 
			
		||||
 * Copyright © 2016-2018 The Thingsboard Authors
 | 
			
		||||
 *
 | 
			
		||||
 * <p>
 | 
			
		||||
 * Licensed under the Apache License, Version 2.0 (the "License");
 | 
			
		||||
 * you may not use this file except in compliance with the License.
 | 
			
		||||
 * You may obtain a copy of the License at
 | 
			
		||||
 *
 | 
			
		||||
 *     http://www.apache.org/licenses/LICENSE-2.0
 | 
			
		||||
 *
 | 
			
		||||
 * <p>
 | 
			
		||||
 * http://www.apache.org/licenses/LICENSE-2.0
 | 
			
		||||
 * <p>
 | 
			
		||||
 * Unless required by applicable law or agreed to in writing, software
 | 
			
		||||
 * distributed under the License is distributed on an "AS IS" BASIS,
 | 
			
		||||
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
			
		||||
@ -15,13 +15,16 @@
 | 
			
		||||
 */
 | 
			
		||||
package org.thingsboard.rule.engine.metadata;
 | 
			
		||||
 | 
			
		||||
import com.google.common.util.concurrent.AsyncFunction;
 | 
			
		||||
import com.google.common.base.Function;
 | 
			
		||||
import com.google.common.util.concurrent.Futures;
 | 
			
		||||
import com.google.common.util.concurrent.ListenableFuture;
 | 
			
		||||
import lombok.extern.slf4j.Slf4j;
 | 
			
		||||
import org.apache.commons.collections.CollectionUtils;
 | 
			
		||||
import org.thingsboard.rule.engine.TbNodeUtils;
 | 
			
		||||
import org.thingsboard.rule.engine.api.*;
 | 
			
		||||
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
 | 
			
		||||
import org.thingsboard.server.common.data.kv.KvEntry;
 | 
			
		||||
import org.thingsboard.server.common.data.kv.TsKvEntry;
 | 
			
		||||
import org.thingsboard.server.common.msg.TbMsg;
 | 
			
		||||
 | 
			
		||||
import java.util.List;
 | 
			
		||||
@ -44,24 +47,40 @@ public class TbGetAttributesNode implements TbNode {
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public void onMsg(TbContext ctx, TbMsg msg) throws TbNodeException {
 | 
			
		||||
        ListenableFuture<List<Void>> future = Futures.allAsList(
 | 
			
		||||
                putAttrAsync(ctx, msg, CLIENT_SCOPE, config.getClientAttributeNames(), "cs."),
 | 
			
		||||
                putAttrAsync(ctx, msg, SHARED_SCOPE, config.getSharedAttributeNames(), "shared."),
 | 
			
		||||
                putAttrAsync(ctx, msg, SERVER_SCOPE, config.getServerAttributeNames(), "ss."));
 | 
			
		||||
        if (CollectionUtils.isNotEmpty(config.getLatestTsKeyNames())) {
 | 
			
		||||
            withCallback(getLatestTelemetry(ctx, msg, config.getLatestTsKeyNames()),
 | 
			
		||||
                    i -> ctx.tellNext(msg),
 | 
			
		||||
                    t -> ctx.tellError(msg, t));
 | 
			
		||||
        } else {
 | 
			
		||||
            ListenableFuture<List<Void>> future = Futures.allAsList(
 | 
			
		||||
                    putAttrAsync(ctx, msg, CLIENT_SCOPE, config.getClientAttributeNames(), "cs."),
 | 
			
		||||
                    putAttrAsync(ctx, msg, SHARED_SCOPE, config.getSharedAttributeNames(), "shared."),
 | 
			
		||||
                    putAttrAsync(ctx, msg, SERVER_SCOPE, config.getServerAttributeNames(), "ss."));
 | 
			
		||||
 | 
			
		||||
        withCallback(future, i -> ctx.tellNext(msg), t -> ctx.tellError(msg, t));
 | 
			
		||||
            withCallback(future, i -> ctx.tellNext(msg), t -> ctx.tellError(msg, t));
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private ListenableFuture<Void> putAttributesAsync(TbMsg msg, List<AttributeKvEntry> attributes, String prefix) {
 | 
			
		||||
    private ListenableFuture<Void> putAttr(TbMsg msg, List<KvEntry> attributes, String prefix) {
 | 
			
		||||
        attributes.forEach(r -> msg.getMetaData().putValue(prefix + r.getKey(), r.getValueAsString()));
 | 
			
		||||
        return Futures.immediateFuture(null);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private ListenableFuture<Void> putAttrAsync(TbContext ctx, TbMsg msg, String scope, List<String> attributes, String prefix) {
 | 
			
		||||
        return Futures.transform(ctx.getAttributesService().find(msg.getOriginator(), scope, attributes),
 | 
			
		||||
                (AsyncFunction<List<AttributeKvEntry>, Void>) i -> putAttributesAsync(msg, i, prefix));
 | 
			
		||||
        ListenableFuture<List<AttributeKvEntry>> latest = ctx.getAttributesService().find(msg.getOriginator(), scope, attributes);
 | 
			
		||||
        return Futures.transform(latest, (Function<? super List<AttributeKvEntry>, Void>) l -> {
 | 
			
		||||
            l.forEach(r -> msg.getMetaData().putValue(prefix + r.getKey(), r.getValueAsString()));
 | 
			
		||||
            return null;
 | 
			
		||||
        });
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private ListenableFuture<Void> getLatestTelemetry(TbContext ctx, TbMsg msg, List<String> attributes) {
 | 
			
		||||
        ListenableFuture<List<TsKvEntry>> latest = ctx.getTimeseriesService().findLatest(msg.getOriginator(), attributes);
 | 
			
		||||
        return Futures.transform(latest, (Function<? super List<TsKvEntry>, Void>) l -> {
 | 
			
		||||
            l.forEach(r -> msg.getMetaData().putValue(r.getKey(), r.getValueAsString()));
 | 
			
		||||
            return null;
 | 
			
		||||
        });
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public void destroy() {
 | 
			
		||||
 | 
			
		||||
@ -29,4 +29,6 @@ public class TbGetAttributesNodeConfiguration {
 | 
			
		||||
    private List<String> sharedAttributeNames;
 | 
			
		||||
    private List<String> serverAttributeNames;
 | 
			
		||||
 | 
			
		||||
    private List<String> latestTsKeyNames;
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -0,0 +1,154 @@
 | 
			
		||||
package org.thingsboard.rule.engine.filter;
 | 
			
		||||
 | 
			
		||||
import com.datastax.driver.core.utils.UUIDs;
 | 
			
		||||
import com.fasterxml.jackson.databind.ObjectMapper;
 | 
			
		||||
import com.google.common.util.concurrent.Futures;
 | 
			
		||||
import com.google.common.util.concurrent.ListenableFuture;
 | 
			
		||||
import org.junit.Test;
 | 
			
		||||
import org.junit.runner.RunWith;
 | 
			
		||||
import org.mockito.ArgumentCaptor;
 | 
			
		||||
import org.mockito.Matchers;
 | 
			
		||||
import org.mockito.Mock;
 | 
			
		||||
import org.mockito.runners.MockitoJUnitRunner;
 | 
			
		||||
import org.mockito.stubbing.Answer;
 | 
			
		||||
import org.thingsboard.rule.engine.api.ListeningExecutor;
 | 
			
		||||
import org.thingsboard.rule.engine.api.TbContext;
 | 
			
		||||
import org.thingsboard.rule.engine.api.TbNodeConfiguration;
 | 
			
		||||
import org.thingsboard.rule.engine.api.TbNodeException;
 | 
			
		||||
import org.thingsboard.server.common.msg.TbMsg;
 | 
			
		||||
import org.thingsboard.server.common.msg.TbMsgMetaData;
 | 
			
		||||
 | 
			
		||||
import javax.script.ScriptException;
 | 
			
		||||
import java.util.concurrent.Callable;
 | 
			
		||||
 | 
			
		||||
import static org.junit.Assert.assertEquals;
 | 
			
		||||
import static org.mockito.Mockito.*;
 | 
			
		||||
 | 
			
		||||
@RunWith(MockitoJUnitRunner.class)
 | 
			
		||||
public class TbJsFilterNodeTest {
 | 
			
		||||
 | 
			
		||||
    private TbJsFilterNode node;
 | 
			
		||||
 | 
			
		||||
    @Mock
 | 
			
		||||
    private TbContext ctx;
 | 
			
		||||
    @Mock
 | 
			
		||||
    private ListeningExecutor executor;
 | 
			
		||||
 | 
			
		||||
    @Test
 | 
			
		||||
    public void falseEvaluationDoNotSendMsg() throws TbNodeException {
 | 
			
		||||
        initWithScript("10 > 15;");
 | 
			
		||||
        TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, new TbMsgMetaData(), "{}".getBytes());
 | 
			
		||||
 | 
			
		||||
        mockJsExecutor();
 | 
			
		||||
 | 
			
		||||
        node.onMsg(ctx, msg);
 | 
			
		||||
        verify(ctx).getJsExecutor();
 | 
			
		||||
        verifyNoMoreInteractions(ctx);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Test
 | 
			
		||||
    public void notValidMsgDataThrowsException() throws TbNodeException {
 | 
			
		||||
        initWithScript("10 > 15;");
 | 
			
		||||
        TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, new TbMsgMetaData(), new byte[4]);
 | 
			
		||||
 | 
			
		||||
        when(ctx.getJsExecutor()).thenReturn(executor);
 | 
			
		||||
 | 
			
		||||
        mockJsExecutor();
 | 
			
		||||
 | 
			
		||||
        node.onMsg(ctx, msg);
 | 
			
		||||
        verifyError(msg, "Cannot bind js args", IllegalArgumentException.class);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Test
 | 
			
		||||
    public void exceptionInJsThrowsException() throws TbNodeException {
 | 
			
		||||
        initWithScript("meta.temp.curr < 15;");
 | 
			
		||||
        TbMsgMetaData metaData = new TbMsgMetaData();
 | 
			
		||||
        TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, "{}".getBytes());
 | 
			
		||||
        mockJsExecutor();
 | 
			
		||||
 | 
			
		||||
        node.onMsg(ctx, msg);
 | 
			
		||||
        String expectedMessage = "TypeError: Cannot get property \"curr\" of null in <eval> at line number 1";
 | 
			
		||||
        verifyError(msg, expectedMessage, ScriptException.class);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Test(expected = IllegalArgumentException.class)
 | 
			
		||||
    public void notValidScriptThrowsException() throws TbNodeException {
 | 
			
		||||
        initWithScript("10 > 15 asdq out");
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Test
 | 
			
		||||
    public void metadataConditionCanBeFalse() throws TbNodeException {
 | 
			
		||||
        initWithScript("meta.humidity < 15;");
 | 
			
		||||
        TbMsgMetaData metaData = new TbMsgMetaData();
 | 
			
		||||
        metaData.putValue("temp", "10");
 | 
			
		||||
        metaData.putValue("humidity", "99");
 | 
			
		||||
        TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, "{}".getBytes());
 | 
			
		||||
        mockJsExecutor();
 | 
			
		||||
 | 
			
		||||
        node.onMsg(ctx, msg);
 | 
			
		||||
        verify(ctx).getJsExecutor();
 | 
			
		||||
        verifyNoMoreInteractions(ctx);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Test
 | 
			
		||||
    public void metadataConditionCanBeTrue() throws TbNodeException {
 | 
			
		||||
        initWithScript("meta.temp < 15;");
 | 
			
		||||
        TbMsgMetaData metaData = new TbMsgMetaData();
 | 
			
		||||
        metaData.putValue("temp", "10");
 | 
			
		||||
        metaData.putValue("humidity", "99");
 | 
			
		||||
        TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, "{}".getBytes());
 | 
			
		||||
        mockJsExecutor();
 | 
			
		||||
 | 
			
		||||
        node.onMsg(ctx, msg);
 | 
			
		||||
        verify(ctx).getJsExecutor();
 | 
			
		||||
        verify(ctx).tellNext(msg);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Test
 | 
			
		||||
    public void msgJsonParsedAndBinded() throws TbNodeException {
 | 
			
		||||
        initWithScript("msg.passed < 15 && msg.name === 'Vit' && meta.temp == 10 && msg.bigObj.prop == 42;");
 | 
			
		||||
        TbMsgMetaData metaData = new TbMsgMetaData();
 | 
			
		||||
        metaData.putValue("temp", "10");
 | 
			
		||||
        metaData.putValue("humidity", "99");
 | 
			
		||||
        String rawJson = "{\"name\": \"Vit\", \"passed\": 5, \"bigObj\": {\"prop\":42}}";
 | 
			
		||||
 | 
			
		||||
        TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, rawJson.getBytes());
 | 
			
		||||
        mockJsExecutor();
 | 
			
		||||
 | 
			
		||||
        node.onMsg(ctx, msg);
 | 
			
		||||
        verify(ctx).getJsExecutor();
 | 
			
		||||
        verify(ctx).tellNext(msg);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private void initWithScript(String script) throws TbNodeException {
 | 
			
		||||
        TbJsFilterNodeConfiguration config = new TbJsFilterNodeConfiguration();
 | 
			
		||||
        config.setJsScript(script);
 | 
			
		||||
        ObjectMapper mapper = new ObjectMapper();
 | 
			
		||||
        TbNodeConfiguration nodeConfiguration = new TbNodeConfiguration();
 | 
			
		||||
        nodeConfiguration.setData(mapper.valueToTree(config));
 | 
			
		||||
 | 
			
		||||
        node = new TbJsFilterNode();
 | 
			
		||||
        node.init(nodeConfiguration, null);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private void mockJsExecutor() {
 | 
			
		||||
        when(ctx.getJsExecutor()).thenReturn(executor);
 | 
			
		||||
        doAnswer((Answer<ListenableFuture<Boolean>>) invocationOnMock -> {
 | 
			
		||||
            try {
 | 
			
		||||
                Callable task = (Callable) (invocationOnMock.getArguments())[0];
 | 
			
		||||
                return Futures.immediateFuture((Boolean) task.call());
 | 
			
		||||
            } catch (Throwable th) {
 | 
			
		||||
                return Futures.immediateFailedFuture(th);
 | 
			
		||||
            }
 | 
			
		||||
        }).when(executor).executeAsync(Matchers.any(Callable.class));
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private void verifyError(TbMsg msg, String message, Class expectedClass) {
 | 
			
		||||
        ArgumentCaptor<Throwable> captor = ArgumentCaptor.forClass(Throwable.class);
 | 
			
		||||
        verify(ctx).tellError(same(msg), captor.capture());
 | 
			
		||||
 | 
			
		||||
        Throwable value = captor.getValue();
 | 
			
		||||
        assertEquals(expectedClass, value.getClass());
 | 
			
		||||
        assertEquals(message, value.getMessage());
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
@ -0,0 +1,118 @@
 | 
			
		||||
package org.thingsboard.rule.engine.filter;
 | 
			
		||||
 | 
			
		||||
import com.datastax.driver.core.utils.UUIDs;
 | 
			
		||||
import com.fasterxml.jackson.databind.ObjectMapper;
 | 
			
		||||
import com.google.common.collect.Sets;
 | 
			
		||||
import com.google.common.util.concurrent.Futures;
 | 
			
		||||
import com.google.common.util.concurrent.ListenableFuture;
 | 
			
		||||
import org.junit.Test;
 | 
			
		||||
import org.junit.runner.RunWith;
 | 
			
		||||
import org.mockito.ArgumentCaptor;
 | 
			
		||||
import org.mockito.Matchers;
 | 
			
		||||
import org.mockito.Mock;
 | 
			
		||||
import org.mockito.runners.MockitoJUnitRunner;
 | 
			
		||||
import org.mockito.stubbing.Answer;
 | 
			
		||||
import org.thingsboard.rule.engine.api.ListeningExecutor;
 | 
			
		||||
import org.thingsboard.rule.engine.api.TbContext;
 | 
			
		||||
import org.thingsboard.rule.engine.api.TbNodeConfiguration;
 | 
			
		||||
import org.thingsboard.rule.engine.api.TbNodeException;
 | 
			
		||||
import org.thingsboard.server.common.msg.TbMsg;
 | 
			
		||||
import org.thingsboard.server.common.msg.TbMsgMetaData;
 | 
			
		||||
 | 
			
		||||
import java.util.Set;
 | 
			
		||||
import java.util.concurrent.Callable;
 | 
			
		||||
 | 
			
		||||
import static org.junit.Assert.*;
 | 
			
		||||
import static org.mockito.Matchers.same;
 | 
			
		||||
import static org.mockito.Mockito.doAnswer;
 | 
			
		||||
import static org.mockito.Mockito.verify;
 | 
			
		||||
import static org.mockito.Mockito.when;
 | 
			
		||||
 | 
			
		||||
@RunWith(MockitoJUnitRunner.class)
 | 
			
		||||
public class TbJsSwitchNodeTest {
 | 
			
		||||
 | 
			
		||||
    private TbJsSwitchNode node;
 | 
			
		||||
 | 
			
		||||
    @Mock
 | 
			
		||||
    private TbContext ctx;
 | 
			
		||||
    @Mock
 | 
			
		||||
    private ListeningExecutor executor;
 | 
			
		||||
 | 
			
		||||
    @Test
 | 
			
		||||
    public void allowedRelationPassed() throws TbNodeException {
 | 
			
		||||
        String jsCode = "function nextRelation(meta, msg) {\n" +
 | 
			
		||||
                "    if(msg.passed == 5 && meta.temp == 10)\n" +
 | 
			
		||||
                "        return 'one'\n" +
 | 
			
		||||
                "    else\n" +
 | 
			
		||||
                "        return 'two';\n" +
 | 
			
		||||
                "};\n" +
 | 
			
		||||
                "\n" +
 | 
			
		||||
                "nextRelation(meta, msg);";
 | 
			
		||||
        initWithScript(jsCode, Sets.newHashSet("one", "two"));
 | 
			
		||||
        TbMsgMetaData metaData = new TbMsgMetaData();
 | 
			
		||||
        metaData.putValue("temp", "10");
 | 
			
		||||
        metaData.putValue("humidity", "99");
 | 
			
		||||
        String rawJson = "{\"name\": \"Vit\", \"passed\": 5}";
 | 
			
		||||
 | 
			
		||||
        TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, rawJson.getBytes());
 | 
			
		||||
        mockJsExecutor();
 | 
			
		||||
 | 
			
		||||
        node.onMsg(ctx, msg);
 | 
			
		||||
        verify(ctx).getJsExecutor();
 | 
			
		||||
        verify(ctx).tellNext(msg, "one");
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Test
 | 
			
		||||
    public void unknownRelationThrowsException() throws TbNodeException {
 | 
			
		||||
        String jsCode = "function nextRelation(meta, msg) {\n" +
 | 
			
		||||
                "    return 'nine';" +
 | 
			
		||||
                "};\n" +
 | 
			
		||||
                "\n" +
 | 
			
		||||
                "nextRelation(meta, msg);";
 | 
			
		||||
        initWithScript(jsCode, Sets.newHashSet("one", "two"));
 | 
			
		||||
        TbMsgMetaData metaData = new TbMsgMetaData();
 | 
			
		||||
        metaData.putValue("temp", "10");
 | 
			
		||||
        metaData.putValue("humidity", "99");
 | 
			
		||||
        String rawJson = "{\"name\": \"Vit\", \"passed\": 5}";
 | 
			
		||||
 | 
			
		||||
        TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, rawJson.getBytes());
 | 
			
		||||
        mockJsExecutor();
 | 
			
		||||
 | 
			
		||||
        node.onMsg(ctx, msg);
 | 
			
		||||
        verify(ctx).getJsExecutor();
 | 
			
		||||
        verifyError(msg, "Unsupported relation for switch nine", IllegalStateException.class);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private void initWithScript(String script, Set<String> relations) throws TbNodeException {
 | 
			
		||||
        TbJsSwitchNodeConfiguration config = new TbJsSwitchNodeConfiguration();
 | 
			
		||||
        config.setJsScript(script);
 | 
			
		||||
        config.setAllowedRelations(relations);
 | 
			
		||||
        ObjectMapper mapper = new ObjectMapper();
 | 
			
		||||
        TbNodeConfiguration nodeConfiguration = new TbNodeConfiguration();
 | 
			
		||||
        nodeConfiguration.setData(mapper.valueToTree(config));
 | 
			
		||||
 | 
			
		||||
        node = new TbJsSwitchNode();
 | 
			
		||||
        node.init(nodeConfiguration, null);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private void mockJsExecutor() {
 | 
			
		||||
        when(ctx.getJsExecutor()).thenReturn(executor);
 | 
			
		||||
        doAnswer((Answer<ListenableFuture<String>>) invocationOnMock -> {
 | 
			
		||||
            try {
 | 
			
		||||
                Callable task = (Callable) (invocationOnMock.getArguments())[0];
 | 
			
		||||
                return Futures.immediateFuture((String) task.call());
 | 
			
		||||
            } catch (Throwable th) {
 | 
			
		||||
                return Futures.immediateFailedFuture(th);
 | 
			
		||||
            }
 | 
			
		||||
        }).when(executor).executeAsync(Matchers.any(Callable.class));
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private void verifyError(TbMsg msg, String message, Class expectedClass) {
 | 
			
		||||
        ArgumentCaptor<Throwable> captor = ArgumentCaptor.forClass(Throwable.class);
 | 
			
		||||
        verify(ctx).tellError(same(msg), captor.capture());
 | 
			
		||||
 | 
			
		||||
        Throwable value = captor.getValue();
 | 
			
		||||
        assertEquals(expectedClass, value.getClass());
 | 
			
		||||
        assertEquals(message, value.getMessage());
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user