diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/math/TbMathNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/math/TbMathNodeTest.java index e1d2c5327d..c9659bad9f 100644 --- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/math/TbMathNodeTest.java +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/math/TbMathNodeTest.java @@ -17,6 +17,8 @@ package org.thingsboard.rule.engine.math; import com.google.common.util.concurrent.Futures; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.tuple.Triple; +import org.assertj.core.api.SoftAssertions; import org.junit.Assert; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; @@ -63,10 +65,13 @@ import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.argThat; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.BDDMockito.willAnswer; +import static org.mockito.BDDMockito.willReturn; import static org.mockito.BDDMockito.willThrow; import static org.mockito.Mockito.lenient; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.timeout; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -122,7 +127,15 @@ public class TbMathNodeTest { return initNode(TbRuleNodeMathFunctionType.CUSTOM, expression, result, arguments); } + private TbMathNode initNodeWithCustomFunction(TbContext ctx, String expression, TbMathResult result, TbMathArgument... arguments) { + return initNode(ctx, TbRuleNodeMathFunctionType.CUSTOM, expression, result, arguments); + } + private TbMathNode initNode(TbRuleNodeMathFunctionType operation, String expression, TbMathResult result, TbMathArgument... arguments) { + return initNode(this.ctx, operation, expression, result, arguments); + } + + private TbMathNode initNode(TbContext ctx, TbRuleNodeMathFunctionType operation, String expression, TbMathResult result, TbMathArgument... arguments) { try { TbMathNodeConfiguration configuration = new TbMathNodeConfiguration(); configuration.setOperation(operation); @@ -632,6 +645,65 @@ public class TbMathNodeTest { } + @Test + public void testExp4j_concurrentBySingleOriginator_SingleMsg_manyNodesWithDifferentOutput() { + assertThat(RULE_DISPATCHER_POOL_SIZE).as("dispatcher pool size have to be > 1").isGreaterThan(1); + CountDownLatch processingLatch = new CountDownLatch(1); + List> ctxNodes = IntStream.range(0, RULE_DISPATCHER_POOL_SIZE * 2) + .mapToObj(x -> { + final TbContext ctx = mock(TbContext.class); // many rule nodes - many contexts + willReturn(tenantId).given(ctx).getTenantId(); + willReturn(dbCallbackExecutor).given(ctx).getDbCallbackExecutor(); + final String resultKey = "result" + x; + final TbMathNode node = spy(initNodeWithCustomFunction(ctx, "2a+3b", + new TbMathResult(TbMathArgumentType.MESSAGE_METADATA, resultKey, 1, false, true, null), + new TbMathArgument(TbMathArgumentType.MESSAGE_BODY, "a"), + new TbMathArgument(TbMathArgumentType.MESSAGE_BODY, "b"))); + willAnswer(invocation -> { + if (processingLatch.getCount() > 0) { + log.debug("Await on processingLatch before processMsgAsync"); + try { + assertThat(processingLatch.await(30, TimeUnit.SECONDS)).as("await on processingLatch").isTrue(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + log.debug("\uD83D\uDC0C processMsgAsync on node with expected resultKey [{}]", resultKey); + return invocation.callRealMethod(); + }).given(node).processMsgAsync(any(), any()); + willAnswer(invocation -> { + TbMsg msg = invocation.getArgument(1); + log.debug("submit originator onMsg [{}][{}]", msg.getOriginator(), msg); + return invocation.callRealMethod(); + }).given(node).onMsg(any(), any()); + return Triple.of(ctx, resultKey, node); + }) + .collect(Collectors.toList()); + + final TbMsg msg = TbMsg.newMsg("POST_TELEMETRY_REQUEST", originator, new TbMsgMetaData(), JacksonUtil.newObjectNode().put("a", 2).put("b", 2).toString()); + + ctxNodes.forEach(ctxNode -> ruleEngineDispatcherExecutor.executeAsync(() -> ctxNode.getRight().onMsg(ctxNode.getLeft(), msg))); + ctxNodes.forEach(ctxNode -> verify(ctxNode.getRight(), timeout(5000)).onMsg(eq(ctxNode.getLeft()), any())); + processingLatch.countDown(); + + SoftAssertions softly = new SoftAssertions(); + ctxNodes.forEach(ctxNode -> { + final TbContext ctx = ctxNode.getLeft(); + final String resultKey = ctxNode.getMiddle(); + ArgumentCaptor msgCaptor = ArgumentCaptor.forClass(TbMsg.class); + verify(ctx, timeout(5000)).tellSuccess(msgCaptor.capture()); + + TbMsg resultMsg = msgCaptor.getValue(); + assertThat(resultMsg).as("result msg non null for result key " + resultKey).isNotNull(); + log.debug("asserting result key [{}] in metadata [{}]", resultKey, resultMsg.getMetaData().getData()); + softly.assertThat(resultMsg.getMetaData().getValue(resultKey)).as("asserting result key " + resultKey) + .isEqualTo("10.0"); + }); + + softly.assertAll(); + verify(ctx, never()).tellFailure(any(), any()); + } + static class RuleDispatcherExecutor extends AbstractListeningExecutor { @Override protected int getThreadPollSize() {