TbMathNode: test added concurrentBySingleOriginator_SingleMsg_manyNodesWithDifferentOutput

This commit is contained in:
Sergey Matvienko 2023-08-30 23:24:13 +02:00
parent b82a5381e3
commit 168d0980f0

View File

@ -17,6 +17,8 @@ package org.thingsboard.rule.engine.math;
import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.Futures;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.tuple.Triple;
import org.assertj.core.api.SoftAssertions;
import org.junit.Assert; import org.junit.Assert;
import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.BeforeEach;
@ -63,10 +65,13 @@ import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.argThat; import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.ArgumentMatchers.eq; import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.BDDMockito.willAnswer; import static org.mockito.BDDMockito.willAnswer;
import static org.mockito.BDDMockito.willReturn;
import static org.mockito.BDDMockito.willThrow; import static org.mockito.BDDMockito.willThrow;
import static org.mockito.Mockito.lenient; import static org.mockito.Mockito.lenient;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never; import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy; import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.times; import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verify;
@ -122,7 +127,15 @@ public class TbMathNodeTest {
return initNode(TbRuleNodeMathFunctionType.CUSTOM, expression, result, arguments); return initNode(TbRuleNodeMathFunctionType.CUSTOM, expression, result, arguments);
} }
private TbMathNode initNodeWithCustomFunction(TbContext ctx, String expression, TbMathResult result, TbMathArgument... arguments) {
return initNode(ctx, TbRuleNodeMathFunctionType.CUSTOM, expression, result, arguments);
}
private TbMathNode initNode(TbRuleNodeMathFunctionType operation, String expression, TbMathResult result, TbMathArgument... arguments) { private TbMathNode initNode(TbRuleNodeMathFunctionType operation, String expression, TbMathResult result, TbMathArgument... arguments) {
return initNode(this.ctx, operation, expression, result, arguments);
}
private TbMathNode initNode(TbContext ctx, TbRuleNodeMathFunctionType operation, String expression, TbMathResult result, TbMathArgument... arguments) {
try { try {
TbMathNodeConfiguration configuration = new TbMathNodeConfiguration(); TbMathNodeConfiguration configuration = new TbMathNodeConfiguration();
configuration.setOperation(operation); configuration.setOperation(operation);
@ -632,6 +645,65 @@ public class TbMathNodeTest {
} }
@Test
public void testExp4j_concurrentBySingleOriginator_SingleMsg_manyNodesWithDifferentOutput() {
assertThat(RULE_DISPATCHER_POOL_SIZE).as("dispatcher pool size have to be > 1").isGreaterThan(1);
CountDownLatch processingLatch = new CountDownLatch(1);
List<Triple<TbContext, String, TbMathNode>> ctxNodes = IntStream.range(0, RULE_DISPATCHER_POOL_SIZE * 2)
.mapToObj(x -> {
final TbContext ctx = mock(TbContext.class); // many rule nodes - many contexts
willReturn(tenantId).given(ctx).getTenantId();
willReturn(dbCallbackExecutor).given(ctx).getDbCallbackExecutor();
final String resultKey = "result" + x;
final TbMathNode node = spy(initNodeWithCustomFunction(ctx, "2a+3b",
new TbMathResult(TbMathArgumentType.MESSAGE_METADATA, resultKey, 1, false, true, null),
new TbMathArgument(TbMathArgumentType.MESSAGE_BODY, "a"),
new TbMathArgument(TbMathArgumentType.MESSAGE_BODY, "b")));
willAnswer(invocation -> {
if (processingLatch.getCount() > 0) {
log.debug("Await on processingLatch before processMsgAsync");
try {
assertThat(processingLatch.await(30, TimeUnit.SECONDS)).as("await on processingLatch").isTrue();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
log.debug("\uD83D\uDC0C processMsgAsync on node with expected resultKey [{}]", resultKey);
return invocation.callRealMethod();
}).given(node).processMsgAsync(any(), any());
willAnswer(invocation -> {
TbMsg msg = invocation.getArgument(1);
log.debug("submit originator onMsg [{}][{}]", msg.getOriginator(), msg);
return invocation.callRealMethod();
}).given(node).onMsg(any(), any());
return Triple.of(ctx, resultKey, node);
})
.collect(Collectors.toList());
final TbMsg msg = TbMsg.newMsg("POST_TELEMETRY_REQUEST", originator, new TbMsgMetaData(), JacksonUtil.newObjectNode().put("a", 2).put("b", 2).toString());
ctxNodes.forEach(ctxNode -> ruleEngineDispatcherExecutor.executeAsync(() -> ctxNode.getRight().onMsg(ctxNode.getLeft(), msg)));
ctxNodes.forEach(ctxNode -> verify(ctxNode.getRight(), timeout(5000)).onMsg(eq(ctxNode.getLeft()), any()));
processingLatch.countDown();
SoftAssertions softly = new SoftAssertions();
ctxNodes.forEach(ctxNode -> {
final TbContext ctx = ctxNode.getLeft();
final String resultKey = ctxNode.getMiddle();
ArgumentCaptor<TbMsg> msgCaptor = ArgumentCaptor.forClass(TbMsg.class);
verify(ctx, timeout(5000)).tellSuccess(msgCaptor.capture());
TbMsg resultMsg = msgCaptor.getValue();
assertThat(resultMsg).as("result msg non null for result key " + resultKey).isNotNull();
log.debug("asserting result key [{}] in metadata [{}]", resultKey, resultMsg.getMetaData().getData());
softly.assertThat(resultMsg.getMetaData().getValue(resultKey)).as("asserting result key " + resultKey)
.isEqualTo("10.0");
});
softly.assertAll();
verify(ctx, never()).tellFailure(any(), any());
}
static class RuleDispatcherExecutor extends AbstractListeningExecutor { static class RuleDispatcherExecutor extends AbstractListeningExecutor {
@Override @Override
protected int getThreadPollSize() { protected int getThreadPollSize() {