diff --git a/common/script/script-api/src/main/java/org/thingsboard/script/api/AbstractScriptInvokeService.java b/common/script/script-api/src/main/java/org/thingsboard/script/api/AbstractScriptInvokeService.java index ac196a8edd..23274e863d 100644 --- a/common/script/script-api/src/main/java/org/thingsboard/script/api/AbstractScriptInvokeService.java +++ b/common/script/script-api/src/main/java/org/thingsboard/script/api/AbstractScriptInvokeService.java @@ -150,7 +150,7 @@ public abstract class AbstractScriptInvokeService implements ScriptInvokeService } apiUsageReportClient.ifPresent(client -> client.report(tenantId, customerId, ApiUsageRecordKey.JS_EXEC_COUNT, 1)); pushedMsgs.incrementAndGet(); - log.trace("InvokeScript uuid {} with timeout {}ms", scriptId, getMaxInvokeRequestsTimeout()); + log.trace("[{}] InvokeScript uuid {} with timeout {}ms", tenantId, scriptId, getMaxInvokeRequestsTimeout()); var task = doInvokeFunction(scriptId, args); var resultFuture = Futures.transformAsync(task.getResultFuture(), output -> { @@ -167,7 +167,7 @@ public abstract class AbstractScriptInvokeService implements ScriptInvokeService } else { String message = "Script invocation is blocked due to maximum error count " + getMaxErrors() + ", scriptId " + scriptId + "!"; - log.warn(message); + log.warn("[{}] " + message, tenantId); return error(message); } } else { diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/math/TbMathNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/math/TbMathNode.java index f7b4eb163c..095612b4e1 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/math/TbMathNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/math/TbMathNode.java @@ -84,7 +84,7 @@ import static org.thingsboard.rule.engine.math.TbMathArgumentType.CONSTANT; ) public class TbMathNode implements TbNode { - private static final ConcurrentMap> locks = new ConcurrentReferenceHashMap<>(16, ConcurrentReferenceHashMap.ReferenceType.WEAK); + private static final ConcurrentMap> locks = new ConcurrentReferenceHashMap<>(16, ConcurrentReferenceHashMap.ReferenceType.WEAK); private final ThreadLocal customExpression = new ThreadLocal<>(); private TbMathNodeConfiguration config; private boolean msgBodyToJsonConversionRequired; @@ -111,21 +111,21 @@ public class TbMathNode implements TbNode { @Override public void onMsg(TbContext ctx, TbMsg msg) { var semaphoreWithQueue = locks.computeIfAbsent(msg.getOriginator(), SemaphoreWithQueue::new); - semaphoreWithQueue.getQueue().add(new TbMsgTbContext(msg, ctx)); + semaphoreWithQueue.getQueue().add(new TbMsgTbContextBiFunction(msg, ctx, this::processMsgAsync)); tryProcessQueue(semaphoreWithQueue); } - void tryProcessQueue(SemaphoreWithQueue lockAndQueue) { + void tryProcessQueue(SemaphoreWithQueue lockAndQueue) { final Semaphore semaphore = lockAndQueue.getSemaphore(); - final Queue queue = lockAndQueue.getQueue(); + final Queue queue = lockAndQueue.getQueue(); while (!queue.isEmpty()) { // The semaphore have to be acquired before EACH poll and released before NEXT poll. // Otherwise, some message will remain unprocessed in queue if (!semaphore.tryAcquire()) { return; } - TbMsgTbContext tbMsgTbContext = null; + TbMsgTbContextBiFunction tbMsgTbContext = null; try { tbMsgTbContext = queue.poll(); if (tbMsgTbContext == null) { @@ -140,7 +140,7 @@ public class TbMathNode implements TbNode { } //DO PROCESSING final TbContext ctx = tbMsgTbContext.getCtx(); - final ListenableFuture resultMsgFuture = processMsgAsync(ctx, msg); + final ListenableFuture resultMsgFuture = tbMsgTbContext.getBiFunction().apply(ctx, msg); DonAsynchron.withCallback(resultMsgFuture, resultMsg -> { try { ctx.tellSuccess(resultMsg); @@ -156,10 +156,17 @@ public class TbMathNode implements TbNode { tryProcessQueue(lockAndQueue); } }, ctx.getDbCallbackExecutor()); - } catch (Throwable e) { + } catch (Throwable t) { semaphore.release(); - log.warn("[{}] Failed to process message: {}", lockAndQueue.getEntityId(), tbMsgTbContext == null ? null : tbMsgTbContext.getMsg(), e); - throw e; + if (tbMsgTbContext == null) { // if no message polled, the loop become infinite, will throw exception + log.error("[{}] Failed to process TbMsgTbContext queue", lockAndQueue.getEntityId(), t); + throw t; + } + TbMsg msg = tbMsgTbContext.getMsg(); + TbContext ctx = tbMsgTbContext.getCtx(); + log.warn("[{}] Failed to process message: {}", lockAndQueue.getEntityId(), msg, t); + ctx.tellFailure(msg, t); // you are not allowed to throw here, because queue will remain unprocessed + continue; // We are probably the last who process the queue. We have to continue poll until get successful callback or queue is empty } break; //submitted async exact one task. next poll will try on callback } @@ -367,7 +374,7 @@ public class TbMathNode implements TbNode { return function.apply(arg1.getValue(), arg2.getValue()); } - private ListenableFuture resolveArguments(TbContext ctx, TbMsg msg, Optional msgBodyOpt, TbMathArgument arg) { + ListenableFuture resolveArguments(TbContext ctx, TbMsg msg, Optional msgBodyOpt, TbMathArgument arg) { String argKey = getKeyFromTemplate(msg, arg.getType(), arg.getKey()); switch (arg.getType()) { case CONSTANT: @@ -433,9 +440,10 @@ public class TbMathNode implements TbNode { @Data @RequiredArgsConstructor - static public class TbMsgTbContext { + static public class TbMsgTbContextBiFunction { final TbMsg msg; final TbContext ctx; + final BiFunction> biFunction; } } diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/math/TbMathNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/math/TbMathNodeTest.java index 292292b89b..9eec38cfae 100644 --- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/math/TbMathNodeTest.java +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/math/TbMathNodeTest.java @@ -18,6 +18,9 @@ package org.thingsboard.rule.engine.math; import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.util.concurrent.Futures; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.tuple.Triple; +import org.assertj.core.api.SoftAssertions; +import org.junit.Assert; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -27,7 +30,9 @@ import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; import org.mockito.ArgumentCaptor; import org.mockito.Mock; +import org.mockito.Mockito; import org.mockito.junit.jupiter.MockitoExtension; +import org.mockito.verification.Timeout; import org.thingsboard.common.util.AbstractListeningExecutor; import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.rule.engine.api.RuleEngineTelemetryService; @@ -72,6 +77,10 @@ import static org.mockito.ArgumentMatchers.argThat; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.BDDMockito.willAnswer; import static org.mockito.BDDMockito.willReturn; +import static org.mockito.BDDMockito.willReturn; +import static org.mockito.BDDMockito.willThrow; +import static org.mockito.Mockito.lenient; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.timeout; @@ -128,7 +137,15 @@ public class TbMathNodeTest { return initNode(TbRuleNodeMathFunctionType.CUSTOM, expression, result, arguments); } + private TbMathNode initNodeWithCustomFunction(TbContext ctx, String expression, TbMathResult result, TbMathArgument... arguments) { + return initNode(ctx, TbRuleNodeMathFunctionType.CUSTOM, expression, result, arguments); + } + private TbMathNode initNode(TbRuleNodeMathFunctionType operation, String expression, TbMathResult result, TbMathArgument... arguments) { + return initNode(this.ctx, operation, expression, result, arguments); + } + + private TbMathNode initNode(TbContext ctx, TbRuleNodeMathFunctionType operation, String expression, TbMathResult result, TbMathArgument... arguments) { try { TbMathNodeConfiguration configuration = new TbMathNodeConfiguration(); configuration.setOperation(operation); @@ -521,10 +538,11 @@ public class TbMathNodeTest { new TbMathArgument(TbMathArgumentType.MESSAGE_BODY, "TestKey") ); TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, originator, TbMsgMetaData.EMPTY, JacksonUtil.newObjectNode().put("a", 10).toString()); - Throwable thrown = assertThrows(RuntimeException.class, () -> { - node.onMsg(ctx, msg); - }); - assertNotNull(thrown.getMessage()); + node.onMsg(ctx, msg); + + ArgumentCaptor tCaptor = ArgumentCaptor.forClass(Throwable.class); + Mockito.verify(ctx, Mockito.timeout(5000)).tellFailure(eq(msg), tCaptor.capture()); + Assert.assertNotNull(tCaptor.getValue().getMessage()); } @Test @@ -534,11 +552,12 @@ public class TbMathNodeTest { new TbMathArgument(TbMathArgumentType.MESSAGE_BODY, "a") ); - TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, originator, TbMsgMetaData.EMPTY, TbMsg.EMPTY_JSON_ARRAY); - Throwable thrown = assertThrows(RuntimeException.class, () -> { - node.onMsg(ctx, msg); - }); - assertNotNull(thrown.getMessage()); + TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, originator, TbMsgMetaData.EMPTY, TbMsg.EMPTY_JSON_ARRAY); + node.onMsg(ctx, msg); + + ArgumentCaptor tCaptor = ArgumentCaptor.forClass(Throwable.class); + Mockito.verify(ctx, Mockito.timeout(5000)).tellFailure(eq(msg), tCaptor.capture()); + Assert.assertNotNull(tCaptor.getValue().getMessage()); } @Test @@ -553,10 +572,10 @@ public class TbMathNodeTest { CountDownLatch slowProcessingLatch = new CountDownLatch(1); List slowMsgList = IntStream.range(0, 5) - .mapToObj(x -> TbMsg.newMsg("TEST", originatorSlow, new TbMsgMetaData(), JacksonUtil.newObjectNode().put("a", 2).put("b", 2).toString())) + .mapToObj(x -> TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, originatorSlow, TbMsgMetaData.EMPTY, JacksonUtil.newObjectNode().put("a", 2).put("b", 2).toString())) .collect(Collectors.toList()); List fastMsgList = IntStream.range(0, 2) - .mapToObj(x -> TbMsg.newMsg("TEST", originatorFast, new TbMsgMetaData(), JacksonUtil.newObjectNode().put("a", 2).put("b", 2).toString())) + .mapToObj(x -> TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, originatorFast, TbMsgMetaData.EMPTY, JacksonUtil.newObjectNode().put("a", 2).put("b", 2).toString())) .collect(Collectors.toList()); assertThat(slowMsgList.size()).as("slow msgs >= rule-dispatcher pool size").isGreaterThanOrEqualTo(RULE_DISPATCHER_POOL_SIZE); @@ -609,6 +628,115 @@ public class TbMathNodeTest { verify(ctx, never()).tellFailure(any(), any()); } + @Test + public void testExp4j_concurrentBySingleOriginator_processMsgAsyncException() { + TbMathNode node = spy(initNodeWithCustomFunction("2a+3b", + new TbMathResult(TbMathArgumentType.MESSAGE_BODY, "result", 2, false, false, null), + new TbMathArgument(TbMathArgumentType.MESSAGE_BODY, "a"), + new TbMathArgument(TbMathArgumentType.MESSAGE_BODY, "b") + )); + + willThrow(new RuntimeException("Message body has no 'delta'")).given(node).resolveArguments(any(), any(), any(), any()); + + EntityId originatorSlow = DeviceId.fromString("7f01170d-6bba-419c-b95c-2b4c3ba32f30"); + CountDownLatch slowProcessingLatch = new CountDownLatch(1); + + List slowMsgList = IntStream.range(0, 5) + .mapToObj(x -> TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, originatorSlow, TbMsgMetaData.EMPTY, JacksonUtil.newObjectNode().put("a", 2).put("b", 2).toString())) + .collect(Collectors.toList()); + + assertThat(slowMsgList.size()).as("slow msgs >= rule-dispatcher pool size").isGreaterThanOrEqualTo(RULE_DISPATCHER_POOL_SIZE); + + log.debug("rule-dispatcher [{}], db-callback [{}], slowMsg [{}]", RULE_DISPATCHER_POOL_SIZE, DB_CALLBACK_POOL_SIZE, slowMsgList.size()); + + willAnswer(invocation -> { + TbMsg msg = invocation.getArgument(1); + if (slowProcessingLatch.getCount() > 0) { + log.debug("Await on slowProcessingLatch before processMsgAsync"); + try { + assertThat(slowProcessingLatch.await(30, TimeUnit.SECONDS)).as("await on slowProcessingLatch").isTrue(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + log.debug("\uD83D\uDC0C processMsgAsync with exception [{}][{}]", msg.getOriginator(), msg); + return invocation.callRealMethod(); + }).given(node).processMsgAsync(eq(ctx), argThat(slowMsgList::contains)); + + willAnswer(invocation -> { + TbMsg msg = invocation.getArgument(1); + log.debug("submit slow originator onMsg [{}][{}]", msg.getOriginator(), msg); + return invocation.callRealMethod(); + }).given(node).onMsg(eq(ctx), argThat(slowMsgList::contains)); + + // submit slow msg may block all rule engine dispatcher threads + slowMsgList.forEach(msg -> ruleEngineDispatcherExecutor.executeAsync(() -> node.onMsg(ctx, msg))); + // wait until dispatcher threads started with all slowMsg + verify(node, new Timeout(TimeUnit.SECONDS.toMillis(5), times(slowMsgList.size()))).onMsg(eq(ctx), argThat(slowMsgList::contains)); + + slowProcessingLatch.countDown(); + + verify(ctx, new Timeout(TimeUnit.SECONDS.toMillis(5), times(slowMsgList.size()))).tellFailure(any(), any()); + verify(ctx, never()).tellSuccess(any()); + + } + + @Test + public void testExp4j_concurrentBySingleOriginator_SingleMsg_manyNodesWithDifferentOutput() { + assertThat(RULE_DISPATCHER_POOL_SIZE).as("dispatcher pool size have to be > 1").isGreaterThan(1); + CountDownLatch processingLatch = new CountDownLatch(1); + List> ctxNodes = IntStream.range(0, RULE_DISPATCHER_POOL_SIZE * 2) + .mapToObj(x -> { + final TbContext ctx = mock(TbContext.class); // many rule nodes - many contexts + willReturn(dbCallbackExecutor).given(ctx).getDbCallbackExecutor(); + final String resultKey = "result" + x; + final TbMathNode node = spy(initNodeWithCustomFunction(ctx, "2a+3b", + new TbMathResult(TbMathArgumentType.MESSAGE_METADATA, resultKey, 1, false, true, null), + new TbMathArgument(TbMathArgumentType.MESSAGE_BODY, "a"), + new TbMathArgument(TbMathArgumentType.MESSAGE_BODY, "b"))); + willAnswer(invocation -> { + if (processingLatch.getCount() > 0) { + log.debug("Await on processingLatch before processMsgAsync"); + try { + assertThat(processingLatch.await(30, TimeUnit.SECONDS)).as("await on processingLatch").isTrue(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + log.debug("\uD83D\uDC0C processMsgAsync on node with expected resultKey [{}]", resultKey); + return invocation.callRealMethod(); + }).given(node).processMsgAsync(any(), any()); + willAnswer(invocation -> { + TbMsg msg = invocation.getArgument(1); + log.debug("submit originator onMsg [{}][{}]", msg.getOriginator(), msg); + return invocation.callRealMethod(); + }).given(node).onMsg(any(), any()); + return Triple.of(ctx, resultKey, node); + }) + .collect(Collectors.toList()); + ctxNodes.forEach(ctxNode -> ruleEngineDispatcherExecutor.executeAsync(() -> ctxNode.getRight() + .onMsg(ctxNode.getLeft(), TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, originator, TbMsgMetaData.EMPTY, "{\"a\":2,\"b\":2}")))); + ctxNodes.forEach(ctxNode -> verify(ctxNode.getRight(), timeout(5000)).onMsg(eq(ctxNode.getLeft()), any())); + processingLatch.countDown(); + + SoftAssertions softly = new SoftAssertions(); + ctxNodes.forEach(ctxNode -> { + final TbContext ctx = ctxNode.getLeft(); + final String resultKey = ctxNode.getMiddle(); + ArgumentCaptor msgCaptor = ArgumentCaptor.forClass(TbMsg.class); + verify(ctx, timeout(5000)).tellSuccess(msgCaptor.capture()); + + TbMsg resultMsg = msgCaptor.getValue(); + assertThat(resultMsg).as("result msg non null for result key " + resultKey).isNotNull(); + log.debug("asserting result key [{}] in metadata [{}]", resultKey, resultMsg.getMetaData().getData()); + softly.assertThat(resultMsg.getMetaData().getValue(resultKey)).as("asserting result key " + resultKey) + .isEqualTo("10.0"); + }); + + softly.assertAll(); + verify(ctx, never()).tellFailure(any(), any()); + } + static class RuleDispatcherExecutor extends AbstractListeningExecutor { @Override protected int getThreadPollSize() {