diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetTelemetryCertainTimeRangeNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetTelemetryCertainTimeRangeNode.java new file mode 100644 index 0000000000..a635b16d09 --- /dev/null +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetTelemetryCertainTimeRangeNode.java @@ -0,0 +1,105 @@ +package org.thingsboard.rule.engine.metadata; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ArrayNode; +import com.fasterxml.jackson.databind.node.ObjectNode; +import com.google.common.util.concurrent.ListenableFuture; +import lombok.extern.slf4j.Slf4j; +import org.thingsboard.rule.engine.api.*; +import org.thingsboard.rule.engine.api.util.DonAsynchron; +import org.thingsboard.rule.engine.api.util.TbNodeUtils; +import org.thingsboard.server.common.data.kv.BaseTsKvQuery; +import org.thingsboard.server.common.data.kv.TsKvEntry; +import org.thingsboard.server.common.data.kv.TsKvQuery; +import org.thingsboard.server.common.data.plugin.ComponentType; +import org.thingsboard.server.common.msg.TbMsg; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; + +import static org.thingsboard.rule.engine.api.TbRelationTypes.SUCCESS; +import static org.thingsboard.server.common.data.kv.Aggregation.NONE; + +/** + * Created by mshvayka on 04.09.18. + */ +@Slf4j +@RuleNode(type = ComponentType.ENRICHMENT, + name = "huy", + configClazz = TbGetTelemetryCertainTimeRangeNodeConfiguration.class, + nodeDescription = "", + nodeDetails = "", + uiResources = "", //{"static/rulenode/rulenode-core-config.js"}, + configDirective = "") +public class TbGetTelemetryCertainTimeRangeNode implements TbNode { + + private TbGetTelemetryCertainTimeRangeNodeConfiguration config; + private List tsKeyNames; + private long startTsOffset; + private long endTsOffset; + private int limit; + private ObjectMapper mapper; + + @Override + public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException { + this.config = TbNodeUtils.convert(configuration, TbGetTelemetryCertainTimeRangeNodeConfiguration.class); + tsKeyNames = config.getLatestTsKeyNames(); + startTsOffset = TimeUnit.valueOf(config.getStartIntervalTimeUnit()).toMillis(config.getStartInterval()); + endTsOffset = TimeUnit.valueOf(config.getEndIntervalTimeUnit()).toMillis(config.getEndInterval()); + limit = config.getFetchMode().equals(TbGetTelemetryCertainTimeRangeNodeConfiguration.FETCH_MODE_ALL) + ? TbGetTelemetryCertainTimeRangeNodeConfiguration.MAX_FETCH_SIZE : 1; + mapper = new ObjectMapper(); + } + + @Override + public void onMsg(TbContext ctx, TbMsg msg) throws ExecutionException, InterruptedException, TbNodeException { + ObjectNode resultNode = mapper.createObjectNode(); + List queries = new ArrayList<>(); + long ts = System.currentTimeMillis(); + long startTs = ts - startTsOffset; + long endTs = ts - endTsOffset; + if (tsKeyNames.isEmpty()) { + ctx.tellFailure(msg, new Exception("Telemetry not found")); + } else { + for (String key : tsKeyNames) { + //TODO: handle direction; + queries.add(new BaseTsKvQuery(key, startTs, endTs, 1, limit, NONE)); + if (limit == TbGetTelemetryCertainTimeRangeNodeConfiguration.MAX_FETCH_SIZE) { + resultNode.set(key, mapper.createArrayNode()); + } else { + resultNode.putObject(key); + } + } + try { + ListenableFuture> list = ctx.getTimeseriesService().findAll(msg.getOriginator(), queries); + DonAsynchron.withCallback(list, data -> { + for (TsKvEntry tsKvEntry : data) { + JsonNode node = resultNode.get(tsKvEntry.getKey()); + if (node.isArray()) { + ArrayNode arrayNode = (ArrayNode) node; + arrayNode.add(mapper.createObjectNode().put(String.valueOf(tsKvEntry.getTs()), tsKvEntry.getValueAsString())); + } else { + ObjectNode object = mapper.createObjectNode().put(String.valueOf(tsKvEntry.getTs()), tsKvEntry.getValueAsString()); + resultNode.set(tsKvEntry.getKey(), object); + } + } + for (String key : tsKeyNames) { + msg.getMetaData().putValue(key, resultNode.get(key).toString()); + } + TbMsg newMsg = ctx.newMsg(msg.getType(), msg.getOriginator(), msg.getMetaData(), msg.getData()); + ctx.tellNext(newMsg, SUCCESS); + }, error -> ctx.tellFailure(msg, error)); + } catch (Exception e) { + ctx.tellFailure(msg, e); + } + } + } + + @Override + public void destroy() { + + } +} diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetTelemetryCertainTimeRangeNodeConfiguration.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetTelemetryCertainTimeRangeNodeConfiguration.java new file mode 100644 index 0000000000..b1c27ca3b3 --- /dev/null +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetTelemetryCertainTimeRangeNodeConfiguration.java @@ -0,0 +1,41 @@ +package org.thingsboard.rule.engine.metadata; + +import lombok.Data; +import org.thingsboard.rule.engine.api.NodeConfiguration; + +import java.util.Collections; +import java.util.List; +import java.util.concurrent.TimeUnit; + +/** + * Created by mshvayka on 04.09.18. + */ +@Data +public class TbGetTelemetryCertainTimeRangeNodeConfiguration implements NodeConfiguration { + + public static final String FETCH_MODE_FIRST = "FIRST"; + public static final String FETCH_MODE_LAST = "LAST"; + public static final String FETCH_MODE_ALL = "ALL"; + public static final int MAX_FETCH_SIZE = 1000; + + private int startInterval; + private int endInterval; + private String startIntervalTimeUnit; + private String endIntervalTimeUnit; + private String fetchMode; //FIRST, LAST, LATEST + + private List latestTsKeyNames; + + + + @Override + public TbGetTelemetryCertainTimeRangeNodeConfiguration defaultConfiguration() { + TbGetTelemetryCertainTimeRangeNodeConfiguration configuration = new TbGetTelemetryCertainTimeRangeNodeConfiguration(); + configuration.setLatestTsKeyNames(Collections.emptyList()); + configuration.setStartIntervalTimeUnit(TimeUnit.MINUTES.name()); + configuration.setStartInterval(1); + configuration.setEndIntervalTimeUnit(TimeUnit.MINUTES.name()); + configuration.setEndInterval(2); + return configuration; + } +}