From 827c898179e39f24bd4b6c283fb0a1ac1a2f083c Mon Sep 17 00:00:00 2001 From: ShvaykaD Date: Tue, 2 Apr 2024 12:38:37 +0300 Subject: [PATCH] added test givenConcurrentAccess_whenOnMsg_thenGetFromDBInvokedOnce --- .../metadata/CalculateDeltaNodeTest.java | 79 ++++++++++++++++++- 1 file changed, 75 insertions(+), 4 deletions(-) diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/metadata/CalculateDeltaNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/metadata/CalculateDeltaNodeTest.java index 0dc228235d..36b9719d86 100644 --- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/metadata/CalculateDeltaNodeTest.java +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/metadata/CalculateDeltaNodeTest.java @@ -17,13 +17,13 @@ package org.thingsboard.rule.engine.metadata; import com.google.common.util.concurrent.Futures; import lombok.extern.slf4j.Slf4j; -import org.assertj.core.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.ArgumentCaptor; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; +import org.thingsboard.common.util.AbstractListeningExecutor; import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.common.util.ListeningExecutor; import org.thingsboard.rule.engine.TestDbCallbackExecutor; @@ -45,9 +45,15 @@ import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.common.msg.TbMsgMetaData; import org.thingsboard.server.dao.timeseries.TimeseriesService; +import java.util.List; import java.util.Optional; import java.util.UUID; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.stream.IntStream; +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertInstanceOf; @@ -57,8 +63,12 @@ import static org.mockito.ArgumentMatchers.anyList; import static org.mockito.ArgumentMatchers.anySet; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.BDDMockito.willAnswer; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.reset; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -69,6 +79,10 @@ public class CalculateDeltaNodeTest { private final DeviceId DUMMY_DEVICE_ORIGINATOR = new DeviceId(UUID.fromString("2ba3ded4-882b-40cf-999a-89da9ccd58f9")); private final TenantId TENANT_ID = new TenantId(UUID.fromString("3842e740-0d89-43a9-8d52-ae44023847ba")); private final ListeningExecutor DB_EXECUTOR = new TestDbCallbackExecutor(); + + private static final int RULE_DISPATCHER_POOL_SIZE = 2; + private static final int DB_CALLBACK_POOL_SIZE = 3; + @Mock private TbContext ctxMock; @Mock @@ -401,7 +415,7 @@ public class CalculateDeltaNodeTest { verify(ctxMock, never()).tellNext(any(), anyString()); verify(ctxMock, never()).tellNext(any(), anySet()); - Assertions.assertThat(throwableCaptor.getValue()) + assertThat(throwableCaptor.getValue()) .isInstanceOf(IllegalArgumentException.class) .hasMessage("Calculation failed. Unable to parse value [high] of telemetry [pulseCounter] to Double"); } @@ -425,7 +439,7 @@ public class CalculateDeltaNodeTest { verify(ctxMock, never()).tellNext(any(), anyString()); verify(ctxMock, never()).tellNext(any(), anySet()); - Assertions.assertThat(throwableCaptor.getValue()) + assertThat(throwableCaptor.getValue()) .isInstanceOf(IllegalArgumentException.class) .hasMessage("Calculation failed. Boolean values are not supported!"); } @@ -449,11 +463,68 @@ public class CalculateDeltaNodeTest { verify(ctxMock, never()).tellNext(any(), anyString()); verify(ctxMock, never()).tellNext(any(), anySet()); - Assertions.assertThat(throwableCaptor.getValue()) + assertThat(throwableCaptor.getValue()) .isInstanceOf(IllegalArgumentException.class) .hasMessage("Calculation failed. JSON values are not supported!"); } + @Test + public void givenConcurrentAccess_whenOnMsg_thenGetFromDBInvokedOnce() throws TbNodeException, InterruptedException { + DBCallbackExecutor dbCallbackExecutor = new DBCallbackExecutor(); + dbCallbackExecutor.init(); + + RuleDispatcherExecutor ruleEngineDispatcherExecutor = new RuleDispatcherExecutor(); + ruleEngineDispatcherExecutor.init(); + + assertThat(RULE_DISPATCHER_POOL_SIZE).as("dispatcher pool size have to be > 1").isGreaterThan(1); + + final TbContext ctx = mock(TbContext.class); + final TimeseriesService timeseriesService = mock(TimeseriesService.class); + + when(ctx.getTimeseriesService()).thenReturn(timeseriesService); + when(ctx.getDbCallbackExecutor()).thenReturn(dbCallbackExecutor); + when(timeseriesService.findLatest(any(), any(), anyString())).thenReturn(Futures.immediateFuture(Optional.empty())); + + final CalculateDeltaNodeConfiguration config = new CalculateDeltaNodeConfiguration().defaultConfiguration(); + final TbNodeConfiguration nodeConfiguration = new TbNodeConfiguration(JacksonUtil.valueToTree(config)); + final CalculateDeltaNode node = spy(CalculateDeltaNode.class); + + node.init(ctx, nodeConfiguration); + + List tbMsgList = IntStream.range(0, RULE_DISPATCHER_POOL_SIZE * 2).mapToObj(x -> { + var msgData = "{\"pulseCounter\":" + 2 + "}"; + return TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DUMMY_DEVICE_ORIGINATOR, TbMsgMetaData.EMPTY, msgData); + }).toList(); + + CountDownLatch processingLatch = new CountDownLatch(tbMsgList.size()); + + willAnswer(invocation -> { + processingLatch.countDown(); + return invocation.callRealMethod(); + }).given(node).processMsgAsync(any(), any()); + + tbMsgList.forEach(msg -> ruleEngineDispatcherExecutor.executeAsync(() -> node.onMsg(ctx, msg))); + + assertThat(processingLatch.await(5, TimeUnit.SECONDS)).as("await on processingLatch").isTrue(); + + verify(timeseriesService).findLatest(any(), any(), anyString()); + await().atMost(5, TimeUnit.SECONDS).untilAsserted(() -> verify(ctx, times(tbMsgList.size())).tellSuccess(any())); + } + + private static class RuleDispatcherExecutor extends AbstractListeningExecutor { + @Override + protected int getThreadPollSize() { + return RULE_DISPATCHER_POOL_SIZE; + } + } + + private static class DBCallbackExecutor extends AbstractListeningExecutor { + @Override + protected int getThreadPollSize() { + return DB_CALLBACK_POOL_SIZE; + } + } + private void mockFindLatestAsync(TsKvEntry tsKvEntry) { when(ctxMock.getDbCallbackExecutor()).thenReturn(DB_EXECUTOR); when(ctxMock.getTenantId()).thenReturn(TENANT_ID);