splitted to more specific tests

This commit is contained in:
IrynaMatveieva 2024-07-04 15:05:46 +03:00
parent 9fe262fa4c
commit b4ba613c54
2 changed files with 304 additions and 195 deletions

View File

@ -66,19 +66,19 @@ public class TbGetTelemetryNode implements TbNode {
private List<String> tsKeyNames; private List<String> tsKeyNames;
private int limit; private int limit;
private String fetchMode; private String fetchMode;
private String orderByFetchAll; private String orderBy;
private Aggregation aggregation; private Aggregation aggregation;
@Override @Override
public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException { public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException {
this.config = TbNodeUtils.convert(configuration, TbGetTelemetryNodeConfiguration.class); this.config = TbNodeUtils.convert(configuration, TbGetTelemetryNodeConfiguration.class);
tsKeyNames = config.getLatestTsKeyNames(); tsKeyNames = config.getLatestTsKeyNames();
if (tsKeyNames.isEmpty()) {
throw new TbNodeException("Telemetry is not selected!", true);
}
limit = config.getFetchMode().equals(TbGetTelemetryNodeConfiguration.FETCH_MODE_ALL) ? validateLimit(config.getLimit()) : 1; limit = config.getFetchMode().equals(TbGetTelemetryNodeConfiguration.FETCH_MODE_ALL) ? validateLimit(config.getLimit()) : 1;
fetchMode = config.getFetchMode(); fetchMode = config.getFetchMode();
orderByFetchAll = config.getOrderBy(); orderBy = getOrderBy();
if (StringUtils.isEmpty(orderByFetchAll)) {
orderByFetchAll = ASC_ORDER;
}
aggregation = parseAggregationConfig(config.getAggregation()); aggregation = parseAggregationConfig(config.getAggregation());
} }
@ -91,21 +91,13 @@ public class TbGetTelemetryNode implements TbNode {
@Override @Override
public void onMsg(TbContext ctx, TbMsg msg) { public void onMsg(TbContext ctx, TbMsg msg) {
if (tsKeyNames.isEmpty()) { Interval interval = getInterval(msg);
ctx.tellFailure(msg, new IllegalStateException("Telemetry is not selected!")); List<String> keys = TbNodeUtils.processPatterns(tsKeyNames, msg);
} else { ListenableFuture<List<TsKvEntry>> list = ctx.getTimeseriesService().findAll(ctx.getTenantId(), msg.getOriginator(), buildQueries(interval, keys));
try { DonAsynchron.withCallback(list, data -> {
Interval interval = getInterval(msg); var metaData = updateMetadata(data, msg, keys);
List<String> keys = TbNodeUtils.processPatterns(tsKeyNames, msg); ctx.tellSuccess(TbMsg.transformMsgMetadata(msg, metaData));
ListenableFuture<List<TsKvEntry>> list = ctx.getTimeseriesService().findAll(ctx.getTenantId(), msg.getOriginator(), buildQueries(interval, keys)); }, error -> ctx.tellFailure(msg, error), ctx.getDbCallbackExecutor());
DonAsynchron.withCallback(list, data -> {
var metaData = updateMetadata(data, msg, keys);
ctx.tellSuccess(TbMsg.transformMsgMetadata(msg, metaData));
}, error -> ctx.tellFailure(msg, error), ctx.getDbCallbackExecutor());
} catch (Exception e) {
ctx.tellFailure(msg, e);
}
}
} }
private List<ReadTsKvQuery> buildQueries(Interval interval, List<String> keys) { private List<ReadTsKvQuery> buildQueries(Interval interval, List<String> keys) {
@ -115,14 +107,14 @@ public class TbGetTelemetryNode implements TbNode {
interval.getEndTs() - interval.getStartTs(); interval.getEndTs() - interval.getStartTs();
return keys.stream() return keys.stream()
.map(key -> new BaseReadTsKvQuery(key, interval.getStartTs(), interval.getEndTs(), aggIntervalStep, limit, aggregation, getOrderBy())) .map(key -> new BaseReadTsKvQuery(key, interval.getStartTs(), interval.getEndTs(), aggIntervalStep, limit, aggregation, orderBy))
.collect(Collectors.toList()); .collect(Collectors.toList());
} }
private String getOrderBy() { private String getOrderBy() throws TbNodeException {
switch (fetchMode) { switch (fetchMode) {
case TbGetTelemetryNodeConfiguration.FETCH_MODE_ALL: case TbGetTelemetryNodeConfiguration.FETCH_MODE_ALL:
return orderByFetchAll; return getOrderByFetchAll();
case TbGetTelemetryNodeConfiguration.FETCH_MODE_FIRST: case TbGetTelemetryNodeConfiguration.FETCH_MODE_FIRST:
return ASC_ORDER; return ASC_ORDER;
default: default:
@ -130,6 +122,17 @@ public class TbGetTelemetryNode implements TbNode {
} }
} }
private String getOrderByFetchAll() throws TbNodeException {
String orderBy = config.getOrderBy();
if (ASC_ORDER.equals(orderBy) || DESC_ORDER.equals(orderBy)) {
return orderBy;
}
if (StringUtils.isBlank(orderBy)) {
return ASC_ORDER;
}
throw new TbNodeException("Invalid fetch order selected.", true);
}
private TbMsgMetaData updateMetadata(List<TsKvEntry> entries, TbMsg msg, List<String> keys) { private TbMsgMetaData updateMetadata(List<TsKvEntry> entries, TbMsg msg, List<String> keys) {
ObjectNode resultNode = JacksonUtil.newObjectNode(JacksonUtil.ALLOW_UNQUOTED_FIELD_NAMES_MAPPER); ObjectNode resultNode = JacksonUtil.newObjectNode(JacksonUtil.ALLOW_UNQUOTED_FIELD_NAMES_MAPPER);
if (TbGetTelemetryNodeConfiguration.FETCH_MODE_ALL.equals(fetchMode)) { if (TbGetTelemetryNodeConfiguration.FETCH_MODE_ALL.equals(fetchMode)) {
@ -210,12 +213,11 @@ public class TbGetTelemetryNode implements TbNode {
return pattern.replaceAll("[$\\[{}\\]]", ""); return pattern.replaceAll("[$\\[{}\\]]", "");
} }
private int validateLimit(int limit) { private int validateLimit(int limit) throws TbNodeException {
if (limit != 0) { if (limit < 2 || limit > TbGetTelemetryNodeConfiguration.MAX_FETCH_SIZE) {
return limit; throw new TbNodeException("Limit should be in a range from 2 to 1000.", true);
} else {
return TbGetTelemetryNodeConfiguration.MAX_FETCH_SIZE;
} }
return limit;
} }
@Data @Data

View File

@ -22,34 +22,36 @@ import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource; import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.NullAndEmptySource; import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.ArgumentCaptor; import org.mockito.ArgumentCaptor;
import org.mockito.Mock; import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension; import org.mockito.junit.jupiter.MockitoExtension;
import org.springframework.test.util.ReflectionTestUtils;
import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.common.util.ListeningExecutor; import org.thingsboard.common.util.ListeningExecutor;
import org.thingsboard.rule.engine.TestDbCallbackExecutor; import org.thingsboard.rule.engine.TestDbCallbackExecutor;
import org.thingsboard.rule.engine.api.TbContext; import org.thingsboard.rule.engine.api.TbContext;
import org.thingsboard.rule.engine.api.TbNodeConfiguration; import org.thingsboard.rule.engine.api.TbNodeConfiguration;
import org.thingsboard.rule.engine.api.TbNodeException; import org.thingsboard.rule.engine.api.TbNodeException;
import org.thingsboard.rule.engine.api.util.TbNodeUtils;
import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.kv.Aggregation; import org.thingsboard.server.common.data.kv.Aggregation;
import org.thingsboard.server.common.data.kv.BaseReadTsKvQuery;
import org.thingsboard.server.common.data.kv.BasicTsKvEntry; import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
import org.thingsboard.server.common.data.kv.DoubleDataEntry; import org.thingsboard.server.common.data.kv.DoubleDataEntry;
import org.thingsboard.server.common.data.kv.ReadTsKvQuery; import org.thingsboard.server.common.data.kv.ReadTsKvQuery;
import org.thingsboard.server.common.data.kv.TsKvEntry; import org.thingsboard.server.common.data.kv.TsKvEntry;
import org.thingsboard.server.common.data.kv.TsKvQuery;
import org.thingsboard.server.common.data.msg.TbMsgType; import org.thingsboard.server.common.data.msg.TbMsgType;
import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.TbMsgMetaData; import org.thingsboard.server.common.msg.TbMsgMetaData;
import org.thingsboard.server.dao.timeseries.TimeseriesService; import org.thingsboard.server.dao.timeseries.TimeseriesService;
import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.Stream; import java.util.stream.Stream;
import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThat;
@ -77,14 +79,15 @@ public class TbGetTelemetryNodeTest {
private TimeseriesService timeseriesServiceMock; private TimeseriesService timeseriesServiceMock;
@BeforeEach @BeforeEach
public void setUp() throws Exception { public void setUp() {
node = new TbGetTelemetryNode(); node = new TbGetTelemetryNode();
config = new TbGetTelemetryNodeConfiguration().defaultConfiguration(); config = new TbGetTelemetryNodeConfiguration().defaultConfiguration();
config.setLatestTsKeyNames(List.of("temperature"));
} }
@Test @Test
public void givenAggregationAsString_whenParseAggregation_thenReturnEnum() throws TbNodeException { public void givenAggregationAsString_whenParseAggregation_thenReturnEnum() throws TbNodeException {
config.setFetchMode("ALL"); config.setFetchMode(TbGetTelemetryNodeConfiguration.FETCH_MODE_ALL);
node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config))); node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config)));
//compatibility with old configs without "aggregation" parameter //compatibility with old configs without "aggregation" parameter
@ -107,20 +110,27 @@ public class TbGetTelemetryNodeTest {
@Test @Test
public void givenAggregationWhiteSpace_whenParseAggregation_thenException() throws TbNodeException { public void givenAggregationWhiteSpace_whenParseAggregation_thenException() throws TbNodeException {
config.setFetchMode("ALL"); // GIVEN
config.setFetchMode(TbGetTelemetryNodeConfiguration.FETCH_MODE_ALL);
node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config))); node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config)));
// WHEN-THEN
assertThatThrownBy(() -> node.parseAggregationConfig(" ")).isInstanceOf(IllegalArgumentException.class); assertThatThrownBy(() -> node.parseAggregationConfig(" ")).isInstanceOf(IllegalArgumentException.class);
} }
@Test @Test
public void givenAggregationIncorrect_whenParseAggregation_thenException() throws TbNodeException { public void givenAggregationIncorrect_whenParseAggregation_thenException() throws TbNodeException {
config.setFetchMode("ALL"); // GIVEN
config.setFetchMode(TbGetTelemetryNodeConfiguration.FETCH_MODE_ALL);
node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config))); node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config)));
// WHEN-THEN
assertThatThrownBy(() -> node.parseAggregationConfig("TOP")).isInstanceOf(IllegalArgumentException.class); assertThatThrownBy(() -> node.parseAggregationConfig("TOP")).isInstanceOf(IllegalArgumentException.class);
} }
@Test @Test
public void verifyDefaultConfig() { public void verifyDefaultConfig() {
config = new TbGetTelemetryNodeConfiguration().defaultConfiguration();
assertThat(config.getStartInterval()).isEqualTo(2); assertThat(config.getStartInterval()).isEqualTo(2);
assertThat(config.getEndInterval()).isEqualTo(1); assertThat(config.getEndInterval()).isEqualTo(1);
assertThat(config.getStartIntervalPattern()).isEqualTo(""); assertThat(config.getStartIntervalPattern()).isEqualTo("");
@ -128,228 +138,325 @@ public class TbGetTelemetryNodeTest {
assertThat(config.isUseMetadataIntervalPatterns()).isFalse(); assertThat(config.isUseMetadataIntervalPatterns()).isFalse();
assertThat(config.getStartIntervalTimeUnit()).isEqualTo(TimeUnit.MINUTES.name()); assertThat(config.getStartIntervalTimeUnit()).isEqualTo(TimeUnit.MINUTES.name());
assertThat(config.getEndIntervalTimeUnit()).isEqualTo(TimeUnit.MINUTES.name()); assertThat(config.getEndIntervalTimeUnit()).isEqualTo(TimeUnit.MINUTES.name());
assertThat(config.getFetchMode()).isEqualTo("FIRST"); assertThat(config.getFetchMode()).isEqualTo(TbGetTelemetryNodeConfiguration.FETCH_MODE_FIRST);
assertThat(config.getOrderBy()).isEqualTo("ASC"); assertThat(config.getOrderBy()).isEqualTo("ASC");
assertThat(config.getAggregation()).isEqualTo(Aggregation.NONE.name()); assertThat(config.getAggregation()).isEqualTo(Aggregation.NONE.name());
assertThat(config.getLimit()).isEqualTo(1000); assertThat(config.getLimit()).isEqualTo(1000);
assertThat(config.getLatestTsKeyNames()).isEmpty(); assertThat(config.getLatestTsKeyNames()).isEmpty();
} }
@Test
public void givenEmptyTsKeyNames_whenInit_thenThrowsException() {
// GIVEN
config.setLatestTsKeyNames(Collections.emptyList());
// WHEN-THEN
assertThatThrownBy(() -> node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config))))
.isInstanceOf(TbNodeException.class)
.hasMessage("Telemetry is not selected!")
.extracting(e -> ((TbNodeException) e).isUnrecoverable())
.isEqualTo(true);
}
@ParameterizedTest @ParameterizedTest
@MethodSource @ValueSource(ints = {-1, 0, 1, 1001, 2000})
public void givenFetchModeAndLimit_whenInit_thenVerifyLimit(String fetchMode, int limit, int expectedLimit) throws TbNodeException { public void givenFetchModeAllAndLimitIsOutOfRange_whenInit_thenThrowsException(int limit) {
config.setFetchMode(fetchMode); // GIVEN
config.setFetchMode(TbGetTelemetryNodeConfiguration.FETCH_MODE_ALL);
config.setLimit(limit); config.setLimit(limit);
node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config))); // THEN
assertThatThrownBy(() -> node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config))))
var actualLimit = ReflectionTestUtils.getField(node, "limit"); .isInstanceOf(TbNodeException.class)
assertThat(actualLimit).isEqualTo(expectedLimit); .hasMessage("Limit should be in a range from 2 to 1000.")
} .extracting(e -> ((TbNodeException) e).isUnrecoverable())
.isEqualTo(true);
private static Stream<Arguments> givenFetchModeAndLimit_whenInit_thenVerifyLimit() {
return Stream.of(
Arguments.of("FIRST", 0, 1),
Arguments.of("LAST", 10, 1),
Arguments.of("ALL", 0, 1000),
Arguments.of("ALL", 5, 5)
);
} }
@ParameterizedTest @ParameterizedTest
@NullAndEmptySource @ValueSource(strings = {".ASC", "ascending", "DESCENDING"})
public void givenEmptyOrderBy_whenInit_thenVerify(String orderBy) throws TbNodeException { public void givenFetchModeAllAndInvalidOrderBy_whenInit_thenThrowsException(String orderBy) {
// GIVEN
config.setFetchMode(TbGetTelemetryNodeConfiguration.FETCH_MODE_ALL);
config.setLimit(2);
config.setOrderBy(orderBy); config.setOrderBy(orderBy);
node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config))); // WHEN-THEN
assertThatThrownBy(() -> node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config))))
var actualOrderBy = ReflectionTestUtils.getField(node, "orderByFetchAll"); .isInstanceOf(TbNodeException.class)
assertThat(actualOrderBy).isEqualTo("ASC"); .hasMessage("Invalid fetch order selected.")
.extracting(e -> ((TbNodeException) e).isUnrecoverable())
.isEqualTo(true);
} }
@Test @Test
public void givenConfig_whenInit_thenVerify() throws TbNodeException { public void givenUseMetadataIntervalPatternsIsTrue_whenOnMsg_thenVerifyStartAndEndTsInQuery() throws TbNodeException {
List<String> keys = List.of("temperature", "humidity"); // GIVEN
config.setLatestTsKeyNames(keys); config.setUseMetadataIntervalPatterns(true);
config.setFetchMode("ALL"); config.setStartIntervalPattern("${mdStartInterval}");
config.setLimit(5); config.setEndIntervalPattern("$[msgEndInterval]");
config.setOrderBy("DESC");
config.setAggregation("MIN");
node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config))); node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config)));
var actualLimit = ReflectionTestUtils.getField(node, "limit"); mockTimeseriesService();
var actualTsKeyNames = ReflectionTestUtils.getField(node, "tsKeyNames"); given(timeseriesServiceMock.findAll(any(TenantId.class), any(EntityId.class), anyList())).willReturn(Futures.immediateFuture(Collections.emptyList()));
var actualFetchMode = ReflectionTestUtils.getField(node, "fetchMode");
var actualOrderByFetchAll = ReflectionTestUtils.getField(node, "orderByFetchAll");
var actualAggregation = ReflectionTestUtils.getField(node, "aggregation");
assertThat(actualLimit).isEqualTo(5); // WHEN
assertThat(actualTsKeyNames).isEqualTo(keys); long startTs = 1719220350000L;
assertThat(actualFetchMode).isEqualTo("ALL"); long endTs = 1719220353000L;
assertThat(actualOrderByFetchAll).isEqualTo("DESC"); TbMsgMetaData metaData = new TbMsgMetaData();
assertThat(actualAggregation).isEqualTo(Aggregation.MIN); metaData.putValue("mdStartInterval", String.valueOf(startTs));
TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, metaData, "{\"msgEndInterval\":\"" + endTs + "\"}");
node.onMsg(ctxMock, msg);
/// THEN
ArgumentCaptor<List<ReadTsKvQuery>> actualReadTsKvQueryList = ArgumentCaptor.forClass(List.class);
then(timeseriesServiceMock).should().findAll(eq(TENANT_ID), eq(DEVICE_ID), actualReadTsKvQueryList.capture());
ReadTsKvQuery actualReadTsKvQuery = actualReadTsKvQueryList.getValue().get(0);
assertThat(actualReadTsKvQuery.getStartTs()).isEqualTo(startTs);
assertThat(actualReadTsKvQuery.getEndTs()).isEqualTo(endTs);
} }
@Test @Test
public void givenEmptyTsKeyNames_whenOnMsg_thenTellFailure() throws TbNodeException { public void givenUseMetadataIntervalPatternsIsFalse_whenOnMsg_thenVerifyStartAndEndTsInQuery() throws TbNodeException {
// GIVEN
node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config))); node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config)));
mockTimeseriesService();
given(timeseriesServiceMock.findAll(any(TenantId.class), any(EntityId.class), anyList())).willReturn(Futures.immediateFuture(Collections.emptyList()));
// WHEN
TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, TbMsgMetaData.EMPTY, TbMsg.EMPTY_JSON_OBJECT); TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, TbMsgMetaData.EMPTY, TbMsg.EMPTY_JSON_OBJECT);
node.onMsg(ctxMock, msg); node.onMsg(ctxMock, msg);
ArgumentCaptor<Throwable> actualException = ArgumentCaptor.forClass(Throwable.class); // THEN
then(ctxMock).should().tellFailure(eq(msg), actualException.capture()); ArgumentCaptor<List<ReadTsKvQuery>> actualReadTsKvQueryList = ArgumentCaptor.forClass(List.class);
assertThat(actualException.getValue()) then(timeseriesServiceMock).should().findAll(eq(TENANT_ID), eq(DEVICE_ID), actualReadTsKvQueryList.capture());
.isInstanceOf(IllegalStateException.class) ReadTsKvQuery actualReadTsKvQuery = actualReadTsKvQueryList.getValue().get(0);
.hasMessage("Telemetry is not selected!"); assertThat(actualReadTsKvQuery.getStartTs()).isLessThan(System.currentTimeMillis() - TimeUnit.MINUTES.toMillis(config.getStartInterval()));
assertThat(actualReadTsKvQuery.getEndTs()).isLessThan(System.currentTimeMillis() - TimeUnit.MINUTES.toMillis(config.getEndInterval()));
} }
@Test @Test
public void givenUseMetadataIntervalPatternsIsTrueAndFetchModeIsAllAndAggregationIsMin_whenOnMsg_thenTellSuccess() throws TbNodeException { public void givenTsKeyNamesPatterns_whenOnMsg_thenVerifyTsKeyNamesInQuery() throws TbNodeException {
config.setLatestTsKeyNames(List.of("temperature", "humidity")); // GIVEN
config.setUseMetadataIntervalPatterns(true); config.setLatestTsKeyNames(List.of("temperature", "${mdTsKey}", "$[msgTsKey]"));
config.setStartIntervalPattern("${mdStartInterval}");
config.setEndIntervalPattern("$[msgEndInterval]");
config.setFetchMode("ALL");
config.setAggregation("MIN");
node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config))); node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config)));
mockTimeseriesService(); mockTimeseriesService();
List<TsKvEntry> tsKvEntries = List.of( given(timeseriesServiceMock.findAll(any(TenantId.class), any(EntityId.class), anyList())).willReturn(Futures.immediateFuture(Collections.emptyList()));
new BasicTsKvEntry(System.currentTimeMillis() - 5, new DoubleDataEntry("temperature", 22.4)),
new BasicTsKvEntry(System.currentTimeMillis() - 4, new DoubleDataEntry("humidity", 55.5))
);
given(timeseriesServiceMock.findAll(any(TenantId.class), any(EntityId.class), anyList())).willReturn(Futures.immediateFuture(tsKvEntries));
long startTs = 1719220350000L;
long endTs = 1719220353000L;
TbMsgMetaData metaData = new TbMsgMetaData();
metaData.putValue("mdStartInterval", String.valueOf(startTs));
TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, metaData, "{\"msgEndInterval\":\"" + endTs + "\"}");
node.onMsg(ctxMock, msg);
List<ReadTsKvQuery> expectedReadTsKvQueryList = List.of(
new BaseReadTsKvQuery("temperature", startTs, endTs, endTs - startTs, TbGetTelemetryNodeConfiguration.MAX_FETCH_SIZE, Aggregation.MIN, "ASC"),
new BaseReadTsKvQuery("humidity", startTs, endTs, endTs - startTs, TbGetTelemetryNodeConfiguration.MAX_FETCH_SIZE, Aggregation.MIN, "ASC")
);
verifyReadTsKvQueryList(expectedReadTsKvQueryList, false);
verifyOutgoingMsg(msg);
}
@Test
public void givenUseMetadataIntervalPatternsIsTrueAndFetchModeIsLastAndAggregationIsMax_whenOnMsg_thenTellSuccess() throws TbNodeException {
config.setLatestTsKeyNames(List.of("temperature", "humidity"));
config.setUseMetadataIntervalPatterns(true);
config.setStartIntervalPattern("${mdStartInterval}");
config.setEndIntervalPattern("$[msgEndInterval]");
config.setFetchMode("LAST");
config.setAggregation("MAX");
node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config)));
mockTimeseriesService();
long ts = System.currentTimeMillis();
List<TsKvEntry> tsKvEntries = List.of(
new BasicTsKvEntry(ts - 5, new DoubleDataEntry("temperature", 22.4)),
new BasicTsKvEntry(ts - 4, new DoubleDataEntry("humidity", 55.5))
);
given(timeseriesServiceMock.findAll(any(TenantId.class), any(EntityId.class), anyList())).willReturn(Futures.immediateFuture(tsKvEntries));
long startTs = 1719220350000L;
long endTs = 1719220353000L;
TbMsgMetaData metaData = new TbMsgMetaData();
metaData.putValue("mdStartInterval", String.valueOf(startTs));
TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, metaData, "{\"msgEndInterval\":\"" + endTs + "\"}");
node.onMsg(ctxMock, msg);
List<ReadTsKvQuery> expectedReadTsKvQueryList = List.of(
new BaseReadTsKvQuery("temperature", startTs, endTs, 1, 1, Aggregation.NONE, "DESC"),
new BaseReadTsKvQuery("humidity", startTs, endTs, 1, 1, Aggregation.NONE, "DESC")
);
verifyReadTsKvQueryList(expectedReadTsKvQueryList, false);
verifyOutgoingMsg(msg);
}
@Test
public void givenUseMetadataIntervalPatternsIsFalseAndTsKeyNamesPatternsAndFetchModeIsFirst_whenOnMsg_thenTellFailure() throws TbNodeException {
config.setLatestTsKeyNames(List.of("temperature", "${mdTsKey}", "$[msgTsKey]"));;
config.setFetchMode("FIRST");
config.setStartInterval(6);
config.setEndInterval(1);
node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config)));
mockTimeseriesService();
String errorMsg = "Something went wrong";
given(timeseriesServiceMock.findAll(any(TenantId.class), any(EntityId.class), anyList())).willReturn(Futures.immediateFailedFuture(new RuntimeException(errorMsg)));
// WHEN
TbMsgMetaData metaData = new TbMsgMetaData(); TbMsgMetaData metaData = new TbMsgMetaData();
metaData.putValue("mdTsKey", "humidity"); metaData.putValue("mdTsKey", "humidity");
TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, metaData, "{\"msgTsKey\":\"pressure\"}"); TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, metaData, "{\"msgTsKey\":\"pressure\"}");
node.onMsg(ctxMock, msg); node.onMsg(ctxMock, msg);
long ts = System.currentTimeMillis(); // THEN
long startTs = ts - TimeUnit.SECONDS.toMillis(config.getStartInterval()); ArgumentCaptor<List<ReadTsKvQuery>> actualReadTsKvQueryList = ArgumentCaptor.forClass(List.class);
long endTs = ts - TimeUnit.SECONDS.toMillis(config.getEndInterval()); then(timeseriesServiceMock).should().findAll(eq(TENANT_ID), eq(DEVICE_ID), actualReadTsKvQueryList.capture());
List<ReadTsKvQuery> expecetdReadTsKvQueryList = List.of( List<String> actualKeys = actualReadTsKvQueryList.getValue().stream().map(TsKvQuery::getKey).toList();
new BaseReadTsKvQuery("temperature", startTs, endTs, 1, 1, Aggregation.NONE, "ASC"), List<String> expectedTsKeyNames = config.getLatestTsKeyNames().stream().map(tsKeyName -> TbNodeUtils.processPattern(tsKeyName, msg)).toList();
new BaseReadTsKvQuery("humidity", startTs, endTs, 1, 1, Aggregation.NONE, "ASC"), assertThat(actualKeys).containsAll(expectedTsKeyNames);
new BaseReadTsKvQuery("pressure", startTs, endTs, 1, 1, Aggregation.NONE, "ASC")
);
verifyReadTsKvQueryList(expecetdReadTsKvQueryList, true);
ArgumentCaptor<Throwable> actualException = ArgumentCaptor.forClass(Throwable.class);
then(ctxMock).should().tellFailure(eq(msg), actualException.capture());
assertThat(actualException.getValue()).isInstanceOf(RuntimeException.class).hasMessage(errorMsg);
} }
@ParameterizedTest @ParameterizedTest
@MethodSource @MethodSource
public void givenInvalidIntervalPatterns_whenOnMsg_thenTellFailure(String startIntervalPattern, String errorMsg) throws TbNodeException { public void givenAggregation_whenOnMsg_thenVerifyAggregationStepInQuery(String aggregation, Consumer<ReadTsKvQuery> verifyAggregationStepInQuery) throws TbNodeException {
config.setLatestTsKeyNames(List.of("${mdKey}")); // GIVEN
config.setStartInterval(5);
config.setEndInterval(1);
config.setFetchMode(TbGetTelemetryNodeConfiguration.FETCH_MODE_ALL);
config.setAggregation(aggregation);
node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config)));
mockTimeseriesService();
given(timeseriesServiceMock.findAll(any(TenantId.class), any(EntityId.class), anyList())).willReturn(Futures.immediateFuture(Collections.emptyList()));
// WHEN
TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, TbMsgMetaData.EMPTY, TbMsg.EMPTY_JSON_OBJECT);
node.onMsg(ctxMock, msg);
// THEN
ArgumentCaptor<List<ReadTsKvQuery>> actualReadTsKvQueryList = ArgumentCaptor.forClass(List.class);
then(timeseriesServiceMock).should().findAll(eq(TENANT_ID), eq(DEVICE_ID), actualReadTsKvQueryList.capture());
ReadTsKvQuery actualReadTsKvQuery = actualReadTsKvQueryList.getValue().get(0);
verifyAggregationStepInQuery.accept(actualReadTsKvQuery);
}
private static Stream<Arguments> givenAggregation_whenOnMsg_thenVerifyAggregationStepInQuery() {
return Stream.of(
Arguments.of("", (Consumer<ReadTsKvQuery>) query -> assertThat(query.getInterval()).isEqualTo(1)),
Arguments.of("MIN", (Consumer<ReadTsKvQuery>) query -> assertThat(query.getInterval()).isEqualTo(query.getEndTs() - query.getStartTs()))
);
}
@ParameterizedTest
@MethodSource
public void givenFetchModeAndLimit_whenOnMsg_thenVerifyLimitInQuery(String fetchMode, int limit, Consumer<ReadTsKvQuery> verifyLimitInQuery) throws TbNodeException {
// GIVEN
config.setFetchMode(fetchMode);
config.setLimit(limit);
node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config)));
mockTimeseriesService();
given(timeseriesServiceMock.findAll(any(TenantId.class), any(EntityId.class), anyList())).willReturn(Futures.immediateFuture(Collections.emptyList()));
// WHEN
TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, TbMsgMetaData.EMPTY, TbMsg.EMPTY_JSON_OBJECT);
node.onMsg(ctxMock, msg);
// THEN
ArgumentCaptor<List<ReadTsKvQuery>> actualReadTsKvQueryList = ArgumentCaptor.forClass(List.class);
then(timeseriesServiceMock).should().findAll(eq(TENANT_ID), eq(DEVICE_ID), actualReadTsKvQueryList.capture());
ReadTsKvQuery actualReadTsKvQuery = actualReadTsKvQueryList.getValue().get(0);
verifyLimitInQuery.accept(actualReadTsKvQuery);
}
private static Stream<Arguments> givenFetchModeAndLimit_whenOnMsg_thenVerifyLimitInQuery() {
return Stream.of(
Arguments.of(
TbGetTelemetryNodeConfiguration.FETCH_MODE_ALL,
5,
(Consumer<ReadTsKvQuery>) query -> assertThat(query.getLimit()).isEqualTo(5)),
Arguments.of(
TbGetTelemetryNodeConfiguration.FETCH_MODE_FIRST,
TbGetTelemetryNodeConfiguration.MAX_FETCH_SIZE,
(Consumer<ReadTsKvQuery>) query -> assertThat(query.getLimit()).isEqualTo(1))
);
}
@ParameterizedTest
@MethodSource
public void givenFetchModeAndOrder_whenOnMsg_thenVerifyOrderInQuery(String fetchMode, String orderBy, Consumer<ReadTsKvQuery> verifyOrderInQuery) throws TbNodeException {
// GIVEN
config.setFetchMode(fetchMode);
config.setOrderBy(orderBy);
node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config)));
mockTimeseriesService();
given(timeseriesServiceMock.findAll(any(TenantId.class), any(EntityId.class), anyList())).willReturn(Futures.immediateFuture(Collections.emptyList()));
// WHEN
TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, TbMsgMetaData.EMPTY, TbMsg.EMPTY_JSON_OBJECT);
node.onMsg(ctxMock, msg);
// THEN
ArgumentCaptor<List<ReadTsKvQuery>> actualReadTsKvQueryList = ArgumentCaptor.forClass(List.class);
then(timeseriesServiceMock).should().findAll(eq(TENANT_ID), eq(DEVICE_ID), actualReadTsKvQueryList.capture());
ReadTsKvQuery actualReadTsKvQuery = actualReadTsKvQueryList.getValue().get(0);
verifyOrderInQuery.accept(actualReadTsKvQuery);
}
private static Stream<Arguments> givenFetchModeAndOrder_whenOnMsg_thenVerifyOrderInQuery() {
return Stream.of(
Arguments.of(
TbGetTelemetryNodeConfiguration.FETCH_MODE_ALL,
"",
(Consumer<ReadTsKvQuery>) query -> assertThat(query.getOrder()).isEqualTo("ASC")),
Arguments.of(
TbGetTelemetryNodeConfiguration.FETCH_MODE_ALL,
"DESC",
(Consumer<ReadTsKvQuery>) query -> assertThat(query.getOrder()).isEqualTo("DESC")),
Arguments.of(
TbGetTelemetryNodeConfiguration.FETCH_MODE_FIRST,
"ASC",
(Consumer<ReadTsKvQuery>) query -> assertThat(query.getOrder()).isEqualTo("ASC")),
Arguments.of(
TbGetTelemetryNodeConfiguration.FETCH_MODE_LAST,
"ASC",
(Consumer<ReadTsKvQuery>) query -> assertThat(query.getOrder()).isEqualTo("DESC"))
);
}
@ParameterizedTest
@MethodSource
public void givenInvalidIntervalPatterns_whenOnMsg_thenThrowsException(String startIntervalPattern, String errorMsg) throws TbNodeException {
// GIVEN
config.setUseMetadataIntervalPatterns(true); config.setUseMetadataIntervalPatterns(true);
config.setStartIntervalPattern(startIntervalPattern); config.setStartIntervalPattern(startIntervalPattern);
node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config))); node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config)));
// WHEN-THEN
TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, TbMsgMetaData.EMPTY, "{\"msgStartInterval\":\"start\"}"); TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, TbMsgMetaData.EMPTY, "{\"msgStartInterval\":\"start\"}");
node.onMsg(ctxMock, msg); assertThatThrownBy(() -> node.onMsg(ctxMock, msg)).isInstanceOf(IllegalArgumentException.class).hasMessage(errorMsg);
ArgumentCaptor<Throwable> actualException = ArgumentCaptor.forClass(Throwable.class);
then(ctxMock).should().tellFailure(eq(msg), actualException.capture());
assertThat(actualException.getValue()).isInstanceOf(IllegalArgumentException.class).hasMessage(errorMsg);
} }
private static Stream<Arguments> givenInvalidIntervalPatterns_whenOnMsg_thenTellFailure() { private static Stream<Arguments> givenInvalidIntervalPatterns_whenOnMsg_thenThrowsException() {
return Stream.of( return Stream.of(
Arguments.of("${mdStartInterval}", "Message value: 'mdStartInterval' is undefined"), Arguments.of("${mdStartInterval}", "Message value: 'mdStartInterval' is undefined"),
Arguments.of("$[msgStartInterval]", "Message value: 'msgStartInterval' has invalid format") Arguments.of("$[msgStartInterval]", "Message value: 'msgStartInterval' has invalid format")
); );
} }
@Test
public void givenFetchModeAll_whenOnMsg_thenTellSuccessAndVerifyMsg() throws TbNodeException {
// GIVEN
config.setLatestTsKeyNames(List.of("temperature", "humidity"));
config.setFetchMode(TbGetTelemetryNodeConfiguration.FETCH_MODE_ALL);
node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config)));
mockTimeseriesService();
long ts = System.currentTimeMillis();
List<TsKvEntry> tsKvEntries = List.of(
new BasicTsKvEntry(ts - 5, new DoubleDataEntry("temperature", 23.1)),
new BasicTsKvEntry(ts - 4, new DoubleDataEntry("temperature", 22.4)),
new BasicTsKvEntry(ts - 4, new DoubleDataEntry("humidity", 55.5))
);
given(timeseriesServiceMock.findAll(any(TenantId.class), any(EntityId.class), anyList())).willReturn(Futures.immediateFuture(tsKvEntries));
// WHEN
TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, TbMsgMetaData.EMPTY, TbMsg.EMPTY_JSON_OBJECT);
node.onMsg(ctxMock, msg);
// THEN
ArgumentCaptor<TbMsg> actualMsg = ArgumentCaptor.forClass(TbMsg.class);
then(ctxMock).should().tellSuccess(actualMsg.capture());
TbMsgMetaData metaData = new TbMsgMetaData();
metaData.putValue("temperature", "[{\"ts\":" + (ts - 5) + ",\"value\":23.1},{\"ts\":" + (ts - 4) + ",\"value\":22.4}]");
metaData.putValue("humidity", "[{\"ts\":" + (ts - 4) + ",\"value\":55.5}]");
TbMsg expectedMsg = TbMsg.transformMsgMetadata(msg, metaData);
assertThat(actualMsg.getValue()).usingRecursiveComparison().ignoringFields("ctx").isEqualTo(expectedMsg);
}
@Test
public void givenFetchModeIsFirst_whenOnMsg_thenTellSuccessAndVerifyMsg() throws TbNodeException {
// GIVEN
config.setLatestTsKeyNames(List.of("temperature", "humidity"));
node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config)));
mockTimeseriesService();
long ts = System.currentTimeMillis();
List<TsKvEntry> tsKvEntries = List.of(
new BasicTsKvEntry(ts - 4, new DoubleDataEntry("temperature", 22.4)),
new BasicTsKvEntry(ts - 4, new DoubleDataEntry("humidity", 55.5))
);
given(timeseriesServiceMock.findAll(any(TenantId.class), any(EntityId.class), anyList())).willReturn(Futures.immediateFuture(tsKvEntries));
// WHEN
TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, TbMsgMetaData.EMPTY, TbMsg.EMPTY_JSON_OBJECT);
node.onMsg(ctxMock, msg);
// THEN
ArgumentCaptor<TbMsg> actualMsg = ArgumentCaptor.forClass(TbMsg.class);
then(ctxMock).should().tellSuccess(actualMsg.capture());
TbMsgMetaData metaData = new TbMsgMetaData();
metaData.putValue("temperature", "\"22.4\"");
metaData.putValue("humidity", "\"55.5\"");
TbMsg expectedMsg = TbMsg.transformMsgMetadata(msg, metaData);
assertThat(actualMsg.getValue()).usingRecursiveComparison().ignoringFields("ctx").isEqualTo(expectedMsg);
}
private void mockTimeseriesService() { private void mockTimeseriesService() {
given(ctxMock.getTimeseriesService()).willReturn(timeseriesServiceMock); given(ctxMock.getTimeseriesService()).willReturn(timeseriesServiceMock);
given(ctxMock.getTenantId()).willReturn(TENANT_ID); given(ctxMock.getTenantId()).willReturn(TENANT_ID);
given(ctxMock.getDbCallbackExecutor()).willReturn(executor); given(ctxMock.getDbCallbackExecutor()).willReturn(executor);
} }
private void verifyReadTsKvQueryList(List<ReadTsKvQuery> expectedReadTsKvQueryList, boolean ignoreTs) {
ArgumentCaptor<List<ReadTsKvQuery>> actualReadTsKvQueryCaptor = ArgumentCaptor.forClass(List.class);
then(timeseriesServiceMock).should().findAll(eq(TENANT_ID), eq(DEVICE_ID), actualReadTsKvQueryCaptor.capture());
List<ReadTsKvQuery> actualReadTsKvQuery = actualReadTsKvQueryCaptor.getValue();
for (int i = 0; i < expectedReadTsKvQueryList.size(); i++) {
assertThat(actualReadTsKvQuery.get(i))
.usingRecursiveComparison()
.ignoringFields(!ignoreTs ? "id" : "endTs", "startTs", "id")
.isEqualTo(expectedReadTsKvQueryList.get(i));
}
}
private void verifyOutgoingMsg(TbMsg expectedMsg) {
ArgumentCaptor<TbMsg> actualMsgCaptor = ArgumentCaptor.forClass(TbMsg.class);
then(ctxMock).should().tellSuccess(actualMsgCaptor.capture());
TbMsg actualMsg = actualMsgCaptor.getValue();
assertThat(actualMsg).usingRecursiveComparison().ignoringFields("ctx", "metaData").isEqualTo(expectedMsg);
assertThat(actualMsg.getMetaData().getData()).containsKeys("temperature", "humidity");
}
} }