Merge pull request #11087 from irynamatveieva/improvements/originator-telemetry-node
Originator telemetry node: added tests
This commit is contained in:
		
						commit
						d37208ba4f
					
				@ -0,0 +1,22 @@
 | 
			
		||||
/**
 | 
			
		||||
 * Copyright © 2016-2024 The Thingsboard Authors
 | 
			
		||||
 *
 | 
			
		||||
 * Licensed under the Apache License, Version 2.0 (the "License");
 | 
			
		||||
 * you may not use this file except in compliance with the License.
 | 
			
		||||
 * You may obtain a copy of the License at
 | 
			
		||||
 *
 | 
			
		||||
 *     http://www.apache.org/licenses/LICENSE-2.0
 | 
			
		||||
 *
 | 
			
		||||
 * Unless required by applicable law or agreed to in writing, software
 | 
			
		||||
 * distributed under the License is distributed on an "AS IS" BASIS,
 | 
			
		||||
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
			
		||||
 * See the License for the specific language governing permissions and
 | 
			
		||||
 * limitations under the License.
 | 
			
		||||
 */
 | 
			
		||||
package org.thingsboard.rule.engine.metadata;
 | 
			
		||||
 | 
			
		||||
public enum FetchMode {
 | 
			
		||||
 | 
			
		||||
    FIRST, ALL, LAST
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
@ -15,6 +15,7 @@
 | 
			
		||||
 */
 | 
			
		||||
package org.thingsboard.rule.engine.metadata;
 | 
			
		||||
 | 
			
		||||
import com.fasterxml.jackson.databind.JsonNode;
 | 
			
		||||
import com.fasterxml.jackson.databind.node.ArrayNode;
 | 
			
		||||
import com.fasterxml.jackson.databind.node.ObjectNode;
 | 
			
		||||
import com.google.common.util.concurrent.ListenableFuture;
 | 
			
		||||
@ -30,17 +31,17 @@ import org.thingsboard.rule.engine.api.TbNode;
 | 
			
		||||
import org.thingsboard.rule.engine.api.TbNodeConfiguration;
 | 
			
		||||
import org.thingsboard.rule.engine.api.TbNodeException;
 | 
			
		||||
import org.thingsboard.rule.engine.api.util.TbNodeUtils;
 | 
			
		||||
import org.thingsboard.server.common.data.StringUtils;
 | 
			
		||||
import org.thingsboard.server.common.data.kv.Aggregation;
 | 
			
		||||
import org.thingsboard.server.common.data.kv.BaseReadTsKvQuery;
 | 
			
		||||
import org.thingsboard.server.common.data.kv.ReadTsKvQuery;
 | 
			
		||||
import org.thingsboard.server.common.data.kv.TsKvEntry;
 | 
			
		||||
import org.thingsboard.server.common.data.page.SortOrder.Direction;
 | 
			
		||||
import org.thingsboard.server.common.data.plugin.ComponentType;
 | 
			
		||||
import org.thingsboard.server.common.data.util.TbPair;
 | 
			
		||||
import org.thingsboard.server.common.msg.TbMsg;
 | 
			
		||||
import org.thingsboard.server.common.msg.TbMsgMetaData;
 | 
			
		||||
 | 
			
		||||
import java.util.List;
 | 
			
		||||
import java.util.concurrent.ExecutionException;
 | 
			
		||||
import java.util.concurrent.TimeUnit;
 | 
			
		||||
import java.util.stream.Collectors;
 | 
			
		||||
 | 
			
		||||
@ -51,6 +52,7 @@ import java.util.stream.Collectors;
 | 
			
		||||
@RuleNode(type = ComponentType.ENRICHMENT,
 | 
			
		||||
        name = "originator telemetry",
 | 
			
		||||
        configClazz = TbGetTelemetryNodeConfiguration.class,
 | 
			
		||||
        version = 1,
 | 
			
		||||
        nodeDescription = "Adds message originator telemetry for selected time range into message metadata",
 | 
			
		||||
        nodeDetails = "Useful when you need to get telemetry data set from the message originator for a specific time range " +
 | 
			
		||||
                "instead of fetching just the latest telemetry or if you need to get the closest telemetry to the fetch interval start or end. " +
 | 
			
		||||
@ -60,53 +62,61 @@ import java.util.stream.Collectors;
 | 
			
		||||
        configDirective = "tbEnrichmentNodeGetTelemetryFromDatabase")
 | 
			
		||||
public class TbGetTelemetryNode implements TbNode {
 | 
			
		||||
 | 
			
		||||
    private static final String DESC_ORDER = "DESC";
 | 
			
		||||
    private static final String ASC_ORDER = "ASC";
 | 
			
		||||
 | 
			
		||||
    private TbGetTelemetryNodeConfiguration config;
 | 
			
		||||
    private List<String> tsKeyNames;
 | 
			
		||||
    private int limit;
 | 
			
		||||
    private String fetchMode;
 | 
			
		||||
    private String orderByFetchAll;
 | 
			
		||||
    private FetchMode fetchMode;
 | 
			
		||||
    private Direction orderBy;
 | 
			
