diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetTelemetryNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetTelemetryNode.java index 4aabc606ab..2b87e80127 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetTelemetryNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetTelemetryNode.java @@ -176,7 +176,7 @@ public class TbGetTelemetryNode implements TbNode { return getIntervalFromPatterns(msg); } else { Interval interval = new Interval(); - long ts = System.currentTimeMillis(); + long ts = getCurrentTimeMillis(); interval.setStartTs(ts - TimeUnit.valueOf(config.getStartIntervalTimeUnit()).toMillis(config.getStartInterval())); interval.setEndTs(ts - TimeUnit.valueOf(config.getEndIntervalTimeUnit()).toMillis(config.getEndInterval())); return interval; @@ -220,6 +220,10 @@ public class TbGetTelemetryNode implements TbNode { return limit; } + long getCurrentTimeMillis() { + return System.currentTimeMillis(); + } + @Data @NoArgsConstructor private static class Interval { diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/metadata/TbGetTelemetryNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/metadata/TbGetTelemetryNodeTest.java index 2ad575f0aa..f958e13ab5 100644 --- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/metadata/TbGetTelemetryNodeTest.java +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/metadata/TbGetTelemetryNodeTest.java @@ -32,7 +32,6 @@ import org.thingsboard.rule.engine.TestDbCallbackExecutor; import org.thingsboard.rule.engine.api.TbContext; import org.thingsboard.rule.engine.api.TbNodeConfiguration; import org.thingsboard.rule.engine.api.TbNodeException; -import org.thingsboard.rule.engine.api.util.TbNodeUtils; import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.TenantId; @@ -60,7 +59,9 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyList; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.BDDMockito.given; +import static org.mockito.BDDMockito.spy; import static org.mockito.BDDMockito.then; +import static org.mockito.BDDMockito.willReturn; @ExtendWith(MockitoExtension.class) public class TbGetTelemetryNodeTest { @@ -80,7 +81,7 @@ public class TbGetTelemetryNodeTest { @BeforeEach public void setUp() { - node = new TbGetTelemetryNode(); + node = spy(new TbGetTelemetryNode()); config = new TbGetTelemetryNodeConfiguration().defaultConfiguration(); config.setLatestTsKeyNames(List.of("temperature")); } @@ -222,6 +223,8 @@ public class TbGetTelemetryNodeTest { // GIVEN node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config))); + long ts = System.currentTimeMillis(); + willReturn(ts).given(node).getCurrentTimeMillis(); mockTimeseriesService(); given(timeseriesServiceMock.findAll(any(TenantId.class), any(EntityId.class), anyList())).willReturn(Futures.immediateFuture(Collections.emptyList())); @@ -233,8 +236,8 @@ public class TbGetTelemetryNodeTest { ArgumentCaptor> actualReadTsKvQueryList = ArgumentCaptor.forClass(List.class); then(timeseriesServiceMock).should().findAll(eq(TENANT_ID), eq(DEVICE_ID), actualReadTsKvQueryList.capture()); ReadTsKvQuery actualReadTsKvQuery = actualReadTsKvQueryList.getValue().get(0); - assertThat(actualReadTsKvQuery.getStartTs()).isLessThan(System.currentTimeMillis() - TimeUnit.MINUTES.toMillis(config.getStartInterval())); - assertThat(actualReadTsKvQuery.getEndTs()).isLessThan(System.currentTimeMillis() - TimeUnit.MINUTES.toMillis(config.getEndInterval())); + assertThat(actualReadTsKvQuery.getStartTs()).isEqualTo(ts - TimeUnit.MINUTES.toMillis(config.getStartInterval())); + assertThat(actualReadTsKvQuery.getEndTs()).isEqualTo(ts - TimeUnit.MINUTES.toMillis(config.getEndInterval())); } @Test @@ -257,8 +260,7 @@ public class TbGetTelemetryNodeTest { ArgumentCaptor> actualReadTsKvQueryList = ArgumentCaptor.forClass(List.class); then(timeseriesServiceMock).should().findAll(eq(TENANT_ID), eq(DEVICE_ID), actualReadTsKvQueryList.capture()); List actualKeys = actualReadTsKvQueryList.getValue().stream().map(TsKvQuery::getKey).toList(); - List expectedTsKeyNames = config.getLatestTsKeyNames().stream().map(tsKeyName -> TbNodeUtils.processPattern(tsKeyName, msg)).toList(); - assertThat(actualKeys).containsAll(expectedTsKeyNames); + assertThat(actualKeys).containsAll(List.of("temperature", "humidity", "pressure")); } @ParameterizedTest @@ -325,6 +327,10 @@ public class TbGetTelemetryNodeTest { Arguments.of( TbGetTelemetryNodeConfiguration.FETCH_MODE_FIRST, TbGetTelemetryNodeConfiguration.MAX_FETCH_SIZE, + (Consumer) query -> assertThat(query.getLimit()).isEqualTo(1)), + Arguments.of( + TbGetTelemetryNodeConfiguration.FETCH_MODE_LAST, + 10, (Consumer) query -> assertThat(query.getLimit()).isEqualTo(1)) ); } @@ -424,9 +430,11 @@ public class TbGetTelemetryNodeTest { assertThat(actualMsg.getValue()).usingRecursiveComparison().ignoringFields("ctx").isEqualTo(expectedMsg); } - @Test - public void givenFetchModeIsFirst_whenOnMsg_thenTellSuccessAndVerifyMsg() throws TbNodeException { + @ParameterizedTest + @ValueSource(strings = {"FIRST", "LAST"}) + public void givenFetchMode_whenOnMsg_thenTellSuccessAndVerifyMsg(String fetchMode) throws TbNodeException { // GIVEN + config.setFetchMode(fetchMode); config.setLatestTsKeyNames(List.of("temperature", "humidity")); node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config)));