diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetTelemetryNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetTelemetryNode.java index 3c3fa8e21a..03e0135715 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetTelemetryNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetTelemetryNode.java @@ -15,7 +15,6 @@ */ package org.thingsboard.rule.engine.metadata; -import com.fasterxml.jackson.core.JsonGenerator; import com.fasterxml.jackson.core.JsonParser; import com.fasterxml.jackson.core.json.JsonWriteFeature; import com.fasterxml.jackson.databind.ObjectMapper; @@ -35,6 +34,7 @@ import org.thingsboard.rule.engine.api.TbNode; import org.thingsboard.rule.engine.api.TbNodeConfiguration; import org.thingsboard.rule.engine.api.TbNodeException; import org.thingsboard.rule.engine.api.util.TbNodeUtils; +import org.thingsboard.server.common.data.kv.Aggregation; import org.thingsboard.server.common.data.kv.BaseReadTsKvQuery; import org.thingsboard.server.common.data.kv.ReadTsKvQuery; import org.thingsboard.server.common.data.kv.TsKvEntry; @@ -50,7 +50,6 @@ import java.util.stream.Collectors; import static org.thingsboard.rule.engine.metadata.TbGetTelemetryNodeConfiguration.FETCH_MODE_ALL; import static org.thingsboard.rule.engine.metadata.TbGetTelemetryNodeConfiguration.FETCH_MODE_FIRST; import static org.thingsboard.rule.engine.metadata.TbGetTelemetryNodeConfiguration.MAX_FETCH_SIZE; -import static org.thingsboard.server.common.data.kv.Aggregation.NONE; /** * Created by mshvayka on 04.09.18. @@ -64,6 +63,7 @@ import static org.thingsboard.server.common.data.kv.Aggregation.NONE; "If selected fetch mode ALL Telemetry will be added like array into Message Metadata where key is Timestamp and value is value of Telemetry.
" + "If selected fetch mode FIRST or LAST Telemetry will be added like string without Timestamp.
" + "Also, the rule node allows you to select telemetry sampling order: ASC or DESC.
" + + "Aggregation feature allows you to fetch aggregated telemetry as a single value by AVG, COUNT, SUM, MIN, MAX, NONE.
" + "Note: The maximum size of the fetched array is 1000 records.\n ", uiResources = {"static/rulenode/rulenode-core-config.js"}, configDirective = "tbEnrichmentNodeGetTelemetryFromDatabase") @@ -78,6 +78,7 @@ public class TbGetTelemetryNode implements TbNode { private ObjectMapper mapper; private String fetchMode; private String orderByFetchAll; + private Aggregation aggregation; @Override public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException { @@ -89,11 +90,20 @@ public class TbGetTelemetryNode implements TbNode { if (StringUtils.isEmpty(orderByFetchAll)) { orderByFetchAll = ASC_ORDER; } + aggregation = parseAggregationConfig(config.getAggregation()); + mapper = new ObjectMapper(); mapper.configure(JsonWriteFeature.QUOTE_FIELD_NAMES.mappedFeature(), false); mapper.configure(JsonParser.Feature.ALLOW_UNQUOTED_FIELD_NAMES, true); } + Aggregation parseAggregationConfig(String aggName) { + if (StringUtils.isEmpty(aggName)) { + return Aggregation.NONE; + } + return Aggregation.valueOf(aggName); + } + @Override public void onMsg(TbContext ctx, TbMsg msg) throws ExecutionException, InterruptedException, TbNodeException { if (tsKeyNames.isEmpty()) { @@ -120,8 +130,14 @@ public class TbGetTelemetryNode implements TbNode { } private List buildQueries(TbMsg msg, List keys) { + final Interval interval = getInterval(msg); + final long aggIntervalStep = Aggregation.NONE.equals(aggregation) ? 1 : + // exact how it validates on BaseTimeseriesService.validate() + // see CassandraBaseTimeseriesDao.findAllAsync() + interval.getEndTs() - interval.getStartTs(); + return keys.stream() - .map(key -> new BaseReadTsKvQuery(key, getInterval(msg).getStartTs(), getInterval(msg).getEndTs(), 1, limit, NONE, getOrderBy())) + .map(key -> new BaseReadTsKvQuery(key, interval.getStartTs(), interval.getEndTs(), aggIntervalStep, limit, aggregation, getOrderBy())) .collect(Collectors.toList()); } diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetTelemetryNodeConfiguration.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetTelemetryNodeConfiguration.java index e4e673a2ab..1233191af5 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetTelemetryNodeConfiguration.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetTelemetryNodeConfiguration.java @@ -17,6 +17,7 @@ package org.thingsboard.rule.engine.metadata; import lombok.Data; import org.thingsboard.rule.engine.api.NodeConfiguration; +import org.thingsboard.server.common.data.kv.Aggregation; import java.util.Collections; import java.util.List; @@ -46,6 +47,7 @@ public class TbGetTelemetryNodeConfiguration implements NodeConfiguration latestTsKeyNames; @@ -63,6 +65,7 @@ public class TbGetTelemetryNodeConfiguration implements NodeConfiguration