TbMathNode: call processMsgAsync via BiFunction this::processMsgAsync

This commit is contained in:
Sergey Matvienko 2023-08-31 00:07:35 +02:00
parent 168d0980f0
commit c28e4157e7
2 changed files with 8 additions and 8 deletions

View File

@ -82,7 +82,7 @@ import java.util.stream.Collectors;
) )
public class TbMathNode implements TbNode { public class TbMathNode implements TbNode {
private static final ConcurrentMap<EntityId, SemaphoreWithQueue<TbMsgTbContext>> locks = new ConcurrentReferenceHashMap<>(16, ConcurrentReferenceHashMap.ReferenceType.WEAK); private static final ConcurrentMap<EntityId, SemaphoreWithQueue<TbMsgTbContextBiFunction>> locks = new ConcurrentReferenceHashMap<>(16, ConcurrentReferenceHashMap.ReferenceType.WEAK);
private final ThreadLocal<Expression> customExpression = new ThreadLocal<>(); private final ThreadLocal<Expression> customExpression = new ThreadLocal<>();
private TbMathNodeConfiguration config; private TbMathNodeConfiguration config;
private boolean msgBodyToJsonConversionRequired; private boolean msgBodyToJsonConversionRequired;
@ -109,21 +109,21 @@ public class TbMathNode implements TbNode {
@Override @Override
public void onMsg(TbContext ctx, TbMsg msg) { public void onMsg(TbContext ctx, TbMsg msg) {
var semaphoreWithQueue = locks.computeIfAbsent(msg.getOriginator(), SemaphoreWithQueue::new); var semaphoreWithQueue = locks.computeIfAbsent(msg.getOriginator(), SemaphoreWithQueue::new);
semaphoreWithQueue.getQueue().add(new TbMsgTbContext(msg, ctx)); semaphoreWithQueue.getQueue().add(new TbMsgTbContextBiFunction(msg, ctx, this::processMsgAsync));
tryProcessQueue(semaphoreWithQueue); tryProcessQueue(semaphoreWithQueue);
} }
void tryProcessQueue(SemaphoreWithQueue<TbMsgTbContext> lockAndQueue) { void tryProcessQueue(SemaphoreWithQueue<TbMsgTbContextBiFunction> lockAndQueue) {
final Semaphore semaphore = lockAndQueue.getSemaphore(); final Semaphore semaphore = lockAndQueue.getSemaphore();
final Queue<TbMsgTbContext> queue = lockAndQueue.getQueue(); final Queue<TbMsgTbContextBiFunction> queue = lockAndQueue.getQueue();
while (!queue.isEmpty()) { while (!queue.isEmpty()) {
// The semaphore have to be acquired before EACH poll and released before NEXT poll. // The semaphore have to be acquired before EACH poll and released before NEXT poll.
// Otherwise, some message will remain unprocessed in queue // Otherwise, some message will remain unprocessed in queue
if (!semaphore.tryAcquire()) { if (!semaphore.tryAcquire()) {
return; return;
} }
TbMsgTbContext tbMsgTbContext = null; TbMsgTbContextBiFunction tbMsgTbContext = null;
try { try {
tbMsgTbContext = queue.poll(); tbMsgTbContext = queue.poll();
if (tbMsgTbContext == null) { if (tbMsgTbContext == null) {
@ -138,7 +138,7 @@ public class TbMathNode implements TbNode {
} }
//DO PROCESSING //DO PROCESSING
final TbContext ctx = tbMsgTbContext.getCtx(); final TbContext ctx = tbMsgTbContext.getCtx();
final ListenableFuture<TbMsg> resultMsgFuture = processMsgAsync(ctx, msg); final ListenableFuture<TbMsg> resultMsgFuture = tbMsgTbContext.getBiFunction().apply(ctx, msg);
DonAsynchron.withCallback(resultMsgFuture, resultMsg -> { DonAsynchron.withCallback(resultMsgFuture, resultMsg -> {
try { try {
ctx.tellSuccess(resultMsg); ctx.tellSuccess(resultMsg);
@ -432,9 +432,10 @@ public class TbMathNode implements TbNode {
@Data @Data
@RequiredArgsConstructor @RequiredArgsConstructor
static public class TbMsgTbContext { static public class TbMsgTbContextBiFunction {
final TbMsg msg; final TbMsg msg;
final TbContext ctx; final TbContext ctx;
final BiFunction<TbContext, TbMsg, ListenableFuture<TbMsg>> biFunction;
} }
} }

View File

@ -652,7 +652,6 @@ public class TbMathNodeTest {
List<Triple<TbContext, String, TbMathNode>> ctxNodes = IntStream.range(0, RULE_DISPATCHER_POOL_SIZE * 2) List<Triple<TbContext, String, TbMathNode>> ctxNodes = IntStream.range(0, RULE_DISPATCHER_POOL_SIZE * 2)
.mapToObj(x -> { .mapToObj(x -> {
final TbContext ctx = mock(TbContext.class); // many rule nodes - many contexts final TbContext ctx = mock(TbContext.class); // many rule nodes - many contexts
willReturn(tenantId).given(ctx).getTenantId();
willReturn(dbCallbackExecutor).given(ctx).getDbCallbackExecutor(); willReturn(dbCallbackExecutor).given(ctx).getDbCallbackExecutor();
final String resultKey = "result" + x; final String resultKey = "result" + x;
final TbMathNode node = spy(initNodeWithCustomFunction(ctx, "2a+3b", final TbMathNode node = spy(initNodeWithCustomFunction(ctx, "2a+3b",