Merge branch 'rc' of github.com:thingsboard/thingsboard into rc
This commit is contained in:
commit
d72edfaa2f
@ -119,7 +119,7 @@ public abstract class AbstractChunkedAggregationTimeseriesDao extends AbstractSq
|
|||||||
@Override
|
@Override
|
||||||
public ListenableFuture<ReadTsKvQueryResult> findAllAsync(TenantId tenantId, EntityId entityId, ReadTsKvQuery query) {
|
public ListenableFuture<ReadTsKvQueryResult> findAllAsync(TenantId tenantId, EntityId entityId, ReadTsKvQuery query) {
|
||||||
var aggParams = query.getAggParameters();
|
var aggParams = query.getAggParameters();
|
||||||
if (Aggregation.NONE.equals(aggParams.getAggregation())) {
|
if (Aggregation.NONE.equals(aggParams.getAggregation()) || aggParams.getInterval() < 1) {
|
||||||
return Futures.immediateFuture(findAllAsyncWithLimit(entityId, query));
|
return Futures.immediateFuture(findAllAsyncWithLimit(entityId, query));
|
||||||
} else {
|
} else {
|
||||||
List<ListenableFuture<Optional<TsKvEntity>>> futures = new ArrayList<>();
|
List<ListenableFuture<Optional<TsKvEntity>>> futures = new ArrayList<>();
|
||||||
@ -144,7 +144,7 @@ public abstract class AbstractChunkedAggregationTimeseriesDao extends AbstractSq
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private ReadTsKvQueryResult findAllAsyncWithLimit(EntityId entityId, ReadTsKvQuery query) {
|
ReadTsKvQueryResult findAllAsyncWithLimit(EntityId entityId, ReadTsKvQuery query) {
|
||||||
Integer keyId = keyDictionaryDao.getOrSaveKeyId(query.getKey());
|
Integer keyId = keyDictionaryDao.getOrSaveKeyId(query.getKey());
|
||||||
List<TsKvEntity> tsKvEntities = tsKvRepository.findAllWithLimit(
|
List<TsKvEntity> tsKvEntities = tsKvRepository.findAllWithLimit(
|
||||||
entityId.getId(),
|
entityId.getId(),
|
||||||
|
|||||||
@ -295,7 +295,12 @@ public class BaseTimeseriesService implements TimeseriesService {
|
|||||||
throw new IncorrectParameterException("Incorrect ReadTsKvQuery. Aggregation can't be empty");
|
throw new IncorrectParameterException("Incorrect ReadTsKvQuery. Aggregation can't be empty");
|
||||||
}
|
}
|
||||||
if (!Aggregation.NONE.equals(query.getAggregation())) {
|
if (!Aggregation.NONE.equals(query.getAggregation())) {
|
||||||
long step = Math.max(query.getInterval(), 1000);
|
long interval = query.getInterval();
|
||||||
|
if (interval < 1) {
|
||||||
|
throw new IncorrectParameterException("Invalid TsKvQuery: 'interval' must be greater than 0, but got " + interval +
|
||||||
|
". Please check your query parameters and ensure 'endTs' is greater than 'startTs' or increase 'interval'.");
|
||||||
|
}
|
||||||
|
long step = Math.max(interval, 1000);
|
||||||
long intervalCounts = (query.getEndTs() - query.getStartTs()) / step;
|
long intervalCounts = (query.getEndTs() - query.getStartTs()) / step;
|
||||||
if (intervalCounts > maxTsIntervals || intervalCounts < 0) {
|
if (intervalCounts > maxTsIntervals || intervalCounts < 0) {
|
||||||
throw new IncorrectParameterException("Incorrect TsKvQuery. Number of intervals is to high - " + intervalCounts + ". " +
|
throw new IncorrectParameterException("Incorrect TsKvQuery. Number of intervals is to high - " + intervalCounts + ". " +
|
||||||
|
|||||||
@ -45,6 +45,7 @@ import org.thingsboard.server.common.data.kv.StringDataEntry;
|
|||||||
import org.thingsboard.server.common.data.kv.TsKvEntry;
|
import org.thingsboard.server.common.data.kv.TsKvEntry;
|
||||||
import org.thingsboard.server.common.data.objects.TelemetryEntityView;
|
import org.thingsboard.server.common.data.objects.TelemetryEntityView;
|
||||||
import org.thingsboard.server.dao.entityview.EntityViewService;
|
import org.thingsboard.server.dao.entityview.EntityViewService;
|
||||||
|
import org.thingsboard.server.dao.exception.IncorrectParameterException;
|
||||||
import org.thingsboard.server.dao.service.AbstractServiceTest;
|
import org.thingsboard.server.dao.service.AbstractServiceTest;
|
||||||
import org.thingsboard.server.dao.timeseries.TimeseriesService;
|
import org.thingsboard.server.dao.timeseries.TimeseriesService;
|
||||||
|
|
||||||
@ -757,6 +758,21 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest {
|
|||||||
assertThat(fullList).containsOnlyOnceElementsOf(timeseries);
|
assertThat(fullList).containsOnlyOnceElementsOf(timeseries);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testFindAllByQueriesWithAggregationAndZeroInterval() throws Exception {
|
||||||
|
testFindAllByQueriesWithAggregationAndInvalidInterval(0);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testFindAllByQueriesWithAggregationAndNegativeInterval() throws Exception {
|
||||||
|
testFindAllByQueriesWithAggregationAndInvalidInterval(-1);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void testFindAllByQueriesWithAggregationAndInvalidInterval(long interval) {
|
||||||
|
BaseReadTsKvQuery query = new BaseReadTsKvQuery(STRING_KEY, TS, TS, interval, 1000, Aggregation.SUM, "DESC");
|
||||||
|
Assert.assertThrows(IncorrectParameterException.class, () -> findAndVerifyQueryId(deviceId, query));
|
||||||
|
}
|
||||||
|
|
||||||
private TsKvEntry save(DeviceId deviceId, long ts, long value) throws Exception {
|
private TsKvEntry save(DeviceId deviceId, long ts, long value) throws Exception {
|
||||||
TsKvEntry entry = new BasicTsKvEntry(ts, new LongDataEntry(LONG_KEY, value));
|
TsKvEntry entry = new BasicTsKvEntry(ts, new LongDataEntry(LONG_KEY, value));
|
||||||
tsService.save(tenantId, deviceId, entry).get(MAX_TIMEOUT, TimeUnit.SECONDS);
|
tsService.save(tenantId, deviceId, entry).get(MAX_TIMEOUT, TimeUnit.SECONDS);
|
||||||
|
|||||||
@ -51,6 +51,7 @@ public class AbstractChunkedAggregationTimeseriesDaoTest {
|
|||||||
Optional<TsKvEntry> optionalListenableFuture = Optional.of(mock(TsKvEntry.class));
|
Optional<TsKvEntry> optionalListenableFuture = Optional.of(mock(TsKvEntry.class));
|
||||||
willReturn(Futures.immediateFuture(optionalListenableFuture)).given(tsDao).findAndAggregateAsync(any(), anyString(), anyLong(), anyLong(), anyLong(), any());
|
willReturn(Futures.immediateFuture(optionalListenableFuture)).given(tsDao).findAndAggregateAsync(any(), anyString(), anyLong(), anyLong(), anyLong(), any());
|
||||||
willReturn(Futures.immediateFuture(mock(ReadTsKvQueryResult.class))).given(tsDao).getReadTsKvQueryResultFuture(any(), any());
|
willReturn(Futures.immediateFuture(mock(ReadTsKvQueryResult.class))).given(tsDao).getReadTsKvQueryResultFuture(any(), any());
|
||||||
|
willReturn(mock(ReadTsKvQueryResult.class)).given(tsDao).findAllAsyncWithLimit(any(), any());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@ -146,6 +147,24 @@ public class AbstractChunkedAggregationTimeseriesDaoTest {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void givenZeroInterval_whenAggregateCount_thenFindAllWithoutAggregation() {
|
||||||
|
givenInterval_whenAggregateCount_thenFindAllWithoutAggregation(0);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void givenNegativeInterval_whenAggregateCount_thenFindAllWithoutAggregation() {
|
||||||
|
givenInterval_whenAggregateCount_thenFindAllWithoutAggregation(-1);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void givenInterval_whenAggregateCount_thenFindAllWithoutAggregation(int interval) {
|
||||||
|
ReadTsKvQuery query = new BaseReadTsKvQuery(TEMP, 1, 3000, interval, LIMIT, COUNT, DESC);
|
||||||
|
willCallRealMethod().given(tsDao).findAllAsync(SYS_TENANT_ID, SYS_TENANT_ID, query);
|
||||||
|
tsDao.findAllAsync(SYS_TENANT_ID, SYS_TENANT_ID, query);
|
||||||
|
verify(tsDao, times(1)).findAllAsyncWithLimit(any(), any());
|
||||||
|
verify(tsDao, times(0)).findAndAggregateAsync(any(), any(), anyLong(), anyLong(), anyLong(), any());
|
||||||
|
}
|
||||||
|
|
||||||
long getTsForReadTsKvQuery(long startTs, long endTs) {
|
long getTsForReadTsKvQuery(long startTs, long endTs) {
|
||||||
return startTs + (endTs - startTs) / 2L;
|
return startTs + (endTs - startTs) / 2L;
|
||||||
}
|
}
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user