TbMathNode: refactored for easier testing. Semaphores - WEAK reference type. calculateResult method - removed unused args.

This commit is contained in:
Sergey Matvienko 2023-08-15 14:15:21 +02:00
parent a90653d660
commit 97ee45be24

View File

@ -79,7 +79,7 @@ import java.util.stream.Collectors;
) )
public class TbMathNode implements TbNode { public class TbMathNode implements TbNode {
private static final ConcurrentMap<EntityId, Semaphore> semaphores = new ConcurrentReferenceHashMap<>(); private static final ConcurrentMap<EntityId, Semaphore> semaphores = new ConcurrentReferenceHashMap<>(16, ConcurrentReferenceHashMap.ReferenceType.WEAK);
private final ThreadLocal<Expression> customExpression = new ThreadLocal<>(); private final ThreadLocal<Expression> customExpression = new ThreadLocal<>();
private TbMathNodeConfiguration config; private TbMathNodeConfiguration config;
@ -116,12 +116,7 @@ public class TbMathNode implements TbNode {
} }
try { try {
var arguments = config.getArguments(); ListenableFuture<TbMsg> resultMsgFuture = processMsgAsync(ctx, msg);
Optional<ObjectNode> msgBodyOpt = convertMsgBodyIfRequired(msg);
var argumentValues = Futures.allAsList(arguments.stream()
.map(arg -> resolveArguments(ctx, msg, msgBodyOpt, arg)).collect(Collectors.toList()));
ListenableFuture<TbMsg> resultMsgFuture = Futures.transformAsync(argumentValues, args ->
updateMsgAndDb(ctx, msg, msgBodyOpt, calculateResult(ctx, msg, args)), ctx.getDbCallbackExecutor());
DonAsynchron.withCallback(resultMsgFuture, resultMsg -> { DonAsynchron.withCallback(resultMsgFuture, resultMsg -> {
try { try {
ctx.tellSuccess(resultMsg); ctx.tellSuccess(resultMsg);
@ -142,6 +137,16 @@ public class TbMathNode implements TbNode {
} }
} }
ListenableFuture<TbMsg> processMsgAsync(TbContext ctx, TbMsg msg) {
var arguments = config.getArguments();
Optional<ObjectNode> msgBodyOpt = convertMsgBodyIfRequired(msg);
var argumentValues = Futures.allAsList(arguments.stream()
.map(arg -> resolveArguments(ctx, msg, msgBodyOpt, arg)).collect(Collectors.toList()));
ListenableFuture<TbMsg> resultMsgFuture = Futures.transformAsync(argumentValues, args ->
updateMsgAndDb(ctx, msg, msgBodyOpt, calculateResult(args)), ctx.getDbCallbackExecutor());
return resultMsgFuture;
}
private boolean tryAcquire(EntityId originator, Semaphore originatorSemaphore) { private boolean tryAcquire(EntityId originator, Semaphore originatorSemaphore) {
boolean acquired; boolean acquired;
try { try {
@ -248,7 +253,7 @@ public class TbMathNode implements TbNode {
return TbMsg.transformMsg(msg, md); return TbMsg.transformMsg(msg, md);
} }
private double calculateResult(TbContext ctx, TbMsg msg, List<TbMathArgumentValue> args) { private double calculateResult(List<TbMathArgumentValue> args) {
switch (config.getOperation()) { switch (config.getOperation()) {
case ADD: case ADD:
return apply(args.get(0), args.get(1), Double::sum); return apply(args.get(0), args.get(1), Double::sum);