added test givenConcurrentAccess_whenOnMsg_thenGetFromDBInvokedOnce
This commit is contained in:
parent
95fad7bf1b
commit
827c898179
@ -17,13 +17,13 @@ package org.thingsboard.rule.engine.metadata;
|
|||||||
|
|
||||||
import com.google.common.util.concurrent.Futures;
|
import com.google.common.util.concurrent.Futures;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.assertj.core.api.Assertions;
|
|
||||||
import org.junit.jupiter.api.BeforeEach;
|
import org.junit.jupiter.api.BeforeEach;
|
||||||
import org.junit.jupiter.api.Test;
|
import org.junit.jupiter.api.Test;
|
||||||
import org.junit.jupiter.api.extension.ExtendWith;
|
import org.junit.jupiter.api.extension.ExtendWith;
|
||||||
import org.mockito.ArgumentCaptor;
|
import org.mockito.ArgumentCaptor;
|
||||||
import org.mockito.Mock;
|
import org.mockito.Mock;
|
||||||
import org.mockito.junit.jupiter.MockitoExtension;
|
import org.mockito.junit.jupiter.MockitoExtension;
|
||||||
|
import org.thingsboard.common.util.AbstractListeningExecutor;
|
||||||
import org.thingsboard.common.util.JacksonUtil;
|
import org.thingsboard.common.util.JacksonUtil;
|
||||||
import org.thingsboard.common.util.ListeningExecutor;
|
import org.thingsboard.common.util.ListeningExecutor;
|
||||||
import org.thingsboard.rule.engine.TestDbCallbackExecutor;
|
import org.thingsboard.rule.engine.TestDbCallbackExecutor;
|
||||||
@ -45,9 +45,15 @@ import org.thingsboard.server.common.msg.TbMsg;
|
|||||||
import org.thingsboard.server.common.msg.TbMsgMetaData;
|
import org.thingsboard.server.common.msg.TbMsgMetaData;
|
||||||
import org.thingsboard.server.dao.timeseries.TimeseriesService;
|
import org.thingsboard.server.dao.timeseries.TimeseriesService;
|
||||||
|
|
||||||
|
import java.util.List;
|
||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
import java.util.UUID;
|
import java.util.UUID;
|
||||||
|
import java.util.concurrent.CountDownLatch;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.stream.IntStream;
|
||||||
|
|
||||||
|
import static org.assertj.core.api.Assertions.assertThat;
|
||||||
|
import static org.awaitility.Awaitility.await;
|
||||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||||
import static org.junit.jupiter.api.Assertions.assertFalse;
|
import static org.junit.jupiter.api.Assertions.assertFalse;
|
||||||
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
|
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
|
||||||
@ -57,8 +63,12 @@ import static org.mockito.ArgumentMatchers.anyList;
|
|||||||
import static org.mockito.ArgumentMatchers.anySet;
|
import static org.mockito.ArgumentMatchers.anySet;
|
||||||
import static org.mockito.ArgumentMatchers.anyString;
|
import static org.mockito.ArgumentMatchers.anyString;
|
||||||
import static org.mockito.ArgumentMatchers.eq;
|
import static org.mockito.ArgumentMatchers.eq;
|
||||||
|
import static org.mockito.BDDMockito.willAnswer;
|
||||||
|
import static org.mockito.Mockito.mock;
|
||||||
import static org.mockito.Mockito.never;
|
import static org.mockito.Mockito.never;
|
||||||
import static org.mockito.Mockito.reset;
|
import static org.mockito.Mockito.reset;
|
||||||
|
import static org.mockito.Mockito.spy;
|
||||||
|
import static org.mockito.Mockito.times;
|
||||||
import static org.mockito.Mockito.verify;
|
import static org.mockito.Mockito.verify;
|
||||||
import static org.mockito.Mockito.when;
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
@ -69,6 +79,10 @@ public class CalculateDeltaNodeTest {
|
|||||||
private final DeviceId DUMMY_DEVICE_ORIGINATOR = new DeviceId(UUID.fromString("2ba3ded4-882b-40cf-999a-89da9ccd58f9"));
|
private final DeviceId DUMMY_DEVICE_ORIGINATOR = new DeviceId(UUID.fromString("2ba3ded4-882b-40cf-999a-89da9ccd58f9"));
|
||||||
private final TenantId TENANT_ID = new TenantId(UUID.fromString("3842e740-0d89-43a9-8d52-ae44023847ba"));
|
private final TenantId TENANT_ID = new TenantId(UUID.fromString("3842e740-0d89-43a9-8d52-ae44023847ba"));
|
||||||
private final ListeningExecutor DB_EXECUTOR = new TestDbCallbackExecutor();
|
private final ListeningExecutor DB_EXECUTOR = new TestDbCallbackExecutor();
|
||||||
|
|
||||||
|
private static final int RULE_DISPATCHER_POOL_SIZE = 2;
|
||||||
|
private static final int DB_CALLBACK_POOL_SIZE = 3;
|
||||||
|
|
||||||
@Mock
|
@Mock
|
||||||
private TbContext ctxMock;
|
private TbContext ctxMock;
|
||||||
@Mock
|
@Mock
|
||||||
@ -401,7 +415,7 @@ public class CalculateDeltaNodeTest {
|
|||||||
verify(ctxMock, never()).tellNext(any(), anyString());
|
verify(ctxMock, never()).tellNext(any(), anyString());
|
||||||
verify(ctxMock, never()).tellNext(any(), anySet());
|
verify(ctxMock, never()).tellNext(any(), anySet());
|
||||||
|
|
||||||
Assertions.assertThat(throwableCaptor.getValue())
|
assertThat(throwableCaptor.getValue())
|
||||||
.isInstanceOf(IllegalArgumentException.class)
|
.isInstanceOf(IllegalArgumentException.class)
|
||||||
.hasMessage("Calculation failed. Unable to parse value [high] of telemetry [pulseCounter] to Double");
|
.hasMessage("Calculation failed. Unable to parse value [high] of telemetry [pulseCounter] to Double");
|
||||||
}
|
}
|
||||||
@ -425,7 +439,7 @@ public class CalculateDeltaNodeTest {
|
|||||||
verify(ctxMock, never()).tellNext(any(), anyString());
|
verify(ctxMock, never()).tellNext(any(), anyString());
|
||||||
verify(ctxMock, never()).tellNext(any(), anySet());
|
verify(ctxMock, never()).tellNext(any(), anySet());
|
||||||
|
|
||||||
Assertions.assertThat(throwableCaptor.getValue())
|
assertThat(throwableCaptor.getValue())
|
||||||
.isInstanceOf(IllegalArgumentException.class)
|
.isInstanceOf(IllegalArgumentException.class)
|
||||||
.hasMessage("Calculation failed. Boolean values are not supported!");
|
.hasMessage("Calculation failed. Boolean values are not supported!");
|
||||||
}
|
}
|
||||||
@ -449,11 +463,68 @@ public class CalculateDeltaNodeTest {
|
|||||||
verify(ctxMock, never()).tellNext(any(), anyString());
|
verify(ctxMock, never()).tellNext(any(), anyString());
|
||||||
verify(ctxMock, never()).tellNext(any(), anySet());
|
verify(ctxMock, never()).tellNext(any(), anySet());
|
||||||
|
|
||||||
Assertions.assertThat(throwableCaptor.getValue())
|
assertThat(throwableCaptor.getValue())
|
||||||
.isInstanceOf(IllegalArgumentException.class)
|
.isInstanceOf(IllegalArgumentException.class)
|
||||||
.hasMessage("Calculation failed. JSON values are not supported!");
|
.hasMessage("Calculation failed. JSON values are not supported!");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void givenConcurrentAccess_whenOnMsg_thenGetFromDBInvokedOnce() throws TbNodeException, InterruptedException {
|
||||||
|
DBCallbackExecutor dbCallbackExecutor = new DBCallbackExecutor();
|
||||||
|
dbCallbackExecutor.init();
|
||||||
|
|
||||||
|
RuleDispatcherExecutor ruleEngineDispatcherExecutor = new RuleDispatcherExecutor();
|
||||||
|
ruleEngineDispatcherExecutor.init();
|
||||||
|
|
||||||
|
assertThat(RULE_DISPATCHER_POOL_SIZE).as("dispatcher pool size have to be > 1").isGreaterThan(1);
|
||||||
|
|
||||||
|
final TbContext ctx = mock(TbContext.class);
|
||||||
|
final TimeseriesService timeseriesService = mock(TimeseriesService.class);
|
||||||
|
|
||||||
|
when(ctx.getTimeseriesService()).thenReturn(timeseriesService);
|
||||||
|
when(ctx.getDbCallbackExecutor()).thenReturn(dbCallbackExecutor);
|
||||||
|
when(timeseriesService.findLatest(any(), any(), anyString())).thenReturn(Futures.immediateFuture(Optional.empty()));
|
||||||
|
|
||||||
|
final CalculateDeltaNodeConfiguration config = new CalculateDeltaNodeConfiguration().defaultConfiguration();
|
||||||
|
final TbNodeConfiguration nodeConfiguration = new TbNodeConfiguration(JacksonUtil.valueToTree(config));
|
||||||
|
final CalculateDeltaNode node = spy(CalculateDeltaNode.class);
|
||||||
|
|
||||||
|
node.init(ctx, nodeConfiguration);
|
||||||
|
|
||||||
|
List<TbMsg> tbMsgList = IntStream.range(0, RULE_DISPATCHER_POOL_SIZE * 2).mapToObj(x -> {
|
||||||
|
var msgData = "{\"pulseCounter\":" + 2 + "}";
|
||||||
|
return TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DUMMY_DEVICE_ORIGINATOR, TbMsgMetaData.EMPTY, msgData);
|
||||||
|
}).toList();
|
||||||
|
|
||||||
|
CountDownLatch processingLatch = new CountDownLatch(tbMsgList.size());
|
||||||
|
|
||||||
|
willAnswer(invocation -> {
|
||||||
|
processingLatch.countDown();
|
||||||
|
return invocation.callRealMethod();
|
||||||
|
}).given(node).processMsgAsync(any(), any());
|
||||||
|
|
||||||
|
tbMsgList.forEach(msg -> ruleEngineDispatcherExecutor.executeAsync(() -> node.onMsg(ctx, msg)));
|
||||||
|
|
||||||
|
assertThat(processingLatch.await(5, TimeUnit.SECONDS)).as("await on processingLatch").isTrue();
|
||||||
|
|
||||||
|
verify(timeseriesService).findLatest(any(), any(), anyString());
|
||||||
|
await().atMost(5, TimeUnit.SECONDS).untilAsserted(() -> verify(ctx, times(tbMsgList.size())).tellSuccess(any()));
|
||||||
|
}
|
||||||
|
|
||||||
|
private static class RuleDispatcherExecutor extends AbstractListeningExecutor {
|
||||||
|
@Override
|
||||||
|
protected int getThreadPollSize() {
|
||||||
|
return RULE_DISPATCHER_POOL_SIZE;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static class DBCallbackExecutor extends AbstractListeningExecutor {
|
||||||
|
@Override
|
||||||
|
protected int getThreadPollSize() {
|
||||||
|
return DB_CALLBACK_POOL_SIZE;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private void mockFindLatestAsync(TsKvEntry tsKvEntry) {
|
private void mockFindLatestAsync(TsKvEntry tsKvEntry) {
|
||||||
when(ctxMock.getDbCallbackExecutor()).thenReturn(DB_EXECUTOR);
|
when(ctxMock.getDbCallbackExecutor()).thenReturn(DB_EXECUTOR);
|
||||||
when(ctxMock.getTenantId()).thenReturn(TENANT_ID);
|
when(ctxMock.getTenantId()).thenReturn(TENANT_ID);
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user