Merge with master

This commit is contained in:
Andrii Shvaika 2023-09-11 16:02:44 +03:00
commit 94dbb1a682
3 changed files with 160 additions and 24 deletions

View File

@ -150,7 +150,7 @@ public abstract class AbstractScriptInvokeService implements ScriptInvokeService
}
apiUsageReportClient.ifPresent(client -> client.report(tenantId, customerId, ApiUsageRecordKey.JS_EXEC_COUNT, 1));
pushedMsgs.incrementAndGet();
log.trace("InvokeScript uuid {} with timeout {}ms", scriptId, getMaxInvokeRequestsTimeout());
log.trace("[{}] InvokeScript uuid {} with timeout {}ms", tenantId, scriptId, getMaxInvokeRequestsTimeout());
var task = doInvokeFunction(scriptId, args);
var resultFuture = Futures.transformAsync(task.getResultFuture(), output -> {
@ -167,7 +167,7 @@ public abstract class AbstractScriptInvokeService implements ScriptInvokeService
} else {
String message = "Script invocation is blocked due to maximum error count "
+ getMaxErrors() + ", scriptId " + scriptId + "!";
log.warn(message);
log.warn("[{}] " + message, tenantId);
return error(message);
}
} else {

View File

@ -84,7 +84,7 @@ import static org.thingsboard.rule.engine.math.TbMathArgumentType.CONSTANT;
)
public class TbMathNode implements TbNode {
private static final ConcurrentMap<EntityId, SemaphoreWithQueue<TbMsgTbContext>> locks = new ConcurrentReferenceHashMap<>(16, ConcurrentReferenceHashMap.ReferenceType.WEAK);
private static final ConcurrentMap<EntityId, SemaphoreWithQueue<TbMsgTbContextBiFunction>> locks = new ConcurrentReferenceHashMap<>(16, ConcurrentReferenceHashMap.ReferenceType.WEAK);
private final ThreadLocal<Expression> customExpression = new ThreadLocal<>();
private TbMathNodeConfiguration config;
private boolean msgBodyToJsonConversionRequired;
@ -111,21 +111,21 @@ public class TbMathNode implements TbNode {
@Override
public void onMsg(TbContext ctx, TbMsg msg) {
var semaphoreWithQueue = locks.computeIfAbsent(msg.getOriginator(), SemaphoreWithQueue::new);
semaphoreWithQueue.getQueue().add(new TbMsgTbContext(msg, ctx));
semaphoreWithQueue.getQueue().add(new TbMsgTbContextBiFunction(msg, ctx, this::processMsgAsync));
tryProcessQueue(semaphoreWithQueue);
}
void tryProcessQueue(SemaphoreWithQueue<TbMsgTbContext> lockAndQueue) {
void tryProcessQueue(SemaphoreWithQueue<TbMsgTbContextBiFunction> lockAndQueue) {
final Semaphore semaphore = lockAndQueue.getSemaphore();
final Queue<TbMsgTbContext> queue = lockAndQueue.getQueue();
final Queue<TbMsgTbContextBiFunction> queue = lockAndQueue.getQueue();
while (!queue.isEmpty()) {
// The semaphore have to be acquired before EACH poll and released before NEXT poll.
// Otherwise, some message will remain unprocessed in queue
if (!semaphore.tryAcquire()) {
return;
}
TbMsgTbContext tbMsgTbContext = null;
TbMsgTbContextBiFunction tbMsgTbContext = null;
try {
tbMsgTbContext = queue.poll();
if (tbMsgTbContext == null) {
@ -140,7 +140,7 @@ public class TbMathNode implements TbNode {
}
//DO PROCESSING
final TbContext ctx = tbMsgTbContext.getCtx();
final ListenableFuture<TbMsg> resultMsgFuture = processMsgAsync(ctx, msg);
final ListenableFuture<TbMsg> resultMsgFuture = tbMsgTbContext.getBiFunction().apply(ctx, msg);
DonAsynchron.withCallback(resultMsgFuture, resultMsg -> {
try {
ctx.tellSuccess(resultMsg);
@ -156,10 +156,17 @@ public class TbMathNode implements TbNode {
tryProcessQueue(lockAndQueue);
}
}, ctx.getDbCallbackExecutor());
} catch (Throwable e) {
} catch (Throwable t) {
semaphore.release();
log.warn("[{}] Failed to process message: {}", lockAndQueue.getEntityId(), tbMsgTbContext == null ? null : tbMsgTbContext.getMsg(), e);
throw e;
if (tbMsgTbContext == null) { // if no message polled, the loop become infinite, will throw exception
log.error("[{}] Failed to process TbMsgTbContext queue", lockAndQueue.getEntityId(), t);
throw t;
}
TbMsg msg = tbMsgTbContext.getMsg();
TbContext ctx = tbMsgTbContext.getCtx();
log.warn("[{}] Failed to process message: {}", lockAndQueue.getEntityId(), msg, t);
ctx.tellFailure(msg, t); // you are not allowed to throw here, because queue will remain unprocessed
continue; // We are probably the last who process the queue. We have to continue poll until get successful callback or queue is empty
}
break; //submitted async exact one task. next poll will try on callback
}
@ -367,7 +374,7 @@ public class TbMathNode implements TbNode {
return function.apply(arg1.getValue(), arg2.getValue());
}
private ListenableFuture<TbMathArgumentValue> resolveArguments(TbContext ctx, TbMsg msg, Optional<ObjectNode> msgBodyOpt, TbMathArgument arg) {
ListenableFuture<TbMathArgumentValue> resolveArguments(TbContext ctx, TbMsg msg, Optional<ObjectNode> msgBodyOpt, TbMathArgument arg) {
String argKey = getKeyFromTemplate(msg, arg.getType(), arg.getKey());
switch (arg.getType()) {
case CONSTANT:
@ -433,9 +440,10 @@ public class TbMathNode implements TbNode {
@Data
@RequiredArgsConstructor
static public class TbMsgTbContext {
static public class TbMsgTbContextBiFunction {
final TbMsg msg;
final TbContext ctx;
final BiFunction<TbContext, TbMsg, ListenableFuture<TbMsg>> biFunction;
}
}

View File

@ -18,6 +18,9 @@ package org.thingsboard.rule.engine.math;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.util.concurrent.Futures;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.tuple.Triple;
import org.assertj.core.api.SoftAssertions;
import org.junit.Assert;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@ -27,7 +30,9 @@ import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;
import org.mockito.verification.Timeout;
import org.thingsboard.common.util.AbstractListeningExecutor;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.rule.engine.api.RuleEngineTelemetryService;
@ -72,6 +77,10 @@ import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.BDDMockito.willAnswer;
import static org.mockito.BDDMockito.willReturn;
import static org.mockito.BDDMockito.willReturn;
import static org.mockito.BDDMockito.willThrow;
import static org.mockito.Mockito.lenient;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.timeout;
@ -128,7 +137,15 @@ public class TbMathNodeTest {
return initNode(TbRuleNodeMathFunctionType.CUSTOM, expression, result, arguments);
}
private TbMathNode initNodeWithCustomFunction(TbContext ctx, String expression, TbMathResult result, TbMathArgument... arguments) {
return initNode(ctx, TbRuleNodeMathFunctionType.CUSTOM, expression, result, arguments);
}
private TbMathNode initNode(TbRuleNodeMathFunctionType operation, String expression, TbMathResult result, TbMathArgument... arguments) {
return initNode(this.ctx, operation, expression, result, arguments);
}
private TbMathNode initNode(TbContext ctx, TbRuleNodeMathFunctionType operation, String expression, TbMathResult result, TbMathArgument... arguments) {
try {
TbMathNodeConfiguration configuration = new TbMathNodeConfiguration();
configuration.setOperation(operation);
@ -521,10 +538,11 @@ public class TbMathNodeTest {
new TbMathArgument(TbMathArgumentType.MESSAGE_BODY, "TestKey")
);
TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, originator, TbMsgMetaData.EMPTY, JacksonUtil.newObjectNode().put("a", 10).toString());
Throwable thrown = assertThrows(RuntimeException.class, () -> {
node.onMsg(ctx, msg);
});
assertNotNull(thrown.getMessage());
ArgumentCaptor<Throwable> tCaptor = ArgumentCaptor.forClass(Throwable.class);
Mockito.verify(ctx, Mockito.timeout(5000)).tellFailure(eq(msg), tCaptor.capture());
Assert.assertNotNull(tCaptor.getValue().getMessage());
}
@Test
@ -535,10 +553,11 @@ public class TbMathNodeTest {
);
TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, originator, TbMsgMetaData.EMPTY, TbMsg.EMPTY_JSON_ARRAY);
Throwable thrown = assertThrows(RuntimeException.class, () -> {
node.onMsg(ctx, msg);
});
assertNotNull(thrown.getMessage());
ArgumentCaptor<Throwable> tCaptor = ArgumentCaptor.forClass(Throwable.class);
Mockito.verify(ctx, Mockito.timeout(5000)).tellFailure(eq(msg), tCaptor.capture());
Assert.assertNotNull(tCaptor.getValue().getMessage());
}
@Test
@ -553,10 +572,10 @@ public class TbMathNodeTest {
CountDownLatch slowProcessingLatch = new CountDownLatch(1);
List<TbMsg> slowMsgList = IntStream.range(0, 5)
.mapToObj(x -> TbMsg.newMsg("TEST", originatorSlow, new TbMsgMetaData(), JacksonUtil.newObjectNode().put("a", 2).put("b", 2).toString()))
.mapToObj(x -> TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, originatorSlow, TbMsgMetaData.EMPTY, JacksonUtil.newObjectNode().put("a", 2).put("b", 2).toString()))
.collect(Collectors.toList());
List<TbMsg> fastMsgList = IntStream.range(0, 2)
.mapToObj(x -> TbMsg.newMsg("TEST", originatorFast, new TbMsgMetaData(), JacksonUtil.newObjectNode().put("a", 2).put("b", 2).toString()))
.mapToObj(x -> TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, originatorFast, TbMsgMetaData.EMPTY, JacksonUtil.newObjectNode().put("a", 2).put("b", 2).toString()))
.collect(Collectors.toList());
assertThat(slowMsgList.size()).as("slow msgs >= rule-dispatcher pool size").isGreaterThanOrEqualTo(RULE_DISPATCHER_POOL_SIZE);
@ -609,6 +628,115 @@ public class TbMathNodeTest {
verify(ctx, never()).tellFailure(any(), any());
}
@Test
public void testExp4j_concurrentBySingleOriginator_processMsgAsyncException() {
TbMathNode node = spy(initNodeWithCustomFunction("2a+3b",
new TbMathResult(TbMathArgumentType.MESSAGE_BODY, "result", 2, false, false, null),
new TbMathArgument(TbMathArgumentType.MESSAGE_BODY, "a"),
new TbMathArgument(TbMathArgumentType.MESSAGE_BODY, "b")
));
willThrow(new RuntimeException("Message body has no 'delta'")).given(node).resolveArguments(any(), any(), any(), any());
EntityId originatorSlow = DeviceId.fromString("7f01170d-6bba-419c-b95c-2b4c3ba32f30");
CountDownLatch slowProcessingLatch = new CountDownLatch(1);
List<TbMsg> slowMsgList = IntStream.range(0, 5)
.mapToObj(x -> TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, originatorSlow, TbMsgMetaData.EMPTY, JacksonUtil.newObjectNode().put("a", 2).put("b", 2).toString()))
.collect(Collectors.toList());
assertThat(slowMsgList.size()).as("slow msgs >= rule-dispatcher pool size").isGreaterThanOrEqualTo(RULE_DISPATCHER_POOL_SIZE);
log.debug("rule-dispatcher [{}], db-callback [{}], slowMsg [{}]", RULE_DISPATCHER_POOL_SIZE, DB_CALLBACK_POOL_SIZE, slowMsgList.size());
willAnswer(invocation -> {
TbMsg msg = invocation.getArgument(1);
if (slowProcessingLatch.getCount() > 0) {
log.debug("Await on slowProcessingLatch before processMsgAsync");
try {
assertThat(slowProcessingLatch.await(30, TimeUnit.SECONDS)).as("await on slowProcessingLatch").isTrue();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
log.debug("\uD83D\uDC0C processMsgAsync with exception [{}][{}]", msg.getOriginator(), msg);
return invocation.callRealMethod();
}).given(node).processMsgAsync(eq(ctx), argThat(slowMsgList::contains));
willAnswer(invocation -> {
TbMsg msg = invocation.getArgument(1);
log.debug("submit slow originator onMsg [{}][{}]", msg.getOriginator(), msg);
return invocation.callRealMethod();
}).given(node).onMsg(eq(ctx), argThat(slowMsgList::contains));
// submit slow msg may block all rule engine dispatcher threads
slowMsgList.forEach(msg -> ruleEngineDispatcherExecutor.executeAsync(() -> node.onMsg(ctx, msg)));
// wait until dispatcher threads started with all slowMsg
verify(node, new Timeout(TimeUnit.SECONDS.toMillis(5), times(slowMsgList.size()))).onMsg(eq(ctx), argThat(slowMsgList::contains));
slowProcessingLatch.countDown();
verify(ctx, new Timeout(TimeUnit.SECONDS.toMillis(5), times(slowMsgList.size()))).tellFailure(any(), any());
verify(ctx, never()).tellSuccess(any());
}
@Test
public void testExp4j_concurrentBySingleOriginator_SingleMsg_manyNodesWithDifferentOutput() {
assertThat(RULE_DISPATCHER_POOL_SIZE).as("dispatcher pool size have to be > 1").isGreaterThan(1);
CountDownLatch processingLatch = new CountDownLatch(1);
List<Triple<TbContext, String, TbMathNode>> ctxNodes = IntStream.range(0, RULE_DISPATCHER_POOL_SIZE * 2)
.mapToObj(x -> {
final TbContext ctx = mock(TbContext.class); // many rule nodes - many contexts
willReturn(dbCallbackExecutor).given(ctx).getDbCallbackExecutor();
final String resultKey = "result" + x;
final TbMathNode node = spy(initNodeWithCustomFunction(ctx, "2a+3b",
new TbMathResult(TbMathArgumentType.MESSAGE_METADATA, resultKey, 1, false, true, null),
new TbMathArgument(TbMathArgumentType.MESSAGE_BODY, "a"),
new TbMathArgument(TbMathArgumentType.MESSAGE_BODY, "b")));
willAnswer(invocation -> {
if (processingLatch.getCount() > 0) {
log.debug("Await on processingLatch before processMsgAsync");
try {
assertThat(processingLatch.await(30, TimeUnit.SECONDS)).as("await on processingLatch").isTrue();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
log.debug("\uD83D\uDC0C processMsgAsync on node with expected resultKey [{}]", resultKey);
return invocation.callRealMethod();
}).given(node).processMsgAsync(any(), any());
willAnswer(invocation -> {
TbMsg msg = invocation.getArgument(1);
log.debug("submit originator onMsg [{}][{}]", msg.getOriginator(), msg);
return invocation.callRealMethod();
}).given(node).onMsg(any(), any());
return Triple.of(ctx, resultKey, node);
})
.collect(Collectors.toList());
ctxNodes.forEach(ctxNode -> ruleEngineDispatcherExecutor.executeAsync(() -> ctxNode.getRight()
.onMsg(ctxNode.getLeft(), TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, originator, TbMsgMetaData.EMPTY, "{\"a\":2,\"b\":2}"))));
ctxNodes.forEach(ctxNode -> verify(ctxNode.getRight(), timeout(5000)).onMsg(eq(ctxNode.getLeft()), any()));
processingLatch.countDown();
SoftAssertions softly = new SoftAssertions();
ctxNodes.forEach(ctxNode -> {
final TbContext ctx = ctxNode.getLeft();
final String resultKey = ctxNode.getMiddle();
ArgumentCaptor<TbMsg> msgCaptor = ArgumentCaptor.forClass(TbMsg.class);
verify(ctx, timeout(5000)).tellSuccess(msgCaptor.capture());
TbMsg resultMsg = msgCaptor.getValue();
assertThat(resultMsg).as("result msg non null for result key " + resultKey).isNotNull();
log.debug("asserting result key [{}] in metadata [{}]", resultKey, resultMsg.getMetaData().getData());
softly.assertThat(resultMsg.getMetaData().getValue(resultKey)).as("asserting result key " + resultKey)
.isEqualTo("10.0");
});
softly.assertAll();
verify(ctx, never()).tellFailure(any(), any());
}
static class RuleDispatcherExecutor extends AbstractListeningExecutor {
@Override
protected int getThreadPollSize() {