Fix invalid logic related to endTs in read queries

This commit is contained in:
Andrii Shvaika 2022-09-16 15:20:27 +03:00
parent 84da448dc0
commit eebe8d8cbf
7 changed files with 53 additions and 43 deletions

View File

@ -79,7 +79,6 @@ import static org.thingsboard.server.dao.sqlts.timescale.AggregationRepository.F
@ColumnResult(name = "longValueCount", type = Long.class),
@ColumnResult(name = "doubleValueCount", type = Long.class),
@ColumnResult(name = "jsonValueCount", type = Long.class),
@ColumnResult(name = "jsonValueCount", type = Long.class),
@ColumnResult(name = "maxAggTs", type = Long.class),
}
)

View File

@ -121,15 +121,14 @@ public abstract class AbstractChunkedAggregationTimeseriesDao extends AbstractSq
return Futures.immediateFuture(findAllAsyncWithLimit(entityId, query));
} else {
List<ListenableFuture<Optional<TsKvEntity>>> futures = new ArrayList<>();
long endPeriod = query.getEndTs();
long startPeriod = query.getStartTs();
long endPeriod = Math.max(query.getStartTs() + 1, query.getEndTs());
long step = query.getInterval();
while (startPeriod <= endPeriod) {
while (startPeriod < endPeriod) {
long startTs = startPeriod;
long endTs = Math.min(startPeriod + step, endPeriod + 1);
long endTs = Math.min(startPeriod + step, endPeriod);
long ts = startTs + (endTs - startTs) / 2;
ListenableFuture<Optional<TsKvEntity>> aggregateTsKvEntry =
service.submit(() -> findAndAggregateAsync(entityId, query.getKey(), startTs, endTs, ts, query.getAggregation()));
ListenableFuture<Optional<TsKvEntity>> aggregateTsKvEntry = findAndAggregateAsync(entityId, query.getKey(), startTs, endTs, ts, query.getAggregation());
futures.add(aggregateTsKvEntry);
startPeriod = endTs;
}
@ -152,16 +151,18 @@ public abstract class AbstractChunkedAggregationTimeseriesDao extends AbstractSq
return new ReadTsKvQueryResult(query.getId(), tsKvEntries, lastTs);
}
Optional<TsKvEntity> findAndAggregateAsync(EntityId entityId, String key, long startTs, long endTs, long ts, Aggregation aggregation) {
TsKvEntity entity = switchAggregation(entityId, key, startTs, endTs, aggregation);
if (entity != null && entity.isNotEmpty()) {
entity.setEntityId(entityId.getId());
entity.setStrKey(key);
entity.setTs(ts);
return Optional.of(entity);
} else {
return Optional.empty();
}
ListenableFuture<Optional<TsKvEntity>> findAndAggregateAsync(EntityId entityId, String key, long startTs, long endTs, long ts, Aggregation aggregation) {
return service.submit(() -> {
TsKvEntity entity = switchAggregation(entityId, key, startTs, endTs, aggregation);
if (entity != null && entity.isNotEmpty()) {
entity.setEntityId(entityId.getId());
entity.setStrKey(key);
entity.setTs(ts);
return Optional.of(entity);
} else {
return Optional.empty();
}
});
}
protected TsKvEntity switchAggregation(EntityId entityId, String key, long startTs, long endTs, Aggregation aggregation) {

View File

@ -39,7 +39,7 @@ public class AggregationRepository {
public static final String FROM_WHERE_CLAUSE = "FROM ts_kv tskv WHERE " +
"tskv.entity_id = cast(:entityId AS uuid) " +
"AND tskv.key= cast(:entityKey AS int) " +
"AND tskv.ts > :startTs AND tskv.ts <= :endTs " +
"AND tskv.ts >= :startTs AND tskv.ts < :endTs " +
"GROUP BY tskv.entity_id, tskv.key, tsBucket " +
"ORDER BY tskv.entity_id, tskv.key, tsBucket";

View File

@ -155,7 +155,7 @@ public class TimescaleTimeseriesDao extends AbstractSqlTimeseriesDao implements
return Futures.immediateFuture(findAllAsyncWithLimit(entityId, query));
} else {
long startTs = query.getStartTs();
long endTs = query.getEndTs();
long endTs = Math.max(query.getStartTs() + 1, query.getEndTs());
long timeBucket = query.getInterval();
List<Optional<? extends AbstractTsKvEntity>> data = findAllAndAggregateAsync(entityId, query.getKey(), startTs, endTs, timeBucket, query.getAggregation());
return getReadTsKvQueryResultFuture(query, Futures.immediateFuture(data));
@ -185,7 +185,18 @@ public class TimescaleTimeseriesDao extends AbstractSqlTimeseriesDao implements
}
private List<Optional<? extends AbstractTsKvEntity>> findAllAndAggregateAsync(EntityId entityId, String key, long startTs, long endTs, long timeBucket, Aggregation aggregation) {
List<TimescaleTsKvEntity> timescaleTsKvEntities = switchAggregation(key, startTs, endTs, timeBucket, aggregation, entityId.getId());
long interval = endTs - startTs;
long remainingPart = interval % timeBucket;
List<TimescaleTsKvEntity> timescaleTsKvEntities;
if (remainingPart == 0) {
timescaleTsKvEntities = switchAggregation(key, startTs, endTs, timeBucket, aggregation, entityId.getId());
} else {
interval = interval - remainingPart;
timescaleTsKvEntities = new ArrayList<>();
timescaleTsKvEntities.addAll(switchAggregation(key, startTs, startTs + interval, timeBucket, aggregation, entityId.getId()));
timescaleTsKvEntities.addAll(switchAggregation(key, startTs + interval, endTs, remainingPart, aggregation, entityId.getId()));
}
if (!CollectionUtils.isEmpty(timescaleTsKvEntities)) {
List<Optional<? extends AbstractTsKvEntity>> result = new ArrayList<>();
timescaleTsKvEntities.forEach(entity -> {

View File

@ -268,12 +268,12 @@ public class CassandraBaseTimeseriesDao extends AbstractCassandraBaseTimeseriesD
return findAllAsyncWithLimit(tenantId, entityId, query);
} else {
long startPeriod = query.getStartTs();
long endPeriod = query.getEndTs();
long endPeriod = Math.max(query.getStartTs() + 1, query.getEndTs());
long step = Math.max(query.getInterval(), MIN_AGGREGATION_STEP_MS);
List<ListenableFuture<Optional<TsKvEntryAggWrapper>>> futures = new ArrayList<>();
while (startPeriod <= endPeriod) {
while (startPeriod < endPeriod) {
long startTs = startPeriod;
long endTs = Math.min(startPeriod + step, endPeriod + 1);
long endTs = Math.min(startPeriod + step, endPeriod);
long ts = endTs - startTs;
ReadTsKvQuery subQuery = new BaseReadTsKvQuery(query.getKey(), startTs, endTs, ts, 1, query.getAggregation(), query.getOrder());
futures.add(findAndAggregateAsync(tenantId, entityId, subQuery, toPartitionTs(startTs), toPartitionTs(endTs)));

View File

@ -213,17 +213,17 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest {
}
saveEntries(deviceId, TS + 100L + 1L);
List<ReadTsKvQuery> queries = List.of(new BaseReadTsKvQuery(LONG_KEY, TS, TS + 100, 101, 1, Aggregation.COUNT, DESC_ORDER));
List<ReadTsKvQuery> queries = List.of(new BaseReadTsKvQuery(LONG_KEY, TS, TS + 100, 100, 1, Aggregation.COUNT, DESC_ORDER));
List<TsKvEntry> entries = tsService.findAll(tenantId, deviceId, queries).get();
Assert.assertEquals(1, entries.size());
Assert.assertEquals(toTsEntry(TS + 50, new LongDataEntry(LONG_KEY, 11L)), entries.get(0));
Assert.assertEquals(toTsEntry(TS + 50, new LongDataEntry(LONG_KEY, 10L)), entries.get(0));
EntityView entityView = saveAndCreateEntityView(deviceId, List.of(LONG_KEY));
entries = tsService.findAll(tenantId, entityView.getId(), queries).get();
Assert.assertEquals(1, entries.size());
Assert.assertEquals(toTsEntry(TS + 50, new LongDataEntry(LONG_KEY, 11L)), entries.get(0));
Assert.assertEquals(toTsEntry(TS + 50, new LongDataEntry(LONG_KEY, 10L)), entries.get(0));
}
@Test
@ -240,14 +240,14 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest {
List<TsKvEntry> entries = tsService.findAll(tenantId, deviceId, queries).get();
Assert.assertEquals(2, entries.size());
Assert.assertEquals(toTsEntry(TS + 25000, new LongDataEntry(LONG_KEY, 5L)), entries.get(0));
Assert.assertEquals(toTsEntry(TS + 75000, new LongDataEntry(LONG_KEY, 5L)), entries.get(1));
Assert.assertEquals(toTsEntry(TS + 75000 - 1, new LongDataEntry(LONG_KEY, 5L)), entries.get(1));
EntityView entityView = saveAndCreateEntityView(deviceId, List.of(LONG_KEY));
entries = tsService.findAll(tenantId, entityView.getId(), queries).get();
Assert.assertEquals(2, entries.size());
Assert.assertEquals(toTsEntry(TS + 25000, new LongDataEntry(LONG_KEY, 5L)), entries.get(0));
Assert.assertEquals(toTsEntry(TS + 75000, new LongDataEntry(LONG_KEY, 5L)), entries.get(1));
Assert.assertEquals(toTsEntry(TS + 75000 - 1, new LongDataEntry(LONG_KEY, 5L)), entries.get(1));
}
@Test
@ -264,14 +264,14 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest {
List<TsKvEntry> entries = tsService.findAll(tenantId, deviceId, queries).get();
Assert.assertEquals(2, entries.size());
Assert.assertEquals(toTsEntry(TS + 25000, new LongDataEntry(LONG_KEY, 5L)), entries.get(0));
Assert.assertEquals(toTsEntry(TS + 65000, new LongDataEntry(LONG_KEY, 4L)), entries.get(1));
Assert.assertEquals(toTsEntry(TS + 65000, new LongDataEntry(LONG_KEY, 3L)), entries.get(1));
EntityView entityView = saveAndCreateEntityView(deviceId, List.of(LONG_KEY));
entries = tsService.findAll(tenantId, entityView.getId(), queries).get();
Assert.assertEquals(2, entries.size());
Assert.assertEquals(toTsEntry(TS + 25000, new LongDataEntry(LONG_KEY, 5L)), entries.get(0));
Assert.assertEquals(toTsEntry(TS + 65000, new LongDataEntry(LONG_KEY, 4L)), entries.get(1));
Assert.assertEquals(toTsEntry(TS + 65000, new LongDataEntry(LONG_KEY, 3L)), entries.get(1));
}
@Test
@ -286,14 +286,14 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest {
List<TsKvEntry> entries = tsService.findAll(tenantId, deviceId, queries).get();
Assert.assertEquals(2, entries.size());
Assert.assertEquals(toTsEntry(TS + 25000, new LongDataEntry(LONG_KEY, 5L)), entries.get(0));
Assert.assertEquals(toTsEntry(TS + 75000, new LongDataEntry(LONG_KEY, 5L)), entries.get(1));
Assert.assertEquals(toTsEntry(TS + 75000 - 1, new LongDataEntry(LONG_KEY, 4L)), entries.get(1));
EntityView entityView = saveAndCreateEntityView(deviceId, List.of(LONG_KEY));
entries = tsService.findAll(tenantId, entityView.getId(), queries).get();
Assert.assertEquals(2, entries.size());
Assert.assertEquals(toTsEntry(TS + 25000, new LongDataEntry(LONG_KEY, 5L)), entries.get(0));
Assert.assertEquals(toTsEntry(TS + 75000, new LongDataEntry(LONG_KEY, 5L)), entries.get(1));
Assert.assertEquals(toTsEntry(TS + 75000 - 1, new LongDataEntry(LONG_KEY, 4L)), entries.get(1));
}
@Test

View File

@ -19,6 +19,8 @@ import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import org.springframework.test.util.ReflectionTestUtils;
import org.thingsboard.server.common.data.kv.BaseReadTsKvQuery;
import org.thingsboard.server.common.data.kv.ReadTsKvQuery;
import org.thingsboard.server.common.data.kv.ReadTsKvQueryResult;
@ -44,13 +46,13 @@ public class AbstractChunkedAggregationTimeseriesDaoTest {
final int LIMIT = 1;
final String TEMP = "temp";
final String DESC = "DESC";
AbstractChunkedAggregationTimeseriesDao tsDao;
private AbstractChunkedAggregationTimeseriesDao tsDao;
@Before
public void setUp() throws Exception {
tsDao = spy(AbstractChunkedAggregationTimeseriesDao.class);
Optional<TsKvEntry> optionalListenableFuture = Optional.of(mock(TsKvEntry.class));
willReturn(optionalListenableFuture).given(tsDao).findAndAggregateAsync(any(), anyString(), anyLong(), anyLong(), anyLong(), any());
willReturn(Futures.immediateFuture(optionalListenableFuture)).given(tsDao).findAndAggregateAsync(any(), anyString(), anyLong(), anyLong(), anyLong(), any());
willReturn(Futures.immediateFuture(mock(ReadTsKvQueryResult.class))).given(tsDao).getReadTsKvQueryResultFuture(any(), any());
}
@ -58,11 +60,11 @@ public class AbstractChunkedAggregationTimeseriesDaoTest {
public void givenIntervalNotMultiplePeriod_whenAggregateCount_thenLastIntervalShorterThanOthersAndEqualsEndTs() {
ReadTsKvQuery query = new BaseReadTsKvQuery(TEMP, 1, 3000, 2000, LIMIT, COUNT, DESC);
ReadTsKvQuery subQueryFirst = new BaseReadTsKvQuery(TEMP, 1, 2001, 1001, LIMIT, COUNT, DESC);
ReadTsKvQuery subQuerySecond = new BaseReadTsKvQuery(TEMP, 2001, 3001, 2501, LIMIT, COUNT, DESC);
ReadTsKvQuery subQuerySecond = new BaseReadTsKvQuery(TEMP, 2001, 3000, 2501, LIMIT, COUNT, DESC);
tsDao.findAllAsync(SYS_TENANT_ID, SYS_TENANT_ID, query);
verify(tsDao, times(2)).findAndAggregateAsync(any(), any(), anyLong(), anyLong(), anyLong(), any());
verify(tsDao, times(1)).findAndAggregateAsync(SYS_TENANT_ID, subQueryFirst.getKey(), 1, 2001, getTsForReadTsKvQuery(1, 2001), COUNT);
verify(tsDao, times(1)).findAndAggregateAsync(SYS_TENANT_ID, subQuerySecond.getKey(), 2001, 3000 + 1, getTsForReadTsKvQuery(2001, 3001), COUNT);
verify(tsDao, times(1)).findAndAggregateAsync(SYS_TENANT_ID, subQuerySecond.getKey(), 2001, 3000, getTsForReadTsKvQuery(2001, 3000), COUNT);
}
@Test
@ -72,19 +74,17 @@ public class AbstractChunkedAggregationTimeseriesDaoTest {
willCallRealMethod().given(tsDao).findAllAsync(SYS_TENANT_ID, SYS_TENANT_ID, query);
assertThat(tsDao.findAllAsync(SYS_TENANT_ID, SYS_TENANT_ID, query)).isNotNull();
verify(tsDao, times(1)).findAndAggregateAsync(any(), any(), anyLong(), anyLong(), anyLong(), any());
verify(tsDao, times(1)).findAndAggregateAsync(SYS_TENANT_ID, subQueryFirst.getKey(), 1, 3000 + 1, getTsForReadTsKvQuery(1, 3001), COUNT);
verify(tsDao, times(1)).findAndAggregateAsync(SYS_TENANT_ID, subQueryFirst.getKey(), 1, 3000, getTsForReadTsKvQuery(1, 3000), COUNT);
}
@Test
public void givenIntervalNotMultiplePeriod_whenAggregateCount_thenIntervalEqualsPeriodMinusOne() {
ReadTsKvQuery query = new BaseReadTsKvQuery(TEMP, 1, 3000, 2999, LIMIT, COUNT, DESC);
ReadTsKvQuery subQueryFirst = new BaseReadTsKvQuery(TEMP, 1, 3000, 1500, LIMIT, COUNT, DESC);
ReadTsKvQuery subQuerySecond = new BaseReadTsKvQuery(TEMP, 3000, 3001, 3000, LIMIT, COUNT, DESC);
willCallRealMethod().given(tsDao).findAllAsync(SYS_TENANT_ID, SYS_TENANT_ID, query);
tsDao.findAllAsync(SYS_TENANT_ID, SYS_TENANT_ID, query);
verify(tsDao, times(2)).findAndAggregateAsync(any(), any(), anyLong(), anyLong(), anyLong(), any());
verify(tsDao, times(1)).findAndAggregateAsync(any(), any(), anyLong(), anyLong(), anyLong(), any());
verify(tsDao, times(1)).findAndAggregateAsync(SYS_TENANT_ID, subQueryFirst.getKey(), 1, 3000, getTsForReadTsKvQuery(1, 3000), COUNT);
verify(tsDao, times(1)).findAndAggregateAsync(SYS_TENANT_ID, subQuerySecond.getKey(), 3000, 3001, getTsForReadTsKvQuery(3000, 3001), COUNT);
}
@ -95,7 +95,7 @@ public class AbstractChunkedAggregationTimeseriesDaoTest {
willCallRealMethod().given(tsDao).findAllAsync(SYS_TENANT_ID, SYS_TENANT_ID, query);
tsDao.findAllAsync(SYS_TENANT_ID, SYS_TENANT_ID, query);
verify(tsDao, times(1)).findAndAggregateAsync(any(), any(), anyLong(), anyLong(), anyLong(), any());
verify(tsDao, times(1)).findAndAggregateAsync(SYS_TENANT_ID, subQueryFirst.getKey(), 1, 3001, getTsForReadTsKvQuery(1, 3001), COUNT);
verify(tsDao, times(1)).findAndAggregateAsync(SYS_TENANT_ID, subQueryFirst.getKey(), 1, 3000, getTsForReadTsKvQuery(1, 3000), COUNT);
}
@Test
@ -135,7 +135,7 @@ public class AbstractChunkedAggregationTimeseriesDaoTest {
willCallRealMethod().given(tsDao).findAllAsync(SYS_TENANT_ID, SYS_TENANT_ID, query);
tsDao.findAllAsync(SYS_TENANT_ID, SYS_TENANT_ID, query);
verify(tsDao, times(1)).findAndAggregateAsync(any(), any(), anyLong(), anyLong(), anyLong(), any());
verify(tsDao, times(1)).findAndAggregateAsync(SYS_TENANT_ID, subQueryFirst.getKey(), 1, 3001, getTsForReadTsKvQuery(1, 3001), COUNT);
verify(tsDao, times(1)).findAndAggregateAsync(SYS_TENANT_ID, subQueryFirst.getKey(), 1, 3000, getTsForReadTsKvQuery(1, 3000), COUNT);
}
@Test
@ -145,8 +145,7 @@ public class AbstractChunkedAggregationTimeseriesDaoTest {
tsDao.findAllAsync(SYS_TENANT_ID, SYS_TENANT_ID, query);
verify(tsDao, times(1000)).findAndAggregateAsync(any(), any(), anyLong(), anyLong(), anyLong(), any());
for (long i = 1; i <= 3000; i += 3) {
ReadTsKvQuery querySub = new BaseReadTsKvQuery(TEMP, i, i + 3, i + (i + 3 - i) / 2, LIMIT, COUNT, DESC);
verify(tsDao, times(1)).findAndAggregateAsync(SYS_TENANT_ID, querySub.getKey(), i, i + 3, getTsForReadTsKvQuery(i, i + 3), COUNT);
verify(tsDao, times(1)).findAndAggregateAsync(SYS_TENANT_ID, TEMP, i, Math.min(i + 3, 3000), getTsForReadTsKvQuery(i, i + 3), COUNT);
}
}