From eebe8d8cbf07fd187ba17e5eb3ceea153f09b549 Mon Sep 17 00:00:00 2001 From: Andrii Shvaika Date: Fri, 16 Sep 2022 15:20:27 +0300 Subject: [PATCH] Fix invalid logic related to endTs in read queries --- .../timescale/ts/TimescaleTsKvEntity.java | 1 - ...stractChunkedAggregationTimeseriesDao.java | 31 ++++++++++--------- .../timescale/AggregationRepository.java | 2 +- .../timescale/TimescaleTimeseriesDao.java | 15 +++++++-- .../CassandraBaseTimeseriesDao.java | 6 ++-- .../timeseries/BaseTimeseriesServiceTest.java | 18 +++++------ ...ctChunkedAggregationTimeseriesDaoTest.java | 23 +++++++------- 7 files changed, 53 insertions(+), 43 deletions(-) diff --git a/dao/src/main/java/org/thingsboard/server/dao/model/sqlts/timescale/ts/TimescaleTsKvEntity.java b/dao/src/main/java/org/thingsboard/server/dao/model/sqlts/timescale/ts/TimescaleTsKvEntity.java index 9e072e25e4..ba5ad92f4f 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/model/sqlts/timescale/ts/TimescaleTsKvEntity.java +++ b/dao/src/main/java/org/thingsboard/server/dao/model/sqlts/timescale/ts/TimescaleTsKvEntity.java @@ -79,7 +79,6 @@ import static org.thingsboard.server.dao.sqlts.timescale.AggregationRepository.F @ColumnResult(name = "longValueCount", type = Long.class), @ColumnResult(name = "doubleValueCount", type = Long.class), @ColumnResult(name = "jsonValueCount", type = Long.class), - @ColumnResult(name = "jsonValueCount", type = Long.class), @ColumnResult(name = "maxAggTs", type = Long.class), } ) diff --git a/dao/src/main/java/org/thingsboard/server/dao/sqlts/AbstractChunkedAggregationTimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/sqlts/AbstractChunkedAggregationTimeseriesDao.java index 9acbfe85fb..22c4661fcd 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sqlts/AbstractChunkedAggregationTimeseriesDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sqlts/AbstractChunkedAggregationTimeseriesDao.java @@ -121,15 +121,14 @@ public abstract class AbstractChunkedAggregationTimeseriesDao extends AbstractSq return Futures.immediateFuture(findAllAsyncWithLimit(entityId, query)); } else { List>> futures = new ArrayList<>(); - long endPeriod = query.getEndTs(); long startPeriod = query.getStartTs(); + long endPeriod = Math.max(query.getStartTs() + 1, query.getEndTs()); long step = query.getInterval(); - while (startPeriod <= endPeriod) { + while (startPeriod < endPeriod) { long startTs = startPeriod; - long endTs = Math.min(startPeriod + step, endPeriod + 1); + long endTs = Math.min(startPeriod + step, endPeriod); long ts = startTs + (endTs - startTs) / 2; - ListenableFuture> aggregateTsKvEntry = - service.submit(() -> findAndAggregateAsync(entityId, query.getKey(), startTs, endTs, ts, query.getAggregation())); + ListenableFuture> aggregateTsKvEntry = findAndAggregateAsync(entityId, query.getKey(), startTs, endTs, ts, query.getAggregation()); futures.add(aggregateTsKvEntry); startPeriod = endTs; } @@ -152,16 +151,18 @@ public abstract class AbstractChunkedAggregationTimeseriesDao extends AbstractSq return new ReadTsKvQueryResult(query.getId(), tsKvEntries, lastTs); } - Optional findAndAggregateAsync(EntityId entityId, String key, long startTs, long endTs, long ts, Aggregation aggregation) { - TsKvEntity entity = switchAggregation(entityId, key, startTs, endTs, aggregation); - if (entity != null && entity.isNotEmpty()) { - entity.setEntityId(entityId.getId()); - entity.setStrKey(key); - entity.setTs(ts); - return Optional.of(entity); - } else { - return Optional.empty(); - } + ListenableFuture> findAndAggregateAsync(EntityId entityId, String key, long startTs, long endTs, long ts, Aggregation aggregation) { + return service.submit(() -> { + TsKvEntity entity = switchAggregation(entityId, key, startTs, endTs, aggregation); + if (entity != null && entity.isNotEmpty()) { + entity.setEntityId(entityId.getId()); + entity.setStrKey(key); + entity.setTs(ts); + return Optional.of(entity); + } else { + return Optional.empty(); + } + }); } protected TsKvEntity switchAggregation(EntityId entityId, String key, long startTs, long endTs, Aggregation aggregation) { diff --git a/dao/src/main/java/org/thingsboard/server/dao/sqlts/timescale/AggregationRepository.java b/dao/src/main/java/org/thingsboard/server/dao/sqlts/timescale/AggregationRepository.java index db0a1753d8..d37234dfde 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sqlts/timescale/AggregationRepository.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sqlts/timescale/AggregationRepository.java @@ -39,7 +39,7 @@ public class AggregationRepository { public static final String FROM_WHERE_CLAUSE = "FROM ts_kv tskv WHERE " + "tskv.entity_id = cast(:entityId AS uuid) " + "AND tskv.key= cast(:entityKey AS int) " + - "AND tskv.ts > :startTs AND tskv.ts <= :endTs " + + "AND tskv.ts >= :startTs AND tskv.ts < :endTs " + "GROUP BY tskv.entity_id, tskv.key, tsBucket " + "ORDER BY tskv.entity_id, tskv.key, tsBucket"; diff --git a/dao/src/main/java/org/thingsboard/server/dao/sqlts/timescale/TimescaleTimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/sqlts/timescale/TimescaleTimeseriesDao.java index 76dabb24f6..6e9f9e61b8 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sqlts/timescale/TimescaleTimeseriesDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sqlts/timescale/TimescaleTimeseriesDao.java @@ -155,7 +155,7 @@ public class TimescaleTimeseriesDao extends AbstractSqlTimeseriesDao implements return Futures.immediateFuture(findAllAsyncWithLimit(entityId, query)); } else { long startTs = query.getStartTs(); - long endTs = query.getEndTs(); + long endTs = Math.max(query.getStartTs() + 1, query.getEndTs()); long timeBucket = query.getInterval(); List> data = findAllAndAggregateAsync(entityId, query.getKey(), startTs, endTs, timeBucket, query.getAggregation()); return getReadTsKvQueryResultFuture(query, Futures.immediateFuture(data)); @@ -185,7 +185,18 @@ public class TimescaleTimeseriesDao extends AbstractSqlTimeseriesDao implements } private List> findAllAndAggregateAsync(EntityId entityId, String key, long startTs, long endTs, long timeBucket, Aggregation aggregation) { - List timescaleTsKvEntities = switchAggregation(key, startTs, endTs, timeBucket, aggregation, entityId.getId()); + long interval = endTs - startTs; + long remainingPart = interval % timeBucket; + List timescaleTsKvEntities; + if (remainingPart == 0) { + timescaleTsKvEntities = switchAggregation(key, startTs, endTs, timeBucket, aggregation, entityId.getId()); + } else { + interval = interval - remainingPart; + timescaleTsKvEntities = new ArrayList<>(); + timescaleTsKvEntities.addAll(switchAggregation(key, startTs, startTs + interval, timeBucket, aggregation, entityId.getId())); + timescaleTsKvEntities.addAll(switchAggregation(key, startTs + interval, endTs, remainingPart, aggregation, entityId.getId())); + } + if (!CollectionUtils.isEmpty(timescaleTsKvEntities)) { List> result = new ArrayList<>(); timescaleTsKvEntities.forEach(entity -> { diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java index 35eb5b8a18..367ad942a7 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java @@ -268,12 +268,12 @@ public class CassandraBaseTimeseriesDao extends AbstractCassandraBaseTimeseriesD return findAllAsyncWithLimit(tenantId, entityId, query); } else { long startPeriod = query.getStartTs(); - long endPeriod = query.getEndTs(); + long endPeriod = Math.max(query.getStartTs() + 1, query.getEndTs()); long step = Math.max(query.getInterval(), MIN_AGGREGATION_STEP_MS); List>> futures = new ArrayList<>(); - while (startPeriod <= endPeriod) { + while (startPeriod < endPeriod) { long startTs = startPeriod; - long endTs = Math.min(startPeriod + step, endPeriod + 1); + long endTs = Math.min(startPeriod + step, endPeriod); long ts = endTs - startTs; ReadTsKvQuery subQuery = new BaseReadTsKvQuery(query.getKey(), startTs, endTs, ts, 1, query.getAggregation(), query.getOrder()); futures.add(findAndAggregateAsync(tenantId, entityId, subQuery, toPartitionTs(startTs), toPartitionTs(endTs))); diff --git a/dao/src/test/java/org/thingsboard/server/dao/service/timeseries/BaseTimeseriesServiceTest.java b/dao/src/test/java/org/thingsboard/server/dao/service/timeseries/BaseTimeseriesServiceTest.java index 5339863044..4e2fd17846 100644 --- a/dao/src/test/java/org/thingsboard/server/dao/service/timeseries/BaseTimeseriesServiceTest.java +++ b/dao/src/test/java/org/thingsboard/server/dao/service/timeseries/BaseTimeseriesServiceTest.java @@ -213,17 +213,17 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest { } saveEntries(deviceId, TS + 100L + 1L); - List queries = List.of(new BaseReadTsKvQuery(LONG_KEY, TS, TS + 100, 101, 1, Aggregation.COUNT, DESC_ORDER)); + List queries = List.of(new BaseReadTsKvQuery(LONG_KEY, TS, TS + 100, 100, 1, Aggregation.COUNT, DESC_ORDER)); List entries = tsService.findAll(tenantId, deviceId, queries).get(); Assert.assertEquals(1, entries.size()); - Assert.assertEquals(toTsEntry(TS + 50, new LongDataEntry(LONG_KEY, 11L)), entries.get(0)); + Assert.assertEquals(toTsEntry(TS + 50, new LongDataEntry(LONG_KEY, 10L)), entries.get(0)); EntityView entityView = saveAndCreateEntityView(deviceId, List.of(LONG_KEY)); entries = tsService.findAll(tenantId, entityView.getId(), queries).get(); Assert.assertEquals(1, entries.size()); - Assert.assertEquals(toTsEntry(TS + 50, new LongDataEntry(LONG_KEY, 11L)), entries.get(0)); + Assert.assertEquals(toTsEntry(TS + 50, new LongDataEntry(LONG_KEY, 10L)), entries.get(0)); } @Test @@ -240,14 +240,14 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest { List entries = tsService.findAll(tenantId, deviceId, queries).get(); Assert.assertEquals(2, entries.size()); Assert.assertEquals(toTsEntry(TS + 25000, new LongDataEntry(LONG_KEY, 5L)), entries.get(0)); - Assert.assertEquals(toTsEntry(TS + 75000, new LongDataEntry(LONG_KEY, 5L)), entries.get(1)); + Assert.assertEquals(toTsEntry(TS + 75000 - 1, new LongDataEntry(LONG_KEY, 5L)), entries.get(1)); EntityView entityView = saveAndCreateEntityView(deviceId, List.of(LONG_KEY)); entries = tsService.findAll(tenantId, entityView.getId(), queries).get(); Assert.assertEquals(2, entries.size()); Assert.assertEquals(toTsEntry(TS + 25000, new LongDataEntry(LONG_KEY, 5L)), entries.get(0)); - Assert.assertEquals(toTsEntry(TS + 75000, new LongDataEntry(LONG_KEY, 5L)), entries.get(1)); + Assert.assertEquals(toTsEntry(TS + 75000 - 1, new LongDataEntry(LONG_KEY, 5L)), entries.get(1)); } @Test @@ -264,14 +264,14 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest { List entries = tsService.findAll(tenantId, deviceId, queries).get(); Assert.assertEquals(2, entries.size()); Assert.assertEquals(toTsEntry(TS + 25000, new LongDataEntry(LONG_KEY, 5L)), entries.get(0)); - Assert.assertEquals(toTsEntry(TS + 65000, new LongDataEntry(LONG_KEY, 4L)), entries.get(1)); + Assert.assertEquals(toTsEntry(TS + 65000, new LongDataEntry(LONG_KEY, 3L)), entries.get(1)); EntityView entityView = saveAndCreateEntityView(deviceId, List.of(LONG_KEY)); entries = tsService.findAll(tenantId, entityView.getId(), queries).get(); Assert.assertEquals(2, entries.size()); Assert.assertEquals(toTsEntry(TS + 25000, new LongDataEntry(LONG_KEY, 5L)), entries.get(0)); - Assert.assertEquals(toTsEntry(TS + 65000, new LongDataEntry(LONG_KEY, 4L)), entries.get(1)); + Assert.assertEquals(toTsEntry(TS + 65000, new LongDataEntry(LONG_KEY, 3L)), entries.get(1)); } @Test @@ -286,14 +286,14 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest { List entries = tsService.findAll(tenantId, deviceId, queries).get(); Assert.assertEquals(2, entries.size()); Assert.assertEquals(toTsEntry(TS + 25000, new LongDataEntry(LONG_KEY, 5L)), entries.get(0)); - Assert.assertEquals(toTsEntry(TS + 75000, new LongDataEntry(LONG_KEY, 5L)), entries.get(1)); + Assert.assertEquals(toTsEntry(TS + 75000 - 1, new LongDataEntry(LONG_KEY, 4L)), entries.get(1)); EntityView entityView = saveAndCreateEntityView(deviceId, List.of(LONG_KEY)); entries = tsService.findAll(tenantId, entityView.getId(), queries).get(); Assert.assertEquals(2, entries.size()); Assert.assertEquals(toTsEntry(TS + 25000, new LongDataEntry(LONG_KEY, 5L)), entries.get(0)); - Assert.assertEquals(toTsEntry(TS + 75000, new LongDataEntry(LONG_KEY, 5L)), entries.get(1)); + Assert.assertEquals(toTsEntry(TS + 75000 - 1, new LongDataEntry(LONG_KEY, 4L)), entries.get(1)); } @Test diff --git a/dao/src/test/java/org/thingsboard/server/dao/sqlts/AbstractChunkedAggregationTimeseriesDaoTest.java b/dao/src/test/java/org/thingsboard/server/dao/sqlts/AbstractChunkedAggregationTimeseriesDaoTest.java index 40705809fd..9c4b1cbbea 100644 --- a/dao/src/test/java/org/thingsboard/server/dao/sqlts/AbstractChunkedAggregationTimeseriesDaoTest.java +++ b/dao/src/test/java/org/thingsboard/server/dao/sqlts/AbstractChunkedAggregationTimeseriesDaoTest.java @@ -19,6 +19,8 @@ import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import org.junit.Before; import org.junit.Test; +import org.mockito.Mockito; +import org.springframework.test.util.ReflectionTestUtils; import org.thingsboard.server.common.data.kv.BaseReadTsKvQuery; import org.thingsboard.server.common.data.kv.ReadTsKvQuery; import org.thingsboard.server.common.data.kv.ReadTsKvQueryResult; @@ -44,13 +46,13 @@ public class AbstractChunkedAggregationTimeseriesDaoTest { final int LIMIT = 1; final String TEMP = "temp"; final String DESC = "DESC"; - AbstractChunkedAggregationTimeseriesDao tsDao; + private AbstractChunkedAggregationTimeseriesDao tsDao; @Before public void setUp() throws Exception { tsDao = spy(AbstractChunkedAggregationTimeseriesDao.class); Optional optionalListenableFuture = Optional.of(mock(TsKvEntry.class)); - willReturn(optionalListenableFuture).given(tsDao).findAndAggregateAsync(any(), anyString(), anyLong(), anyLong(), anyLong(), any()); + willReturn(Futures.immediateFuture(optionalListenableFuture)).given(tsDao).findAndAggregateAsync(any(), anyString(), anyLong(), anyLong(), anyLong(), any()); willReturn(Futures.immediateFuture(mock(ReadTsKvQueryResult.class))).given(tsDao).getReadTsKvQueryResultFuture(any(), any()); } @@ -58,11 +60,11 @@ public class AbstractChunkedAggregationTimeseriesDaoTest { public void givenIntervalNotMultiplePeriod_whenAggregateCount_thenLastIntervalShorterThanOthersAndEqualsEndTs() { ReadTsKvQuery query = new BaseReadTsKvQuery(TEMP, 1, 3000, 2000, LIMIT, COUNT, DESC); ReadTsKvQuery subQueryFirst = new BaseReadTsKvQuery(TEMP, 1, 2001, 1001, LIMIT, COUNT, DESC); - ReadTsKvQuery subQuerySecond = new BaseReadTsKvQuery(TEMP, 2001, 3001, 2501, LIMIT, COUNT, DESC); + ReadTsKvQuery subQuerySecond = new BaseReadTsKvQuery(TEMP, 2001, 3000, 2501, LIMIT, COUNT, DESC); tsDao.findAllAsync(SYS_TENANT_ID, SYS_TENANT_ID, query); verify(tsDao, times(2)).findAndAggregateAsync(any(), any(), anyLong(), anyLong(), anyLong(), any()); verify(tsDao, times(1)).findAndAggregateAsync(SYS_TENANT_ID, subQueryFirst.getKey(), 1, 2001, getTsForReadTsKvQuery(1, 2001), COUNT); - verify(tsDao, times(1)).findAndAggregateAsync(SYS_TENANT_ID, subQuerySecond.getKey(), 2001, 3000 + 1, getTsForReadTsKvQuery(2001, 3001), COUNT); + verify(tsDao, times(1)).findAndAggregateAsync(SYS_TENANT_ID, subQuerySecond.getKey(), 2001, 3000, getTsForReadTsKvQuery(2001, 3000), COUNT); } @Test @@ -72,19 +74,17 @@ public class AbstractChunkedAggregationTimeseriesDaoTest { willCallRealMethod().given(tsDao).findAllAsync(SYS_TENANT_ID, SYS_TENANT_ID, query); assertThat(tsDao.findAllAsync(SYS_TENANT_ID, SYS_TENANT_ID, query)).isNotNull(); verify(tsDao, times(1)).findAndAggregateAsync(any(), any(), anyLong(), anyLong(), anyLong(), any()); - verify(tsDao, times(1)).findAndAggregateAsync(SYS_TENANT_ID, subQueryFirst.getKey(), 1, 3000 + 1, getTsForReadTsKvQuery(1, 3001), COUNT); + verify(tsDao, times(1)).findAndAggregateAsync(SYS_TENANT_ID, subQueryFirst.getKey(), 1, 3000, getTsForReadTsKvQuery(1, 3000), COUNT); } @Test public void givenIntervalNotMultiplePeriod_whenAggregateCount_thenIntervalEqualsPeriodMinusOne() { ReadTsKvQuery query = new BaseReadTsKvQuery(TEMP, 1, 3000, 2999, LIMIT, COUNT, DESC); ReadTsKvQuery subQueryFirst = new BaseReadTsKvQuery(TEMP, 1, 3000, 1500, LIMIT, COUNT, DESC); - ReadTsKvQuery subQuerySecond = new BaseReadTsKvQuery(TEMP, 3000, 3001, 3000, LIMIT, COUNT, DESC); willCallRealMethod().given(tsDao).findAllAsync(SYS_TENANT_ID, SYS_TENANT_ID, query); tsDao.findAllAsync(SYS_TENANT_ID, SYS_TENANT_ID, query); - verify(tsDao, times(2)).findAndAggregateAsync(any(), any(), anyLong(), anyLong(), anyLong(), any()); + verify(tsDao, times(1)).findAndAggregateAsync(any(), any(), anyLong(), anyLong(), anyLong(), any()); verify(tsDao, times(1)).findAndAggregateAsync(SYS_TENANT_ID, subQueryFirst.getKey(), 1, 3000, getTsForReadTsKvQuery(1, 3000), COUNT); - verify(tsDao, times(1)).findAndAggregateAsync(SYS_TENANT_ID, subQuerySecond.getKey(), 3000, 3001, getTsForReadTsKvQuery(3000, 3001), COUNT); } @@ -95,7 +95,7 @@ public class AbstractChunkedAggregationTimeseriesDaoTest { willCallRealMethod().given(tsDao).findAllAsync(SYS_TENANT_ID, SYS_TENANT_ID, query); tsDao.findAllAsync(SYS_TENANT_ID, SYS_TENANT_ID, query); verify(tsDao, times(1)).findAndAggregateAsync(any(), any(), anyLong(), anyLong(), anyLong(), any()); - verify(tsDao, times(1)).findAndAggregateAsync(SYS_TENANT_ID, subQueryFirst.getKey(), 1, 3001, getTsForReadTsKvQuery(1, 3001), COUNT); + verify(tsDao, times(1)).findAndAggregateAsync(SYS_TENANT_ID, subQueryFirst.getKey(), 1, 3000, getTsForReadTsKvQuery(1, 3000), COUNT); } @Test @@ -135,7 +135,7 @@ public class AbstractChunkedAggregationTimeseriesDaoTest { willCallRealMethod().given(tsDao).findAllAsync(SYS_TENANT_ID, SYS_TENANT_ID, query); tsDao.findAllAsync(SYS_TENANT_ID, SYS_TENANT_ID, query); verify(tsDao, times(1)).findAndAggregateAsync(any(), any(), anyLong(), anyLong(), anyLong(), any()); - verify(tsDao, times(1)).findAndAggregateAsync(SYS_TENANT_ID, subQueryFirst.getKey(), 1, 3001, getTsForReadTsKvQuery(1, 3001), COUNT); + verify(tsDao, times(1)).findAndAggregateAsync(SYS_TENANT_ID, subQueryFirst.getKey(), 1, 3000, getTsForReadTsKvQuery(1, 3000), COUNT); } @Test @@ -145,8 +145,7 @@ public class AbstractChunkedAggregationTimeseriesDaoTest { tsDao.findAllAsync(SYS_TENANT_ID, SYS_TENANT_ID, query); verify(tsDao, times(1000)).findAndAggregateAsync(any(), any(), anyLong(), anyLong(), anyLong(), any()); for (long i = 1; i <= 3000; i += 3) { - ReadTsKvQuery querySub = new BaseReadTsKvQuery(TEMP, i, i + 3, i + (i + 3 - i) / 2, LIMIT, COUNT, DESC); - verify(tsDao, times(1)).findAndAggregateAsync(SYS_TENANT_ID, querySub.getKey(), i, i + 3, getTsForReadTsKvQuery(i, i + 3), COUNT); + verify(tsDao, times(1)).findAndAggregateAsync(SYS_TENANT_ID, TEMP, i, Math.min(i + 3, 3000), getTsForReadTsKvQuery(i, i + 3), COUNT); } }