TbMathNode: refactored to act in non-blocking style. All messages go through queue by originator with single semaphore and never wait on tryAcquire. Test refactored to provide more details on how slaw and fast messages being submitted and processed

This commit is contained in:
Sergey Matvienko 2023-08-15 22:40:01 +02:00
parent 16fdfc518d
commit 44ea477b7b
2 changed files with 114 additions and 48 deletions

View File

@ -19,6 +19,8 @@ import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import lombok.Data;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import net.objecthunter.exp4j.Expression;
import net.objecthunter.exp4j.ExpressionBuilder;
@ -44,6 +46,8 @@ import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.List;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
@ -79,9 +83,8 @@ import java.util.stream.Collectors;
)
public class TbMathNode implements TbNode {
private static final ConcurrentMap<EntityId, Semaphore> semaphores = new ConcurrentReferenceHashMap<>(16, ConcurrentReferenceHashMap.ReferenceType.WEAK);
private static final ConcurrentMap<EntityId, SemaphoreWithQueue<TbMsgTbContext>> locks = new ConcurrentReferenceHashMap<>(16, ConcurrentReferenceHashMap.ReferenceType.WEAK);
private final ThreadLocal<Expression> customExpression = new ThreadLocal<>();
private TbMathNodeConfiguration config;
private boolean msgBodyToJsonConversionRequired;
@ -106,34 +109,58 @@ public class TbMathNode implements TbNode {
@Override
public void onMsg(TbContext ctx, TbMsg msg) {
var originator = msg.getOriginator();
var originatorSemaphore = semaphores.computeIfAbsent(originator, tmp -> new Semaphore(1, true));
boolean acquired = tryAcquire(originator, originatorSemaphore);
var semaphoreWithQueue = locks.computeIfAbsent(msg.getOriginator(), SemaphoreWithQueue::new);
semaphoreWithQueue.getQueue().add(new TbMsgTbContext(msg, ctx));
if (!acquired) {
ctx.tellFailure(msg, new RuntimeException("Failed to process message for originator synchronously"));
return;
}
tryProcessQueue(semaphoreWithQueue);
}
try {
ListenableFuture<TbMsg> resultMsgFuture = processMsgAsync(ctx, msg);
DonAsynchron.withCallback(resultMsgFuture, resultMsg -> {
try {
ctx.tellSuccess(resultMsg);
} finally {
originatorSemaphore.release();
void tryProcessQueue(SemaphoreWithQueue<TbMsgTbContext> lockAndQueue) {
final Semaphore semaphore = lockAndQueue.getSemaphore();
final Queue<TbMsgTbContext> queue = lockAndQueue.getQueue();
while (!queue.isEmpty()) {
// The semaphore have to be acquired before EACH poll and released before NEXT poll.
// Otherwise, some message will remain unprocessed in queue
if (!semaphore.tryAcquire()) {
return;
}
TbMsgTbContext tbMsgTbContext = null;
try {
tbMsgTbContext = queue.poll();
if (tbMsgTbContext == null) {
semaphore.release();
continue;
}
}, t -> {
try {
ctx.tellFailure(msg, t);
} finally {
originatorSemaphore.release();
final TbMsg msg = tbMsgTbContext.getMsg();
if (!msg.getCallback().isMsgValid()) {
log.trace("[{}] Skipping non-valid message [{}]", lockAndQueue.getEntityId(), msg);
semaphore.release();
continue;
}
}, ctx.getDbCallbackExecutor());
} catch (Throwable e) {
originatorSemaphore.release();
log.warn("[{}] Failed to process message: {}", originator, msg, e);
throw e;
//DO PROCESSING
final TbContext ctx = tbMsgTbContext.getCtx();
final ListenableFuture<TbMsg> resultMsgFuture = processMsgAsync(ctx, msg);
DonAsynchron.withCallback(resultMsgFuture, resultMsg -> {
try {
ctx.tellSuccess(resultMsg);
} finally {
lockAndQueue.getSemaphore().release();
tryProcessQueue(lockAndQueue);
}
}, t -> {
try {
ctx.tellFailure(msg, t);
} finally {
lockAndQueue.getSemaphore().release();
tryProcessQueue(lockAndQueue);
}
}, ctx.getDbCallbackExecutor());
} catch (Throwable e) {
semaphore.release();
log.warn("[{}] Failed to process message: {}", lockAndQueue.getEntityId(), tbMsgTbContext == null ? null : tbMsgTbContext.getMsg(), e);
throw e;
}
break; //submitted async exact one task. next poll will try on callback
}
}
@ -399,4 +426,20 @@ public class TbMathNode implements TbNode {
@Override
public void destroy() {
}
@Data
@RequiredArgsConstructor
static public class SemaphoreWithQueue<T> {
final EntityId entityId;
final Semaphore semaphore = new Semaphore(1);
final Queue<T> queue = new ConcurrentLinkedQueue<>();
}
@Data
@RequiredArgsConstructor
static public class TbMsgTbContext {
final TbMsg msg;
final TbContext ctx;
}
}

View File

@ -53,6 +53,8 @@ import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertThrows;
@ -90,19 +92,9 @@ public class TbMathNodeTest {
@BeforeEach
public void before() {
dbCallbackExecutor = new AbstractListeningExecutor() {
@Override
protected int getThreadPollSize() {
return DB_CALLBACK_POOL_SIZE;
}
};
dbCallbackExecutor = new DBCallbackExecutor();
dbCallbackExecutor.init();
ruleEngineDispatcherExecutor = new AbstractListeningExecutor() {
@Override
protected int getThreadPollSize() {
return RULE_DISPATCHER_POOL_SIZE;
}
};
ruleEngineDispatcherExecutor = new RuleDispatcherExecutor();
ruleEngineDispatcherExecutor.init();
lenient().when(ctx.getAttributesService()).thenReturn(attributesService);
@ -528,21 +520,20 @@ public class TbMathNodeTest {
EntityId originatorFast = DeviceId.fromString("c45360ff-7906-4102-a2ae-3495a86168d0");
CountDownLatch slowProcessingLatch = new CountDownLatch(1);
List<TbMsg> slowMsgList = List.of(
TbMsg.newMsg("TEST", originatorSlow, new TbMsgMetaData(), JacksonUtil.newObjectNode().put("a", 2).put("b", 2).toString()),
TbMsg.newMsg("TEST", originatorSlow, new TbMsgMetaData(), JacksonUtil.newObjectNode().put("a", 2).put("b", 2).toString())
);
List<TbMsg> fastMsgList = List.of(
TbMsg.newMsg("TEST", originatorFast, new TbMsgMetaData(), JacksonUtil.newObjectNode().put("a", 2).put("b", 2).toString()),
TbMsg.newMsg("TEST", originatorFast, new TbMsgMetaData(), JacksonUtil.newObjectNode().put("a", 2).put("b", 2).toString())
);
List<TbMsg> slowMsgList = IntStream.range(0, 5)
.mapToObj(x -> TbMsg.newMsg("TEST", originatorSlow, new TbMsgMetaData(), JacksonUtil.newObjectNode().put("a", 2).put("b", 2).toString()))
.collect(Collectors.toList());
List<TbMsg> fastMsgList = IntStream.range(0, 2)
.mapToObj(x -> TbMsg.newMsg("TEST", originatorFast, new TbMsgMetaData(), JacksonUtil.newObjectNode().put("a", 2).put("b", 2).toString()))
.collect(Collectors.toList());
assertThat(slowMsgList.size()).as("slow msgs >= rule-dispatcher pool size").isGreaterThanOrEqualTo(RULE_DISPATCHER_POOL_SIZE);
log.debug("rule-dispatcher [{}], db-callback [{}], slowMsg [{}], fastMsg [{}]", RULE_DISPATCHER_POOL_SIZE, DB_CALLBACK_POOL_SIZE, slowMsgList.size(), fastMsgList.size());
willAnswer(invocation -> {
TbContext ctx = invocation.getArgument(0);
TbMsg msg = invocation.getArgument(1);
log.debug("awaiting on slowProcessingLatch [{}]", msg);
log.debug("\uD83D\uDC0C processMsgAsync slow originator [{}][{}]", msg.getOriginator(), msg);
try {
assertThat(slowProcessingLatch.await(30, TimeUnit.SECONDS)).as("await on slowProcessingLatch").isTrue();
} catch (InterruptedException e) {
@ -551,6 +542,24 @@ public class TbMathNodeTest {
return invocation.callRealMethod();
}).given(node).processMsgAsync(eq(ctx), argThat(slowMsgList::contains));
willAnswer(invocation -> {
TbMsg msg = invocation.getArgument(1);
log.debug("\u26A1\uFE0F processMsgAsync FAST originator [{}][{}]", msg.getOriginator(), msg);
return invocation.callRealMethod();
}).given(node).processMsgAsync(eq(ctx), argThat(fastMsgList::contains));
willAnswer(invocation -> {
TbMsg msg = invocation.getArgument(1);
log.debug("submit slow originator onMsg [{}][{}]", msg.getOriginator(), msg);
return invocation.callRealMethod();
}).given(node).onMsg(eq(ctx), argThat(slowMsgList::contains));
willAnswer(invocation -> {
TbMsg msg = invocation.getArgument(1);
log.debug("submit FAST originator onMsg [{}][{}]", msg.getOriginator(), msg);
return invocation.callRealMethod();
}).given(node).onMsg(eq(ctx), argThat(fastMsgList::contains));
// submit slow msg may block all rule engine dispatcher threads
slowMsgList.forEach(msg -> ruleEngineDispatcherExecutor.executeAsync(() -> node.onMsg(ctx, msg)));
// wait until dispatcher threads started with all slowMsg
@ -568,4 +577,18 @@ public class TbMathNodeTest {
verify(ctx, never()).tellFailure(any(), any());
}
static class RuleDispatcherExecutor extends AbstractListeningExecutor {
@Override
protected int getThreadPollSize() {
return RULE_DISPATCHER_POOL_SIZE;
}
}
static class DBCallbackExecutor extends AbstractListeningExecutor {
@Override
protected int getThreadPollSize() {
return DB_CALLBACK_POOL_SIZE;
}
}
}