diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/FetchMode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/FetchMode.java new file mode 100644 index 0000000000..57fee39a3f --- /dev/null +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/FetchMode.java @@ -0,0 +1,22 @@ +/** + * Copyright © 2016-2024 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.rule.engine.metadata; + +public enum FetchMode { + + FIRST, ALL, LAST + +} diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetTelemetryNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetTelemetryNode.java index bcf6b1b05a..2b5190563d 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetTelemetryNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetTelemetryNode.java @@ -15,6 +15,7 @@ */ package org.thingsboard.rule.engine.metadata; +import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.ArrayNode; import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.util.concurrent.ListenableFuture; @@ -30,17 +31,17 @@ import org.thingsboard.rule.engine.api.TbNode; import org.thingsboard.rule.engine.api.TbNodeConfiguration; import org.thingsboard.rule.engine.api.TbNodeException; import org.thingsboard.rule.engine.api.util.TbNodeUtils; -import org.thingsboard.server.common.data.StringUtils; import org.thingsboard.server.common.data.kv.Aggregation; import org.thingsboard.server.common.data.kv.BaseReadTsKvQuery; import org.thingsboard.server.common.data.kv.ReadTsKvQuery; import org.thingsboard.server.common.data.kv.TsKvEntry; +import org.thingsboard.server.common.data.page.SortOrder.Direction; import org.thingsboard.server.common.data.plugin.ComponentType; +import org.thingsboard.server.common.data.util.TbPair; import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.common.msg.TbMsgMetaData; import java.util.List; -import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @@ -51,6 +52,7 @@ import java.util.stream.Collectors; @RuleNode(type = ComponentType.ENRICHMENT, name = "originator telemetry", configClazz = TbGetTelemetryNodeConfiguration.class, + version = 1, nodeDescription = "Adds message originator telemetry for selected time range into message metadata", nodeDetails = "Useful when you need to get telemetry data set from the message originator for a specific time range " + "instead of fetching just the latest telemetry or if you need to get the closest telemetry to the fetch interval start or end. " + @@ -60,53 +62,61 @@ import java.util.stream.Collectors; configDirective = "tbEnrichmentNodeGetTelemetryFromDatabase") public class TbGetTelemetryNode implements TbNode { - private static final String DESC_ORDER = "DESC"; - private static final String ASC_ORDER = "ASC"; - private TbGetTelemetryNodeConfiguration config; private List tsKeyNames; private int limit; - private String fetchMode; - private String orderByFetchAll; + private FetchMode fetchMode; + private Direction orderBy; private Aggregation aggregation; @Override public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException { this.config = TbNodeUtils.convert(configuration, TbGetTelemetryNodeConfiguration.class); tsKeyNames = config.getLatestTsKeyNames(); - limit = config.getFetchMode().equals(TbGetTelemetryNodeConfiguration.FETCH_MODE_ALL) ? validateLimit(config.getLimit()) : 1; + if (tsKeyNames.isEmpty()) { + throw new TbNodeException("Telemetry should be specified!", true); + } fetchMode = config.getFetchMode(); - orderByFetchAll = config.getOrderBy(); - if (StringUtils.isEmpty(orderByFetchAll)) { - orderByFetchAll = ASC_ORDER; + if (fetchMode == null) { + throw new TbNodeException("FetchMode should be specified!", true); } - aggregation = parseAggregationConfig(config.getAggregation()); - } - - Aggregation parseAggregationConfig(String aggName) { - if (StringUtils.isEmpty(aggName) || !fetchMode.equals(TbGetTelemetryNodeConfiguration.FETCH_MODE_ALL)) { - return Aggregation.NONE; + switch (fetchMode) { + case ALL: + limit = validateLimit(config.getLimit()); + if (config.getOrderBy() == null) { + throw new TbNodeException("OrderBy should be specified!", true); + } + orderBy = config.getOrderBy(); + if (config.getAggregation() == null) { + throw new TbNodeException("Aggregation should be specified!", true); + } + aggregation = config.getAggregation(); + break; + case FIRST: + limit = 1; + orderBy = Direction.ASC; + aggregation = Aggregation.NONE; + break; + case LAST: + limit = 1; + orderBy = Direction.DESC; + aggregation = Aggregation.NONE; + break; } - return Aggregation.valueOf(aggName); } @Override - public void onMsg(TbContext ctx, TbMsg msg) throws ExecutionException, InterruptedException, TbNodeException { - if (tsKeyNames.isEmpty()) { - ctx.tellFailure(msg, new IllegalStateException("Telemetry is not selected!")); - } else { - try { - Interval interval = getInterval(msg); - List keys = TbNodeUtils.processPatterns(tsKeyNames, msg); - ListenableFuture> list = ctx.getTimeseriesService().findAll(ctx.getTenantId(), msg.getOriginator(), buildQueries(interval, keys)); - DonAsynchron.withCallback(list, data -> { - var metaData = updateMetadata(data, msg, keys); - ctx.tellSuccess(TbMsg.transformMsgMetadata(msg, metaData)); - }, error -> ctx.tellFailure(msg, error), ctx.getDbCallbackExecutor()); - } catch (Exception e) { - ctx.tellFailure(msg, e); - } + public void onMsg(TbContext ctx, TbMsg msg) { + Interval interval = getInterval(msg); + if (interval.getStartTs() > interval.getEndTs()) { + throw new RuntimeException("Interval start should be less than Interval end"); } + List keys = TbNodeUtils.processPatterns(tsKeyNames, msg); + ListenableFuture> list = ctx.getTimeseriesService().findAll(ctx.getTenantId(), msg.getOriginator(), buildQueries(interval, keys)); + DonAsynchron.withCallback(list, data -> { + var metaData = updateMetadata(data, msg, keys); + ctx.tellSuccess(TbMsg.transformMsgMetadata(msg, metaData)); + }, error -> ctx.tellFailure(msg, error), ctx.getDbCallbackExecutor()); } private List buildQueries(Interval interval, List keys) { @@ -116,24 +126,13 @@ public class TbGetTelemetryNode implements TbNode { interval.getEndTs() - interval.getStartTs(); return keys.stream() - .map(key -> new BaseReadTsKvQuery(key, interval.getStartTs(), interval.getEndTs(), aggIntervalStep, limit, aggregation, getOrderBy())) + .map(key -> new BaseReadTsKvQuery(key, interval.getStartTs(), interval.getEndTs(), aggIntervalStep, limit, aggregation, orderBy.name())) .collect(Collectors.toList()); } - private String getOrderBy() { - switch (fetchMode) { - case TbGetTelemetryNodeConfiguration.FETCH_MODE_ALL: - return orderByFetchAll; - case TbGetTelemetryNodeConfiguration.FETCH_MODE_FIRST: - return ASC_ORDER; - default: - return DESC_ORDER; - } - } - private TbMsgMetaData updateMetadata(List entries, TbMsg msg, List keys) { ObjectNode resultNode = JacksonUtil.newObjectNode(JacksonUtil.ALLOW_UNQUOTED_FIELD_NAMES_MAPPER); - if (TbGetTelemetryNodeConfiguration.FETCH_MODE_ALL.equals(fetchMode)) { + if (FetchMode.ALL.equals(fetchMode)) { entries.forEach(entry -> processArray(resultNode, entry)); } else { entries.forEach(entry -> processSingle(resultNode, entry)); @@ -174,7 +173,7 @@ public class TbGetTelemetryNode implements TbNode { return getIntervalFromPatterns(msg); } else { Interval interval = new Interval(); - long ts = System.currentTimeMillis(); + long ts = getCurrentTimeMillis(); interval.setStartTs(ts - TimeUnit.valueOf(config.getStartIntervalTimeUnit()).toMillis(config.getStartInterval())); interval.setEndTs(ts - TimeUnit.valueOf(config.getEndIntervalTimeUnit()).toMillis(config.getEndInterval())); return interval; @@ -211,12 +210,15 @@ public class TbGetTelemetryNode implements TbNode { return pattern.replaceAll("[$\\[{}\\]]", ""); } - private int validateLimit(int limit) { - if (limit != 0) { - return limit; - } else { - return TbGetTelemetryNodeConfiguration.MAX_FETCH_SIZE; + private int validateLimit(int limit) throws TbNodeException { + if (limit < 2 || limit > TbGetTelemetryNodeConfiguration.MAX_FETCH_SIZE) { + throw new TbNodeException("Limit should be in a range from 2 to 1000.", true); } + return limit; + } + + long getCurrentTimeMillis() { + return System.currentTimeMillis(); } @Data @@ -226,4 +228,47 @@ public class TbGetTelemetryNode implements TbNode { private Long endTs; } + @Override + public TbPair upgrade(int fromVersion, JsonNode oldConfiguration) throws TbNodeException { + boolean hasChanges = false; + switch (fromVersion) { + case 0 -> { + if (oldConfiguration.hasNonNull("fetchMode")) { + String fetchMode = oldConfiguration.get("fetchMode").asText(); + switch (fetchMode) { + case "FIRST": + ((ObjectNode) oldConfiguration).put("orderBy", Direction.ASC.name()); + ((ObjectNode) oldConfiguration).put("aggregation", Aggregation.NONE.name()); + hasChanges = true; + break; + case "LAST": + ((ObjectNode) oldConfiguration).put("orderBy", Direction.DESC.name()); + ((ObjectNode) oldConfiguration).put("aggregation", Aggregation.NONE.name()); + hasChanges = true; + break; + case "ALL": + if (oldConfiguration.has("orderBy") && + (oldConfiguration.get("orderBy").isNull() || oldConfiguration.get("orderBy").asText().isEmpty())) { + ((ObjectNode) oldConfiguration).put("orderBy", Direction.ASC.name()); + hasChanges = true; + } + if (oldConfiguration.has("aggregation") && + (oldConfiguration.get("aggregation").isNull() || oldConfiguration.get("aggregation").asText().isEmpty())) { + ((ObjectNode) oldConfiguration).put("aggregation", Aggregation.NONE.name()); + hasChanges = true; + } + break; + default: + ((ObjectNode) oldConfiguration).put("fetchMode", FetchMode.LAST.name()); + ((ObjectNode) oldConfiguration).put("orderBy", Direction.DESC.name()); + ((ObjectNode) oldConfiguration).put("aggregation", Aggregation.NONE.name()); + hasChanges = true; + break; + } + } + } + } + return new TbPair<>(hasChanges, oldConfiguration); + } + } diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetTelemetryNodeConfiguration.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetTelemetryNodeConfiguration.java index c4128a4481..704abeddf6 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetTelemetryNodeConfiguration.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetTelemetryNodeConfiguration.java @@ -18,6 +18,7 @@ package org.thingsboard.rule.engine.metadata; import lombok.Data; import org.thingsboard.rule.engine.api.NodeConfiguration; import org.thingsboard.server.common.data.kv.Aggregation; +import org.thingsboard.server.common.data.page.SortOrder.Direction; import java.util.Collections; import java.util.List; @@ -29,10 +30,6 @@ import java.util.concurrent.TimeUnit; @Data public class TbGetTelemetryNodeConfiguration implements NodeConfiguration { - public static final String FETCH_MODE_FIRST = "FIRST"; - public static final String FETCH_MODE_LAST = "LAST"; - public static final String FETCH_MODE_ALL = "ALL"; - public static final int MAX_FETCH_SIZE = 1000; private int startInterval; @@ -45,9 +42,9 @@ public class TbGetTelemetryNodeConfiguration implements NodeConfiguration latestTsKeyNames; @@ -56,7 +53,7 @@ public class TbGetTelemetryNodeConfiguration implements NodeConfiguration { - node.parseAggregationConfig(" "); - }); + public void givenEmptyTsKeyNames_whenInit_thenThrowsException() { + // GIVEN + config.setLatestTsKeyNames(Collections.emptyList()); + + // WHEN-THEN + assertThatThrownBy(() -> node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config)))) + .isInstanceOf(TbNodeException.class) + .hasMessage("Telemetry should be specified!") + .extracting(e -> ((TbNodeException) e).isUnrecoverable()) + .isEqualTo(true); } @Test - public void givenAggregationIncorrect_whenParseAggregation_thenException() { - Assertions.assertThrows(IllegalArgumentException.class, () -> { - node.parseAggregationConfig("TOP"); - }); + public void givenFetchModeIsNull_whenInit_thenThrowsException() { + // GIVEN + config.setFetchMode(null); + + // WHEN-THEN + assertThatThrownBy(() -> node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config)))) + .isInstanceOf(TbNodeException.class) + .hasMessage("FetchMode should be specified!") + .extracting(e -> ((TbNodeException) e).isUnrecoverable()) + .isEqualTo(true); } + @Test + public void givenFetchModeAllAndOrderByIsNull_whenInit_thenThrowsException() { + // GIVEN + config.setFetchMode(FetchMode.ALL); + config.setOrderBy(null); + + // WHEN-THEN + assertThatThrownBy(() -> node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config)))) + .isInstanceOf(TbNodeException.class) + .hasMessage("OrderBy should be specified!") + .extracting(e -> ((TbNodeException) e).isUnrecoverable()) + .isEqualTo(true); + } + + @ParameterizedTest + @ValueSource(ints = {-1, 0, 1, 1001, 2000}) + public void givenFetchModeAllAndLimitIsOutOfRange_whenInit_thenThrowsException(int limit) { + // GIVEN + config.setFetchMode(FetchMode.ALL); + config.setLimit(limit); + + // WHEN-THEN + assertThatThrownBy(() -> node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config)))) + .isInstanceOf(TbNodeException.class) + .hasMessage("Limit should be in a range from 2 to 1000.") + .extracting(e -> ((TbNodeException) e).isUnrecoverable()) + .isEqualTo(true); + } + + @Test + public void givenFetchModeIsAllAndAggregationIsNull_whenInit_thenThrowsException() { + // GIVEN + config.setFetchMode(FetchMode.ALL); + config.setAggregation(null); + // WHEN-THEN + assertThatThrownBy(() -> node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config)))) + .isInstanceOf(TbNodeException.class) + .hasMessage("Aggregation should be specified!") + .extracting(e -> ((TbNodeException) e).isUnrecoverable()) + .isEqualTo(true); + } + + @Test + public void givenIntervalStartIsGreaterThanIntervalEnd_whenOnMsg_thenThrowsException() throws TbNodeException { + // GIVEN + config.setStartInterval(1); + config.setEndInterval(2); + + // WHEN-THEN + node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config))); + TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, TbMsgMetaData.EMPTY, TbMsg.EMPTY_JSON_OBJECT); + assertThatThrownBy(() -> node.onMsg(ctxMock, msg)) + .isInstanceOf(RuntimeException.class) + .hasMessage("Interval start should be less than Interval end"); + } + + @Test + public void givenUseMetadataIntervalPatternsIsTrue_whenOnMsg_thenVerifyStartAndEndTsInQuery() throws TbNodeException { + // GIVEN + config.setUseMetadataIntervalPatterns(true); + config.setStartIntervalPattern("${mdStartInterval}"); + config.setEndIntervalPattern("$[msgEndInterval]"); + + node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config))); + + mockTimeseriesService(); + given(timeseriesServiceMock.findAll(any(TenantId.class), any(EntityId.class), anyList())).willReturn(Futures.immediateFuture(Collections.emptyList())); + + // WHEN + long startTs = 1719220350000L; + long endTs = 1719220353000L; + TbMsgMetaData metaData = new TbMsgMetaData(); + metaData.putValue("mdStartInterval", String.valueOf(startTs)); + TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, metaData, "{\"msgEndInterval\":\"" + endTs + "\"}"); + node.onMsg(ctxMock, msg); + + // THEN + ArgumentCaptor> actualReadTsKvQueryList = ArgumentCaptor.forClass(List.class); + then(timeseriesServiceMock).should().findAll(eq(TENANT_ID), eq(DEVICE_ID), actualReadTsKvQueryList.capture()); + ReadTsKvQuery actualReadTsKvQuery = actualReadTsKvQueryList.getValue().get(0); + assertThat(actualReadTsKvQuery.getStartTs()).isEqualTo(startTs); + assertThat(actualReadTsKvQuery.getEndTs()).isEqualTo(endTs); + } + + @Test + public void givenUseMetadataIntervalPatternsIsFalse_whenOnMsg_thenVerifyStartAndEndTsInQuery() throws TbNodeException { + // GIVEN + node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config))); + + long ts = System.currentTimeMillis(); + willReturn(ts).given(node).getCurrentTimeMillis(); + mockTimeseriesService(); + given(timeseriesServiceMock.findAll(any(TenantId.class), any(EntityId.class), anyList())).willReturn(Futures.immediateFuture(Collections.emptyList())); + + // WHEN + TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, TbMsgMetaData.EMPTY, TbMsg.EMPTY_JSON_OBJECT); + node.onMsg(ctxMock, msg); + + // THEN + ArgumentCaptor> actualReadTsKvQueryList = ArgumentCaptor.forClass(List.class); + then(timeseriesServiceMock).should().findAll(eq(TENANT_ID), eq(DEVICE_ID), actualReadTsKvQueryList.capture()); + ReadTsKvQuery actualReadTsKvQuery = actualReadTsKvQueryList.getValue().get(0); + assertThat(actualReadTsKvQuery.getStartTs()).isEqualTo(ts - TimeUnit.MINUTES.toMillis(config.getStartInterval())); + assertThat(actualReadTsKvQuery.getEndTs()).isEqualTo(ts - TimeUnit.MINUTES.toMillis(config.getEndInterval())); + } + + @Test + public void givenTsKeyNamesPatterns_whenOnMsg_thenVerifyTsKeyNamesInQuery() throws TbNodeException { + // GIVEN + config.setLatestTsKeyNames(List.of("temperature", "${mdTsKey}", "$[msgTsKey]")); + + node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config))); + + mockTimeseriesService(); + given(timeseriesServiceMock.findAll(any(TenantId.class), any(EntityId.class), anyList())).willReturn(Futures.immediateFuture(Collections.emptyList())); + + // WHEN + TbMsgMetaData metaData = new TbMsgMetaData(); + metaData.putValue("mdTsKey", "humidity"); + TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, metaData, "{\"msgTsKey\":\"pressure\"}"); + node.onMsg(ctxMock, msg); + + // THEN + ArgumentCaptor> actualReadTsKvQueryList = ArgumentCaptor.forClass(List.class); + then(timeseriesServiceMock).should().findAll(eq(TENANT_ID), eq(DEVICE_ID), actualReadTsKvQueryList.capture()); + List actualKeys = actualReadTsKvQueryList.getValue().stream().map(TsKvQuery::getKey).toList(); + assertThat(actualKeys).containsExactlyInAnyOrder("temperature", "humidity", "pressure"); + } + + @ParameterizedTest + @MethodSource + public void givenAggregation_whenOnMsg_thenVerifyAggregationStepInQuery(Aggregation aggregation, Consumer aggregationStepVerifier) throws TbNodeException { + // GIVEN + config.setStartInterval(5); + config.setEndInterval(1); + config.setFetchMode(FetchMode.ALL); + config.setAggregation(aggregation); + + node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config))); + + mockTimeseriesService(); + given(timeseriesServiceMock.findAll(any(TenantId.class), any(EntityId.class), anyList())).willReturn(Futures.immediateFuture(Collections.emptyList())); + + // WHEN + TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, TbMsgMetaData.EMPTY, TbMsg.EMPTY_JSON_OBJECT); + node.onMsg(ctxMock, msg); + + // THEN + ArgumentCaptor> actualReadTsKvQueryList = ArgumentCaptor.forClass(List.class); + then(timeseriesServiceMock).should().findAll(eq(TENANT_ID), eq(DEVICE_ID), actualReadTsKvQueryList.capture()); + ReadTsKvQuery actualReadTsKvQuery = actualReadTsKvQueryList.getValue().get(0); + aggregationStepVerifier.accept(actualReadTsKvQuery); + } + + private static Stream givenAggregation_whenOnMsg_thenVerifyAggregationStepInQuery() { + return Stream.of( + Arguments.of(Aggregation.NONE, (Consumer) query -> assertThat(query.getInterval()).isEqualTo(1)), + Arguments.of(Aggregation.AVG, (Consumer) query -> assertThat(query.getInterval()).isEqualTo(query.getEndTs() - query.getStartTs())) + ); + } + + @ParameterizedTest + @MethodSource + public void givenFetchModeAndLimit_whenOnMsg_thenVerifyLimitInQuery(FetchMode fetchMode, int limit, Consumer limitInQueryVerifier) throws TbNodeException { + // GIVEN + config.setFetchMode(fetchMode); + config.setLimit(limit); + + node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config))); + + mockTimeseriesService(); + given(timeseriesServiceMock.findAll(any(TenantId.class), any(EntityId.class), anyList())).willReturn(Futures.immediateFuture(Collections.emptyList())); + + // WHEN + TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, TbMsgMetaData.EMPTY, TbMsg.EMPTY_JSON_OBJECT); + node.onMsg(ctxMock, msg); + + // THEN + ArgumentCaptor> actualReadTsKvQueryList = ArgumentCaptor.forClass(List.class); + then(timeseriesServiceMock).should().findAll(eq(TENANT_ID), eq(DEVICE_ID), actualReadTsKvQueryList.capture()); + ReadTsKvQuery actualReadTsKvQuery = actualReadTsKvQueryList.getValue().get(0); + limitInQueryVerifier.accept(actualReadTsKvQuery); + } + + private static Stream givenFetchModeAndLimit_whenOnMsg_thenVerifyLimitInQuery() { + return Stream.of( + Arguments.of( + FetchMode.ALL, + 5, + (Consumer) query -> assertThat(query.getLimit()).isEqualTo(5)), + Arguments.of( + FetchMode.FIRST, + TbGetTelemetryNodeConfiguration.MAX_FETCH_SIZE, + (Consumer) query -> assertThat(query.getLimit()).isEqualTo(1)), + Arguments.of( + FetchMode.LAST, + 10, + (Consumer) query -> assertThat(query.getLimit()).isEqualTo(1)) + ); + } + + @ParameterizedTest + @MethodSource + public void givenFetchModeAndOrder_whenOnMsg_thenVerifyOrderInQuery(FetchMode fetchMode, Direction orderBy, Consumer orderInQueryVerifier) throws TbNodeException { + // GIVEN + config.setFetchMode(fetchMode); + config.setOrderBy(orderBy); + + node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config))); + + mockTimeseriesService(); + given(timeseriesServiceMock.findAll(any(TenantId.class), any(EntityId.class), anyList())).willReturn(Futures.immediateFuture(Collections.emptyList())); + + // WHEN + TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, TbMsgMetaData.EMPTY, TbMsg.EMPTY_JSON_OBJECT); + node.onMsg(ctxMock, msg); + + // THEN + ArgumentCaptor> actualReadTsKvQueryList = ArgumentCaptor.forClass(List.class); + then(timeseriesServiceMock).should().findAll(eq(TENANT_ID), eq(DEVICE_ID), actualReadTsKvQueryList.capture()); + ReadTsKvQuery actualReadTsKvQuery = actualReadTsKvQueryList.getValue().get(0); + orderInQueryVerifier.accept(actualReadTsKvQuery); + } + + private static Stream givenFetchModeAndOrder_whenOnMsg_thenVerifyOrderInQuery() { + return Stream.of( + Arguments.of( + FetchMode.ALL, + Direction.DESC, + (Consumer) query -> assertThat(query.getOrder()).isEqualTo("DESC")), + Arguments.of( + FetchMode.FIRST, + Direction.ASC, + (Consumer) query -> assertThat(query.getOrder()).isEqualTo("ASC")), + Arguments.of( + FetchMode.LAST, + Direction.ASC, + (Consumer) query -> assertThat(query.getOrder()).isEqualTo("DESC")) + ); + } + + @ParameterizedTest + @MethodSource + public void givenInvalidIntervalPatterns_whenOnMsg_thenThrowsException(String startIntervalPattern, String errorMsg) throws TbNodeException { + // GIVEN + config.setUseMetadataIntervalPatterns(true); + config.setStartIntervalPattern(startIntervalPattern); + node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config))); + + // WHEN-THEN + TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, TbMsgMetaData.EMPTY, "{\"msgStartInterval\":\"start\"}"); + assertThatThrownBy(() -> node.onMsg(ctxMock, msg)).isInstanceOf(IllegalArgumentException.class).hasMessage(errorMsg); + } + + private static Stream givenInvalidIntervalPatterns_whenOnMsg_thenThrowsException() { + return Stream.of( + Arguments.of("${mdStartInterval}", "Message value: 'mdStartInterval' is undefined"), + Arguments.of("$[msgStartInterval]", "Message value: 'msgStartInterval' has invalid format") + ); + } + + @Test + public void givenFetchModeAll_whenOnMsg_thenTellSuccessAndVerifyMsg() throws TbNodeException { + // GIVEN + config.setLatestTsKeyNames(List.of("temperature", "humidity")); + config.setFetchMode(FetchMode.ALL); + + node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config))); + + mockTimeseriesService(); + long ts = System.currentTimeMillis(); + List tsKvEntries = List.of( + new BasicTsKvEntry(ts - 5, new DoubleDataEntry("temperature", 23.1)), + new BasicTsKvEntry(ts - 4, new DoubleDataEntry("temperature", 22.4)), + new BasicTsKvEntry(ts - 4, new DoubleDataEntry("humidity", 55.5)) + ); + given(timeseriesServiceMock.findAll(any(TenantId.class), any(EntityId.class), anyList())).willReturn(Futures.immediateFuture(tsKvEntries)); + + // WHEN + TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, TbMsgMetaData.EMPTY, TbMsg.EMPTY_JSON_OBJECT); + node.onMsg(ctxMock, msg); + + // THEN + ArgumentCaptor actualMsg = ArgumentCaptor.forClass(TbMsg.class); + then(ctxMock).should().tellSuccess(actualMsg.capture()); + TbMsgMetaData metaData = new TbMsgMetaData(); + metaData.putValue("temperature", "[{\"ts\":" + (ts - 5) + ",\"value\":23.1},{\"ts\":" + (ts - 4) + ",\"value\":22.4}]"); + metaData.putValue("humidity", "[{\"ts\":" + (ts - 4) + ",\"value\":55.5}]"); + TbMsg expectedMsg = TbMsg.transformMsgMetadata(msg, metaData); + assertThat(actualMsg.getValue()).usingRecursiveComparison().ignoringFields("ctx").isEqualTo(expectedMsg); + } + + @ParameterizedTest + @ValueSource(strings = {"FIRST", "LAST"}) + public void givenFetchMode_whenOnMsg_thenTellSuccessAndVerifyMsg(String fetchMode) throws TbNodeException { + // GIVEN + config.setFetchMode(FetchMode.valueOf(fetchMode)); + config.setLatestTsKeyNames(List.of("temperature", "humidity")); + + node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config))); + + mockTimeseriesService(); + long ts = System.currentTimeMillis(); + List tsKvEntries = List.of( + new BasicTsKvEntry(ts - 4, new DoubleDataEntry("temperature", 22.4)), + new BasicTsKvEntry(ts - 4, new DoubleDataEntry("humidity", 55.5)) + ); + given(timeseriesServiceMock.findAll(any(TenantId.class), any(EntityId.class), anyList())).willReturn(Futures.immediateFuture(tsKvEntries)); + + // WHEN + TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, TbMsgMetaData.EMPTY, TbMsg.EMPTY_JSON_OBJECT); + node.onMsg(ctxMock, msg); + + // THEN + ArgumentCaptor actualMsg = ArgumentCaptor.forClass(TbMsg.class); + then(ctxMock).should().tellSuccess(actualMsg.capture()); + TbMsgMetaData metaData = new TbMsgMetaData(); + metaData.putValue("temperature", "\"22.4\""); + metaData.putValue("humidity", "\"55.5\""); + TbMsg expectedMsg = TbMsg.transformMsgMetadata(msg, metaData); + assertThat(actualMsg.getValue()).usingRecursiveComparison().ignoringFields("ctx").isEqualTo(expectedMsg); + } + + private void mockTimeseriesService() { + given(ctxMock.getTimeseriesService()).willReturn(timeseriesServiceMock); + given(ctxMock.getTenantId()).willReturn(TENANT_ID); + given(ctxMock.getDbCallbackExecutor()).willReturn(executor); + } + + private static Stream givenFromVersionAndConfig_whenUpgrade_thenVerifyHasChangesAndConfig() { + return Stream.of( + // config for version 0 (fetchMode is 'FIRST' and orderBy is 'INVALID_ORDER_BY' and aggregation is 'SUM') + Arguments.of(0, + """ + { + "latestTsKeyNames": [], + "aggregation": "SUM", + "fetchMode": "FIRST", + "orderBy": "INVALID_ORDER_BY", + "limit": 1000, + "useMetadataIntervalPatterns": false, + "startIntervalPattern": "", + "endIntervalPattern": "", + "startInterval": 2, + "startIntervalTimeUnit": "MINUTES", + "endInterval": 1, + "endIntervalTimeUnit": "MINUTES" + } + """, + true, + """ + { + "latestTsKeyNames": [], + "aggregation": "NONE", + "fetchMode": "FIRST", + "orderBy": "ASC", + "limit": 1000, + "useMetadataIntervalPatterns": false, + "startIntervalPattern": "", + "endIntervalPattern": "", + "startInterval": 2, + "startIntervalTimeUnit": "MINUTES", + "endInterval": 1, + "endIntervalTimeUnit": "MINUTES" + } + """), + // config for version 0 (fetchMode is 'LAST' and orderBy is 'ASC' and aggregation is 'AVG') + Arguments.of(0, + """ + { + "latestTsKeyNames": [], + "aggregation": "AVG", + "fetchMode": "LAST", + "orderBy": "ASC", + "limit": 1000, + "useMetadataIntervalPatterns": false, + "startIntervalPattern": "", + "endIntervalPattern": "", + "startInterval": 2, + "startIntervalTimeUnit": "MINUTES", + "endInterval": 1, + "endIntervalTimeUnit": "MINUTES" + } + """, + true, + """ + { + "latestTsKeyNames": [], + "aggregation": "NONE", + "fetchMode": "LAST", + "orderBy": "DESC", + "limit": 1000, + "useMetadataIntervalPatterns": false, + "startIntervalPattern": "", + "endIntervalPattern": "", + "startInterval": 2, + "startIntervalTimeUnit": "MINUTES", + "endInterval": 1, + "endIntervalTimeUnit": "MINUTES" + } + """), + // config for version 0 (fetchMode is 'ALL' and orderBy is empty and aggregation is null) + Arguments.of(0, + """ + { + "latestTsKeyNames": [], + "aggregation": null, + "fetchMode": "ALL", + "orderBy": "", + "limit": 1000, + "useMetadataIntervalPatterns": false, + "startIntervalPattern": "", + "endIntervalPattern": "", + "startInterval": 2, + "startIntervalTimeUnit": "MINUTES", + "endInterval": 1, + "endIntervalTimeUnit": "MINUTES" + } + """, + true, + """ + { + "latestTsKeyNames": [], + "aggregation": "NONE", + "fetchMode": "ALL", + "orderBy": "ASC", + "limit": 1000, + "useMetadataIntervalPatterns": false, + "startIntervalPattern": "", + "endIntervalPattern": "", + "startInterval": 2, + "startIntervalTimeUnit": "MINUTES", + "endInterval": 1, + "endIntervalTimeUnit": "MINUTES" + } + """), + // config for version 0 (fetchMode is 'ALL' and orderBy is 'DESC' and aggregation is 'SUM') + Arguments.of(0, + """ + { + "latestTsKeyNames": [], + "aggregation": "SUM", + "fetchMode": "ALL", + "orderBy": "DESC", + "limit": 1000, + "useMetadataIntervalPatterns": false, + "startIntervalPattern": "", + "endIntervalPattern": "", + "startInterval": 2, + "startIntervalTimeUnit": "MINUTES", + "endInterval": 1, + "endIntervalTimeUnit": "MINUTES" + } + """, + false, + """ + { + "latestTsKeyNames": [], + "aggregation": "SUM", + "fetchMode": "ALL", + "orderBy": "DESC", + "limit": 1000, + "useMetadataIntervalPatterns": false, + "startIntervalPattern": "", + "endIntervalPattern": "", + "startInterval": 2, + "startIntervalTimeUnit": "MINUTES", + "endInterval": 1, + "endIntervalTimeUnit": "MINUTES" + } + """), + // config for version 0 (fetchMode is 'INVALID_MODE' and orderBy is 'INVALID_ORDER_BY' and aggregation is 'INVALID_AGGREGATION') + Arguments.of(0, + """ + { + "latestTsKeyNames": [], + "aggregation": "INVALID_AGGREGATION", + "fetchMode": "INVALID_MODE", + "orderBy": "INVALID_ORDER_BY", + "limit": 1000, + "useMetadataIntervalPatterns": false, + "startIntervalPattern": "", + "endIntervalPattern": "", + "startInterval": 2, + "startIntervalTimeUnit": "MINUTES", + "endInterval": 1, + "endIntervalTimeUnit": "MINUTES" + } + """, + true, + """ + { + "latestTsKeyNames": [], + "aggregation": "NONE", + "fetchMode": "LAST", + "orderBy": "DESC", + "limit": 1000, + "useMetadataIntervalPatterns": false, + "startIntervalPattern": "", + "endIntervalPattern": "", + "startInterval": 2, + "startIntervalTimeUnit": "MINUTES", + "endInterval": 1, + "endIntervalTimeUnit": "MINUTES" + } + """) + ); + } + + @Override + protected TbNode getTestNode() { + return node; + } }