From 7c019a712366b3988218a8723831a9fab6f5dce3 Mon Sep 17 00:00:00 2001 From: Andrii Shvaika Date: Mon, 8 Feb 2021 18:18:19 +0200 Subject: [PATCH] Calculate delta node --- .../engine/metadata/CalculateDeltaNode.java | 70 +++++++++---------- .../CalculateDeltaNodeConfiguration.java | 4 +- 2 files changed, 37 insertions(+), 37 deletions(-) diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/CalculateDeltaNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/CalculateDeltaNode.java index d0bf9a70a0..02269dfa36 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/CalculateDeltaNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/CalculateDeltaNode.java @@ -5,7 +5,7 @@ * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -28,6 +28,7 @@ import org.thingsboard.rule.engine.api.TbNodeConfiguration; import org.thingsboard.rule.engine.api.TbNodeException; import org.thingsboard.rule.engine.api.TbRelationTypes; import org.thingsboard.rule.engine.api.util.TbNodeUtils; +import org.thingsboard.rule.engine.telemetry.TbMsgTimeseriesNode; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.kv.TsKvEntry; import org.thingsboard.server.common.data.plugin.ComponentType; @@ -36,6 +37,7 @@ import org.thingsboard.server.dao.timeseries.TimeseriesService; import org.thingsboard.server.dao.util.mapping.JacksonUtil; import java.math.BigDecimal; +import java.math.RoundingMode; import java.util.Collections; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -44,9 +46,9 @@ import java.util.concurrent.ConcurrentHashMap; @RuleNode(type = ComponentType.ENRICHMENT, name = "calculate delta", configClazz = CalculateDeltaNodeConfiguration.class, - nodeDescription = "Add delta value into message by originator Entity Id", - nodeDetails = "Calculates delta and period based on the previous data and current data. " + - "Groups incoming data based on originator id of the message (i.e. particular device, asset, customer).", + nodeDescription = "Calculates and adds 'delta' value into message based on the incoming and previous value", + nodeDetails = "Calculates delta and period based on the previous time-series reading and current data. " + + "Delta calculation is done in scope of the message originator, e.g. device, asset or customer.", uiResources = {"static/rulenode/rulenode-core-config.js"}, configDirective = "tbEnrichmentNodeCalculateDeltaConfig") public class CalculateDeltaNode implements TbNode { @@ -76,7 +78,7 @@ public class CalculateDeltaNode implements TbNode { DonAsynchron.withCallback(getLastValue(msg.getOriginator()), previousData -> { double currentValue = json.get(inputKey).asDouble(); - long currentTs = json.has("ts") ? json.get("ts").asLong() : System.currentTimeMillis(); + long currentTs = TbMsgTimeseriesNode.getTs(msg); if (useCache) { cache.put(msg.getOriginator(), new ValueWithTs(currentTs, currentValue)); @@ -90,7 +92,7 @@ public class CalculateDeltaNode implements TbNode { } if (config.getRound() != null) { - delta = delta.setScale(config.getRound(), BigDecimal.ROUND_HALF_UP); + delta = delta.setScale(config.getRound(), RoundingMode.HALF_UP); } ObjectNode result = (ObjectNode) json; @@ -102,8 +104,7 @@ public class CalculateDeltaNode implements TbNode { } ctx.tellSuccess(TbMsg.transformMsg(msg, msg.getType(), msg.getOriginator(), msg.getMetaData(), JacksonUtil.toString(result))); }, - t -> ctx.tellFailure(msg, t) - , ctx.getDbCallbackExecutor()); + t -> ctx.tellFailure(msg, t), ctx.getDbCallbackExecutor()); } else if (config.isTellFailureIfInputValueKeyIsAbsent()) { ctx.tellNext(msg, TbRelationTypes.FAILURE); } else { @@ -113,7 +114,9 @@ public class CalculateDeltaNode implements TbNode { @Override public void destroy() { - + if (useCache) { + cache.clear(); + } } private ListenableFuture fetchLatestValue(EntityId entityId) { @@ -132,33 +135,30 @@ public class CalculateDeltaNode implements TbNode { } private ValueWithTs extractValue(TsKvEntry kvEntry) { + if (kvEntry == null || kvEntry.getValue() == null) { + return null; + } double result = 0.0; - long ts = 0; - if (kvEntry != null) { - ts = kvEntry.getTs(); - switch (kvEntry.getDataType()) { - case LONG: - result = kvEntry.getLongValue().get(); - break; - case DOUBLE: - result = kvEntry.getDoubleValue().get(); - break; - case BOOLEAN: - result = kvEntry.getBooleanValue().get() ? 1 : 0; - break; - case STRING: - String str = kvEntry.getStrValue().orElse(null); - try { - if (str == null) { - return null; - } - result = Double.parseDouble(str); - } catch (NumberFormatException e) { - throw new IllegalArgumentException("Calculation failed. Unable to parse value [" + str + "]" + - " of telemetry [" + kvEntry.getKey() + "] to Double"); - } - break; - } + long ts = kvEntry.getTs(); + switch (kvEntry.getDataType()) { + case LONG: + result = kvEntry.getLongValue().get(); + break; + case DOUBLE: + result = kvEntry.getDoubleValue().get(); + break; + case STRING: + try { + result = Double.parseDouble(kvEntry.getStrValue().get()); + } catch (NumberFormatException e) { + throw new IllegalArgumentException("Calculation failed. Unable to parse value [" + kvEntry.getStrValue().get() + "]" + + " of telemetry [" + kvEntry.getKey() + "] to Double"); + } + break; + case BOOLEAN: + throw new IllegalArgumentException("Calculation failed. Boolean values are not supported!"); + case JSON: + throw new IllegalArgumentException("Calculation failed. JSON values are not supported!"); } return new ValueWithTs(ts, result); } diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/CalculateDeltaNodeConfiguration.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/CalculateDeltaNodeConfiguration.java index 7978dd4172..b16a97caec 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/CalculateDeltaNodeConfiguration.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/CalculateDeltaNodeConfiguration.java @@ -32,8 +32,8 @@ public class CalculateDeltaNodeConfiguration implements NodeConfiguration