diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/AggregationTimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/AggregationTimeseriesDao.java index bf6948b1ac..a99ad053b6 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/AggregationTimeseriesDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/AggregationTimeseriesDao.java @@ -38,23 +38,24 @@ public interface AggregationTimeseriesDao { if (query.getAggregation() == Aggregation.NONE) { return findAllAsyncWithLimit(tenantId, entityId, query); } else { - long step = getIntervalGreaterOrEqualsMinAggregationStep(query.getInterval()); - long stepTs = query.getStartTs(); - List>> futures = findIntervals(tenantId, entityId, query, step, stepTs); + List>> futures = findIntervals(tenantId, entityId, query); return getTskvEntriesFuture(Futures.allAsList(futures)); } } - default List>> findIntervals(TenantId tenantId, EntityId entityId, ReadTsKvQuery query, long step, long stepTs) { + default List>> findIntervals(TenantId tenantId, EntityId entityId, ReadTsKvQuery query) { List>> futures = new ArrayList<>(); - while (stepTs < query.getEndTs()) { - long startTs = stepTs; - long endTs = stepTs + step; + long endPeriod = query.getEndTs(); + long startPeriod = query.getStartTs(); + long step = query.getInterval(); + while (startPeriod <= endPeriod) { + long startTs = startPeriod; + long endTs = Math.min(startPeriod + step, endPeriod + 1); long ts = getTsForReadTsKvQuery(startTs, endTs); ReadTsKvQuery subQuery = new BaseReadTsKvQuery(query.getKey(), startTs, endTs, ts, 1, query.getAggregation(), query.getOrder()); ListenableFuture> aggregateTsKvEntry = findAndAggregateAsync(tenantId, entityId, subQuery, toPartitionTs(startTs), toPartitionTs(endTs), query.getAggregation()); futures.add(aggregateTsKvEntry); - stepTs = endTs; + startPeriod = endTs; } return futures; } diff --git a/dao/src/test/java/org/thingsboard/server/dao/sqlts/AbstractChunkedAggregationTimeseriesDaoTest.java b/dao/src/test/java/org/thingsboard/server/dao/sqlts/AbstractChunkedAggregationTimeseriesDaoTest.java index 43c2467885..b8a2328fe2 100644 --- a/dao/src/test/java/org/thingsboard/server/dao/sqlts/AbstractChunkedAggregationTimeseriesDaoTest.java +++ b/dao/src/test/java/org/thingsboard/server/dao/sqlts/AbstractChunkedAggregationTimeseriesDaoTest.java @@ -39,91 +39,109 @@ import static org.mockito.Mockito.verify; public class AbstractChunkedAggregationTimeseriesDaoTest { final int START_TS = 1; - final int LIMIT = 0; + final int LIMIT = 1; final int END_TS = 3000; final String TEMP = "temp"; final String DESC = "DESC"; AbstractChunkedAggregationTimeseriesDao tsDao; + //SOME PRESENT: When we give data with period l-r, program return data in interval [l;r) + @Before public void setUp() throws Exception { tsDao = mock(AbstractChunkedAggregationTimeseriesDao.class); ListenableFuture> optionalListenableFuture = Futures.immediateFuture(Optional.of(mock(TsKvEntry.class))); willReturn(optionalListenableFuture).given(tsDao).findAndAggregateAsync(any(), anyString(), anyLong(), anyLong(), anyLong(), any()); + willReturn(Futures.immediateFuture(null)).given(tsDao).getTskvEntriesFuture(any()); + willCallRealMethod().given(tsDao).findAllAsync(any(), any(), any(ReadTsKvQuery.class)); + willCallRealMethod().given(tsDao).findIntervals(any(), any(), any(ReadTsKvQuery.class)); + willCallRealMethod().given(tsDao).getTsForReadTsKvQuery(anyLong(), anyLong()); + willCallRealMethod().given(tsDao).toPartitionTs(anyLong()); + willCallRealMethod().given(tsDao).findAndAggregateAsync(any(), any(), any(), anyLong(), anyLong(), any()); } @Test public void givenIntervalNotMultiplePeriod_whenAggregateCount_thanLastIntervalShorterThanOthersAndEqualsEndTs() { ReadTsKvQuery query = new BaseReadTsKvQuery(TEMP, START_TS, END_TS, 2000, LIMIT, Aggregation.COUNT, DESC); - willCallRealMethod().given(tsDao).findAllAsync(TenantId.SYS_TENANT_ID, TenantId.SYS_TENANT_ID, query); + ReadTsKvQuery subQueryFirst = new BaseReadTsKvQuery(TEMP, START_TS, 2001, START_TS + (2001 - START_TS) / 2, LIMIT, Aggregation.COUNT, DESC); + ReadTsKvQuery subQuerySecond = new BaseReadTsKvQuery(TEMP, 2001, END_TS, 2001 + (END_TS + 1 - 2001) / 2, LIMIT, Aggregation.COUNT, DESC); + tsDao.findAllAsync(TenantId.SYS_TENANT_ID, TenantId.SYS_TENANT_ID, query); - verify(tsDao, times(2)).findAndAggregateAsync(any(), anyString(), anyLong(), anyLong(), anyLong(), any()); - verify(tsDao, times(1)).findAndAggregateAsync(TenantId.SYS_TENANT_ID, TEMP, 1, 2000, 1000, Aggregation.COUNT); - verify(tsDao, times(1)).findAndAggregateAsync(TenantId.SYS_TENANT_ID, TEMP, 2001, END_TS, 2500, Aggregation.COUNT); + verify(tsDao, times(2)).findAndAggregateAsync(any(), any(), any(), anyLong(), anyLong(), any()); + verify(tsDao, times(1)).findAndAggregateAsync(TenantId.SYS_TENANT_ID, TenantId.SYS_TENANT_ID, subQueryFirst, START_TS, 2001, Aggregation.COUNT); + verify(tsDao, times(1)).findAndAggregateAsync(TenantId.SYS_TENANT_ID, TenantId.SYS_TENANT_ID, subQuerySecond, 2001, END_TS + 1, Aggregation.COUNT); } @Test public void givenIntervalNotMultiplePeriod_whenAggregateCount_thanIntervalEqualsPeriod() { ReadTsKvQuery query = new BaseReadTsKvQuery(TEMP, START_TS, END_TS, END_TS, LIMIT, Aggregation.COUNT, DESC); + ReadTsKvQuery subQueryFirst = new BaseReadTsKvQuery(TEMP, START_TS, END_TS, START_TS + (END_TS + 1 - START_TS) / 2, LIMIT, Aggregation.COUNT, DESC); willCallRealMethod().given(tsDao).findAllAsync(TenantId.SYS_TENANT_ID, TenantId.SYS_TENANT_ID, query); tsDao.findAllAsync(TenantId.SYS_TENANT_ID, TenantId.SYS_TENANT_ID, query); - verify(tsDao, times(1)).findAndAggregateAsync(any(), anyString(), anyLong(), anyLong(), anyLong(), any()); - verify(tsDao, times(1)).findAndAggregateAsync(TenantId.SYS_TENANT_ID, TEMP, 1, END_TS, 1500, Aggregation.COUNT); + verify(tsDao, times(1)).findAndAggregateAsync(any(), any(), any(), anyLong(), anyLong(), any()); + verify(tsDao, times(1)).findAndAggregateAsync(TenantId.SYS_TENANT_ID, TenantId.SYS_TENANT_ID, subQueryFirst, START_TS, END_TS + 1, Aggregation.COUNT); } @Test public void givenIntervalNotMultiplePeriod_whenAggregateCount_thanIntervalEqualsPeriodMinusOne() { ReadTsKvQuery query = new BaseReadTsKvQuery(TEMP, START_TS, END_TS, 2999, LIMIT, Aggregation.COUNT, DESC); + ReadTsKvQuery subQueryFirst = new BaseReadTsKvQuery(TEMP, START_TS, END_TS - 2, START_TS + (END_TS - 1 - START_TS) / 2, LIMIT, Aggregation.COUNT, DESC); + ReadTsKvQuery subQuerySecond = new BaseReadTsKvQuery(TEMP, END_TS - 1, END_TS, END_TS - 1 + (END_TS + 1 - (END_TS - 1)) / 2, LIMIT, Aggregation.COUNT, DESC); willCallRealMethod().given(tsDao).findAllAsync(TenantId.SYS_TENANT_ID, TenantId.SYS_TENANT_ID, query); tsDao.findAllAsync(TenantId.SYS_TENANT_ID, TenantId.SYS_TENANT_ID, query); - verify(tsDao, times(2)).findAndAggregateAsync(any(), anyString(), anyLong(), anyLong(), anyLong(), any()); - verify(tsDao, times(1)).findAndAggregateAsync(TenantId.SYS_TENANT_ID, TEMP, 0, 2999, 1499, Aggregation.COUNT); - verify(tsDao, times(1)).findAndAggregateAsync(TenantId.SYS_TENANT_ID, TEMP, END_TS, END_TS, END_TS, Aggregation.COUNT); + verify(tsDao, times(2)).findAndAggregateAsync(any(), any(), any(), anyLong(), anyLong(), any()); + verify(tsDao, times(1)).findAndAggregateAsync(TenantId.SYS_TENANT_ID, TenantId.SYS_TENANT_ID, subQueryFirst, START_TS, END_TS, Aggregation.COUNT); + verify(tsDao, times(1)).findAndAggregateAsync(TenantId.SYS_TENANT_ID, TenantId.SYS_TENANT_ID, subQuerySecond, END_TS, END_TS + 1, Aggregation.COUNT); + } @Test public void givenIntervalNotMultiplePeriod_whenAggregateCount_thanIntervalEqualsPeriodPlusOne() { ReadTsKvQuery query = new BaseReadTsKvQuery(TEMP, START_TS, END_TS, 3001, LIMIT, Aggregation.COUNT, DESC); + ReadTsKvQuery subQueryFirst = new BaseReadTsKvQuery(TEMP, START_TS, END_TS, START_TS + (3001 + 1 - START_TS) / 2, LIMIT, Aggregation.COUNT, DESC); willCallRealMethod().given(tsDao).findAllAsync(TenantId.SYS_TENANT_ID, TenantId.SYS_TENANT_ID, query); tsDao.findAllAsync(TenantId.SYS_TENANT_ID, TenantId.SYS_TENANT_ID, query); - verify(tsDao, times(1)).findAndAggregateAsync(any(), anyString(), anyLong(), anyLong(), anyLong(), any()); - verify(tsDao, times(1)).findAndAggregateAsync(TenantId.SYS_TENANT_ID, TEMP, START_TS, END_TS, 1501, Aggregation.COUNT); + verify(tsDao, times(1)).findAndAggregateAsync(any(), any(), any(), anyLong(), anyLong(), any()); + verify(tsDao, times(1)).findAndAggregateAsync(TenantId.SYS_TENANT_ID, TenantId.SYS_TENANT_ID, subQueryFirst, START_TS, END_TS + 1, Aggregation.COUNT); } @Test public void givenIntervalNotMultiplePeriod_whenAggregateCount_thanIntervalEqualsOneMillisecondAndStartTsIsZero() { - ReadTsKvQuery query = new BaseReadTsKvQuery(TEMP, START_TS, 0, 1, LIMIT, Aggregation.COUNT, DESC); + ReadTsKvQuery query = new BaseReadTsKvQuery(TEMP, 0, 0, 1, LIMIT, Aggregation.COUNT, DESC); + ReadTsKvQuery subQueryFirst = new BaseReadTsKvQuery(TEMP, 0, 0, 0, LIMIT, Aggregation.COUNT, DESC); willCallRealMethod().given(tsDao).findAllAsync(TenantId.SYS_TENANT_ID, TenantId.SYS_TENANT_ID, query); tsDao.findAllAsync(TenantId.SYS_TENANT_ID, TenantId.SYS_TENANT_ID, query); - verify(tsDao, times(1)).findAndAggregateAsync(any(), anyString(), anyLong(), anyLong(), anyLong(), any()); - verify(tsDao, times(1)).findAndAggregateAsync(TenantId.SYS_TENANT_ID, TEMP, 0, 0, 0, Aggregation.COUNT); + verify(tsDao, times(1)).findAndAggregateAsync(any(), any(), any(), anyLong(), anyLong(), any()); + verify(tsDao, times(1)).findAndAggregateAsync(TenantId.SYS_TENANT_ID, TenantId.SYS_TENANT_ID, subQueryFirst, 0, 0 + 1, Aggregation.COUNT); } @Test public void givenIntervalNotMultiplePeriod_whenAggregateCount_thanIntervalEqualsOneMillisecondAndStartTsIsOne() { - ReadTsKvQuery query = new BaseReadTsKvQuery(TEMP, START_TS, 1, 1, LIMIT, Aggregation.COUNT, DESC); + ReadTsKvQuery query = new BaseReadTsKvQuery(TEMP, START_TS, START_TS, 1, LIMIT, Aggregation.COUNT, DESC); willCallRealMethod().given(tsDao).findAllAsync(TenantId.SYS_TENANT_ID, TenantId.SYS_TENANT_ID, query); tsDao.findAllAsync(TenantId.SYS_TENANT_ID, TenantId.SYS_TENANT_ID, query); - verify(tsDao, times(1)).findAndAggregateAsync(any(), anyString(), anyLong(), anyLong(), anyLong(), any()); - verify(tsDao, times(1)).findAndAggregateAsync(TenantId.SYS_TENANT_ID, TEMP, 1, 1, 1, Aggregation.COUNT); + verify(tsDao, times(1)).findAndAggregateAsync(any(), any(), any(), anyLong(), anyLong(), any()); + verify(tsDao, times(1)).findAndAggregateAsync(TenantId.SYS_TENANT_ID, TenantId.SYS_TENANT_ID, query, START_TS, START_TS + 1, Aggregation.COUNT); } @Test public void givenIntervalNotMultiplePeriod_whenAggregateCount_thanIntervalEqualsOneMillisecondAndStartTsIsIntegerMax() { ReadTsKvQuery query = new BaseReadTsKvQuery(TEMP, Integer.MAX_VALUE, Integer.MAX_VALUE, 1, LIMIT, Aggregation.COUNT, DESC); + ReadTsKvQuery subQueryFirst = new BaseReadTsKvQuery(TEMP, Integer.MAX_VALUE, Integer.MAX_VALUE + 1L, Integer.MAX_VALUE + (Integer.MAX_VALUE + 1L - Integer.MAX_VALUE) / 2L, LIMIT, Aggregation.COUNT, DESC); willCallRealMethod().given(tsDao).findAllAsync(TenantId.SYS_TENANT_ID, TenantId.SYS_TENANT_ID, query); tsDao.findAllAsync(TenantId.SYS_TENANT_ID, TenantId.SYS_TENANT_ID, query); - verify(tsDao, times(1)).findAndAggregateAsync(any(), anyString(), anyLong(), anyLong(), anyLong(), any()); - verify(tsDao, times(1)).findAndAggregateAsync(TenantId.SYS_TENANT_ID, TEMP, Integer.MAX_VALUE, Integer.MAX_VALUE, Integer.MAX_VALUE, Aggregation.COUNT); + verify(tsDao, times(1)).findAndAggregateAsync(any(), any(), any(), anyLong(), anyLong(), any()); + verify(tsDao, times(1)).findAndAggregateAsync(TenantId.SYS_TENANT_ID, TenantId.SYS_TENANT_ID, subQueryFirst, Integer.MAX_VALUE, Integer.MAX_VALUE + 1L, Aggregation.COUNT); } @Test public void givenIntervalNotMultiplePeriod_whenAggregateCount_thanIntervalEqualsBigNumber() { ReadTsKvQuery query = new BaseReadTsKvQuery(TEMP, START_TS, END_TS, Integer.MAX_VALUE, LIMIT, Aggregation.COUNT, DESC); + ReadTsKvQuery subQueryFirst = new BaseReadTsKvQuery(TEMP, START_TS, END_TS, START_TS + (END_TS + 1 - START_TS) / 2, LIMIT, Aggregation.COUNT, DESC); willCallRealMethod().given(tsDao).findAllAsync(TenantId.SYS_TENANT_ID, TenantId.SYS_TENANT_ID, query); tsDao.findAllAsync(TenantId.SYS_TENANT_ID, TenantId.SYS_TENANT_ID, query); - verify(tsDao, times(1)).findAndAggregateAsync(any(), anyString(), anyLong(), anyLong(), anyLong(), any()); - verify(tsDao, times(1)).findAndAggregateAsync(TenantId.SYS_TENANT_ID, TEMP, START_TS, END_TS, 1500, Aggregation.COUNT); + verify(tsDao, times(1)).findAndAggregateAsync(any(), any(), any(), anyLong(), anyLong(), any()); + verify(tsDao, times(1)).findAndAggregateAsync(TenantId.SYS_TENANT_ID, TenantId.SYS_TENANT_ID, subQueryFirst, START_TS, END_TS + 1, Aggregation.COUNT); } @Test @@ -132,9 +150,10 @@ public class AbstractChunkedAggregationTimeseriesDaoTest { ReadTsKvQuery query = new BaseReadTsKvQuery(TEMP, START_TS, END_TS, intervalTs, LIMIT, Aggregation.COUNT, DESC); willCallRealMethod().given(tsDao).findAllAsync(TenantId.SYS_TENANT_ID, TenantId.SYS_TENANT_ID, query); tsDao.findAllAsync(TenantId.SYS_TENANT_ID, TenantId.SYS_TENANT_ID, query); - verify(tsDao, times(1000)).findAndAggregateAsync(any(), anyString(), anyLong(), anyLong(), anyLong(), any()); - for (long i = START_TS; i < END_TS; i += intervalTs) { - verify(tsDao, times(1)).findAndAggregateAsync(TenantId.SYS_TENANT_ID, TEMP, i, i + intervalTs, i, Aggregation.COUNT); + verify(tsDao, times(1000)).findAndAggregateAsync(any(), any(), any(), anyLong(), anyLong(), any()); + for (long i = START_TS; i <= END_TS; i += intervalTs) { + ReadTsKvQuery querySub = new BaseReadTsKvQuery(TEMP, i, i + intervalTs, i + (i + intervalTs - i) / 2, LIMIT, Aggregation.COUNT, DESC); + verify(tsDao, times(1)).findAndAggregateAsync(TenantId.SYS_TENANT_ID, TenantId.SYS_TENANT_ID, querySub, i, i + intervalTs, Aggregation.COUNT); } } } \ No newline at end of file