Interval types back-end implementation
This commit is contained in:
parent
e3041adc42
commit
7fc2231189
@ -45,6 +45,8 @@ import org.springframework.web.bind.annotation.RestController;
|
||||
import org.springframework.web.context.request.async.DeferredResult;
|
||||
import org.thingsboard.common.util.JacksonUtil;
|
||||
import org.thingsboard.common.util.ThingsBoardThreadFactory;
|
||||
import org.thingsboard.server.common.data.kv.AggregationParams;
|
||||
import org.thingsboard.server.common.data.kv.IntervalType;
|
||||
import org.thingsboard.server.common.msg.rule.engine.DeviceAttributesEventNotificationMsg;
|
||||
import org.thingsboard.server.common.data.DataConstants;
|
||||
import org.thingsboard.server.common.data.EntityType;
|
||||
@ -310,8 +312,12 @@ public class TelemetryController extends BaseController {
|
||||
@RequestParam(name = "startTs") Long startTs,
|
||||
@ApiParam(value = "A long value representing the end timestamp of the time range in milliseconds, UTC.")
|
||||
@RequestParam(name = "endTs") Long endTs,
|
||||
@ApiParam(value = "A string value representing the type fo the interval.", allowableValues = "MILLISECONDS, WEEK, WEEK_ISO, MONTH, QUARTER")
|
||||
@RequestParam(name = "intervalType", required = false) IntervalType intervalType,
|
||||
@ApiParam(value = "A long value representing the aggregation interval range in milliseconds.")
|
||||
@RequestParam(name = "interval", defaultValue = "0") Long interval,
|
||||
@ApiParam(value = "A string value representing the timezone that will be used to calculate exact timestamps for 'WEEK', 'WEEK_ISO', 'MONTH' and 'QUARTER' interval types.")
|
||||
@RequestParam(name = "timeZone", required = false) String timeZone,
|
||||
@ApiParam(value = "An integer value that represents a max number of timeseries data points to fetch." +
|
||||
" This parameter is used only in the case if 'agg' parameter is set to 'NONE'.", defaultValue = "100")
|
||||
@RequestParam(name = "limit", defaultValue = "100") Integer limit,
|
||||
@ -325,11 +331,16 @@ public class TelemetryController extends BaseController {
|
||||
@RequestParam(name = "useStrictDataTypes", required = false, defaultValue = "false") Boolean useStrictDataTypes) throws ThingsboardException {
|
||||
return accessValidator.validateEntityAndCallback(getCurrentUser(), Operation.READ_TELEMETRY, entityType, entityIdStr,
|
||||
(result, tenantId, entityId) -> {
|
||||
// If interval is 0, convert this to a NONE aggregation, which is probably what the user really wanted
|
||||
Aggregation agg = interval == 0L ? Aggregation.valueOf(Aggregation.NONE.name()) : Aggregation.valueOf(aggStr);
|
||||
List<ReadTsKvQuery> queries = toKeysList(keys).stream().map(key -> new BaseReadTsKvQuery(key, startTs, endTs, interval, limit, agg, orderBy))
|
||||
.collect(Collectors.toList());
|
||||
|
||||
AggregationParams params;
|
||||
Aggregation agg = Aggregation.valueOf(aggStr);
|
||||
if (Aggregation.NONE.equals(agg)) {
|
||||
params = AggregationParams.none();
|
||||
} else if (intervalType == null || IntervalType.MILLISECONDS.equals(intervalType)) {
|
||||
params = interval == 0L ? AggregationParams.none() : AggregationParams.milliseconds(agg, interval);
|
||||
} else {
|
||||
params = AggregationParams.calendar(agg, intervalType, timeZone);
|
||||
}
|
||||
List<ReadTsKvQuery> queries = toKeysList(keys).stream().map(key -> new BaseReadTsKvQuery(key, startTs, endTs, params, limit, orderBy)).collect(Collectors.toList());
|
||||
Futures.addCallback(tsService.findAll(tenantId, entityId, queries), getTsKvListCallback(result, useStrictDataTypes), MoreExecutors.directExecutor());
|
||||
});
|
||||
}
|
||||
|
||||
@ -560,17 +560,14 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc
|
||||
List<String> keys = cmd.getKeys();
|
||||
List<ReadTsKvQuery> finalTsKvQueryList;
|
||||
List<ReadTsKvQuery> tsKvQueryList = keys.stream().map(key -> {
|
||||
var query = new BaseReadTsKvQuery(
|
||||
key, cmd.getStartTs(), cmd.getEndTs(), cmd.getInterval(), getLimit(cmd.getLimit()), cmd.getAgg()
|
||||
);
|
||||
var query = new BaseReadTsKvQuery(key, cmd.getStartTs(), cmd.getEndTs(), cmd.toAggregationParams(), getLimit(cmd.getLimit()));
|
||||
queriesKeys.put(query.getId(), query.getKey());
|
||||
return query;
|
||||
}).collect(Collectors.toList());
|
||||
if (cmd.isFetchLatestPreviousPoint()) {
|
||||
finalTsKvQueryList = new ArrayList<>(tsKvQueryList);
|
||||
finalTsKvQueryList.addAll(keys.stream().map(key -> {
|
||||
var query = new BaseReadTsKvQuery(
|
||||
key, cmd.getStartTs() - TimeUnit.DAYS.toMillis(365), cmd.getStartTs(), cmd.getInterval(), 1, cmd.getAgg());
|
||||
var query = new BaseReadTsKvQuery(key, cmd.getStartTs() - TimeUnit.DAYS.toMillis(365), cmd.getStartTs(), cmd.toAggregationParams(), 1);
|
||||
queriesKeys.put(query.getId(), query.getKey());
|
||||
return query;
|
||||
}
|
||||
|
||||
@ -16,6 +16,7 @@
|
||||
package org.thingsboard.server.service.ws.telemetry.cmd.v2;
|
||||
|
||||
import org.thingsboard.server.common.data.kv.Aggregation;
|
||||
import org.thingsboard.server.common.data.kv.AggregationParams;
|
||||
import org.thingsboard.server.common.data.kv.IntervalType;
|
||||
|
||||
import java.util.List;
|
||||
@ -40,4 +41,16 @@ public interface GetTsCmd {
|
||||
|
||||
boolean isFetchLatestPreviousPoint();
|
||||
|
||||
default AggregationParams toAggregationParams() {
|
||||
var agg = getAgg();
|
||||
var intervalType = getIntervalType();
|
||||
if (agg == null || Aggregation.NONE.equals(agg)) {
|
||||
return AggregationParams.none();
|
||||
} else if (intervalType == null || IntervalType.MILLISECONDS.equals(intervalType)) {
|
||||
return AggregationParams.milliseconds(agg, getInterval());
|
||||
} else {
|
||||
return AggregationParams.calendar(agg, intervalType, getTimeZoneId());
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@ -15,12 +15,17 @@
|
||||
*/
|
||||
package org.thingsboard.server.controller;
|
||||
|
||||
import com.fasterxml.jackson.databind.node.ArrayNode;
|
||||
import com.fasterxml.jackson.databind.node.ObjectNode;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
import org.springframework.test.context.TestPropertySource;
|
||||
import org.thingsboard.common.util.JacksonUtil;
|
||||
import org.thingsboard.server.common.data.Device;
|
||||
import org.thingsboard.server.common.data.SaveDeviceWithCredentialsRequest;
|
||||
import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
|
||||
import org.thingsboard.server.common.data.kv.LongDataEntry;
|
||||
import org.thingsboard.server.common.data.kv.TsKvEntry;
|
||||
import org.thingsboard.server.common.data.query.EntityKey;
|
||||
import org.thingsboard.server.common.data.query.SingleEntityFilter;
|
||||
import org.thingsboard.server.common.data.security.DeviceCredentials;
|
||||
@ -28,6 +33,7 @@ import org.thingsboard.server.common.data.security.DeviceCredentialsType;
|
||||
import org.thingsboard.server.dao.service.DaoSqlTest;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
|
||||
import static org.thingsboard.server.common.data.query.EntityKeyType.TIME_SERIES;
|
||||
@ -51,6 +57,59 @@ public class TelemetryControllerTest extends AbstractControllerTest {
|
||||
doPostAsync("/api/plugins/telemetry/DEVICE/" + device.getId() + "/timeseries/smth", invalidRequestBody, String.class, status().isBadRequest());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTelemetryRequests() throws Exception {
|
||||
loginTenantAdmin();
|
||||
Device device = createDevice();
|
||||
|
||||
var startTs = 1704899727000L; // Wednesday, January 10 15:15:27 GMT
|
||||
var endOfWeek1Ts = 1705269600000L; // Monday, January 15, 2024 0:00:00 GMT+02:00
|
||||
var endOfWeek2Ts = 1705874400000L; // Monday, January 22, 2024 0:00:00 GMT+02:00
|
||||
var endTs = endOfWeek2Ts + TimeUnit.DAYS.toMillis(1) + TimeUnit.HOURS.toMillis(1); // Monday, January 23, 2024 1:00:00 GMT+02:00
|
||||
|
||||
var firstIntervalTs = startTs + (endOfWeek1Ts - startTs) / 2;
|
||||
var secondIntervalTs = endOfWeek1Ts + (endOfWeek2Ts - endOfWeek1Ts) / 2;
|
||||
var thirdIntervalTs = endOfWeek2Ts + (endTs - endOfWeek2Ts) / 2;
|
||||
|
||||
var middleOfTheInterval = startTs + (endTs - startTs) / 2;
|
||||
|
||||
tsService.save(tenantId, device.getId(), new BasicTsKvEntry(1704899728000L, new LongDataEntry("t", 1L))); // Wednesday, January 10 15:15:28 GMT
|
||||
tsService.save(tenantId, device.getId(), new BasicTsKvEntry(1704899729000L, new LongDataEntry("t", 3L))); // Wednesday, January 10 15:15:29 GMT
|
||||
tsService.save(tenantId, device.getId(), new BasicTsKvEntry(endOfWeek1Ts + 1000, new LongDataEntry("t", 7L))); // Monday, January 15, 2024 0:00:01 GMT+02:00
|
||||
tsService.save(tenantId, device.getId(), new BasicTsKvEntry(endOfWeek2Ts + 1000, new LongDataEntry("t", 11L))); // Monday, January 22, 2024 0:00:01 GMT+02:00
|
||||
|
||||
ObjectNode result = doGetAsync("/api/plugins/telemetry/DEVICE/" + device.getId() +
|
||||
"/values/timeseries?keys=t&startTs={startTs}&endTs={endTs}&agg={agg}&intervalType={intervalType}&timeZone={timeZone}",
|
||||
ObjectNode.class, startTs, endTs, "SUM", "WEEK_ISO", "Europe/Kyiv");
|
||||
Assert.assertNotNull(result);
|
||||
Assert.assertNotNull(result.get("t"));
|
||||
Assert.assertEquals(3, result.get("t").size());
|
||||
|
||||
var firstIntervalResult = result.get("t").get(0);
|
||||
Assert.assertEquals(4L, firstIntervalResult.get("value").asLong());
|
||||
Assert.assertEquals(firstIntervalTs, firstIntervalResult.get("ts").asLong());
|
||||
|
||||
var secondIntervalResult = result.get("t").get(1);
|
||||
Assert.assertEquals(7L, secondIntervalResult.get("value").asLong());
|
||||
Assert.assertEquals(secondIntervalTs, secondIntervalResult.get("ts").asLong());
|
||||
|
||||
var thirdIntervalResult = result.get("t").get(2);
|
||||
Assert.assertEquals(11L, thirdIntervalResult.get("value").asLong());
|
||||
Assert.assertEquals(thirdIntervalTs, thirdIntervalResult.get("ts").asLong());
|
||||
|
||||
result = doGetAsync("/api/plugins/telemetry/DEVICE/" + device.getId() +
|
||||
"/values/timeseries?keys=t&startTs={startTs}&endTs={endTs}&agg={agg}&intervalType={intervalType}&timeZone={timeZone}",
|
||||
ObjectNode.class, startTs, endTs, "SUM", "MONTH", "Europe/Kyiv");
|
||||
|
||||
Assert.assertNotNull(result);
|
||||
Assert.assertNotNull(result.get("t"));
|
||||
Assert.assertEquals(1, result.get("t").size());
|
||||
|
||||
var monthResult = result.get("t").get(0);
|
||||
Assert.assertEquals(22L, monthResult.get("value").asLong());
|
||||
Assert.assertEquals(middleOfTheInterval, monthResult.get("ts").asLong());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDeleteAllTelemetryWithLatest() throws Exception {
|
||||
loginTenantAdmin();
|
||||
|
||||
@ -0,0 +1,92 @@
|
||||
/**
|
||||
* Copyright © 2016-2024 The Thingsboard Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.thingsboard.server.common.data.kv;
|
||||
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.EqualsAndHashCode;
|
||||
import lombok.Getter;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.jetbrains.annotations.NotNull;
|
||||
import org.thingsboard.server.common.data.StringUtils;
|
||||
|
||||
import java.time.DateTimeException;
|
||||
import java.time.ZoneId;
|
||||
import java.time.zone.ZoneRulesException;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
@AllArgsConstructor
|
||||
@EqualsAndHashCode
|
||||
@Slf4j
|
||||
public class AggregationParams {
|
||||
@Getter
|
||||
private final Aggregation aggregation;
|
||||
@Getter
|
||||
private final IntervalType intervalType;
|
||||
@Getter
|
||||
private final ZoneId tzId;
|
||||
|
||||
private final long interval;
|
||||
|
||||
public static AggregationParams none() {
|
||||
return new AggregationParams(Aggregation.NONE, null, null, 0L);
|
||||
}
|
||||
|
||||
public static AggregationParams milliseconds(Aggregation aggregationType, long aggregationIntervalMs) {
|
||||
return new AggregationParams(aggregationType, IntervalType.MILLISECONDS, null, aggregationIntervalMs);
|
||||
}
|
||||
|
||||
public static AggregationParams calendar(Aggregation aggregationType, IntervalType intervalType, String tzIdStr) {
|
||||
return calendar(aggregationType, intervalType, getZoneId(tzIdStr));
|
||||
}
|
||||
|
||||
public static AggregationParams calendar(Aggregation aggregationType, IntervalType intervalType, ZoneId tzId) {
|
||||
return new AggregationParams(aggregationType, intervalType, tzId, 0L);
|
||||
}
|
||||
|
||||
public static AggregationParams of(Aggregation aggregation, IntervalType intervalType, ZoneId tzId, long interval) {
|
||||
return new AggregationParams(aggregation, intervalType, tzId, interval);
|
||||
}
|
||||
|
||||
public long getInterval() {
|
||||
if (intervalType == null) {
|
||||
return 0L;
|
||||
} else {
|
||||
switch (intervalType) {
|
||||
case WEEK:
|
||||
case WEEK_ISO:
|
||||
return TimeUnit.DAYS.toMillis(7);
|
||||
case MONTH:
|
||||
return TimeUnit.DAYS.toMillis(30);
|
||||
case QUARTER:
|
||||
return TimeUnit.DAYS.toMillis(90);
|
||||
default:
|
||||
return interval;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static ZoneId getZoneId(String tzIdStr) {
|
||||
if (StringUtils.isEmpty(tzIdStr)) {
|
||||
return ZoneId.systemDefault();
|
||||
}
|
||||
try {
|
||||
return ZoneId.of(tzIdStr);
|
||||
} catch (DateTimeException e) {
|
||||
log.warn("[{}] Failed to convert the time zone. Fallback to default.", tzIdStr);
|
||||
return ZoneId.systemDefault();
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -18,41 +18,47 @@ package org.thingsboard.server.common.data.kv;
|
||||
import lombok.Data;
|
||||
import lombok.EqualsAndHashCode;
|
||||
|
||||
import java.time.ZoneId;
|
||||
|
||||
@Data
|
||||
@EqualsAndHashCode(callSuper = true)
|
||||
public class BaseReadTsKvQuery extends BaseTsKvQuery implements ReadTsKvQuery {
|
||||
|
||||
private final long interval;
|
||||
private final AggregationParams aggParameters;
|
||||
private final int limit;
|
||||
private final Aggregation aggregation;
|
||||
private final String order;
|
||||
|
||||
public BaseReadTsKvQuery(String key, long startTs, long endTs, long interval, int limit, Aggregation aggregation) {
|
||||
this(key, startTs, endTs, interval, limit, aggregation, "DESC");
|
||||
}
|
||||
|
||||
public BaseReadTsKvQuery(String key, long startTs, long endTs, long interval, int limit, Aggregation aggregation, String order) {
|
||||
public BaseReadTsKvQuery(String key, long startTs, long endTs, long interval, int limit, Aggregation aggregation, String descOrder) {
|
||||
this(key, startTs, endTs, AggregationParams.of(aggregation, IntervalType.MILLISECONDS, ZoneId.systemDefault(), interval), limit, descOrder);
|
||||
}
|
||||
|
||||
public BaseReadTsKvQuery(String key, long startTs, long endTs, AggregationParams parameters, int limit) {
|
||||
this(key, startTs, endTs, parameters, limit, "DESC");
|
||||
}
|
||||
|
||||
public BaseReadTsKvQuery(String key, long startTs, long endTs, AggregationParams parameters, int limit, String order) {
|
||||
super(key, startTs, endTs);
|
||||
this.interval = interval;
|
||||
this.aggParameters = parameters;
|
||||
this.limit = limit;
|
||||
this.aggregation = aggregation;
|
||||
this.order = order;
|
||||
}
|
||||
|
||||
public BaseReadTsKvQuery(String key, long startTs, long endTs) {
|
||||
this(key, startTs, endTs, endTs - startTs, 1, Aggregation.AVG, "DESC");
|
||||
this(key, startTs, endTs, AggregationParams.milliseconds(Aggregation.AVG, endTs - startTs), 1, "DESC");
|
||||
}
|
||||
|
||||
public BaseReadTsKvQuery(String key, long startTs, long endTs, int limit, String order) {
|
||||
this(key, startTs, endTs, endTs - startTs, limit, Aggregation.NONE, order);
|
||||
this(key, startTs, endTs, AggregationParams.none(), limit, order);
|
||||
}
|
||||
|
||||
public BaseReadTsKvQuery(ReadTsKvQuery query, long startTs, long endTs) {
|
||||
super(query.getId(), query.getKey(), startTs, endTs);
|
||||
this.interval = query.getInterval();
|
||||
this.aggParameters = query.getAggParameters();
|
||||
this.limit = query.getLimit();
|
||||
this.aggregation = query.getAggregation();
|
||||
this.order = query.getOrder();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@ -17,6 +17,6 @@ package org.thingsboard.server.common.data.kv;
|
||||
|
||||
public enum IntervalType {
|
||||
|
||||
MILLISECONDS, WEEK, WEEK_ISO, MONTH, QUARTER
|
||||
MILLISECONDS, WEEK/*Sunday-Saturday*/, WEEK_ISO/*Monday-Sunday*/, MONTH, QUARTER
|
||||
|
||||
}
|
||||
|
||||
@ -17,12 +17,18 @@ package org.thingsboard.server.common.data.kv;
|
||||
|
||||
public interface ReadTsKvQuery extends TsKvQuery {
|
||||
|
||||
long getInterval();
|
||||
AggregationParams getAggParameters();
|
||||
|
||||
default long getInterval(){
|
||||
return getAggParameters().getInterval();
|
||||
}
|
||||
|
||||
default Aggregation getAggregation() {
|
||||
return getAggParameters().getAggregation();
|
||||
}
|
||||
|
||||
int getLimit();
|
||||
|
||||
Aggregation getAggregation();
|
||||
|
||||
String getOrder();
|
||||
|
||||
}
|
||||
|
||||
@ -25,6 +25,7 @@ import org.thingsboard.server.common.data.id.EntityId;
|
||||
import org.thingsboard.server.common.data.id.TenantId;
|
||||
import org.thingsboard.server.common.data.kv.Aggregation;
|
||||
import org.thingsboard.server.common.data.kv.DeleteTsKvQuery;
|
||||
import org.thingsboard.server.common.data.kv.IntervalType;
|
||||
import org.thingsboard.server.common.data.kv.ReadTsKvQuery;
|
||||
import org.thingsboard.server.common.data.kv.ReadTsKvQueryResult;
|
||||
import org.thingsboard.server.common.data.kv.TsKvEntry;
|
||||
@ -37,9 +38,18 @@ import org.thingsboard.server.dao.sql.TbSqlBlockingQueueWrapper;
|
||||
import org.thingsboard.server.dao.sqlts.insert.InsertTsRepository;
|
||||
import org.thingsboard.server.dao.sqlts.ts.TsKvRepository;
|
||||
import org.thingsboard.server.dao.timeseries.TimeseriesDao;
|
||||
import org.thingsboard.server.dao.util.TimeUtils;
|
||||
|
||||
import javax.annotation.PostConstruct;
|
||||
import javax.annotation.PreDestroy;
|
||||
import java.time.Instant;
|
||||
import java.time.LocalDateTime;
|
||||
import java.time.ZoneId;
|
||||
import java.time.ZonedDateTime;
|
||||
import java.time.temporal.ChronoUnit;
|
||||
import java.time.temporal.IsoFields;
|
||||
import java.time.temporal.TemporalUnit;
|
||||
import java.time.temporal.WeekFields;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Comparator;
|
||||
import java.util.List;
|
||||
@ -112,16 +122,23 @@ public abstract class AbstractChunkedAggregationTimeseriesDao extends AbstractSq
|
||||
|
||||
@Override
|
||||
public ListenableFuture<ReadTsKvQueryResult> findAllAsync(TenantId tenantId, EntityId entityId, ReadTsKvQuery query) {
|
||||
if (query.getAggregation() == Aggregation.NONE) {
|
||||
var aggParams = query.getAggParameters();
|
||||
if (Aggregation.NONE.equals(aggParams.getAggregation())) {
|
||||
return Futures.immediateFuture(findAllAsyncWithLimit(entityId, query));
|
||||
} else {
|
||||
List<ListenableFuture<Optional<TsKvEntity>>> futures = new ArrayList<>();
|
||||
var intervalType = aggParams.getIntervalType();
|
||||
long startPeriod = query.getStartTs();
|
||||
long endPeriod = Math.max(query.getStartTs() + 1, query.getEndTs());
|
||||
long step = query.getInterval();
|
||||
while (startPeriod < endPeriod) {
|
||||
long startTs = startPeriod;
|
||||
long endTs = Math.min(startPeriod + step, endPeriod);
|
||||
long endTs;
|
||||
if (IntervalType.MILLISECONDS.equals(intervalType)) {
|
||||
endTs = startPeriod + aggParams.getInterval();
|
||||
} else {
|
||||
endTs = TimeUtils.calculateIntervalEnd(startTs, intervalType, aggParams.getTzId());
|
||||
}
|
||||
endTs = Math.min(endTs, endPeriod);
|
||||
long ts = startTs + (endTs - startTs) / 2;
|
||||
ListenableFuture<Optional<TsKvEntity>> aggregateTsKvEntry = findAndAggregateAsync(entityId, query.getKey(), startTs, endTs, ts, query.getAggregation());
|
||||
futures.add(aggregateTsKvEntry);
|
||||
|
||||
@ -19,6 +19,7 @@ import com.google.common.util.concurrent.Futures;
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
import com.google.common.util.concurrent.MoreExecutors;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.jetbrains.annotations.NotNull;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.data.domain.PageRequest;
|
||||
import org.springframework.data.domain.Sort;
|
||||
@ -28,6 +29,7 @@ import org.thingsboard.server.common.data.id.EntityId;
|
||||
import org.thingsboard.server.common.data.id.TenantId;
|
||||
import org.thingsboard.server.common.data.kv.Aggregation;
|
||||
import org.thingsboard.server.common.data.kv.DeleteTsKvQuery;
|
||||
import org.thingsboard.server.common.data.kv.IntervalType;
|
||||
import org.thingsboard.server.common.data.kv.ReadTsKvQuery;
|
||||
import org.thingsboard.server.common.data.kv.ReadTsKvQueryResult;
|
||||
import org.thingsboard.server.common.data.kv.TsKvEntry;
|
||||
@ -35,11 +37,13 @@ import org.thingsboard.server.common.stats.StatsFactory;
|
||||
import org.thingsboard.server.dao.DaoUtil;
|
||||
import org.thingsboard.server.dao.model.sql.AbstractTsKvEntity;
|
||||
import org.thingsboard.server.dao.model.sqlts.timescale.ts.TimescaleTsKvEntity;
|
||||
import org.thingsboard.server.dao.model.sqlts.ts.TsKvEntity;
|
||||
import org.thingsboard.server.dao.sql.TbSqlBlockingQueueParams;
|
||||
import org.thingsboard.server.dao.sql.TbSqlBlockingQueueWrapper;
|
||||
import org.thingsboard.server.dao.sqlts.AbstractSqlTimeseriesDao;
|
||||
import org.thingsboard.server.dao.sqlts.insert.InsertTsRepository;
|
||||
import org.thingsboard.server.dao.timeseries.TimeseriesDao;
|
||||
import org.thingsboard.server.dao.util.TimeUtils;
|
||||
import org.thingsboard.server.dao.util.TimescaleDBTsDao;
|
||||
|
||||
import javax.annotation.PostConstruct;
|
||||
@ -143,14 +147,28 @@ public class TimescaleTimeseriesDao extends AbstractSqlTimeseriesDao implements
|
||||
|
||||
@Override
|
||||
public ListenableFuture<ReadTsKvQueryResult> findAllAsync(TenantId tenantId, EntityId entityId, ReadTsKvQuery query) {
|
||||
var aggParams = query.getAggParameters();
|
||||
var intervalType = aggParams.getIntervalType();
|
||||
if (query.getAggregation() == Aggregation.NONE) {
|
||||
return Futures.immediateFuture(findAllAsyncWithLimit(entityId, query));
|
||||
} else {
|
||||
} else if (IntervalType.MILLISECONDS.equals(intervalType)) {
|
||||
long startTs = query.getStartTs();
|
||||
long endTs = Math.max(query.getStartTs() + 1, query.getEndTs());
|
||||
long timeBucket = query.getInterval();
|
||||
List<Optional<? extends AbstractTsKvEntity>> data = findAllAndAggregateAsync(entityId, query.getKey(), startTs, endTs, timeBucket, query.getAggregation());
|
||||
return getReadTsKvQueryResultFuture(query, Futures.immediateFuture(data));
|
||||
} else {
|
||||
//TODO: @dshvaika improve according to native capabilities of Timescale.
|
||||
long startPeriod = query.getStartTs();
|
||||
long endPeriod = Math.max(query.getStartTs() + 1, query.getEndTs());
|
||||
List<TimescaleTsKvEntity> timescaleTsKvEntities = new ArrayList<>();
|
||||
while (startPeriod < endPeriod) {
|
||||
long startTs = startPeriod;
|
||||
long endTs = Math.min(TimeUtils.calculateIntervalEnd(startTs, intervalType, aggParams.getTzId()), endPeriod);
|
||||
timescaleTsKvEntities.addAll(switchAggregation(query.getKey(), startTs, endTs, endTs - startTs, query.getAggregation(), entityId.getId()));
|
||||
startPeriod = endTs;
|
||||
}
|
||||
return getReadTsKvQueryResultFuture(query, Futures.immediateFuture(toResultList(entityId, query.getKey(), timescaleTsKvEntities)));
|
||||
}
|
||||
}
|
||||
|
||||
@ -187,6 +205,10 @@ public class TimescaleTimeseriesDao extends AbstractSqlTimeseriesDao implements
|
||||
timescaleTsKvEntities.addAll(switchAggregation(key, startTs + interval, endTs, remainingPart, aggregation, entityId.getId()));
|
||||
}
|
||||
|
||||
return toResultList(entityId, key, timescaleTsKvEntities);
|
||||
}
|
||||
|
||||
private static List<Optional<? extends AbstractTsKvEntity>> toResultList(EntityId entityId, String key, List<TimescaleTsKvEntity> timescaleTsKvEntities) {
|
||||
if (!CollectionUtils.isEmpty(timescaleTsKvEntities)) {
|
||||
List<Optional<? extends AbstractTsKvEntity>> result = new ArrayList<>();
|
||||
timescaleTsKvEntities.forEach(entity -> {
|
||||
|
||||
@ -41,6 +41,7 @@ import org.thingsboard.server.common.data.kv.Aggregation;
|
||||
import org.thingsboard.server.common.data.kv.BaseReadTsKvQuery;
|
||||
import org.thingsboard.server.common.data.kv.DataType;
|
||||
import org.thingsboard.server.common.data.kv.DeleteTsKvQuery;
|
||||
import org.thingsboard.server.common.data.kv.IntervalType;
|
||||
import org.thingsboard.server.common.data.kv.KvEntry;
|
||||
import org.thingsboard.server.common.data.kv.ReadTsKvQuery;
|
||||
import org.thingsboard.server.common.data.kv.ReadTsKvQueryResult;
|
||||
@ -51,6 +52,7 @@ import org.thingsboard.server.dao.nosql.TbResultSet;
|
||||
import org.thingsboard.server.dao.nosql.TbResultSetFuture;
|
||||
import org.thingsboard.server.dao.sqlts.AggregationTimeseriesDao;
|
||||
import org.thingsboard.server.dao.util.NoSqlTsDao;
|
||||
import org.thingsboard.server.dao.util.TimeUtils;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
import javax.annotation.PostConstruct;
|
||||
@ -227,18 +229,24 @@ public class CassandraBaseTimeseriesDao extends AbstractCassandraBaseTimeseriesD
|
||||
|
||||
@Override
|
||||
public ListenableFuture<ReadTsKvQueryResult> findAllAsync(TenantId tenantId, EntityId entityId, ReadTsKvQuery query) {
|
||||
if (query.getAggregation() == Aggregation.NONE) {
|
||||
var aggParams = query.getAggParameters();
|
||||
if (Aggregation.NONE.equals(aggParams.getAggregation())) {
|
||||
return findAllAsyncWithLimit(tenantId, entityId, query);
|
||||
} else {
|
||||
long startPeriod = query.getStartTs();
|
||||
long endPeriod = Math.max(query.getStartTs() + 1, query.getEndTs());
|
||||
long step = Math.max(query.getInterval(), MIN_AGGREGATION_STEP_MS);
|
||||
List<ListenableFuture<Optional<TsKvEntryAggWrapper>>> futures = new ArrayList<>();
|
||||
var intervalType = aggParams.getIntervalType();
|
||||
while (startPeriod < endPeriod) {
|
||||
long startTs = startPeriod;
|
||||
long endTs = Math.min(startPeriod + step, endPeriod);
|
||||
long ts = endTs - startTs;
|
||||
ReadTsKvQuery subQuery = new BaseReadTsKvQuery(query.getKey(), startTs, endTs, ts, 1, query.getAggregation(), query.getOrder());
|
||||
long endTs;
|
||||
if (IntervalType.MILLISECONDS.equals(intervalType)) {
|
||||
endTs = startPeriod + Math.max(query.getInterval(), MIN_AGGREGATION_STEP_MS);
|
||||
} else {
|
||||
endTs = TimeUtils.calculateIntervalEnd(startTs, aggParams.getIntervalType(), aggParams.getTzId());
|
||||
}
|
||||
endTs = Math.min(endTs, endPeriod);
|
||||
ReadTsKvQuery subQuery = new BaseReadTsKvQuery(query.getKey(), startTs, endTs, endTs - startTs, 1, query.getAggregation(), query.getOrder());
|
||||
futures.add(findAndAggregateAsync(tenantId, entityId, subQuery, toPartitionTs(startTs), toPartitionTs(endTs)));
|
||||
startPeriod = endTs;
|
||||
}
|
||||
|
||||
@ -33,7 +33,7 @@ public class TsKvQueryCursor extends QueryCursor {
|
||||
@Getter
|
||||
private final List<TsKvEntry> data;
|
||||
@Getter
|
||||
private String orderBy;
|
||||
private final String orderBy;
|
||||
|
||||
private int partitionIndex;
|
||||
private int currentLimit;
|
||||
|
||||
@ -0,0 +1,45 @@
|
||||
/**
|
||||
* Copyright © 2016-2024 The Thingsboard Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.thingsboard.server.dao.util;
|
||||
|
||||
import org.thingsboard.server.common.data.kv.IntervalType;
|
||||
|
||||
import java.time.Instant;
|
||||
import java.time.ZoneId;
|
||||
import java.time.ZonedDateTime;
|
||||
import java.time.temporal.ChronoUnit;
|
||||
import java.time.temporal.IsoFields;
|
||||
import java.time.temporal.WeekFields;
|
||||
|
||||
public class TimeUtils {
|
||||
|
||||
public static long calculateIntervalEnd(long startTs, IntervalType intervalType, ZoneId tzId) {
|
||||
var startTime = ZonedDateTime.ofInstant(Instant.ofEpochMilli(startTs), tzId);
|
||||
switch (intervalType) {
|
||||
case WEEK:
|
||||
return startTime.truncatedTo(ChronoUnit.DAYS).with(WeekFields.SUNDAY_START.dayOfWeek(), 1).plusDays(7).toInstant().toEpochMilli();
|
||||
case WEEK_ISO:
|
||||
return startTime.truncatedTo(ChronoUnit.DAYS).with(WeekFields.ISO.dayOfWeek(), 1).plusDays(7).toInstant().toEpochMilli();
|
||||
case MONTH:
|
||||
return startTime.truncatedTo(ChronoUnit.DAYS).withDayOfMonth(1).plusMonths(1).toInstant().toEpochMilli();
|
||||
case QUARTER:
|
||||
return startTime.truncatedTo(ChronoUnit.DAYS).with(IsoFields.DAY_OF_QUARTER, 1).plusMonths(3).toInstant().toEpochMilli();
|
||||
default:
|
||||
throw new RuntimeException("Not supported!");
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
@ -0,0 +1,60 @@
|
||||
/**
|
||||
* Copyright © 2016-2024 The Thingsboard Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.thingsboard.server.dao.util;
|
||||
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.thingsboard.server.common.data.kv.IntervalType;
|
||||
|
||||
import java.time.ZoneId;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
class TimeUtilsTest {
|
||||
|
||||
@Test
|
||||
void testWeekEnd() {
|
||||
long ts = 1704899727000L; // Wednesday, January 10 15:15:27 GMT
|
||||
assertThat(TimeUtils.calculateIntervalEnd(ts, IntervalType.WEEK, ZoneId.of("Europe/Kyiv"))).isEqualTo(1705183200000L); // Sunday, January 14, 2024 0:00:00 GMT+02:00
|
||||
assertThat(TimeUtils.calculateIntervalEnd(ts, IntervalType.WEEK_ISO, ZoneId.of("Europe/Kyiv"))).isEqualTo(1705269600000L); // Monday, January 15, 2024 0:00:00 GMT+02:00
|
||||
|
||||
assertThat(TimeUtils.calculateIntervalEnd(ts, IntervalType.WEEK, ZoneId.of("Europe/Amsterdam"))).isEqualTo(1705186800000L); // Sunday, January 14, 2024 0:00:00 GMT+01:00
|
||||
assertThat(TimeUtils.calculateIntervalEnd(ts, IntervalType.WEEK_ISO, ZoneId.of("Europe/Amsterdam"))).isEqualTo(1705273200000L); // Monday, January 15, 2024 0:00:00 GMT+01:00
|
||||
|
||||
ts = 1704621600000L; // Sunday, January 7, 2024 12:00:00 GMT+02:00
|
||||
assertThat(TimeUtils.calculateIntervalEnd(ts, IntervalType.WEEK, ZoneId.of("Europe/Kyiv"))).isEqualTo(1705183200000L); // Sunday, January 14, 2024 0:00:00 GMT+02:00
|
||||
assertThat(TimeUtils.calculateIntervalEnd(ts, IntervalType.WEEK_ISO, ZoneId.of("Europe/Kyiv"))).isEqualTo(1704664800000L); // Monday, January 8, 2024 0:00:00 GMT+02:00
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
void testMonthEnd() {
|
||||
long ts = 1704899727000L; // Wednesday, January 10 15:15:27 GMT
|
||||
assertThat(TimeUtils.calculateIntervalEnd(ts, IntervalType.MONTH, ZoneId.of("Europe/Kyiv"))).isEqualTo(1706738400000L); // Thursday, February 1, 2024 0:00:00 GMT+02:00
|
||||
assertThat(TimeUtils.calculateIntervalEnd(ts, IntervalType.MONTH, ZoneId.of("Europe/Amsterdam"))).isEqualTo(1706742000000L); // Monday, January 15, 2024 0:00:00 GMT+02:00
|
||||
}
|
||||
|
||||
@Test
|
||||
void testQuarterEnd() {
|
||||
long ts = 1704899727000L; // Wednesday, January 10 15:15:27 GMT
|
||||
assertThat(TimeUtils.calculateIntervalEnd(ts, IntervalType.QUARTER, ZoneId.of("Europe/Kyiv"))).isEqualTo(1711918800000L); // Monday, April 1, 2024 0:00:00 GMT+03:00 DST
|
||||
assertThat(TimeUtils.calculateIntervalEnd(ts, IntervalType.QUARTER, ZoneId.of("Europe/Amsterdam"))).isEqualTo(1711922400000L); // Monday, April 1, 2024 1:00:00 GMT+03:00 DST
|
||||
|
||||
ts = 1711929600000L; // Monday, April 1, 2024 3:00:00 GMT+03:00
|
||||
assertThat(TimeUtils.calculateIntervalEnd(ts, IntervalType.QUARTER, ZoneId.of("Europe/Kyiv"))).isEqualTo(1719781200000L); // Monday, July 1, 2024 0:00:00 GMT+03:00 DST
|
||||
assertThat(TimeUtils.calculateIntervalEnd(ts, IntervalType.QUARTER, ZoneId.of("America/New_York"))).isEqualTo(1711944000000L); // Monday, April 1, 2024 7:00:00 GMT+03:00 DST
|
||||
}
|
||||
|
||||
}
|
||||
Loading…
x
Reference in New Issue
Block a user