From c7cfd92a7adf2ded8d01afa8e02f6bd772dc81fa Mon Sep 17 00:00:00 2001 From: YevhenBondarenko Date: Wed, 19 Mar 2025 21:33:39 +0100 Subject: [PATCH 1/5] fixed OOM if startTs and endTs in agg command are the same --- .../dao/sqlts/AbstractChunkedAggregationTimeseriesDao.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dao/src/main/java/org/thingsboard/server/dao/sqlts/AbstractChunkedAggregationTimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/sqlts/AbstractChunkedAggregationTimeseriesDao.java index 4f7239cfb3..37fa159cf9 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sqlts/AbstractChunkedAggregationTimeseriesDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sqlts/AbstractChunkedAggregationTimeseriesDao.java @@ -119,7 +119,7 @@ public abstract class AbstractChunkedAggregationTimeseriesDao extends AbstractSq @Override public ListenableFuture findAllAsync(TenantId tenantId, EntityId entityId, ReadTsKvQuery query) { var aggParams = query.getAggParameters(); - if (Aggregation.NONE.equals(aggParams.getAggregation())) { + if (Aggregation.NONE.equals(aggParams.getAggregation()) || aggParams.getInterval() == 0) { return Futures.immediateFuture(findAllAsyncWithLimit(entityId, query)); } else { List>> futures = new ArrayList<>(); From 3f936ef6fc4d9fedc77e37453a82b94383ce9806 Mon Sep 17 00:00:00 2001 From: YevhenBondarenko Date: Wed, 19 Mar 2025 23:07:42 +0100 Subject: [PATCH 2/5] added fix for negative interval --- .../dao/sqlts/AbstractChunkedAggregationTimeseriesDao.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dao/src/main/java/org/thingsboard/server/dao/sqlts/AbstractChunkedAggregationTimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/sqlts/AbstractChunkedAggregationTimeseriesDao.java index 37fa159cf9..e7351bbddd 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sqlts/AbstractChunkedAggregationTimeseriesDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sqlts/AbstractChunkedAggregationTimeseriesDao.java @@ -119,7 +119,7 @@ public abstract class AbstractChunkedAggregationTimeseriesDao extends AbstractSq @Override public ListenableFuture findAllAsync(TenantId tenantId, EntityId entityId, ReadTsKvQuery query) { var aggParams = query.getAggParameters(); - if (Aggregation.NONE.equals(aggParams.getAggregation()) || aggParams.getInterval() == 0) { + if (Aggregation.NONE.equals(aggParams.getAggregation()) || aggParams.getInterval() < 1) { return Futures.immediateFuture(findAllAsyncWithLimit(entityId, query)); } else { List>> futures = new ArrayList<>(); From f5eabdca3c1ce13855b80987c7525e445ddcf7b4 Mon Sep 17 00:00:00 2001 From: YevhenBondarenko Date: Thu, 20 Mar 2025 11:37:13 +0100 Subject: [PATCH 3/5] added tests --- ...stractChunkedAggregationTimeseriesDao.java | 2 +- ...ctChunkedAggregationTimeseriesDaoTest.java | 19 +++++++++++++++++++ 2 files changed, 20 insertions(+), 1 deletion(-) diff --git a/dao/src/main/java/org/thingsboard/server/dao/sqlts/AbstractChunkedAggregationTimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/sqlts/AbstractChunkedAggregationTimeseriesDao.java index e7351bbddd..1c5d554350 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sqlts/AbstractChunkedAggregationTimeseriesDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sqlts/AbstractChunkedAggregationTimeseriesDao.java @@ -144,7 +144,7 @@ public abstract class AbstractChunkedAggregationTimeseriesDao extends AbstractSq } } - private ReadTsKvQueryResult findAllAsyncWithLimit(EntityId entityId, ReadTsKvQuery query) { + ReadTsKvQueryResult findAllAsyncWithLimit(EntityId entityId, ReadTsKvQuery query) { Integer keyId = keyDictionaryDao.getOrSaveKeyId(query.getKey()); List tsKvEntities = tsKvRepository.findAllWithLimit( entityId.getId(), diff --git a/dao/src/test/java/org/thingsboard/server/dao/sqlts/AbstractChunkedAggregationTimeseriesDaoTest.java b/dao/src/test/java/org/thingsboard/server/dao/sqlts/AbstractChunkedAggregationTimeseriesDaoTest.java index f9ea7d2a41..b1967ce218 100644 --- a/dao/src/test/java/org/thingsboard/server/dao/sqlts/AbstractChunkedAggregationTimeseriesDaoTest.java +++ b/dao/src/test/java/org/thingsboard/server/dao/sqlts/AbstractChunkedAggregationTimeseriesDaoTest.java @@ -51,6 +51,7 @@ public class AbstractChunkedAggregationTimeseriesDaoTest { Optional optionalListenableFuture = Optional.of(mock(TsKvEntry.class)); willReturn(Futures.immediateFuture(optionalListenableFuture)).given(tsDao).findAndAggregateAsync(any(), anyString(), anyLong(), anyLong(), anyLong(), any()); willReturn(Futures.immediateFuture(mock(ReadTsKvQueryResult.class))).given(tsDao).getReadTsKvQueryResultFuture(any(), any()); + willReturn(mock(ReadTsKvQueryResult.class)).given(tsDao).findAllAsyncWithLimit(any(), any()); } @Test @@ -146,6 +147,24 @@ public class AbstractChunkedAggregationTimeseriesDaoTest { } } + @Test + public void givenZeroInterval_whenAggregateCount_thenFindAllWithoutAggregation() { + ReadTsKvQuery query = new BaseReadTsKvQuery(TEMP, 1, 3000, 0, LIMIT, COUNT, DESC); + willCallRealMethod().given(tsDao).findAllAsync(SYS_TENANT_ID, SYS_TENANT_ID, query); + tsDao.findAllAsync(SYS_TENANT_ID, SYS_TENANT_ID, query); + verify(tsDao, times(1)).findAllAsyncWithLimit(any(), any()); + verify(tsDao, times(0)).findAndAggregateAsync(any(), any(), anyLong(), anyLong(), anyLong(), any()); + } + + @Test + public void givenNegativeInterval_whenAggregateCount_thenFindAllWithoutAggregation() { + ReadTsKvQuery query = new BaseReadTsKvQuery(TEMP, 1, 3000, 0, LIMIT, COUNT, DESC); + willCallRealMethod().given(tsDao).findAllAsync(SYS_TENANT_ID, SYS_TENANT_ID, query); + tsDao.findAllAsync(SYS_TENANT_ID, SYS_TENANT_ID, query); + verify(tsDao, times(1)).findAllAsyncWithLimit(any(), any()); + verify(tsDao, times(0)).findAndAggregateAsync(any(), any(), anyLong(), anyLong(), anyLong(), any()); + } + long getTsForReadTsKvQuery(long startTs, long endTs) { return startTs + (endTs - startTs) / 2L; } From c3b608492a160bf5031f66d3aa1a41c9c7d9b99b Mon Sep 17 00:00:00 2001 From: YevhenBondarenko Date: Thu, 20 Mar 2025 11:44:43 +0100 Subject: [PATCH 4/5] test improvements --- .../AbstractChunkedAggregationTimeseriesDaoTest.java | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/dao/src/test/java/org/thingsboard/server/dao/sqlts/AbstractChunkedAggregationTimeseriesDaoTest.java b/dao/src/test/java/org/thingsboard/server/dao/sqlts/AbstractChunkedAggregationTimeseriesDaoTest.java index b1967ce218..eb34e6dc4c 100644 --- a/dao/src/test/java/org/thingsboard/server/dao/sqlts/AbstractChunkedAggregationTimeseriesDaoTest.java +++ b/dao/src/test/java/org/thingsboard/server/dao/sqlts/AbstractChunkedAggregationTimeseriesDaoTest.java @@ -149,16 +149,16 @@ public class AbstractChunkedAggregationTimeseriesDaoTest { @Test public void givenZeroInterval_whenAggregateCount_thenFindAllWithoutAggregation() { - ReadTsKvQuery query = new BaseReadTsKvQuery(TEMP, 1, 3000, 0, LIMIT, COUNT, DESC); - willCallRealMethod().given(tsDao).findAllAsync(SYS_TENANT_ID, SYS_TENANT_ID, query); - tsDao.findAllAsync(SYS_TENANT_ID, SYS_TENANT_ID, query); - verify(tsDao, times(1)).findAllAsyncWithLimit(any(), any()); - verify(tsDao, times(0)).findAndAggregateAsync(any(), any(), anyLong(), anyLong(), anyLong(), any()); + givenInterval_whenAggregateCount_thenFindAllWithoutAggregation(0); } @Test public void givenNegativeInterval_whenAggregateCount_thenFindAllWithoutAggregation() { - ReadTsKvQuery query = new BaseReadTsKvQuery(TEMP, 1, 3000, 0, LIMIT, COUNT, DESC); + givenInterval_whenAggregateCount_thenFindAllWithoutAggregation(-1); + } + + public void givenInterval_whenAggregateCount_thenFindAllWithoutAggregation(int interval) { + ReadTsKvQuery query = new BaseReadTsKvQuery(TEMP, 1, 3000, interval, LIMIT, COUNT, DESC); willCallRealMethod().given(tsDao).findAllAsync(SYS_TENANT_ID, SYS_TENANT_ID, query); tsDao.findAllAsync(SYS_TENANT_ID, SYS_TENANT_ID, query); verify(tsDao, times(1)).findAllAsyncWithLimit(any(), any()); From 77c9628b14aef5e53bfc43c07085ee237fa2e13a Mon Sep 17 00:00:00 2001 From: YevhenBondarenko Date: Thu, 20 Mar 2025 15:48:44 +0100 Subject: [PATCH 5/5] added interval validation to the query validator --- .../dao/timeseries/BaseTimeseriesService.java | 7 ++++++- .../timeseries/BaseTimeseriesServiceTest.java | 16 ++++++++++++++++ 2 files changed, 22 insertions(+), 1 deletion(-) diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java index a1137853ab..9eefcaae1e 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java @@ -295,7 +295,12 @@ public class BaseTimeseriesService implements TimeseriesService { throw new IncorrectParameterException("Incorrect ReadTsKvQuery. Aggregation can't be empty"); } if (!Aggregation.NONE.equals(query.getAggregation())) { - long step = Math.max(query.getInterval(), 1000); + long interval = query.getInterval(); + if (interval < 1) { + throw new IncorrectParameterException("Invalid TsKvQuery: 'interval' must be greater than 0, but got " + interval + + ". Please check your query parameters and ensure 'endTs' is greater than 'startTs' or increase 'interval'."); + } + long step = Math.max(interval, 1000); long intervalCounts = (query.getEndTs() - query.getStartTs()) / step; if (intervalCounts > maxTsIntervals || intervalCounts < 0) { throw new IncorrectParameterException("Incorrect TsKvQuery. Number of intervals is to high - " + intervalCounts + ". " + diff --git a/dao/src/test/java/org/thingsboard/server/dao/service/timeseries/BaseTimeseriesServiceTest.java b/dao/src/test/java/org/thingsboard/server/dao/service/timeseries/BaseTimeseriesServiceTest.java index a070de4d82..6a38159804 100644 --- a/dao/src/test/java/org/thingsboard/server/dao/service/timeseries/BaseTimeseriesServiceTest.java +++ b/dao/src/test/java/org/thingsboard/server/dao/service/timeseries/BaseTimeseriesServiceTest.java @@ -45,6 +45,7 @@ import org.thingsboard.server.common.data.kv.StringDataEntry; import org.thingsboard.server.common.data.kv.TsKvEntry; import org.thingsboard.server.common.data.objects.TelemetryEntityView; import org.thingsboard.server.dao.entityview.EntityViewService; +import org.thingsboard.server.dao.exception.IncorrectParameterException; import org.thingsboard.server.dao.service.AbstractServiceTest; import org.thingsboard.server.dao.timeseries.TimeseriesService; @@ -757,6 +758,21 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest { assertThat(fullList).containsOnlyOnceElementsOf(timeseries); } + @Test + public void testFindAllByQueriesWithAggregationAndZeroInterval() throws Exception { + testFindAllByQueriesWithAggregationAndInvalidInterval(0); + } + + @Test + public void testFindAllByQueriesWithAggregationAndNegativeInterval() throws Exception { + testFindAllByQueriesWithAggregationAndInvalidInterval(-1); + } + + private void testFindAllByQueriesWithAggregationAndInvalidInterval(long interval) { + BaseReadTsKvQuery query = new BaseReadTsKvQuery(STRING_KEY, TS, TS, interval, 1000, Aggregation.SUM, "DESC"); + Assert.assertThrows(IncorrectParameterException.class, () -> findAndVerifyQueryId(deviceId, query)); + } + private TsKvEntry save(DeviceId deviceId, long ts, long value) throws Exception { TsKvEntry entry = new BasicTsKvEntry(ts, new LongDataEntry(LONG_KEY, value)); tsService.save(tenantId, deviceId, entry).get(MAX_TIMEOUT, TimeUnit.SECONDS);