		||||
    private Aggregation aggregation;
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException {
 | 
			
		||||
        this.config = TbNodeUtils.convert(configuration, TbGetTelemetryNodeConfiguration.class);
 | 
			
		||||
        tsKeyNames = config.getLatestTsKeyNames();
 | 
			
		||||
        limit = config.getFetchMode().equals(TbGetTelemetryNodeConfiguration.FETCH_MODE_ALL) ? validateLimit(config.getLimit()) : 1;
 | 
			
		||||
        if (tsKeyNames.isEmpty()) {
 | 
			
		||||
            throw new TbNodeException("Telemetry should be specified!", true);
 | 
			
		||||
        }
 | 
			
		||||
        fetchMode = config.getFetchMode();
 | 
			
		||||
        orderByFetchAll = config.getOrderBy();
 | 
			
		||||
        if (StringUtils.isEmpty(orderByFetchAll)) {
 | 
			
		||||
            orderByFetchAll = ASC_ORDER;
 | 
			
		||||
        if (fetchMode == null) {
 | 
			
		||||
            throw new TbNodeException("FetchMode should be specified!", true);
 | 
			
		||||
        }
 | 
			
		||||
        aggregation = parseAggregationConfig(config.getAggregation());
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    Aggregation parseAggregationConfig(String aggName) {
 | 
			
		||||
        if (StringUtils.isEmpty(aggName) || !fetchMode.equals(TbGetTelemetryNodeConfiguration.FETCH_MODE_ALL)) {
 | 
			
		||||
            return Aggregation.NONE;
 | 
			
		||||
        switch (fetchMode) {
 | 
			
		||||
            case ALL:
 | 
			
		||||
                limit = validateLimit(config.getLimit());
 | 
			
		||||
                if (config.getOrderBy() == null) {
 | 
			
		||||
                    throw new TbNodeException("OrderBy should be specified!", true);
 | 
			
		||||
                }
 | 
			
		||||
                orderBy = config.getOrderBy();
 | 
			
		||||
                if (config.getAggregation() == null) {
 | 
			
		||||
                    throw new TbNodeException("Aggregation should be specified!", true);
 | 
			
		||||
                }
 | 
			
		||||
                aggregation = config.getAggregation();
 | 
			
		||||
                break;
 | 
			
		||||
            case FIRST:
 | 
			
		||||
                limit = 1;
 | 
			
		||||
                orderBy = Direction.ASC;
 | 
			
		||||
                aggregation = Aggregation.NONE;
 | 
			
		||||
                break;
 | 
			
		||||
            case LAST:
 | 
			
		||||
                limit = 1;
 | 
			
		||||
                orderBy = Direction.DESC;
 | 
			
		||||
                aggregation = Aggregation.NONE;
 | 
			
		||||
                break;
 | 
			
		||||
        }
 | 
			
		||||
        return Aggregation.valueOf(aggName);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public void onMsg(TbContext ctx, TbMsg msg) throws ExecutionException, InterruptedException, TbNodeException {
 | 
			
		||||
        if (tsKeyNames.isEmpty()) {
 | 
			
		||||
            ctx.tellFailure(msg, new IllegalStateException("Telemetry is not selected!"));
 | 
			
		||||
        } else {
 | 
			
		||||
            try {
 | 
			
		||||
                Interval interval = getInterval(msg);
 | 
			
		||||
                List<String> keys = TbNodeUtils.processPatterns(tsKeyNames, msg);
 | 
			
		||||
                ListenableFuture<List<TsKvEntry>> list = ctx.getTimeseriesService().findAll(ctx.getTenantId(), msg.getOriginator(), buildQueries(interval, keys));
 | 
			
		||||
                DonAsynchron.withCallback(list, data -> {
 | 
			
		||||
                    var metaData = updateMetadata(data, msg, keys);
 | 
			
		||||
                    ctx.tellSuccess(TbMsg.transformMsgMetadata(msg, metaData));
 | 
			
		||||
                }, error -> ctx.tellFailure(msg, error), ctx.getDbCallbackExecutor());
 | 
			
		||||
            } catch (Exception e) {
 | 
			
		||||
                ctx.tellFailure(msg, e);
 | 
			
		||||
            }
 | 
			
		||||
    public void onMsg(TbContext ctx, TbMsg msg) {
 | 
			
		||||
        Interval interval = getInterval(msg);
 | 
			
		||||
        if (interval.getStartTs() > interval.getEndTs()) {
 | 
			
		||||
            throw new RuntimeException("Interval start should be less than Interval end");
 | 
			
		||||
        }
 | 
			
		||||
        List<String> keys = TbNodeUtils.processPatterns(tsKeyNames, msg);
 | 
			
		||||
        ListenableFuture<List<TsKvEntry>> list = ctx.getTimeseriesService().findAll(ctx.getTenantId(), msg.getOriginator(), buildQueries(interval, keys));
 | 
			
		||||
        DonAsynchron.withCallback(list, data -> {
 | 
			
		||||
            var metaData = updateMetadata(data, msg, keys);
 | 
			
		||||
            ctx.tellSuccess(TbMsg.transformMsgMetadata(msg, metaData));
 | 
			
		||||
        }, error -> ctx.tellFailure(msg, error), ctx.getDbCallbackExecutor());
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private List<ReadTsKvQuery> buildQueries(Interval interval, List<String> keys) {
 | 
			
		||||
@ -116,24 +126,13 @@ public class TbGetTelemetryNode implements TbNode {
 | 
			
		||||
                interval.getEndTs() - interval.getStartTs();
 | 
			
		||||
 | 
			
		||||
        return keys.stream()
 | 
			
		||||
                .map(key -> new BaseReadTsKvQuery(key, interval.getStartTs(), interval.getEndTs(), aggIntervalStep, limit, aggregation, getOrderBy()))
 | 
			
		||||
                .map(key -> new BaseReadTsKvQuery(key, interval.getStartTs(), interval.getEndTs(), aggIntervalStep, limit, aggregation, orderBy.name()))
 | 
			
		||||
                .collect(Collectors.toList());
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private String getOrderBy() {
 | 
			
		||||
        switch (fetchMode) {
 | 
			
		||||
            case TbGetTelemetryNodeConfiguration.FETCH_MODE_ALL:
 | 
			
		||||
                return orderByFetchAll;
 | 
			
		||||
            case TbGetTelemetryNodeConfiguration.FETCH_MODE_FIRST:
 | 
			
		||||
                return ASC_ORDER;
 | 
			
		||||
            default:
 | 
			
		||||
                return DESC_ORDER;
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private TbMsgMetaData updateMetadata(List<TsKvEntry> entries, TbMsg msg, List<String> keys) {
 | 
			
		||||
        ObjectNode resultNode = JacksonUtil.newObjectNode(JacksonUtil.ALLOW_UNQUOTED_FIELD_NAMES_MAPPER);
 | 
			
		||||
        if (TbGetTelemetryNodeConfiguration.FETCH_MODE_ALL.equals(fetchMode)) {
 | 
			
		||||
        if (FetchMode.ALL.equals(fetchMode)) {
 | 
			
		||||
            entries.forEach(entry -> processArray(resultNode, entry));
 | 
			
		||||
        } else {
 | 
			
		||||
            entries.forEach(entry -> processSingle(resultNode, entry));
 | 
			
		||||
@ -174,7 +173,7 @@ public class TbGetTelemetryNode implements TbNode {
 | 
			
		||||
            return getIntervalFromPatterns(msg);
 | 
			
		||||
        } else {
 | 
			
		||||
            Interval interval = new Interval();
 | 
			
		||||
            long ts = System.currentTimeMillis();
 | 
			
		||||
            long ts = getCurrentTimeMillis();
 | 
			
		||||
            interval.setStartTs(ts - TimeUnit.valueOf(config.getStartIntervalTimeUnit()).toMillis(config.getStartInterval()));
 | 
			
		||||
            interval.setEndTs(ts - TimeUnit.valueOf(config.getEndIntervalTimeUnit()).toMillis(config.getEndInterval()));
 | 
			
		||||
            return interval;
 | 
			
		||||
@ -211,12 +210,15 @@ public class TbGetTelemetryNode implements TbNode {
 | 
			
		||||
        return pattern.replaceAll("[$\\[{}\\]]", "");
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private int validateLimit(int limit) {
 | 
			
		||||
        if (limit != 0) {
 | 
			
		||||
            return limit;
 | 
			
		||||
        } else {
 | 
			
		||||
            return TbGetTelemetryNodeConfiguration.MAX_FETCH_SIZE;
 | 
			
		||||
    private int validateLimit(int limit) throws TbNodeException {
 | 
			
		||||
        if (limit < 2 || limit > TbGetTelemetryNodeConfiguration.MAX_FETCH_SIZE) {
 | 
			
		||||
            throw new TbNodeException("Limit should be in a range from 2 to 1000.", true);
 | 
			
		||||
        }
 | 
			
		||||
        return limit;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    long getCurrentTimeMillis() {
 | 
			
		||||
        return System.currentTimeMillis();
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Data
 | 
			
		||||
@ -226,4 +228,47 @@ public class TbGetTelemetryNode implements TbNode {
 | 
			
		||||
        private Long endTs;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public TbPair<Boolean, JsonNode> upgrade(int fromVersion, JsonNode oldConfiguration) throws TbNodeException {
 | 
			
		||||
        boolean hasChanges = false;
 | 
			
		||||
        switch (fromVersion) {
 | 
			
		||||
            case 0 -> {
 | 
			
		||||
                if (oldConfiguration.hasNonNull("fetchMode")) {
 | 
			
		||||
                    String fetchMode = oldConfiguration.get("fetchMode").asText();
 | 
			
		||||
                    switch (fetchMode) {
 | 
			
		||||
                        case "FIRST":
 | 
			
		||||
                            ((ObjectNode) oldConfiguration).put("orderBy", Direction.ASC.name());
 | 
			
		||||
                            ((ObjectNode) oldConfiguration).put("aggregation", Aggregation.NONE.name());
 | 
			
		||||
                            hasChanges = true;
 | 
			
		||||
                            break;
 | 
			
		||||
                        case "LAST":
 | 
			
		||||
                            ((ObjectNode) oldConfiguration).put("orderBy", Direction.DESC.name());
 | 
			
		||||
                            ((ObjectNode) oldConfiguration).put("aggregation", Aggregation.NONE.name());
 | 
			
		||||
                            hasChanges = true;
 | 
			
		||||
                            break;
 | 
			
		||||
                        case "ALL":
 | 
			
		||||
                            if (oldConfiguration.has("orderBy") &&
 | 
			
		||||
                                    (oldConfiguration.get("orderBy").isNull() || oldConfiguration.get("orderBy").asText().isEmpty())) {
 | 
			
		||||
                                ((ObjectNode) oldConfiguration).put("orderBy", Direction.ASC.name());
 | 
			
		||||
                                hasChanges = true;
 | 
			
		||||
                            }
 | 
			
		||||
                            if (oldConfiguration.has("aggregation") &&
 | 
			
		||||
                                    (oldConfiguration.get("aggregation").isNull() || oldConfiguration.get("aggregation").asText().isEmpty())) {
 | 
			
		||||
                                ((ObjectNode) oldConfiguration).put("aggregation", Aggregation.NONE.name());
 | 
			
		||||
                                hasChanges = true;
 | 
			
		||||
                            }
 | 
			
		||||
                            break;
 | 
			
		||||
                        default:
 | 
			
		||||
                            ((ObjectNode) oldConfiguration).put("fetchMode", FetchMode.LAST.name());
 | 
			
		||||
                            ((ObjectNode) oldConfiguration).put("orderBy", Direction.DESC.name());
 | 
			
		||||
                            ((ObjectNode) oldConfiguration).put("aggregation", Aggregation.NONE.name());
 | 
			
		||||
                            hasChanges = true;
 | 
			
		||||
                            break;
 | 
			
		||||
                    }
 | 
			
		||||
                }
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
        return new TbPair<>(hasChanges, oldConfiguration);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -18,6 +18,7 @@ package org.thingsboard.rule.engine.metadata;
 | 
			
		||||
import lombok.Data;
 | 
			
		||||
import org.thingsboard.rule.engine.api.NodeConfiguration;
 | 
			
		||||
import org.thingsboard.server.common.data.kv.Aggregation;
 | 
			
		||||
import org.thingsboard.server.common.data.page.SortOrder.Direction;
 | 
			
		||||
 | 
			
		||||
import java.util.Collections;
 | 
			
		||||
import java.util.List;
 | 
			
		||||
@ -29,10 +30,6 @@ import java.util.concurrent.TimeUnit;
 | 
			
		||||
@Data
 | 
			
		||||
public class TbGetTelemetryNodeConfiguration implements NodeConfiguration<TbGetTelemetryNodeConfiguration> {
 | 
			
		||||
 | 
			
		||||
    public static final String FETCH_MODE_FIRST = "FIRST";
 | 
			
		||||
    public static final String FETCH_MODE_LAST = "LAST";
 | 
			
		||||
    public static final String FETCH_MODE_ALL = "ALL";
 | 
			
		||||
 | 
			
		||||
    public static final int MAX_FETCH_SIZE = 1000;
 | 
			
		||||
 | 
			
		||||
    private int startInterval;
 | 
			
		||||
@ -45,9 +42,9 @@ public class TbGetTelemetryNodeConfiguration implements NodeConfiguration<TbGetT
 | 
			
		||||
 | 
			
		||||
    private String startIntervalTimeUnit;
 | 
			
		||||
    private String endIntervalTimeUnit;
 | 
			
		||||
    private String fetchMode; //FIRST, LAST, ALL
 | 
			
		||||
    private String orderBy; //ASC, DESC
 | 
			
		||||
    private String aggregation; //MIN, MAX, AVG, SUM, COUNT, NONE;
 | 
			
		||||
    private FetchMode fetchMode; //FIRST, LAST, ALL
 | 
			
		||||
    private Direction orderBy; //ASC, DESC
 | 
			
		||||
    private Aggregation aggregation; //MIN, MAX, AVG, SUM, COUNT, NONE;
 | 
			
		||||
    private int limit;
 | 
			
		||||
 | 
			
		||||
    private List<String> latestTsKeyNames;
 | 
			
		||||
@ -56,7 +53,7 @@ public class TbGetTelemetryNodeConfiguration implements NodeConfiguration<TbGetT
 | 
			
		||||
    public TbGetTelemetryNodeConfiguration defaultConfiguration() {
 | 
			
		||||
        TbGetTelemetryNodeConfiguration configuration = new TbGetTelemetryNodeConfiguration();
 | 
			
		||||
        configuration.setLatestTsKeyNames(Collections.emptyList());
 | 
			
		||||
        configuration.setFetchMode("FIRST");
 | 
			
		||||
        configuration.setFetchMode(FetchMode.FIRST);
 | 
			
		||||
        configuration.setStartIntervalTimeUnit(TimeUnit.MINUTES.name());
 | 
			
		||||
        configuration.setStartInterval(2);
 | 
			
		||||
        configuration.setEndIntervalTimeUnit(TimeUnit.MINUTES.name());
 | 
			
		||||
@ -64,8 +61,8 @@ public class TbGetTelemetryNodeConfiguration implements NodeConfiguration<TbGetT
 | 
			
		||||
        configuration.setUseMetadataIntervalPatterns(false);
 | 
			
		||||
        configuration.setStartIntervalPattern("");
 | 
			
		||||
        configuration.setEndIntervalPattern("");
 | 
			
		||||
        configuration.setOrderBy("ASC");
 | 
			
		||||
        configuration.setAggregation(Aggregation.NONE.name());
 | 
			
		||||
        configuration.setOrderBy(Direction.ASC);
 | 
			
		||||
        configuration.setAggregation(Aggregation.NONE);
 | 
			
		||||
        configuration.setLimit(MAX_FETCH_SIZE);
 | 
			
		||||
        return configuration;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
@ -15,72 +15,634 @@
 | 
			
		||||
 */
 | 
			
		||||
package org.thingsboard.rule.engine.metadata;
 | 
			
		||||
 | 
			
		||||
import org.junit.jupiter.api.Assertions;
 | 
			
		||||
import com.google.common.util.concurrent.Futures;
 | 
			
		||||
import org.junit.jupiter.api.BeforeEach;
 | 
			
		||||
import org.junit.jupiter.api.Test;
 | 
			
		||||
import org.junit.jupiter.api.extension.ExtendWith;
 | 
			
		||||
import org.junit.jupiter.params.ParameterizedTest;
 | 
			
		||||
import org.junit.jupiter.params.provider.Arguments;
 | 
			
		||||
import org.junit.jupiter.params.provider.MethodSource;
 | 
			
		||||
import org.junit.jupiter.params.provider.ValueSource;
 | 
			
		||||
import org.mockito.ArgumentCaptor;
 | 
			
		||||
import org.mockito.Mock;
 | 
			
		||||
import org.mockito.junit.jupiter.MockitoExtension;
 | 
			
		||||
import org.thingsboard.common.util.JacksonUtil;
 | 
			
		||||
import org.thingsboard.common.util.ListeningExecutor;
 | 
			
		||||
import org.thingsboard.rule.engine.AbstractRuleNodeUpgradeTest;
 | 
			
		||||
import org.thingsboard.rule.engine.TestDbCallbackExecutor;
 | 
			
		||||
import org.thingsboard.rule.engine.api.TbContext;
 | 
			
		||||
import org.thingsboard.rule.engine.api.TbNode;
 | 
			
		||||
import org.thingsboard.rule.engine.api.TbNodeConfiguration;
 | 
			
		||||
import org.thingsboard.rule.engine.api.TbNodeException;
 | 
			
		||||
import org.thingsboard.server.common.data.id.DeviceId;
 | 
			
		||||
import org.thingsboard.server.common.data.id.EntityId;
 | 
			
		||||
import org.thingsboard.server.common.data.id.TenantId;
 | 
			
		||||
import org.thingsboard.server.common.data.kv.Aggregation;
 | 
			
		||||
import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
 | 
			
		||||
import org.thingsboard.server.common.data.kv.DoubleDataEntry;
 | 
			
		||||
import org.thingsboard.server.common.data.kv.ReadTsKvQuery;
 | 
			
		||||
import org.thingsboard.server.common.data.kv.TsKvEntry;
 | 
			
		||||
import org.thingsboard.server.common.data.kv.TsKvQuery;
 | 
			
		||||
import org.thingsboard.server.common.data.msg.TbMsgType;
 | 
			
		||||
import org.thingsboard.server.common.data.page.SortOrder.Direction;
 | 
			
		||||
import org.thingsboard.server.common.msg.TbMsg;
 | 
			
		||||
import org.thingsboard.server.common.msg.TbMsgMetaData;
 | 
			
		||||
import org.thingsboard.server.dao.timeseries.TimeseriesService;
 | 
			
		||||
 | 
			
		||||
import static org.hamcrest.CoreMatchers.is;
 | 
			
		||||
import static org.hamcrest.MatcherAssert.assertThat;
 | 
			
		||||
import java.util.Collections;
 | 
			
		||||
import java.util.List;
 | 
			
		||||
import java.util.UUID;
 | 
			
		||||
import java.util.concurrent.TimeUnit;
 | 
			
		||||
import java.util.function.Consumer;
 | 
			
		||||
import java.util.stream.Stream;
 | 
			
		||||
 | 
			
		||||
import static org.assertj.core.api.Assertions.assertThat;
 | 
			
		||||
import static org.assertj.core.api.Assertions.assertThatThrownBy;
 | 
			
		||||
import static org.mockito.ArgumentMatchers.any;
 | 
			
		||||
import static org.mockito.BDDMockito.willCallRealMethod;
 | 
			
		||||
import static org.mockito.Mockito.mock;
 | 
			
		||||
import static org.mockito.Mockito.spy;
 | 
			
		||||
import static org.mockito.ArgumentMatchers.anyList;
 | 
			
		||||
import static org.mockito.ArgumentMatchers.eq;
 | 
			
		||||
import static org.mockito.BDDMockito.given;
 | 
			
		||||
import static org.mockito.BDDMockito.spy;
 | 
			
		||||
import static org.mockito.BDDMockito.then;
 | 
			
		||||
import static org.mockito.BDDMockito.willReturn;
 | 
			
		||||
 | 
			
		||||
public class TbGetTelemetryNodeTest {
 | 
			
		||||
@ExtendWith(MockitoExtension.class)
 | 
			
		||||
public class TbGetTelemetryNodeTest extends AbstractRuleNodeUpgradeTest {
 | 
			
		||||
 | 
			
		||||
    TbGetTelemetryNode node;
 | 
			
		||||
    TbGetTelemetryNodeConfiguration config;
 | 
			
		||||
    TbNodeConfiguration nodeConfiguration;
 | 
			
		||||
    TbContext ctx;
 | 
			
		||||
    private final TenantId TENANT_ID = TenantId.fromUUID(UUID.fromString("5738401b-9dba-422b-b656-a62fe7431917"));
 | 
			
		||||
    private final DeviceId DEVICE_ID = new DeviceId(UUID.fromString("8a8fd749-b2ec-488b-a6c6-fc66614d8686"));
 | 
			
		||||
 | 
			
		||||
    private final ListeningExecutor executor = new TestDbCallbackExecutor();
 | 
			
		||||
 | 
			
		||||
    private TbGetTelemetryNode node;
 | 
			
		||||
    private TbGetTelemetryNodeConfiguration config;
 | 
			
		||||
 | 
			
		||||
    @Mock
 | 
			
		||||
    private TbContext ctxMock;
 | 
			
		||||
    @Mock
 | 
			
		||||
    private TimeseriesService timeseriesServiceMock;
 | 
			
		||||
 | 
			
		||||
    @BeforeEach
 | 
			
		||||
    public void setUp() throws Exception {
 | 
			
		||||
        ctx = mock(TbContext.class);
 | 
			
		||||
    public void setUp() {
 | 
			
		||||
        node = spy(new TbGetTelemetryNode());
 | 
			
		||||
        config = new TbGetTelemetryNodeConfiguration();
 | 
			
		||||
        config.setFetchMode("ALL");
 | 
			
		||||
        nodeConfiguration = new TbNodeConfiguration(JacksonUtil.valueToTree(config));
 | 
			
		||||
        node.init(ctx, nodeConfiguration);
 | 
			
		||||
 | 
			
		||||
        willCallRealMethod().given(node).parseAggregationConfig(any());
 | 
			
		||||
        config = new TbGetTelemetryNodeConfiguration().defaultConfiguration();
 | 
			
		||||
        config.setLatestTsKeyNames(List.of("temperature"));
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Test
 | 
			
		||||
    public void givenAggregationAsString_whenParseAggregation_thenReturnEnum() {
 | 
			
		||||
        //compatibility with old configs without "aggregation" parameter
 | 
			
		||||
        assertThat(node.parseAggregationConfig(null), is(Aggregation.NONE));
 | 
			
		||||
        assertThat(node.parseAggregationConfig(""), is(Aggregation.NONE));
 | 
			
		||||
 | 
			
		||||
        //common values
 | 
			
		||||
        assertThat(node.parseAggregationConfig("MIN"), is(Aggregation.MIN));
 | 
			
		||||
        assertThat(node.parseAggregationConfig("MAX"), is(Aggregation.MAX));
 | 
			
		||||
        assertThat(node.parseAggregationConfig("AVG"), is(Aggregation.AVG));
 | 
			
		||||
        assertThat(node.parseAggregationConfig("SUM"), is(Aggregation.SUM));
 | 
			
		||||
        assertThat(node.parseAggregationConfig("COUNT"), is(Aggregation.COUNT));
 | 
			
		||||
        assertThat(node.parseAggregationConfig("NONE"), is(Aggregation.NONE));
 | 
			
		||||
 | 
			
		||||
        //all possible values in future
 | 
			
		||||
        for (Aggregation aggEnum : Aggregation.values()) {
 | 
			
		||||
            assertThat(node.parseAggregationConfig(aggEnum.name()), is(aggEnum));
 | 
			
		||||
        }
 | 
			
		||||
    public void verifyDefaultConfig() {
 | 
			
		||||
        config = new TbGetTelemetryNodeConfiguration().defaultConfiguration();
 | 
			
		||||
        assertThat(config.getStartInterval()).isEqualTo(2);
 | 
			
		||||
        assertThat(config.getEndInterval()).isEqualTo(1);
 | 
			
		||||
        assertThat(config.getStartIntervalPattern()).isEqualTo("");
 | 
			
		||||
        assertThat(config.getEndIntervalPattern()).isEqualTo("");
 | 
			
		||||
        assertThat(config.isUseMetadataIntervalPatterns()).isFalse();
 | 
			
		||||
        assertThat(config.getStartIntervalTimeUnit()).isEqualTo(TimeUnit.MINUTES.name());
 | 
			
		||||
        assertThat(config.getEndIntervalTimeUnit()).isEqualTo(TimeUnit.MINUTES.name());
 | 
			
		||||
        assertThat(config.getFetchMode()).isEqualTo(FetchMode.FIRST);
 | 
			
		||||
        assertThat(config.getOrderBy()).isEqualTo(Direction.ASC);
 | 
			
		||||
        assertThat(config.getAggregation()).isEqualTo(Aggregation.NONE);
 | 
			
		||||
        assertThat(config.getLimit()).isEqualTo(1000);
 | 
			
		||||
        assertThat(config.getLatestTsKeyNames()).isEmpty();
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Test
 | 
			
		||||
    public void givenAggregationWhiteSpace_whenParseAggregation_thenException() {
 | 
			
		||||
        Assertions.assertThrows(IllegalArgumentException.class, () -> {
 | 
			
		||||
            node.parseAggregationConfig(" ");
 | 
			
		||||
        });
 | 
			
		||||
    public void givenEmptyTsKeyNames_whenInit_thenThrowsException() {
 | 
			
		||||
        // GIVEN
 | 
			
		||||
        config.setLatestTsKeyNames(Collections.emptyList());
 | 
			
		||||
 | 
			
		||||
        // WHEN-THEN
 | 
			
		||||
        assertThatThrownBy(() -> node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config))))
 | 
			
		||||
                .isInstanceOf(TbNodeException.class)
 | 
			
		||||
                .hasMessage("Telemetry should be specified!")
 | 
			
		||||
                .extracting(e -> ((TbNodeException) e).isUnrecoverable())
 | 
			
		||||
                .isEqualTo(true);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Test
 | 
			
		||||
    public void givenAggregationIncorrect_whenParseAggregation_thenException() {
 | 
			
		||||
        Assertions.assertThrows(IllegalArgumentException.class, () -> {
 | 
			
		||||
            node.parseAggregationConfig("TOP");
 | 
			
		||||
        });
 | 
			
		||||
    public void givenFetchModeIsNull_whenInit_thenThrowsException() {
 | 
			
		||||
        // GIVEN
 | 
			
		||||
        config.setFetchMode(null);
 | 
			
		||||
 | 
			
		||||
        // WHEN-THEN
 | 
			
		||||
        assertThatThrownBy(() -> node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config))))
 | 
			
		||||
                .isInstanceOf(TbNodeException.class)
 | 
			
		||||
                .hasMessage("FetchMode should be specified!")
 | 
			
		||||
                .extracting(e -> ((TbNodeException) e).isUnrecoverable())
 | 
			
		||||
                .isEqualTo(true);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Test
 | 
			
		||||
    public void givenFetchModeAllAndOrderByIsNull_whenInit_thenThrowsException() {
 | 
			
		||||
        // GIVEN
 | 
			
		||||
        config.setFetchMode(FetchMode.ALL);
 | 
			
		||||
        config.setOrderBy(null);
 | 
			
		||||
 | 
			
		||||
        // WHEN-THEN
 | 
			
		||||
        assertThatThrownBy(() -> node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config))))
 | 
			
		||||
                .isInstanceOf(TbNodeException.class)
 | 
			
		||||
                .hasMessage("OrderBy should be specified!")
 | 
			
		||||
                .extracting(e -> ((TbNodeException) e).isUnrecoverable())
 | 
			
		||||
                .isEqualTo(true);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @ParameterizedTest
 | 
			
		||||
    @ValueSource(ints = {-1, 0, 1, 1001, 2000})
 | 
			
		||||
    public void givenFetchModeAllAndLimitIsOutOfRange_whenInit_thenThrowsException(int limit) {
 | 
			
		||||
        // GIVEN
 | 
			
		||||
        config.setFetchMode(FetchMode.ALL);
 | 
			
		||||
        config.setLimit(limit);
 | 
			
		||||
 | 
			
		||||
        // WHEN-THEN
 | 
			
		||||
        assertThatThrownBy(() -> node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config))))
 | 
			
		||||
                .isInstanceOf(TbNodeException.class)
 | 
			
		||||
                .hasMessage("Limit should be in a range from 2 to 1000.")
 | 
			
		||||
                .extracting(e -> ((TbNodeException) e).isUnrecoverable())
 | 
			
		||||
                .isEqualTo(true);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Test
 | 
			
		||||
    public void givenFetchModeIsAllAndAggregationIsNull_whenInit_thenThrowsException() {
 | 
			
		||||
        // GIVEN
 | 
			
		||||
        config.setFetchMode(FetchMode.ALL);
 | 
			
		||||
        config.setAggregation(null);
 | 
			
		||||
        // WHEN-THEN
 | 
			
		||||
        assertThatThrownBy(() -> node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config))))
 | 
			
		||||
                .isInstanceOf(TbNodeException.class)
 | 
			
		||||
                .hasMessage("Aggregation should be specified!")
 | 
			
		||||
                .extracting(e -> ((TbNodeException) e).isUnrecoverable())
 | 
			
		||||
                .isEqualTo(true);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Test
 | 
			
		||||
    public void givenIntervalStartIsGreaterThanIntervalEnd_whenOnMsg_thenThrowsException() throws TbNodeException {
 | 
			
		||||
        // GIVEN
 | 
			
		||||
        config.setStartInterval(1);
 | 
			
		||||
        config.setEndInterval(2);
 | 
			
		||||
 | 
			
		||||
        // WHEN-THEN
 | 
			
		||||
        node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config)));
 | 
			
		||||
        TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, TbMsgMetaData.EMPTY, TbMsg.EMPTY_JSON_OBJECT);
 | 
			
		||||
        assertThatThrownBy(() -> node.onMsg(ctxMock, msg))
 | 
			
		||||
                .isInstanceOf(RuntimeException.class)
 | 
			
		||||
                .hasMessage("Interval start should be less than Interval end");
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Test
 | 
			
		||||
    public void givenUseMetadataIntervalPatternsIsTrue_whenOnMsg_thenVerifyStartAndEndTsInQuery() throws TbNodeException {
 | 
			
		||||
        // GIVEN
 | 
			
		||||
        config.setUseMetadataIntervalPatterns(true);
 | 
			
		||||
        config.setStartIntervalPattern("${mdStartInterval}");
 | 
			
		||||
        config.setEndIntervalPattern("$[msgEndInterval]");
 | 
			
		||||
 | 
			
		||||
        node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config)));
 | 
			
		||||
 | 
			
		||||
        mockTimeseriesService();
 | 
			
		||||
        given(timeseriesServiceMock.findAll(any(TenantId.class), any(EntityId.class), anyList())).willReturn(Futures.immediateFuture(Collections.emptyList()));
 | 
			
		||||
 | 
			
		||||
        // WHEN
 | 
			
		||||
        long startTs = 1719220350000L;
 | 
			
		||||
        long endTs = 1719220353000L;
 | 
			
		||||
        TbMsgMetaData metaData = new TbMsgMetaData();
 | 
			
		||||
        metaData.putValue("mdStartInterval", String.valueOf(startTs));
 | 
			
		||||
        TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, metaData, "{\"msgEndInterval\":\"" + endTs + "\"}");
 | 
			
		||||
        node.onMsg(ctxMock, msg);
 | 
			
		||||
 | 
			
		||||
        // THEN
 | 
			
		||||
        ArgumentCaptor<List<ReadTsKvQuery>> actualReadTsKvQueryList = ArgumentCaptor.forClass(List.class);
 | 
			
		||||
        then(timeseriesServiceMock).should().findAll(eq(TENANT_ID), eq(DEVICE_ID), actualReadTsKvQueryList.capture());
 | 
			
		||||
        ReadTsKvQuery actualReadTsKvQuery = actualReadTsKvQueryList.getValue().get(0);
 | 
			
		||||
        assertThat(actualReadTsKvQuery.getStartTs()).isEqualTo(startTs);
 | 
			
		||||
        assertThat(actualReadTsKvQuery.getEndTs()).isEqualTo(endTs);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Test
 | 
			
		||||
    public void givenUseMetadataIntervalPatternsIsFalse_whenOnMsg_thenVerifyStartAndEndTsInQuery() throws TbNodeException {
 | 
			
		||||
        // GIVEN
 | 
			
		||||
        node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config)));
 | 
			
		||||
 | 
			
		||||
        long ts = System.currentTimeMillis();
 | 
			
		||||
        willReturn(ts).given(node).getCurrentTimeMillis();
 | 
			
		||||
        mockTimeseriesService();
 | 
			
		||||
        given(timeseriesServiceMock.findAll(any(TenantId.class), any(EntityId.class), anyList())).willReturn(Futures.immediateFuture(Collections.emptyList()));
 | 
			
		||||
 | 
			
		||||
        // WHEN
 | 
			
		||||
        TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, TbMsgMetaData.EMPTY, TbMsg.EMPTY_JSON_OBJECT);
 | 
			
		||||
        node.onMsg(ctxMock, msg);
 | 
			
		||||
 | 
			
		||||
        // THEN
 | 
			
		||||
        ArgumentCaptor<List<ReadTsKvQuery>> actualReadTsKvQueryList = ArgumentCaptor.forClass(List.class);
 | 
			
		||||
        then(timeseriesServiceMock).should().findAll(eq(TENANT_ID), eq(DEVICE_ID), actualReadTsKvQueryList.capture());
 | 
			
		||||
        ReadTsKvQuery actualReadTsKvQuery = actualReadTsKvQueryList.getValue().get(0);
 | 
			
		||||
        assertThat(actualReadTsKvQuery.getStartTs()).isEqualTo(ts - TimeUnit.MINUTES.toMillis(config.getStartInterval()));
 | 
			
		||||
        assertThat(actualReadTsKvQuery.getEndTs()).isEqualTo(ts - TimeUnit.MINUTES.toMillis(config.getEndInterval()));
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Test
 | 
			
		||||
    public void givenTsKeyNamesPatterns_whenOnMsg_thenVerifyTsKeyNamesInQuery() throws TbNodeException {
 | 
			
		||||
        // GIVEN
 | 
			
		||||
        config.setLatestTsKeyNames(List.of("temperature", "${mdTsKey}", "$[msgTsKey]"));
 | 
			
		||||
 | 
			
		||||
        node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config)));
 | 
			
		||||
 | 
			
		||||
        mockTimeseriesService();
 | 
			
		||||
        given(timeseriesServiceMock.findAll(any(TenantId.class), any(EntityId.class), anyList())).willReturn(Futures.immediateFuture(Collections.emptyList()));
 | 
			
		||||
 | 
			
		||||
        // WHEN
 | 
			
		||||
        TbMsgMetaData metaData = new TbMsgMetaData();
 | 
			
		||||
        metaData.putValue("mdTsKey", "humidity");
 | 
			
		||||
        TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, metaData, "{\"msgTsKey\":\"pressure\"}");
 | 
			
		||||
        node.onMsg(ctxMock, msg);
 | 
			
		||||
 | 
			
		||||
        // THEN
 | 
			
		||||
        ArgumentCaptor<List<ReadTsKvQuery>> actualReadTsKvQueryList = ArgumentCaptor.forClass(List.class);
 | 
			
		||||
        then(timeseriesServiceMock).should().findAll(eq(TENANT_ID), eq(DEVICE_ID), actualReadTsKvQueryList.capture());
 | 
			
		||||
        List<String> actualKeys = actualReadTsKvQueryList.getValue().stream().map(TsKvQuery::getKey).toList();
 | 
			
		||||
        assertThat(actualKeys).containsExactlyInAnyOrder("temperature", "humidity", "pressure");
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @ParameterizedTest
 | 
			
		||||
    @MethodSource
 | 
			
		||||
    public void givenAggregation_whenOnMsg_thenVerifyAggregationStepInQuery(Aggregation aggregation, Consumer<ReadTsKvQuery> aggregationStepVerifier) throws TbNodeException {
 | 
			
		||||
        // GIVEN
 | 
			
		||||
        config.setStartInterval(5);
 | 
			
		||||
        config.setEndInterval(1);
 | 
			
		||||
        config.setFetchMode(FetchMode.ALL);
 | 
			
		||||
        config.setAggregation(aggregation);
 | 
			
		||||
 | 
			
		||||
        node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config)));
 | 
			
		||||
 | 
			
		||||
        mockTimeseriesService();
 | 
			
		||||
        given(timeseriesServiceMock.findAll(any(TenantId.class), any(EntityId.class), anyList())).willReturn(Futures.immediateFuture(Collections.emptyList()));
 | 
			
		||||
 | 
			
		||||
        // WHEN
 | 
			
		||||
        TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, TbMsgMetaData.EMPTY, TbMsg.EMPTY_JSON_OBJECT);
 | 
			
		||||
        node.onMsg(ctxMock, msg);
 | 
			
		||||
 | 
			
		||||
        // THEN
 | 
			
		||||
        ArgumentCaptor<List<ReadTsKvQuery>> actualReadTsKvQueryList = ArgumentCaptor.forClass(List.class);
 | 
			
		||||
        then(timeseriesServiceMock).should().findAll(eq(TENANT_ID), eq(DEVICE_ID), actualReadTsKvQueryList.capture());
 | 
			
		||||
        ReadTsKvQuery actualReadTsKvQuery = actualReadTsKvQueryList.getValue().get(0);
 | 
			
		||||
        aggregationStepVerifier.accept(actualReadTsKvQuery);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private static Stream<Arguments> givenAggregation_whenOnMsg_thenVerifyAggregationStepInQuery() {
 | 
			
		||||
        return Stream.of(
 | 
			
		||||
                Arguments.of(Aggregation.NONE, (Consumer<ReadTsKvQuery>) query -> assertThat(query.getInterval()).isEqualTo(1)),
 | 
			
		||||
                Arguments.of(Aggregation.AVG, (Consumer<ReadTsKvQuery>) query -> assertThat(query.getInterval()).isEqualTo(query.getEndTs() - query.getStartTs()))
 | 
			
		||||
        );
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @ParameterizedTest
 | 
			
		||||
    @MethodSource
 | 
			
		||||
    public void givenFetchModeAndLimit_whenOnMsg_thenVerifyLimitInQuery(FetchMode fetchMode, int limit, Consumer<ReadTsKvQuery> limitInQueryVerifier) throws TbNodeException {
 | 
			
		||||
        // GIVEN
 | 
			
		||||
        config.setFetchMode(fetchMode);
 | 
			
		||||
        config.setLimit(limit);
 | 
			
		||||
 | 
			
		||||
        node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config)));
 | 
			
		||||
 | 
			
		||||
        mockTimeseriesService();
 | 
			
		||||
        given(timeseriesServiceMock.findAll(any(TenantId.class), any(EntityId.class), anyList())).willReturn(Futures.immediateFuture(Collections.emptyList()));
 | 
			
		||||
 | 
			
		||||
        // WHEN
 | 
			
		||||
        TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, TbMsgMetaData.EMPTY, TbMsg.EMPTY_JSON_OBJECT);
 | 
			
		||||
        node.onMsg(ctxMock, msg);
 | 
			
		||||
 | 
			
		||||
        // THEN
 | 
			
		||||
        ArgumentCaptor<List<ReadTsKvQuery>> actualReadTsKvQueryList = ArgumentCaptor.forClass(List.class);
 | 
			
		||||
        then(timeseriesServiceMock).should().findAll(eq(TENANT_ID), eq(DEVICE_ID), actualReadTsKvQueryList.capture());
 | 
			
		||||
        ReadTsKvQuery actualReadTsKvQuery = actualReadTsKvQueryList.getValue().get(0);
 | 
			
		||||
        limitInQueryVerifier.accept(actualReadTsKvQuery);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private static Stream<Arguments> givenFetchModeAndLimit_whenOnMsg_thenVerifyLimitInQuery() {
 | 
			
		||||
        return Stream.of(
 | 
			
		||||
                Arguments.of(
 | 
			
		||||
                        FetchMode.ALL,
 | 
			
		||||
                        5,
 | 
			
		||||
                        (Consumer<ReadTsKvQuery>) query -> assertThat(query.getLimit()).isEqualTo(5)),
 | 
			
		||||
                Arguments.of(
 | 
			
		||||
                        FetchMode.FIRST,
 | 
			
		||||
                        TbGetTelemetryNodeConfiguration.MAX_FETCH_SIZE,
 | 
			
		||||
                        (Consumer<ReadTsKvQuery>) query -> assertThat(query.getLimit()).isEqualTo(1)),
 | 
			
		||||
                Arguments.of(
 | 
			
		||||
                        FetchMode.LAST,
 | 
			
		||||
                        10,
 | 
			
		||||
                        (Consumer<ReadTsKvQuery>) query -> assertThat(query.getLimit()).isEqualTo(1))
 | 
			
		||||
        );
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @ParameterizedTest
 | 
			
		||||
    @MethodSource
 | 
			
		||||
    public void givenFetchModeAndOrder_whenOnMsg_thenVerifyOrderInQuery(FetchMode fetchMode, Direction orderBy, Consumer<ReadTsKvQuery> orderInQueryVerifier) throws TbNodeException {
 | 
			
		||||
        // GIVEN
 | 
			
		||||
        config.setFetchMode(fetchMode);
 | 
			
		||||
        config.setOrderBy(orderBy);
 | 
			
		||||
 | 
			
		||||
        node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config)));
 | 
			
		||||
 | 
			
		||||
        mockTimeseriesService();
 | 
			
		||||
        given(timeseriesServiceMock.findAll(any(TenantId.class), any(EntityId.class), anyList())).willReturn(Futures.immediateFuture(Collections.emptyList()));
 | 
			
		||||
 | 
			
		||||
        // WHEN
 | 
			
		||||
        TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, TbMsgMetaData.EMPTY, TbMsg.EMPTY_JSON_OBJECT);
 | 
			
		||||
        node.onMsg(ctxMock, msg);
 | 
			
		||||
 | 
			
		||||
        // THEN
 | 
			
		||||
        ArgumentCaptor<List<ReadTsKvQuery>> actualReadTsKvQueryList = ArgumentCaptor.forClass(List.class);
 | 
			
		||||
        then(timeseriesServiceMock).should().findAll(eq(TENANT_ID), eq(DEVICE_ID), actualReadTsKvQueryList.capture());
 | 
			
		||||
        ReadTsKvQuery actualReadTsKvQuery = actualReadTsKvQueryList.getValue().get(0);
 | 
			
		||||
        orderInQueryVerifier.accept(actualReadTsKvQuery);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private static Stream<Arguments> givenFetchModeAndOrder_whenOnMsg_thenVerifyOrderInQuery() {
 | 
			
		||||
        return Stream.of(
 | 
			
		||||
                Arguments.of(
 | 
			
		||||
                        FetchMode.ALL,
 | 
			
		||||
                        Direction.DESC,
 | 
			
		||||
                        (Consumer<ReadTsKvQuery>) query -> assertThat(query.getOrder()).isEqualTo("DESC")),
 | 
			
		||||
                Arguments.of(
 | 
			
		||||
                        FetchMode.FIRST,
 | 
			
		||||
                        Direction.ASC,
 | 
			
		||||
                        (Consumer<ReadTsKvQuery>) query -> assertThat(query.getOrder()).isEqualTo("ASC")),
 | 
			
		||||
                Arguments.of(
 | 
			
		||||
                        FetchMode.LAST,
 | 
			
		||||
                        Direction.ASC,
 | 
			
		||||
                        (Consumer<ReadTsKvQuery>) query -> assertThat(query.getOrder()).isEqualTo("DESC"))
 | 
			
		||||
        );
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @ParameterizedTest
 | 
			
		||||
    @MethodSource
 | 
			
		||||
    public void givenInvalidIntervalPatterns_whenOnMsg_thenThrowsException(String startIntervalPattern, String errorMsg) throws TbNodeException {
 | 
			
		||||
        // GIVEN
 | 
			
		||||
        config.setUseMetadataIntervalPatterns(true);
 | 
			
		||||
        config.setStartIntervalPattern(startIntervalPattern);
 | 
			
		||||
        node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config)));
 | 
			
		||||
 | 
			
		||||
        // WHEN-THEN
 | 
			
		||||
        TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, TbMsgMetaData.EMPTY, "{\"msgStartInterval\":\"start\"}");
 | 
			
		||||
        assertThatThrownBy(() -> node.onMsg(ctxMock, msg)).isInstanceOf(IllegalArgumentException.class).hasMessage(errorMsg);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private static Stream<Arguments> givenInvalidIntervalPatterns_whenOnMsg_thenThrowsException() {
 | 
			
		||||
        return Stream.of(
 | 
			
		||||
                Arguments.of("${mdStartInterval}", "Message value: 'mdStartInterval' is undefined"),
 | 
			
		||||
                Arguments.of("$[msgStartInterval]", "Message value: 'msgStartInterval' has invalid format")
 | 
			
		||||
        );
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Test
 | 
			
		||||
    public void givenFetchModeAll_whenOnMsg_thenTellSuccessAndVerifyMsg() throws TbNodeException {
 | 
			
		||||
        // GIVEN
 | 
			
		||||
        config.setLatestTsKeyNames(List.of("temperature", "humidity"));
 | 
			
		||||
        config.setFetchMode(FetchMode.ALL);
 | 
			
		||||
 | 
			
		||||
        node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config)));
 | 
			
		||||
 | 
			
		||||
        mockTimeseriesService();
 | 
			
		||||
        long ts = System.currentTimeMillis();
 | 
			
		||||
        List<TsKvEntry> tsKvEntries = List.of(
 | 
			
		||||
                new BasicTsKvEntry(ts - 5, new DoubleDataEntry("temperature", 23.1)),
 | 
			
		||||
                new BasicTsKvEntry(ts - 4, new DoubleDataEntry("temperature", 22.4)),
 | 
			
		||||
                new BasicTsKvEntry(ts - 4, new DoubleDataEntry("humidity", 55.5))
 | 
			
		||||
        );
 | 
			
		||||
        given(timeseriesServiceMock.findAll(any(TenantId.class), any(EntityId.class), anyList())).willReturn(Futures.immediateFuture(tsKvEntries));
 | 
			
		||||
 | 
			
		||||
        // WHEN
 | 
			
		||||
        TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, TbMsgMetaData.EMPTY, TbMsg.EMPTY_JSON_OBJECT);
 | 
			
		||||
        node.onMsg(ctxMock, msg);
 | 
			
		||||
 | 
			
		||||
        // THEN
 | 
			
		||||
        ArgumentCaptor<TbMsg> actualMsg = ArgumentCaptor.forClass(TbMsg.class);
 | 
			
		||||
        then(ctxMock).should().tellSuccess(actualMsg.capture());
 | 
			
		||||
        TbMsgMetaData metaData = new TbMsgMetaData();
 | 
			
		||||
        metaData.putValue("temperature", "[{\"ts\":" + (ts - 5) + ",\"value\":23.1},{\"ts\":" + (ts - 4) + ",\"value\":22.4}]");
 | 
			
		||||
        metaData.putValue("humidity", "[{\"ts\":" + (ts - 4) + ",\"value\":55.5}]");
 | 
			
		||||
        TbMsg expectedMsg = TbMsg.transformMsgMetadata(msg, metaData);
 | 
			
		||||
        assertThat(actualMsg.getValue()).usingRecursiveComparison().ignoringFields("ctx").isEqualTo(expectedMsg);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @ParameterizedTest
 | 
			
		||||
    @ValueSource(strings = {"FIRST", "LAST"})
 | 
			
		||||
    public void givenFetchMode_whenOnMsg_thenTellSuccessAndVerifyMsg(String fetchMode) throws TbNodeException {
 | 
			
		||||
        // GIVEN
 | 
			
		||||
        config.setFetchMode(FetchMode.valueOf(fetchMode));
 | 
			
		||||
        config.setLatestTsKeyNames(List.of("temperature", "humidity"));
 | 
			
		||||
 | 
			
		||||
        node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config)));
 | 
			
