TbMathNode tests refactored
This commit is contained in:
parent
63943bc5ce
commit
a5dabae023
@ -18,7 +18,6 @@ package org.thingsboard.rule.engine.math;
|
||||
import com.fasterxml.jackson.databind.node.ObjectNode;
|
||||
import com.google.common.util.concurrent.Futures;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.awaitility.Awaitility;
|
||||
import org.junit.Assert;
|
||||
import org.junit.jupiter.api.AfterEach;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
@ -28,8 +27,6 @@ import org.mockito.ArgumentCaptor;
|
||||
import org.mockito.Mock;
|
||||
import org.mockito.Mockito;
|
||||
import org.mockito.junit.jupiter.MockitoExtension;
|
||||
import org.mockito.verification.Timeout;
|
||||
import org.springframework.test.util.ReflectionTestUtils;
|
||||
import org.thingsboard.common.util.AbstractListeningExecutor;
|
||||
import org.thingsboard.common.util.JacksonUtil;
|
||||
import org.thingsboard.rule.engine.api.RuleEngineTelemetryService;
|
||||
@ -55,7 +52,6 @@ import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.stream.Collectors;
|
||||
@ -72,6 +68,7 @@ import static org.mockito.BDDMockito.willAnswer;
|
||||
import static org.mockito.Mockito.lenient;
|
||||
import static org.mockito.Mockito.never;
|
||||
import static org.mockito.Mockito.spy;
|
||||
import static org.mockito.Mockito.timeout;
|
||||
import static org.mockito.Mockito.times;
|
||||
import static org.mockito.Mockito.verify;
|
||||
|
||||
@ -81,6 +78,7 @@ public class TbMathNodeTest {
|
||||
|
||||
static final int RULE_DISPATCHER_POOL_SIZE = 2;
|
||||
static final int DB_CALLBACK_POOL_SIZE = 3;
|
||||
static final long TIMEOUT = TimeUnit.SECONDS.toMillis(5);
|
||||
private final EntityId originator = DeviceId.fromString("ccd71696-0586-422d-940e-755a41ec3b0d");
|
||||
private final TenantId tenantId = TenantId.fromUUID(UUID.fromString("e7f46b23-0c7d-42f5-9b06-fc35ab17af8a"));
|
||||
|
||||
@ -161,9 +159,6 @@ public class TbMathNodeTest {
|
||||
|
||||
node.onMsg(ctx, msg);
|
||||
|
||||
ConcurrentMap<EntityId, TbMathNode.SemaphoreWithQueue<TbMathNode.TbMsgTbContext>> semaphores = (ConcurrentMap<EntityId, TbMathNode.SemaphoreWithQueue<TbMathNode.TbMsgTbContext>>) ReflectionTestUtils.getField(node, "locks");
|
||||
Assert.assertNotNull(semaphores);
|
||||
|
||||
metaData.putValue("key1", "secondMsgResult");
|
||||
metaData.putValue("key2", "argumentC");
|
||||
msgNode = JacksonUtil.newObjectNode()
|
||||
@ -172,10 +167,8 @@ public class TbMathNodeTest {
|
||||
|
||||
node.onMsg(ctx, msg);
|
||||
|
||||
Awaitility.await("Semaphore released").atMost(5, TimeUnit.SECONDS).until(() -> semaphores.get(originator).semaphore.tryAcquire());
|
||||
|
||||
ArgumentCaptor<TbMsg> msgCaptor = ArgumentCaptor.forClass(TbMsg.class);
|
||||
Mockito.verify(ctx, Mockito.times(2)).tellSuccess(msgCaptor.capture());
|
||||
verify(ctx, timeout(TIMEOUT).times(2)).tellSuccess(msgCaptor.capture());
|
||||
|
||||
List<TbMsg> resultMsgs = msgCaptor.getAllValues();
|
||||
Assert.assertFalse(resultMsgs.isEmpty());
|
||||
@ -190,7 +183,6 @@ public class TbMathNodeTest {
|
||||
Assert.assertTrue(resultJson.has(resultKey));
|
||||
Assert.assertEquals(i == 0 ? 10 : 17, resultJson.get(resultKey).asInt());
|
||||
}
|
||||
semaphores.remove(originator);
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -257,7 +249,7 @@ public class TbMathNodeTest {
|
||||
node.onMsg(ctx, msg);
|
||||
|
||||
ArgumentCaptor<TbMsg> msgCaptor = ArgumentCaptor.forClass(TbMsg.class);
|
||||
Mockito.verify(ctx, Mockito.timeout(5000).times(1)).tellSuccess(msgCaptor.capture());
|
||||
verify(ctx, timeout(TIMEOUT).times(1)).tellSuccess(msgCaptor.capture());
|
||||
|
||||
TbMsg resultMsg = msgCaptor.getValue();
|
||||
Assert.assertNotNull(resultMsg);
|
||||
@ -280,7 +272,7 @@ public class TbMathNodeTest {
|
||||
node.onMsg(ctx, msg);
|
||||
|
||||
ArgumentCaptor<TbMsg> msgCaptor = ArgumentCaptor.forClass(TbMsg.class);
|
||||
Mockito.verify(ctx, Mockito.timeout(5000)).tellSuccess(msgCaptor.capture());
|
||||
verify(ctx, timeout(TIMEOUT)).tellSuccess(msgCaptor.capture());
|
||||
|
||||
TbMsg resultMsg = msgCaptor.getValue();
|
||||
Assert.assertNotNull(resultMsg);
|
||||
@ -303,7 +295,7 @@ public class TbMathNodeTest {
|
||||
node.onMsg(ctx, msg);
|
||||
|
||||
ArgumentCaptor<TbMsg> msgCaptor = ArgumentCaptor.forClass(TbMsg.class);
|
||||
Mockito.verify(ctx, Mockito.timeout(5000)).tellSuccess(msgCaptor.capture());
|
||||
verify(ctx, timeout(TIMEOUT)).tellSuccess(msgCaptor.capture());
|
||||
|
||||
TbMsg resultMsg = msgCaptor.getValue();
|
||||
Assert.assertNotNull(resultMsg);
|
||||
@ -326,7 +318,7 @@ public class TbMathNodeTest {
|
||||
node.onMsg(ctx, msg);
|
||||
|
||||
ArgumentCaptor<TbMsg> msgCaptor = ArgumentCaptor.forClass(TbMsg.class);
|
||||
Mockito.verify(ctx, Mockito.timeout(5000)).tellSuccess(msgCaptor.capture());
|
||||
verify(ctx, timeout(TIMEOUT)).tellSuccess(msgCaptor.capture());
|
||||
|
||||
TbMsg resultMsg = msgCaptor.getValue();
|
||||
Assert.assertNotNull(resultMsg);
|
||||
@ -356,7 +348,7 @@ public class TbMathNodeTest {
|
||||
node.onMsg(ctx, msg);
|
||||
|
||||
ArgumentCaptor<TbMsg> msgCaptor = ArgumentCaptor.forClass(TbMsg.class);
|
||||
Mockito.verify(ctx, Mockito.timeout(5000)).tellSuccess(msgCaptor.capture());
|
||||
verify(ctx, timeout(TIMEOUT)).tellSuccess(msgCaptor.capture());
|
||||
|
||||
TbMsg resultMsg = msgCaptor.getValue();
|
||||
Assert.assertNotNull(resultMsg);
|
||||
@ -378,7 +370,7 @@ public class TbMathNodeTest {
|
||||
node.onMsg(ctx, msg);
|
||||
|
||||
ArgumentCaptor<TbMsg> msgCaptor = ArgumentCaptor.forClass(TbMsg.class);
|
||||
Mockito.verify(ctx, Mockito.timeout(5000)).tellSuccess(msgCaptor.capture());
|
||||
verify(ctx, timeout(TIMEOUT)).tellSuccess(msgCaptor.capture());
|
||||
|
||||
TbMsg resultMsg = msgCaptor.getValue();
|
||||
Assert.assertNotNull(resultMsg);
|
||||
@ -400,7 +392,7 @@ public class TbMathNodeTest {
|
||||
node.onMsg(ctx, msg);
|
||||
|
||||
ArgumentCaptor<TbMsg> msgCaptor = ArgumentCaptor.forClass(TbMsg.class);
|
||||
Mockito.verify(ctx, Mockito.timeout(5000)).tellSuccess(msgCaptor.capture());
|
||||
verify(ctx, timeout(TIMEOUT)).tellSuccess(msgCaptor.capture());
|
||||
|
||||
TbMsg resultMsg = msgCaptor.getValue();
|
||||
Assert.assertNotNull(resultMsg);
|
||||
@ -425,8 +417,8 @@ public class TbMathNodeTest {
|
||||
node.onMsg(ctx, msg);
|
||||
|
||||
ArgumentCaptor<TbMsg> msgCaptor = ArgumentCaptor.forClass(TbMsg.class);
|
||||
Mockito.verify(ctx, Mockito.timeout(5000)).tellSuccess(msgCaptor.capture());
|
||||
Mockito.verify(telemetryService, times(1)).saveAttrAndNotify(any(), any(), anyString(), anyString(), anyDouble());
|
||||
verify(ctx, timeout(TIMEOUT)).tellSuccess(msgCaptor.capture());
|
||||
verify(telemetryService, times(1)).saveAttrAndNotify(any(), any(), anyString(), anyString(), anyDouble());
|
||||
|
||||
TbMsg resultMsg = msgCaptor.getValue();
|
||||
Assert.assertNotNull(resultMsg);
|
||||
@ -450,8 +442,8 @@ public class TbMathNodeTest {
|
||||
node.onMsg(ctx, msg);
|
||||
|
||||
ArgumentCaptor<TbMsg> msgCaptor = ArgumentCaptor.forClass(TbMsg.class);
|
||||
verify(ctx, Mockito.timeout(5000)).tellSuccess(msgCaptor.capture());
|
||||
Mockito.verify(telemetryService, times(1)).saveAndNotify(any(), any(), any(TsKvEntry.class));
|
||||
verify(ctx, timeout(TIMEOUT)).tellSuccess(msgCaptor.capture());
|
||||
verify(telemetryService, times(1)).saveAndNotify(any(), any(), any(TsKvEntry.class));
|
||||
|
||||
TbMsg resultMsg = msgCaptor.getValue();
|
||||
Assert.assertNotNull(resultMsg);
|
||||
@ -475,8 +467,8 @@ public class TbMathNodeTest {
|
||||
node.onMsg(ctx, msg);
|
||||
|
||||
ArgumentCaptor<TbMsg> msgCaptor = ArgumentCaptor.forClass(TbMsg.class);
|
||||
verify(ctx, Mockito.timeout(5000)).tellSuccess(msgCaptor.capture());
|
||||
Mockito.verify(telemetryService, times(1)).saveAndNotify(any(), any(), any(TsKvEntry.class));
|
||||
verify(ctx, timeout(TIMEOUT)).tellSuccess(msgCaptor.capture());
|
||||
verify(telemetryService, times(1)).saveAndNotify(any(), any(), any(TsKvEntry.class));
|
||||
|
||||
TbMsg resultMsg = msgCaptor.getValue();
|
||||
Assert.assertNotNull(resultMsg);
|
||||
@ -503,7 +495,7 @@ public class TbMathNodeTest {
|
||||
|
||||
node.onMsg(ctx, msg);
|
||||
ArgumentCaptor<TbMsg> msgCaptor = ArgumentCaptor.forClass(TbMsg.class);
|
||||
Mockito.verify(ctx, Mockito.timeout(5000)).tellSuccess(msgCaptor.capture());
|
||||
verify(ctx, timeout(TIMEOUT)).tellSuccess(msgCaptor.capture());
|
||||
|
||||
TbMsg resultMsg = msgCaptor.getValue();
|
||||
Assert.assertNotNull(resultMsg);
|
||||
@ -594,16 +586,16 @@ public class TbMathNodeTest {
|
||||
// submit slow msg may block all rule engine dispatcher threads
|
||||
slowMsgList.forEach(msg -> ruleEngineDispatcherExecutor.executeAsync(() -> node.onMsg(ctx, msg)));
|
||||
// wait until dispatcher threads started with all slowMsg
|
||||
verify(node, new Timeout(TimeUnit.SECONDS.toMillis(5), times(slowMsgList.size()))).onMsg(eq(ctx), argThat(slowMsgList::contains));
|
||||
verify(node, timeout(TIMEOUT).times(slowMsgList.size())).onMsg(eq(ctx), argThat(slowMsgList::contains));
|
||||
|
||||
// submit fast have to return immediately
|
||||
fastMsgList.forEach(msg -> ruleEngineDispatcherExecutor.executeAsync(() -> node.onMsg(ctx, msg)));
|
||||
// wait until all fast messages processed
|
||||
verify(ctx, new Timeout(TimeUnit.SECONDS.toMillis(5), times(fastMsgList.size()))).tellSuccess(any());
|
||||
verify(ctx, timeout(TIMEOUT).times(fastMsgList.size())).tellSuccess(any());
|
||||
|
||||
slowProcessingLatch.countDown();
|
||||
|
||||
verify(ctx, new Timeout(TimeUnit.SECONDS.toMillis(5), times(fastMsgList.size() + slowMsgList.size()))).tellSuccess(any());
|
||||
verify(ctx, timeout(TIMEOUT).times(fastMsgList.size() + slowMsgList.size())).tellSuccess(any());
|
||||
|
||||
verify(ctx, never()).tellFailure(any(), any());
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user