From 923e41c0291a0574919be69dc9368c403aacdc20 Mon Sep 17 00:00:00 2001 From: Sergey Matvienko Date: Mon, 28 Aug 2023 16:56:59 +0200 Subject: [PATCH 1/6] MathNode: tellFailure and process the next message in the queue in case we are the last consumer --- .../rule/engine/math/TbMathNode.java | 15 ++++-- .../rule/engine/math/TbMathNodeTest.java | 54 +++++++++++++++++++ 2 files changed, 65 insertions(+), 4 deletions(-) diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/math/TbMathNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/math/TbMathNode.java index 5a0c717445..a2b809037b 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/math/TbMathNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/math/TbMathNode.java @@ -154,10 +154,17 @@ public class TbMathNode implements TbNode { tryProcessQueue(lockAndQueue); } }, ctx.getDbCallbackExecutor()); - } catch (Throwable e) { + } catch (Throwable t) { semaphore.release(); - log.warn("[{}] Failed to process message: {}", lockAndQueue.getEntityId(), tbMsgTbContext == null ? null : tbMsgTbContext.getMsg(), e); - throw e; + if (tbMsgTbContext == null) { // if no message polled, the loop become infinite, will throw exception + log.error("[{}] Failed to process TbMsgTbContext queue", lockAndQueue.getEntityId(), t); + throw t; + } + TbMsg msg = tbMsgTbContext.getMsg(); + TbContext ctx = tbMsgTbContext.getCtx(); + log.warn("[{}] Failed to process message: {}", lockAndQueue.getEntityId(), msg, t); + ctx.tellFailure(msg, t); // you are not allowed to throw here, because queue will remain unprocessed + continue; // We are probably the last who process the queue. We have to continue poll until get successful callback or queue is empty } break; //submitted async exact one task. next poll will try on callback } @@ -364,7 +371,7 @@ public class TbMathNode implements TbNode { return function.apply(arg1.getValue(), arg2.getValue()); } - private ListenableFuture resolveArguments(TbContext ctx, TbMsg msg, Optional msgBodyOpt, TbMathArgument arg) { + ListenableFuture resolveArguments(TbContext ctx, TbMsg msg, Optional msgBodyOpt, TbMathArgument arg) { switch (arg.getType()) { case CONSTANT: return Futures.immediateFuture(TbMathArgumentValue.constant(arg)); diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/math/TbMathNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/math/TbMathNodeTest.java index 50424a1521..4a5ec8b209 100644 --- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/math/TbMathNodeTest.java +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/math/TbMathNodeTest.java @@ -64,6 +64,7 @@ import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.argThat; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.BDDMockito.willAnswer; +import static org.mockito.BDDMockito.willThrow; import static org.mockito.Mockito.lenient; import static org.mockito.Mockito.never; import static org.mockito.Mockito.spy; @@ -577,6 +578,59 @@ public class TbMathNodeTest { verify(ctx, never()).tellFailure(any(), any()); } + @Test + public void testExp4j_concurrentBySingleOriginator_processMsgAsyncException() { + TbMathNode node = spy(initNodeWithCustomFunction("2a+3b", + new TbMathResult(TbMathArgumentType.MESSAGE_BODY, "result", 2, false, false, null), + new TbMathArgument(TbMathArgumentType.MESSAGE_BODY, "a"), + new TbMathArgument(TbMathArgumentType.MESSAGE_BODY, "b") + )); + + willThrow(new RuntimeException("Message body has no 'delta'")).given(node).resolveArguments(any(), any(), any(), any()); + + EntityId originatorSlow = DeviceId.fromString("7f01170d-6bba-419c-b95c-2b4c3ba32f30"); + CountDownLatch slowProcessingLatch = new CountDownLatch(1); + + List slowMsgList = IntStream.range(0, 5) + .mapToObj(x -> TbMsg.newMsg("TEST", originatorSlow, new TbMsgMetaData(), JacksonUtil.newObjectNode().put("a", 2).put("b", 2).toString())) + .collect(Collectors.toList()); + + assertThat(slowMsgList.size()).as("slow msgs >= rule-dispatcher pool size").isGreaterThanOrEqualTo(RULE_DISPATCHER_POOL_SIZE); + + log.debug("rule-dispatcher [{}], db-callback [{}], slowMsg [{}]", RULE_DISPATCHER_POOL_SIZE, DB_CALLBACK_POOL_SIZE, slowMsgList.size()); + + willAnswer(invocation -> { + TbMsg msg = invocation.getArgument(1); + if (slowProcessingLatch.getCount() > 0) { + log.debug("Await on slowProcessingLatch before processMsgAsync"); + try { + assertThat(slowProcessingLatch.await(30, TimeUnit.SECONDS)).as("await on slowProcessingLatch").isTrue(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + log.debug("\uD83D\uDC0C processMsgAsync with exception [{}][{}]", msg.getOriginator(), msg); + return invocation.callRealMethod(); + }).given(node).processMsgAsync(eq(ctx), argThat(slowMsgList::contains)); + + willAnswer(invocation -> { + TbMsg msg = invocation.getArgument(1); + log.debug("submit slow originator onMsg [{}][{}]", msg.getOriginator(), msg); + return invocation.callRealMethod(); + }).given(node).onMsg(eq(ctx), argThat(slowMsgList::contains)); + + // submit slow msg may block all rule engine dispatcher threads + slowMsgList.forEach(msg -> ruleEngineDispatcherExecutor.executeAsync(() -> node.onMsg(ctx, msg))); + // wait until dispatcher threads started with all slowMsg + verify(node, new Timeout(TimeUnit.SECONDS.toMillis(5), times(slowMsgList.size()))).onMsg(eq(ctx), argThat(slowMsgList::contains)); + + slowProcessingLatch.countDown(); + + verify(ctx, new Timeout(TimeUnit.SECONDS.toMillis(5), times(slowMsgList.size()))).tellFailure(any(), any()); + verify(ctx, never()).tellSuccess(any()); + + } + static class RuleDispatcherExecutor extends AbstractListeningExecutor { @Override protected int getThreadPollSize() { From b82a5381e3ad3082e960b100f4825af87d60072d Mon Sep 17 00:00:00 2001 From: Sergey Matvienko Date: Mon, 28 Aug 2023 18:00:42 +0200 Subject: [PATCH 2/6] MathNodeTest: assertThrown replaced with tellFailure verification --- .../rule/engine/math/TbMathNodeTest.java | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/math/TbMathNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/math/TbMathNodeTest.java index 4a5ec8b209..e1d2c5327d 100644 --- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/math/TbMathNodeTest.java +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/math/TbMathNodeTest.java @@ -57,7 +57,6 @@ import java.util.stream.Collectors; import java.util.stream.IntStream; import static org.assertj.core.api.Assertions.assertThat; -import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyDouble; import static org.mockito.ArgumentMatchers.anyString; @@ -490,10 +489,11 @@ public class TbMathNodeTest { new TbMathArgument(TbMathArgumentType.MESSAGE_BODY, "TestKey") ); TbMsg msg = TbMsg.newMsg("TEST", originator, new TbMsgMetaData(), JacksonUtil.newObjectNode().put("a", 10).toString()); - Throwable thrown = assertThrows(RuntimeException.class, () -> { - node.onMsg(ctx, msg); - }); - Assert.assertNotNull(thrown.getMessage()); + node.onMsg(ctx, msg); + + ArgumentCaptor tCaptor = ArgumentCaptor.forClass(Throwable.class); + Mockito.verify(ctx, Mockito.timeout(5000)).tellFailure(eq(msg), tCaptor.capture()); + Assert.assertNotNull(tCaptor.getValue().getMessage()); } @Test @@ -504,10 +504,11 @@ public class TbMathNodeTest { ); TbMsg msg = TbMsg.newMsg("TEST", originator, new TbMsgMetaData(), "[]"); - Throwable thrown = assertThrows(RuntimeException.class, () -> { - node.onMsg(ctx, msg); - }); - Assert.assertNotNull(thrown.getMessage()); + node.onMsg(ctx, msg); + + ArgumentCaptor tCaptor = ArgumentCaptor.forClass(Throwable.class); + Mockito.verify(ctx, Mockito.timeout(5000)).tellFailure(eq(msg), tCaptor.capture()); + Assert.assertNotNull(tCaptor.getValue().getMessage()); } @Test From cfe79cc4458fd758210362338d7c407d4510ba6a Mon Sep 17 00:00:00 2001 From: Sergey Matvienko Date: Mon, 28 Aug 2023 19:12:33 +0200 Subject: [PATCH 3/6] AbstractScriptInvokeService: tenantId added to the log on script invocation blocked --- .../thingsboard/script/api/AbstractScriptInvokeService.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/common/script/script-api/src/main/java/org/thingsboard/script/api/AbstractScriptInvokeService.java b/common/script/script-api/src/main/java/org/thingsboard/script/api/AbstractScriptInvokeService.java index ac196a8edd..23274e863d 100644 --- a/common/script/script-api/src/main/java/org/thingsboard/script/api/AbstractScriptInvokeService.java +++ b/common/script/script-api/src/main/java/org/thingsboard/script/api/AbstractScriptInvokeService.java @@ -150,7 +150,7 @@ public abstract class AbstractScriptInvokeService implements ScriptInvokeService } apiUsageReportClient.ifPresent(client -> client.report(tenantId, customerId, ApiUsageRecordKey.JS_EXEC_COUNT, 1)); pushedMsgs.incrementAndGet(); - log.trace("InvokeScript uuid {} with timeout {}ms", scriptId, getMaxInvokeRequestsTimeout()); + log.trace("[{}] InvokeScript uuid {} with timeout {}ms", tenantId, scriptId, getMaxInvokeRequestsTimeout()); var task = doInvokeFunction(scriptId, args); var resultFuture = Futures.transformAsync(task.getResultFuture(), output -> { @@ -167,7 +167,7 @@ public abstract class AbstractScriptInvokeService implements ScriptInvokeService } else { String message = "Script invocation is blocked due to maximum error count " + getMaxErrors() + ", scriptId " + scriptId + "!"; - log.warn(message); + log.warn("[{}] " + message, tenantId); return error(message); } } else { From 168d0980f0a4568a3b9f952436e535c1b22c75e6 Mon Sep 17 00:00:00 2001 From: Sergey Matvienko Date: Wed, 30 Aug 2023 23:24:13 +0200 Subject: [PATCH 4/6] TbMathNode: test added concurrentBySingleOriginator_SingleMsg_manyNodesWithDifferentOutput --- .../rule/engine/math/TbMathNodeTest.java | 72 +++++++++++++++++++ 1 file changed, 72 insertions(+) diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/math/TbMathNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/math/TbMathNodeTest.java index e1d2c5327d..c9659bad9f 100644 --- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/math/TbMathNodeTest.java +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/math/TbMathNodeTest.java @@ -17,6 +17,8 @@ package org.thingsboard.rule.engine.math; import com.google.common.util.concurrent.Futures; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.tuple.Triple; +import org.assertj.core.api.SoftAssertions; import org.junit.Assert; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; @@ -63,10 +65,13 @@ import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.argThat; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.BDDMockito.willAnswer; +import static org.mockito.BDDMockito.willReturn; import static org.mockito.BDDMockito.willThrow; import static org.mockito.Mockito.lenient; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.timeout; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -122,7 +127,15 @@ public class TbMathNodeTest { return initNode(TbRuleNodeMathFunctionType.CUSTOM, expression, result, arguments); } + private TbMathNode initNodeWithCustomFunction(TbContext ctx, String expression, TbMathResult result, TbMathArgument... arguments) { + return initNode(ctx, TbRuleNodeMathFunctionType.CUSTOM, expression, result, arguments); + } + private TbMathNode initNode(TbRuleNodeMathFunctionType operation, String expression, TbMathResult result, TbMathArgument... arguments) { + return initNode(this.ctx, operation, expression, result, arguments); + } + + private TbMathNode initNode(TbContext ctx, TbRuleNodeMathFunctionType operation, String expression, TbMathResult result, TbMathArgument... arguments) { try { TbMathNodeConfiguration configuration = new TbMathNodeConfiguration(); configuration.setOperation(operation); @@ -632,6 +645,65 @@ public class TbMathNodeTest { } + @Test + public void testExp4j_concurrentBySingleOriginator_SingleMsg_manyNodesWithDifferentOutput() { + assertThat(RULE_DISPATCHER_POOL_SIZE).as("dispatcher pool size have to be > 1").isGreaterThan(1); + CountDownLatch processingLatch = new CountDownLatch(1); + List> ctxNodes = IntStream.range(0, RULE_DISPATCHER_POOL_SIZE * 2) + .mapToObj(x -> { + final TbContext ctx = mock(TbContext.class); // many rule nodes - many contexts + willReturn(tenantId).given(ctx).getTenantId(); + willReturn(dbCallbackExecutor).given(ctx).getDbCallbackExecutor(); + final String resultKey = "result" + x; + final TbMathNode node = spy(initNodeWithCustomFunction(ctx, "2a+3b", + new TbMathResult(TbMathArgumentType.MESSAGE_METADATA, resultKey, 1, false, true, null), + new TbMathArgument(TbMathArgumentType.MESSAGE_BODY, "a"), + new TbMathArgument(TbMathArgumentType.MESSAGE_BODY, "b"))); + willAnswer(invocation -> { + if (processingLatch.getCount() > 0) { + log.debug("Await on processingLatch before processMsgAsync"); + try { + assertThat(processingLatch.await(30, TimeUnit.SECONDS)).as("await on processingLatch").isTrue(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + log.debug("\uD83D\uDC0C processMsgAsync on node with expected resultKey [{}]", resultKey); + return invocation.callRealMethod(); + }).given(node).processMsgAsync(any(), any()); + willAnswer(invocation -> { + TbMsg msg = invocation.getArgument(1); + log.debug("submit originator onMsg [{}][{}]", msg.getOriginator(), msg); + return invocation.callRealMethod(); + }).given(node).onMsg(any(), any()); + return Triple.of(ctx, resultKey, node); + }) + .collect(Collectors.toList()); + + final TbMsg msg = TbMsg.newMsg("POST_TELEMETRY_REQUEST", originator, new TbMsgMetaData(), JacksonUtil.newObjectNode().put("a", 2).put("b", 2).toString()); + + ctxNodes.forEach(ctxNode -> ruleEngineDispatcherExecutor.executeAsync(() -> ctxNode.getRight().onMsg(ctxNode.getLeft(), msg))); + ctxNodes.forEach(ctxNode -> verify(ctxNode.getRight(), timeout(5000)).onMsg(eq(ctxNode.getLeft()), any())); + processingLatch.countDown(); + + SoftAssertions softly = new SoftAssertions(); + ctxNodes.forEach(ctxNode -> { + final TbContext ctx = ctxNode.getLeft(); + final String resultKey = ctxNode.getMiddle(); + ArgumentCaptor msgCaptor = ArgumentCaptor.forClass(TbMsg.class); + verify(ctx, timeout(5000)).tellSuccess(msgCaptor.capture()); + + TbMsg resultMsg = msgCaptor.getValue(); + assertThat(resultMsg).as("result msg non null for result key " + resultKey).isNotNull(); + log.debug("asserting result key [{}] in metadata [{}]", resultKey, resultMsg.getMetaData().getData()); + softly.assertThat(resultMsg.getMetaData().getValue(resultKey)).as("asserting result key " + resultKey) + .isEqualTo("10.0"); + }); + + softly.assertAll(); + verify(ctx, never()).tellFailure(any(), any()); + } + static class RuleDispatcherExecutor extends AbstractListeningExecutor { @Override protected int getThreadPollSize() { From c28e4157e79dbf3c6e07b49351c6dbef10c2fde5 Mon Sep 17 00:00:00 2001 From: Sergey Matvienko Date: Thu, 31 Aug 2023 00:07:35 +0200 Subject: [PATCH 5/6] TbMathNode: call processMsgAsync via BiFunction this::processMsgAsync --- .../thingsboard/rule/engine/math/TbMathNode.java | 15 ++++++++------- .../rule/engine/math/TbMathNodeTest.java | 1 - 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/math/TbMathNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/math/TbMathNode.java index a2b809037b..d30ded0f23 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/math/TbMathNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/math/TbMathNode.java @@ -82,7 +82,7 @@ import java.util.stream.Collectors; ) public class TbMathNode implements TbNode { - private static final ConcurrentMap> locks = new ConcurrentReferenceHashMap<>(16, ConcurrentReferenceHashMap.ReferenceType.WEAK); + private static final ConcurrentMap> locks = new ConcurrentReferenceHashMap<>(16, ConcurrentReferenceHashMap.ReferenceType.WEAK); private final ThreadLocal customExpression = new ThreadLocal<>(); private TbMathNodeConfiguration config; private boolean msgBodyToJsonConversionRequired; @@ -109,21 +109,21 @@ public class TbMathNode implements TbNode { @Override public void onMsg(TbContext ctx, TbMsg msg) { var semaphoreWithQueue = locks.computeIfAbsent(msg.getOriginator(), SemaphoreWithQueue::new); - semaphoreWithQueue.getQueue().add(new TbMsgTbContext(msg, ctx)); + semaphoreWithQueue.getQueue().add(new TbMsgTbContextBiFunction(msg, ctx, this::processMsgAsync)); tryProcessQueue(semaphoreWithQueue); } - void tryProcessQueue(SemaphoreWithQueue lockAndQueue) { + void tryProcessQueue(SemaphoreWithQueue lockAndQueue) { final Semaphore semaphore = lockAndQueue.getSemaphore(); - final Queue queue = lockAndQueue.getQueue(); + final Queue queue = lockAndQueue.getQueue(); while (!queue.isEmpty()) { // The semaphore have to be acquired before EACH poll and released before NEXT poll. // Otherwise, some message will remain unprocessed in queue if (!semaphore.tryAcquire()) { return; } - TbMsgTbContext tbMsgTbContext = null; + TbMsgTbContextBiFunction tbMsgTbContext = null; try { tbMsgTbContext = queue.poll(); if (tbMsgTbContext == null) { @@ -138,7 +138,7 @@ public class TbMathNode implements TbNode { } //DO PROCESSING final TbContext ctx = tbMsgTbContext.getCtx(); - final ListenableFuture resultMsgFuture = processMsgAsync(ctx, msg); + final ListenableFuture resultMsgFuture = tbMsgTbContext.getBiFunction().apply(ctx, msg); DonAsynchron.withCallback(resultMsgFuture, resultMsg -> { try { ctx.tellSuccess(resultMsg); @@ -432,9 +432,10 @@ public class TbMathNode implements TbNode { @Data @RequiredArgsConstructor - static public class TbMsgTbContext { + static public class TbMsgTbContextBiFunction { final TbMsg msg; final TbContext ctx; + final BiFunction> biFunction; } } diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/math/TbMathNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/math/TbMathNodeTest.java index c9659bad9f..bf85d6c96e 100644 --- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/math/TbMathNodeTest.java +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/math/TbMathNodeTest.java @@ -652,7 +652,6 @@ public class TbMathNodeTest { List> ctxNodes = IntStream.range(0, RULE_DISPATCHER_POOL_SIZE * 2) .mapToObj(x -> { final TbContext ctx = mock(TbContext.class); // many rule nodes - many contexts - willReturn(tenantId).given(ctx).getTenantId(); willReturn(dbCallbackExecutor).given(ctx).getDbCallbackExecutor(); final String resultKey = "result" + x; final TbMathNode node = spy(initNodeWithCustomFunction(ctx, "2a+3b", From bbef1cdf3fd3e8ca551d4bbac9fc96b6f62b212e Mon Sep 17 00:00:00 2001 From: Sergey Matvienko Date: Thu, 31 Aug 2023 00:15:10 +0200 Subject: [PATCH 6/6] TbMathNode: test optimized --- .../org/thingsboard/rule/engine/math/TbMathNodeTest.java | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/math/TbMathNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/math/TbMathNodeTest.java index bf85d6c96e..674c0efcb4 100644 --- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/math/TbMathNodeTest.java +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/math/TbMathNodeTest.java @@ -678,10 +678,8 @@ public class TbMathNodeTest { return Triple.of(ctx, resultKey, node); }) .collect(Collectors.toList()); - - final TbMsg msg = TbMsg.newMsg("POST_TELEMETRY_REQUEST", originator, new TbMsgMetaData(), JacksonUtil.newObjectNode().put("a", 2).put("b", 2).toString()); - - ctxNodes.forEach(ctxNode -> ruleEngineDispatcherExecutor.executeAsync(() -> ctxNode.getRight().onMsg(ctxNode.getLeft(), msg))); + ctxNodes.forEach(ctxNode -> ruleEngineDispatcherExecutor.executeAsync(() -> ctxNode.getRight() + .onMsg(ctxNode.getLeft(), TbMsg.newMsg("POST_TELEMETRY_REQUEST", originator, new TbMsgMetaData(), "{\"a\":2,\"b\":2}")))); ctxNodes.forEach(ctxNode -> verify(ctxNode.getRight(), timeout(5000)).onMsg(eq(ctxNode.getLeft()), any())); processingLatch.countDown();