Math node implementation

This commit is contained in:
Andrii Shvaika 2022-09-26 18:20:37 +03:00
parent fb49273bd6
commit 15ee23c5d5
7 changed files with 468 additions and 65 deletions

View File

@ -25,11 +25,12 @@ import lombok.NoArgsConstructor;
public class TbMathArgument {
private TbMathArgumentType type;
private String value;
private String key;
private String attributeScope;
private Double defaultValue;
public TbMathArgument(TbMathArgumentType type, String value) {
public TbMathArgument(TbMathArgumentType type, String key) {
this.type = type;
this.value = value;
this.key = key;
}
}

View File

@ -33,29 +33,43 @@ public class TbMathArgumentValue {
}
public static TbMathArgumentValue constant(TbMathArgument arg) {
return fromString(arg.getValue());
return fromString(arg.getKey());
}
public static TbMathArgumentValue fromMessageBody(String key, Optional<ObjectNode> jsonNodeOpt) {
private static TbMathArgumentValue defaultOrThrow(Double defaultValue, String error) {
if (defaultValue != null) {
return new TbMathArgumentValue(defaultValue);
}
throw new RuntimeException(error);
}
public static TbMathArgumentValue fromMessageBody(TbMathArgument arg, Optional<ObjectNode> jsonNodeOpt) {
String key = arg.getKey();
Double defaultValue = arg.getDefaultValue();
if (jsonNodeOpt.isEmpty()) {
throw new RuntimeException("Message body is empty!");
return defaultOrThrow(defaultValue, "Message body is empty!");
}
var json = jsonNodeOpt.get();
if (!json.has(key)) {
throw new RuntimeException("Message body has no '" + key + "'!");
return defaultOrThrow(defaultValue, "Message body has no '" + key + "'!");
}
JsonNode valueNode = json.get(key);
if (valueNode.isEmpty() || valueNode.isNull()) {
throw new RuntimeException("Message body has empty or null '" + key + "'!");
if (valueNode.isNull()) {
return defaultOrThrow(defaultValue, "Message body has null '" + key + "'!");
}
double value;
if (valueNode.isNumber()) {
value = valueNode.doubleValue();
} else if (valueNode.isTextual()) {
try {
value = Double.parseDouble(valueNode.asText());
} catch (NumberFormatException ne) {
throw new RuntimeException("Can't convert value '" + valueNode.asText() + "' to double!");
var valueNodeText = valueNode.asText();
if (StringUtils.isNotBlank(valueNodeText)) {
try {
value = Double.parseDouble(valueNode.asText());
} catch (NumberFormatException ne) {
throw new RuntimeException("Can't convert value '" + valueNode.asText() + "' to double!");
}
} else {
return defaultOrThrow(defaultValue, "Message value is empty for '" + key + "'!");
}
} else {
throw new RuntimeException("Can't convert value '" + valueNode.toString() + "' to double!");
@ -63,13 +77,15 @@ public class TbMathArgumentValue {
return new TbMathArgumentValue(value);
}
public static TbMathArgumentValue fromMessageMetadata(String key, TbMsgMetaData metaData) {
public static TbMathArgumentValue fromMessageMetadata(TbMathArgument arg, TbMsgMetaData metaData) {
String key = arg.getKey();
Double defaultValue = arg.getDefaultValue();
if (metaData == null) {
throw new RuntimeException("Message metadata is empty!");
return defaultOrThrow(defaultValue, "Message metadata is empty!");
}
var value = metaData.getValue(key);
if (StringUtils.isEmpty(value)) {
throw new RuntimeException("Message metadata has no '" + key + "'!");
return defaultOrThrow(defaultValue, "Message metadata has no '" + key + "'!");
}
return fromString(value);
}

View File

@ -19,7 +19,6 @@ import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.ConcurrentReferenceHashMap;
import org.thingsboard.common.util.DonAsynchron;
@ -39,9 +38,10 @@ import org.thingsboard.server.common.data.kv.KvEntry;
import org.thingsboard.server.common.data.plugin.ComponentType;
import org.thingsboard.server.common.msg.TbMsg;
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
@ -53,24 +53,34 @@ import java.util.stream.Collectors;
@Slf4j
@RuleNode(
type = ComponentType.ACTION,
name = "math formula",
configClazz = TbMathFormulaConfiguration.class,
nodeDescription = "Calculates the mathematics formula based on message and/or database values",
nodeDetails = "Transform incoming Message with configured JS function to String and log final value into Thingsboard log file. " +
"Message payload can be accessed via <code>msg</code> property. For example <code>'temperature = ' + msg.temperature ;</code>. " +
"Message metadata can be accessed via <code>metadata</code> property. For example <code>'name = ' + metadata.customerName;</code>.",
name = "math function",
configClazz = TbMathNodeConfiguration.class,
nodeDescription = "Apply math function and save the result into the message and/or database",
nodeDetails = "Supports math operations like: ADD, SUB, MULT, DIV, etc and functions: SIN, COS, TAN, SEC, etc. " +
"<br/><br/>" +
"You may use constant, message field, metadata field, attribute, and latest time-series as an arguments values. " +
"The result of the function may be also stored to message field, metadata field, attribute or time-series value." +
"<br/><br/>" +
"Primary use case for this rule node is to take one or more values from the database and modify them based on data from the message. " +
"For example, you may increase `totalWaterConsumption` based on the `deltaWaterConsumption` reported by device." +
"<br/><br/>" +
"Alternative use case is the replacement of simple JS `script` nodes with more light-weight and performant implementation. " +
"For example, you may transform Fahrenheit to Celsius (C = (F - 32) / 1.8) using combination of two math node functions: SUB 32 and DIV 1.8." +
"<br/><br/>" +
"The execution is synchronized in scope of message originator (e.g. device) and server node. " +
"If you have rule nodes in different rule chains, they will process messages from the same originator synchronously in the scope of the server node.",
icon = "functions"
)
public class TbMathNode implements TbNode {
private static ConcurrentMap<EntityId, Semaphore> semaphores = new ConcurrentReferenceHashMap<>();
private static final ConcurrentMap<EntityId, Semaphore> semaphores = new ConcurrentReferenceHashMap<>();
private TbMathFormulaConfiguration config;
private TbMathNodeConfiguration config;
private boolean msgBodyToJsonConversionRequired;
@Override
public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException {
this.config = TbNodeUtils.convert(configuration, TbMathFormulaConfiguration.class);
this.config = TbNodeUtils.convert(configuration, TbMathNodeConfiguration.class);
var operation = config.getOperation();
var argsCount = config.getArguments().size();
if (argsCount < operation.getMinArgs() || argsCount > operation.getMaxArgs()) {
@ -84,14 +94,49 @@ public class TbMathNode implements TbNode {
public void onMsg(TbContext ctx, TbMsg msg) {
var originator = msg.getOriginator();
var originatorSemaphore = semaphores.computeIfAbsent(originator, tmp -> new Semaphore(1, true));
boolean acquired = tryAcquire(originator, originatorSemaphore);
var arguments = config.getArguments();
Optional<ObjectNode> msgBodyOpt = convertMsgBodyIfRequired(msg);
var argumentValues = Futures.allAsList(arguments.stream()
.map(arg -> resolveArguments(ctx, msg, msgBodyOpt, arg)).collect(Collectors.toList()));
ListenableFuture<TbMsg> resultMsgFuture = Futures.transformAsync(argumentValues, args ->
updateMsgAndDb(ctx, msg, msgBodyOpt, calculateResult(ctx, msg, args)), ctx.getDbCallbackExecutor());
DonAsynchron.withCallback(resultMsgFuture, ctx::tellSuccess, t -> ctx.tellFailure(msg, t), ctx.getDbCallbackExecutor());
if (!acquired) {
ctx.tellFailure(msg, new RuntimeException("Failed to process message for originator synchronously"));
return;
}
try {
var arguments = config.getArguments();
Optional<ObjectNode> msgBodyOpt = convertMsgBodyIfRequired(msg);
var argumentValues = Futures.allAsList(arguments.stream()
.map(arg -> resolveArguments(ctx, msg, msgBodyOpt, arg)).collect(Collectors.toList()));
ListenableFuture<TbMsg> resultMsgFuture = Futures.transformAsync(argumentValues, args ->
updateMsgAndDb(ctx, msg, msgBodyOpt, calculateResult(ctx, msg, args)), ctx.getDbCallbackExecutor());
DonAsynchron.withCallback(resultMsgFuture, resultMsg -> {
try {
ctx.tellSuccess(resultMsg);
} finally {
originatorSemaphore.release();
}
}, t -> {
try {
ctx.tellFailure(msg, t);
} finally {
originatorSemaphore.release();
}
}, ctx.getDbCallbackExecutor());
} catch (Throwable e) {
originatorSemaphore.release();
log.warn("[{}] Failed to process message: {}", originator, msg, e);
throw e;
}
}
private boolean tryAcquire(EntityId originator, Semaphore originatorSemaphore) {
boolean acquired;
try {
acquired = originatorSemaphore.tryAcquire(20, TimeUnit.SECONDS);
} catch (InterruptedException e) {
acquired = false;
log.debug("[{}] Failed to acquire semaphore", originator, e);
}
return acquired;
}
private ListenableFuture<TbMsg> updateMsgAndDb(TbContext ctx, TbMsg msg, Optional<ObjectNode> msgBodyOpt, double result) {
@ -102,18 +147,47 @@ public class TbMathNode implements TbNode {
case MESSAGE_METADATA:
return Futures.immediateFuture(addToMeta(msg, mathResultDef, result));
case ATTRIBUTE:
ListenableFuture<Void> attrSave = ctx.getTelemetryService().saveAttrAndNotify(
ctx.getTenantId(), msg.getOriginator(), getAttributeScope(mathResultDef.getAttributeScope()), mathResultDef.getValue(), result);
ListenableFuture<Void> attrSave = saveAttribute(ctx, msg, result, mathResultDef);
return Futures.transform(attrSave, attr -> addToBodyAndMeta(msg, msgBodyOpt, result, mathResultDef), ctx.getDbCallbackExecutor());
case TIME_SERIES:
ListenableFuture<Void> tsSave = ctx.getTelemetryService().saveAndNotify(ctx.getTenantId(), msg.getOriginator(),
new BasicTsKvEntry(System.currentTimeMillis(), new DoubleDataEntry(mathResultDef.getValue(), result)));
ListenableFuture<Void> tsSave = saveTimeSeries(ctx, msg, result, mathResultDef);
return Futures.transform(tsSave, ts -> addToBodyAndMeta(msg, msgBodyOpt, result, mathResultDef), ctx.getDbCallbackExecutor());
default:
throw new RuntimeException("Result type is not supported: " + mathResultDef.getType() + "!");
}
}
private ListenableFuture<Void> saveTimeSeries(TbContext ctx, TbMsg msg, double result, TbMathResult mathResultDef) {
return ctx.getTelemetryService().saveAndNotify(ctx.getTenantId(), msg.getOriginator(),
new BasicTsKvEntry(System.currentTimeMillis(), new DoubleDataEntry(mathResultDef.getKey(), result)));
}
private ListenableFuture<Void> saveAttribute(TbContext ctx, TbMsg msg, double result, TbMathResult mathResultDef) {
String attributeScope = getAttributeScope(mathResultDef.getAttributeScope());
if (isIntegerResult(mathResultDef, config.getOperation())) {
var value = toIntValue(mathResultDef, result);
return ctx.getTelemetryService().saveAttrAndNotify(
ctx.getTenantId(), msg.getOriginator(), attributeScope, mathResultDef.getKey(), value);
} else {
var value = toDoubleValue(mathResultDef, result);
return ctx.getTelemetryService().saveAttrAndNotify(
ctx.getTenantId(), msg.getOriginator(), attributeScope, mathResultDef.getKey(), value);
}
}
private boolean isIntegerResult(TbMathResult mathResultDef, TbRuleNodeMathFunctionType function) {
return function.isIntegerResult() || mathResultDef.getResultValuePrecision() == 0;
}
private long toIntValue(TbMathResult mathResultDef, double value) {
return (long) value;
}
private double toDoubleValue(TbMathResult mathResultDef, double value) {
return BigDecimal.valueOf(value).setScale(mathResultDef.getResultValuePrecision(), RoundingMode.HALF_UP).doubleValue();
}
private Optional<ObjectNode> convertMsgBodyIfRequired(TbMsg msg) {
Optional<ObjectNode> msgBodyOpt;
if (msgBodyToJsonConversionRequired) {
@ -142,13 +216,21 @@ public class TbMathNode implements TbNode {
private TbMsg addToBody(TbMsg msg, TbMathResult mathResultDef, Optional<ObjectNode> msgBodyOpt, double result) {
ObjectNode body = msgBodyOpt.get();
body.put(mathResultDef.getValue(), result);
if (isIntegerResult(mathResultDef, config.getOperation())) {
body.put(mathResultDef.getKey(), toIntValue(mathResultDef, result));
} else {
body.put(mathResultDef.getKey(), toDoubleValue(mathResultDef, result));
}
return TbMsg.transformMsgData(msg, JacksonUtil.toString(body));
}
private TbMsg addToMeta(TbMsg msg, TbMathResult mathResultDef, double result) {
var md = msg.getMetaData();
md.putValue(mathResultDef.getValue(), Double.toString(result));
if (isIntegerResult(mathResultDef, config.getOperation())) {
md.putValue(mathResultDef.getKey(), Long.toString(toIntValue(mathResultDef, result)));
} else {
md.putValue(mathResultDef.getKey(), Double.toString(toDoubleValue(mathResultDef, result)));
}
return TbMsg.transformMsg(msg, md);
}
@ -164,12 +246,64 @@ public class TbMathNode implements TbNode {
return apply(args.get(0), args.get(1), (a, b) -> a / b);
case SIN:
return apply(args.get(0), Math::sin);
case SINH:
return apply(args.get(0), Math::sinh);
case COS:
return apply(args.get(0), Math::cos);
case COSH:
return apply(args.get(0), Math::cosh);
case TAN:
return apply(args.get(0), Math::tan);
case TANH:
return apply(args.get(0), Math::tanh);
case ACOS:
return apply(args.get(0), Math::acos);
case ASIN:
return apply(args.get(0), Math::asin);
case ATAN:
return apply(args.get(0), Math::atan);
case ATAN2:
return apply(args.get(0), args.get(1), Math::atan2);
case EXP:
return apply(args.get(0), Math::exp);
case EXPM1:
return apply(args.get(0), Math::expm1);
case SQRT:
return apply(args.get(0), Math::sqrt);
case CBRT:
return apply(args.get(0), Math::cbrt);
case GET_EXP:
return apply(args.get(0), (x) -> (double) Math.getExponent(x));
case HYPOT:
return apply(args.get(0), args.get(1), Math::hypot);
case LOG:
return apply(args.get(0), Math::log);
case LOG10:
return apply(args.get(0), Math::log10);
case LOG1P:
return apply(args.get(0), Math::log1p);
case CEIL:
return apply(args.get(0), Math::ceil);
case FLOOR:
return apply(args.get(0), Math::floor);
case FLOOR_DIV:
return apply(args.get(0), args.get(1), (a, b) -> (double) Math.floorDiv(a.longValue(), b.longValue()));
case FLOOR_MOD:
return apply(args.get(0), args.get(1), (a, b) -> (double) Math.floorMod(a.longValue(), b.longValue()));
case ABS:
return apply(args.get(0), Math::abs);
case MIN:
return apply(args.get(0), args.get(1), Math::min);
case MAX:
return apply(args.get(0), args.get(1), Math::max);
case POW:
return apply(args.get(0), args.get(1), Math::pow);
case SIGNUM:
return apply(args.get(0), Math::signum);
case RAD:
return apply(args.get(0), Math::toRadians);
case DEG:
return apply(args.get(0), Math::toDegrees);
default:
throw new RuntimeException("Not supported operation: " + config.getOperation());
}
@ -188,19 +322,17 @@ public class TbMathNode implements TbNode {
case CONSTANT:
return Futures.immediateFuture(TbMathArgumentValue.constant(arg));
case MESSAGE_BODY:
return Futures.immediateFuture(TbMathArgumentValue.fromMessageBody(arg.getValue(), msgBodyOpt));
return Futures.immediateFuture(TbMathArgumentValue.fromMessageBody(arg, msgBodyOpt));
case MESSAGE_METADATA:
return Futures.immediateFuture(TbMathArgumentValue.fromMessageMetadata(arg.getValue(), msg.getMetaData()));
return Futures.immediateFuture(TbMathArgumentValue.fromMessageMetadata(arg, msg.getMetaData()));
case ATTRIBUTE:
String scope = getAttributeScope(arg.getAttributeScope());
return Futures.transform(ctx.getAttributesService().find(ctx.getTenantId(), msg.getOriginator(), scope, arg.getValue()),
opt -> getTbMathArgumentValue(opt.orElseThrow(() ->
new RuntimeException("Attribute: " + arg.getValue() + " with scope: " + scope + " not found for entity: " + msg.getOriginator())))
, MoreExecutors.directExecutor());
return Futures.transform(ctx.getAttributesService().find(ctx.getTenantId(), msg.getOriginator(), scope, arg.getKey()),
opt -> getTbMathArgumentValue(arg, opt, "Attribute: " + arg.getKey() + " with scope: " + scope + " not found for entity: " + msg.getOriginator())
,MoreExecutors.directExecutor());
case TIME_SERIES:
return Futures.transform(ctx.getTimeseriesService().findLatest(ctx.getTenantId(), msg.getOriginator(), arg.getValue()),
opt -> getTbMathArgumentValue(opt.orElseThrow(() ->
new RuntimeException("Time-series: " + arg.getValue() + " not found for entity: " + msg.getOriginator())))
return Futures.transform(ctx.getTimeseriesService().findLatest(ctx.getTenantId(), msg.getOriginator(), arg.getKey()),
opt -> getTbMathArgumentValue(arg, opt, "Time-series: " + arg.getKey() + " not found for entity: " + msg.getOriginator())
, MoreExecutors.directExecutor());
default:
throw new RuntimeException("Unsupported argument type: " + arg.getType() + "!");
@ -212,14 +344,23 @@ public class TbMathNode implements TbNode {
return StringUtils.isEmpty(attrScope) ? DataConstants.SERVER_SCOPE : attrScope;
}
private TbMathArgumentValue getTbMathArgumentValue(KvEntry kv) {
switch (kv.getDataType()) {
case LONG:
return TbMathArgumentValue.fromLong(kv.getLongValue().get());
case DOUBLE:
return TbMathArgumentValue.fromDouble(kv.getDoubleValue().get());
default:
return TbMathArgumentValue.fromString(kv.getValueAsString());
private TbMathArgumentValue getTbMathArgumentValue(TbMathArgument arg, Optional<? extends KvEntry> kvOpt, String error) {
if (kvOpt != null && kvOpt.isPresent()) {
var kv = kvOpt.get();
switch (kv.getDataType()) {
case LONG:
return TbMathArgumentValue.fromLong(kv.getLongValue().get());
case DOUBLE:
return TbMathArgumentValue.fromDouble(kv.getDoubleValue().get());
default:
return TbMathArgumentValue.fromString(kv.getValueAsString());
}
} else {
if (arg.getDefaultValue() != null) {
return TbMathArgumentValue.fromDouble(arg.getDefaultValue());
} else {
throw new RuntimeException(error);
}
}
}

View File

@ -22,18 +22,18 @@ import java.util.Arrays;
import java.util.List;
@Data
public class TbMathFormulaConfiguration implements NodeConfiguration<TbMathFormulaConfiguration> {
public class TbMathNodeConfiguration implements NodeConfiguration<TbMathNodeConfiguration> {
private TbRuleNodeMathFunctionType operation;
private List<TbMathArgument> arguments;
private TbMathResult result;
@Override
public TbMathFormulaConfiguration defaultConfiguration() {
TbMathFormulaConfiguration configuration = new TbMathFormulaConfiguration();
public TbMathNodeConfiguration defaultConfiguration() {
TbMathNodeConfiguration configuration = new TbMathNodeConfiguration();
configuration.setOperation(TbRuleNodeMathFunctionType.ADD);
configuration.setArguments(Arrays.asList(new TbMathArgument(TbMathArgumentType.CONSTANT, "2"), new TbMathArgument(TbMathArgumentType.CONSTANT, "2")));
configuration.setResult(new TbMathResult(TbMathArgumentType.MESSAGE_BODY, "result", false, false, null));
configuration.setResult(new TbMathResult(TbMathArgumentType.MESSAGE_BODY, "result", 2, false, false, null));
return configuration;
}
}

View File

@ -25,7 +25,9 @@ import lombok.NoArgsConstructor;
public class TbMathResult {
private TbMathArgumentType type;
private String value;
private String key;
// 0 means integer, x > 0 means x decimal points after ".";
private int resultValuePrecision;
private boolean addToBody;
private boolean addToMetadata;
private String attributeScope;

View File

@ -19,19 +19,31 @@ import lombok.Getter;
public enum TbRuleNodeMathFunctionType {
ADD(2), SUB(2), MULT(2), DIV(2), SIN(1), COS(1), SQRT(1), ABS(1);
ADD(2), SUB(2), MULT(2), DIV(2),
SIN, SINH, COS, COSH, TAN, TANH, ACOS, ASIN, ATAN, ATAN2(2),
EXP, EXPM1, SQRT, CBRT, GET_EXP(1, 1, true), HYPOT(2), LOG, LOG10, LOG1P,
CEIL(1, 1, true), FLOOR(1, 1, true), FLOOR_DIV(2), FLOOR_MOD(2),
ABS, MIN(2), MAX(2), POW, SIGNUM, RAD, DEG;
@Getter
private final int minArgs;
@Getter
private final int maxArgs;
@Getter
private final boolean integerResult;
TbRuleNodeMathFunctionType() {
this(1, 1, false);
}
TbRuleNodeMathFunctionType(int args) {
this(args, args);
this(args, args, false);
}
TbRuleNodeMathFunctionType(int minArgs, int maxArgs) {
TbRuleNodeMathFunctionType(int minArgs, int maxArgs, boolean integerResult) {
this.minArgs = minArgs;
this.maxArgs = maxArgs;
this.integerResult = integerResult;
}
}

View File

@ -0,0 +1,231 @@
/**
* Copyright © 2016-2022 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package math;
import com.datastax.oss.driver.api.core.uuid.Uuids;
import com.google.common.util.concurrent.Futures;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
import org.thingsboard.common.util.AbstractListeningExecutor;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.rule.engine.api.TbContext;
import org.thingsboard.rule.engine.api.TbNodeConfiguration;
import org.thingsboard.rule.engine.api.TbNodeException;
import org.thingsboard.rule.engine.math.TbMathArgument;
import org.thingsboard.rule.engine.math.TbMathArgumentType;
import org.thingsboard.rule.engine.math.TbMathNodeConfiguration;
import org.thingsboard.rule.engine.math.TbMathNode;
import org.thingsboard.rule.engine.math.TbMathResult;
import org.thingsboard.rule.engine.math.TbRuleNodeMathFunctionType;
import org.thingsboard.server.common.data.DataConstants;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry;
import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
import org.thingsboard.server.common.data.kv.DoubleDataEntry;
import org.thingsboard.server.common.data.kv.LongDataEntry;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.TbMsgMetaData;
import org.thingsboard.server.dao.attributes.AttributesService;
import org.thingsboard.server.dao.timeseries.TimeseriesService;
import java.util.Arrays;
import java.util.Optional;
import static org.mockito.Mockito.lenient;
@RunWith(MockitoJUnitRunner.class)
public class TbMathNodeTest {
private EntityId originator = new DeviceId(Uuids.timeBased());
private TenantId tenantId = TenantId.fromUUID(Uuids.timeBased());
@Mock
private TbContext ctx;
@Mock
private AttributesService attributesService;
@Mock
private TimeseriesService tsService;
private AbstractListeningExecutor dbExecutor;
@Before
public void before() {
dbExecutor = new AbstractListeningExecutor() {
@Override
protected int getThreadPollSize() {
return 3;
}
};
dbExecutor.init();
Mockito.reset(ctx);
Mockito.reset(attributesService);
Mockito.reset(tsService);
lenient().when(ctx.getAttributesService()).thenReturn(attributesService);
lenient().when(ctx.getTimeseriesService()).thenReturn(tsService);
lenient().when(ctx.getTenantId()).thenReturn(tenantId);
lenient().when(ctx.getDbCallbackExecutor()).thenReturn(dbExecutor);
}
@After
public void after() {
dbExecutor.destroy();
}
private TbMathNode initNode(TbRuleNodeMathFunctionType operation, TbMathResult result, TbMathArgument... arguments) {
try {
TbMathNodeConfiguration configuration = new TbMathNodeConfiguration();
configuration.setOperation(operation);
configuration.setResult(result);
configuration.setArguments(Arrays.asList(arguments));
TbMathNode node = new TbMathNode();
node.init(ctx, new TbNodeConfiguration(JacksonUtil.valueToTree(configuration)));
return node;
} catch (TbNodeException ex) {
throw new IllegalStateException(ex);
}
}
@Test
public void test_2_plus_2_body() {
var node = initNode(TbRuleNodeMathFunctionType.ADD,
new TbMathResult(TbMathArgumentType.MESSAGE_BODY, "result", 2, false, false, null),
new TbMathArgument(TbMathArgumentType.MESSAGE_BODY, "a"),
new TbMathArgument(TbMathArgumentType.MESSAGE_BODY, "b")
);
TbMsg msg = TbMsg.newMsg("TEST", originator, new TbMsgMetaData(), JacksonUtil.newObjectNode().put("a", 2).put("b", 2).toString());
node.onMsg(ctx, msg);
ArgumentCaptor<TbMsg> msgCaptor = ArgumentCaptor.forClass(TbMsg.class);
Mockito.verify(ctx, Mockito.timeout(5000)).tellSuccess(msgCaptor.capture());
TbMsg resultMsg = msgCaptor.getValue();
Assert.assertNotNull(resultMsg);
Assert.assertNotNull(resultMsg.getData());
var resultJson = JacksonUtil.toJsonNode(resultMsg.getData());
Assert.assertTrue(resultJson.has("result"));
Assert.assertEquals(4, resultJson.get("result").asInt());
}
@Test
public void test_2_plus_2_meta() {
var node = initNode(TbRuleNodeMathFunctionType.ADD,
new TbMathResult(TbMathArgumentType.MESSAGE_METADATA, "result", 0, false, false, null),
new TbMathArgument(TbMathArgumentType.MESSAGE_BODY, "a"),
new TbMathArgument(TbMathArgumentType.MESSAGE_BODY, "b")
);
TbMsg msg = TbMsg.newMsg("TEST", originator, new TbMsgMetaData(), JacksonUtil.newObjectNode().put("a", 2).put("b", 2).toString());
node.onMsg(ctx, msg);
ArgumentCaptor<TbMsg> msgCaptor = ArgumentCaptor.forClass(TbMsg.class);
Mockito.verify(ctx, Mockito.timeout(5000)).tellSuccess(msgCaptor.capture());
TbMsg resultMsg = msgCaptor.getValue();
Assert.assertNotNull(resultMsg);
Assert.assertNotNull(resultMsg.getData());
Assert.assertNotNull(resultMsg.getMetaData());
var result = resultMsg.getMetaData().getValue("result");
Assert.assertNotNull(result);
Assert.assertEquals("4", result);
}
@Test
public void test_2_plus_2_attr_and_ts() {
var node = initNode(TbRuleNodeMathFunctionType.ADD,
new TbMathResult(TbMathArgumentType.MESSAGE_BODY, "result", 2, false, false, null),
new TbMathArgument(TbMathArgumentType.ATTRIBUTE, "a"),
new TbMathArgument(TbMathArgumentType.TIME_SERIES, "b")
);
TbMsg msg = TbMsg.newMsg("TEST", originator, new TbMsgMetaData(), JacksonUtil.newObjectNode().toString());
Mockito.when(attributesService.find(tenantId, originator, DataConstants.SERVER_SCOPE, "a"))
.thenReturn(Futures.immediateFuture(Optional.of(new BaseAttributeKvEntry(System.currentTimeMillis(), new DoubleDataEntry("a", 2.0)))));
Mockito.when(tsService.findLatest(tenantId, originator, "b"))
.thenReturn(Futures.immediateFuture(Optional.of(new BasicTsKvEntry(System.currentTimeMillis(), new LongDataEntry("b", 2L)))));
node.onMsg(ctx, msg);
ArgumentCaptor<TbMsg> msgCaptor = ArgumentCaptor.forClass(TbMsg.class);
Mockito.verify(ctx, Mockito.timeout(5000)).tellSuccess(msgCaptor.capture());
TbMsg resultMsg = msgCaptor.getValue();
Assert.assertNotNull(resultMsg);
Assert.assertNotNull(resultMsg.getData());
var resultJson = JacksonUtil.toJsonNode(resultMsg.getData());
Assert.assertTrue(resultJson.has("result"));
Assert.assertEquals(4, resultJson.get("result").asInt());
}
@Test
public void test_sqrt_5_body() {
var node = initNode(TbRuleNodeMathFunctionType.SQRT,
new TbMathResult(TbMathArgumentType.MESSAGE_BODY, "result", 3, false, false, null),
new TbMathArgument(TbMathArgumentType.MESSAGE_BODY, "a")
);
TbMsg msg = TbMsg.newMsg("TEST", originator, new TbMsgMetaData(), JacksonUtil.newObjectNode().put("a", 5).toString());
node.onMsg(ctx, msg);
ArgumentCaptor<TbMsg> msgCaptor = ArgumentCaptor.forClass(TbMsg.class);
Mockito.verify(ctx, Mockito.timeout(5000)).tellSuccess(msgCaptor.capture());
TbMsg resultMsg = msgCaptor.getValue();
Assert.assertNotNull(resultMsg);
Assert.assertNotNull(resultMsg.getData());
var resultJson = JacksonUtil.toJsonNode(resultMsg.getData());
Assert.assertTrue(resultJson.has("result"));
Assert.assertEquals(2.236, resultJson.get("result").asDouble(), 0.0);
}
@Test
public void test_sqrt_5_meta() {
var node = initNode(TbRuleNodeMathFunctionType.SQRT,
new TbMathResult(TbMathArgumentType.MESSAGE_METADATA, "result", 3, false, false, null),
new TbMathArgument(TbMathArgumentType.MESSAGE_BODY, "a")
);
TbMsg msg = TbMsg.newMsg("TEST", originator, new TbMsgMetaData(), JacksonUtil.newObjectNode().put("a", 5).toString());
node.onMsg(ctx, msg);
ArgumentCaptor<TbMsg> msgCaptor = ArgumentCaptor.forClass(TbMsg.class);
Mockito.verify(ctx, Mockito.timeout(5000)).tellSuccess(msgCaptor.capture());
TbMsg resultMsg = msgCaptor.getValue();
Assert.assertNotNull(resultMsg);
Assert.assertNotNull(resultMsg.getData());
var result = resultMsg.getMetaData().getValue("result");
Assert.assertNotNull(result);
Assert.assertEquals("2.236", result);
}
}