		||||
 | 
			
		||||
        mockTimeseriesService();
 | 
			
		||||
        long ts = System.currentTimeMillis();
 | 
			
		||||
        List<TsKvEntry> tsKvEntries = List.of(
 | 
			
		||||
                new BasicTsKvEntry(ts - 4, new DoubleDataEntry("temperature", 22.4)),
 | 
			
		||||
                new BasicTsKvEntry(ts - 4, new DoubleDataEntry("humidity", 55.5))
 | 
			
		||||
        );
 | 
			
		||||
        given(timeseriesServiceMock.findAll(any(TenantId.class), any(EntityId.class), anyList())).willReturn(Futures.immediateFuture(tsKvEntries));
 | 
			
		||||
 | 
			
		||||
        // WHEN
 | 
			
		||||
        TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, TbMsgMetaData.EMPTY, TbMsg.EMPTY_JSON_OBJECT);
 | 
			
		||||
        node.onMsg(ctxMock, msg);
 | 
			
		||||
 | 
			
		||||
        // THEN
 | 
			
		||||
        ArgumentCaptor<TbMsg> actualMsg = ArgumentCaptor.forClass(TbMsg.class);
 | 
			
		||||
        then(ctxMock).should().tellSuccess(actualMsg.capture());
 | 
			
		||||
        TbMsgMetaData metaData = new TbMsgMetaData();
 | 
			
		||||
        metaData.putValue("temperature", "\"22.4\"");
 | 
			
		||||
        metaData.putValue("humidity", "\"55.5\"");
 | 
			
		||||
        TbMsg expectedMsg = TbMsg.transformMsgMetadata(msg, metaData);
 | 
			
		||||
        assertThat(actualMsg.getValue()).usingRecursiveComparison().ignoringFields("ctx").isEqualTo(expectedMsg);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private void mockTimeseriesService() {
 | 
			
		||||
        given(ctxMock.getTimeseriesService()).willReturn(timeseriesServiceMock);
 | 
			
		||||
        given(ctxMock.getTenantId()).willReturn(TENANT_ID);
 | 
			
		||||
        given(ctxMock.getDbCallbackExecutor()).willReturn(executor);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private static Stream<Arguments> givenFromVersionAndConfig_whenUpgrade_thenVerifyHasChangesAndConfig() {
 | 
			
		||||
        return Stream.of(
 | 
			
		||||
                // config for version 0 (fetchMode is 'FIRST' and orderBy is 'INVALID_ORDER_BY' and aggregation is 'SUM')
 | 
			
		||||
                Arguments.of(0,
 | 
			
		||||
                        """
 | 
			
		||||
                                        {
 | 
			
		||||
                                                  "latestTsKeyNames": [],
 | 
			
		||||
                                                  "aggregation": "SUM",
 | 
			
		||||
                                                  "fetchMode": "FIRST",
 | 
			
		||||
                                                  "orderBy": "INVALID_ORDER_BY",
 | 
			
		||||
                                                  "limit": 1000,
 | 
			
		||||
                                                  "useMetadataIntervalPatterns": false,
 | 
			
		||||
                                                  "startIntervalPattern": "",
 | 
			
		||||
                                                  "endIntervalPattern": "",
 | 
			
		||||
                                                  "startInterval": 2,
 | 
			
		||||
                                                  "startIntervalTimeUnit": "MINUTES",
 | 
			
		||||
                                                  "endInterval": 1,
 | 
			
		||||
                                                  "endIntervalTimeUnit": "MINUTES"
 | 
			
		||||
                                        }
 | 
			
		||||
                                """,
 | 
			
		||||
                        true,
 | 
			
		||||
                        """
 | 
			
		||||
                                        {
 | 
			
		||||
                                                  "latestTsKeyNames": [],
 | 
			
		||||
                                                  "aggregation": "NONE",
 | 
			
		||||
                                                  "fetchMode": "FIRST",
 | 
			
		||||
                                                  "orderBy": "ASC",
 | 
			
		||||
                                                  "limit": 1000,
 | 
			
		||||
                                                  "useMetadataIntervalPatterns": false,
 | 
			
		||||
                                                  "startIntervalPattern": "",
 | 
			
		||||
                                                  "endIntervalPattern": "",
 | 
			
		||||
                                                  "startInterval": 2,
 | 
			
		||||
                                                  "startIntervalTimeUnit": "MINUTES",
 | 
			
		||||
                                                  "endInterval": 1,
 | 
			
		||||
                                                  "endIntervalTimeUnit": "MINUTES"
 | 
			
		||||
                                        }
 | 
			
		||||
                                """),
 | 
			
		||||
                // config for version 0 (fetchMode is 'LAST' and orderBy is 'ASC' and aggregation is 'AVG')
 | 
			
		||||
                Arguments.of(0,
 | 
			
		||||
                        """
 | 
			
		||||
                                        {
 | 
			
		||||
                                                  "latestTsKeyNames": [],
 | 
			
		||||
                                                  "aggregation": "AVG",
 | 
			
		||||
                                                  "fetchMode": "LAST",
 | 
			
		||||
                                                  "orderBy": "ASC",
 | 
			
		||||
                                                  "limit": 1000,
 | 
			
		||||
                                                  "useMetadataIntervalPatterns": false,
 | 
			
		||||
                                                  "startIntervalPattern": "",
 | 
			
		||||
                                                  "endIntervalPattern": "",
 | 
			
		||||
                                                  "startInterval": 2,
 | 
			
		||||
                                                  "startIntervalTimeUnit": "MINUTES",
 | 
			
		||||
                                                  "endInterval": 1,
 | 
			
		||||
                                                  "endIntervalTimeUnit": "MINUTES"
 | 
			
		||||
                                        }
 | 
			
		||||
                                """,
 | 
			
		||||
                        true,
 | 
			
		||||
                        """
 | 
			
		||||
                                        {
 | 
			
		||||
                                                  "latestTsKeyNames": [],
 | 
			
		||||
                                                  "aggregation": "NONE",
 | 
			
		||||
                                                  "fetchMode": "LAST",
 | 
			
		||||
                                                  "orderBy": "DESC",
 | 
			
		||||
                                                  "limit": 1000,
 | 
			
		||||
                                                  "useMetadataIntervalPatterns": false,
 | 
			
		||||
                                                  "startIntervalPattern": "",
 | 
			
		||||
                                                  "endIntervalPattern": "",
 | 
			
		||||
                                                  "startInterval": 2,
 | 
			
		||||
                                                  "startIntervalTimeUnit": "MINUTES",
 | 
			
		||||
                                                  "endInterval": 1,
 | 
			
		||||
                                                  "endIntervalTimeUnit": "MINUTES"
 | 
			
		||||
                                        }
 | 
			
		||||
                                """),
 | 
			
		||||
                // config for version 0 (fetchMode is 'ALL' and orderBy is empty and aggregation is null)
 | 
			
		||||
                Arguments.of(0,
 | 
			
		||||
                        """
 | 
			
		||||
                                        {
 | 
			
		||||
                                                  "latestTsKeyNames": [],
 | 
			
		||||
                                                  "aggregation": null,
 | 
			
		||||
                                                  "fetchMode": "ALL",
 | 
			
		||||
                                                  "orderBy": "",
 | 
			
		||||
                                                  "limit": 1000,
 | 
			
		||||
                                                  "useMetadataIntervalPatterns": false,
 | 
			
		||||
                                                  "startIntervalPattern": "",
 | 
			
		||||
                                                  "endIntervalPattern": "",
 | 
			
		||||
                                                  "startInterval": 2,
 | 
			
		||||
                                                  "startIntervalTimeUnit": "MINUTES",
 | 
			
		||||
                                                  "endInterval": 1,
 | 
			
		||||
                                                  "endIntervalTimeUnit": "MINUTES"
 | 
			
		||||
                                        }
 | 
			
		||||
                                """,
 | 
			
		||||
                        true,
 | 
			
		||||
                        """
 | 
			
		||||
                                        {
 | 
			
		||||
                                                  "latestTsKeyNames": [],
 | 
			
		||||
                                                  "aggregation": "NONE",
 | 
			
		||||
                                                  "fetchMode": "ALL",
 | 
			
		||||
                                                  "orderBy": "ASC",
 | 
			
		||||
                                                  "limit": 1000,
 | 
			
		||||
                                                  "useMetadataIntervalPatterns": false,
 | 
			
		||||
                                                  "startIntervalPattern": "",
 | 
			
		||||
                                                  "endIntervalPattern": "",
 | 
			
		||||
                                                  "startInterval": 2,
 | 
			
		||||
                                                  "startIntervalTimeUnit": "MINUTES",
 | 
			
		||||
                                                  "endInterval": 1,
 | 
			
		||||
                                                  "endIntervalTimeUnit": "MINUTES"
 | 
			
		||||
                                        }
 | 
			
		||||
                                """),
 | 
			
		||||
                // config for version 0 (fetchMode is 'ALL' and orderBy is 'DESC' and aggregation is 'SUM')
 | 
			
		||||
                Arguments.of(0,
 | 
			
		||||
                        """
 | 
			
		||||
                                        {
 | 
			
		||||
                                                  "latestTsKeyNames": [],
 | 
			
		||||
                                                  "aggregation": "SUM",
 | 
			
		||||
                                                  "fetchMode": "ALL",
 | 
			
		||||
                                                  "orderBy": "DESC",
 | 
			
		||||
                                                  "limit": 1000,
 | 
			
		||||
                                                  "useMetadataIntervalPatterns": false,
 | 
			
		||||
                                                  "startIntervalPattern": "",
 | 
			
		||||
                                                  "endIntervalPattern": "",
 | 
			
		||||
                                                  "startInterval": 2,
 | 
			
		||||
                                                  "startIntervalTimeUnit": "MINUTES",
 | 
			
		||||
                                                  "endInterval": 1,
 | 
			
		||||
                                                  "endIntervalTimeUnit": "MINUTES"
 | 
			
		||||
                                        }
 | 
			
		||||
                                """,
 | 
			
		||||
                        false,
 | 
			
		||||
                        """
 | 
			
		||||
                                        {
 | 
			
		||||
                                                  "latestTsKeyNames": [],
 | 
			
		||||
                                                  "aggregation": "SUM",
 | 
			
		||||
                                                  "fetchMode": "ALL",
 | 
			
		||||
                                                  "orderBy": "DESC",
 | 
			
		||||
                                                  "limit": 1000,
 | 
			
		||||
                                                  "useMetadataIntervalPatterns": false,
 | 
			
		||||
                                                  "startIntervalPattern": "",
 | 
			
		||||
                                                  "endIntervalPattern": "",
 | 
			
		||||
                                                  "startInterval": 2,
 | 
			
		||||
                                                  "startIntervalTimeUnit": "MINUTES",
 | 
			
		||||
                                                  "endInterval": 1,
 | 
			
		||||
                                                  "endIntervalTimeUnit": "MINUTES"
 | 
			
		||||
                                        }
 | 
			
		||||
                                """),
 | 
			
		||||
                // config for version 0 (fetchMode is 'INVALID_MODE' and orderBy is 'INVALID_ORDER_BY' and aggregation is 'INVALID_AGGREGATION')
 | 
			
		||||
                Arguments.of(0,
 | 
			
		||||
                        """
 | 
			
		||||
                                        {
 | 
			
		||||
                                                  "latestTsKeyNames": [],
 | 
			
		||||
                                                  "aggregation": "INVALID_AGGREGATION",
 | 
			
		||||
                                                  "fetchMode": "INVALID_MODE",
 | 
			
		||||
                                                  "orderBy": "INVALID_ORDER_BY",
 | 
			
		||||
                                                  "limit": 1000,
 | 
			
		||||
                                                  "useMetadataIntervalPatterns": false,
 | 
			
		||||
                                                  "startIntervalPattern": "",
 | 
			
		||||
                                                  "endIntervalPattern": "",
 | 
			
		||||
                                                  "startInterval": 2,
 | 
			
		||||
                                                  "startIntervalTimeUnit": "MINUTES",
 | 
			
		||||
                                                  "endInterval": 1,
 | 
			
		||||
                                                  "endIntervalTimeUnit": "MINUTES"
 | 
			
		||||
                                        }
 | 
			
		||||
                                """,
 | 
			
		||||
                        true,
 | 
			
		||||
                        """
 | 
			
		||||
                                        {
 | 
			
		||||
                                                  "latestTsKeyNames": [],
 | 
			
		||||
                                                  "aggregation": "NONE",
 | 
			
		||||
                                                  "fetchMode": "LAST",
 | 
			
		||||
                                                  "orderBy": "DESC",
 | 
			
		||||
                                                  "limit": 1000,
 | 
			
		||||
                                                  "useMetadataIntervalPatterns": false,
 | 
			
		||||
                                                  "startIntervalPattern": "",
 | 
			
		||||
                                                  "endIntervalPattern": "",
 | 
			
		||||
                                                  "startInterval": 2,
 | 
			
		||||
                                                  "startIntervalTimeUnit": "MINUTES",
 | 
			
		||||
                                                  "endInterval": 1,
 | 
			
		||||
                                                  "endIntervalTimeUnit": "MINUTES"
 | 
			
		||||
                                        }
 | 
			
		||||
                                """)
 | 
			
		||||
        );
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    protected TbNode getTestNode() {
 | 
			
		||||
        return node;
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user