Merge pull request #8725 from ShvaykaD/feature/math-node-fields-templatization

Math node fields templatization
This commit is contained in:
Andrew Shvayka 2023-06-12 18:02:59 +03:00 committed by GitHub
commit 80baf9f54d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 98 additions and 55 deletions

View File

@ -43,19 +43,18 @@ public class TbMathArgumentValue {
throw new RuntimeException(error);
}
public static TbMathArgumentValue fromMessageBody(TbMathArgument arg, Optional<ObjectNode> jsonNodeOpt) {
String key = arg.getKey();
public static TbMathArgumentValue fromMessageBody(TbMathArgument arg, String argKey, Optional<ObjectNode> jsonNodeOpt) {
Double defaultValue = arg.getDefaultValue();
if (jsonNodeOpt.isEmpty()) {
return defaultOrThrow(defaultValue, "Message body is empty!");
}
var json = jsonNodeOpt.get();
if (!json.has(key)) {
return defaultOrThrow(defaultValue, "Message body has no '" + key + "'!");
if (!json.has(argKey)) {
return defaultOrThrow(defaultValue, "Message body has no '" + argKey + "'!");
}
JsonNode valueNode = json.get(key);
JsonNode valueNode = json.get(argKey);
if (valueNode.isNull()) {
return defaultOrThrow(defaultValue, "Message body has null '" + key + "'!");
return defaultOrThrow(defaultValue, "Message body has null '" + argKey + "'!");
}
double value;
if (valueNode.isNumber()) {
@ -69,7 +68,7 @@ public class TbMathArgumentValue {
throw new RuntimeException("Can't convert value '" + valueNode.asText() + "' to double!");
}
} else {
return defaultOrThrow(defaultValue, "Message value is empty for '" + key + "'!");
return defaultOrThrow(defaultValue, "Message value is empty for '" + argKey + "'!");
}
} else {
throw new RuntimeException("Can't convert value '" + valueNode.toString() + "' to double!");
@ -77,15 +76,14 @@ public class TbMathArgumentValue {
return new TbMathArgumentValue(value);
}
public static TbMathArgumentValue fromMessageMetadata(TbMathArgument arg, TbMsgMetaData metaData) {
String key = arg.getKey();
public static TbMathArgumentValue fromMessageMetadata(TbMathArgument arg, String argKey, TbMsgMetaData metaData) {
Double defaultValue = arg.getDefaultValue();
if (metaData == null) {
return defaultOrThrow(defaultValue, "Message metadata is empty!");
}
var value = metaData.getValue(key);
var value = metaData.getValue(argKey);
if (StringUtils.isEmpty(value)) {
return defaultOrThrow(defaultValue, "Message metadata has no '" + key + "'!");
return defaultOrThrow(defaultValue, "Message metadata has no '" + argKey + "'!");
}
return fromString(value);
}

View File

@ -51,6 +51,8 @@ import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Collectors;
import static org.thingsboard.rule.engine.math.TbMathArgumentType.CONSTANT;
@SuppressWarnings("UnstableApiUsage")
@Slf4j
@RuleNode(
@ -121,7 +123,7 @@ public class TbMathNode implements TbNode {
var argumentValues = Futures.allAsList(arguments.stream()
.map(arg -> resolveArguments(ctx, msg, msgBodyOpt, arg)).collect(Collectors.toList()));
ListenableFuture<TbMsg> resultMsgFuture = Futures.transformAsync(argumentValues, args ->
updateMsgAndDb(ctx, msg, msgBodyOpt, calculateResult(ctx, msg, args)), ctx.getDbCallbackExecutor());
updateMsgAndDb(ctx, msg, msgBodyOpt, calculateResult(args)), ctx.getDbCallbackExecutor());
DonAsynchron.withCallback(resultMsgFuture, resultMsg -> {
try {
ctx.tellSuccess(resultMsg);
@ -155,17 +157,18 @@ public class TbMathNode implements TbNode {
private ListenableFuture<TbMsg> updateMsgAndDb(TbContext ctx, TbMsg msg, Optional<ObjectNode> msgBodyOpt, double result) {
TbMathResult mathResultDef = config.getResult();
String mathResultKey = getKeyFromTemplate(msg, mathResultDef.getType(), mathResultDef.getKey());
switch (mathResultDef.getType()) {
case MESSAGE_BODY:
return Futures.immediateFuture(addToBody(msg, mathResultDef, msgBodyOpt, result));
return Futures.immediateFuture(addToBody(msg, mathResultDef, mathResultKey, msgBodyOpt, result));
case MESSAGE_METADATA:
return Futures.immediateFuture(addToMeta(msg, mathResultDef, result));
return Futures.immediateFuture(addToMeta(msg, mathResultDef, mathResultKey, result));
case ATTRIBUTE:
ListenableFuture<Void> attrSave = saveAttribute(ctx, msg, result, mathResultDef);
return Futures.transform(attrSave, attr -> addToBodyAndMeta(msg, msgBodyOpt, result, mathResultDef), ctx.getDbCallbackExecutor());
return Futures.transform(attrSave, attr -> addToBodyAndMeta(msg, msgBodyOpt, result, mathResultDef, mathResultKey), ctx.getDbCallbackExecutor());
case TIME_SERIES:
ListenableFuture<Void> tsSave = saveTimeSeries(ctx, msg, result, mathResultDef);
return Futures.transform(tsSave, ts -> addToBodyAndMeta(msg, msgBodyOpt, result, mathResultDef), ctx.getDbCallbackExecutor());
return Futures.transform(tsSave, ts -> addToBodyAndMeta(msg, msgBodyOpt, result, mathResultDef, mathResultKey), ctx.getDbCallbackExecutor());
default:
throw new RuntimeException("Result type is not supported: " + mathResultDef.getType() + "!");
}
@ -180,7 +183,7 @@ public class TbMathNode implements TbNode {
private ListenableFuture<Void> saveAttribute(TbContext ctx, TbMsg msg, double result, TbMathResult mathResultDef) {
String attributeScope = getAttributeScope(mathResultDef.getAttributeScope());
if (isIntegerResult(mathResultDef, config.getOperation())) {
var value = toIntValue(mathResultDef, result);
var value = toIntValue(result);
return ctx.getTelemetryService().saveAttrAndNotify(
ctx.getTenantId(), msg.getOriginator(), attributeScope, mathResultDef.getKey(), value);
} else {
@ -194,7 +197,7 @@ public class TbMathNode implements TbNode {
return function.isIntegerResult() || mathResultDef.getResultValuePrecision() == 0;
}
private long toIntValue(TbMathResult mathResultDef, double value) {
private long toIntValue(double value) {
return (long) value;
}
@ -217,38 +220,38 @@ public class TbMathNode implements TbNode {
return msgBodyOpt;
}
private TbMsg addToBodyAndMeta(TbMsg msg, Optional<ObjectNode> msgBodyOpt, double result, TbMathResult mathResultDef) {
private TbMsg addToBodyAndMeta(TbMsg msg, Optional<ObjectNode> msgBodyOpt, double result, TbMathResult mathResultDef, String mathResultKey) {
TbMsg tmpMsg = msg;
if (mathResultDef.isAddToBody()) {
tmpMsg = addToBody(tmpMsg, mathResultDef, msgBodyOpt, result);
tmpMsg = addToBody(tmpMsg, mathResultDef, mathResultKey, msgBodyOpt, result);
}
if (mathResultDef.isAddToMetadata()) {
tmpMsg = addToMeta(tmpMsg, mathResultDef, result);
tmpMsg = addToMeta(tmpMsg, mathResultDef, mathResultKey, result);
}
return tmpMsg;
}
private TbMsg addToBody(TbMsg msg, TbMathResult mathResultDef, Optional<ObjectNode> msgBodyOpt, double result) {
private TbMsg addToBody(TbMsg msg, TbMathResult mathResultDef, String mathResultKey, Optional<ObjectNode> msgBodyOpt, double result) {
ObjectNode body = msgBodyOpt.get();
if (isIntegerResult(mathResultDef, config.getOperation())) {
body.put(mathResultDef.getKey(), toIntValue(mathResultDef, result));
body.put(mathResultKey, toIntValue(result));
} else {
body.put(mathResultDef.getKey(), toDoubleValue(mathResultDef, result));
body.put(mathResultKey, toDoubleValue(mathResultDef, result));
}
return TbMsg.transformMsgData(msg, JacksonUtil.toString(body));
}
private TbMsg addToMeta(TbMsg msg, TbMathResult mathResultDef, double result) {
private TbMsg addToMeta(TbMsg msg, TbMathResult mathResultDef, String mathResultKey, double result) {
var md = msg.getMetaData();
if (isIntegerResult(mathResultDef, config.getOperation())) {
md.putValue(mathResultDef.getKey(), Long.toString(toIntValue(mathResultDef, result)));
md.putValue(mathResultKey, Long.toString(toIntValue(result)));
} else {
md.putValue(mathResultDef.getKey(), Double.toString(toDoubleValue(mathResultDef, result)));
md.putValue(mathResultKey, Double.toString(toDoubleValue(mathResultDef, result)));
}
return TbMsg.transformMsg(msg, md);
}
private double calculateResult(TbContext ctx, TbMsg msg, List<TbMathArgumentValue> args) {
private double calculateResult(List<TbMathArgumentValue> args) {
switch (config.getOperation()) {
case ADD:
return apply(args.get(0), args.get(1), Double::sum);
@ -345,21 +348,22 @@ public class TbMathNode implements TbNode {
}
private ListenableFuture<TbMathArgumentValue> resolveArguments(TbContext ctx, TbMsg msg, Optional<ObjectNode> msgBodyOpt, TbMathArgument arg) {
String argKey = getKeyFromTemplate(msg, arg.getType(), arg.getKey());
switch (arg.getType()) {
case CONSTANT:
return Futures.immediateFuture(TbMathArgumentValue.constant(arg));
case MESSAGE_BODY:
return Futures.immediateFuture(TbMathArgumentValue.fromMessageBody(arg, msgBodyOpt));
return Futures.immediateFuture(TbMathArgumentValue.fromMessageBody(arg, argKey, msgBodyOpt));
case MESSAGE_METADATA:
return Futures.immediateFuture(TbMathArgumentValue.fromMessageMetadata(arg, msg.getMetaData()));
return Futures.immediateFuture(TbMathArgumentValue.fromMessageMetadata(arg, argKey, msg.getMetaData()));
case ATTRIBUTE:
String scope = getAttributeScope(arg.getAttributeScope());
return Futures.transform(ctx.getAttributesService().find(ctx.getTenantId(), msg.getOriginator(), scope, arg.getKey()),
opt -> getTbMathArgumentValue(arg, opt, "Attribute: " + arg.getKey() + " with scope: " + scope + " not found for entity: " + msg.getOriginator())
return Futures.transform(ctx.getAttributesService().find(ctx.getTenantId(), msg.getOriginator(), scope, argKey),
opt -> getTbMathArgumentValue(arg, opt, "Attribute: " + argKey + " with scope: " + scope + " not found for entity: " + msg.getOriginator())
, MoreExecutors.directExecutor());
case TIME_SERIES:
return Futures.transform(ctx.getTimeseriesService().findLatest(ctx.getTenantId(), msg.getOriginator(), arg.getKey()),
opt -> getTbMathArgumentValue(arg, opt, "Time-series: " + arg.getKey() + " not found for entity: " + msg.getOriginator())
return Futures.transform(ctx.getTimeseriesService().findLatest(ctx.getTenantId(), msg.getOriginator(), argKey),
opt -> getTbMathArgumentValue(arg, opt, "Time-series: " + argKey + " not found for entity: " + msg.getOriginator())
, MoreExecutors.directExecutor());
default:
throw new RuntimeException("Unsupported argument type: " + arg.getType() + "!");
@ -367,6 +371,10 @@ public class TbMathNode implements TbNode {
}
private String getKeyFromTemplate(TbMsg msg, TbMathArgumentType type, String keyPattern) {
return CONSTANT.equals(type) ? keyPattern : TbNodeUtils.processPattern(keyPattern, msg);
}
private String getAttributeScope(String attrScope) {
return StringUtils.isEmpty(attrScope) ? DataConstants.SERVER_SCOPE : attrScope;
}

View File

@ -31,7 +31,7 @@ public class TbMathArgumentValueTest {
public void test_fromMessageBody_then_defaultValue() {
TbMathArgument tbMathArgument = new TbMathArgument(TbMathArgumentType.MESSAGE_BODY, "TestKey");
tbMathArgument.setDefaultValue(5.0);
TbMathArgumentValue result = TbMathArgumentValue.fromMessageBody(tbMathArgument, Optional.ofNullable(JacksonUtil.newObjectNode()));
TbMathArgumentValue result = TbMathArgumentValue.fromMessageBody(tbMathArgument, tbMathArgument.getKey(), Optional.ofNullable(JacksonUtil.newObjectNode()));
Assert.assertEquals(5.0, result.getValue(), 0d);
}
@ -39,7 +39,7 @@ public class TbMathArgumentValueTest {
public void test_fromMessageBody_then_emptyBody() {
TbMathArgument tbMathArgument = new TbMathArgument(TbMathArgumentType.MESSAGE_BODY, "TestKey");
Throwable thrown = assertThrows(RuntimeException.class, () -> {
TbMathArgumentValue result = TbMathArgumentValue.fromMessageBody(tbMathArgument, Optional.empty());
TbMathArgumentValue result = TbMathArgumentValue.fromMessageBody(tbMathArgument, tbMathArgument.getKey(), Optional.empty());
});
Assert.assertNotNull(thrown.getMessage());
}
@ -47,7 +47,7 @@ public class TbMathArgumentValueTest {
@Test
public void test_fromMessageBody_then_noKey() {
TbMathArgument tbMathArgument = new TbMathArgument(TbMathArgumentType.MESSAGE_BODY, "TestKey");
Throwable thrown = assertThrows(RuntimeException.class, () -> TbMathArgumentValue.fromMessageBody(tbMathArgument, Optional.ofNullable(JacksonUtil.newObjectNode())));
Throwable thrown = assertThrows(RuntimeException.class, () -> TbMathArgumentValue.fromMessageBody(tbMathArgument, tbMathArgument.getKey(), Optional.ofNullable(JacksonUtil.newObjectNode())));
Assert.assertNotNull(thrown.getMessage());
}
@ -58,12 +58,12 @@ public class TbMathArgumentValueTest {
msgData.putNull("TestKey");
//null value
Throwable thrown = assertThrows(RuntimeException.class, () -> TbMathArgumentValue.fromMessageBody(tbMathArgument, Optional.of(msgData)));
Throwable thrown = assertThrows(RuntimeException.class, () -> TbMathArgumentValue.fromMessageBody(tbMathArgument, tbMathArgument.getKey(), Optional.of(msgData)));
Assert.assertNotNull(thrown.getMessage());
//empty value
msgData.put("TestKey", "");
thrown = assertThrows(RuntimeException.class, () -> TbMathArgumentValue.fromMessageBody(tbMathArgument, Optional.of(msgData)));
thrown = assertThrows(RuntimeException.class, () -> TbMathArgumentValue.fromMessageBody(tbMathArgument, tbMathArgument.getKey(), Optional.of(msgData)));
Assert.assertNotNull(thrown.getMessage());
}
@ -74,26 +74,26 @@ public class TbMathArgumentValueTest {
msgData.put("TestKey", "Test");
//string value
Throwable thrown = assertThrows(RuntimeException.class, () -> TbMathArgumentValue.fromMessageBody(tbMathArgument, Optional.of(msgData)));
Throwable thrown = assertThrows(RuntimeException.class, () -> TbMathArgumentValue.fromMessageBody(tbMathArgument, tbMathArgument.getKey(), Optional.of(msgData)));
Assert.assertNotNull(thrown.getMessage());
//object value
msgData.set("TestKey", JacksonUtil.newObjectNode());
thrown = assertThrows(RuntimeException.class, () -> TbMathArgumentValue.fromMessageBody(tbMathArgument, Optional.of(msgData)));
thrown = assertThrows(RuntimeException.class, () -> TbMathArgumentValue.fromMessageBody(tbMathArgument, tbMathArgument.getKey(), Optional.of(msgData)));
Assert.assertNotNull(thrown.getMessage());
}
@Test
public void test_fromMessageMetadata_then_noKey() {
TbMathArgument tbMathArgument = new TbMathArgument(TbMathArgumentType.MESSAGE_BODY, "TestKey");
Throwable thrown = assertThrows(RuntimeException.class, () -> TbMathArgumentValue.fromMessageMetadata(tbMathArgument, new TbMsgMetaData()));
Throwable thrown = assertThrows(RuntimeException.class, () -> TbMathArgumentValue.fromMessageMetadata(tbMathArgument, tbMathArgument.getKey(), new TbMsgMetaData()));
Assert.assertNotNull(thrown.getMessage());
}
@Test
public void test_fromMessageMetadata_then_valueEmpty() {
TbMathArgument tbMathArgument = new TbMathArgument(TbMathArgumentType.MESSAGE_BODY, "TestKey");
Throwable thrown = assertThrows(RuntimeException.class, () -> TbMathArgumentValue.fromMessageMetadata(tbMathArgument, null));
Throwable thrown = assertThrows(RuntimeException.class, () -> TbMathArgumentValue.fromMessageMetadata(tbMathArgument, tbMathArgument.getKey(), null));
Assert.assertNotNull(thrown.getMessage());
}

View File

@ -16,7 +16,10 @@
package org.thingsboard.rule.engine.math;
import com.datastax.oss.driver.api.core.uuid.Uuids;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.util.concurrent.Futures;
import lombok.extern.slf4j.Slf4j;
import org.awaitility.Awaitility;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@ -26,6 +29,7 @@ import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
import org.springframework.test.util.ReflectionTestUtils;
import org.thingsboard.common.util.AbstractListeningExecutor;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.rule.engine.api.RuleEngineTelemetryService;
@ -47,7 +51,11 @@ import org.thingsboard.server.dao.attributes.AttributesService;
import org.thingsboard.server.dao.timeseries.TimeseriesService;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.ArgumentMatchers.any;
@ -57,6 +65,7 @@ import static org.mockito.Mockito.lenient;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@Slf4j
@RunWith(MockitoJUnitRunner.class)
public class TbMathNodeTest {
@ -130,24 +139,52 @@ public class TbMathNodeTest {
@Test
public void testExp4j() {
var node = initNodeWithCustomFunction("2a+3b",
new TbMathResult(TbMathArgumentType.MESSAGE_BODY, "result", 2, false, false, null),
new TbMathArgument(TbMathArgumentType.MESSAGE_BODY, "a"),
new TbMathArgument(TbMathArgumentType.MESSAGE_BODY, "b")
new TbMathResult(TbMathArgumentType.MESSAGE_BODY, "${key1}", 2, false, false, null),
new TbMathArgument("a", TbMathArgumentType.MESSAGE_BODY, "${key2}"),
new TbMathArgument("b", TbMathArgumentType.MESSAGE_BODY, "$[key3]")
);
TbMsg msg = TbMsg.newMsg("TEST", originator, new TbMsgMetaData(), JacksonUtil.newObjectNode().put("a", 2).put("b", 2).toString());
TbMsgMetaData metaData = new TbMsgMetaData();
metaData.putValue("key1", "firstMsgResult");
metaData.putValue("key2", "argumentA");
ObjectNode msgNode = JacksonUtil.newObjectNode()
.put("key3", "argumentB").put("argumentA", 2).put("argumentB", 2);
TbMsg msg = TbMsg.newMsg("TEST", originator, metaData, msgNode.toString());
node.onMsg(ctx, msg);
ArgumentCaptor<TbMsg> msgCaptor = ArgumentCaptor.forClass(TbMsg.class);
Mockito.verify(ctx, Mockito.timeout(5000)).tellSuccess(msgCaptor.capture());
ConcurrentMap<EntityId, Semaphore> semaphores = (ConcurrentMap<EntityId, Semaphore>) ReflectionTestUtils.getField(node, "semaphores");
Assert.assertNotNull(semaphores);
Semaphore originatorSemaphore = semaphores.get(originator);
Assert.assertNotNull(originatorSemaphore);
TbMsg resultMsg = msgCaptor.getValue();
Assert.assertNotNull(resultMsg);
Assert.assertNotNull(resultMsg.getData());
var resultJson = JacksonUtil.toJsonNode(resultMsg.getData());
Assert.assertTrue(resultJson.has("result"));
Assert.assertEquals(10, resultJson.get("result").asInt());
metaData.putValue("key1", "secondMsgResult");
metaData.putValue("key2", "argumentC");
msgNode = JacksonUtil.newObjectNode()
.put("key3", "argumentD").put("argumentC", 4).put("argumentD", 3);
msg = TbMsg.newMsg("TEST", originator, metaData, msgNode.toString());
node.onMsg(ctx, msg);
Awaitility.await("Semaphore released").atMost(5, TimeUnit.SECONDS).until(semaphores.get(originator)::tryAcquire);
ArgumentCaptor<TbMsg> msgCaptor = ArgumentCaptor.forClass(TbMsg.class);
Mockito.verify(ctx, Mockito.times(2)).tellSuccess(msgCaptor.capture());
List<TbMsg> resultMsgs = msgCaptor.getAllValues();
Assert.assertFalse(resultMsgs.isEmpty());
Assert.assertEquals(2, resultMsgs.size());
for (int i = 0; i < resultMsgs.size(); i++) {
TbMsg outMsg = resultMsgs.get(i);
Assert.assertNotNull(outMsg);
Assert.assertNotNull(outMsg.getData());
var resultJson = JacksonUtil.toJsonNode(outMsg.getData());
String resultKey = i == 0 ? "firstMsgResult" : "secondMsgResult";
Assert.assertTrue(resultJson.has(resultKey));
Assert.assertEquals(i == 0 ? 10 : 17, resultJson.get(resultKey).asInt());
}
semaphores.remove(originator);
}
@Test