diff --git a/application/src/main/java/org/thingsboard/server/controller/TelemetryController.java b/application/src/main/java/org/thingsboard/server/controller/TelemetryController.java index 295b8cad99..288828f046 100644 --- a/application/src/main/java/org/thingsboard/server/controller/TelemetryController.java +++ b/application/src/main/java/org/thingsboard/server/controller/TelemetryController.java @@ -45,6 +45,8 @@ import org.springframework.web.bind.annotation.RestController; import org.springframework.web.context.request.async.DeferredResult; import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.common.util.ThingsBoardThreadFactory; +import org.thingsboard.server.common.data.kv.AggregationParams; +import org.thingsboard.server.common.data.kv.IntervalType; import org.thingsboard.server.common.msg.rule.engine.DeviceAttributesEventNotificationMsg; import org.thingsboard.server.common.data.DataConstants; import org.thingsboard.server.common.data.EntityType; @@ -310,8 +312,12 @@ public class TelemetryController extends BaseController { @RequestParam(name = "startTs") Long startTs, @ApiParam(value = "A long value representing the end timestamp of the time range in milliseconds, UTC.") @RequestParam(name = "endTs") Long endTs, + @ApiParam(value = "A string value representing the type fo the interval.", allowableValues = "MILLISECONDS, WEEK, WEEK_ISO, MONTH, QUARTER") + @RequestParam(name = "intervalType", required = false) IntervalType intervalType, @ApiParam(value = "A long value representing the aggregation interval range in milliseconds.") @RequestParam(name = "interval", defaultValue = "0") Long interval, + @ApiParam(value = "A string value representing the timezone that will be used to calculate exact timestamps for 'WEEK', 'WEEK_ISO', 'MONTH' and 'QUARTER' interval types.") + @RequestParam(name = "timeZone", required = false) String timeZone, @ApiParam(value = "An integer value that represents a max number of timeseries data points to fetch." + " This parameter is used only in the case if 'agg' parameter is set to 'NONE'.", defaultValue = "100") @RequestParam(name = "limit", defaultValue = "100") Integer limit, @@ -325,11 +331,16 @@ public class TelemetryController extends BaseController { @RequestParam(name = "useStrictDataTypes", required = false, defaultValue = "false") Boolean useStrictDataTypes) throws ThingsboardException { return accessValidator.validateEntityAndCallback(getCurrentUser(), Operation.READ_TELEMETRY, entityType, entityIdStr, (result, tenantId, entityId) -> { - // If interval is 0, convert this to a NONE aggregation, which is probably what the user really wanted - Aggregation agg = interval == 0L ? Aggregation.valueOf(Aggregation.NONE.name()) : Aggregation.valueOf(aggStr); - List queries = toKeysList(keys).stream().map(key -> new BaseReadTsKvQuery(key, startTs, endTs, interval, limit, agg, orderBy)) - .collect(Collectors.toList()); - + AggregationParams params; + Aggregation agg = Aggregation.valueOf(aggStr); + if (Aggregation.NONE.equals(agg)) { + params = AggregationParams.none(); + } else if (intervalType == null || IntervalType.MILLISECONDS.equals(intervalType)) { + params = interval == 0L ? AggregationParams.none() : AggregationParams.milliseconds(agg, interval); + } else { + params = AggregationParams.calendar(agg, intervalType, timeZone); + } + List queries = toKeysList(keys).stream().map(key -> new BaseReadTsKvQuery(key, startTs, endTs, params, limit, orderBy)).collect(Collectors.toList()); Futures.addCallback(tsService.findAll(tenantId, entityId, queries), getTsKvListCallback(result, useStrictDataTypes), MoreExecutors.directExecutor()); }); } diff --git a/application/src/main/java/org/thingsboard/server/service/subscription/DefaultTbEntityDataSubscriptionService.java b/application/src/main/java/org/thingsboard/server/service/subscription/DefaultTbEntityDataSubscriptionService.java index 5251ee72f6..6e31a24275 100644 --- a/application/src/main/java/org/thingsboard/server/service/subscription/DefaultTbEntityDataSubscriptionService.java +++ b/application/src/main/java/org/thingsboard/server/service/subscription/DefaultTbEntityDataSubscriptionService.java @@ -560,17 +560,14 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc List keys = cmd.getKeys(); List finalTsKvQueryList; List tsKvQueryList = keys.stream().map(key -> { - var query = new BaseReadTsKvQuery( - key, cmd.getStartTs(), cmd.getEndTs(), cmd.getInterval(), getLimit(cmd.getLimit()), cmd.getAgg() - ); + var query = new BaseReadTsKvQuery(key, cmd.getStartTs(), cmd.getEndTs(), cmd.toAggregationParams(), getLimit(cmd.getLimit())); queriesKeys.put(query.getId(), query.getKey()); return query; }).collect(Collectors.toList()); if (cmd.isFetchLatestPreviousPoint()) { finalTsKvQueryList = new ArrayList<>(tsKvQueryList); finalTsKvQueryList.addAll(keys.stream().map(key -> { - var query = new BaseReadTsKvQuery( - key, cmd.getStartTs() - TimeUnit.DAYS.toMillis(365), cmd.getStartTs(), cmd.getInterval(), 1, cmd.getAgg()); + var query = new BaseReadTsKvQuery(key, cmd.getStartTs() - TimeUnit.DAYS.toMillis(365), cmd.getStartTs(), cmd.toAggregationParams(), 1); queriesKeys.put(query.getId(), query.getKey()); return query; } diff --git a/application/src/main/java/org/thingsboard/server/service/ws/telemetry/cmd/v2/GetTsCmd.java b/application/src/main/java/org/thingsboard/server/service/ws/telemetry/cmd/v2/GetTsCmd.java index 80a749fbe7..ef5d9efb96 100644 --- a/application/src/main/java/org/thingsboard/server/service/ws/telemetry/cmd/v2/GetTsCmd.java +++ b/application/src/main/java/org/thingsboard/server/service/ws/telemetry/cmd/v2/GetTsCmd.java @@ -16,6 +16,7 @@ package org.thingsboard.server.service.ws.telemetry.cmd.v2; import org.thingsboard.server.common.data.kv.Aggregation; +import org.thingsboard.server.common.data.kv.AggregationParams; import org.thingsboard.server.common.data.kv.IntervalType; import java.util.List; @@ -40,4 +41,16 @@ public interface GetTsCmd { boolean isFetchLatestPreviousPoint(); + default AggregationParams toAggregationParams() { + var agg = getAgg(); + var intervalType = getIntervalType(); + if (agg == null || Aggregation.NONE.equals(agg)) { + return AggregationParams.none(); + } else if (intervalType == null || IntervalType.MILLISECONDS.equals(intervalType)) { + return AggregationParams.milliseconds(agg, getInterval()); + } else { + return AggregationParams.calendar(agg, intervalType, getTimeZoneId()); + } + } + } diff --git a/application/src/test/java/org/thingsboard/server/controller/TelemetryControllerTest.java b/application/src/test/java/org/thingsboard/server/controller/TelemetryControllerTest.java index 8061cf0eab..98fbffaee5 100644 --- a/application/src/test/java/org/thingsboard/server/controller/TelemetryControllerTest.java +++ b/application/src/test/java/org/thingsboard/server/controller/TelemetryControllerTest.java @@ -15,12 +15,17 @@ */ package org.thingsboard.server.controller; +import com.fasterxml.jackson.databind.node.ArrayNode; import com.fasterxml.jackson.databind.node.ObjectNode; import org.junit.Assert; import org.junit.Test; import org.springframework.test.context.TestPropertySource; +import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.server.common.data.Device; import org.thingsboard.server.common.data.SaveDeviceWithCredentialsRequest; +import org.thingsboard.server.common.data.kv.BasicTsKvEntry; +import org.thingsboard.server.common.data.kv.LongDataEntry; +import org.thingsboard.server.common.data.kv.TsKvEntry; import org.thingsboard.server.common.data.query.EntityKey; import org.thingsboard.server.common.data.query.SingleEntityFilter; import org.thingsboard.server.common.data.security.DeviceCredentials; @@ -28,6 +33,7 @@ import org.thingsboard.server.common.data.security.DeviceCredentialsType; import org.thingsboard.server.dao.service.DaoSqlTest; import java.util.List; +import java.util.concurrent.TimeUnit; import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status; import static org.thingsboard.server.common.data.query.EntityKeyType.TIME_SERIES; @@ -51,6 +57,59 @@ public class TelemetryControllerTest extends AbstractControllerTest { doPostAsync("/api/plugins/telemetry/DEVICE/" + device.getId() + "/timeseries/smth", invalidRequestBody, String.class, status().isBadRequest()); } + @Test + public void testTelemetryRequests() throws Exception { + loginTenantAdmin(); + Device device = createDevice(); + + var startTs = 1704899727000L; // Wednesday, January 10 15:15:27 GMT + var endOfWeek1Ts = 1705269600000L; // Monday, January 15, 2024 0:00:00 GMT+02:00 + var endOfWeek2Ts = 1705874400000L; // Monday, January 22, 2024 0:00:00 GMT+02:00 + var endTs = endOfWeek2Ts + TimeUnit.DAYS.toMillis(1) + TimeUnit.HOURS.toMillis(1); // Monday, January 23, 2024 1:00:00 GMT+02:00 + + var firstIntervalTs = startTs + (endOfWeek1Ts - startTs) / 2; + var secondIntervalTs = endOfWeek1Ts + (endOfWeek2Ts - endOfWeek1Ts) / 2; + var thirdIntervalTs = endOfWeek2Ts + (endTs - endOfWeek2Ts) / 2; + + var middleOfTheInterval = startTs + (endTs - startTs) / 2; + + tsService.save(tenantId, device.getId(), new BasicTsKvEntry(1704899728000L, new LongDataEntry("t", 1L))); // Wednesday, January 10 15:15:28 GMT + tsService.save(tenantId, device.getId(), new BasicTsKvEntry(1704899729000L, new LongDataEntry("t", 3L))); // Wednesday, January 10 15:15:29 GMT + tsService.save(tenantId, device.getId(), new BasicTsKvEntry(endOfWeek1Ts + 1000, new LongDataEntry("t", 7L))); // Monday, January 15, 2024 0:00:01 GMT+02:00 + tsService.save(tenantId, device.getId(), new BasicTsKvEntry(endOfWeek2Ts + 1000, new LongDataEntry("t", 11L))); // Monday, January 22, 2024 0:00:01 GMT+02:00 + + ObjectNode result = doGetAsync("/api/plugins/telemetry/DEVICE/" + device.getId() + + "/values/timeseries?keys=t&startTs={startTs}&endTs={endTs}&agg={agg}&intervalType={intervalType}&timeZone={timeZone}", + ObjectNode.class, startTs, endTs, "SUM", "WEEK_ISO", "Europe/Kyiv"); + Assert.assertNotNull(result); + Assert.assertNotNull(result.get("t")); + Assert.assertEquals(3, result.get("t").size()); + + var firstIntervalResult = result.get("t").get(0); + Assert.assertEquals(4L, firstIntervalResult.get("value").asLong()); + Assert.assertEquals(firstIntervalTs, firstIntervalResult.get("ts").asLong()); + + var secondIntervalResult = result.get("t").get(1); + Assert.assertEquals(7L, secondIntervalResult.get("value").asLong()); + Assert.assertEquals(secondIntervalTs, secondIntervalResult.get("ts").asLong()); + + var thirdIntervalResult = result.get("t").get(2); + Assert.assertEquals(11L, thirdIntervalResult.get("value").asLong()); + Assert.assertEquals(thirdIntervalTs, thirdIntervalResult.get("ts").asLong()); + + result = doGetAsync("/api/plugins/telemetry/DEVICE/" + device.getId() + + "/values/timeseries?keys=t&startTs={startTs}&endTs={endTs}&agg={agg}&intervalType={intervalType}&timeZone={timeZone}", + ObjectNode.class, startTs, endTs, "SUM", "MONTH", "Europe/Kyiv"); + + Assert.assertNotNull(result); + Assert.assertNotNull(result.get("t")); + Assert.assertEquals(1, result.get("t").size()); + + var monthResult = result.get("t").get(0); + Assert.assertEquals(22L, monthResult.get("value").asLong()); + Assert.assertEquals(middleOfTheInterval, monthResult.get("ts").asLong()); + } + @Test public void testDeleteAllTelemetryWithLatest() throws Exception { loginTenantAdmin(); diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/kv/AggregationParams.java b/common/data/src/main/java/org/thingsboard/server/common/data/kv/AggregationParams.java new file mode 100644 index 0000000000..6c85cfb3c0 --- /dev/null +++ b/common/data/src/main/java/org/thingsboard/server/common/data/kv/AggregationParams.java @@ -0,0 +1,92 @@ +/** + * Copyright © 2016-2024 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.common.data.kv; + +import lombok.AllArgsConstructor; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; +import org.thingsboard.server.common.data.StringUtils; + +import java.time.DateTimeException; +import java.time.ZoneId; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +@AllArgsConstructor +@EqualsAndHashCode +@Slf4j +public class AggregationParams { + private static final Map TZ_LINKS = Map.of("EST", "America/New_York", "GMT+0", "GMT", "GMT-0", "GMT", "HST", "US/Hawaii", "MST", "America/Phoenix", "ROC", "Asia/Taipei"); + @Getter + private final Aggregation aggregation; + @Getter + private final IntervalType intervalType; + @Getter + private final ZoneId tzId; + + private final long interval; + + public static AggregationParams none() { + return new AggregationParams(Aggregation.NONE, null, null, 0L); + } + + public static AggregationParams milliseconds(Aggregation aggregationType, long aggregationIntervalMs) { + return new AggregationParams(aggregationType, IntervalType.MILLISECONDS, null, aggregationIntervalMs); + } + + public static AggregationParams calendar(Aggregation aggregationType, IntervalType intervalType, String tzIdStr) { + return calendar(aggregationType, intervalType, getZoneId(tzIdStr)); + } + + public static AggregationParams calendar(Aggregation aggregationType, IntervalType intervalType, ZoneId tzId) { + return new AggregationParams(aggregationType, intervalType, tzId, 0L); + } + + public static AggregationParams of(Aggregation aggregation, IntervalType intervalType, ZoneId tzId, long interval) { + return new AggregationParams(aggregation, intervalType, tzId, interval); + } + + public long getInterval() { + if (intervalType == null) { + return 0L; + } else { + switch (intervalType) { + case WEEK: + case WEEK_ISO: + return TimeUnit.DAYS.toMillis(7); + case MONTH: + return TimeUnit.DAYS.toMillis(30); + case QUARTER: + return TimeUnit.DAYS.toMillis(90); + default: + return interval; + } + } + } + + private static ZoneId getZoneId(String tzIdStr) { + if (StringUtils.isEmpty(tzIdStr)) { + return ZoneId.systemDefault(); + } + try { + return ZoneId.of(tzIdStr, TZ_LINKS); + } catch (DateTimeException e) { + log.warn("[{}] Failed to convert the time zone. Fallback to default.", tzIdStr); + return ZoneId.systemDefault(); + } + } +} diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/kv/BaseReadTsKvQuery.java b/common/data/src/main/java/org/thingsboard/server/common/data/kv/BaseReadTsKvQuery.java index 54b3a15bd2..09d52e46a1 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/kv/BaseReadTsKvQuery.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/kv/BaseReadTsKvQuery.java @@ -18,41 +18,47 @@ package org.thingsboard.server.common.data.kv; import lombok.Data; import lombok.EqualsAndHashCode; +import java.time.ZoneId; + @Data @EqualsAndHashCode(callSuper = true) public class BaseReadTsKvQuery extends BaseTsKvQuery implements ReadTsKvQuery { - private final long interval; + private final AggregationParams aggParameters; private final int limit; - private final Aggregation aggregation; private final String order; public BaseReadTsKvQuery(String key, long startTs, long endTs, long interval, int limit, Aggregation aggregation) { this(key, startTs, endTs, interval, limit, aggregation, "DESC"); } - public BaseReadTsKvQuery(String key, long startTs, long endTs, long interval, int limit, Aggregation aggregation, String order) { + public BaseReadTsKvQuery(String key, long startTs, long endTs, long interval, int limit, Aggregation aggregation, String descOrder) { + this(key, startTs, endTs, AggregationParams.of(aggregation, IntervalType.MILLISECONDS, ZoneId.systemDefault(), interval), limit, descOrder); + } + + public BaseReadTsKvQuery(String key, long startTs, long endTs, AggregationParams parameters, int limit) { + this(key, startTs, endTs, parameters, limit, "DESC"); + } + + public BaseReadTsKvQuery(String key, long startTs, long endTs, AggregationParams parameters, int limit, String order) { super(key, startTs, endTs); - this.interval = interval; + this.aggParameters = parameters; this.limit = limit; - this.aggregation = aggregation; this.order = order; } public BaseReadTsKvQuery(String key, long startTs, long endTs) { - this(key, startTs, endTs, endTs - startTs, 1, Aggregation.AVG, "DESC"); + this(key, startTs, endTs, AggregationParams.milliseconds(Aggregation.AVG, endTs - startTs), 1, "DESC"); } public BaseReadTsKvQuery(String key, long startTs, long endTs, int limit, String order) { - this(key, startTs, endTs, endTs - startTs, limit, Aggregation.NONE, order); + this(key, startTs, endTs, AggregationParams.none(), limit, order); } public BaseReadTsKvQuery(ReadTsKvQuery query, long startTs, long endTs) { super(query.getId(), query.getKey(), startTs, endTs); - this.interval = query.getInterval(); + this.aggParameters = query.getAggParameters(); this.limit = query.getLimit(); - this.aggregation = query.getAggregation(); this.order = query.getOrder(); } - } diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/kv/IntervalType.java b/common/data/src/main/java/org/thingsboard/server/common/data/kv/IntervalType.java index e5ebb7f1a1..121b5993e0 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/kv/IntervalType.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/kv/IntervalType.java @@ -17,6 +17,6 @@ package org.thingsboard.server.common.data.kv; public enum IntervalType { - MILLISECONDS, WEEK, WEEK_ISO, MONTH, QUARTER + MILLISECONDS, WEEK/*Sunday-Saturday*/, WEEK_ISO/*Monday-Sunday*/, MONTH, QUARTER } diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/kv/ReadTsKvQuery.java b/common/data/src/main/java/org/thingsboard/server/common/data/kv/ReadTsKvQuery.java index e979ec665a..776179967f 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/kv/ReadTsKvQuery.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/kv/ReadTsKvQuery.java @@ -17,12 +17,18 @@ package org.thingsboard.server.common.data.kv; public interface ReadTsKvQuery extends TsKvQuery { - long getInterval(); + AggregationParams getAggParameters(); + + default long getInterval(){ + return getAggParameters().getInterval(); + } + + default Aggregation getAggregation() { + return getAggParameters().getAggregation(); + } int getLimit(); - Aggregation getAggregation(); - String getOrder(); } diff --git a/dao/src/main/java/org/thingsboard/server/dao/sqlts/AbstractChunkedAggregationTimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/sqlts/AbstractChunkedAggregationTimeseriesDao.java index 96e87e954e..6c22935b61 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sqlts/AbstractChunkedAggregationTimeseriesDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sqlts/AbstractChunkedAggregationTimeseriesDao.java @@ -25,6 +25,7 @@ import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.kv.Aggregation; import org.thingsboard.server.common.data.kv.DeleteTsKvQuery; +import org.thingsboard.server.common.data.kv.IntervalType; import org.thingsboard.server.common.data.kv.ReadTsKvQuery; import org.thingsboard.server.common.data.kv.ReadTsKvQueryResult; import org.thingsboard.server.common.data.kv.TsKvEntry; @@ -37,9 +38,18 @@ import org.thingsboard.server.dao.sql.TbSqlBlockingQueueWrapper; import org.thingsboard.server.dao.sqlts.insert.InsertTsRepository; import org.thingsboard.server.dao.sqlts.ts.TsKvRepository; import org.thingsboard.server.dao.timeseries.TimeseriesDao; +import org.thingsboard.server.dao.util.TimeUtils; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; +import java.time.Instant; +import java.time.LocalDateTime; +import java.time.ZoneId; +import java.time.ZonedDateTime; +import java.time.temporal.ChronoUnit; +import java.time.temporal.IsoFields; +import java.time.temporal.TemporalUnit; +import java.time.temporal.WeekFields; import java.util.ArrayList; import java.util.Comparator; import java.util.List; @@ -112,16 +122,23 @@ public abstract class AbstractChunkedAggregationTimeseriesDao extends AbstractSq @Override public ListenableFuture findAllAsync(TenantId tenantId, EntityId entityId, ReadTsKvQuery query) { - if (query.getAggregation() == Aggregation.NONE) { + var aggParams = query.getAggParameters(); + if (Aggregation.NONE.equals(aggParams.getAggregation())) { return Futures.immediateFuture(findAllAsyncWithLimit(entityId, query)); } else { List>> futures = new ArrayList<>(); + var intervalType = aggParams.getIntervalType(); long startPeriod = query.getStartTs(); long endPeriod = Math.max(query.getStartTs() + 1, query.getEndTs()); - long step = query.getInterval(); while (startPeriod < endPeriod) { long startTs = startPeriod; - long endTs = Math.min(startPeriod + step, endPeriod); + long endTs; + if (IntervalType.MILLISECONDS.equals(intervalType)) { + endTs = startPeriod + aggParams.getInterval(); + } else { + endTs = TimeUtils.calculateIntervalEnd(startTs, intervalType, aggParams.getTzId()); + } + endTs = Math.min(endTs, endPeriod); long ts = startTs + (endTs - startTs) / 2; ListenableFuture> aggregateTsKvEntry = findAndAggregateAsync(entityId, query.getKey(), startTs, endTs, ts, query.getAggregation()); futures.add(aggregateTsKvEntry); diff --git a/dao/src/main/java/org/thingsboard/server/dao/sqlts/timescale/TimescaleTimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/sqlts/timescale/TimescaleTimeseriesDao.java index 6f1d569730..bb0ce06026 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sqlts/timescale/TimescaleTimeseriesDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sqlts/timescale/TimescaleTimeseriesDao.java @@ -19,6 +19,7 @@ import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.MoreExecutors; import lombok.extern.slf4j.Slf4j; +import org.jetbrains.annotations.NotNull; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.domain.PageRequest; import org.springframework.data.domain.Sort; @@ -28,6 +29,7 @@ import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.kv.Aggregation; import org.thingsboard.server.common.data.kv.DeleteTsKvQuery; +import org.thingsboard.server.common.data.kv.IntervalType; import org.thingsboard.server.common.data.kv.ReadTsKvQuery; import org.thingsboard.server.common.data.kv.ReadTsKvQueryResult; import org.thingsboard.server.common.data.kv.TsKvEntry; @@ -35,11 +37,13 @@ import org.thingsboard.server.common.stats.StatsFactory; import org.thingsboard.server.dao.DaoUtil; import org.thingsboard.server.dao.model.sql.AbstractTsKvEntity; import org.thingsboard.server.dao.model.sqlts.timescale.ts.TimescaleTsKvEntity; +import org.thingsboard.server.dao.model.sqlts.ts.TsKvEntity; import org.thingsboard.server.dao.sql.TbSqlBlockingQueueParams; import org.thingsboard.server.dao.sql.TbSqlBlockingQueueWrapper; import org.thingsboard.server.dao.sqlts.AbstractSqlTimeseriesDao; import org.thingsboard.server.dao.sqlts.insert.InsertTsRepository; import org.thingsboard.server.dao.timeseries.TimeseriesDao; +import org.thingsboard.server.dao.util.TimeUtils; import org.thingsboard.server.dao.util.TimescaleDBTsDao; import javax.annotation.PostConstruct; @@ -143,14 +147,28 @@ public class TimescaleTimeseriesDao extends AbstractSqlTimeseriesDao implements @Override public ListenableFuture findAllAsync(TenantId tenantId, EntityId entityId, ReadTsKvQuery query) { + var aggParams = query.getAggParameters(); + var intervalType = aggParams.getIntervalType(); if (query.getAggregation() == Aggregation.NONE) { return Futures.immediateFuture(findAllAsyncWithLimit(entityId, query)); - } else { + } else if (IntervalType.MILLISECONDS.equals(intervalType)) { long startTs = query.getStartTs(); long endTs = Math.max(query.getStartTs() + 1, query.getEndTs()); long timeBucket = query.getInterval(); List> data = findAllAndAggregateAsync(entityId, query.getKey(), startTs, endTs, timeBucket, query.getAggregation()); return getReadTsKvQueryResultFuture(query, Futures.immediateFuture(data)); + } else { + //TODO: @dshvaika improve according to native capabilities of Timescale. + long startPeriod = query.getStartTs(); + long endPeriod = Math.max(query.getStartTs() + 1, query.getEndTs()); + List timescaleTsKvEntities = new ArrayList<>(); + while (startPeriod < endPeriod) { + long startTs = startPeriod; + long endTs = Math.min(TimeUtils.calculateIntervalEnd(startTs, intervalType, aggParams.getTzId()), endPeriod); + timescaleTsKvEntities.addAll(switchAggregation(query.getKey(), startTs, endTs, endTs - startTs, query.getAggregation(), entityId.getId())); + startPeriod = endTs; + } + return getReadTsKvQueryResultFuture(query, Futures.immediateFuture(toResultList(entityId, query.getKey(), timescaleTsKvEntities))); } } @@ -187,6 +205,10 @@ public class TimescaleTimeseriesDao extends AbstractSqlTimeseriesDao implements timescaleTsKvEntities.addAll(switchAggregation(key, startTs + interval, endTs, remainingPart, aggregation, entityId.getId())); } + return toResultList(entityId, key, timescaleTsKvEntities); + } + + private static List> toResultList(EntityId entityId, String key, List timescaleTsKvEntities) { if (!CollectionUtils.isEmpty(timescaleTsKvEntities)) { List> result = new ArrayList<>(); timescaleTsKvEntities.forEach(entity -> { diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java index 71541962ec..31e29434e8 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java @@ -41,6 +41,7 @@ import org.thingsboard.server.common.data.kv.Aggregation; import org.thingsboard.server.common.data.kv.BaseReadTsKvQuery; import org.thingsboard.server.common.data.kv.DataType; import org.thingsboard.server.common.data.kv.DeleteTsKvQuery; +import org.thingsboard.server.common.data.kv.IntervalType; import org.thingsboard.server.common.data.kv.KvEntry; import org.thingsboard.server.common.data.kv.ReadTsKvQuery; import org.thingsboard.server.common.data.kv.ReadTsKvQueryResult; @@ -51,6 +52,7 @@ import org.thingsboard.server.dao.nosql.TbResultSet; import org.thingsboard.server.dao.nosql.TbResultSetFuture; import org.thingsboard.server.dao.sqlts.AggregationTimeseriesDao; import org.thingsboard.server.dao.util.NoSqlTsDao; +import org.thingsboard.server.dao.util.TimeUtils; import javax.annotation.Nullable; import javax.annotation.PostConstruct; @@ -227,18 +229,24 @@ public class CassandraBaseTimeseriesDao extends AbstractCassandraBaseTimeseriesD @Override public ListenableFuture findAllAsync(TenantId tenantId, EntityId entityId, ReadTsKvQuery query) { - if (query.getAggregation() == Aggregation.NONE) { + var aggParams = query.getAggParameters(); + if (Aggregation.NONE.equals(aggParams.getAggregation())) { return findAllAsyncWithLimit(tenantId, entityId, query); } else { long startPeriod = query.getStartTs(); long endPeriod = Math.max(query.getStartTs() + 1, query.getEndTs()); - long step = Math.max(query.getInterval(), MIN_AGGREGATION_STEP_MS); List>> futures = new ArrayList<>(); + var intervalType = aggParams.getIntervalType(); while (startPeriod < endPeriod) { long startTs = startPeriod; - long endTs = Math.min(startPeriod + step, endPeriod); - long ts = endTs - startTs; - ReadTsKvQuery subQuery = new BaseReadTsKvQuery(query.getKey(), startTs, endTs, ts, 1, query.getAggregation(), query.getOrder()); + long endTs; + if (IntervalType.MILLISECONDS.equals(intervalType)) { + endTs = startPeriod + Math.max(query.getInterval(), MIN_AGGREGATION_STEP_MS); + } else { + endTs = TimeUtils.calculateIntervalEnd(startTs, aggParams.getIntervalType(), aggParams.getTzId()); + } + endTs = Math.min(endTs, endPeriod); + ReadTsKvQuery subQuery = new BaseReadTsKvQuery(query.getKey(), startTs, endTs, endTs - startTs, 1, query.getAggregation(), query.getOrder()); futures.add(findAndAggregateAsync(tenantId, entityId, subQuery, toPartitionTs(startTs), toPartitionTs(endTs))); startPeriod = endTs; } diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/TsKvQueryCursor.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/TsKvQueryCursor.java index 9c302be53d..8ab9a2c4da 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/TsKvQueryCursor.java +++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/TsKvQueryCursor.java @@ -33,7 +33,7 @@ public class TsKvQueryCursor extends QueryCursor { @Getter private final List data; @Getter - private String orderBy; + private final String orderBy; private int partitionIndex; private int currentLimit; diff --git a/dao/src/main/java/org/thingsboard/server/dao/util/TimeUtils.java b/dao/src/main/java/org/thingsboard/server/dao/util/TimeUtils.java new file mode 100644 index 0000000000..3a768673b8 --- /dev/null +++ b/dao/src/main/java/org/thingsboard/server/dao/util/TimeUtils.java @@ -0,0 +1,45 @@ +/** + * Copyright © 2016-2024 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.dao.util; + +import org.thingsboard.server.common.data.kv.IntervalType; + +import java.time.Instant; +import java.time.ZoneId; +import java.time.ZonedDateTime; +import java.time.temporal.ChronoUnit; +import java.time.temporal.IsoFields; +import java.time.temporal.WeekFields; + +public class TimeUtils { + + public static long calculateIntervalEnd(long startTs, IntervalType intervalType, ZoneId tzId) { + var startTime = ZonedDateTime.ofInstant(Instant.ofEpochMilli(startTs), tzId); + switch (intervalType) { + case WEEK: + return startTime.truncatedTo(ChronoUnit.DAYS).with(WeekFields.SUNDAY_START.dayOfWeek(), 1).plusDays(7).toInstant().toEpochMilli(); + case WEEK_ISO: + return startTime.truncatedTo(ChronoUnit.DAYS).with(WeekFields.ISO.dayOfWeek(), 1).plusDays(7).toInstant().toEpochMilli(); + case MONTH: + return startTime.truncatedTo(ChronoUnit.DAYS).withDayOfMonth(1).plusMonths(1).toInstant().toEpochMilli(); + case QUARTER: + return startTime.truncatedTo(ChronoUnit.DAYS).with(IsoFields.DAY_OF_QUARTER, 1).plusMonths(3).toInstant().toEpochMilli(); + default: + throw new RuntimeException("Not supported!"); + } + } + +} diff --git a/dao/src/test/java/org/thingsboard/server/dao/util/TimeUtilsTest.java b/dao/src/test/java/org/thingsboard/server/dao/util/TimeUtilsTest.java new file mode 100644 index 0000000000..a3df974236 --- /dev/null +++ b/dao/src/test/java/org/thingsboard/server/dao/util/TimeUtilsTest.java @@ -0,0 +1,60 @@ +/** + * Copyright © 2016-2024 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.dao.util; + +import org.junit.jupiter.api.Test; +import org.thingsboard.server.common.data.kv.IntervalType; + +import java.time.ZoneId; + +import static org.assertj.core.api.Assertions.assertThat; + +class TimeUtilsTest { + + @Test + void testWeekEnd() { + long ts = 1704899727000L; // Wednesday, January 10 15:15:27 GMT + assertThat(TimeUtils.calculateIntervalEnd(ts, IntervalType.WEEK, ZoneId.of("Europe/Kyiv"))).isEqualTo(1705183200000L); // Sunday, January 14, 2024 0:00:00 GMT+02:00 + assertThat(TimeUtils.calculateIntervalEnd(ts, IntervalType.WEEK_ISO, ZoneId.of("Europe/Kyiv"))).isEqualTo(1705269600000L); // Monday, January 15, 2024 0:00:00 GMT+02:00 + + assertThat(TimeUtils.calculateIntervalEnd(ts, IntervalType.WEEK, ZoneId.of("Europe/Amsterdam"))).isEqualTo(1705186800000L); // Sunday, January 14, 2024 0:00:00 GMT+01:00 + assertThat(TimeUtils.calculateIntervalEnd(ts, IntervalType.WEEK_ISO, ZoneId.of("Europe/Amsterdam"))).isEqualTo(1705273200000L); // Monday, January 15, 2024 0:00:00 GMT+01:00 + + ts = 1704621600000L; // Sunday, January 7, 2024 12:00:00 GMT+02:00 + assertThat(TimeUtils.calculateIntervalEnd(ts, IntervalType.WEEK, ZoneId.of("Europe/Kyiv"))).isEqualTo(1705183200000L); // Sunday, January 14, 2024 0:00:00 GMT+02:00 + assertThat(TimeUtils.calculateIntervalEnd(ts, IntervalType.WEEK_ISO, ZoneId.of("Europe/Kyiv"))).isEqualTo(1704664800000L); // Monday, January 8, 2024 0:00:00 GMT+02:00 + } + + + @Test + void testMonthEnd() { + long ts = 1704899727000L; // Wednesday, January 10 15:15:27 GMT + assertThat(TimeUtils.calculateIntervalEnd(ts, IntervalType.MONTH, ZoneId.of("Europe/Kyiv"))).isEqualTo(1706738400000L); // Thursday, February 1, 2024 0:00:00 GMT+02:00 + assertThat(TimeUtils.calculateIntervalEnd(ts, IntervalType.MONTH, ZoneId.of("Europe/Amsterdam"))).isEqualTo(1706742000000L); // Monday, January 15, 2024 0:00:00 GMT+02:00 + } + + @Test + void testQuarterEnd() { + long ts = 1704899727000L; // Wednesday, January 10 15:15:27 GMT + assertThat(TimeUtils.calculateIntervalEnd(ts, IntervalType.QUARTER, ZoneId.of("Europe/Kyiv"))).isEqualTo(1711918800000L); // Monday, April 1, 2024 0:00:00 GMT+03:00 DST + assertThat(TimeUtils.calculateIntervalEnd(ts, IntervalType.QUARTER, ZoneId.of("Europe/Amsterdam"))).isEqualTo(1711922400000L); // Monday, April 1, 2024 1:00:00 GMT+03:00 DST + + ts = 1711929600000L; // Monday, April 1, 2024 3:00:00 GMT+03:00 + assertThat(TimeUtils.calculateIntervalEnd(ts, IntervalType.QUARTER, ZoneId.of("Europe/Kyiv"))).isEqualTo(1719781200000L); // Monday, July 1, 2024 0:00:00 GMT+03:00 DST + assertThat(TimeUtils.calculateIntervalEnd(ts, IntervalType.QUARTER, ZoneId.of("America/New_York"))).isEqualTo(1711944000000L); // Monday, April 1, 2024 7:00:00 GMT+03:00 DST + } + +}