* refactoring test and code

* fix bug with interval in findAllAsync()
This commit is contained in:
van-vanich 2021-11-19 14:03:30 +02:00
parent 729663e26d
commit 76c7b34c3b
2 changed files with 53 additions and 33 deletions

View File

@ -38,23 +38,24 @@ public interface AggregationTimeseriesDao {
if (query.getAggregation() == Aggregation.NONE) {
return findAllAsyncWithLimit(tenantId, entityId, query);
} else {
long step = getIntervalGreaterOrEqualsMinAggregationStep(query.getInterval());
long stepTs = query.getStartTs();
List<ListenableFuture<Optional<TsKvEntry>>> futures = findIntervals(tenantId, entityId, query, step, stepTs);
List<ListenableFuture<Optional<TsKvEntry>>> futures = findIntervals(tenantId, entityId, query);
return getTskvEntriesFuture(Futures.allAsList(futures));
}
}
default List<ListenableFuture<Optional<TsKvEntry>>> findIntervals(TenantId tenantId, EntityId entityId, ReadTsKvQuery query, long step, long stepTs) {
default List<ListenableFuture<Optional<TsKvEntry>>> findIntervals(TenantId tenantId, EntityId entityId, ReadTsKvQuery query) {
List<ListenableFuture<Optional<TsKvEntry>>> futures = new ArrayList<>();
while (stepTs < query.getEndTs()) {
long startTs = stepTs;
long endTs = stepTs + step;
long endPeriod = query.getEndTs();
long startPeriod = query.getStartTs();
long step = query.getInterval();
while (startPeriod <= endPeriod) {
long startTs = startPeriod;
long endTs = Math.min(startPeriod + step, endPeriod + 1);
long ts = getTsForReadTsKvQuery(startTs, endTs);
ReadTsKvQuery subQuery = new BaseReadTsKvQuery(query.getKey(), startTs, endTs, ts, 1, query.getAggregation(), query.getOrder());
ListenableFuture<Optional<TsKvEntry>> aggregateTsKvEntry = findAndAggregateAsync(tenantId, entityId, subQuery, toPartitionTs(startTs), toPartitionTs(endTs), query.getAggregation());
futures.add(aggregateTsKvEntry);
stepTs = endTs;
startPeriod = endTs;
}
return futures;
}

View File

@ -39,91 +39,109 @@ import static org.mockito.Mockito.verify;
public class AbstractChunkedAggregationTimeseriesDaoTest {
final int START_TS = 1;
final int LIMIT = 0;
final int LIMIT = 1;
final int END_TS = 3000;
final String TEMP = "temp";
final String DESC = "DESC";
AbstractChunkedAggregationTimeseriesDao tsDao;
//SOME PRESENT: When we give data with period l-r, program return data in interval [l;r)
@Before
public void setUp() throws Exception {
tsDao = mock(AbstractChunkedAggregationTimeseriesDao.class);
ListenableFuture<Optional<TsKvEntry>> optionalListenableFuture = Futures.immediateFuture(Optional.of(mock(TsKvEntry.class)));
willReturn(optionalListenableFuture).given(tsDao).findAndAggregateAsync(any(), anyString(), anyLong(), anyLong(), anyLong(), any());
willReturn(Futures.immediateFuture(null)).given(tsDao).getTskvEntriesFuture(any());
willCallRealMethod().given(tsDao).findAllAsync(any(), any(), any(ReadTsKvQuery.class));
willCallRealMethod().given(tsDao).findIntervals(any(), any(), any(ReadTsKvQuery.class));
willCallRealMethod().given(tsDao).getTsForReadTsKvQuery(anyLong(), anyLong());
willCallRealMethod().given(tsDao).toPartitionTs(anyLong());
willCallRealMethod().given(tsDao).findAndAggregateAsync(any(), any(), any(), anyLong(), anyLong(), any());
}
@Test
public void givenIntervalNotMultiplePeriod_whenAggregateCount_thanLastIntervalShorterThanOthersAndEqualsEndTs() {
ReadTsKvQuery query = new BaseReadTsKvQuery(TEMP, START_TS, END_TS, 2000, LIMIT, Aggregation.COUNT, DESC);
willCallRealMethod().given(tsDao).findAllAsync(TenantId.SYS_TENANT_ID, TenantId.SYS_TENANT_ID, query);
ReadTsKvQuery subQueryFirst = new BaseReadTsKvQuery(TEMP, START_TS, 2001, START_TS + (2001 - START_TS) / 2, LIMIT, Aggregation.COUNT, DESC);
ReadTsKvQuery subQuerySecond = new BaseReadTsKvQuery(TEMP, 2001, END_TS, 2001 + (END_TS + 1 - 2001) / 2, LIMIT, Aggregation.COUNT, DESC);
tsDao.findAllAsync(TenantId.SYS_TENANT_ID, TenantId.SYS_TENANT_ID, query);
verify(tsDao, times(2)).findAndAggregateAsync(any(), anyString(), anyLong(), anyLong(), anyLong(), any());
verify(tsDao, times(1)).findAndAggregateAsync(TenantId.SYS_TENANT_ID, TEMP, 1, 2000, 1000, Aggregation.COUNT);
verify(tsDao, times(1)).findAndAggregateAsync(TenantId.SYS_TENANT_ID, TEMP, 2001, END_TS, 2500, Aggregation.COUNT);
verify(tsDao, times(2)).findAndAggregateAsync(any(), any(), any(), anyLong(), anyLong(), any());
verify(tsDao, times(1)).findAndAggregateAsync(TenantId.SYS_TENANT_ID, TenantId.SYS_TENANT_ID, subQueryFirst, START_TS, 2001, Aggregation.COUNT);
verify(tsDao, times(1)).findAndAggregateAsync(TenantId.SYS_TENANT_ID, TenantId.SYS_TENANT_ID, subQuerySecond, 2001, END_TS + 1, Aggregation.COUNT);
}
@Test
public void givenIntervalNotMultiplePeriod_whenAggregateCount_thanIntervalEqualsPeriod() {
ReadTsKvQuery query = new BaseReadTsKvQuery(TEMP, START_TS, END_TS, END_TS, LIMIT, Aggregation.COUNT, DESC);
ReadTsKvQuery subQueryFirst = new BaseReadTsKvQuery(TEMP, START_TS, END_TS, START_TS + (END_TS + 1 - START_TS) / 2, LIMIT, Aggregation.COUNT, DESC);
willCallRealMethod().given(tsDao).findAllAsync(TenantId.SYS_TENANT_ID, TenantId.SYS_TENANT_ID, query);
tsDao.findAllAsync(TenantId.SYS_TENANT_ID, TenantId.SYS_TENANT_ID, query);
verify(tsDao, times(1)).findAndAggregateAsync(any(), anyString(), anyLong(), anyLong(), anyLong(), any());
verify(tsDao, times(1)).findAndAggregateAsync(TenantId.SYS_TENANT_ID, TEMP, 1, END_TS, 1500, Aggregation.COUNT);
verify(tsDao, times(1)).findAndAggregateAsync(any(), any(), any(), anyLong(), anyLong(), any());
verify(tsDao, times(1)).findAndAggregateAsync(TenantId.SYS_TENANT_ID, TenantId.SYS_TENANT_ID, subQueryFirst, START_TS, END_TS + 1, Aggregation.COUNT);
}
@Test
public void givenIntervalNotMultiplePeriod_whenAggregateCount_thanIntervalEqualsPeriodMinusOne() {
ReadTsKvQuery query = new BaseReadTsKvQuery(TEMP, START_TS, END_TS, 2999, LIMIT, Aggregation.COUNT, DESC);
ReadTsKvQuery subQueryFirst = new BaseReadTsKvQuery(TEMP, START_TS, END_TS - 2, START_TS + (END_TS - 1 - START_TS) / 2, LIMIT, Aggregation.COUNT, DESC);
ReadTsKvQuery subQuerySecond = new BaseReadTsKvQuery(TEMP, END_TS - 1, END_TS, END_TS - 1 + (END_TS + 1 - (END_TS - 1)) / 2, LIMIT, Aggregation.COUNT, DESC);
willCallRealMethod().given(tsDao).findAllAsync(TenantId.SYS_TENANT_ID, TenantId.SYS_TENANT_ID, query);
tsDao.findAllAsync(TenantId.SYS_TENANT_ID, TenantId.SYS_TENANT_ID, query);
verify(tsDao, times(2)).findAndAggregateAsync(any(), anyString(), anyLong(), anyLong(), anyLong(), any());
verify(tsDao, times(1)).findAndAggregateAsync(TenantId.SYS_TENANT_ID, TEMP, 0, 2999, 1499, Aggregation.COUNT);
verify(tsDao, times(1)).findAndAggregateAsync(TenantId.SYS_TENANT_ID, TEMP, END_TS, END_TS, END_TS, Aggregation.COUNT);
verify(tsDao, times(2)).findAndAggregateAsync(any(), any(), any(), anyLong(), anyLong(), any());
verify(tsDao, times(1)).findAndAggregateAsync(TenantId.SYS_TENANT_ID, TenantId.SYS_TENANT_ID, subQueryFirst, START_TS, END_TS, Aggregation.COUNT);
verify(tsDao, times(1)).findAndAggregateAsync(TenantId.SYS_TENANT_ID, TenantId.SYS_TENANT_ID, subQuerySecond, END_TS, END_TS + 1, Aggregation.COUNT);
}
@Test
public void givenIntervalNotMultiplePeriod_whenAggregateCount_thanIntervalEqualsPeriodPlusOne() {
ReadTsKvQuery query = new BaseReadTsKvQuery(TEMP, START_TS, END_TS, 3001, LIMIT, Aggregation.COUNT, DESC);
ReadTsKvQuery subQueryFirst = new BaseReadTsKvQuery(TEMP, START_TS, END_TS, START_TS + (3001 + 1 - START_TS) / 2, LIMIT, Aggregation.COUNT, DESC);
willCallRealMethod().given(tsDao).findAllAsync(TenantId.SYS_TENANT_ID, TenantId.SYS_TENANT_ID, query);
tsDao.findAllAsync(TenantId.SYS_TENANT_ID, TenantId.SYS_TENANT_ID, query);
verify(tsDao, times(1)).findAndAggregateAsync(any(), anyString(), anyLong(), anyLong(), anyLong(), any());
verify(tsDao, times(1)).findAndAggregateAsync(TenantId.SYS_TENANT_ID, TEMP, START_TS, END_TS, 1501, Aggregation.COUNT);
verify(tsDao, times(1)).findAndAggregateAsync(any(), any(), any(), anyLong(), anyLong(), any());
verify(tsDao, times(1)).findAndAggregateAsync(TenantId.SYS_TENANT_ID, TenantId.SYS_TENANT_ID, subQueryFirst, START_TS, END_TS + 1, Aggregation.COUNT);
}
@Test
public void givenIntervalNotMultiplePeriod_whenAggregateCount_thanIntervalEqualsOneMillisecondAndStartTsIsZero() {
ReadTsKvQuery query = new BaseReadTsKvQuery(TEMP, START_TS, 0, 1, LIMIT, Aggregation.COUNT, DESC);
ReadTsKvQuery query = new BaseReadTsKvQuery(TEMP, 0, 0, 1, LIMIT, Aggregation.COUNT, DESC);
ReadTsKvQuery subQueryFirst = new BaseReadTsKvQuery(TEMP, 0, 0, 0, LIMIT, Aggregation.COUNT, DESC);
willCallRealMethod().given(tsDao).findAllAsync(TenantId.SYS_TENANT_ID, TenantId.SYS_TENANT_ID, query);
tsDao.findAllAsync(TenantId.SYS_TENANT_ID, TenantId.SYS_TENANT_ID, query);
verify(tsDao, times(1)).findAndAggregateAsync(any(), anyString(), anyLong(), anyLong(), anyLong(), any());
verify(tsDao, times(1)).findAndAggregateAsync(TenantId.SYS_TENANT_ID, TEMP, 0, 0, 0, Aggregation.COUNT);
verify(tsDao, times(1)).findAndAggregateAsync(any(), any(), any(), anyLong(), anyLong(), any());
verify(tsDao, times(1)).findAndAggregateAsync(TenantId.SYS_TENANT_ID, TenantId.SYS_TENANT_ID, subQueryFirst, 0, 0 + 1, Aggregation.COUNT);
}
@Test
public void givenIntervalNotMultiplePeriod_whenAggregateCount_thanIntervalEqualsOneMillisecondAndStartTsIsOne() {
ReadTsKvQuery query = new BaseReadTsKvQuery(TEMP, START_TS, 1, 1, LIMIT, Aggregation.COUNT, DESC);
ReadTsKvQuery query = new BaseReadTsKvQuery(TEMP, START_TS, START_TS, 1, LIMIT, Aggregation.COUNT, DESC);
willCallRealMethod().given(tsDao).findAllAsync(TenantId.SYS_TENANT_ID, TenantId.SYS_TENANT_ID, query);
tsDao.findAllAsync(TenantId.SYS_TENANT_ID, TenantId.SYS_TENANT_ID, query);
verify(tsDao, times(1)).findAndAggregateAsync(any(), anyString(), anyLong(), anyLong(), anyLong(), any());
verify(tsDao, times(1)).findAndAggregateAsync(TenantId.SYS_TENANT_ID, TEMP, 1, 1, 1, Aggregation.COUNT);
verify(tsDao, times(1)).findAndAggregateAsync(any(), any(), any(), anyLong(), anyLong(), any());
verify(tsDao, times(1)).findAndAggregateAsync(TenantId.SYS_TENANT_ID, TenantId.SYS_TENANT_ID, query, START_TS, START_TS + 1, Aggregation.COUNT);
}
@Test
public void givenIntervalNotMultiplePeriod_whenAggregateCount_thanIntervalEqualsOneMillisecondAndStartTsIsIntegerMax() {
ReadTsKvQuery query = new BaseReadTsKvQuery(TEMP, Integer.MAX_VALUE, Integer.MAX_VALUE, 1, LIMIT, Aggregation.COUNT, DESC);
ReadTsKvQuery subQueryFirst = new BaseReadTsKvQuery(TEMP, Integer.MAX_VALUE, Integer.MAX_VALUE + 1L, Integer.MAX_VALUE + (Integer.MAX_VALUE + 1L - Integer.MAX_VALUE) / 2L, LIMIT, Aggregation.COUNT, DESC);
willCallRealMethod().given(tsDao).findAllAsync(TenantId.SYS_TENANT_ID, TenantId.SYS_TENANT_ID, query);
tsDao.findAllAsync(TenantId.SYS_TENANT_ID, TenantId.SYS_TENANT_ID, query);
verify(tsDao, times(1)).findAndAggregateAsync(any(), anyString(), anyLong(), anyLong(), anyLong(), any());
verify(tsDao, times(1)).findAndAggregateAsync(TenantId.SYS_TENANT_ID, TEMP, Integer.MAX_VALUE, Integer.MAX_VALUE, Integer.MAX_VALUE, Aggregation.COUNT);
verify(tsDao, times(1)).findAndAggregateAsync(any(), any(), any(), anyLong(), anyLong(), any());
verify(tsDao, times(1)).findAndAggregateAsync(TenantId.SYS_TENANT_ID, TenantId.SYS_TENANT_ID, subQueryFirst, Integer.MAX_VALUE, Integer.MAX_VALUE + 1L, Aggregation.COUNT);
}
@Test
public void givenIntervalNotMultiplePeriod_whenAggregateCount_thanIntervalEqualsBigNumber() {
ReadTsKvQuery query = new BaseReadTsKvQuery(TEMP, START_TS, END_TS, Integer.MAX_VALUE, LIMIT, Aggregation.COUNT, DESC);
ReadTsKvQuery subQueryFirst = new BaseReadTsKvQuery(TEMP, START_TS, END_TS, START_TS + (END_TS + 1 - START_TS) / 2, LIMIT, Aggregation.COUNT, DESC);
willCallRealMethod().given(tsDao).findAllAsync(TenantId.SYS_TENANT_ID, TenantId.SYS_TENANT_ID, query);
tsDao.findAllAsync(TenantId.SYS_TENANT_ID, TenantId.SYS_TENANT_ID, query);
verify(tsDao, times(1)).findAndAggregateAsync(any(), anyString(), anyLong(), anyLong(), anyLong(), any());
verify(tsDao, times(1)).findAndAggregateAsync(TenantId.SYS_TENANT_ID, TEMP, START_TS, END_TS, 1500, Aggregation.COUNT);
verify(tsDao, times(1)).findAndAggregateAsync(any(), any(), any(), anyLong(), anyLong(), any());
verify(tsDao, times(1)).findAndAggregateAsync(TenantId.SYS_TENANT_ID, TenantId.SYS_TENANT_ID, subQueryFirst, START_TS, END_TS + 1, Aggregation.COUNT);
}
@Test
@ -132,9 +150,10 @@ public class AbstractChunkedAggregationTimeseriesDaoTest {
ReadTsKvQuery query = new BaseReadTsKvQuery(TEMP, START_TS, END_TS, intervalTs, LIMIT, Aggregation.COUNT, DESC);
willCallRealMethod().given(tsDao).findAllAsync(TenantId.SYS_TENANT_ID, TenantId.SYS_TENANT_ID, query);
tsDao.findAllAsync(TenantId.SYS_TENANT_ID, TenantId.SYS_TENANT_ID, query);
verify(tsDao, times(1000)).findAndAggregateAsync(any(), anyString(), anyLong(), anyLong(), anyLong(), any());
for (long i = START_TS; i < END_TS; i += intervalTs) {
verify(tsDao, times(1)).findAndAggregateAsync(TenantId.SYS_TENANT_ID, TEMP, i, i + intervalTs, i, Aggregation.COUNT);
verify(tsDao, times(1000)).findAndAggregateAsync(any(), any(), any(), anyLong(), anyLong(), any());
for (long i = START_TS; i <= END_TS; i += intervalTs) {
ReadTsKvQuery querySub = new BaseReadTsKvQuery(TEMP, i, i + intervalTs, i + (i + intervalTs - i) / 2, LIMIT, Aggregation.COUNT, DESC);
verify(tsDao, times(1)).findAndAggregateAsync(TenantId.SYS_TENANT_ID, TenantId.SYS_TENANT_ID, querySub, i, i + intervalTs, Aggregation.COUNT);
}
}
}