From 9fe262fa4cf71c630c10ea20fd96a27e4e2b22cb Mon Sep 17 00:00:00 2001 From: IrynaMatveieva Date: Tue, 25 Jun 2024 12:57:00 +0300 Subject: [PATCH 1/6] added tests for originator telemetry node --- .../engine/metadata/TbGetTelemetryNode.java | 3 +- .../metadata/TbGetTelemetryNodeTest.java | 341 ++++++++++++++++-- 2 files changed, 306 insertions(+), 38 deletions(-) diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetTelemetryNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetTelemetryNode.java index bcf6b1b05a..86f309ab09 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetTelemetryNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetTelemetryNode.java @@ -40,7 +40,6 @@ import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.common.msg.TbMsgMetaData; import java.util.List; -import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @@ -91,7 +90,7 @@ public class TbGetTelemetryNode implements TbNode { } @Override - public void onMsg(TbContext ctx, TbMsg msg) throws ExecutionException, InterruptedException, TbNodeException { + public void onMsg(TbContext ctx, TbMsg msg) { if (tsKeyNames.isEmpty()) { ctx.tellFailure(msg, new IllegalStateException("Telemetry is not selected!")); } else { diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/metadata/TbGetTelemetryNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/metadata/TbGetTelemetryNodeTest.java index 6ecd1b6eb9..1875dd91f1 100644 --- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/metadata/TbGetTelemetryNodeTest.java +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/metadata/TbGetTelemetryNodeTest.java @@ -15,72 +15,341 @@ */ package org.thingsboard.rule.engine.metadata; +import com.google.common.util.concurrent.Futures; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; +import org.junit.jupiter.params.provider.NullAndEmptySource; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.springframework.test.util.ReflectionTestUtils; import org.thingsboard.common.util.JacksonUtil; +import org.thingsboard.common.util.ListeningExecutor; +import org.thingsboard.rule.engine.TestDbCallbackExecutor; import org.thingsboard.rule.engine.api.TbContext; import org.thingsboard.rule.engine.api.TbNodeConfiguration; +import org.thingsboard.rule.engine.api.TbNodeException; +import org.thingsboard.server.common.data.id.DeviceId; +import org.thingsboard.server.common.data.id.EntityId; +import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.kv.Aggregation; +import org.thingsboard.server.common.data.kv.BaseReadTsKvQuery; +import org.thingsboard.server.common.data.kv.BasicTsKvEntry; +import org.thingsboard.server.common.data.kv.DoubleDataEntry; +import org.thingsboard.server.common.data.kv.ReadTsKvQuery; +import org.thingsboard.server.common.data.kv.TsKvEntry; +import org.thingsboard.server.common.data.msg.TbMsgType; +import org.thingsboard.server.common.msg.TbMsg; +import org.thingsboard.server.common.msg.TbMsgMetaData; +import org.thingsboard.server.dao.timeseries.TimeseriesService; -import static org.hamcrest.CoreMatchers.is; -import static org.hamcrest.MatcherAssert.assertThat; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.TimeUnit; +import java.util.stream.Stream; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.BDDMockito.willCallRealMethod; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.spy; +import static org.mockito.ArgumentMatchers.anyList; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.BDDMockito.given; +import static org.mockito.BDDMockito.then; +@ExtendWith(MockitoExtension.class) public class TbGetTelemetryNodeTest { - TbGetTelemetryNode node; - TbGetTelemetryNodeConfiguration config; - TbNodeConfiguration nodeConfiguration; - TbContext ctx; + private final TenantId TENANT_ID = TenantId.fromUUID(UUID.fromString("5738401b-9dba-422b-b656-a62fe7431917")); + private final DeviceId DEVICE_ID = new DeviceId(UUID.fromString("8a8fd749-b2ec-488b-a6c6-fc66614d8686")); + + private final ListeningExecutor executor = new TestDbCallbackExecutor(); + + private TbGetTelemetryNode node; + private TbGetTelemetryNodeConfiguration config; + + @Mock + private TbContext ctxMock; + @Mock + private TimeseriesService timeseriesServiceMock; @BeforeEach public void setUp() throws Exception { - ctx = mock(TbContext.class); - node = spy(new TbGetTelemetryNode()); - config = new TbGetTelemetryNodeConfiguration(); - config.setFetchMode("ALL"); - nodeConfiguration = new TbNodeConfiguration(JacksonUtil.valueToTree(config)); - node.init(ctx, nodeConfiguration); - - willCallRealMethod().given(node).parseAggregationConfig(any()); + node = new TbGetTelemetryNode(); + config = new TbGetTelemetryNodeConfiguration().defaultConfiguration(); } @Test - public void givenAggregationAsString_whenParseAggregation_thenReturnEnum() { + public void givenAggregationAsString_whenParseAggregation_thenReturnEnum() throws TbNodeException { + config.setFetchMode("ALL"); + node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config))); + //compatibility with old configs without "aggregation" parameter - assertThat(node.parseAggregationConfig(null), is(Aggregation.NONE)); - assertThat(node.parseAggregationConfig(""), is(Aggregation.NONE)); + assertThat(node.parseAggregationConfig(null)).isEqualTo(Aggregation.NONE); + assertThat(node.parseAggregationConfig("")).isEqualTo(Aggregation.NONE); //common values - assertThat(node.parseAggregationConfig("MIN"), is(Aggregation.MIN)); - assertThat(node.parseAggregationConfig("MAX"), is(Aggregation.MAX)); - assertThat(node.parseAggregationConfig("AVG"), is(Aggregation.AVG)); - assertThat(node.parseAggregationConfig("SUM"), is(Aggregation.SUM)); - assertThat(node.parseAggregationConfig("COUNT"), is(Aggregation.COUNT)); - assertThat(node.parseAggregationConfig("NONE"), is(Aggregation.NONE)); + assertThat(node.parseAggregationConfig("MIN")).isEqualTo(Aggregation.MIN); + assertThat(node.parseAggregationConfig("MAX")).isEqualTo(Aggregation.MAX); + assertThat(node.parseAggregationConfig("AVG")).isEqualTo(Aggregation.AVG); + assertThat(node.parseAggregationConfig("SUM")).isEqualTo(Aggregation.SUM); + assertThat(node.parseAggregationConfig("COUNT")).isEqualTo(Aggregation.COUNT); + assertThat(node.parseAggregationConfig("NONE")).isEqualTo(Aggregation.NONE); //all possible values in future for (Aggregation aggEnum : Aggregation.values()) { - assertThat(node.parseAggregationConfig(aggEnum.name()), is(aggEnum)); + assertThat(node.parseAggregationConfig(aggEnum.name())).isEqualTo(aggEnum); } } @Test - public void givenAggregationWhiteSpace_whenParseAggregation_thenException() { - Assertions.assertThrows(IllegalArgumentException.class, () -> { - node.parseAggregationConfig(" "); - }); + public void givenAggregationWhiteSpace_whenParseAggregation_thenException() throws TbNodeException { + config.setFetchMode("ALL"); + node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config))); + assertThatThrownBy(() -> node.parseAggregationConfig(" ")).isInstanceOf(IllegalArgumentException.class); } @Test - public void givenAggregationIncorrect_whenParseAggregation_thenException() { - Assertions.assertThrows(IllegalArgumentException.class, () -> { - node.parseAggregationConfig("TOP"); - }); + public void givenAggregationIncorrect_whenParseAggregation_thenException() throws TbNodeException { + config.setFetchMode("ALL"); + node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config))); + assertThatThrownBy(() -> node.parseAggregationConfig("TOP")).isInstanceOf(IllegalArgumentException.class); } + @Test + public void verifyDefaultConfig() { + assertThat(config.getStartInterval()).isEqualTo(2); + assertThat(config.getEndInterval()).isEqualTo(1); + assertThat(config.getStartIntervalPattern()).isEqualTo(""); + assertThat(config.getEndIntervalPattern()).isEqualTo(""); + assertThat(config.isUseMetadataIntervalPatterns()).isFalse(); + assertThat(config.getStartIntervalTimeUnit()).isEqualTo(TimeUnit.MINUTES.name()); + assertThat(config.getEndIntervalTimeUnit()).isEqualTo(TimeUnit.MINUTES.name()); + assertThat(config.getFetchMode()).isEqualTo("FIRST"); + assertThat(config.getOrderBy()).isEqualTo("ASC"); + assertThat(config.getAggregation()).isEqualTo(Aggregation.NONE.name()); + assertThat(config.getLimit()).isEqualTo(1000); + assertThat(config.getLatestTsKeyNames()).isEmpty(); + } + + @ParameterizedTest + @MethodSource + public void givenFetchModeAndLimit_whenInit_thenVerifyLimit(String fetchMode, int limit, int expectedLimit) throws TbNodeException { + config.setFetchMode(fetchMode); + config.setLimit(limit); + + node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config))); + + var actualLimit = ReflectionTestUtils.getField(node, "limit"); + assertThat(actualLimit).isEqualTo(expectedLimit); + } + + private static Stream givenFetchModeAndLimit_whenInit_thenVerifyLimit() { + return Stream.of( + Arguments.of("FIRST", 0, 1), + Arguments.of("LAST", 10, 1), + Arguments.of("ALL", 0, 1000), + Arguments.of("ALL", 5, 5) + ); + } + + @ParameterizedTest + @NullAndEmptySource + public void givenEmptyOrderBy_whenInit_thenVerify(String orderBy) throws TbNodeException { + config.setOrderBy(orderBy); + + node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config))); + + var actualOrderBy = ReflectionTestUtils.getField(node, "orderByFetchAll"); + assertThat(actualOrderBy).isEqualTo("ASC"); + } + + @Test + public void givenConfig_whenInit_thenVerify() throws TbNodeException { + List keys = List.of("temperature", "humidity"); + config.setLatestTsKeyNames(keys); + config.setFetchMode("ALL"); + config.setLimit(5); + config.setOrderBy("DESC"); + config.setAggregation("MIN"); + + node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config))); + + var actualLimit = ReflectionTestUtils.getField(node, "limit"); + var actualTsKeyNames = ReflectionTestUtils.getField(node, "tsKeyNames"); + var actualFetchMode = ReflectionTestUtils.getField(node, "fetchMode"); + var actualOrderByFetchAll = ReflectionTestUtils.getField(node, "orderByFetchAll"); + var actualAggregation = ReflectionTestUtils.getField(node, "aggregation"); + + assertThat(actualLimit).isEqualTo(5); + assertThat(actualTsKeyNames).isEqualTo(keys); + assertThat(actualFetchMode).isEqualTo("ALL"); + assertThat(actualOrderByFetchAll).isEqualTo("DESC"); + assertThat(actualAggregation).isEqualTo(Aggregation.MIN); + } + + @Test + public void givenEmptyTsKeyNames_whenOnMsg_thenTellFailure() throws TbNodeException { + node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config))); + TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, TbMsgMetaData.EMPTY, TbMsg.EMPTY_JSON_OBJECT); + node.onMsg(ctxMock, msg); + + ArgumentCaptor actualException = ArgumentCaptor.forClass(Throwable.class); + then(ctxMock).should().tellFailure(eq(msg), actualException.capture()); + assertThat(actualException.getValue()) + .isInstanceOf(IllegalStateException.class) + .hasMessage("Telemetry is not selected!"); + } + + @Test + public void givenUseMetadataIntervalPatternsIsTrueAndFetchModeIsAllAndAggregationIsMin_whenOnMsg_thenTellSuccess() throws TbNodeException { + config.setLatestTsKeyNames(List.of("temperature", "humidity")); + config.setUseMetadataIntervalPatterns(true); + config.setStartIntervalPattern("${mdStartInterval}"); + config.setEndIntervalPattern("$[msgEndInterval]"); + config.setFetchMode("ALL"); + config.setAggregation("MIN"); + + node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config))); + + mockTimeseriesService(); + List tsKvEntries = List.of( + new BasicTsKvEntry(System.currentTimeMillis() - 5, new DoubleDataEntry("temperature", 22.4)), + new BasicTsKvEntry(System.currentTimeMillis() - 4, new DoubleDataEntry("humidity", 55.5)) + ); + given(timeseriesServiceMock.findAll(any(TenantId.class), any(EntityId.class), anyList())).willReturn(Futures.immediateFuture(tsKvEntries)); + + long startTs = 1719220350000L; + long endTs = 1719220353000L; + TbMsgMetaData metaData = new TbMsgMetaData(); + metaData.putValue("mdStartInterval", String.valueOf(startTs)); + TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, metaData, "{\"msgEndInterval\":\"" + endTs + "\"}"); + node.onMsg(ctxMock, msg); + + List expectedReadTsKvQueryList = List.of( + new BaseReadTsKvQuery("temperature", startTs, endTs, endTs - startTs, TbGetTelemetryNodeConfiguration.MAX_FETCH_SIZE, Aggregation.MIN, "ASC"), + new BaseReadTsKvQuery("humidity", startTs, endTs, endTs - startTs, TbGetTelemetryNodeConfiguration.MAX_FETCH_SIZE, Aggregation.MIN, "ASC") + ); + verifyReadTsKvQueryList(expectedReadTsKvQueryList, false); + verifyOutgoingMsg(msg); + } + + @Test + public void givenUseMetadataIntervalPatternsIsTrueAndFetchModeIsLastAndAggregationIsMax_whenOnMsg_thenTellSuccess() throws TbNodeException { + config.setLatestTsKeyNames(List.of("temperature", "humidity")); + config.setUseMetadataIntervalPatterns(true); + config.setStartIntervalPattern("${mdStartInterval}"); + config.setEndIntervalPattern("$[msgEndInterval]"); + config.setFetchMode("LAST"); + config.setAggregation("MAX"); + + node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config))); + + mockTimeseriesService(); + long ts = System.currentTimeMillis(); + List tsKvEntries = List.of( + new BasicTsKvEntry(ts - 5, new DoubleDataEntry("temperature", 22.4)), + new BasicTsKvEntry(ts - 4, new DoubleDataEntry("humidity", 55.5)) + ); + given(timeseriesServiceMock.findAll(any(TenantId.class), any(EntityId.class), anyList())).willReturn(Futures.immediateFuture(tsKvEntries)); + + long startTs = 1719220350000L; + long endTs = 1719220353000L; + TbMsgMetaData metaData = new TbMsgMetaData(); + metaData.putValue("mdStartInterval", String.valueOf(startTs)); + TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, metaData, "{\"msgEndInterval\":\"" + endTs + "\"}"); + node.onMsg(ctxMock, msg); + + List expectedReadTsKvQueryList = List.of( + new BaseReadTsKvQuery("temperature", startTs, endTs, 1, 1, Aggregation.NONE, "DESC"), + new BaseReadTsKvQuery("humidity", startTs, endTs, 1, 1, Aggregation.NONE, "DESC") + ); + verifyReadTsKvQueryList(expectedReadTsKvQueryList, false); + verifyOutgoingMsg(msg); + } + + @Test + public void givenUseMetadataIntervalPatternsIsFalseAndTsKeyNamesPatternsAndFetchModeIsFirst_whenOnMsg_thenTellFailure() throws TbNodeException { + config.setLatestTsKeyNames(List.of("temperature", "${mdTsKey}", "$[msgTsKey]"));; + config.setFetchMode("FIRST"); + config.setStartInterval(6); + config.setEndInterval(1); + + node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config))); + + mockTimeseriesService(); + String errorMsg = "Something went wrong"; + given(timeseriesServiceMock.findAll(any(TenantId.class), any(EntityId.class), anyList())).willReturn(Futures.immediateFailedFuture(new RuntimeException(errorMsg))); + + TbMsgMetaData metaData = new TbMsgMetaData(); + metaData.putValue("mdTsKey", "humidity"); + TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, metaData, "{\"msgTsKey\":\"pressure\"}"); + node.onMsg(ctxMock, msg); + + long ts = System.currentTimeMillis(); + long startTs = ts - TimeUnit.SECONDS.toMillis(config.getStartInterval()); + long endTs = ts - TimeUnit.SECONDS.toMillis(config.getEndInterval()); + List expecetdReadTsKvQueryList = List.of( + new BaseReadTsKvQuery("temperature", startTs, endTs, 1, 1, Aggregation.NONE, "ASC"), + new BaseReadTsKvQuery("humidity", startTs, endTs, 1, 1, Aggregation.NONE, "ASC"), + new BaseReadTsKvQuery("pressure", startTs, endTs, 1, 1, Aggregation.NONE, "ASC") + ); + verifyReadTsKvQueryList(expecetdReadTsKvQueryList, true); + + ArgumentCaptor actualException = ArgumentCaptor.forClass(Throwable.class); + then(ctxMock).should().tellFailure(eq(msg), actualException.capture()); + assertThat(actualException.getValue()).isInstanceOf(RuntimeException.class).hasMessage(errorMsg); + } + + @ParameterizedTest + @MethodSource + public void givenInvalidIntervalPatterns_whenOnMsg_thenTellFailure(String startIntervalPattern, String errorMsg) throws TbNodeException { + config.setLatestTsKeyNames(List.of("${mdKey}")); + config.setUseMetadataIntervalPatterns(true); + config.setStartIntervalPattern(startIntervalPattern); + node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config))); + + TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, TbMsgMetaData.EMPTY, "{\"msgStartInterval\":\"start\"}"); + node.onMsg(ctxMock, msg); + + ArgumentCaptor actualException = ArgumentCaptor.forClass(Throwable.class); + then(ctxMock).should().tellFailure(eq(msg), actualException.capture()); + assertThat(actualException.getValue()).isInstanceOf(IllegalArgumentException.class).hasMessage(errorMsg); + } + + private static Stream givenInvalidIntervalPatterns_whenOnMsg_thenTellFailure() { + return Stream.of( + Arguments.of("${mdStartInterval}", "Message value: 'mdStartInterval' is undefined"), + Arguments.of("$[msgStartInterval]", "Message value: 'msgStartInterval' has invalid format") + ); + } + + private void mockTimeseriesService() { + given(ctxMock.getTimeseriesService()).willReturn(timeseriesServiceMock); + given(ctxMock.getTenantId()).willReturn(TENANT_ID); + given(ctxMock.getDbCallbackExecutor()).willReturn(executor); + } + + private void verifyReadTsKvQueryList(List expectedReadTsKvQueryList, boolean ignoreTs) { + ArgumentCaptor> actualReadTsKvQueryCaptor = ArgumentCaptor.forClass(List.class); + then(timeseriesServiceMock).should().findAll(eq(TENANT_ID), eq(DEVICE_ID), actualReadTsKvQueryCaptor.capture()); + List actualReadTsKvQuery = actualReadTsKvQueryCaptor.getValue(); + for (int i = 0; i < expectedReadTsKvQueryList.size(); i++) { + assertThat(actualReadTsKvQuery.get(i)) + .usingRecursiveComparison() + .ignoringFields(!ignoreTs ? "id" : "endTs", "startTs", "id") + .isEqualTo(expectedReadTsKvQueryList.get(i)); + } + } + + private void verifyOutgoingMsg(TbMsg expectedMsg) { + ArgumentCaptor actualMsgCaptor = ArgumentCaptor.forClass(TbMsg.class); + then(ctxMock).should().tellSuccess(actualMsgCaptor.capture()); + TbMsg actualMsg = actualMsgCaptor.getValue(); + assertThat(actualMsg).usingRecursiveComparison().ignoringFields("ctx", "metaData").isEqualTo(expectedMsg); + assertThat(actualMsg.getMetaData().getData()).containsKeys("temperature", "humidity"); + } } From b4ba613c54c2e8e8b6160d3fe7087b29288ab9a4 Mon Sep 17 00:00:00 2001 From: IrynaMatveieva Date: Thu, 4 Jul 2024 15:05:46 +0300 Subject: [PATCH 2/6] splitted to more specific tests --- .../engine/metadata/TbGetTelemetryNode.java | 58 +-- .../metadata/TbGetTelemetryNodeTest.java | 441 +++++++++++------- 2 files changed, 304 insertions(+), 195 deletions(-) diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetTelemetryNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetTelemetryNode.java index 86f309ab09..4aabc606ab 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetTelemetryNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetTelemetryNode.java @@ -66,19 +66,19 @@ public class TbGetTelemetryNode implements TbNode { private List tsKeyNames; private int limit; private String fetchMode; - private String orderByFetchAll; + private String orderBy; private Aggregation aggregation; @Override public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException { this.config = TbNodeUtils.convert(configuration, TbGetTelemetryNodeConfiguration.class); tsKeyNames = config.getLatestTsKeyNames(); + if (tsKeyNames.isEmpty()) { + throw new TbNodeException("Telemetry is not selected!", true); + } limit = config.getFetchMode().equals(TbGetTelemetryNodeConfiguration.FETCH_MODE_ALL) ? validateLimit(config.getLimit()) : 1; fetchMode = config.getFetchMode(); - orderByFetchAll = config.getOrderBy(); - if (StringUtils.isEmpty(orderByFetchAll)) { - orderByFetchAll = ASC_ORDER; - } + orderBy = getOrderBy(); aggregation = parseAggregationConfig(config.getAggregation()); } @@ -91,21 +91,13 @@ public class TbGetTelemetryNode implements TbNode { @Override public void onMsg(TbContext ctx, TbMsg msg) { - if (tsKeyNames.isEmpty()) { - ctx.tellFailure(msg, new IllegalStateException("Telemetry is not selected!")); - } else { - try { - Interval interval = getInterval(msg); - List keys = TbNodeUtils.processPatterns(tsKeyNames, msg); - ListenableFuture> list = ctx.getTimeseriesService().findAll(ctx.getTenantId(), msg.getOriginator(), buildQueries(interval, keys)); - DonAsynchron.withCallback(list, data -> { - var metaData = updateMetadata(data, msg, keys); - ctx.tellSuccess(TbMsg.transformMsgMetadata(msg, metaData)); - }, error -> ctx.tellFailure(msg, error), ctx.getDbCallbackExecutor()); - } catch (Exception e) { - ctx.tellFailure(msg, e); - } - } + Interval interval = getInterval(msg); + List keys = TbNodeUtils.processPatterns(tsKeyNames, msg); + ListenableFuture> list = ctx.getTimeseriesService().findAll(ctx.getTenantId(), msg.getOriginator(), buildQueries(interval, keys)); + DonAsynchron.withCallback(list, data -> { + var metaData = updateMetadata(data, msg, keys); + ctx.tellSuccess(TbMsg.transformMsgMetadata(msg, metaData)); + }, error -> ctx.tellFailure(msg, error), ctx.getDbCallbackExecutor()); } private List buildQueries(Interval interval, List keys) { @@ -115,14 +107,14 @@ public class TbGetTelemetryNode implements TbNode { interval.getEndTs() - interval.getStartTs(); return keys.stream() - .map(key -> new BaseReadTsKvQuery(key, interval.getStartTs(), interval.getEndTs(), aggIntervalStep, limit, aggregation, getOrderBy())) + .map(key -> new BaseReadTsKvQuery(key, interval.getStartTs(), interval.getEndTs(), aggIntervalStep, limit, aggregation, orderBy)) .collect(Collectors.toList()); } - private String getOrderBy() { + private String getOrderBy() throws TbNodeException { switch (fetchMode) { case TbGetTelemetryNodeConfiguration.FETCH_MODE_ALL: - return orderByFetchAll; + return getOrderByFetchAll(); case TbGetTelemetryNodeConfiguration.FETCH_MODE_FIRST: return ASC_ORDER; default: @@ -130,6 +122,17 @@ public class TbGetTelemetryNode implements TbNode { } } + private String getOrderByFetchAll() throws TbNodeException { + String orderBy = config.getOrderBy(); + if (ASC_ORDER.equals(orderBy) || DESC_ORDER.equals(orderBy)) { + return orderBy; + } + if (StringUtils.isBlank(orderBy)) { + return ASC_ORDER; + } + throw new TbNodeException("Invalid fetch order selected.", true); + } + private TbMsgMetaData updateMetadata(List entries, TbMsg msg, List keys) { ObjectNode resultNode = JacksonUtil.newObjectNode(JacksonUtil.ALLOW_UNQUOTED_FIELD_NAMES_MAPPER); if (TbGetTelemetryNodeConfiguration.FETCH_MODE_ALL.equals(fetchMode)) { @@ -210,12 +213,11 @@ public class TbGetTelemetryNode implements TbNode { return pattern.replaceAll("[$\\[{}\\]]", ""); } - private int validateLimit(int limit) { - if (limit != 0) { - return limit; - } else { - return TbGetTelemetryNodeConfiguration.MAX_FETCH_SIZE; + private int validateLimit(int limit) throws TbNodeException { + if (limit < 2 || limit > TbGetTelemetryNodeConfiguration.MAX_FETCH_SIZE) { + throw new TbNodeException("Limit should be in a range from 2 to 1000.", true); } + return limit; } @Data diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/metadata/TbGetTelemetryNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/metadata/TbGetTelemetryNodeTest.java index 1875dd91f1..2ad575f0aa 100644 --- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/metadata/TbGetTelemetryNodeTest.java +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/metadata/TbGetTelemetryNodeTest.java @@ -22,34 +22,36 @@ import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; -import org.junit.jupiter.params.provider.NullAndEmptySource; +import org.junit.jupiter.params.provider.ValueSource; import org.mockito.ArgumentCaptor; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; -import org.springframework.test.util.ReflectionTestUtils; import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.common.util.ListeningExecutor; import org.thingsboard.rule.engine.TestDbCallbackExecutor; import org.thingsboard.rule.engine.api.TbContext; import org.thingsboard.rule.engine.api.TbNodeConfiguration; import org.thingsboard.rule.engine.api.TbNodeException; +import org.thingsboard.rule.engine.api.util.TbNodeUtils; import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.kv.Aggregation; -import org.thingsboard.server.common.data.kv.BaseReadTsKvQuery; import org.thingsboard.server.common.data.kv.BasicTsKvEntry; import org.thingsboard.server.common.data.kv.DoubleDataEntry; import org.thingsboard.server.common.data.kv.ReadTsKvQuery; import org.thingsboard.server.common.data.kv.TsKvEntry; +import org.thingsboard.server.common.data.kv.TsKvQuery; import org.thingsboard.server.common.data.msg.TbMsgType; import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.common.msg.TbMsgMetaData; import org.thingsboard.server.dao.timeseries.TimeseriesService; +import java.util.Collections; import java.util.List; import java.util.UUID; import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; import java.util.stream.Stream; import static org.assertj.core.api.Assertions.assertThat; @@ -77,14 +79,15 @@ public class TbGetTelemetryNodeTest { private TimeseriesService timeseriesServiceMock; @BeforeEach - public void setUp() throws Exception { + public void setUp() { node = new TbGetTelemetryNode(); config = new TbGetTelemetryNodeConfiguration().defaultConfiguration(); + config.setLatestTsKeyNames(List.of("temperature")); } @Test public void givenAggregationAsString_whenParseAggregation_thenReturnEnum() throws TbNodeException { - config.setFetchMode("ALL"); + config.setFetchMode(TbGetTelemetryNodeConfiguration.FETCH_MODE_ALL); node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config))); //compatibility with old configs without "aggregation" parameter @@ -107,20 +110,27 @@ public class TbGetTelemetryNodeTest { @Test public void givenAggregationWhiteSpace_whenParseAggregation_thenException() throws TbNodeException { - config.setFetchMode("ALL"); + // GIVEN + config.setFetchMode(TbGetTelemetryNodeConfiguration.FETCH_MODE_ALL); node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config))); + + // WHEN-THEN assertThatThrownBy(() -> node.parseAggregationConfig(" ")).isInstanceOf(IllegalArgumentException.class); } @Test public void givenAggregationIncorrect_whenParseAggregation_thenException() throws TbNodeException { - config.setFetchMode("ALL"); + // GIVEN + config.setFetchMode(TbGetTelemetryNodeConfiguration.FETCH_MODE_ALL); node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config))); + + // WHEN-THEN assertThatThrownBy(() -> node.parseAggregationConfig("TOP")).isInstanceOf(IllegalArgumentException.class); } @Test public void verifyDefaultConfig() { + config = new TbGetTelemetryNodeConfiguration().defaultConfiguration(); assertThat(config.getStartInterval()).isEqualTo(2); assertThat(config.getEndInterval()).isEqualTo(1); assertThat(config.getStartIntervalPattern()).isEqualTo(""); @@ -128,228 +138,325 @@ public class TbGetTelemetryNodeTest { assertThat(config.isUseMetadataIntervalPatterns()).isFalse(); assertThat(config.getStartIntervalTimeUnit()).isEqualTo(TimeUnit.MINUTES.name()); assertThat(config.getEndIntervalTimeUnit()).isEqualTo(TimeUnit.MINUTES.name()); - assertThat(config.getFetchMode()).isEqualTo("FIRST"); + assertThat(config.getFetchMode()).isEqualTo(TbGetTelemetryNodeConfiguration.FETCH_MODE_FIRST); assertThat(config.getOrderBy()).isEqualTo("ASC"); assertThat(config.getAggregation()).isEqualTo(Aggregation.NONE.name()); assertThat(config.getLimit()).isEqualTo(1000); assertThat(config.getLatestTsKeyNames()).isEmpty(); } + @Test + public void givenEmptyTsKeyNames_whenInit_thenThrowsException() { + // GIVEN + config.setLatestTsKeyNames(Collections.emptyList()); + + // WHEN-THEN + assertThatThrownBy(() -> node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config)))) + .isInstanceOf(TbNodeException.class) + .hasMessage("Telemetry is not selected!") + .extracting(e -> ((TbNodeException) e).isUnrecoverable()) + .isEqualTo(true); + } + @ParameterizedTest - @MethodSource - public void givenFetchModeAndLimit_whenInit_thenVerifyLimit(String fetchMode, int limit, int expectedLimit) throws TbNodeException { - config.setFetchMode(fetchMode); + @ValueSource(ints = {-1, 0, 1, 1001, 2000}) + public void givenFetchModeAllAndLimitIsOutOfRange_whenInit_thenThrowsException(int limit) { + // GIVEN + config.setFetchMode(TbGetTelemetryNodeConfiguration.FETCH_MODE_ALL); config.setLimit(limit); - node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config))); - - var actualLimit = ReflectionTestUtils.getField(node, "limit"); - assertThat(actualLimit).isEqualTo(expectedLimit); - } - - private static Stream givenFetchModeAndLimit_whenInit_thenVerifyLimit() { - return Stream.of( - Arguments.of("FIRST", 0, 1), - Arguments.of("LAST", 10, 1), - Arguments.of("ALL", 0, 1000), - Arguments.of("ALL", 5, 5) - ); + // THEN + assertThatThrownBy(() -> node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config)))) + .isInstanceOf(TbNodeException.class) + .hasMessage("Limit should be in a range from 2 to 1000.") + .extracting(e -> ((TbNodeException) e).isUnrecoverable()) + .isEqualTo(true); } @ParameterizedTest - @NullAndEmptySource - public void givenEmptyOrderBy_whenInit_thenVerify(String orderBy) throws TbNodeException { + @ValueSource(strings = {".ASC", "ascending", "DESCENDING"}) + public void givenFetchModeAllAndInvalidOrderBy_whenInit_thenThrowsException(String orderBy) { + // GIVEN + config.setFetchMode(TbGetTelemetryNodeConfiguration.FETCH_MODE_ALL); + config.setLimit(2); config.setOrderBy(orderBy); - node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config))); - - var actualOrderBy = ReflectionTestUtils.getField(node, "orderByFetchAll"); - assertThat(actualOrderBy).isEqualTo("ASC"); + // WHEN-THEN + assertThatThrownBy(() -> node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config)))) + .isInstanceOf(TbNodeException.class) + .hasMessage("Invalid fetch order selected.") + .extracting(e -> ((TbNodeException) e).isUnrecoverable()) + .isEqualTo(true); } @Test - public void givenConfig_whenInit_thenVerify() throws TbNodeException { - List keys = List.of("temperature", "humidity"); - config.setLatestTsKeyNames(keys); - config.setFetchMode("ALL"); - config.setLimit(5); - config.setOrderBy("DESC"); - config.setAggregation("MIN"); + public void givenUseMetadataIntervalPatternsIsTrue_whenOnMsg_thenVerifyStartAndEndTsInQuery() throws TbNodeException { + // GIVEN + config.setUseMetadataIntervalPatterns(true); + config.setStartIntervalPattern("${mdStartInterval}"); + config.setEndIntervalPattern("$[msgEndInterval]"); node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config))); - var actualLimit = ReflectionTestUtils.getField(node, "limit"); - var actualTsKeyNames = ReflectionTestUtils.getField(node, "tsKeyNames"); - var actualFetchMode = ReflectionTestUtils.getField(node, "fetchMode"); - var actualOrderByFetchAll = ReflectionTestUtils.getField(node, "orderByFetchAll"); - var actualAggregation = ReflectionTestUtils.getField(node, "aggregation"); + mockTimeseriesService(); + given(timeseriesServiceMock.findAll(any(TenantId.class), any(EntityId.class), anyList())).willReturn(Futures.immediateFuture(Collections.emptyList())); - assertThat(actualLimit).isEqualTo(5); - assertThat(actualTsKeyNames).isEqualTo(keys); - assertThat(actualFetchMode).isEqualTo("ALL"); - assertThat(actualOrderByFetchAll).isEqualTo("DESC"); - assertThat(actualAggregation).isEqualTo(Aggregation.MIN); + // WHEN + long startTs = 1719220350000L; + long endTs = 1719220353000L; + TbMsgMetaData metaData = new TbMsgMetaData(); + metaData.putValue("mdStartInterval", String.valueOf(startTs)); + TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, metaData, "{\"msgEndInterval\":\"" + endTs + "\"}"); + node.onMsg(ctxMock, msg); + + /// THEN + ArgumentCaptor> actualReadTsKvQueryList = ArgumentCaptor.forClass(List.class); + then(timeseriesServiceMock).should().findAll(eq(TENANT_ID), eq(DEVICE_ID), actualReadTsKvQueryList.capture()); + ReadTsKvQuery actualReadTsKvQuery = actualReadTsKvQueryList.getValue().get(0); + assertThat(actualReadTsKvQuery.getStartTs()).isEqualTo(startTs); + assertThat(actualReadTsKvQuery.getEndTs()).isEqualTo(endTs); } @Test - public void givenEmptyTsKeyNames_whenOnMsg_thenTellFailure() throws TbNodeException { + public void givenUseMetadataIntervalPatternsIsFalse_whenOnMsg_thenVerifyStartAndEndTsInQuery() throws TbNodeException { + // GIVEN node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config))); + + mockTimeseriesService(); + given(timeseriesServiceMock.findAll(any(TenantId.class), any(EntityId.class), anyList())).willReturn(Futures.immediateFuture(Collections.emptyList())); + + // WHEN TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, TbMsgMetaData.EMPTY, TbMsg.EMPTY_JSON_OBJECT); node.onMsg(ctxMock, msg); - ArgumentCaptor actualException = ArgumentCaptor.forClass(Throwable.class); - then(ctxMock).should().tellFailure(eq(msg), actualException.capture()); - assertThat(actualException.getValue()) - .isInstanceOf(IllegalStateException.class) - .hasMessage("Telemetry is not selected!"); + // THEN + ArgumentCaptor> actualReadTsKvQueryList = ArgumentCaptor.forClass(List.class); + then(timeseriesServiceMock).should().findAll(eq(TENANT_ID), eq(DEVICE_ID), actualReadTsKvQueryList.capture()); + ReadTsKvQuery actualReadTsKvQuery = actualReadTsKvQueryList.getValue().get(0); + assertThat(actualReadTsKvQuery.getStartTs()).isLessThan(System.currentTimeMillis() - TimeUnit.MINUTES.toMillis(config.getStartInterval())); + assertThat(actualReadTsKvQuery.getEndTs()).isLessThan(System.currentTimeMillis() - TimeUnit.MINUTES.toMillis(config.getEndInterval())); } @Test - public void givenUseMetadataIntervalPatternsIsTrueAndFetchModeIsAllAndAggregationIsMin_whenOnMsg_thenTellSuccess() throws TbNodeException { - config.setLatestTsKeyNames(List.of("temperature", "humidity")); - config.setUseMetadataIntervalPatterns(true); - config.setStartIntervalPattern("${mdStartInterval}"); - config.setEndIntervalPattern("$[msgEndInterval]"); - config.setFetchMode("ALL"); - config.setAggregation("MIN"); + public void givenTsKeyNamesPatterns_whenOnMsg_thenVerifyTsKeyNamesInQuery() throws TbNodeException { + // GIVEN + config.setLatestTsKeyNames(List.of("temperature", "${mdTsKey}", "$[msgTsKey]")); node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config))); mockTimeseriesService(); - List tsKvEntries = List.of( - new BasicTsKvEntry(System.currentTimeMillis() - 5, new DoubleDataEntry("temperature", 22.4)), - new BasicTsKvEntry(System.currentTimeMillis() - 4, new DoubleDataEntry("humidity", 55.5)) - ); - given(timeseriesServiceMock.findAll(any(TenantId.class), any(EntityId.class), anyList())).willReturn(Futures.immediateFuture(tsKvEntries)); - - long startTs = 1719220350000L; - long endTs = 1719220353000L; - TbMsgMetaData metaData = new TbMsgMetaData(); - metaData.putValue("mdStartInterval", String.valueOf(startTs)); - TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, metaData, "{\"msgEndInterval\":\"" + endTs + "\"}"); - node.onMsg(ctxMock, msg); - - List expectedReadTsKvQueryList = List.of( - new BaseReadTsKvQuery("temperature", startTs, endTs, endTs - startTs, TbGetTelemetryNodeConfiguration.MAX_FETCH_SIZE, Aggregation.MIN, "ASC"), - new BaseReadTsKvQuery("humidity", startTs, endTs, endTs - startTs, TbGetTelemetryNodeConfiguration.MAX_FETCH_SIZE, Aggregation.MIN, "ASC") - ); - verifyReadTsKvQueryList(expectedReadTsKvQueryList, false); - verifyOutgoingMsg(msg); - } - - @Test - public void givenUseMetadataIntervalPatternsIsTrueAndFetchModeIsLastAndAggregationIsMax_whenOnMsg_thenTellSuccess() throws TbNodeException { - config.setLatestTsKeyNames(List.of("temperature", "humidity")); - config.setUseMetadataIntervalPatterns(true); - config.setStartIntervalPattern("${mdStartInterval}"); - config.setEndIntervalPattern("$[msgEndInterval]"); - config.setFetchMode("LAST"); - config.setAggregation("MAX"); - - node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config))); - - mockTimeseriesService(); - long ts = System.currentTimeMillis(); - List tsKvEntries = List.of( - new BasicTsKvEntry(ts - 5, new DoubleDataEntry("temperature", 22.4)), - new BasicTsKvEntry(ts - 4, new DoubleDataEntry("humidity", 55.5)) - ); - given(timeseriesServiceMock.findAll(any(TenantId.class), any(EntityId.class), anyList())).willReturn(Futures.immediateFuture(tsKvEntries)); - - long startTs = 1719220350000L; - long endTs = 1719220353000L; - TbMsgMetaData metaData = new TbMsgMetaData(); - metaData.putValue("mdStartInterval", String.valueOf(startTs)); - TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, metaData, "{\"msgEndInterval\":\"" + endTs + "\"}"); - node.onMsg(ctxMock, msg); - - List expectedReadTsKvQueryList = List.of( - new BaseReadTsKvQuery("temperature", startTs, endTs, 1, 1, Aggregation.NONE, "DESC"), - new BaseReadTsKvQuery("humidity", startTs, endTs, 1, 1, Aggregation.NONE, "DESC") - ); - verifyReadTsKvQueryList(expectedReadTsKvQueryList, false); - verifyOutgoingMsg(msg); - } - - @Test - public void givenUseMetadataIntervalPatternsIsFalseAndTsKeyNamesPatternsAndFetchModeIsFirst_whenOnMsg_thenTellFailure() throws TbNodeException { - config.setLatestTsKeyNames(List.of("temperature", "${mdTsKey}", "$[msgTsKey]"));; - config.setFetchMode("FIRST"); - config.setStartInterval(6); - config.setEndInterval(1); - - node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config))); - - mockTimeseriesService(); - String errorMsg = "Something went wrong"; - given(timeseriesServiceMock.findAll(any(TenantId.class), any(EntityId.class), anyList())).willReturn(Futures.immediateFailedFuture(new RuntimeException(errorMsg))); + given(timeseriesServiceMock.findAll(any(TenantId.class), any(EntityId.class), anyList())).willReturn(Futures.immediateFuture(Collections.emptyList())); + // WHEN TbMsgMetaData metaData = new TbMsgMetaData(); metaData.putValue("mdTsKey", "humidity"); TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, metaData, "{\"msgTsKey\":\"pressure\"}"); node.onMsg(ctxMock, msg); - long ts = System.currentTimeMillis(); - long startTs = ts - TimeUnit.SECONDS.toMillis(config.getStartInterval()); - long endTs = ts - TimeUnit.SECONDS.toMillis(config.getEndInterval()); - List expecetdReadTsKvQueryList = List.of( - new BaseReadTsKvQuery("temperature", startTs, endTs, 1, 1, Aggregation.NONE, "ASC"), - new BaseReadTsKvQuery("humidity", startTs, endTs, 1, 1, Aggregation.NONE, "ASC"), - new BaseReadTsKvQuery("pressure", startTs, endTs, 1, 1, Aggregation.NONE, "ASC") - ); - verifyReadTsKvQueryList(expecetdReadTsKvQueryList, true); - - ArgumentCaptor actualException = ArgumentCaptor.forClass(Throwable.class); - then(ctxMock).should().tellFailure(eq(msg), actualException.capture()); - assertThat(actualException.getValue()).isInstanceOf(RuntimeException.class).hasMessage(errorMsg); + // THEN + ArgumentCaptor> actualReadTsKvQueryList = ArgumentCaptor.forClass(List.class); + then(timeseriesServiceMock).should().findAll(eq(TENANT_ID), eq(DEVICE_ID), actualReadTsKvQueryList.capture()); + List actualKeys = actualReadTsKvQueryList.getValue().stream().map(TsKvQuery::getKey).toList(); + List expectedTsKeyNames = config.getLatestTsKeyNames().stream().map(tsKeyName -> TbNodeUtils.processPattern(tsKeyName, msg)).toList(); + assertThat(actualKeys).containsAll(expectedTsKeyNames); } @ParameterizedTest @MethodSource - public void givenInvalidIntervalPatterns_whenOnMsg_thenTellFailure(String startIntervalPattern, String errorMsg) throws TbNodeException { - config.setLatestTsKeyNames(List.of("${mdKey}")); + public void givenAggregation_whenOnMsg_thenVerifyAggregationStepInQuery(String aggregation, Consumer verifyAggregationStepInQuery) throws TbNodeException { + // GIVEN + config.setStartInterval(5); + config.setEndInterval(1); + config.setFetchMode(TbGetTelemetryNodeConfiguration.FETCH_MODE_ALL); + config.setAggregation(aggregation); + + node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config))); + + mockTimeseriesService(); + given(timeseriesServiceMock.findAll(any(TenantId.class), any(EntityId.class), anyList())).willReturn(Futures.immediateFuture(Collections.emptyList())); + + // WHEN + TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, TbMsgMetaData.EMPTY, TbMsg.EMPTY_JSON_OBJECT); + node.onMsg(ctxMock, msg); + + // THEN + ArgumentCaptor> actualReadTsKvQueryList = ArgumentCaptor.forClass(List.class); + then(timeseriesServiceMock).should().findAll(eq(TENANT_ID), eq(DEVICE_ID), actualReadTsKvQueryList.capture()); + ReadTsKvQuery actualReadTsKvQuery = actualReadTsKvQueryList.getValue().get(0); + verifyAggregationStepInQuery.accept(actualReadTsKvQuery); + } + + private static Stream givenAggregation_whenOnMsg_thenVerifyAggregationStepInQuery() { + return Stream.of( + Arguments.of("", (Consumer) query -> assertThat(query.getInterval()).isEqualTo(1)), + Arguments.of("MIN", (Consumer) query -> assertThat(query.getInterval()).isEqualTo(query.getEndTs() - query.getStartTs())) + ); + } + + @ParameterizedTest + @MethodSource + public void givenFetchModeAndLimit_whenOnMsg_thenVerifyLimitInQuery(String fetchMode, int limit, Consumer verifyLimitInQuery) throws TbNodeException { + // GIVEN + config.setFetchMode(fetchMode); + config.setLimit(limit); + + node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config))); + + mockTimeseriesService(); + given(timeseriesServiceMock.findAll(any(TenantId.class), any(EntityId.class), anyList())).willReturn(Futures.immediateFuture(Collections.emptyList())); + + // WHEN + TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, TbMsgMetaData.EMPTY, TbMsg.EMPTY_JSON_OBJECT); + node.onMsg(ctxMock, msg); + + // THEN + ArgumentCaptor> actualReadTsKvQueryList = ArgumentCaptor.forClass(List.class); + then(timeseriesServiceMock).should().findAll(eq(TENANT_ID), eq(DEVICE_ID), actualReadTsKvQueryList.capture()); + ReadTsKvQuery actualReadTsKvQuery = actualReadTsKvQueryList.getValue().get(0); + verifyLimitInQuery.accept(actualReadTsKvQuery); + } + + private static Stream givenFetchModeAndLimit_whenOnMsg_thenVerifyLimitInQuery() { + return Stream.of( + Arguments.of( + TbGetTelemetryNodeConfiguration.FETCH_MODE_ALL, + 5, + (Consumer) query -> assertThat(query.getLimit()).isEqualTo(5)), + Arguments.of( + TbGetTelemetryNodeConfiguration.FETCH_MODE_FIRST, + TbGetTelemetryNodeConfiguration.MAX_FETCH_SIZE, + (Consumer) query -> assertThat(query.getLimit()).isEqualTo(1)) + ); + } + + @ParameterizedTest + @MethodSource + public void givenFetchModeAndOrder_whenOnMsg_thenVerifyOrderInQuery(String fetchMode, String orderBy, Consumer verifyOrderInQuery) throws TbNodeException { + // GIVEN + config.setFetchMode(fetchMode); + config.setOrderBy(orderBy); + + node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config))); + + mockTimeseriesService(); + given(timeseriesServiceMock.findAll(any(TenantId.class), any(EntityId.class), anyList())).willReturn(Futures.immediateFuture(Collections.emptyList())); + + // WHEN + TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, TbMsgMetaData.EMPTY, TbMsg.EMPTY_JSON_OBJECT); + node.onMsg(ctxMock, msg); + + // THEN + ArgumentCaptor> actualReadTsKvQueryList = ArgumentCaptor.forClass(List.class); + then(timeseriesServiceMock).should().findAll(eq(TENANT_ID), eq(DEVICE_ID), actualReadTsKvQueryList.capture()); + ReadTsKvQuery actualReadTsKvQuery = actualReadTsKvQueryList.getValue().get(0); + verifyOrderInQuery.accept(actualReadTsKvQuery); + } + + private static Stream givenFetchModeAndOrder_whenOnMsg_thenVerifyOrderInQuery() { + return Stream.of( + Arguments.of( + TbGetTelemetryNodeConfiguration.FETCH_MODE_ALL, + "", + (Consumer) query -> assertThat(query.getOrder()).isEqualTo("ASC")), + Arguments.of( + TbGetTelemetryNodeConfiguration.FETCH_MODE_ALL, + "DESC", + (Consumer) query -> assertThat(query.getOrder()).isEqualTo("DESC")), + Arguments.of( + TbGetTelemetryNodeConfiguration.FETCH_MODE_FIRST, + "ASC", + (Consumer) query -> assertThat(query.getOrder()).isEqualTo("ASC")), + Arguments.of( + TbGetTelemetryNodeConfiguration.FETCH_MODE_LAST, + "ASC", + (Consumer) query -> assertThat(query.getOrder()).isEqualTo("DESC")) + ); + } + + @ParameterizedTest + @MethodSource + public void givenInvalidIntervalPatterns_whenOnMsg_thenThrowsException(String startIntervalPattern, String errorMsg) throws TbNodeException { + // GIVEN config.setUseMetadataIntervalPatterns(true); config.setStartIntervalPattern(startIntervalPattern); node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config))); + // WHEN-THEN TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, TbMsgMetaData.EMPTY, "{\"msgStartInterval\":\"start\"}"); - node.onMsg(ctxMock, msg); - - ArgumentCaptor actualException = ArgumentCaptor.forClass(Throwable.class); - then(ctxMock).should().tellFailure(eq(msg), actualException.capture()); - assertThat(actualException.getValue()).isInstanceOf(IllegalArgumentException.class).hasMessage(errorMsg); + assertThatThrownBy(() -> node.onMsg(ctxMock, msg)).isInstanceOf(IllegalArgumentException.class).hasMessage(errorMsg); } - private static Stream givenInvalidIntervalPatterns_whenOnMsg_thenTellFailure() { + private static Stream givenInvalidIntervalPatterns_whenOnMsg_thenThrowsException() { return Stream.of( Arguments.of("${mdStartInterval}", "Message value: 'mdStartInterval' is undefined"), Arguments.of("$[msgStartInterval]", "Message value: 'msgStartInterval' has invalid format") ); } + @Test + public void givenFetchModeAll_whenOnMsg_thenTellSuccessAndVerifyMsg() throws TbNodeException { + // GIVEN + config.setLatestTsKeyNames(List.of("temperature", "humidity")); + config.setFetchMode(TbGetTelemetryNodeConfiguration.FETCH_MODE_ALL); + + node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config))); + + mockTimeseriesService(); + long ts = System.currentTimeMillis(); + List tsKvEntries = List.of( + new BasicTsKvEntry(ts - 5, new DoubleDataEntry("temperature", 23.1)), + new BasicTsKvEntry(ts - 4, new DoubleDataEntry("temperature", 22.4)), + new BasicTsKvEntry(ts - 4, new DoubleDataEntry("humidity", 55.5)) + ); + given(timeseriesServiceMock.findAll(any(TenantId.class), any(EntityId.class), anyList())).willReturn(Futures.immediateFuture(tsKvEntries)); + + // WHEN + TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, TbMsgMetaData.EMPTY, TbMsg.EMPTY_JSON_OBJECT); + node.onMsg(ctxMock, msg); + + // THEN + ArgumentCaptor actualMsg = ArgumentCaptor.forClass(TbMsg.class); + then(ctxMock).should().tellSuccess(actualMsg.capture()); + TbMsgMetaData metaData = new TbMsgMetaData(); + metaData.putValue("temperature", "[{\"ts\":" + (ts - 5) + ",\"value\":23.1},{\"ts\":" + (ts - 4) + ",\"value\":22.4}]"); + metaData.putValue("humidity", "[{\"ts\":" + (ts - 4) + ",\"value\":55.5}]"); + TbMsg expectedMsg = TbMsg.transformMsgMetadata(msg, metaData); + assertThat(actualMsg.getValue()).usingRecursiveComparison().ignoringFields("ctx").isEqualTo(expectedMsg); + } + + @Test + public void givenFetchModeIsFirst_whenOnMsg_thenTellSuccessAndVerifyMsg() throws TbNodeException { + // GIVEN + config.setLatestTsKeyNames(List.of("temperature", "humidity")); + + node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config))); + + mockTimeseriesService(); + long ts = System.currentTimeMillis(); + List tsKvEntries = List.of( + new BasicTsKvEntry(ts - 4, new DoubleDataEntry("temperature", 22.4)), + new BasicTsKvEntry(ts - 4, new DoubleDataEntry("humidity", 55.5)) + ); + given(timeseriesServiceMock.findAll(any(TenantId.class), any(EntityId.class), anyList())).willReturn(Futures.immediateFuture(tsKvEntries)); + + // WHEN + TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, TbMsgMetaData.EMPTY, TbMsg.EMPTY_JSON_OBJECT); + node.onMsg(ctxMock, msg); + + // THEN + ArgumentCaptor actualMsg = ArgumentCaptor.forClass(TbMsg.class); + then(ctxMock).should().tellSuccess(actualMsg.capture()); + TbMsgMetaData metaData = new TbMsgMetaData(); + metaData.putValue("temperature", "\"22.4\""); + metaData.putValue("humidity", "\"55.5\""); + TbMsg expectedMsg = TbMsg.transformMsgMetadata(msg, metaData); + assertThat(actualMsg.getValue()).usingRecursiveComparison().ignoringFields("ctx").isEqualTo(expectedMsg); + } + private void mockTimeseriesService() { given(ctxMock.getTimeseriesService()).willReturn(timeseriesServiceMock); given(ctxMock.getTenantId()).willReturn(TENANT_ID); given(ctxMock.getDbCallbackExecutor()).willReturn(executor); } - private void verifyReadTsKvQueryList(List expectedReadTsKvQueryList, boolean ignoreTs) { - ArgumentCaptor> actualReadTsKvQueryCaptor = ArgumentCaptor.forClass(List.class); - then(timeseriesServiceMock).should().findAll(eq(TENANT_ID), eq(DEVICE_ID), actualReadTsKvQueryCaptor.capture()); - List actualReadTsKvQuery = actualReadTsKvQueryCaptor.getValue(); - for (int i = 0; i < expectedReadTsKvQueryList.size(); i++) { - assertThat(actualReadTsKvQuery.get(i)) - .usingRecursiveComparison() - .ignoringFields(!ignoreTs ? "id" : "endTs", "startTs", "id") - .isEqualTo(expectedReadTsKvQueryList.get(i)); - } - } - - private void verifyOutgoingMsg(TbMsg expectedMsg) { - ArgumentCaptor actualMsgCaptor = ArgumentCaptor.forClass(TbMsg.class); - then(ctxMock).should().tellSuccess(actualMsgCaptor.capture()); - TbMsg actualMsg = actualMsgCaptor.getValue(); - assertThat(actualMsg).usingRecursiveComparison().ignoringFields("ctx", "metaData").isEqualTo(expectedMsg); - assertThat(actualMsg.getMetaData().getData()).containsKeys("temperature", "humidity"); - } } From a91caa3d097533938bae74ee62d4feb626c4c784 Mon Sep 17 00:00:00 2001 From: IrynaMatveieva Date: Fri, 9 Aug 2024 09:10:25 +0300 Subject: [PATCH 3/6] added additional tests and moved currentTimeMillis to separate method to have control over time in tests --- .../engine/metadata/TbGetTelemetryNode.java | 6 ++++- .../metadata/TbGetTelemetryNodeTest.java | 24 ++++++++++++------- 2 files changed, 21 insertions(+), 9 deletions(-) diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetTelemetryNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetTelemetryNode.java index 4aabc606ab..2b87e80127 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetTelemetryNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetTelemetryNode.java @@ -176,7 +176,7 @@ public class TbGetTelemetryNode implements TbNode { return getIntervalFromPatterns(msg); } else { Interval interval = new Interval(); - long ts = System.currentTimeMillis(); + long ts = getCurrentTimeMillis(); interval.setStartTs(ts - TimeUnit.valueOf(config.getStartIntervalTimeUnit()).toMillis(config.getStartInterval())); interval.setEndTs(ts - TimeUnit.valueOf(config.getEndIntervalTimeUnit()).toMillis(config.getEndInterval())); return interval; @@ -220,6 +220,10 @@ public class TbGetTelemetryNode implements TbNode { return limit; } + long getCurrentTimeMillis() { + return System.currentTimeMillis(); + } + @Data @NoArgsConstructor private static class Interval { diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/metadata/TbGetTelemetryNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/metadata/TbGetTelemetryNodeTest.java index 2ad575f0aa..f958e13ab5 100644 --- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/metadata/TbGetTelemetryNodeTest.java +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/metadata/TbGetTelemetryNodeTest.java @@ -32,7 +32,6 @@ import org.thingsboard.rule.engine.TestDbCallbackExecutor; import org.thingsboard.rule.engine.api.TbContext; import org.thingsboard.rule.engine.api.TbNodeConfiguration; import org.thingsboard.rule.engine.api.TbNodeException; -import org.thingsboard.rule.engine.api.util.TbNodeUtils; import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.TenantId; @@ -60,7 +59,9 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyList; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.BDDMockito.given; +import static org.mockito.BDDMockito.spy; import static org.mockito.BDDMockito.then; +import static org.mockito.BDDMockito.willReturn; @ExtendWith(MockitoExtension.class) public class TbGetTelemetryNodeTest { @@ -80,7 +81,7 @@ public class TbGetTelemetryNodeTest { @BeforeEach public void setUp() { - node = new TbGetTelemetryNode(); + node = spy(new TbGetTelemetryNode()); config = new TbGetTelemetryNodeConfiguration().defaultConfiguration(); config.setLatestTsKeyNames(List.of("temperature")); } @@ -222,6 +223,8 @@ public class TbGetTelemetryNodeTest { // GIVEN node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config))); + long ts = System.currentTimeMillis(); + willReturn(ts).given(node).getCurrentTimeMillis(); mockTimeseriesService(); given(timeseriesServiceMock.findAll(any(TenantId.class), any(EntityId.class), anyList())).willReturn(Futures.immediateFuture(Collections.emptyList())); @@ -233,8 +236,8 @@ public class TbGetTelemetryNodeTest { ArgumentCaptor> actualReadTsKvQueryList = ArgumentCaptor.forClass(List.class); then(timeseriesServiceMock).should().findAll(eq(TENANT_ID), eq(DEVICE_ID), actualReadTsKvQueryList.capture()); ReadTsKvQuery actualReadTsKvQuery = actualReadTsKvQueryList.getValue().get(0); - assertThat(actualReadTsKvQuery.getStartTs()).isLessThan(System.currentTimeMillis() - TimeUnit.MINUTES.toMillis(config.getStartInterval())); - assertThat(actualReadTsKvQuery.getEndTs()).isLessThan(System.currentTimeMillis() - TimeUnit.MINUTES.toMillis(config.getEndInterval())); + assertThat(actualReadTsKvQuery.getStartTs()).isEqualTo(ts - TimeUnit.MINUTES.toMillis(config.getStartInterval())); + assertThat(actualReadTsKvQuery.getEndTs()).isEqualTo(ts - TimeUnit.MINUTES.toMillis(config.getEndInterval())); } @Test @@ -257,8 +260,7 @@ public class TbGetTelemetryNodeTest { ArgumentCaptor> actualReadTsKvQueryList = ArgumentCaptor.forClass(List.class); then(timeseriesServiceMock).should().findAll(eq(TENANT_ID), eq(DEVICE_ID), actualReadTsKvQueryList.capture()); List actualKeys = actualReadTsKvQueryList.getValue().stream().map(TsKvQuery::getKey).toList(); - List expectedTsKeyNames = config.getLatestTsKeyNames().stream().map(tsKeyName -> TbNodeUtils.processPattern(tsKeyName, msg)).toList(); - assertThat(actualKeys).containsAll(expectedTsKeyNames); + assertThat(actualKeys).containsAll(List.of("temperature", "humidity", "pressure")); } @ParameterizedTest @@ -325,6 +327,10 @@ public class TbGetTelemetryNodeTest { Arguments.of( TbGetTelemetryNodeConfiguration.FETCH_MODE_FIRST, TbGetTelemetryNodeConfiguration.MAX_FETCH_SIZE, + (Consumer) query -> assertThat(query.getLimit()).isEqualTo(1)), + Arguments.of( + TbGetTelemetryNodeConfiguration.FETCH_MODE_LAST, + 10, (Consumer) query -> assertThat(query.getLimit()).isEqualTo(1)) ); } @@ -424,9 +430,11 @@ public class TbGetTelemetryNodeTest { assertThat(actualMsg.getValue()).usingRecursiveComparison().ignoringFields("ctx").isEqualTo(expectedMsg); } - @Test - public void givenFetchModeIsFirst_whenOnMsg_thenTellSuccessAndVerifyMsg() throws TbNodeException { + @ParameterizedTest + @ValueSource(strings = {"FIRST", "LAST"}) + public void givenFetchMode_whenOnMsg_thenTellSuccessAndVerifyMsg(String fetchMode) throws TbNodeException { // GIVEN + config.setFetchMode(fetchMode); config.setLatestTsKeyNames(List.of("temperature", "humidity")); node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config))); From 4721ed275de2d4569674b2ec6994bcb40f5b516b Mon Sep 17 00:00:00 2001 From: IrynaMatveieva Date: Tue, 13 Aug 2024 14:08:25 +0300 Subject: [PATCH 4/6] added checks for null for fetchMode and OrderBy --- .../rule/engine/metadata/FetchMode.java | 22 +++ .../engine/metadata/TbGetTelemetryNode.java | 64 ++++--- .../TbGetTelemetryNodeConfiguration.java | 13 +- .../metadata/TbGetTelemetryNodeTest.java | 175 ++++++++++++++---- 4 files changed, 199 insertions(+), 75 deletions(-) create mode 100644 rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/FetchMode.java diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/FetchMode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/FetchMode.java new file mode 100644 index 0000000000..57fee39a3f --- /dev/null +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/FetchMode.java @@ -0,0 +1,22 @@ +/** + * Copyright © 2016-2024 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.rule.engine.metadata; + +public enum FetchMode { + + FIRST, ALL, LAST + +} diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetTelemetryNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetTelemetryNode.java index 2b87e80127..5faf0e40e2 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetTelemetryNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetTelemetryNode.java @@ -15,6 +15,7 @@ */ package org.thingsboard.rule.engine.metadata; +import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.ArrayNode; import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.util.concurrent.ListenableFuture; @@ -35,7 +36,9 @@ import org.thingsboard.server.common.data.kv.Aggregation; import org.thingsboard.server.common.data.kv.BaseReadTsKvQuery; import org.thingsboard.server.common.data.kv.ReadTsKvQuery; import org.thingsboard.server.common.data.kv.TsKvEntry; +import org.thingsboard.server.common.data.page.SortOrder.Direction; import org.thingsboard.server.common.data.plugin.ComponentType; +import org.thingsboard.server.common.data.util.TbPair; import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.common.msg.TbMsgMetaData; @@ -50,6 +53,7 @@ import java.util.stream.Collectors; @RuleNode(type = ComponentType.ENRICHMENT, name = "originator telemetry", configClazz = TbGetTelemetryNodeConfiguration.class, + version = 1, nodeDescription = "Adds message originator telemetry for selected time range into message metadata", nodeDetails = "Useful when you need to get telemetry data set from the message originator for a specific time range " + "instead of fetching just the latest telemetry or if you need to get the closest telemetry to the fetch interval start or end. " + @@ -59,14 +63,11 @@ import java.util.stream.Collectors; configDirective = "tbEnrichmentNodeGetTelemetryFromDatabase") public class TbGetTelemetryNode implements TbNode { - private static final String DESC_ORDER = "DESC"; - private static final String ASC_ORDER = "ASC"; - private TbGetTelemetryNodeConfiguration config; private List tsKeyNames; private int limit; - private String fetchMode; - private String orderBy; + private FetchMode fetchMode; + private Direction orderBy; private Aggregation aggregation; @Override @@ -76,14 +77,17 @@ public class TbGetTelemetryNode implements TbNode { if (tsKeyNames.isEmpty()) { throw new TbNodeException("Telemetry is not selected!", true); } - limit = config.getFetchMode().equals(TbGetTelemetryNodeConfiguration.FETCH_MODE_ALL) ? validateLimit(config.getLimit()) : 1; + if (config.getFetchMode() == null) { + throw new TbNodeException("FetchMode should be specified!", true); + } + limit = FetchMode.ALL.equals(config.getFetchMode()) ? validateLimit(config.getLimit()) : 1; fetchMode = config.getFetchMode(); orderBy = getOrderBy(); aggregation = parseAggregationConfig(config.getAggregation()); } Aggregation parseAggregationConfig(String aggName) { - if (StringUtils.isEmpty(aggName) || !fetchMode.equals(TbGetTelemetryNodeConfiguration.FETCH_MODE_ALL)) { + if (StringUtils.isEmpty(aggName) || !fetchMode.equals(FetchMode.ALL)) { return Aggregation.NONE; } return Aggregation.valueOf(aggName); @@ -107,35 +111,29 @@ public class TbGetTelemetryNode implements TbNode { interval.getEndTs() - interval.getStartTs(); return keys.stream() - .map(key -> new BaseReadTsKvQuery(key, interval.getStartTs(), interval.getEndTs(), aggIntervalStep, limit, aggregation, orderBy)) + .map(key -> new BaseReadTsKvQuery(key, interval.getStartTs(), interval.getEndTs(), aggIntervalStep, limit, aggregation, orderBy.name())) .collect(Collectors.toList()); } - private String getOrderBy() throws TbNodeException { + private Direction getOrderBy() throws TbNodeException { switch (fetchMode) { - case TbGetTelemetryNodeConfiguration.FETCH_MODE_ALL: - return getOrderByFetchAll(); - case TbGetTelemetryNodeConfiguration.FETCH_MODE_FIRST: - return ASC_ORDER; + case ALL: + if (config.getOrderBy() == null) { + throw new TbNodeException("OrderBy should be specified!", true); + } + return config.getOrderBy(); + case FIRST: + return Direction.ASC; + case LAST: + return Direction.DESC; default: - return DESC_ORDER; + throw new TbNodeException("FetchMode '" + fetchMode + "' is not supported.", true); } } - private String getOrderByFetchAll() throws TbNodeException { - String orderBy = config.getOrderBy(); - if (ASC_ORDER.equals(orderBy) || DESC_ORDER.equals(orderBy)) { - return orderBy; - } - if (StringUtils.isBlank(orderBy)) { - return ASC_ORDER; - } - throw new TbNodeException("Invalid fetch order selected.", true); - } - private TbMsgMetaData updateMetadata(List entries, TbMsg msg, List keys) { ObjectNode resultNode = JacksonUtil.newObjectNode(JacksonUtil.ALLOW_UNQUOTED_FIELD_NAMES_MAPPER); - if (TbGetTelemetryNodeConfiguration.FETCH_MODE_ALL.equals(fetchMode)) { + if (FetchMode.ALL.equals(fetchMode)) { entries.forEach(entry -> processArray(resultNode, entry)); } else { entries.forEach(entry -> processSingle(resultNode, entry)); @@ -231,4 +229,18 @@ public class TbGetTelemetryNode implements TbNode { private Long endTs; } + @Override + public TbPair upgrade(int fromVersion, JsonNode oldConfiguration) throws TbNodeException { + boolean hasChanges = false; + switch (fromVersion) { + case 0 -> { + if (oldConfiguration.has("orderBy") && oldConfiguration.get("orderBy").isNull()) { + ((ObjectNode) oldConfiguration).put("orderBy", Direction.ASC.name()); + hasChanges = true; + } + } + } + return new TbPair<>(hasChanges, oldConfiguration); + } + } diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetTelemetryNodeConfiguration.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetTelemetryNodeConfiguration.java index c4128a4481..3bb9ff1013 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetTelemetryNodeConfiguration.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetTelemetryNodeConfiguration.java @@ -18,6 +18,7 @@ package org.thingsboard.rule.engine.metadata; import lombok.Data; import org.thingsboard.rule.engine.api.NodeConfiguration; import org.thingsboard.server.common.data.kv.Aggregation; +import org.thingsboard.server.common.data.page.SortOrder.Direction; import java.util.Collections; import java.util.List; @@ -29,10 +30,6 @@ import java.util.concurrent.TimeUnit; @Data public class TbGetTelemetryNodeConfiguration implements NodeConfiguration { - public static final String FETCH_MODE_FIRST = "FIRST"; - public static final String FETCH_MODE_LAST = "LAST"; - public static final String FETCH_MODE_ALL = "ALL"; - public static final int MAX_FETCH_SIZE = 1000; private int startInterval; @@ -45,8 +42,8 @@ public class TbGetTelemetryNodeConfiguration implements NodeConfiguration node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config)))) + .isInstanceOf(TbNodeException.class) + .hasMessage("FetchMode should be specified!") + .extracting(e -> ((TbNodeException) e).isUnrecoverable()) + .isEqualTo(true); + } + + @Test + public void givenFetchModeAllAndOrderByIsNull_whenInit_thenThrowsException() { + // GIVEN + config.setFetchMode(FetchMode.ALL); + config.setOrderBy(null); + + // THEN + assertThatThrownBy(() -> node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config)))) + .isInstanceOf(TbNodeException.class) + .hasMessage("OrderBy should be specified!") + .extracting(e -> ((TbNodeException) e).isUnrecoverable()) + .isEqualTo(true); + } + @ParameterizedTest @ValueSource(ints = {-1, 0, 1, 1001, 2000}) public void givenFetchModeAllAndLimitIsOutOfRange_whenInit_thenThrowsException(int limit) { // GIVEN - config.setFetchMode(TbGetTelemetryNodeConfiguration.FETCH_MODE_ALL); + config.setFetchMode(FetchMode.ALL); config.setLimit(limit); // THEN @@ -174,22 +204,6 @@ public class TbGetTelemetryNodeTest { .isEqualTo(true); } - @ParameterizedTest - @ValueSource(strings = {".ASC", "ascending", "DESCENDING"}) - public void givenFetchModeAllAndInvalidOrderBy_whenInit_thenThrowsException(String orderBy) { - // GIVEN - config.setFetchMode(TbGetTelemetryNodeConfiguration.FETCH_MODE_ALL); - config.setLimit(2); - config.setOrderBy(orderBy); - - // WHEN-THEN - assertThatThrownBy(() -> node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config)))) - .isInstanceOf(TbNodeException.class) - .hasMessage("Invalid fetch order selected.") - .extracting(e -> ((TbNodeException) e).isUnrecoverable()) - .isEqualTo(true); - } - @Test public void givenUseMetadataIntervalPatternsIsTrue_whenOnMsg_thenVerifyStartAndEndTsInQuery() throws TbNodeException { // GIVEN @@ -269,7 +283,7 @@ public class TbGetTelemetryNodeTest { // GIVEN config.setStartInterval(5); config.setEndInterval(1); - config.setFetchMode(TbGetTelemetryNodeConfiguration.FETCH_MODE_ALL); + config.setFetchMode(FetchMode.ALL); config.setAggregation(aggregation); node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config))); @@ -297,7 +311,7 @@ public class TbGetTelemetryNodeTest { @ParameterizedTest @MethodSource - public void givenFetchModeAndLimit_whenOnMsg_thenVerifyLimitInQuery(String fetchMode, int limit, Consumer verifyLimitInQuery) throws TbNodeException { + public void givenFetchModeAndLimit_whenOnMsg_thenVerifyLimitInQuery(FetchMode fetchMode, int limit, Consumer verifyLimitInQuery) throws TbNodeException { // GIVEN config.setFetchMode(fetchMode); config.setLimit(limit); @@ -321,15 +335,15 @@ public class TbGetTelemetryNodeTest { private static Stream givenFetchModeAndLimit_whenOnMsg_thenVerifyLimitInQuery() { return Stream.of( Arguments.of( - TbGetTelemetryNodeConfiguration.FETCH_MODE_ALL, + FetchMode.ALL, 5, (Consumer) query -> assertThat(query.getLimit()).isEqualTo(5)), Arguments.of( - TbGetTelemetryNodeConfiguration.FETCH_MODE_FIRST, + FetchMode.FIRST, TbGetTelemetryNodeConfiguration.MAX_FETCH_SIZE, (Consumer) query -> assertThat(query.getLimit()).isEqualTo(1)), Arguments.of( - TbGetTelemetryNodeConfiguration.FETCH_MODE_LAST, + FetchMode.LAST, 10, (Consumer) query -> assertThat(query.getLimit()).isEqualTo(1)) ); @@ -337,7 +351,7 @@ public class TbGetTelemetryNodeTest { @ParameterizedTest @MethodSource - public void givenFetchModeAndOrder_whenOnMsg_thenVerifyOrderInQuery(String fetchMode, String orderBy, Consumer verifyOrderInQuery) throws TbNodeException { + public void givenFetchModeAndOrder_whenOnMsg_thenVerifyOrderInQuery(FetchMode fetchMode, Direction orderBy, Consumer verifyOrderInQuery) throws TbNodeException { // GIVEN config.setFetchMode(fetchMode); config.setOrderBy(orderBy); @@ -361,20 +375,16 @@ public class TbGetTelemetryNodeTest { private static Stream givenFetchModeAndOrder_whenOnMsg_thenVerifyOrderInQuery() { return Stream.of( Arguments.of( - TbGetTelemetryNodeConfiguration.FETCH_MODE_ALL, - "", - (Consumer) query -> assertThat(query.getOrder()).isEqualTo("ASC")), - Arguments.of( - TbGetTelemetryNodeConfiguration.FETCH_MODE_ALL, - "DESC", + FetchMode.ALL, + Direction.DESC, (Consumer) query -> assertThat(query.getOrder()).isEqualTo("DESC")), Arguments.of( - TbGetTelemetryNodeConfiguration.FETCH_MODE_FIRST, - "ASC", + FetchMode.FIRST, + Direction.ASC, (Consumer) query -> assertThat(query.getOrder()).isEqualTo("ASC")), Arguments.of( - TbGetTelemetryNodeConfiguration.FETCH_MODE_LAST, - "ASC", + FetchMode.LAST, + Direction.ASC, (Consumer) query -> assertThat(query.getOrder()).isEqualTo("DESC")) ); } @@ -403,7 +413,7 @@ public class TbGetTelemetryNodeTest { public void givenFetchModeAll_whenOnMsg_thenTellSuccessAndVerifyMsg() throws TbNodeException { // GIVEN config.setLatestTsKeyNames(List.of("temperature", "humidity")); - config.setFetchMode(TbGetTelemetryNodeConfiguration.FETCH_MODE_ALL); + config.setFetchMode(FetchMode.ALL); node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config))); @@ -434,7 +444,7 @@ public class TbGetTelemetryNodeTest { @ValueSource(strings = {"FIRST", "LAST"}) public void givenFetchMode_whenOnMsg_thenTellSuccessAndVerifyMsg(String fetchMode) throws TbNodeException { // GIVEN - config.setFetchMode(fetchMode); + config.setFetchMode(FetchMode.valueOf(fetchMode)); config.setLatestTsKeyNames(List.of("temperature", "humidity")); node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config))); @@ -467,4 +477,87 @@ public class TbGetTelemetryNodeTest { given(ctxMock.getDbCallbackExecutor()).willReturn(executor); } + private static Stream givenFromVersionAndConfig_whenUpgrade_thenVerifyHasChangesAndConfig() { + return Stream.of( + // config for version 0 (orderBy id null) + Arguments.of(0, + """ + { + "latestTsKeyNames": [ + ], + "aggregation": "NONE", + "fetchMode": "ALL", + "orderBy": null, + "limit": 1000, + "useMetadataIntervalPatterns": false, + "startIntervalPattern": "", + "endIntervalPattern": "", + "startInterval": 2, + "startIntervalTimeUnit": "MINUTES", + "endInterval": 1, + "endIntervalTimeUnit": "MINUTES" + } + """, + true, + """ + { + "latestTsKeyNames": [ + ], + "aggregation": "NONE", + "fetchMode": "ALL", + "orderBy": "ASC", + "limit": 1000, + "useMetadataIntervalPatterns": false, + "startIntervalPattern": "", + "endIntervalPattern": "", + "startInterval": 2, + "startIntervalTimeUnit": "MINUTES", + "endInterval": 1, + "endIntervalTimeUnit": "MINUTES" + } + """), + // config for version 0 (orderBy is specified) + Arguments.of(0, + """ + { + "latestTsKeyNames": [ + ], + "aggregation": "NONE", + "fetchMode": "ALL", + "orderBy": "DESC", + "limit": 1000, + "useMetadataIntervalPatterns": false, + "startIntervalPattern": "", + "endIntervalPattern": "", + "startInterval": 2, + "startIntervalTimeUnit": "MINUTES", + "endInterval": 1, + "endIntervalTimeUnit": "MINUTES" + } + """, + false, + """ + { + "latestTsKeyNames": [ + ], + "aggregation": "NONE", + "fetchMode": "ALL", + "orderBy": "DESC", + "limit": 1000, + "useMetadataIntervalPatterns": false, + "startIntervalPattern": "", + "endIntervalPattern": "", + "startInterval": 2, + "startIntervalTimeUnit": "MINUTES", + "endInterval": 1, + "endIntervalTimeUnit": "MINUTES" + } + """) + ); + } + + @Override + protected TbNode getTestNode() { + return node; + } } From b149957e439ec4cf0c1d02a96c2888e5233f15c9 Mon Sep 17 00:00:00 2001 From: IrynaMatveieva Date: Thu, 15 Aug 2024 16:23:26 +0300 Subject: [PATCH 5/6] added checks on init() and provided upgrade for existing nodes --- .../engine/metadata/TbGetTelemetryNode.java | 92 +++++--- .../TbGetTelemetryNodeConfiguration.java | 4 +- .../metadata/TbGetTelemetryNodeTest.java | 215 ++++++++++++------ 3 files changed, 212 insertions(+), 99 deletions(-) diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetTelemetryNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetTelemetryNode.java index 5faf0e40e2..2b5190563d 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetTelemetryNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetTelemetryNode.java @@ -31,7 +31,6 @@ import org.thingsboard.rule.engine.api.TbNode; import org.thingsboard.rule.engine.api.TbNodeConfiguration; import org.thingsboard.rule.engine.api.TbNodeException; import org.thingsboard.rule.engine.api.util.TbNodeUtils; -import org.thingsboard.server.common.data.StringUtils; import org.thingsboard.server.common.data.kv.Aggregation; import org.thingsboard.server.common.data.kv.BaseReadTsKvQuery; import org.thingsboard.server.common.data.kv.ReadTsKvQuery; @@ -75,27 +74,43 @@ public class TbGetTelemetryNode implements TbNode { this.config = TbNodeUtils.convert(configuration, TbGetTelemetryNodeConfiguration.class); tsKeyNames = config.getLatestTsKeyNames(); if (tsKeyNames.isEmpty()) { - throw new TbNodeException("Telemetry is not selected!", true); + throw new TbNodeException("Telemetry should be specified!", true); } - if (config.getFetchMode() == null) { + fetchMode = config.getFetchMode(); + if (fetchMode == null) { throw new TbNodeException("FetchMode should be specified!", true); } - limit = FetchMode.ALL.equals(config.getFetchMode()) ? validateLimit(config.getLimit()) : 1; - fetchMode = config.getFetchMode(); - orderBy = getOrderBy(); - aggregation = parseAggregationConfig(config.getAggregation()); - } - - Aggregation parseAggregationConfig(String aggName) { - if (StringUtils.isEmpty(aggName) || !fetchMode.equals(FetchMode.ALL)) { - return Aggregation.NONE; + switch (fetchMode) { + case ALL: + limit = validateLimit(config.getLimit()); + if (config.getOrderBy() == null) { + throw new TbNodeException("OrderBy should be specified!", true); + } + orderBy = config.getOrderBy(); + if (config.getAggregation() == null) { + throw new TbNodeException("Aggregation should be specified!", true); + } + aggregation = config.getAggregation(); + break; + case FIRST: + limit = 1; + orderBy = Direction.ASC; + aggregation = Aggregation.NONE; + break; + case LAST: + limit = 1; + orderBy = Direction.DESC; + aggregation = Aggregation.NONE; + break; } - return Aggregation.valueOf(aggName); } @Override public void onMsg(TbContext ctx, TbMsg msg) { Interval interval = getInterval(msg); + if (interval.getStartTs() > interval.getEndTs()) { + throw new RuntimeException("Interval start should be less than Interval end"); + } List keys = TbNodeUtils.processPatterns(tsKeyNames, msg); ListenableFuture> list = ctx.getTimeseriesService().findAll(ctx.getTenantId(), msg.getOriginator(), buildQueries(interval, keys)); DonAsynchron.withCallback(list, data -> { @@ -115,22 +130,6 @@ public class TbGetTelemetryNode implements TbNode { .collect(Collectors.toList()); } - private Direction getOrderBy() throws TbNodeException { - switch (fetchMode) { - case ALL: - if (config.getOrderBy() == null) { - throw new TbNodeException("OrderBy should be specified!", true); - } - return config.getOrderBy(); - case FIRST: - return Direction.ASC; - case LAST: - return Direction.DESC; - default: - throw new TbNodeException("FetchMode '" + fetchMode + "' is not supported.", true); - } - } - private TbMsgMetaData updateMetadata(List entries, TbMsg msg, List keys) { ObjectNode resultNode = JacksonUtil.newObjectNode(JacksonUtil.ALLOW_UNQUOTED_FIELD_NAMES_MAPPER); if (FetchMode.ALL.equals(fetchMode)) { @@ -234,9 +233,38 @@ public class TbGetTelemetryNode implements TbNode { boolean hasChanges = false; switch (fromVersion) { case 0 -> { - if (oldConfiguration.has("orderBy") && oldConfiguration.get("orderBy").isNull()) { - ((ObjectNode) oldConfiguration).put("orderBy", Direction.ASC.name()); - hasChanges = true; + if (oldConfiguration.hasNonNull("fetchMode")) { + String fetchMode = oldConfiguration.get("fetchMode").asText(); + switch (fetchMode) { + case "FIRST": + ((ObjectNode) oldConfiguration).put("orderBy", Direction.ASC.name()); + ((ObjectNode) oldConfiguration).put("aggregation", Aggregation.NONE.name()); + hasChanges = true; + break; + case "LAST": + ((ObjectNode) oldConfiguration).put("orderBy", Direction.DESC.name()); + ((ObjectNode) oldConfiguration).put("aggregation", Aggregation.NONE.name()); + hasChanges = true; + break; + case "ALL": + if (oldConfiguration.has("orderBy") && + (oldConfiguration.get("orderBy").isNull() || oldConfiguration.get("orderBy").asText().isEmpty())) { + ((ObjectNode) oldConfiguration).put("orderBy", Direction.ASC.name()); + hasChanges = true; + } + if (oldConfiguration.has("aggregation") && + (oldConfiguration.get("aggregation").isNull() || oldConfiguration.get("aggregation").asText().isEmpty())) { + ((ObjectNode) oldConfiguration).put("aggregation", Aggregation.NONE.name()); + hasChanges = true; + } + break; + default: + ((ObjectNode) oldConfiguration).put("fetchMode", FetchMode.LAST.name()); + ((ObjectNode) oldConfiguration).put("orderBy", Direction.DESC.name()); + ((ObjectNode) oldConfiguration).put("aggregation", Aggregation.NONE.name()); + hasChanges = true; + break; + } } } } diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetTelemetryNodeConfiguration.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetTelemetryNodeConfiguration.java index 3bb9ff1013..704abeddf6 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetTelemetryNodeConfiguration.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetTelemetryNodeConfiguration.java @@ -44,7 +44,7 @@ public class TbGetTelemetryNodeConfiguration implements NodeConfiguration latestTsKeyNames; @@ -62,7 +62,7 @@ public class TbGetTelemetryNodeConfiguration implements NodeConfiguration node.parseAggregationConfig(" ")).isInstanceOf(IllegalArgumentException.class); - } - - @Test - public void givenAggregationIncorrect_whenParseAggregation_thenException() throws TbNodeException { - // GIVEN - config.setFetchMode(FetchMode.ALL); - node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config))); - - // WHEN-THEN - assertThatThrownBy(() -> node.parseAggregationConfig("TOP")).isInstanceOf(IllegalArgumentException.class); - } - @Test public void verifyDefaultConfig() { config = new TbGetTelemetryNodeConfiguration().defaultConfiguration(); @@ -144,7 +101,7 @@ public class TbGetTelemetryNodeTest extends AbstractRuleNodeUpgradeTest { assertThat(config.getEndIntervalTimeUnit()).isEqualTo(TimeUnit.MINUTES.name()); assertThat(config.getFetchMode()).isEqualTo(FetchMode.FIRST); assertThat(config.getOrderBy()).isEqualTo(Direction.ASC); - assertThat(config.getAggregation()).isEqualTo(Aggregation.NONE.name()); + assertThat(config.getAggregation()).isEqualTo(Aggregation.NONE); assertThat(config.getLimit()).isEqualTo(1000); assertThat(config.getLatestTsKeyNames()).isEmpty(); } @@ -157,7 +114,7 @@ public class TbGetTelemetryNodeTest extends AbstractRuleNodeUpgradeTest { // WHEN-THEN assertThatThrownBy(() -> node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config)))) .isInstanceOf(TbNodeException.class) - .hasMessage("Telemetry is not selected!") + .hasMessage("Telemetry should be specified!") .extracting(e -> ((TbNodeException) e).isUnrecoverable()) .isEqualTo(true); } @@ -181,7 +138,7 @@ public class TbGetTelemetryNodeTest extends AbstractRuleNodeUpgradeTest { config.setFetchMode(FetchMode.ALL); config.setOrderBy(null); - // THEN + // WHEN-THEN assertThatThrownBy(() -> node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config)))) .isInstanceOf(TbNodeException.class) .hasMessage("OrderBy should be specified!") @@ -196,7 +153,7 @@ public class TbGetTelemetryNodeTest extends AbstractRuleNodeUpgradeTest { config.setFetchMode(FetchMode.ALL); config.setLimit(limit); - // THEN + // WHEN-THEN assertThatThrownBy(() -> node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config)))) .isInstanceOf(TbNodeException.class) .hasMessage("Limit should be in a range from 2 to 1000.") @@ -204,6 +161,33 @@ public class TbGetTelemetryNodeTest extends AbstractRuleNodeUpgradeTest { .isEqualTo(true); } + @Test + public void givenFetchModeIsAllAndAggregationIsNull_whenInit_thenThrowsException() { + // GIVEN + config.setFetchMode(FetchMode.ALL); + config.setAggregation(null); + // WHEN-THEN + assertThatThrownBy(() -> node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config)))) + .isInstanceOf(TbNodeException.class) + .hasMessage("Aggregation should be specified!") + .extracting(e -> ((TbNodeException) e).isUnrecoverable()) + .isEqualTo(true); + } + + @Test + public void givenIntervalStartIsGreaterThanIntervalEnd_whenOnMsg_thenThrowsException() throws TbNodeException { + // GIVEN + config.setStartInterval(1); + config.setEndInterval(2); + + // WHEN-THEN + node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config))); + TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, TbMsgMetaData.EMPTY, TbMsg.EMPTY_JSON_OBJECT); + assertThatThrownBy(() -> node.onMsg(ctxMock, msg)) + .isInstanceOf(RuntimeException.class) + .hasMessage("Interval start should be less than Interval end"); + } + @Test public void givenUseMetadataIntervalPatternsIsTrue_whenOnMsg_thenVerifyStartAndEndTsInQuery() throws TbNodeException { // GIVEN @@ -279,7 +263,7 @@ public class TbGetTelemetryNodeTest extends AbstractRuleNodeUpgradeTest { @ParameterizedTest @MethodSource - public void givenAggregation_whenOnMsg_thenVerifyAggregationStepInQuery(String aggregation, Consumer verifyAggregationStepInQuery) throws TbNodeException { + public void givenAggregation_whenOnMsg_thenVerifyAggregationStepInQuery(Aggregation aggregation, Consumer verifyAggregationStepInQuery) throws TbNodeException { // GIVEN config.setStartInterval(5); config.setEndInterval(1); @@ -304,8 +288,8 @@ public class TbGetTelemetryNodeTest extends AbstractRuleNodeUpgradeTest { private static Stream givenAggregation_whenOnMsg_thenVerifyAggregationStepInQuery() { return Stream.of( - Arguments.of("", (Consumer) query -> assertThat(query.getInterval()).isEqualTo(1)), - Arguments.of("MIN", (Consumer) query -> assertThat(query.getInterval()).isEqualTo(query.getEndTs() - query.getStartTs())) + Arguments.of(Aggregation.NONE, (Consumer) query -> assertThat(query.getInterval()).isEqualTo(1)), + Arguments.of(Aggregation.AVG, (Consumer) query -> assertThat(query.getInterval()).isEqualTo(query.getEndTs() - query.getStartTs())) ); } @@ -479,15 +463,14 @@ public class TbGetTelemetryNodeTest extends AbstractRuleNodeUpgradeTest { private static Stream givenFromVersionAndConfig_whenUpgrade_thenVerifyHasChangesAndConfig() { return Stream.of( - // config for version 0 (orderBy id null) + // config for version 0 (fetchMode is 'FIRST' and orderBy is 'INVALID_ORDER_BY' and aggregation is 'SUM') Arguments.of(0, """ { - "latestTsKeyNames": [ - ], - "aggregation": "NONE", - "fetchMode": "ALL", - "orderBy": null, + "latestTsKeyNames": [], + "aggregation": "SUM", + "fetchMode": "FIRST", + "orderBy": "INVALID_ORDER_BY", "limit": 1000, "useMetadataIntervalPatterns": false, "startIntervalPattern": "", @@ -501,8 +484,77 @@ public class TbGetTelemetryNodeTest extends AbstractRuleNodeUpgradeTest { true, """ { - "latestTsKeyNames": [ - ], + "latestTsKeyNames": [], + "aggregation": "NONE", + "fetchMode": "FIRST", + "orderBy": "ASC", + "limit": 1000, + "useMetadataIntervalPatterns": false, + "startIntervalPattern": "", + "endIntervalPattern": "", + "startInterval": 2, + "startIntervalTimeUnit": "MINUTES", + "endInterval": 1, + "endIntervalTimeUnit": "MINUTES" + } + """), + // config for version 0 (fetchMode is 'LAST' and orderBy is 'ASC' and aggregation is 'AVG') + Arguments.of(0, + """ + { + "latestTsKeyNames": [], + "aggregation": "AVG", + "fetchMode": "LAST", + "orderBy": "ASC", + "limit": 1000, + "useMetadataIntervalPatterns": false, + "startIntervalPattern": "", + "endIntervalPattern": "", + "startInterval": 2, + "startIntervalTimeUnit": "MINUTES", + "endInterval": 1, + "endIntervalTimeUnit": "MINUTES" + } + """, + true, + """ + { + "latestTsKeyNames": [], + "aggregation": "NONE", + "fetchMode": "LAST", + "orderBy": "DESC", + "limit": 1000, + "useMetadataIntervalPatterns": false, + "startIntervalPattern": "", + "endIntervalPattern": "", + "startInterval": 2, + "startIntervalTimeUnit": "MINUTES", + "endInterval": 1, + "endIntervalTimeUnit": "MINUTES" + } + """), + // config for version 0 (fetchMode is 'ALL' and orderBy is empty and aggregation is null) + Arguments.of(0, + """ + { + "latestTsKeyNames": [], + "aggregation": null, + "fetchMode": "ALL", + "orderBy": "", + "limit": 1000, + "useMetadataIntervalPatterns": false, + "startIntervalPattern": "", + "endIntervalPattern": "", + "startInterval": 2, + "startIntervalTimeUnit": "MINUTES", + "endInterval": 1, + "endIntervalTimeUnit": "MINUTES" + } + """, + true, + """ + { + "latestTsKeyNames": [], "aggregation": "NONE", "fetchMode": "ALL", "orderBy": "ASC", @@ -516,13 +568,12 @@ public class TbGetTelemetryNodeTest extends AbstractRuleNodeUpgradeTest { "endIntervalTimeUnit": "MINUTES" } """), - // config for version 0 (orderBy is specified) + // config for version 0 (fetchMode is 'ALL' and orderBy is 'DESC' and aggregation is 'SUM') Arguments.of(0, """ { - "latestTsKeyNames": [ - ], - "aggregation": "NONE", + "latestTsKeyNames": [], + "aggregation": "SUM", "fetchMode": "ALL", "orderBy": "DESC", "limit": 1000, @@ -538,9 +589,8 @@ public class TbGetTelemetryNodeTest extends AbstractRuleNodeUpgradeTest { false, """ { - "latestTsKeyNames": [ - ], - "aggregation": "NONE", + "latestTsKeyNames": [], + "aggregation": "SUM", "fetchMode": "ALL", "orderBy": "DESC", "limit": 1000, @@ -552,6 +602,41 @@ public class TbGetTelemetryNodeTest extends AbstractRuleNodeUpgradeTest { "endInterval": 1, "endIntervalTimeUnit": "MINUTES" } + """), + // config for version 0 (fetchMode is 'INVALID_MODE' and orderBy is 'INVALID_ORDER_BY' and aggregation is 'INVALID_AGGREGATION') + Arguments.of(0, + """ + { + "latestTsKeyNames": [], + "aggregation": "INVALID_AGGREGATION", + "fetchMode": "INVALID_MODE", + "orderBy": "INVALID_ORDER_BY", + "limit": 1000, + "useMetadataIntervalPatterns": false, + "startIntervalPattern": "", + "endIntervalPattern": "", + "startInterval": 2, + "startIntervalTimeUnit": "MINUTES", + "endInterval": 1, + "endIntervalTimeUnit": "MINUTES" + } + """, + true, + """ + { + "latestTsKeyNames": [], + "aggregation": "NONE", + "fetchMode": "LAST", + "orderBy": "DESC", + "limit": 1000, + "useMetadataIntervalPatterns": false, + "startIntervalPattern": "", + "endIntervalPattern": "", + "startInterval": 2, + "startIntervalTimeUnit": "MINUTES", + "endInterval": 1, + "endIntervalTimeUnit": "MINUTES" + } """) ); } From fe571beaa9864c29443c8111d4eaf347300e5a57 Mon Sep 17 00:00:00 2001 From: IrynaMatveieva Date: Wed, 11 Sep 2024 09:50:00 +0300 Subject: [PATCH 6/6] fixed minor comments in tests --- .../engine/metadata/TbGetTelemetryNodeTest.java | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/metadata/TbGetTelemetryNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/metadata/TbGetTelemetryNodeTest.java index c779afe4c5..f2d4a41fbc 100644 --- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/metadata/TbGetTelemetryNodeTest.java +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/metadata/TbGetTelemetryNodeTest.java @@ -208,7 +208,7 @@ public class TbGetTelemetryNodeTest extends AbstractRuleNodeUpgradeTest { TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, metaData, "{\"msgEndInterval\":\"" + endTs + "\"}"); node.onMsg(ctxMock, msg); - /// THEN + // THEN ArgumentCaptor> actualReadTsKvQueryList = ArgumentCaptor.forClass(List.class); then(timeseriesServiceMock).should().findAll(eq(TENANT_ID), eq(DEVICE_ID), actualReadTsKvQueryList.capture()); ReadTsKvQuery actualReadTsKvQuery = actualReadTsKvQueryList.getValue().get(0); @@ -258,12 +258,12 @@ public class TbGetTelemetryNodeTest extends AbstractRuleNodeUpgradeTest { ArgumentCaptor> actualReadTsKvQueryList = ArgumentCaptor.forClass(List.class); then(timeseriesServiceMock).should().findAll(eq(TENANT_ID), eq(DEVICE_ID), actualReadTsKvQueryList.capture()); List actualKeys = actualReadTsKvQueryList.getValue().stream().map(TsKvQuery::getKey).toList(); - assertThat(actualKeys).containsAll(List.of("temperature", "humidity", "pressure")); + assertThat(actualKeys).containsExactlyInAnyOrder("temperature", "humidity", "pressure"); } @ParameterizedTest @MethodSource - public void givenAggregation_whenOnMsg_thenVerifyAggregationStepInQuery(Aggregation aggregation, Consumer verifyAggregationStepInQuery) throws TbNodeException { + public void givenAggregation_whenOnMsg_thenVerifyAggregationStepInQuery(Aggregation aggregation, Consumer aggregationStepVerifier) throws TbNodeException { // GIVEN config.setStartInterval(5); config.setEndInterval(1); @@ -283,7 +283,7 @@ public class TbGetTelemetryNodeTest extends AbstractRuleNodeUpgradeTest { ArgumentCaptor> actualReadTsKvQueryList = ArgumentCaptor.forClass(List.class); then(timeseriesServiceMock).should().findAll(eq(TENANT_ID), eq(DEVICE_ID), actualReadTsKvQueryList.capture()); ReadTsKvQuery actualReadTsKvQuery = actualReadTsKvQueryList.getValue().get(0); - verifyAggregationStepInQuery.accept(actualReadTsKvQuery); + aggregationStepVerifier.accept(actualReadTsKvQuery); } private static Stream givenAggregation_whenOnMsg_thenVerifyAggregationStepInQuery() { @@ -295,7 +295,7 @@ public class TbGetTelemetryNodeTest extends AbstractRuleNodeUpgradeTest { @ParameterizedTest @MethodSource - public void givenFetchModeAndLimit_whenOnMsg_thenVerifyLimitInQuery(FetchMode fetchMode, int limit, Consumer verifyLimitInQuery) throws TbNodeException { + public void givenFetchModeAndLimit_whenOnMsg_thenVerifyLimitInQuery(FetchMode fetchMode, int limit, Consumer limitInQueryVerifier) throws TbNodeException { // GIVEN config.setFetchMode(fetchMode); config.setLimit(limit); @@ -313,7 +313,7 @@ public class TbGetTelemetryNodeTest extends AbstractRuleNodeUpgradeTest { ArgumentCaptor> actualReadTsKvQueryList = ArgumentCaptor.forClass(List.class); then(timeseriesServiceMock).should().findAll(eq(TENANT_ID), eq(DEVICE_ID), actualReadTsKvQueryList.capture()); ReadTsKvQuery actualReadTsKvQuery = actualReadTsKvQueryList.getValue().get(0); - verifyLimitInQuery.accept(actualReadTsKvQuery); + limitInQueryVerifier.accept(actualReadTsKvQuery); } private static Stream givenFetchModeAndLimit_whenOnMsg_thenVerifyLimitInQuery() { @@ -335,7 +335,7 @@ public class TbGetTelemetryNodeTest extends AbstractRuleNodeUpgradeTest { @ParameterizedTest @MethodSource - public void givenFetchModeAndOrder_whenOnMsg_thenVerifyOrderInQuery(FetchMode fetchMode, Direction orderBy, Consumer verifyOrderInQuery) throws TbNodeException { + public void givenFetchModeAndOrder_whenOnMsg_thenVerifyOrderInQuery(FetchMode fetchMode, Direction orderBy, Consumer orderInQueryVerifier) throws TbNodeException { // GIVEN config.setFetchMode(fetchMode); config.setOrderBy(orderBy); @@ -353,7 +353,7 @@ public class TbGetTelemetryNodeTest extends AbstractRuleNodeUpgradeTest { ArgumentCaptor> actualReadTsKvQueryList = ArgumentCaptor.forClass(List.class); then(timeseriesServiceMock).should().findAll(eq(TENANT_ID), eq(DEVICE_ID), actualReadTsKvQueryList.capture()); ReadTsKvQuery actualReadTsKvQuery = actualReadTsKvQueryList.getValue().get(0); - verifyOrderInQuery.accept(actualReadTsKvQuery); + orderInQueryVerifier.accept(actualReadTsKvQuery); } private static Stream givenFetchModeAndOrder_whenOnMsg_thenVerifyOrderInQuery() {