diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/math/TbMathNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/math/TbMathNode.java index eff3917c14..f48e723494 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/math/TbMathNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/math/TbMathNode.java @@ -79,7 +79,7 @@ import java.util.stream.Collectors; ) public class TbMathNode implements TbNode { - private static final ConcurrentMap semaphores = new ConcurrentReferenceHashMap<>(); + private static final ConcurrentMap semaphores = new ConcurrentReferenceHashMap<>(16, ConcurrentReferenceHashMap.ReferenceType.WEAK); private final ThreadLocal customExpression = new ThreadLocal<>(); private TbMathNodeConfiguration config; @@ -116,12 +116,7 @@ public class TbMathNode implements TbNode { } try { - var arguments = config.getArguments(); - Optional msgBodyOpt = convertMsgBodyIfRequired(msg); - var argumentValues = Futures.allAsList(arguments.stream() - .map(arg -> resolveArguments(ctx, msg, msgBodyOpt, arg)).collect(Collectors.toList())); - ListenableFuture resultMsgFuture = Futures.transformAsync(argumentValues, args -> - updateMsgAndDb(ctx, msg, msgBodyOpt, calculateResult(ctx, msg, args)), ctx.getDbCallbackExecutor()); + ListenableFuture resultMsgFuture = processMsgAsync(ctx, msg); DonAsynchron.withCallback(resultMsgFuture, resultMsg -> { try { ctx.tellSuccess(resultMsg); @@ -142,6 +137,16 @@ public class TbMathNode implements TbNode { } } + ListenableFuture processMsgAsync(TbContext ctx, TbMsg msg) { + var arguments = config.getArguments(); + Optional msgBodyOpt = convertMsgBodyIfRequired(msg); + var argumentValues = Futures.allAsList(arguments.stream() + .map(arg -> resolveArguments(ctx, msg, msgBodyOpt, arg)).collect(Collectors.toList())); + ListenableFuture resultMsgFuture = Futures.transformAsync(argumentValues, args -> + updateMsgAndDb(ctx, msg, msgBodyOpt, calculateResult(args)), ctx.getDbCallbackExecutor()); + return resultMsgFuture; + } + private boolean tryAcquire(EntityId originator, Semaphore originatorSemaphore) { boolean acquired; try { @@ -248,7 +253,7 @@ public class TbMathNode implements TbNode { return TbMsg.transformMsg(msg, md); } - private double calculateResult(TbContext ctx, TbMsg msg, List args) { + private double calculateResult(List args) { switch (config.getOperation()) { case ADD: return apply(args.get(0), args.get(1), Double::sum);