Merge branch 'feature/grouping-interval-types' of github.com:thingsboard/thingsboard into feature/grouping-interval-types

This commit is contained in:
Igor Kulikov 2024-01-12 15:04:18 +02:00
commit f7a13aca07
14 changed files with 370 additions and 34 deletions

View File

@ -45,6 +45,8 @@ import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.context.request.async.DeferredResult;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.common.util.ThingsBoardThreadFactory;
import org.thingsboard.server.common.data.kv.AggregationParams;
import org.thingsboard.server.common.data.kv.IntervalType;
import org.thingsboard.server.common.msg.rule.engine.DeviceAttributesEventNotificationMsg;
import org.thingsboard.server.common.data.DataConstants;
import org.thingsboard.server.common.data.EntityType;
@ -310,8 +312,12 @@ public class TelemetryController extends BaseController {
@RequestParam(name = "startTs") Long startTs,
@ApiParam(value = "A long value representing the end timestamp of the time range in milliseconds, UTC.")
@RequestParam(name = "endTs") Long endTs,
@ApiParam(value = "A string value representing the type fo the interval.", allowableValues = "MILLISECONDS, WEEK, WEEK_ISO, MONTH, QUARTER")
@RequestParam(name = "intervalType", required = false) IntervalType intervalType,
@ApiParam(value = "A long value representing the aggregation interval range in milliseconds.")
@RequestParam(name = "interval", defaultValue = "0") Long interval,
@ApiParam(value = "A string value representing the timezone that will be used to calculate exact timestamps for 'WEEK', 'WEEK_ISO', 'MONTH' and 'QUARTER' interval types.")
@RequestParam(name = "timeZone", required = false) String timeZone,
@ApiParam(value = "An integer value that represents a max number of timeseries data points to fetch." +
" This parameter is used only in the case if 'agg' parameter is set to 'NONE'.", defaultValue = "100")
@RequestParam(name = "limit", defaultValue = "100") Integer limit,
@ -325,11 +331,16 @@ public class TelemetryController extends BaseController {
@RequestParam(name = "useStrictDataTypes", required = false, defaultValue = "false") Boolean useStrictDataTypes) throws ThingsboardException {
return accessValidator.validateEntityAndCallback(getCurrentUser(), Operation.READ_TELEMETRY, entityType, entityIdStr,
(result, tenantId, entityId) -> {
// If interval is 0, convert this to a NONE aggregation, which is probably what the user really wanted
Aggregation agg = interval == 0L ? Aggregation.valueOf(Aggregation.NONE.name()) : Aggregation.valueOf(aggStr);
List<ReadTsKvQuery> queries = toKeysList(keys).stream().map(key -> new BaseReadTsKvQuery(key, startTs, endTs, interval, limit, agg, orderBy))
.collect(Collectors.toList());
AggregationParams params;
Aggregation agg = Aggregation.valueOf(aggStr);
if (Aggregation.NONE.equals(agg)) {
params = AggregationParams.none();
} else if (intervalType == null || IntervalType.MILLISECONDS.equals(intervalType)) {
params = interval == 0L ? AggregationParams.none() : AggregationParams.milliseconds(agg, interval);
} else {
params = AggregationParams.calendar(agg, intervalType, timeZone);
}
List<ReadTsKvQuery> queries = toKeysList(keys).stream().map(key -> new BaseReadTsKvQuery(key, startTs, endTs, params, limit, orderBy)).collect(Collectors.toList());
Futures.addCallback(tsService.findAll(tenantId, entityId, queries), getTsKvListCallback(result, useStrictDataTypes), MoreExecutors.directExecutor());
});
}

View File

@ -560,17 +560,14 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc
List<String> keys = cmd.getKeys();
List<ReadTsKvQuery> finalTsKvQueryList;
List<ReadTsKvQuery> tsKvQueryList = keys.stream().map(key -> {
var query = new BaseReadTsKvQuery(
key, cmd.getStartTs(), cmd.getEndTs(), cmd.getInterval(), getLimit(cmd.getLimit()), cmd.getAgg()
);
var query = new BaseReadTsKvQuery(key, cmd.getStartTs(), cmd.getEndTs(), cmd.toAggregationParams(), getLimit(cmd.getLimit()));
queriesKeys.put(query.getId(), query.getKey());
return query;
}).collect(Collectors.toList());
if (cmd.isFetchLatestPreviousPoint()) {
finalTsKvQueryList = new ArrayList<>(tsKvQueryList);
finalTsKvQueryList.addAll(keys.stream().map(key -> {
var query = new BaseReadTsKvQuery(
key, cmd.getStartTs() - TimeUnit.DAYS.toMillis(365), cmd.getStartTs(), cmd.getInterval(), 1, cmd.getAgg());
var query = new BaseReadTsKvQuery(key, cmd.getStartTs() - TimeUnit.DAYS.toMillis(365), cmd.getStartTs(), cmd.toAggregationParams(), 1);
queriesKeys.put(query.getId(), query.getKey());
return query;
}

View File

@ -16,6 +16,7 @@
package org.thingsboard.server.service.ws.telemetry.cmd.v2;
import org.thingsboard.server.common.data.kv.Aggregation;
import org.thingsboard.server.common.data.kv.AggregationParams;
import org.thingsboard.server.common.data.kv.IntervalType;
import java.util.List;
@ -40,4 +41,16 @@ public interface GetTsCmd {
boolean isFetchLatestPreviousPoint();
default AggregationParams toAggregationParams() {
var agg = getAgg();
var intervalType = getIntervalType();
if (agg == null || Aggregation.NONE.equals(agg)) {
return AggregationParams.none();
} else if (intervalType == null || IntervalType.MILLISECONDS.equals(intervalType)) {
return AggregationParams.milliseconds(agg, getInterval());
} else {
return AggregationParams.calendar(agg, intervalType, getTimeZoneId());
}
}
}

View File

@ -15,12 +15,17 @@
*/
package org.thingsboard.server.controller;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.junit.Assert;
import org.junit.Test;
import org.springframework.test.context.TestPropertySource;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.SaveDeviceWithCredentialsRequest;
import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
import org.thingsboard.server.common.data.kv.LongDataEntry;
import org.thingsboard.server.common.data.kv.TsKvEntry;
import org.thingsboard.server.common.data.query.EntityKey;
import org.thingsboard.server.common.data.query.SingleEntityFilter;
import org.thingsboard.server.common.data.security.DeviceCredentials;
@ -28,6 +33,7 @@ import org.thingsboard.server.common.data.security.DeviceCredentialsType;
import org.thingsboard.server.dao.service.DaoSqlTest;
import java.util.List;
import java.util.concurrent.TimeUnit;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
import static org.thingsboard.server.common.data.query.EntityKeyType.TIME_SERIES;
@ -51,6 +57,59 @@ public class TelemetryControllerTest extends AbstractControllerTest {
doPostAsync("/api/plugins/telemetry/DEVICE/" + device.getId() + "/timeseries/smth", invalidRequestBody, String.class, status().isBadRequest());
}
@Test
public void testTelemetryRequests() throws Exception {
loginTenantAdmin();
Device device = createDevice();
var startTs = 1704899727000L; // Wednesday, January 10 15:15:27 GMT
var endOfWeek1Ts = 1705269600000L; // Monday, January 15, 2024 0:00:00 GMT+02:00
var endOfWeek2Ts = 1705874400000L; // Monday, January 22, 2024 0:00:00 GMT+02:00
var endTs = endOfWeek2Ts + TimeUnit.DAYS.toMillis(1) + TimeUnit.HOURS.toMillis(1); // Monday, January 23, 2024 1:00:00 GMT+02:00
var firstIntervalTs = startTs + (endOfWeek1Ts - startTs) / 2;
var secondIntervalTs = endOfWeek1Ts + (endOfWeek2Ts - endOfWeek1Ts) / 2;
var thirdIntervalTs = endOfWeek2Ts + (endTs - endOfWeek2Ts) / 2;
var middleOfTheInterval = startTs + (endTs - startTs) / 2;
tsService.save(tenantId, device.getId(), new BasicTsKvEntry(1704899728000L, new LongDataEntry("t", 1L))); // Wednesday, January 10 15:15:28 GMT
tsService.save(tenantId, device.getId(), new BasicTsKvEntry(1704899729000L, new LongDataEntry("t", 3L))); // Wednesday, January 10 15:15:29 GMT
tsService.save(tenantId, device.getId(), new BasicTsKvEntry(endOfWeek1Ts + 1000, new LongDataEntry("t", 7L))); // Monday, January 15, 2024 0:00:01 GMT+02:00
tsService.save(tenantId, device.getId(), new BasicTsKvEntry(endOfWeek2Ts + 1000, new LongDataEntry("t", 11L))); // Monday, January 22, 2024 0:00:01 GMT+02:00
ObjectNode result = doGetAsync("/api/plugins/telemetry/DEVICE/" + device.getId() +
"/values/timeseries?keys=t&startTs={startTs}&endTs={endTs}&agg={agg}&intervalType={intervalType}&timeZone={timeZone}",
ObjectNode.class, startTs, endTs, "SUM", "WEEK_ISO", "Europe/Kyiv");
Assert.assertNotNull(result);
Assert.assertNotNull(result.get("t"));
Assert.assertEquals(3, result.get("t").size());
var firstIntervalResult = result.get("t").get(0);
Assert.assertEquals(4L, firstIntervalResult.get("value").asLong());
Assert.assertEquals(firstIntervalTs, firstIntervalResult.get("ts").asLong());
var secondIntervalResult = result.get("t").get(1);
Assert.assertEquals(7L, secondIntervalResult.get("value").asLong());
Assert.assertEquals(secondIntervalTs, secondIntervalResult.get("ts").asLong());
var thirdIntervalResult = result.get("t").get(2);
Assert.assertEquals(11L, thirdIntervalResult.get("value").asLong());
Assert.assertEquals(thirdIntervalTs, thirdIntervalResult.get("ts").asLong());
result = doGetAsync("/api/plugins/telemetry/DEVICE/" + device.getId() +
"/values/timeseries?keys=t&startTs={startTs}&endTs={endTs}&agg={agg}&intervalType={intervalType}&timeZone={timeZone}",
ObjectNode.class, startTs, endTs, "SUM", "MONTH", "Europe/Kyiv");
Assert.assertNotNull(result);
Assert.assertNotNull(result.get("t"));
Assert.assertEquals(1, result.get("t").size());
var monthResult = result.get("t").get(0);
Assert.assertEquals(22L, monthResult.get("value").asLong());
Assert.assertEquals(middleOfTheInterval, monthResult.get("ts").asLong());
}
@Test
public void testDeleteAllTelemetryWithLatest() throws Exception {
loginTenantAdmin();

View File

@ -0,0 +1,92 @@
/**
* Copyright © 2016-2024 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.common.data.kv;
import lombok.AllArgsConstructor;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.server.common.data.StringUtils;
import java.time.DateTimeException;
import java.time.ZoneId;
import java.util.Map;
import java.util.concurrent.TimeUnit;
@AllArgsConstructor
@EqualsAndHashCode
@Slf4j
public class AggregationParams {
private static final Map<String, String> TZ_LINKS = Map.of("EST", "America/New_York", "GMT+0", "GMT", "GMT-0", "GMT", "HST", "US/Hawaii", "MST", "America/Phoenix", "ROC", "Asia/Taipei");
@Getter
private final Aggregation aggregation;
@Getter
private final IntervalType intervalType;
@Getter
private final ZoneId tzId;
private final long interval;
public static AggregationParams none() {
return new AggregationParams(Aggregation.NONE, null, null, 0L);
}
public static AggregationParams milliseconds(Aggregation aggregationType, long aggregationIntervalMs) {
return new AggregationParams(aggregationType, IntervalType.MILLISECONDS, null, aggregationIntervalMs);
}
public static AggregationParams calendar(Aggregation aggregationType, IntervalType intervalType, String tzIdStr) {
return calendar(aggregationType, intervalType, getZoneId(tzIdStr));
}
public static AggregationParams calendar(Aggregation aggregationType, IntervalType intervalType, ZoneId tzId) {
return new AggregationParams(aggregationType, intervalType, tzId, 0L);
}
public static AggregationParams of(Aggregation aggregation, IntervalType intervalType, ZoneId tzId, long interval) {
return new AggregationParams(aggregation, intervalType, tzId, interval);
}
public long getInterval() {
if (intervalType == null) {
return 0L;
} else {
switch (intervalType) {
case WEEK:
case WEEK_ISO:
return TimeUnit.DAYS.toMillis(7);
case MONTH:
return TimeUnit.DAYS.toMillis(30);
case QUARTER:
return TimeUnit.DAYS.toMillis(90);
default:
return interval;
}
}
}
private static ZoneId getZoneId(String tzIdStr) {
if (StringUtils.isEmpty(tzIdStr)) {
return ZoneId.systemDefault();
}
try {
return ZoneId.of(tzIdStr, TZ_LINKS);
} catch (DateTimeException e) {
log.warn("[{}] Failed to convert the time zone. Fallback to default.", tzIdStr);
return ZoneId.systemDefault();
}
}
}

View File

@ -18,41 +18,47 @@ package org.thingsboard.server.common.data.kv;
import lombok.Data;
import lombok.EqualsAndHashCode;
import java.time.ZoneId;
@Data
@EqualsAndHashCode(callSuper = true)
public class BaseReadTsKvQuery extends BaseTsKvQuery implements ReadTsKvQuery {
private final long interval;
private final AggregationParams aggParameters;
private final int limit;
private final Aggregation aggregation;
private final String order;
public BaseReadTsKvQuery(String key, long startTs, long endTs, long interval, int limit, Aggregation aggregation) {
this(key, startTs, endTs, interval, limit, aggregation, "DESC");
}
public BaseReadTsKvQuery(String key, long startTs, long endTs, long interval, int limit, Aggregation aggregation, String order) {
public BaseReadTsKvQuery(String key, long startTs, long endTs, long interval, int limit, Aggregation aggregation, String descOrder) {
this(key, startTs, endTs, AggregationParams.of(aggregation, IntervalType.MILLISECONDS, ZoneId.systemDefault(), interval), limit, descOrder);
}
public BaseReadTsKvQuery(String key, long startTs, long endTs, AggregationParams parameters, int limit) {
this(key, startTs, endTs, parameters, limit, "DESC");
}
public BaseReadTsKvQuery(String key, long startTs, long endTs, AggregationParams parameters, int limit, String order) {
super(key, startTs, endTs);
this.interval = interval;
this.aggParameters = parameters;
this.limit = limit;
this.aggregation = aggregation;
this.order = order;
}
public BaseReadTsKvQuery(String key, long startTs, long endTs) {
this(key, startTs, endTs, endTs - startTs, 1, Aggregation.AVG, "DESC");
this(key, startTs, endTs, AggregationParams.milliseconds(Aggregation.AVG, endTs - startTs), 1, "DESC");
}
public BaseReadTsKvQuery(String key, long startTs, long endTs, int limit, String order) {
this(key, startTs, endTs, endTs - startTs, limit, Aggregation.NONE, order);
this(key, startTs, endTs, AggregationParams.none(), limit, order);
}
public BaseReadTsKvQuery(ReadTsKvQuery query, long startTs, long endTs) {
super(query.getId(), query.getKey(), startTs, endTs);
this.interval = query.getInterval();
this.aggParameters = query.getAggParameters();
this.limit = query.getLimit();
this.aggregation = query.getAggregation();
this.order = query.getOrder();
}
}

View File

@ -17,6 +17,6 @@ package org.thingsboard.server.common.data.kv;
public enum IntervalType {
MILLISECONDS, WEEK, WEEK_ISO, MONTH, QUARTER
MILLISECONDS, WEEK/*Sunday-Saturday*/, WEEK_ISO/*Monday-Sunday*/, MONTH, QUARTER
}

View File

@ -17,12 +17,18 @@ package org.thingsboard.server.common.data.kv;
public interface ReadTsKvQuery extends TsKvQuery {
long getInterval();
AggregationParams getAggParameters();
default long getInterval(){
return getAggParameters().getInterval();
}
default Aggregation getAggregation() {
return getAggParameters().getAggregation();
}
int getLimit();
Aggregation getAggregation();
String getOrder();
}

View File

@ -25,6 +25,7 @@ import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.kv.Aggregation;
import org.thingsboard.server.common.data.kv.DeleteTsKvQuery;
import org.thingsboard.server.common.data.kv.IntervalType;
import org.thingsboard.server.common.data.kv.ReadTsKvQuery;
import org.thingsboard.server.common.data.kv.ReadTsKvQueryResult;
import org.thingsboard.server.common.data.kv.TsKvEntry;
@ -37,9 +38,18 @@ import org.thingsboard.server.dao.sql.TbSqlBlockingQueueWrapper;
import org.thingsboard.server.dao.sqlts.insert.InsertTsRepository;
import org.thingsboard.server.dao.sqlts.ts.TsKvRepository;
import org.thingsboard.server.dao.timeseries.TimeseriesDao;
import org.thingsboard.server.dao.util.TimeUtils;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.temporal.ChronoUnit;
import java.time.temporal.IsoFields;
import java.time.temporal.TemporalUnit;
import java.time.temporal.WeekFields;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
@ -112,16 +122,23 @@ public abstract class AbstractChunkedAggregationTimeseriesDao extends AbstractSq
@Override
public ListenableFuture<ReadTsKvQueryResult> findAllAsync(TenantId tenantId, EntityId entityId, ReadTsKvQuery query) {
if (query.getAggregation() == Aggregation.NONE) {
var aggParams = query.getAggParameters();
if (Aggregation.NONE.equals(aggParams.getAggregation())) {
return Futures.immediateFuture(findAllAsyncWithLimit(entityId, query));
} else {
List<ListenableFuture<Optional<TsKvEntity>>> futures = new ArrayList<>();
var intervalType = aggParams.getIntervalType();
long startPeriod = query.getStartTs();
long endPeriod = Math.max(query.getStartTs() + 1, query.getEndTs());
long step = query.getInterval();
while (startPeriod < endPeriod) {
long startTs = startPeriod;
long endTs = Math.min(startPeriod + step, endPeriod);
long endTs;
if (IntervalType.MILLISECONDS.equals(intervalType)) {
endTs = startPeriod + aggParams.getInterval();
} else {
endTs = TimeUtils.calculateIntervalEnd(startTs, intervalType, aggParams.getTzId());
}
endTs = Math.min(endTs, endPeriod);
long ts = startTs + (endTs - startTs) / 2;
ListenableFuture<Optional<TsKvEntity>> aggregateTsKvEntry = findAndAggregateAsync(entityId, query.getKey(), startTs, endTs, ts, query.getAggregation());
futures.add(aggregateTsKvEntry);

View File

@ -19,6 +19,7 @@ import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import lombok.extern.slf4j.Slf4j;
import org.jetbrains.annotations.NotNull;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Sort;
@ -28,6 +29,7 @@ import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.kv.Aggregation;
import org.thingsboard.server.common.data.kv.DeleteTsKvQuery;
import org.thingsboard.server.common.data.kv.IntervalType;
import org.thingsboard.server.common.data.kv.ReadTsKvQuery;
import org.thingsboard.server.common.data.kv.ReadTsKvQueryResult;
import org.thingsboard.server.common.data.kv.TsKvEntry;
@ -35,11 +37,13 @@ import org.thingsboard.server.common.stats.StatsFactory;
import org.thingsboard.server.dao.DaoUtil;
import org.thingsboard.server.dao.model.sql.AbstractTsKvEntity;
import org.thingsboard.server.dao.model.sqlts.timescale.ts.TimescaleTsKvEntity;
import org.thingsboard.server.dao.model.sqlts.ts.TsKvEntity;
import org.thingsboard.server.dao.sql.TbSqlBlockingQueueParams;
import org.thingsboard.server.dao.sql.TbSqlBlockingQueueWrapper;
import org.thingsboard.server.dao.sqlts.AbstractSqlTimeseriesDao;
import org.thingsboard.server.dao.sqlts.insert.InsertTsRepository;
import org.thingsboard.server.dao.timeseries.TimeseriesDao;
import org.thingsboard.server.dao.util.TimeUtils;
import org.thingsboard.server.dao.util.TimescaleDBTsDao;
import javax.annotation.PostConstruct;
@ -143,14 +147,28 @@ public class TimescaleTimeseriesDao extends AbstractSqlTimeseriesDao implements
@Override
public ListenableFuture<ReadTsKvQueryResult> findAllAsync(TenantId tenantId, EntityId entityId, ReadTsKvQuery query) {
var aggParams = query.getAggParameters();
var intervalType = aggParams.getIntervalType();
if (query.getAggregation() == Aggregation.NONE) {
return Futures.immediateFuture(findAllAsyncWithLimit(entityId, query));
} else {
} else if (IntervalType.MILLISECONDS.equals(intervalType)) {
long startTs = query.getStartTs();
long endTs = Math.max(query.getStartTs() + 1, query.getEndTs());
long timeBucket = query.getInterval();
List<Optional<? extends AbstractTsKvEntity>> data = findAllAndAggregateAsync(entityId, query.getKey(), startTs, endTs, timeBucket, query.getAggregation());
return getReadTsKvQueryResultFuture(query, Futures.immediateFuture(data));
} else {
//TODO: @dshvaika improve according to native capabilities of Timescale.
long startPeriod = query.getStartTs();
long endPeriod = Math.max(query.getStartTs() + 1, query.getEndTs());
List<TimescaleTsKvEntity> timescaleTsKvEntities = new ArrayList<>();
while (startPeriod < endPeriod) {
long startTs = startPeriod;
long endTs = Math.min(TimeUtils.calculateIntervalEnd(startTs, intervalType, aggParams.getTzId()), endPeriod);
timescaleTsKvEntities.addAll(switchAggregation(query.getKey(), startTs, endTs, endTs - startTs, query.getAggregation(), entityId.getId()));
startPeriod = endTs;
}
return getReadTsKvQueryResultFuture(query, Futures.immediateFuture(toResultList(entityId, query.getKey(), timescaleTsKvEntities)));
}
}
@ -187,6 +205,10 @@ public class TimescaleTimeseriesDao extends AbstractSqlTimeseriesDao implements
timescaleTsKvEntities.addAll(switchAggregation(key, startTs + interval, endTs, remainingPart, aggregation, entityId.getId()));
}
return toResultList(entityId, key, timescaleTsKvEntities);
}
private static List<Optional<? extends AbstractTsKvEntity>> toResultList(EntityId entityId, String key, List<TimescaleTsKvEntity> timescaleTsKvEntities) {
if (!CollectionUtils.isEmpty(timescaleTsKvEntities)) {
List<Optional<? extends AbstractTsKvEntity>> result = new ArrayList<>();
timescaleTsKvEntities.forEach(entity -> {

View File

@ -41,6 +41,7 @@ import org.thingsboard.server.common.data.kv.Aggregation;
import org.thingsboard.server.common.data.kv.BaseReadTsKvQuery;
import org.thingsboard.server.common.data.kv.DataType;
import org.thingsboard.server.common.data.kv.DeleteTsKvQuery;
import org.thingsboard.server.common.data.kv.IntervalType;
import org.thingsboard.server.common.data.kv.KvEntry;
import org.thingsboard.server.common.data.kv.ReadTsKvQuery;
import org.thingsboard.server.common.data.kv.ReadTsKvQueryResult;
@ -51,6 +52,7 @@ import org.thingsboard.server.dao.nosql.TbResultSet;
import org.thingsboard.server.dao.nosql.TbResultSetFuture;
import org.thingsboard.server.dao.sqlts.AggregationTimeseriesDao;
import org.thingsboard.server.dao.util.NoSqlTsDao;
import org.thingsboard.server.dao.util.TimeUtils;
import javax.annotation.Nullable;
import javax.annotation.PostConstruct;
@ -227,18 +229,24 @@ public class CassandraBaseTimeseriesDao extends AbstractCassandraBaseTimeseriesD
@Override
public ListenableFuture<ReadTsKvQueryResult> findAllAsync(TenantId tenantId, EntityId entityId, ReadTsKvQuery query) {
if (query.getAggregation() == Aggregation.NONE) {
var aggParams = query.getAggParameters();
if (Aggregation.NONE.equals(aggParams.getAggregation())) {
return findAllAsyncWithLimit(tenantId, entityId, query);
} else {
long startPeriod = query.getStartTs();
long endPeriod = Math.max(query.getStartTs() + 1, query.getEndTs());
long step = Math.max(query.getInterval(), MIN_AGGREGATION_STEP_MS);
List<ListenableFuture<Optional<TsKvEntryAggWrapper>>> futures = new ArrayList<>();
var intervalType = aggParams.getIntervalType();
while (startPeriod < endPeriod) {
long startTs = startPeriod;
long endTs = Math.min(startPeriod + step, endPeriod);
long ts = endTs - startTs;
ReadTsKvQuery subQuery = new BaseReadTsKvQuery(query.getKey(), startTs, endTs, ts, 1, query.getAggregation(), query.getOrder());
long endTs;
if (IntervalType.MILLISECONDS.equals(intervalType)) {
endTs = startPeriod + Math.max(query.getInterval(), MIN_AGGREGATION_STEP_MS);
} else {
endTs = TimeUtils.calculateIntervalEnd(startTs, aggParams.getIntervalType(), aggParams.getTzId());
}
endTs = Math.min(endTs, endPeriod);
ReadTsKvQuery subQuery = new BaseReadTsKvQuery(query.getKey(), startTs, endTs, endTs - startTs, 1, query.getAggregation(), query.getOrder());
futures.add(findAndAggregateAsync(tenantId, entityId, subQuery, toPartitionTs(startTs), toPartitionTs(endTs)));
startPeriod = endTs;
}

View File

@ -33,7 +33,7 @@ public class TsKvQueryCursor extends QueryCursor {
@Getter
private final List<TsKvEntry> data;
@Getter
private String orderBy;
private final String orderBy;
private int partitionIndex;
private int currentLimit;

View File

@ -0,0 +1,45 @@
/**
* Copyright © 2016-2024 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.dao.util;
import org.thingsboard.server.common.data.kv.IntervalType;
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.temporal.ChronoUnit;
import java.time.temporal.IsoFields;
import java.time.temporal.WeekFields;
public class TimeUtils {
public static long calculateIntervalEnd(long startTs, IntervalType intervalType, ZoneId tzId) {
var startTime = ZonedDateTime.ofInstant(Instant.ofEpochMilli(startTs), tzId);
switch (intervalType) {
case WEEK:
return startTime.truncatedTo(ChronoUnit.DAYS).with(WeekFields.SUNDAY_START.dayOfWeek(), 1).plusDays(7).toInstant().toEpochMilli();
case WEEK_ISO:
return startTime.truncatedTo(ChronoUnit.DAYS).with(WeekFields.ISO.dayOfWeek(), 1).plusDays(7).toInstant().toEpochMilli();
case MONTH:
return startTime.truncatedTo(ChronoUnit.DAYS).withDayOfMonth(1).plusMonths(1).toInstant().toEpochMilli();
case QUARTER:
return startTime.truncatedTo(ChronoUnit.DAYS).with(IsoFields.DAY_OF_QUARTER, 1).plusMonths(3).toInstant().toEpochMilli();
default:
throw new RuntimeException("Not supported!");
}
}
}

View File

@ -0,0 +1,60 @@
/**
* Copyright © 2016-2024 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.dao.util;
import org.junit.jupiter.api.Test;
import org.thingsboard.server.common.data.kv.IntervalType;
import java.time.ZoneId;
import static org.assertj.core.api.Assertions.assertThat;
class TimeUtilsTest {
@Test
void testWeekEnd() {
long ts = 1704899727000L; // Wednesday, January 10 15:15:27 GMT
assertThat(TimeUtils.calculateIntervalEnd(ts, IntervalType.WEEK, ZoneId.of("Europe/Kyiv"))).isEqualTo(1705183200000L); // Sunday, January 14, 2024 0:00:00 GMT+02:00
assertThat(TimeUtils.calculateIntervalEnd(ts, IntervalType.WEEK_ISO, ZoneId.of("Europe/Kyiv"))).isEqualTo(1705269600000L); // Monday, January 15, 2024 0:00:00 GMT+02:00
assertThat(TimeUtils.calculateIntervalEnd(ts, IntervalType.WEEK, ZoneId.of("Europe/Amsterdam"))).isEqualTo(1705186800000L); // Sunday, January 14, 2024 0:00:00 GMT+01:00
assertThat(TimeUtils.calculateIntervalEnd(ts, IntervalType.WEEK_ISO, ZoneId.of("Europe/Amsterdam"))).isEqualTo(1705273200000L); // Monday, January 15, 2024 0:00:00 GMT+01:00
ts = 1704621600000L; // Sunday, January 7, 2024 12:00:00 GMT+02:00
assertThat(TimeUtils.calculateIntervalEnd(ts, IntervalType.WEEK, ZoneId.of("Europe/Kyiv"))).isEqualTo(1705183200000L); // Sunday, January 14, 2024 0:00:00 GMT+02:00
assertThat(TimeUtils.calculateIntervalEnd(ts, IntervalType.WEEK_ISO, ZoneId.of("Europe/Kyiv"))).isEqualTo(1704664800000L); // Monday, January 8, 2024 0:00:00 GMT+02:00
}
@Test
void testMonthEnd() {
long ts = 1704899727000L; // Wednesday, January 10 15:15:27 GMT
assertThat(TimeUtils.calculateIntervalEnd(ts, IntervalType.MONTH, ZoneId.of("Europe/Kyiv"))).isEqualTo(1706738400000L); // Thursday, February 1, 2024 0:00:00 GMT+02:00
assertThat(TimeUtils.calculateIntervalEnd(ts, IntervalType.MONTH, ZoneId.of("Europe/Amsterdam"))).isEqualTo(1706742000000L); // Monday, January 15, 2024 0:00:00 GMT+02:00
}
@Test
void testQuarterEnd() {
long ts = 1704899727000L; // Wednesday, January 10 15:15:27 GMT
assertThat(TimeUtils.calculateIntervalEnd(ts, IntervalType.QUARTER, ZoneId.of("Europe/Kyiv"))).isEqualTo(1711918800000L); // Monday, April 1, 2024 0:00:00 GMT+03:00 DST
assertThat(TimeUtils.calculateIntervalEnd(ts, IntervalType.QUARTER, ZoneId.of("Europe/Amsterdam"))).isEqualTo(1711922400000L); // Monday, April 1, 2024 1:00:00 GMT+03:00 DST
ts = 1711929600000L; // Monday, April 1, 2024 3:00:00 GMT+03:00
assertThat(TimeUtils.calculateIntervalEnd(ts, IntervalType.QUARTER, ZoneId.of("Europe/Kyiv"))).isEqualTo(1719781200000L); // Monday, July 1, 2024 0:00:00 GMT+03:00 DST
assertThat(TimeUtils.calculateIntervalEnd(ts, IntervalType.QUARTER, ZoneId.of("America/New_York"))).isEqualTo(1711944000000L); // Monday, April 1, 2024 7:00:00 GMT+03:00 DST
}
}