Calculate delta node

This commit is contained in:
Andrii Shvaika 2021-02-08 18:18:19 +02:00
parent cd3d388d21
commit 7c019a7123
2 changed files with 37 additions and 37 deletions

View File

@ -5,7 +5,7 @@
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@ -28,6 +28,7 @@ import org.thingsboard.rule.engine.api.TbNodeConfiguration;
import org.thingsboard.rule.engine.api.TbNodeException;
import org.thingsboard.rule.engine.api.TbRelationTypes;
import org.thingsboard.rule.engine.api.util.TbNodeUtils;
import org.thingsboard.rule.engine.telemetry.TbMsgTimeseriesNode;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.kv.TsKvEntry;
import org.thingsboard.server.common.data.plugin.ComponentType;
@ -36,6 +37,7 @@ import org.thingsboard.server.dao.timeseries.TimeseriesService;
import org.thingsboard.server.dao.util.mapping.JacksonUtil;
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@ -44,9 +46,9 @@ import java.util.concurrent.ConcurrentHashMap;
@RuleNode(type = ComponentType.ENRICHMENT,
name = "calculate delta",
configClazz = CalculateDeltaNodeConfiguration.class,
nodeDescription = "Add delta value into message by originator Entity Id",
nodeDetails = "Calculates delta and period based on the previous data and current data. " +
"Groups incoming data based on originator id of the message (i.e. particular device, asset, customer).",
nodeDescription = "Calculates and adds 'delta' value into message based on the incoming and previous value",
nodeDetails = "Calculates delta and period based on the previous time-series reading and current data. " +
"Delta calculation is done in scope of the message originator, e.g. device, asset or customer.",
uiResources = {"static/rulenode/rulenode-core-config.js"},
configDirective = "tbEnrichmentNodeCalculateDeltaConfig")
public class CalculateDeltaNode implements TbNode {
@ -76,7 +78,7 @@ public class CalculateDeltaNode implements TbNode {
DonAsynchron.withCallback(getLastValue(msg.getOriginator()),
previousData -> {
double currentValue = json.get(inputKey).asDouble();
long currentTs = json.has("ts") ? json.get("ts").asLong() : System.currentTimeMillis();
long currentTs = TbMsgTimeseriesNode.getTs(msg);
if (useCache) {
cache.put(msg.getOriginator(), new ValueWithTs(currentTs, currentValue));
@ -90,7 +92,7 @@ public class CalculateDeltaNode implements TbNode {
}
if (config.getRound() != null) {
delta = delta.setScale(config.getRound(), BigDecimal.ROUND_HALF_UP);
delta = delta.setScale(config.getRound(), RoundingMode.HALF_UP);
}
ObjectNode result = (ObjectNode) json;
@ -102,8 +104,7 @@ public class CalculateDeltaNode implements TbNode {
}
ctx.tellSuccess(TbMsg.transformMsg(msg, msg.getType(), msg.getOriginator(), msg.getMetaData(), JacksonUtil.toString(result)));
},
t -> ctx.tellFailure(msg, t)
, ctx.getDbCallbackExecutor());
t -> ctx.tellFailure(msg, t), ctx.getDbCallbackExecutor());
} else if (config.isTellFailureIfInputValueKeyIsAbsent()) {
ctx.tellNext(msg, TbRelationTypes.FAILURE);
} else {
@ -113,7 +114,9 @@ public class CalculateDeltaNode implements TbNode {
@Override
public void destroy() {
if (useCache) {
cache.clear();
}
}
private ListenableFuture<ValueWithTs> fetchLatestValue(EntityId entityId) {
@ -132,33 +135,30 @@ public class CalculateDeltaNode implements TbNode {
}
private ValueWithTs extractValue(TsKvEntry kvEntry) {
if (kvEntry == null || kvEntry.getValue() == null) {
return null;
}
double result = 0.0;
long ts = 0;
if (kvEntry != null) {
ts = kvEntry.getTs();
switch (kvEntry.getDataType()) {
case LONG:
result = kvEntry.getLongValue().get();
break;
case DOUBLE:
result = kvEntry.getDoubleValue().get();
break;
case BOOLEAN:
result = kvEntry.getBooleanValue().get() ? 1 : 0;
break;
case STRING:
String str = kvEntry.getStrValue().orElse(null);
try {
if (str == null) {
return null;
}
result = Double.parseDouble(str);
} catch (NumberFormatException e) {
throw new IllegalArgumentException("Calculation failed. Unable to parse value [" + str + "]" +
" of telemetry [" + kvEntry.getKey() + "] to Double");
}
break;
}
long ts = kvEntry.getTs();
switch (kvEntry.getDataType()) {
case LONG:
result = kvEntry.getLongValue().get();
break;
case DOUBLE:
result = kvEntry.getDoubleValue().get();
break;
case STRING:
try {
result = Double.parseDouble(kvEntry.getStrValue().get());
} catch (NumberFormatException e) {
throw new IllegalArgumentException("Calculation failed. Unable to parse value [" + kvEntry.getStrValue().get() + "]" +
" of telemetry [" + kvEntry.getKey() + "] to Double");
}
break;
case BOOLEAN:
throw new IllegalArgumentException("Calculation failed. Boolean values are not supported!");
case JSON:
throw new IllegalArgumentException("Calculation failed. JSON values are not supported!");
}
return new ValueWithTs(ts, result);
}

View File

@ -32,8 +32,8 @@ public class CalculateDeltaNodeConfiguration implements NodeConfiguration<Calcul
@Override
public CalculateDeltaNodeConfiguration defaultConfiguration() {
CalculateDeltaNodeConfiguration configuration = new CalculateDeltaNodeConfiguration();
configuration.setInputValueKey("value");
configuration.setOutputValueKey("valueDelta");
configuration.setInputValueKey("pulseCounter");
configuration.setOutputValueKey("delta");
configuration.setUseCache(true);
configuration.setAddPeriodBetweenMsgs(false);
configuration.setPeriodValueKey("periodInMs");