diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/kv/BaseTsKvQuery.java b/common/data/src/main/java/org/thingsboard/server/common/data/kv/BaseTsKvQuery.java index 0afe00b4c3..51d4ad2004 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/kv/BaseTsKvQuery.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/kv/BaseTsKvQuery.java @@ -26,18 +26,20 @@ public class BaseTsKvQuery implements TsKvQuery { private final long interval; private final int limit; private final Aggregation aggregation; + private final String orderBy; - public BaseTsKvQuery(String key, long startTs, long endTs, long interval, int limit, Aggregation aggregation) { + public BaseTsKvQuery(String key, long startTs, long endTs, long interval, int limit, Aggregation aggregation, String orderBy) { this.key = key; this.startTs = startTs; this.endTs = endTs; this.interval = interval; this.limit = limit; this.aggregation = aggregation; + this.orderBy = orderBy; } public BaseTsKvQuery(String key, long startTs, long endTs) { - this(key, startTs, endTs, endTs-startTs, 1, Aggregation.AVG); + this(key, startTs, endTs, endTs - startTs, 1, Aggregation.AVG, "DESC"); } } diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/kv/TsKvQuery.java b/common/data/src/main/java/org/thingsboard/server/common/data/kv/TsKvQuery.java index ca9f90c4e8..9b907c3440 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/kv/TsKvQuery.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/kv/TsKvQuery.java @@ -29,4 +29,5 @@ public interface TsKvQuery { Aggregation getAggregation(); + String getOrderBy(); } diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/timeseries/JpaTimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/sql/timeseries/JpaTimeseriesDao.java index 6350352961..5503f49262 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/timeseries/JpaTimeseriesDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/timeseries/JpaTimeseriesDao.java @@ -299,6 +299,18 @@ public class JpaTimeseriesDao extends JpaAbstractDaoListeningExecutorService imp }); } + @Override + public ListenableFuture remove(EntityId entityId, TsKvQuery query) { + //TODO: implement + return null; + } + + @Override + public ListenableFuture removeLatest(EntityId entityId, TsKvQuery query) { + //TODO: implement + return null; + } + @PreDestroy void onDestroy() { if (insertService != null) { diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java index c981378939..a075885308 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java @@ -40,6 +40,7 @@ import static org.apache.commons.lang3.StringUtils.isBlank; public class BaseTimeseriesService implements TimeseriesService { public static final int INSERTS_PER_ENTRY = 3; + public static final int DELETES_PER_ENTRY = 2; @Autowired private TimeseriesDao timeseriesDao; @@ -95,6 +96,22 @@ public class BaseTimeseriesService implements TimeseriesService { futures.add(timeseriesDao.save(entityId, tsKvEntry, ttl)); } + @Override + public ListenableFuture> remove(EntityId entityId, List tsKvQueries) { + validate(entityId); + tsKvQueries.forEach(BaseTimeseriesService::validate); + List> futures = Lists.newArrayListWithExpectedSize(tsKvQueries.size() * DELETES_PER_ENTRY); + for (TsKvQuery tsKvQuery : tsKvQueries) { + deleteAndRegisterFutures(futures, entityId, tsKvQuery); + } + return Futures.allAsList(futures); + } + + private void deleteAndRegisterFutures(List> futures, EntityId entityId, TsKvQuery query) { + futures.add(timeseriesDao.remove(entityId, query)); + futures.add(timeseriesDao.removeLatest(entityId, query)); + } + private static void validate(EntityId entityId) { Validator.validateEntityId(entityId, "Incorrect entityId " + entityId); } diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java index cda4b1669b..7895b59f20 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java @@ -62,6 +62,8 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem public static final String GENERATED_QUERY_FOR_ENTITY_TYPE_AND_ENTITY_ID = "Generated query [{}] for entityType {} and entityId {}"; public static final String SELECT_PREFIX = "SELECT "; public static final String EQUALS_PARAM = " = ? "; + public static final String ASC_ORDER = "ASC"; + public static final String DESC_ORDER = "DESC"; @Autowired private Environment environment; @@ -76,7 +78,8 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem private PreparedStatement latestInsertStmt; private PreparedStatement[] saveStmts; private PreparedStatement[] saveTtlStmts; - private PreparedStatement[] fetchStmts; + private PreparedStatement[] fetchStmtsAsc; + private PreparedStatement[] fetchStmtsDesc; private PreparedStatement findLatestStmt; private PreparedStatement findAllLatestStmt; @@ -88,7 +91,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem public void init() { super.startExecutor(); if (!isInstall()) { - getFetchStmt(Aggregation.NONE); + getFetchStmt(Aggregation.NONE, DESC_ORDER); Optional partition = TsPartitionDate.parse(partitioning); if (partition.isPresent()) { tsFormat = partition.get(); @@ -132,7 +135,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem while (stepTs < query.getEndTs()) { long startTs = stepTs; long endTs = stepTs + step; - TsKvQuery subQuery = new BaseTsKvQuery(query.getKey(), startTs, endTs, step, 1, query.getAggregation()); + TsKvQuery subQuery = new BaseTsKvQuery(query.getKey(), startTs, endTs, step, 1, query.getAggregation(), query.getOrderBy()); futures.add(findAndAggregateAsync(entityId, subQuery, toPartitionTs(startTs), toPartitionTs(endTs))); stepTs = endTs; } @@ -181,7 +184,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem if (cursor.isFull() || !cursor.hasNextPartition()) { resultFuture.set(cursor.getData()); } else { - PreparedStatement proto = getFetchStmt(Aggregation.NONE); + PreparedStatement proto = getFetchStmt(Aggregation.NONE, cursor.getOrderBy()); BoundStatement stmt = proto.bind(); stmt.setString(0, cursor.getEntityType()); stmt.setUUID(1, cursor.getEntityId()); @@ -231,7 +234,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem private AsyncFunction, List> getFetchChunksAsyncFunction(EntityId entityId, String key, Aggregation aggregation, long startTs, long endTs) { return partitions -> { try { - PreparedStatement proto = getFetchStmt(aggregation); + PreparedStatement proto = getFetchStmt(aggregation, DESC_ORDER); List futures = new ArrayList<>(partitions.size()); for (Long partition : partitions) { log.trace("Fetching data for partition [{}] for entityType {} and entityId {}", partition, entityId.getEntityType(), entityId.getId()); @@ -318,6 +321,99 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem return getFuture(executeAsyncWrite(stmt), rs -> null); } + @Override + public ListenableFuture remove(EntityId entityId, TsKvQuery query) { + long minPartition = toPartitionTs(query.getStartTs()); + long maxPartition = toPartitionTs(query.getEndTs()); + + ResultSetFuture partitionsFuture = fetchPartitions(entityId, query.getKey(), minPartition, maxPartition); + + final SimpleListenableFuture resultFuture = new SimpleListenableFuture<>(); + final ListenableFuture> partitionsListFuture = Futures.transform(partitionsFuture, getPartitionsArrayFunction(), readResultsProcessingExecutor); + + Futures.addCallback(partitionsListFuture, new FutureCallback>() { + @Override + public void onSuccess(@Nullable List partitions) { + TsKvQueryCursor cursor = new TsKvQueryCursor(entityId.getEntityType().name(), entityId.getId(), query, partitions); + deleteAsync(cursor, resultFuture); + } + + @Override + public void onFailure(Throwable t) { + log.error("[{}][{}] Failed to fetch partitions for interval {}-{}", entityId.getEntityType().name(), entityId.getId(), minPartition, maxPartition, t); + } + }, readResultsProcessingExecutor); + return resultFuture; + } + + private void deleteAsync(final TsKvQueryCursor cursor, final SimpleListenableFuture resultFuture) { + if (!cursor.hasNextPartition()) { + resultFuture.set(null); + } else { + PreparedStatement proto = getDeleteStmt(); + BoundStatement stmt = proto.bind(); + stmt.setString(0, cursor.getEntityType()); + stmt.setUUID(1, cursor.getEntityId()); + stmt.setString(2, cursor.getKey()); + stmt.setLong(3, cursor.getNextPartition()); + stmt.setLong(4, cursor.getStartTs()); + stmt.setLong(5, cursor.getEndTs()); + + Futures.addCallback(executeAsyncWrite(stmt), new FutureCallback() { + @Override + public void onSuccess(@Nullable ResultSet result) { + deleteAsync(cursor, resultFuture); + } + + @Override + public void onFailure(Throwable t) { + log.error("[{}][{}] Failed to delete data for query {}-{}", stmt, t); + } + }, readResultsProcessingExecutor); + } + } + + private PreparedStatement getDeleteStmt() { + return prepare("DELETE FROM " + ModelConstants.TS_KV_CF + + " WHERE " + ModelConstants.ENTITY_TYPE_COLUMN + EQUALS_PARAM + + "AND " + ModelConstants.ENTITY_ID_COLUMN + EQUALS_PARAM + + "AND " + ModelConstants.KEY_COLUMN + EQUALS_PARAM + + "AND " + ModelConstants.PARTITION_COLUMN + EQUALS_PARAM + + "AND " + ModelConstants.TS_COLUMN + " > ? " + + "AND " + ModelConstants.TS_COLUMN + " <= ?"); + } + + @Override + public ListenableFuture removeLatest(EntityId entityId, TsKvQuery query) { + ListenableFuture future = findLatest(entityId, query.getKey()); + return Futures.transform(future, new Function() { + @Nullable + @Override + public Void apply(@Nullable TsKvEntry latestEntry) { + if (latestEntry != null) { + long ts = latestEntry.getTs(); + if (ts >= query.getStartTs() && ts <= query.getEndTs()) { + deleteLatest(entityId, latestEntry.getKey()); + + //TODO: save new latest entry(< query.getStartTs() - if present) to TS_KV_LATEST_CF + } else { + log.trace("Won't be deleted latest value for [{}], key - {}", entityId, query.getKey()); + } + } + return null; + } + }); + } + + private ListenableFuture deleteLatest(EntityId entityId, String key) { + Statement delete = QueryBuilder.delete().from(ModelConstants.TS_KV_LATEST_CF) + .where(eq(ModelConstants.ENTITY_TYPE_COLUMN, entityId.getEntityType())) + .and(eq(ModelConstants.ENTITY_ID_COLUMN, entityId.getId())) + .and(eq(ModelConstants.KEY_COLUMN, key)); + log.debug("Remove request: {}", delete.toString()); + return getFuture(executeAsyncWrite(delete), rs -> null); + } + private List convertResultToTsKvEntryList(List rows) { List entries = new ArrayList<>(rows.size()); if (!rows.isEmpty()) { @@ -413,28 +509,43 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem return saveTtlStmts[dataType.ordinal()]; } - private PreparedStatement getFetchStmt(Aggregation aggType) { - if (fetchStmts == null) { - fetchStmts = new PreparedStatement[Aggregation.values().length]; - for (Aggregation type : Aggregation.values()) { - if (type == Aggregation.SUM && fetchStmts[Aggregation.AVG.ordinal()] != null) { - fetchStmts[type.ordinal()] = fetchStmts[Aggregation.AVG.ordinal()]; - } else if (type == Aggregation.AVG && fetchStmts[Aggregation.SUM.ordinal()] != null) { - fetchStmts[type.ordinal()] = fetchStmts[Aggregation.SUM.ordinal()]; - } else { - fetchStmts[type.ordinal()] = prepare(SELECT_PREFIX + - String.join(", ", ModelConstants.getFetchColumnNames(type)) + " FROM " + ModelConstants.TS_KV_CF - + " WHERE " + ModelConstants.ENTITY_TYPE_COLUMN + EQUALS_PARAM - + "AND " + ModelConstants.ENTITY_ID_COLUMN + EQUALS_PARAM - + "AND " + ModelConstants.KEY_COLUMN + EQUALS_PARAM - + "AND " + ModelConstants.PARTITION_COLUMN + EQUALS_PARAM - + "AND " + ModelConstants.TS_COLUMN + " > ? " - + "AND " + ModelConstants.TS_COLUMN + " <= ?" - + (type == Aggregation.NONE ? " ORDER BY " + ModelConstants.TS_COLUMN + " DESC LIMIT ?" : "")); + private PreparedStatement getFetchStmt(Aggregation aggType, String orderBy) { + switch (orderBy) { + case ASC_ORDER: + if (fetchStmtsAsc == null) { + fetchStmtsAsc = initFetchStmt(orderBy); } + return fetchStmtsAsc[aggType.ordinal()]; + case DESC_ORDER: + if (fetchStmtsDesc == null) { + fetchStmtsDesc = initFetchStmt(orderBy); + } + return fetchStmtsDesc[aggType.ordinal()]; + default: + throw new RuntimeException("Not supported" + orderBy + "order!"); + } + } + + private PreparedStatement[] initFetchStmt(String orderBy) { + PreparedStatement[] fetchStmts = new PreparedStatement[Aggregation.values().length]; + for (Aggregation type : Aggregation.values()) { + if (type == Aggregation.SUM && fetchStmts[Aggregation.AVG.ordinal()] != null) { + fetchStmts[type.ordinal()] = fetchStmts[Aggregation.AVG.ordinal()]; + } else if (type == Aggregation.AVG && fetchStmts[Aggregation.SUM.ordinal()] != null) { + fetchStmts[type.ordinal()] = fetchStmts[Aggregation.SUM.ordinal()]; + } else { + fetchStmts[type.ordinal()] = prepare(SELECT_PREFIX + + String.join(", ", ModelConstants.getFetchColumnNames(type)) + " FROM " + ModelConstants.TS_KV_CF + + " WHERE " + ModelConstants.ENTITY_TYPE_COLUMN + EQUALS_PARAM + + "AND " + ModelConstants.ENTITY_ID_COLUMN + EQUALS_PARAM + + "AND " + ModelConstants.KEY_COLUMN + EQUALS_PARAM + + "AND " + ModelConstants.PARTITION_COLUMN + EQUALS_PARAM + + "AND " + ModelConstants.TS_COLUMN + " > ? " + + "AND " + ModelConstants.TS_COLUMN + " <= ?" + + (type == Aggregation.NONE ? " ORDER BY " + ModelConstants.TS_COLUMN + " " + orderBy + " LIMIT ?" : "")); } } - return fetchStmts[aggType.ordinal()]; + return fetchStmts; } private PreparedStatement getLatestStmt() { diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesDao.java index 1e3f4cecb7..22bb166585 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesDao.java @@ -38,4 +38,8 @@ public interface TimeseriesDao { ListenableFuture savePartition(EntityId entityId, long tsKvEntryTs, String key, long ttl); ListenableFuture saveLatest(EntityId entityId, TsKvEntry tsKvEntry); + + ListenableFuture remove(EntityId entityId, TsKvQuery query); + + ListenableFuture removeLatest(EntityId entityId, TsKvQuery query); } diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesService.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesService.java index 2cd2d8dab9..a14919185d 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesService.java @@ -37,4 +37,6 @@ public interface TimeseriesService { ListenableFuture> save(EntityId entityId, TsKvEntry tsKvEntry); ListenableFuture> save(EntityId entityId, List tsKvEntry, long ttl); + + ListenableFuture> remove(EntityId entityId, List queries); } diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/TsKvQueryCursor.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/TsKvQueryCursor.java index d6b6bbd5c0..c4925ee9be 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/TsKvQueryCursor.java +++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/TsKvQueryCursor.java @@ -23,6 +23,8 @@ import java.util.ArrayList; import java.util.List; import java.util.UUID; +import static org.thingsboard.server.dao.timeseries.CassandraBaseTimeseriesDao.DESC_ORDER; + /** * Created by ashvayka on 21.02.17. */ @@ -40,6 +42,8 @@ public class TsKvQueryCursor { private final List partitions; @Getter private final List data; + @Getter + private String orderBy; private int partitionIndex; private int currentLimit; @@ -51,13 +55,14 @@ public class TsKvQueryCursor { this.startTs = baseQuery.getStartTs(); this.endTs = baseQuery.getEndTs(); this.partitions = partitions; - this.partitionIndex = partitions.size() - 1; + this.orderBy = baseQuery.getOrderBy(); + this.partitionIndex = isDesc() ? partitions.size() - 1 : 0; this.data = new ArrayList<>(); this.currentLimit = baseQuery.getLimit(); } public boolean hasNextPartition() { - return partitionIndex >= 0; + return isDesc() ? partitionIndex >= 0 : partitionIndex <= partitions.size() - 1; } public boolean isFull() { @@ -66,7 +71,11 @@ public class TsKvQueryCursor { public long getNextPartition() { long partition = partitions.get(partitionIndex); - partitionIndex--; + if (isDesc()) { + partitionIndex--; + } else { + partitionIndex++; + } return partition; } @@ -78,4 +87,8 @@ public class TsKvQueryCursor { currentLimit -= newData.size(); data.addAll(newData); } + + private boolean isDesc() { + return orderBy.equals(DESC_ORDER); + } } diff --git a/dao/src/test/java/org/thingsboard/server/dao/service/timeseries/BaseTimeseriesServiceTest.java b/dao/src/test/java/org/thingsboard/server/dao/service/timeseries/BaseTimeseriesServiceTest.java index 0cb3f7fbdd..2130ab97d0 100644 --- a/dao/src/test/java/org/thingsboard/server/dao/service/timeseries/BaseTimeseriesServiceTest.java +++ b/dao/src/test/java/org/thingsboard/server/dao/service/timeseries/BaseTimeseriesServiceTest.java @@ -45,6 +45,7 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest { private static final String BOOLEAN_KEY = "booleanKey"; private static final long TS = 42L; + private static final String DESC_ORDER = "DESC"; KvEntry stringKvEntry = new StringDataEntry(STRING_KEY, "value"); KvEntry longKvEntry = new LongDataEntry(LONG_KEY, Long.MAX_VALUE); @@ -92,6 +93,24 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest { Assert.assertEquals(toTsEntry(TS, stringKvEntry), entries.get(0)); } + @Test + public void testDeleteDeviceTsData() throws Exception { + DeviceId deviceId = new DeviceId(UUIDs.timeBased()); + + saveEntries(deviceId, TS - 3); + saveEntries(deviceId, TS - 2); + saveEntries(deviceId, TS - 1); + saveEntries(deviceId, TS); + + tsService.remove(deviceId, Collections.singletonList( + new BaseTsKvQuery(STRING_KEY, TS - 4, TS - 2))).get(); + + List list = tsService.findAll(deviceId, Collections.singletonList( + new BaseTsKvQuery(STRING_KEY, 0, 60000, 60000, 5, Aggregation.NONE, DESC_ORDER))).get(); + + Assert.assertEquals(2, list.size()); + } + @Test public void testFindDeviceTsData() throws Exception { DeviceId deviceId = new DeviceId(UUIDs.timeBased()); @@ -107,7 +126,7 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest { entries.add(save(deviceId, 55000, 600)); List list = tsService.findAll(deviceId, Collections.singletonList(new BaseTsKvQuery(LONG_KEY, 0, - 60000, 20000, 3, Aggregation.NONE))).get(); + 60000, 20000, 3, Aggregation.NONE, DESC_ORDER))).get(); assertEquals(3, list.size()); assertEquals(55000, list.get(0).getTs()); assertEquals(java.util.Optional.of(600L), list.get(0).getLongValue()); @@ -119,7 +138,7 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest { assertEquals(java.util.Optional.of(400L), list.get(2).getLongValue()); list = tsService.findAll(deviceId, Collections.singletonList(new BaseTsKvQuery(LONG_KEY, 0, - 60000, 20000, 3, Aggregation.AVG))).get(); + 60000, 20000, 3, Aggregation.AVG, DESC_ORDER))).get(); assertEquals(3, list.size()); assertEquals(10000, list.get(0).getTs()); assertEquals(java.util.Optional.of(150L), list.get(0).getLongValue()); @@ -131,7 +150,7 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest { assertEquals(java.util.Optional.of(550L), list.get(2).getLongValue()); list = tsService.findAll(deviceId, Collections.singletonList(new BaseTsKvQuery(LONG_KEY, 0, - 60000, 20000, 3, Aggregation.SUM))).get(); + 60000, 20000, 3, Aggregation.SUM, DESC_ORDER))).get(); assertEquals(3, list.size()); assertEquals(10000, list.get(0).getTs()); @@ -144,7 +163,7 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest { assertEquals(java.util.Optional.of(1100L), list.get(2).getLongValue()); list = tsService.findAll(deviceId, Collections.singletonList(new BaseTsKvQuery(LONG_KEY, 0, - 60000, 20000, 3, Aggregation.MIN))).get(); + 60000, 20000, 3, Aggregation.MIN, DESC_ORDER))).get(); assertEquals(3, list.size()); assertEquals(10000, list.get(0).getTs()); @@ -157,7 +176,7 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest { assertEquals(java.util.Optional.of(500L), list.get(2).getLongValue()); list = tsService.findAll(deviceId, Collections.singletonList(new BaseTsKvQuery(LONG_KEY, 0, - 60000, 20000, 3, Aggregation.MAX))).get(); + 60000, 20000, 3, Aggregation.MAX, DESC_ORDER))).get(); assertEquals(3, list.size()); assertEquals(10000, list.get(0).getTs()); @@ -170,7 +189,7 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest { assertEquals(java.util.Optional.of(600L), list.get(2).getLongValue()); list = tsService.findAll(deviceId, Collections.singletonList(new BaseTsKvQuery(LONG_KEY, 0, - 60000, 20000, 3, Aggregation.COUNT))).get(); + 60000, 20000, 3, Aggregation.COUNT, DESC_ORDER))).get(); assertEquals(3, list.size()); assertEquals(10000, list.get(0).getTs()); diff --git a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRestMsgHandler.java b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRestMsgHandler.java index 3ea754ae9c..0c7e3874ee 100644 --- a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRestMsgHandler.java +++ b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRestMsgHandler.java @@ -140,7 +140,7 @@ public class TelemetryRestMsgHandler extends DefaultRestMsgHandler { Aggregation agg = (interval.isPresent() && interval.get() == 0) ? Aggregation.valueOf(Aggregation.NONE.name()) : Aggregation.valueOf(request.getParameter("agg", Aggregation.NONE.name())); - List queries = keys.stream().map(key -> new BaseTsKvQuery(key, startTs.get(), endTs.get(), interval.get(), limit.orElse(TelemetryWebsocketMsgHandler.DEFAULT_LIMIT), agg)) + List queries = keys.stream().map(key -> new BaseTsKvQuery(key, startTs.get(), endTs.get(), interval.get(), limit.orElse(TelemetryWebsocketMsgHandler.DEFAULT_LIMIT), agg, "DESC")) .collect(Collectors.toList()); ctx.loadTimeseries(entityId, queries, getTsKvListCallback(msg)); } else { diff --git a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryWebsocketMsgHandler.java b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryWebsocketMsgHandler.java index 1374ef68ac..bf75c5de29 100644 --- a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryWebsocketMsgHandler.java +++ b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryWebsocketMsgHandler.java @@ -54,6 +54,7 @@ public class TelemetryWebsocketMsgHandler extends DefaultWebsocketMsgHandler { public static final String FAILED_TO_FETCH_DATA = "Failed to fetch data!"; public static final String FAILED_TO_FETCH_ATTRIBUTES = "Failed to fetch attributes!"; public static final String SESSION_META_DATA_NOT_FOUND = "Session meta-data not found!"; + public static final String ORDER_BY = "DESC"; private final SubscriptionManager subscriptionManager; @@ -216,7 +217,7 @@ public class TelemetryWebsocketMsgHandler extends DefaultWebsocketMsgHandler { log.debug("[{}] fetching timeseries data for last {} ms for keys: ({}) for device : {}", sessionId, cmd.getTimeWindow(), cmd.getKeys(), entityId); startTs = cmd.getStartTs(); long endTs = cmd.getStartTs() + cmd.getTimeWindow(); - List queries = keys.stream().map(key -> new BaseTsKvQuery(key, startTs, endTs, cmd.getInterval(), getLimit(cmd.getLimit()), getAggregation(cmd.getAgg()))).collect(Collectors.toList()); + List queries = keys.stream().map(key -> new BaseTsKvQuery(key, startTs, endTs, cmd.getInterval(), getLimit(cmd.getLimit()), getAggregation(cmd.getAgg()), ORDER_BY)).collect(Collectors.toList()); ctx.loadTimeseries(entityId, queries, getSubscriptionCallback(sessionRef, cmd, sessionId, entityId, startTs, keys)); } else { List keys = new ArrayList<>(getKeys(cmd).orElse(Collections.emptySet())); @@ -300,7 +301,7 @@ public class TelemetryWebsocketMsgHandler extends DefaultWebsocketMsgHandler { } EntityId entityId = EntityIdFactory.getByTypeAndId(cmd.getEntityType(), cmd.getEntityId()); List keys = new ArrayList<>(getKeys(cmd).orElse(Collections.emptySet())); - List queries = keys.stream().map(key -> new BaseTsKvQuery(key, cmd.getStartTs(), cmd.getEndTs(), cmd.getInterval(), getLimit(cmd.getLimit()), getAggregation(cmd.getAgg()))) + List queries = keys.stream().map(key -> new BaseTsKvQuery(key, cmd.getStartTs(), cmd.getEndTs(), cmd.getInterval(), getLimit(cmd.getLimit()), getAggregation(cmd.getAgg()), ORDER_BY)) .collect(Collectors.toList()); ctx.loadTimeseries(entityId, queries, new PluginCallback>() { @Override