diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/math/TbMathArgumentValue.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/math/TbMathArgumentValue.java index 9985446dd3..f740aebf4a 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/math/TbMathArgumentValue.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/math/TbMathArgumentValue.java @@ -43,19 +43,18 @@ public class TbMathArgumentValue { throw new RuntimeException(error); } - public static TbMathArgumentValue fromMessageBody(TbMathArgument arg, Optional jsonNodeOpt) { - String key = arg.getKey(); + public static TbMathArgumentValue fromMessageBody(TbMathArgument arg, String argKey, Optional jsonNodeOpt) { Double defaultValue = arg.getDefaultValue(); if (jsonNodeOpt.isEmpty()) { return defaultOrThrow(defaultValue, "Message body is empty!"); } var json = jsonNodeOpt.get(); - if (!json.has(key)) { - return defaultOrThrow(defaultValue, "Message body has no '" + key + "'!"); + if (!json.has(argKey)) { + return defaultOrThrow(defaultValue, "Message body has no '" + argKey + "'!"); } - JsonNode valueNode = json.get(key); + JsonNode valueNode = json.get(argKey); if (valueNode.isNull()) { - return defaultOrThrow(defaultValue, "Message body has null '" + key + "'!"); + return defaultOrThrow(defaultValue, "Message body has null '" + argKey + "'!"); } double value; if (valueNode.isNumber()) { @@ -69,7 +68,7 @@ public class TbMathArgumentValue { throw new RuntimeException("Can't convert value '" + valueNode.asText() + "' to double!"); } } else { - return defaultOrThrow(defaultValue, "Message value is empty for '" + key + "'!"); + return defaultOrThrow(defaultValue, "Message value is empty for '" + argKey + "'!"); } } else { throw new RuntimeException("Can't convert value '" + valueNode.toString() + "' to double!"); @@ -77,15 +76,14 @@ public class TbMathArgumentValue { return new TbMathArgumentValue(value); } - public static TbMathArgumentValue fromMessageMetadata(TbMathArgument arg, TbMsgMetaData metaData) { - String key = arg.getKey(); + public static TbMathArgumentValue fromMessageMetadata(TbMathArgument arg, String argKey, TbMsgMetaData metaData) { Double defaultValue = arg.getDefaultValue(); if (metaData == null) { return defaultOrThrow(defaultValue, "Message metadata is empty!"); } - var value = metaData.getValue(key); + var value = metaData.getValue(argKey); if (StringUtils.isEmpty(value)) { - return defaultOrThrow(defaultValue, "Message metadata has no '" + key + "'!"); + return defaultOrThrow(defaultValue, "Message metadata has no '" + argKey + "'!"); } return fromString(value); } diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/math/TbMathNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/math/TbMathNode.java index eff3917c14..33692decc2 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/math/TbMathNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/math/TbMathNode.java @@ -51,6 +51,8 @@ import java.util.function.BiFunction; import java.util.function.Function; import java.util.stream.Collectors; +import static org.thingsboard.rule.engine.math.TbMathArgumentType.CONSTANT; + @SuppressWarnings("UnstableApiUsage") @Slf4j @RuleNode( @@ -121,7 +123,7 @@ public class TbMathNode implements TbNode { var argumentValues = Futures.allAsList(arguments.stream() .map(arg -> resolveArguments(ctx, msg, msgBodyOpt, arg)).collect(Collectors.toList())); ListenableFuture resultMsgFuture = Futures.transformAsync(argumentValues, args -> - updateMsgAndDb(ctx, msg, msgBodyOpt, calculateResult(ctx, msg, args)), ctx.getDbCallbackExecutor()); + updateMsgAndDb(ctx, msg, msgBodyOpt, calculateResult(args)), ctx.getDbCallbackExecutor()); DonAsynchron.withCallback(resultMsgFuture, resultMsg -> { try { ctx.tellSuccess(resultMsg); @@ -155,17 +157,18 @@ public class TbMathNode implements TbNode { private ListenableFuture updateMsgAndDb(TbContext ctx, TbMsg msg, Optional msgBodyOpt, double result) { TbMathResult mathResultDef = config.getResult(); + String mathResultKey = getKeyFromTemplate(msg, mathResultDef.getType(), mathResultDef.getKey()); switch (mathResultDef.getType()) { case MESSAGE_BODY: - return Futures.immediateFuture(addToBody(msg, mathResultDef, msgBodyOpt, result)); + return Futures.immediateFuture(addToBody(msg, mathResultDef, mathResultKey, msgBodyOpt, result)); case MESSAGE_METADATA: - return Futures.immediateFuture(addToMeta(msg, mathResultDef, result)); + return Futures.immediateFuture(addToMeta(msg, mathResultDef, mathResultKey, result)); case ATTRIBUTE: ListenableFuture attrSave = saveAttribute(ctx, msg, result, mathResultDef); - return Futures.transform(attrSave, attr -> addToBodyAndMeta(msg, msgBodyOpt, result, mathResultDef), ctx.getDbCallbackExecutor()); + return Futures.transform(attrSave, attr -> addToBodyAndMeta(msg, msgBodyOpt, result, mathResultDef, mathResultKey), ctx.getDbCallbackExecutor()); case TIME_SERIES: ListenableFuture tsSave = saveTimeSeries(ctx, msg, result, mathResultDef); - return Futures.transform(tsSave, ts -> addToBodyAndMeta(msg, msgBodyOpt, result, mathResultDef), ctx.getDbCallbackExecutor()); + return Futures.transform(tsSave, ts -> addToBodyAndMeta(msg, msgBodyOpt, result, mathResultDef, mathResultKey), ctx.getDbCallbackExecutor()); default: throw new RuntimeException("Result type is not supported: " + mathResultDef.getType() + "!"); } @@ -180,7 +183,7 @@ public class TbMathNode implements TbNode { private ListenableFuture saveAttribute(TbContext ctx, TbMsg msg, double result, TbMathResult mathResultDef) { String attributeScope = getAttributeScope(mathResultDef.getAttributeScope()); if (isIntegerResult(mathResultDef, config.getOperation())) { - var value = toIntValue(mathResultDef, result); + var value = toIntValue(result); return ctx.getTelemetryService().saveAttrAndNotify( ctx.getTenantId(), msg.getOriginator(), attributeScope, mathResultDef.getKey(), value); } else { @@ -194,7 +197,7 @@ public class TbMathNode implements TbNode { return function.isIntegerResult() || mathResultDef.getResultValuePrecision() == 0; } - private long toIntValue(TbMathResult mathResultDef, double value) { + private long toIntValue(double value) { return (long) value; } @@ -217,38 +220,38 @@ public class TbMathNode implements TbNode { return msgBodyOpt; } - private TbMsg addToBodyAndMeta(TbMsg msg, Optional msgBodyOpt, double result, TbMathResult mathResultDef) { + private TbMsg addToBodyAndMeta(TbMsg msg, Optional msgBodyOpt, double result, TbMathResult mathResultDef, String mathResultKey) { TbMsg tmpMsg = msg; if (mathResultDef.isAddToBody()) { - tmpMsg = addToBody(tmpMsg, mathResultDef, msgBodyOpt, result); + tmpMsg = addToBody(tmpMsg, mathResultDef, mathResultKey, msgBodyOpt, result); } if (mathResultDef.isAddToMetadata()) { - tmpMsg = addToMeta(tmpMsg, mathResultDef, result); + tmpMsg = addToMeta(tmpMsg, mathResultDef, mathResultKey, result); } return tmpMsg; } - private TbMsg addToBody(TbMsg msg, TbMathResult mathResultDef, Optional msgBodyOpt, double result) { + private TbMsg addToBody(TbMsg msg, TbMathResult mathResultDef, String mathResultKey, Optional msgBodyOpt, double result) { ObjectNode body = msgBodyOpt.get(); if (isIntegerResult(mathResultDef, config.getOperation())) { - body.put(mathResultDef.getKey(), toIntValue(mathResultDef, result)); + body.put(mathResultKey, toIntValue(result)); } else { - body.put(mathResultDef.getKey(), toDoubleValue(mathResultDef, result)); + body.put(mathResultKey, toDoubleValue(mathResultDef, result)); } return TbMsg.transformMsgData(msg, JacksonUtil.toString(body)); } - private TbMsg addToMeta(TbMsg msg, TbMathResult mathResultDef, double result) { + private TbMsg addToMeta(TbMsg msg, TbMathResult mathResultDef, String mathResultKey, double result) { var md = msg.getMetaData(); if (isIntegerResult(mathResultDef, config.getOperation())) { - md.putValue(mathResultDef.getKey(), Long.toString(toIntValue(mathResultDef, result))); + md.putValue(mathResultKey, Long.toString(toIntValue(result))); } else { - md.putValue(mathResultDef.getKey(), Double.toString(toDoubleValue(mathResultDef, result))); + md.putValue(mathResultKey, Double.toString(toDoubleValue(mathResultDef, result))); } return TbMsg.transformMsg(msg, md); } - private double calculateResult(TbContext ctx, TbMsg msg, List args) { + private double calculateResult(List args) { switch (config.getOperation()) { case ADD: return apply(args.get(0), args.get(1), Double::sum); @@ -345,21 +348,22 @@ public class TbMathNode implements TbNode { } private ListenableFuture resolveArguments(TbContext ctx, TbMsg msg, Optional msgBodyOpt, TbMathArgument arg) { + String argKey = getKeyFromTemplate(msg, arg.getType(), arg.getKey()); switch (arg.getType()) { case CONSTANT: return Futures.immediateFuture(TbMathArgumentValue.constant(arg)); case MESSAGE_BODY: - return Futures.immediateFuture(TbMathArgumentValue.fromMessageBody(arg, msgBodyOpt)); + return Futures.immediateFuture(TbMathArgumentValue.fromMessageBody(arg, argKey, msgBodyOpt)); case MESSAGE_METADATA: - return Futures.immediateFuture(TbMathArgumentValue.fromMessageMetadata(arg, msg.getMetaData())); + return Futures.immediateFuture(TbMathArgumentValue.fromMessageMetadata(arg, argKey, msg.getMetaData())); case ATTRIBUTE: String scope = getAttributeScope(arg.getAttributeScope()); - return Futures.transform(ctx.getAttributesService().find(ctx.getTenantId(), msg.getOriginator(), scope, arg.getKey()), - opt -> getTbMathArgumentValue(arg, opt, "Attribute: " + arg.getKey() + " with scope: " + scope + " not found for entity: " + msg.getOriginator()) + return Futures.transform(ctx.getAttributesService().find(ctx.getTenantId(), msg.getOriginator(), scope, argKey), + opt -> getTbMathArgumentValue(arg, opt, "Attribute: " + argKey + " with scope: " + scope + " not found for entity: " + msg.getOriginator()) , MoreExecutors.directExecutor()); case TIME_SERIES: - return Futures.transform(ctx.getTimeseriesService().findLatest(ctx.getTenantId(), msg.getOriginator(), arg.getKey()), - opt -> getTbMathArgumentValue(arg, opt, "Time-series: " + arg.getKey() + " not found for entity: " + msg.getOriginator()) + return Futures.transform(ctx.getTimeseriesService().findLatest(ctx.getTenantId(), msg.getOriginator(), argKey), + opt -> getTbMathArgumentValue(arg, opt, "Time-series: " + argKey + " not found for entity: " + msg.getOriginator()) , MoreExecutors.directExecutor()); default: throw new RuntimeException("Unsupported argument type: " + arg.getType() + "!"); @@ -367,6 +371,10 @@ public class TbMathNode implements TbNode { } + private String getKeyFromTemplate(TbMsg msg, TbMathArgumentType type, String keyPattern) { + return CONSTANT.equals(type) ? keyPattern : TbNodeUtils.processPattern(keyPattern, msg); + } + private String getAttributeScope(String attrScope) { return StringUtils.isEmpty(attrScope) ? DataConstants.SERVER_SCOPE : attrScope; } diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/math/TbMathArgumentValueTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/math/TbMathArgumentValueTest.java index 984d9bfb72..eccc9bb932 100644 --- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/math/TbMathArgumentValueTest.java +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/math/TbMathArgumentValueTest.java @@ -31,7 +31,7 @@ public class TbMathArgumentValueTest { public void test_fromMessageBody_then_defaultValue() { TbMathArgument tbMathArgument = new TbMathArgument(TbMathArgumentType.MESSAGE_BODY, "TestKey"); tbMathArgument.setDefaultValue(5.0); - TbMathArgumentValue result = TbMathArgumentValue.fromMessageBody(tbMathArgument, Optional.ofNullable(JacksonUtil.newObjectNode())); + TbMathArgumentValue result = TbMathArgumentValue.fromMessageBody(tbMathArgument, tbMathArgument.getKey(), Optional.ofNullable(JacksonUtil.newObjectNode())); Assert.assertEquals(5.0, result.getValue(), 0d); } @@ -39,7 +39,7 @@ public class TbMathArgumentValueTest { public void test_fromMessageBody_then_emptyBody() { TbMathArgument tbMathArgument = new TbMathArgument(TbMathArgumentType.MESSAGE_BODY, "TestKey"); Throwable thrown = assertThrows(RuntimeException.class, () -> { - TbMathArgumentValue result = TbMathArgumentValue.fromMessageBody(tbMathArgument, Optional.empty()); + TbMathArgumentValue result = TbMathArgumentValue.fromMessageBody(tbMathArgument, tbMathArgument.getKey(), Optional.empty()); }); Assert.assertNotNull(thrown.getMessage()); } @@ -47,7 +47,7 @@ public class TbMathArgumentValueTest { @Test public void test_fromMessageBody_then_noKey() { TbMathArgument tbMathArgument = new TbMathArgument(TbMathArgumentType.MESSAGE_BODY, "TestKey"); - Throwable thrown = assertThrows(RuntimeException.class, () -> TbMathArgumentValue.fromMessageBody(tbMathArgument, Optional.ofNullable(JacksonUtil.newObjectNode()))); + Throwable thrown = assertThrows(RuntimeException.class, () -> TbMathArgumentValue.fromMessageBody(tbMathArgument, tbMathArgument.getKey(), Optional.ofNullable(JacksonUtil.newObjectNode()))); Assert.assertNotNull(thrown.getMessage()); } @@ -58,12 +58,12 @@ public class TbMathArgumentValueTest { msgData.putNull("TestKey"); //null value - Throwable thrown = assertThrows(RuntimeException.class, () -> TbMathArgumentValue.fromMessageBody(tbMathArgument, Optional.of(msgData))); + Throwable thrown = assertThrows(RuntimeException.class, () -> TbMathArgumentValue.fromMessageBody(tbMathArgument, tbMathArgument.getKey(), Optional.of(msgData))); Assert.assertNotNull(thrown.getMessage()); //empty value msgData.put("TestKey", ""); - thrown = assertThrows(RuntimeException.class, () -> TbMathArgumentValue.fromMessageBody(tbMathArgument, Optional.of(msgData))); + thrown = assertThrows(RuntimeException.class, () -> TbMathArgumentValue.fromMessageBody(tbMathArgument, tbMathArgument.getKey(), Optional.of(msgData))); Assert.assertNotNull(thrown.getMessage()); } @@ -74,26 +74,26 @@ public class TbMathArgumentValueTest { msgData.put("TestKey", "Test"); //string value - Throwable thrown = assertThrows(RuntimeException.class, () -> TbMathArgumentValue.fromMessageBody(tbMathArgument, Optional.of(msgData))); + Throwable thrown = assertThrows(RuntimeException.class, () -> TbMathArgumentValue.fromMessageBody(tbMathArgument, tbMathArgument.getKey(), Optional.of(msgData))); Assert.assertNotNull(thrown.getMessage()); //object value msgData.set("TestKey", JacksonUtil.newObjectNode()); - thrown = assertThrows(RuntimeException.class, () -> TbMathArgumentValue.fromMessageBody(tbMathArgument, Optional.of(msgData))); + thrown = assertThrows(RuntimeException.class, () -> TbMathArgumentValue.fromMessageBody(tbMathArgument, tbMathArgument.getKey(), Optional.of(msgData))); Assert.assertNotNull(thrown.getMessage()); } @Test public void test_fromMessageMetadata_then_noKey() { TbMathArgument tbMathArgument = new TbMathArgument(TbMathArgumentType.MESSAGE_BODY, "TestKey"); - Throwable thrown = assertThrows(RuntimeException.class, () -> TbMathArgumentValue.fromMessageMetadata(tbMathArgument, new TbMsgMetaData())); + Throwable thrown = assertThrows(RuntimeException.class, () -> TbMathArgumentValue.fromMessageMetadata(tbMathArgument, tbMathArgument.getKey(), new TbMsgMetaData())); Assert.assertNotNull(thrown.getMessage()); } @Test public void test_fromMessageMetadata_then_valueEmpty() { TbMathArgument tbMathArgument = new TbMathArgument(TbMathArgumentType.MESSAGE_BODY, "TestKey"); - Throwable thrown = assertThrows(RuntimeException.class, () -> TbMathArgumentValue.fromMessageMetadata(tbMathArgument, null)); + Throwable thrown = assertThrows(RuntimeException.class, () -> TbMathArgumentValue.fromMessageMetadata(tbMathArgument, tbMathArgument.getKey(), null)); Assert.assertNotNull(thrown.getMessage()); } diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/math/TbMathNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/math/TbMathNodeTest.java index 69d1b46dbe..2efc438f8f 100644 --- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/math/TbMathNodeTest.java +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/math/TbMathNodeTest.java @@ -16,7 +16,10 @@ package org.thingsboard.rule.engine.math; import com.datastax.oss.driver.api.core.uuid.Uuids; +import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.util.concurrent.Futures; +import lombok.extern.slf4j.Slf4j; +import org.awaitility.Awaitility; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -26,6 +29,7 @@ import org.mockito.ArgumentCaptor; import org.mockito.Mock; import org.mockito.Mockito; import org.mockito.junit.MockitoJUnitRunner; +import org.springframework.test.util.ReflectionTestUtils; import org.thingsboard.common.util.AbstractListeningExecutor; import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.rule.engine.api.RuleEngineTelemetryService; @@ -47,7 +51,11 @@ import org.thingsboard.server.dao.attributes.AttributesService; import org.thingsboard.server.dao.timeseries.TimeseriesService; import java.util.Arrays; +import java.util.List; import java.util.Optional; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.ArgumentMatchers.any; @@ -57,6 +65,7 @@ import static org.mockito.Mockito.lenient; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +@Slf4j @RunWith(MockitoJUnitRunner.class) public class TbMathNodeTest { @@ -130,24 +139,52 @@ public class TbMathNodeTest { @Test public void testExp4j() { var node = initNodeWithCustomFunction("2a+3b", - new TbMathResult(TbMathArgumentType.MESSAGE_BODY, "result", 2, false, false, null), - new TbMathArgument(TbMathArgumentType.MESSAGE_BODY, "a"), - new TbMathArgument(TbMathArgumentType.MESSAGE_BODY, "b") + new TbMathResult(TbMathArgumentType.MESSAGE_BODY, "${key1}", 2, false, false, null), + new TbMathArgument("a", TbMathArgumentType.MESSAGE_BODY, "${key2}"), + new TbMathArgument("b", TbMathArgumentType.MESSAGE_BODY, "$[key3]") ); - TbMsg msg = TbMsg.newMsg("TEST", originator, new TbMsgMetaData(), JacksonUtil.newObjectNode().put("a", 2).put("b", 2).toString()); + TbMsgMetaData metaData = new TbMsgMetaData(); + metaData.putValue("key1", "firstMsgResult"); + metaData.putValue("key2", "argumentA"); + ObjectNode msgNode = JacksonUtil.newObjectNode() + .put("key3", "argumentB").put("argumentA", 2).put("argumentB", 2); + TbMsg msg = TbMsg.newMsg("TEST", originator, metaData, msgNode.toString()); node.onMsg(ctx, msg); - ArgumentCaptor msgCaptor = ArgumentCaptor.forClass(TbMsg.class); - Mockito.verify(ctx, Mockito.timeout(5000)).tellSuccess(msgCaptor.capture()); + ConcurrentMap semaphores = (ConcurrentMap) ReflectionTestUtils.getField(node, "semaphores"); + Assert.assertNotNull(semaphores); + Semaphore originatorSemaphore = semaphores.get(originator); + Assert.assertNotNull(originatorSemaphore); - TbMsg resultMsg = msgCaptor.getValue(); - Assert.assertNotNull(resultMsg); - Assert.assertNotNull(resultMsg.getData()); - var resultJson = JacksonUtil.toJsonNode(resultMsg.getData()); - Assert.assertTrue(resultJson.has("result")); - Assert.assertEquals(10, resultJson.get("result").asInt()); + metaData.putValue("key1", "secondMsgResult"); + metaData.putValue("key2", "argumentC"); + msgNode = JacksonUtil.newObjectNode() + .put("key3", "argumentD").put("argumentC", 4).put("argumentD", 3); + msg = TbMsg.newMsg("TEST", originator, metaData, msgNode.toString()); + + node.onMsg(ctx, msg); + + Awaitility.await("Semaphore released").atMost(5, TimeUnit.SECONDS).until(semaphores.get(originator)::tryAcquire); + + ArgumentCaptor msgCaptor = ArgumentCaptor.forClass(TbMsg.class); + Mockito.verify(ctx, Mockito.times(2)).tellSuccess(msgCaptor.capture()); + + List resultMsgs = msgCaptor.getAllValues(); + Assert.assertFalse(resultMsgs.isEmpty()); + Assert.assertEquals(2, resultMsgs.size()); + + for (int i = 0; i < resultMsgs.size(); i++) { + TbMsg outMsg = resultMsgs.get(i); + Assert.assertNotNull(outMsg); + Assert.assertNotNull(outMsg.getData()); + var resultJson = JacksonUtil.toJsonNode(outMsg.getData()); + String resultKey = i == 0 ? "firstMsgResult" : "secondMsgResult"; + Assert.assertTrue(resultJson.has(resultKey)); + Assert.assertEquals(i == 0 ? 10 : 17, resultJson.get(resultKey).asInt()); + } + semaphores.remove(originator); } @Test