From c28e4157e79dbf3c6e07b49351c6dbef10c2fde5 Mon Sep 17 00:00:00 2001 From: Sergey Matvienko Date: Thu, 31 Aug 2023 00:07:35 +0200 Subject: [PATCH] TbMathNode: call processMsgAsync via BiFunction this::processMsgAsync --- .../thingsboard/rule/engine/math/TbMathNode.java | 15 ++++++++------- .../rule/engine/math/TbMathNodeTest.java | 1 - 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/math/TbMathNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/math/TbMathNode.java index a2b809037b..d30ded0f23 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/math/TbMathNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/math/TbMathNode.java @@ -82,7 +82,7 @@ import java.util.stream.Collectors; ) public class TbMathNode implements TbNode { - private static final ConcurrentMap> locks = new ConcurrentReferenceHashMap<>(16, ConcurrentReferenceHashMap.ReferenceType.WEAK); + private static final ConcurrentMap> locks = new ConcurrentReferenceHashMap<>(16, ConcurrentReferenceHashMap.ReferenceType.WEAK); private final ThreadLocal customExpression = new ThreadLocal<>(); private TbMathNodeConfiguration config; private boolean msgBodyToJsonConversionRequired; @@ -109,21 +109,21 @@ public class TbMathNode implements TbNode { @Override public void onMsg(TbContext ctx, TbMsg msg) { var semaphoreWithQueue = locks.computeIfAbsent(msg.getOriginator(), SemaphoreWithQueue::new); - semaphoreWithQueue.getQueue().add(new TbMsgTbContext(msg, ctx)); + semaphoreWithQueue.getQueue().add(new TbMsgTbContextBiFunction(msg, ctx, this::processMsgAsync)); tryProcessQueue(semaphoreWithQueue); } - void tryProcessQueue(SemaphoreWithQueue lockAndQueue) { + void tryProcessQueue(SemaphoreWithQueue lockAndQueue) { final Semaphore semaphore = lockAndQueue.getSemaphore(); - final Queue queue = lockAndQueue.getQueue(); + final Queue queue = lockAndQueue.getQueue(); while (!queue.isEmpty()) { // The semaphore have to be acquired before EACH poll and released before NEXT poll. // Otherwise, some message will remain unprocessed in queue if (!semaphore.tryAcquire()) { return; } - TbMsgTbContext tbMsgTbContext = null; + TbMsgTbContextBiFunction tbMsgTbContext = null; try { tbMsgTbContext = queue.poll(); if (tbMsgTbContext == null) { @@ -138,7 +138,7 @@ public class TbMathNode implements TbNode { } //DO PROCESSING final TbContext ctx = tbMsgTbContext.getCtx(); - final ListenableFuture resultMsgFuture = processMsgAsync(ctx, msg); + final ListenableFuture resultMsgFuture = tbMsgTbContext.getBiFunction().apply(ctx, msg); DonAsynchron.withCallback(resultMsgFuture, resultMsg -> { try { ctx.tellSuccess(resultMsg); @@ -432,9 +432,10 @@ public class TbMathNode implements TbNode { @Data @RequiredArgsConstructor - static public class TbMsgTbContext { + static public class TbMsgTbContextBiFunction { final TbMsg msg; final TbContext ctx; + final BiFunction> biFunction; } } diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/math/TbMathNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/math/TbMathNodeTest.java index c9659bad9f..bf85d6c96e 100644 --- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/math/TbMathNodeTest.java +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/math/TbMathNodeTest.java @@ -652,7 +652,6 @@ public class TbMathNodeTest { List> ctxNodes = IntStream.range(0, RULE_DISPATCHER_POOL_SIZE * 2) .mapToObj(x -> { final TbContext ctx = mock(TbContext.class); // many rule nodes - many contexts - willReturn(tenantId).given(ctx).getTenantId(); willReturn(dbCallbackExecutor).given(ctx).getDbCallbackExecutor(); final String resultKey = "result" + x; final TbMathNode node = spy(initNodeWithCustomFunction(ctx, "2a+3b",