diff --git a/application/src/main/java/org/thingsboard/server/service/subscription/DefaultTbEntityDataSubscriptionService.java b/application/src/main/java/org/thingsboard/server/service/subscription/DefaultTbEntityDataSubscriptionService.java index 7599f94539..2b30b7422b 100644 --- a/application/src/main/java/org/thingsboard/server/service/subscription/DefaultTbEntityDataSubscriptionService.java +++ b/application/src/main/java/org/thingsboard/server/service/subscription/DefaultTbEntityDataSubscriptionService.java @@ -22,7 +22,6 @@ import com.google.common.util.concurrent.MoreExecutors; import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.checkerframework.checker.nullness.qual.Nullable; -import org.jetbrains.annotations.NotNull; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Lazy; @@ -30,8 +29,7 @@ import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Service; import org.springframework.web.socket.CloseStatus; import org.thingsboard.common.util.ThingsBoardThreadFactory; -import org.thingsboard.server.common.data.id.CustomerId; -import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.kv.Aggregation; import org.thingsboard.server.common.data.kv.BaseReadTsKvQuery; import org.thingsboard.server.common.data.kv.ReadTsKvQuery; import org.thingsboard.server.common.data.kv.ReadTsKvQueryResult; @@ -52,6 +50,9 @@ import org.thingsboard.server.queue.util.TbCoreComponent; import org.thingsboard.server.service.executors.DbCallbackExecutorService; import org.thingsboard.server.service.telemetry.TelemetryWebSocketService; import org.thingsboard.server.service.telemetry.TelemetryWebSocketSessionRef; +import org.thingsboard.server.service.telemetry.cmd.v2.AggHistoryCmd; +import org.thingsboard.server.service.telemetry.cmd.v2.AggKey; +import org.thingsboard.server.service.telemetry.cmd.v2.AggTimeSeriesCmd; import org.thingsboard.server.service.telemetry.cmd.v2.AlarmDataCmd; import org.thingsboard.server.service.telemetry.cmd.v2.AlarmDataUpdate; import org.thingsboard.server.service.telemetry.cmd.v2.EntityCountCmd; @@ -69,7 +70,6 @@ import javax.annotation.PreDestroy; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; -import java.util.LinkedHashMap; import java.util.LinkedHashSet; import java.util.List; import java.util.Map; @@ -133,6 +133,8 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc private int maxEntitiesPerAlarmSubscription; @Value("${server.ws.dynamic_page_link.max_alarm_queries_per_refresh_interval:10}") private int maxAlarmQueriesPerRefreshInterval; + @Value("${ui.dashboard.max_datapoints_limit:50000}") + private int maxDatapointLimit; private ExecutorService wsCallBackExecutor; private boolean tsInSqlDB; @@ -167,7 +169,7 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc TbEntityDataSubCtx ctx = getSubCtx(session.getSessionId(), cmd.getCmdId()); if (ctx != null) { log.debug("[{}][{}] Updating existing subscriptions using: {}", session.getSessionId(), cmd.getCmdId(), cmd); - if (cmd.getLatestCmd() != null || cmd.getTsCmd() != null || cmd.getHistoryCmd() != null) { + if (cmd.hasAnyCmd()) { ctx.clearEntitySubscriptions(); } } else { @@ -206,6 +208,18 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc finalCtx.setRefreshTask(task); } } + if (cmd.getAggHistoryCmd() != null) { + handleAggHistoryCmd(session, ctx, cmd.getAggHistoryCmd()); + } else if (cmd.getAggTsCmd() != null) { + handleAggTsCmd(session, ctx, cmd.getAggTsCmd()); + } else if (cmd.hasRegularCmds()) { + handleRegularCommands(session, ctx, cmd); + } else { + checkAndSendInitialData(ctx); + } + } + + private void handleRegularCommands(TelemetryWebSocketSessionRef session, TbEntityDataSubCtx ctx, EntityDataCmd cmd) { ListenableFuture historyFuture; if (cmd.getHistoryCmd() != null) { log.trace("[{}][{}] Going to process history command: {}", session.getSessionId(), cmd.getCmdId(), cmd.getHistoryCmd()); @@ -229,10 +243,8 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc if (cmd.getTsCmd() != null) { handleTimeSeriesCmd(theCtx, cmd.getTsCmd()); } - } else if (!theCtx.isInitialDataSent()) { - EntityDataUpdate update = new EntityDataUpdate(theCtx.getCmdId(), theCtx.getData(), null, theCtx.getMaxEntitiesPerDataSubscription()); - theCtx.sendWsMsg(update); - theCtx.setInitialDataSent(true); + } else { + checkAndSendInitialData(theCtx); } } catch (RuntimeException e) { handleWsCmdRuntimeException(theCtx.getSessionId(), e, cmd); @@ -246,6 +258,94 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc }, wsCallBackExecutor); } + private void checkAndSendInitialData(@Nullable TbEntityDataSubCtx theCtx) { + if (!theCtx.isInitialDataSent()) { + EntityDataUpdate update = new EntityDataUpdate(theCtx.getCmdId(), theCtx.getData(), null, theCtx.getMaxEntitiesPerDataSubscription()); + theCtx.sendWsMsg(update); + theCtx.setInitialDataSent(true); + } + } + + private void handleAggHistoryCmd(TelemetryWebSocketSessionRef session, TbEntityDataSubCtx ctx, AggHistoryCmd cmd) { + var keys = cmd.getKeys(); + long interval = cmd.getEndTs() - cmd.getStartTs(); + List queries = keys.stream().map(key -> new BaseReadTsKvQuery( + key.getKey(), cmd.getStartTs(), cmd.getEndTs(), interval, 1, key.getAgg() + )).distinct().collect(Collectors.toList()); + handleAggCmd(session, ctx, cmd.getKeys(), queries, cmd.getStartTs(), cmd.getEndTs(), false, false); + } + + private void handleAggTsCmd(TelemetryWebSocketSessionRef session, TbEntityDataSubCtx ctx, AggTimeSeriesCmd cmd) { + long endTs = cmd.getStartTs() + cmd.getTimeWindow(); + List queries = cmd.getKeys().stream().map(key -> { + if (cmd.isFloating()) { + return new BaseReadTsKvQuery(key.getKey(), cmd.getStartTs(), endTs, cmd.getTimeWindow(), getLimit(maxDatapointLimit), Aggregation.NONE); + } else { + return new BaseReadTsKvQuery(key.getKey(), cmd.getStartTs(), endTs, cmd.getTimeWindow(), 1, key.getAgg()); + } + }).distinct().collect(Collectors.toList()); + handleAggCmd(session, ctx, cmd.getKeys(), queries, cmd.getStartTs(), endTs, cmd.isFloating(), true); + } + + private void handleAggCmd(TelemetryWebSocketSessionRef session, TbEntityDataSubCtx ctx, List keys, List queries, + long startTs, long endTs, boolean floating, boolean subscribe) { + Map>> fetchResultMap = new HashMap<>(); + List entityDataList = ctx.getData().getData(); + entityDataList.forEach(entityData -> fetchResultMap.put(entityData, + tsService.findAllByQueries(ctx.getTenantId(), entityData.getEntityId(), queries))); + Futures.transform(Futures.allAsList(fetchResultMap.values()), f -> { + // Map that holds last ts for each key for each entity. + Map> lastTsEntityMap = new HashMap<>(); + fetchResultMap.forEach((entityData, future) -> { + try { + Map lastTsMap = new HashMap<>(); + lastTsEntityMap.put(entityData, lastTsMap); + + List queryResults = future.get(); + if (queryResults != null) { + for (ReadTsKvQueryResult queryResult : queryResults) { + if (floating) { + entityData.getAggFloating().put(queryResult.getKey(), queryResult.toTsValues()); + } else { + entityData.getAggLatest().computeIfAbsent(queryResult.getAgg(), agg -> new HashMap<>()).put(queryResult.getKey(), queryResult.toTsValue()); + } + lastTsMap.put(queryResult.getKey(), queryResult.getLastEntryTs()); + } + } + // Populate with empty values if no data found. + keys.forEach(key -> { + if (floating) { + entityData.getAggFloating().putIfAbsent(key.getKey(), new TsValue[]{TsValue.EMPTY}); + } else { + entityData.getAggLatest().computeIfAbsent(key.getAgg(), agg -> new HashMap<>()).putIfAbsent(key.getKey(), TsValue.EMPTY); + } + }); + } catch (InterruptedException | ExecutionException e) { + log.warn("[{}][{}][{}] Failed to fetch historical data", ctx.getSessionId(), ctx.getCmdId(), entityData.getEntityId(), e); + ctx.sendWsMsg(new EntityDataUpdate(ctx.getCmdId(), SubscriptionErrorCode.INTERNAL_ERROR.getCode(), "Failed to fetch historical data!")); + } + }); + ctx.getWsLock().lock(); + try { + EntityDataUpdate update; + if (!ctx.isInitialDataSent()) { + update = new EntityDataUpdate(ctx.getCmdId(), ctx.getData(), null, ctx.getMaxEntitiesPerDataSubscription()); + ctx.setInitialDataSent(true); + } else { + update = new EntityDataUpdate(ctx.getCmdId(), null, entityDataList, ctx.getMaxEntitiesPerDataSubscription()); + } + if (subscribe) { + ctx.createTimeSeriesSubscriptions(lastTsEntityMap, startTs, endTs, true); + } + ctx.sendWsMsg(update); + entityDataList.forEach(ed -> ed.getTimeseries().clear()); + } finally { + ctx.getWsLock().unlock(); + } + return ctx; + }, wsCallBackExecutor); + } + private void handleWsCmdRuntimeException(String sessionId, RuntimeException e, EntityDataCmd cmd) { log.debug("[{}] Failed to process ws cmd: {}", sessionId, cmd, e); wsService.close(sessionId, CloseStatus.SERVICE_RESTARTED); @@ -420,12 +520,12 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc private ListenableFuture handleGetTsCmd(TbEntityDataSubCtx ctx, GetTsCmd cmd, boolean subscribe) { List keys = cmd.getKeys(); List finalTsKvQueryList; - List tsKvQueryList = cmd.getKeys().stream().map(key -> new BaseReadTsKvQuery( + List tsKvQueryList = keys.stream().map(key -> new BaseReadTsKvQuery( key, cmd.getStartTs(), cmd.getEndTs(), cmd.getInterval(), getLimit(cmd.getLimit()), cmd.getAgg() )).collect(Collectors.toList()); if (cmd.isFetchLatestPreviousPoint()) { finalTsKvQueryList = new ArrayList<>(tsKvQueryList); - finalTsKvQueryList.addAll(cmd.getKeys().stream().map(key -> new BaseReadTsKvQuery( + finalTsKvQueryList.addAll(keys.stream().map(key -> new BaseReadTsKvQuery( key, cmd.getStartTs() - TimeUnit.DAYS.toMillis(365), cmd.getStartTs(), cmd.getInterval(), 1, cmd.getAgg() )).collect(Collectors.toList())); } else { @@ -451,7 +551,7 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc } } // Populate with empty values if no data found. - cmd.getKeys().forEach(key -> { + keys.forEach(key -> { if (!entityData.getTimeseries().containsKey(key)) { entityData.getTimeseries().put(key, new TsValue[0]); } @@ -475,7 +575,7 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc update = new EntityDataUpdate(ctx.getCmdId(), null, entityDataList, ctx.getMaxEntitiesPerDataSubscription()); } if (subscribe) { - ctx.createTimeseriesSubscriptions(lastTsEntityMap, cmd.getStartTs(), cmd.getEndTs()); + ctx.createTimeSeriesSubscriptions(lastTsEntityMap, cmd.getStartTs(), cmd.getEndTs()); } ctx.sendWsMsg(update); entityDataList.forEach(ed -> ed.getTimeseries().clear()); @@ -546,11 +646,7 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc ctx.getWsLock().lock(); try { ctx.createLatestValuesSubscriptions(latestCmd.getKeys()); - if (!ctx.isInitialDataSent()) { - EntityDataUpdate update = new EntityDataUpdate(ctx.getCmdId(), ctx.getData(), null, ctx.getMaxEntitiesPerDataSubscription()); - ctx.sendWsMsg(update); - ctx.setInitialDataSent(true); - } + checkAndSendInitialData(ctx); } finally { ctx.getWsLock().unlock(); } diff --git a/application/src/main/java/org/thingsboard/server/service/subscription/TbAbstractDataSubCtx.java b/application/src/main/java/org/thingsboard/server/service/subscription/TbAbstractDataSubCtx.java index 0238921783..26e97b6f2c 100644 --- a/application/src/main/java/org/thingsboard/server/service/subscription/TbAbstractDataSubCtx.java +++ b/application/src/main/java/org/thingsboard/server/service/subscription/TbAbstractDataSubCtx.java @@ -122,12 +122,16 @@ public abstract class TbAbstractDataSubCtx> entityKeyStates, long startTs, long endTs) { + public void createTimeSeriesSubscriptions(Map> entityKeyStates, long startTs, long endTs) { + createTimeSeriesSubscriptions(entityKeyStates, startTs, endTs, false); + } + + public void createTimeSeriesSubscriptions(Map> entityKeyStates, long startTs, long endTs, boolean resultToLatestValues) { entityKeyStates.forEach((entityData, keyStates) -> { int subIdx = sessionRef.getSessionSubIdSeq().incrementAndGet(); subToEntityIdMap.put(subIdx, entityData.getEntityId()); localSubscriptionService.addSubscription( - createTsSub(entityData, subIdx, false, startTs, endTs, keyStates)); + createTsSub(entityData, subIdx, false, startTs, endTs, keyStates, resultToLatestValues)); }); } @@ -200,6 +204,10 @@ public abstract class TbAbstractDataSubCtx keyStates) { + return createTsSub(entityData, subIdx, latestValues, startTs, endTs, keyStates, latestValues); + } + + private TbTimeseriesSubscription createTsSub(EntityData entityData, int subIdx, boolean latestValues, long startTs, long endTs, Map keyStates, boolean resultToLatestValues) { log.trace("[{}][{}][{}] Creating time-series subscription for [{}] with keys: {}", serviceId, cmdId, subIdx, entityData.getEntityId(), keyStates); return TbTimeseriesSubscription.builder() .serviceId(serviceId) @@ -207,7 +215,7 @@ public abstract class TbAbstractDataSubCtx sendWsMsg(sessionId, subscriptionUpdate, EntityKeyType.TIME_SERIES, latestValues)) + .updateConsumer((sessionId, subscriptionUpdate) -> sendWsMsg(sessionId, subscriptionUpdate, EntityKeyType.TIME_SERIES, resultToLatestValues)) .allKeys(false) .keyStates(keyStates) .latestValues(latestValues) diff --git a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetryWebSocketService.java b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetryWebSocketService.java index 888e2c2742..d9beb1ed33 100644 --- a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetryWebSocketService.java +++ b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetryWebSocketService.java @@ -716,7 +716,7 @@ public class DefaultTelemetryWebSocketService implements TelemetryWebSocketServi "Cmd id is negative value!"); sendWsMsg(sessionRef, update); return false; - } else if (cmd.getQuery() == null && cmd.getLatestCmd() == null && cmd.getHistoryCmd() == null && cmd.getTsCmd() == null) { + } else if (cmd.getQuery() == null && !cmd.hasAnyCmd()) { TelemetrySubscriptionUpdate update = new TelemetrySubscriptionUpdate(cmd.getCmdId(), SubscriptionErrorCode.BAD_REQUEST, "Query is empty!"); sendWsMsg(sessionRef, update); diff --git a/application/src/main/java/org/thingsboard/server/service/telemetry/cmd/v2/EntityDataCmd.java b/application/src/main/java/org/thingsboard/server/service/telemetry/cmd/v2/EntityDataCmd.java index bdd69c12aa..e18b1a745b 100644 --- a/application/src/main/java/org/thingsboard/server/service/telemetry/cmd/v2/EntityDataCmd.java +++ b/application/src/main/java/org/thingsboard/server/service/telemetry/cmd/v2/EntityDataCmd.java @@ -16,6 +16,7 @@ package org.thingsboard.server.service.telemetry.cmd.v2; import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; import lombok.Getter; import org.thingsboard.server.common.data.query.EntityDataQuery; @@ -55,4 +56,22 @@ public class EntityDataCmd extends DataCmd { this.aggHistoryCmd = aggHistoryCmd; this.aggTsCmd = aggTsCmd; } + + @JsonIgnore + public boolean hasAnyCmd() { + return historyCmd != null || latestCmd != null || tsCmd != null || aggHistoryCmd != null || aggTsCmd != null; + } + + @JsonIgnore + public boolean hasRegularCmds() { + return historyCmd != null || latestCmd != null || tsCmd != null; + } + + @JsonIgnore + public boolean hasAggCmds() { + return aggHistoryCmd != null || aggTsCmd != null; + } + + + } diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/kv/ReadTsKvQueryResult.java b/common/data/src/main/java/org/thingsboard/server/common/data/kv/ReadTsKvQueryResult.java index dd1b50bb40..26a8a0c2c5 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/kv/ReadTsKvQueryResult.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/kv/ReadTsKvQueryResult.java @@ -25,12 +25,13 @@ import java.util.List; public class ReadTsKvQueryResult { private final String key; + // Holds the aggregation from the query + private final Aggregation agg; // Holds the data list; private final List data; // Holds the max ts of the records that match aggregation intervals (not the ts of the aggregation window, but the ts of the last record among all the intervals) private final long lastEntryTs; - public TsValue[] toTsValues() { if (data != null && !data.isEmpty()) { List queryValues = new ArrayList<>(); @@ -43,4 +44,14 @@ public class ReadTsKvQueryResult { } } + public TsValue toTsValue() { + if (data == null || data.isEmpty()) { + return TsValue.EMPTY; + } + if (data.size() > 1) { + throw new RuntimeException("Query Result has multiple data points!"); + } + return data.get(0).toTsValue(); + } + } diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/query/EntityData.java b/common/data/src/main/java/org/thingsboard/server/common/data/query/EntityData.java index c0cc778280..26bbc94290 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/query/EntityData.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/query/EntityData.java @@ -15,16 +15,25 @@ */ package org.thingsboard.server.common.data.query; +import lombok.AllArgsConstructor; import lombok.Data; +import lombok.RequiredArgsConstructor; import org.thingsboard.server.common.data.id.EntityId; +import org.thingsboard.server.common.data.kv.Aggregation; import java.util.Map; @Data +@RequiredArgsConstructor public class EntityData { private final EntityId entityId; private final Map> latest; private final Map timeseries; + private final Map> aggLatest; + private final Map aggFloating; + public EntityData(EntityId entityId, Map> latest, Map timeseries) { + this(entityId, latest, timeseries, null, null); + } } diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/query/TsValue.java b/common/data/src/main/java/org/thingsboard/server/common/data/query/TsValue.java index b7e83521e8..9dbfdfbb28 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/query/TsValue.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/query/TsValue.java @@ -24,6 +24,8 @@ import lombok.RequiredArgsConstructor; @JsonInclude(JsonInclude.Include.NON_NULL) public class TsValue { + public static final TsValue EMPTY = new TsValue(0, ""); + private final long ts; private final String value; private final Long count; diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/query/EntityDataAdapter.java b/dao/src/main/java/org/thingsboard/server/dao/sql/query/EntityDataAdapter.java index c30ecdd9a2..561be612c7 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/query/EntityDataAdapter.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/query/EntityDataAdapter.java @@ -54,8 +54,8 @@ public class EntityDataAdapter { EntityType entityType = EntityType.valueOf((String) row.get("entity_type")); EntityId entityId = EntityIdFactory.getByTypeAndUuid(entityType, id); Map> latest = new HashMap<>(); - Map timeseries = new HashMap<>(); - EntityData entityData = new EntityData(entityId, latest, timeseries); + //Maybe avoid empty hashmaps? + EntityData entityData = new EntityData(entityId, latest, new HashMap<>(), new HashMap<>(), new HashMap<>()); for (EntityKeyMapping mapping : selectionMapping) { if (!mapping.isIgnore()) { EntityKey entityKey = mapping.getEntityKey(); diff --git a/dao/src/main/java/org/thingsboard/server/dao/sqlts/AbstractChunkedAggregationTimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/sqlts/AbstractChunkedAggregationTimeseriesDao.java index 365d1240b3..b5905dcf7d 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sqlts/AbstractChunkedAggregationTimeseriesDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sqlts/AbstractChunkedAggregationTimeseriesDao.java @@ -149,7 +149,7 @@ public abstract class AbstractChunkedAggregationTimeseriesDao extends AbstractSq tsKvEntities.forEach(tsKvEntity -> tsKvEntity.setStrKey(query.getKey())); List tsKvEntries = DaoUtil.convertDataList(tsKvEntities); long lastTs = tsKvEntries.stream().map(TsKvEntry::getTs).max(Long::compare).orElse(query.getStartTs()); - return new ReadTsKvQueryResult(query.getKey(), tsKvEntries, lastTs); + return new ReadTsKvQueryResult(query.getKey(), query.getAggregation(), tsKvEntries, lastTs); } Optional findAndAggregateAsync(EntityId entityId, String key, long startTs, long endTs, long ts, Aggregation aggregation) { diff --git a/dao/src/main/java/org/thingsboard/server/dao/sqlts/BaseAbstractSqlTimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/sqlts/BaseAbstractSqlTimeseriesDao.java index 314117fe06..27bb3307be 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sqlts/BaseAbstractSqlTimeseriesDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sqlts/BaseAbstractSqlTimeseriesDao.java @@ -99,7 +99,7 @@ public abstract class BaseAbstractSqlTimeseriesDao extends JpaAbstractDaoListeni if (lastTs.isEmpty()) { lastTs = data.stream().map(AbstractTsKvEntity::getTs).filter(Objects::nonNull).max(Long::compare); } - return new ReadTsKvQueryResult(query.getKey(), DaoUtil.convertDataList(data), lastTs.orElse(query.getStartTs())); + return new ReadTsKvQueryResult(query.getKey(), query.getAggregation(), DaoUtil.convertDataList(data), lastTs.orElse(query.getStartTs())); } }, service); } diff --git a/dao/src/main/java/org/thingsboard/server/dao/sqlts/timescale/TimescaleTimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/sqlts/timescale/TimescaleTimeseriesDao.java index bfefbf4bd8..9f2763fde9 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sqlts/timescale/TimescaleTimeseriesDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sqlts/timescale/TimescaleTimeseriesDao.java @@ -181,7 +181,7 @@ public class TimescaleTimeseriesDao extends AbstractSqlTimeseriesDao implements timescaleTsKvEntities.forEach(tsKvEntity -> tsKvEntity.setStrKey(strKey)); var tsKvEntries = DaoUtil.convertDataList(timescaleTsKvEntities); long lastTs = tsKvEntries.stream().map(TsKvEntry::getTs).max(Long::compare).orElse(query.getStartTs()); - return new ReadTsKvQueryResult(query.getKey(), tsKvEntries, lastTs); + return new ReadTsKvQueryResult(query.getKey(), query.getAggregation(), tsKvEntries, lastTs); } private List> findAllAndAggregateAsync(EntityId entityId, String key, long startTs, long endTs, long timeBucket, Aggregation aggregation) { diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java index 14a49027f7..01450bbb55 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java @@ -285,7 +285,7 @@ public class CassandraBaseTimeseriesDao extends AbstractCassandraBaseTimeseriesD @Override public ReadTsKvQueryResult apply(@Nullable List> input) { if (input == null) { - return new ReadTsKvQueryResult(query.getKey(), Collections.emptyList(), query.getStartTs()); + return new ReadTsKvQueryResult(query.getKey(), query.getAggregation(), Collections.emptyList(), query.getStartTs()); } else { long maxTs = query.getStartTs(); List data = new ArrayList<>(); @@ -296,7 +296,7 @@ public class CassandraBaseTimeseriesDao extends AbstractCassandraBaseTimeseriesD data.add(tsKvEntryAggWrapper.getEntry()); } } - return new ReadTsKvQueryResult(query.getKey(), data, maxTs); + return new ReadTsKvQueryResult(query.getKey(), query.getAggregation(), data, maxTs); } } @@ -333,7 +333,7 @@ public class CassandraBaseTimeseriesDao extends AbstractCassandraBaseTimeseriesD if (tsKvEntries != null) { lastTs = tsKvEntries.stream().map(TsKvEntry::getTs).max(Long::compare).orElse(query.getStartTs()); } - return new ReadTsKvQueryResult(query.getKey(), tsKvEntries, lastTs); + return new ReadTsKvQueryResult(query.getKey(), query.getAggregation(), tsKvEntries, lastTs); }, MoreExecutors.directExecutor()); }