first stage delete timeseries records, implemented orderBy query

This commit is contained in:
Dima Landiak 2018-05-03 15:57:03 +03:00
parent d03ac971b3
commit 349bf398e2
11 changed files with 220 additions and 38 deletions

View File

@ -26,18 +26,20 @@ public class BaseTsKvQuery implements TsKvQuery {
private final long interval; private final long interval;
private final int limit; private final int limit;
private final Aggregation aggregation; private final Aggregation aggregation;
private final String orderBy;
public BaseTsKvQuery(String key, long startTs, long endTs, long interval, int limit, Aggregation aggregation) { public BaseTsKvQuery(String key, long startTs, long endTs, long interval, int limit, Aggregation aggregation, String orderBy) {
this.key = key; this.key = key;
this.startTs = startTs; this.startTs = startTs;
this.endTs = endTs; this.endTs = endTs;
this.interval = interval; this.interval = interval;
this.limit = limit; this.limit = limit;
this.aggregation = aggregation; this.aggregation = aggregation;
this.orderBy = orderBy;
} }
public BaseTsKvQuery(String key, long startTs, long endTs) { public BaseTsKvQuery(String key, long startTs, long endTs) {
this(key, startTs, endTs, endTs-startTs, 1, Aggregation.AVG); this(key, startTs, endTs, endTs - startTs, 1, Aggregation.AVG, "DESC");
} }
} }

View File

@ -29,4 +29,5 @@ public interface TsKvQuery {
Aggregation getAggregation(); Aggregation getAggregation();
String getOrderBy();
} }

View File

@ -299,6 +299,18 @@ public class JpaTimeseriesDao extends JpaAbstractDaoListeningExecutorService imp
}); });
} }
@Override
public ListenableFuture<Void> remove(EntityId entityId, TsKvQuery query) {
//TODO: implement
return null;
}
@Override
public ListenableFuture<Void> removeLatest(EntityId entityId, TsKvQuery query) {
//TODO: implement
return null;
}
@PreDestroy @PreDestroy
void onDestroy() { void onDestroy() {
if (insertService != null) { if (insertService != null) {

View File

@ -40,6 +40,7 @@ import static org.apache.commons.lang3.StringUtils.isBlank;
public class BaseTimeseriesService implements TimeseriesService { public class BaseTimeseriesService implements TimeseriesService {
public static final int INSERTS_PER_ENTRY = 3; public static final int INSERTS_PER_ENTRY = 3;
public static final int DELETES_PER_ENTRY = 2;
@Autowired @Autowired
private TimeseriesDao timeseriesDao; private TimeseriesDao timeseriesDao;
@ -95,6 +96,22 @@ public class BaseTimeseriesService implements TimeseriesService {
futures.add(timeseriesDao.save(entityId, tsKvEntry, ttl)); futures.add(timeseriesDao.save(entityId, tsKvEntry, ttl));
} }
@Override
public ListenableFuture<List<Void>> remove(EntityId entityId, List<TsKvQuery> tsKvQueries) {
validate(entityId);
tsKvQueries.forEach(BaseTimeseriesService::validate);
List<ListenableFuture<Void>> futures = Lists.newArrayListWithExpectedSize(tsKvQueries.size() * DELETES_PER_ENTRY);
for (TsKvQuery tsKvQuery : tsKvQueries) {
deleteAndRegisterFutures(futures, entityId, tsKvQuery);
}
return Futures.allAsList(futures);
}
private void deleteAndRegisterFutures(List<ListenableFuture<Void>> futures, EntityId entityId, TsKvQuery query) {
futures.add(timeseriesDao.remove(entityId, query));
futures.add(timeseriesDao.removeLatest(entityId, query));
}
private static void validate(EntityId entityId) { private static void validate(EntityId entityId) {
Validator.validateEntityId(entityId, "Incorrect entityId " + entityId); Validator.validateEntityId(entityId, "Incorrect entityId " + entityId);
} }

View File

@ -62,6 +62,8 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
public static final String GENERATED_QUERY_FOR_ENTITY_TYPE_AND_ENTITY_ID = "Generated query [{}] for entityType {} and entityId {}"; public static final String GENERATED_QUERY_FOR_ENTITY_TYPE_AND_ENTITY_ID = "Generated query [{}] for entityType {} and entityId {}";
public static final String SELECT_PREFIX = "SELECT "; public static final String SELECT_PREFIX = "SELECT ";
public static final String EQUALS_PARAM = " = ? "; public static final String EQUALS_PARAM = " = ? ";
public static final String ASC_ORDER = "ASC";
public static final String DESC_ORDER = "DESC";
@Autowired @Autowired
private Environment environment; private Environment environment;
@ -76,7 +78,8 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
private PreparedStatement latestInsertStmt; private PreparedStatement latestInsertStmt;
private PreparedStatement[] saveStmts; private PreparedStatement[] saveStmts;
private PreparedStatement[] saveTtlStmts; private PreparedStatement[] saveTtlStmts;
private PreparedStatement[] fetchStmts; private PreparedStatement[] fetchStmtsAsc;
private PreparedStatement[] fetchStmtsDesc;
private PreparedStatement findLatestStmt; private PreparedStatement findLatestStmt;
private PreparedStatement findAllLatestStmt; private PreparedStatement findAllLatestStmt;
@ -88,7 +91,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
public void init() { public void init() {
super.startExecutor(); super.startExecutor();
if (!isInstall()) { if (!isInstall()) {
getFetchStmt(Aggregation.NONE); getFetchStmt(Aggregation.NONE, DESC_ORDER);
Optional<TsPartitionDate> partition = TsPartitionDate.parse(partitioning); Optional<TsPartitionDate> partition = TsPartitionDate.parse(partitioning);
if (partition.isPresent()) { if (partition.isPresent()) {
tsFormat = partition.get(); tsFormat = partition.get();
@ -132,7 +135,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
while (stepTs < query.getEndTs()) { while (stepTs < query.getEndTs()) {
long startTs = stepTs; long startTs = stepTs;
long endTs = stepTs + step; long endTs = stepTs + step;
TsKvQuery subQuery = new BaseTsKvQuery(query.getKey(), startTs, endTs, step, 1, query.getAggregation()); TsKvQuery subQuery = new BaseTsKvQuery(query.getKey(), startTs, endTs, step, 1, query.getAggregation(), query.getOrderBy());
futures.add(findAndAggregateAsync(entityId, subQuery, toPartitionTs(startTs), toPartitionTs(endTs))); futures.add(findAndAggregateAsync(entityId, subQuery, toPartitionTs(startTs), toPartitionTs(endTs)));
stepTs = endTs; stepTs = endTs;
} }
@ -181,7 +184,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
if (cursor.isFull() || !cursor.hasNextPartition()) { if (cursor.isFull() || !cursor.hasNextPartition()) {
resultFuture.set(cursor.getData()); resultFuture.set(cursor.getData());
} else { } else {
PreparedStatement proto = getFetchStmt(Aggregation.NONE); PreparedStatement proto = getFetchStmt(Aggregation.NONE, cursor.getOrderBy());
BoundStatement stmt = proto.bind(); BoundStatement stmt = proto.bind();
stmt.setString(0, cursor.getEntityType()); stmt.setString(0, cursor.getEntityType());
stmt.setUUID(1, cursor.getEntityId()); stmt.setUUID(1, cursor.getEntityId());
@ -231,7 +234,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
private AsyncFunction<List<Long>, List<ResultSet>> getFetchChunksAsyncFunction(EntityId entityId, String key, Aggregation aggregation, long startTs, long endTs) { private AsyncFunction<List<Long>, List<ResultSet>> getFetchChunksAsyncFunction(EntityId entityId, String key, Aggregation aggregation, long startTs, long endTs) {
return partitions -> { return partitions -> {
try { try {
PreparedStatement proto = getFetchStmt(aggregation); PreparedStatement proto = getFetchStmt(aggregation, DESC_ORDER);
List<ResultSetFuture> futures = new ArrayList<>(partitions.size()); List<ResultSetFuture> futures = new ArrayList<>(partitions.size());
for (Long partition : partitions) { for (Long partition : partitions) {
log.trace("Fetching data for partition [{}] for entityType {} and entityId {}", partition, entityId.getEntityType(), entityId.getId()); log.trace("Fetching data for partition [{}] for entityType {} and entityId {}", partition, entityId.getEntityType(), entityId.getId());
@ -318,6 +321,99 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
return getFuture(executeAsyncWrite(stmt), rs -> null); return getFuture(executeAsyncWrite(stmt), rs -> null);
} }
@Override
public ListenableFuture<Void> remove(EntityId entityId, TsKvQuery query) {
long minPartition = toPartitionTs(query.getStartTs());
long maxPartition = toPartitionTs(query.getEndTs());
ResultSetFuture partitionsFuture = fetchPartitions(entityId, query.getKey(), minPartition, maxPartition);
final SimpleListenableFuture<Void> resultFuture = new SimpleListenableFuture<>();
final ListenableFuture<List<Long>> partitionsListFuture = Futures.transform(partitionsFuture, getPartitionsArrayFunction(), readResultsProcessingExecutor);
Futures.addCallback(partitionsListFuture, new FutureCallback<List<Long>>() {
@Override
public void onSuccess(@Nullable List<Long> partitions) {
TsKvQueryCursor cursor = new TsKvQueryCursor(entityId.getEntityType().name(), entityId.getId(), query, partitions);
deleteAsync(cursor, resultFuture);
}
@Override
public void onFailure(Throwable t) {
log.error("[{}][{}] Failed to fetch partitions for interval {}-{}", entityId.getEntityType().name(), entityId.getId(), minPartition, maxPartition, t);
}
}, readResultsProcessingExecutor);
return resultFuture;
}
private void deleteAsync(final TsKvQueryCursor cursor, final SimpleListenableFuture<Void> resultFuture) {
if (!cursor.hasNextPartition()) {
resultFuture.set(null);
} else {
PreparedStatement proto = getDeleteStmt();
BoundStatement stmt = proto.bind();
stmt.setString(0, cursor.getEntityType());
stmt.setUUID(1, cursor.getEntityId());
stmt.setString(2, cursor.getKey());
stmt.setLong(3, cursor.getNextPartition());
stmt.setLong(4, cursor.getStartTs());
stmt.setLong(5, cursor.getEndTs());
Futures.addCallback(executeAsyncWrite(stmt), new FutureCallback<ResultSet>() {
@Override
public void onSuccess(@Nullable ResultSet result) {
deleteAsync(cursor, resultFuture);
}
@Override
public void onFailure(Throwable t) {
log.error("[{}][{}] Failed to delete data for query {}-{}", stmt, t);
}
}, readResultsProcessingExecutor);
}
}
private PreparedStatement getDeleteStmt() {
return prepare("DELETE FROM " + ModelConstants.TS_KV_CF +
" WHERE " + ModelConstants.ENTITY_TYPE_COLUMN + EQUALS_PARAM
+ "AND " + ModelConstants.ENTITY_ID_COLUMN + EQUALS_PARAM
+ "AND " + ModelConstants.KEY_COLUMN + EQUALS_PARAM
+ "AND " + ModelConstants.PARTITION_COLUMN + EQUALS_PARAM
+ "AND " + ModelConstants.TS_COLUMN + " > ? "
+ "AND " + ModelConstants.TS_COLUMN + " <= ?");
}
@Override
public ListenableFuture<Void> removeLatest(EntityId entityId, TsKvQuery query) {
ListenableFuture<TsKvEntry> future = findLatest(entityId, query.getKey());
return Futures.transform(future, new Function<TsKvEntry, Void>() {
@Nullable
@Override
public Void apply(@Nullable TsKvEntry latestEntry) {
if (latestEntry != null) {
long ts = latestEntry.getTs();
if (ts >= query.getStartTs() && ts <= query.getEndTs()) {
deleteLatest(entityId, latestEntry.getKey());
//TODO: save new latest entry(< query.getStartTs() - if present) to TS_KV_LATEST_CF
} else {
log.trace("Won't be deleted latest value for [{}], key - {}", entityId, query.getKey());
}
}
return null;
}
});
}
private ListenableFuture<Void> deleteLatest(EntityId entityId, String key) {
Statement delete = QueryBuilder.delete().from(ModelConstants.TS_KV_LATEST_CF)
.where(eq(ModelConstants.ENTITY_TYPE_COLUMN, entityId.getEntityType()))
.and(eq(ModelConstants.ENTITY_ID_COLUMN, entityId.getId()))
.and(eq(ModelConstants.KEY_COLUMN, key));
log.debug("Remove request: {}", delete.toString());
return getFuture(executeAsyncWrite(delete), rs -> null);
}
private List<TsKvEntry> convertResultToTsKvEntryList(List<Row> rows) { private List<TsKvEntry> convertResultToTsKvEntryList(List<Row> rows) {
List<TsKvEntry> entries = new ArrayList<>(rows.size()); List<TsKvEntry> entries = new ArrayList<>(rows.size());
if (!rows.isEmpty()) { if (!rows.isEmpty()) {
@ -413,28 +509,43 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
return saveTtlStmts[dataType.ordinal()]; return saveTtlStmts[dataType.ordinal()];
} }
private PreparedStatement getFetchStmt(Aggregation aggType) { private PreparedStatement getFetchStmt(Aggregation aggType, String orderBy) {
if (fetchStmts == null) { switch (orderBy) {
fetchStmts = new PreparedStatement[Aggregation.values().length]; case ASC_ORDER:
for (Aggregation type : Aggregation.values()) { if (fetchStmtsAsc == null) {
if (type == Aggregation.SUM && fetchStmts[Aggregation.AVG.ordinal()] != null) { fetchStmtsAsc = initFetchStmt(orderBy);
fetchStmts[type.ordinal()] = fetchStmts[Aggregation.AVG.ordinal()];
} else if (type == Aggregation.AVG && fetchStmts[Aggregation.SUM.ordinal()] != null) {
fetchStmts[type.ordinal()] = fetchStmts[Aggregation.SUM.ordinal()];
} else {
fetchStmts[type.ordinal()] = prepare(SELECT_PREFIX +
String.join(", ", ModelConstants.getFetchColumnNames(type)) + " FROM " + ModelConstants.TS_KV_CF
+ " WHERE " + ModelConstants.ENTITY_TYPE_COLUMN + EQUALS_PARAM
+ "AND " + ModelConstants.ENTITY_ID_COLUMN + EQUALS_PARAM
+ "AND " + ModelConstants.KEY_COLUMN + EQUALS_PARAM
+ "AND " + ModelConstants.PARTITION_COLUMN + EQUALS_PARAM
+ "AND " + ModelConstants.TS_COLUMN + " > ? "
+ "AND " + ModelConstants.TS_COLUMN + " <= ?"
+ (type == Aggregation.NONE ? " ORDER BY " + ModelConstants.TS_COLUMN + " DESC LIMIT ?" : ""));
} }
return fetchStmtsAsc[aggType.ordinal()];
case DESC_ORDER:
if (fetchStmtsDesc == null) {
fetchStmtsDesc = initFetchStmt(orderBy);
}
return fetchStmtsDesc[aggType.ordinal()];
default:
throw new RuntimeException("Not supported" + orderBy + "order!");
}
}
private PreparedStatement[] initFetchStmt(String orderBy) {
PreparedStatement[] fetchStmts = new PreparedStatement[Aggregation.values().length];
for (Aggregation type : Aggregation.values()) {
if (type == Aggregation.SUM && fetchStmts[Aggregation.AVG.ordinal()] != null) {
fetchStmts[type.ordinal()] = fetchStmts[Aggregation.AVG.ordinal()];
} else if (type == Aggregation.AVG && fetchStmts[Aggregation.SUM.ordinal()] != null) {
fetchStmts[type.ordinal()] = fetchStmts[Aggregation.SUM.ordinal()];
} else {
fetchStmts[type.ordinal()] = prepare(SELECT_PREFIX +
String.join(", ", ModelConstants.getFetchColumnNames(type)) + " FROM " + ModelConstants.TS_KV_CF
+ " WHERE " + ModelConstants.ENTITY_TYPE_COLUMN + EQUALS_PARAM
+ "AND " + ModelConstants.ENTITY_ID_COLUMN + EQUALS_PARAM
+ "AND " + ModelConstants.KEY_COLUMN + EQUALS_PARAM
+ "AND " + ModelConstants.PARTITION_COLUMN + EQUALS_PARAM
+ "AND " + ModelConstants.TS_COLUMN + " > ? "
+ "AND " + ModelConstants.TS_COLUMN + " <= ?"
+ (type == Aggregation.NONE ? " ORDER BY " + ModelConstants.TS_COLUMN + " " + orderBy + " LIMIT ?" : ""));
} }
} }
return fetchStmts[aggType.ordinal()]; return fetchStmts;
} }
private PreparedStatement getLatestStmt() { private PreparedStatement getLatestStmt() {

View File

@ -38,4 +38,8 @@ public interface TimeseriesDao {
ListenableFuture<Void> savePartition(EntityId entityId, long tsKvEntryTs, String key, long ttl); ListenableFuture<Void> savePartition(EntityId entityId, long tsKvEntryTs, String key, long ttl);
ListenableFuture<Void> saveLatest(EntityId entityId, TsKvEntry tsKvEntry); ListenableFuture<Void> saveLatest(EntityId entityId, TsKvEntry tsKvEntry);
ListenableFuture<Void> remove(EntityId entityId, TsKvQuery query);
ListenableFuture<Void> removeLatest(EntityId entityId, TsKvQuery query);
} }

View File

@ -37,4 +37,6 @@ public interface TimeseriesService {
ListenableFuture<List<Void>> save(EntityId entityId, TsKvEntry tsKvEntry); ListenableFuture<List<Void>> save(EntityId entityId, TsKvEntry tsKvEntry);
ListenableFuture<List<Void>> save(EntityId entityId, List<TsKvEntry> tsKvEntry, long ttl); ListenableFuture<List<Void>> save(EntityId entityId, List<TsKvEntry> tsKvEntry, long ttl);
ListenableFuture<List<Void>> remove(EntityId entityId, List<TsKvQuery> queries);
} }

View File

@ -23,6 +23,8 @@ import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.UUID; import java.util.UUID;
import static org.thingsboard.server.dao.timeseries.CassandraBaseTimeseriesDao.DESC_ORDER;
/** /**
* Created by ashvayka on 21.02.17. * Created by ashvayka on 21.02.17.
*/ */
@ -40,6 +42,8 @@ public class TsKvQueryCursor {
private final List<Long> partitions; private final List<Long> partitions;
@Getter @Getter
private final List<TsKvEntry> data; private final List<TsKvEntry> data;
@Getter
private String orderBy;
private int partitionIndex; private int partitionIndex;
private int currentLimit; private int currentLimit;
@ -51,13 +55,14 @@ public class TsKvQueryCursor {
this.startTs = baseQuery.getStartTs(); this.startTs = baseQuery.getStartTs();
this.endTs = baseQuery.getEndTs(); this.endTs = baseQuery.getEndTs();
this.partitions = partitions; this.partitions = partitions;
this.partitionIndex = partitions.size() - 1; this.orderBy = baseQuery.getOrderBy();
this.partitionIndex = isDesc() ? partitions.size() - 1 : 0;
this.data = new ArrayList<>(); this.data = new ArrayList<>();
this.currentLimit = baseQuery.getLimit(); this.currentLimit = baseQuery.getLimit();
} }
public boolean hasNextPartition() { public boolean hasNextPartition() {
return partitionIndex >= 0; return isDesc() ? partitionIndex >= 0 : partitionIndex <= partitions.size() - 1;
} }
public boolean isFull() { public boolean isFull() {
@ -66,7 +71,11 @@ public class TsKvQueryCursor {
public long getNextPartition() { public long getNextPartition() {
long partition = partitions.get(partitionIndex); long partition = partitions.get(partitionIndex);
partitionIndex--; if (isDesc()) {
partitionIndex--;
} else {
partitionIndex++;
}
return partition; return partition;
} }
@ -78,4 +87,8 @@ public class TsKvQueryCursor {
currentLimit -= newData.size(); currentLimit -= newData.size();
data.addAll(newData); data.addAll(newData);
} }
private boolean isDesc() {
return orderBy.equals(DESC_ORDER);
}
} }

View File

@ -45,6 +45,7 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest {
private static final String BOOLEAN_KEY = "booleanKey"; private static final String BOOLEAN_KEY = "booleanKey";
private static final long TS = 42L; private static final long TS = 42L;
private static final String DESC_ORDER = "DESC";
KvEntry stringKvEntry = new StringDataEntry(STRING_KEY, "value"); KvEntry stringKvEntry = new StringDataEntry(STRING_KEY, "value");
KvEntry longKvEntry = new LongDataEntry(LONG_KEY, Long.MAX_VALUE); KvEntry longKvEntry = new LongDataEntry(LONG_KEY, Long.MAX_VALUE);
@ -92,6 +93,24 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest {
Assert.assertEquals(toTsEntry(TS, stringKvEntry), entries.get(0)); Assert.assertEquals(toTsEntry(TS, stringKvEntry), entries.get(0));
} }
@Test
public void testDeleteDeviceTsData() throws Exception {
DeviceId deviceId = new DeviceId(UUIDs.timeBased());
saveEntries(deviceId, TS - 3);
saveEntries(deviceId, TS - 2);
saveEntries(deviceId, TS - 1);
saveEntries(deviceId, TS);
tsService.remove(deviceId, Collections.singletonList(
new BaseTsKvQuery(STRING_KEY, TS - 4, TS - 2))).get();
List<TsKvEntry> list = tsService.findAll(deviceId, Collections.singletonList(
new BaseTsKvQuery(STRING_KEY, 0, 60000, 60000, 5, Aggregation.NONE, DESC_ORDER))).get();
Assert.assertEquals(2, list.size());
}
@Test @Test
public void testFindDeviceTsData() throws Exception { public void testFindDeviceTsData() throws Exception {
DeviceId deviceId = new DeviceId(UUIDs.timeBased()); DeviceId deviceId = new DeviceId(UUIDs.timeBased());
@ -107,7 +126,7 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest {
entries.add(save(deviceId, 55000, 600)); entries.add(save(deviceId, 55000, 600));
List<TsKvEntry> list = tsService.findAll(deviceId, Collections.singletonList(new BaseTsKvQuery(LONG_KEY, 0, List<TsKvEntry> list = tsService.findAll(deviceId, Collections.singletonList(new BaseTsKvQuery(LONG_KEY, 0,
60000, 20000, 3, Aggregation.NONE))).get(); 60000, 20000, 3, Aggregation.NONE, DESC_ORDER))).get();
assertEquals(3, list.size()); assertEquals(3, list.size());
assertEquals(55000, list.get(0).getTs()); assertEquals(55000, list.get(0).getTs());
assertEquals(java.util.Optional.of(600L), list.get(0).getLongValue()); assertEquals(java.util.Optional.of(600L), list.get(0).getLongValue());
@ -119,7 +138,7 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest {
assertEquals(java.util.Optional.of(400L), list.get(2).getLongValue()); assertEquals(java.util.Optional.of(400L), list.get(2).getLongValue());
list = tsService.findAll(deviceId, Collections.singletonList(new BaseTsKvQuery(LONG_KEY, 0, list = tsService.findAll(deviceId, Collections.singletonList(new BaseTsKvQuery(LONG_KEY, 0,
60000, 20000, 3, Aggregation.AVG))).get(); 60000, 20000, 3, Aggregation.AVG, DESC_ORDER))).get();
assertEquals(3, list.size()); assertEquals(3, list.size());
assertEquals(10000, list.get(0).getTs()); assertEquals(10000, list.get(0).getTs());
assertEquals(java.util.Optional.of(150L), list.get(0).getLongValue()); assertEquals(java.util.Optional.of(150L), list.get(0).getLongValue());
@ -131,7 +150,7 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest {
assertEquals(java.util.Optional.of(550L), list.get(2).getLongValue()); assertEquals(java.util.Optional.of(550L), list.get(2).getLongValue());
list = tsService.findAll(deviceId, Collections.singletonList(new BaseTsKvQuery(LONG_KEY, 0, list = tsService.findAll(deviceId, Collections.singletonList(new BaseTsKvQuery(LONG_KEY, 0,
60000, 20000, 3, Aggregation.SUM))).get(); 60000, 20000, 3, Aggregation.SUM, DESC_ORDER))).get();
assertEquals(3, list.size()); assertEquals(3, list.size());
assertEquals(10000, list.get(0).getTs()); assertEquals(10000, list.get(0).getTs());
@ -144,7 +163,7 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest {
assertEquals(java.util.Optional.of(1100L), list.get(2).getLongValue()); assertEquals(java.util.Optional.of(1100L), list.get(2).getLongValue());
list = tsService.findAll(deviceId, Collections.singletonList(new BaseTsKvQuery(LONG_KEY, 0, list = tsService.findAll(deviceId, Collections.singletonList(new BaseTsKvQuery(LONG_KEY, 0,
60000, 20000, 3, Aggregation.MIN))).get(); 60000, 20000, 3, Aggregation.MIN, DESC_ORDER))).get();
assertEquals(3, list.size()); assertEquals(3, list.size());
assertEquals(10000, list.get(0).getTs()); assertEquals(10000, list.get(0).getTs());
@ -157,7 +176,7 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest {
assertEquals(java.util.Optional.of(500L), list.get(2).getLongValue()); assertEquals(java.util.Optional.of(500L), list.get(2).getLongValue());
list = tsService.findAll(deviceId, Collections.singletonList(new BaseTsKvQuery(LONG_KEY, 0, list = tsService.findAll(deviceId, Collections.singletonList(new BaseTsKvQuery(LONG_KEY, 0,
60000, 20000, 3, Aggregation.MAX))).get(); 60000, 20000, 3, Aggregation.MAX, DESC_ORDER))).get();
assertEquals(3, list.size()); assertEquals(3, list.size());
assertEquals(10000, list.get(0).getTs()); assertEquals(10000, list.get(0).getTs());
@ -170,7 +189,7 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest {
assertEquals(java.util.Optional.of(600L), list.get(2).getLongValue()); assertEquals(java.util.Optional.of(600L), list.get(2).getLongValue());
list = tsService.findAll(deviceId, Collections.singletonList(new BaseTsKvQuery(LONG_KEY, 0, list = tsService.findAll(deviceId, Collections.singletonList(new BaseTsKvQuery(LONG_KEY, 0,
60000, 20000, 3, Aggregation.COUNT))).get(); 60000, 20000, 3, Aggregation.COUNT, DESC_ORDER))).get();
assertEquals(3, list.size()); assertEquals(3, list.size());
assertEquals(10000, list.get(0).getTs()); assertEquals(10000, list.get(0).getTs());

View File

@ -140,7 +140,7 @@ public class TelemetryRestMsgHandler extends DefaultRestMsgHandler {
Aggregation agg = (interval.isPresent() && interval.get() == 0) ? Aggregation.valueOf(Aggregation.NONE.name()) : Aggregation agg = (interval.isPresent() && interval.get() == 0) ? Aggregation.valueOf(Aggregation.NONE.name()) :
Aggregation.valueOf(request.getParameter("agg", Aggregation.NONE.name())); Aggregation.valueOf(request.getParameter("agg", Aggregation.NONE.name()));
List<TsKvQuery> queries = keys.stream().map(key -> new BaseTsKvQuery(key, startTs.get(), endTs.get(), interval.get(), limit.orElse(TelemetryWebsocketMsgHandler.DEFAULT_LIMIT), agg)) List<TsKvQuery> queries = keys.stream().map(key -> new BaseTsKvQuery(key, startTs.get(), endTs.get(), interval.get(), limit.orElse(TelemetryWebsocketMsgHandler.DEFAULT_LIMIT), agg, "DESC"))
.collect(Collectors.toList()); .collect(Collectors.toList());
ctx.loadTimeseries(entityId, queries, getTsKvListCallback(msg)); ctx.loadTimeseries(entityId, queries, getTsKvListCallback(msg));
} else { } else {

View File

@ -54,6 +54,7 @@ public class TelemetryWebsocketMsgHandler extends DefaultWebsocketMsgHandler {
public static final String FAILED_TO_FETCH_DATA = "Failed to fetch data!"; public static final String FAILED_TO_FETCH_DATA = "Failed to fetch data!";
public static final String FAILED_TO_FETCH_ATTRIBUTES = "Failed to fetch attributes!"; public static final String FAILED_TO_FETCH_ATTRIBUTES = "Failed to fetch attributes!";
public static final String SESSION_META_DATA_NOT_FOUND = "Session meta-data not found!"; public static final String SESSION_META_DATA_NOT_FOUND = "Session meta-data not found!";
public static final String ORDER_BY = "DESC";
private final SubscriptionManager subscriptionManager; private final SubscriptionManager subscriptionManager;
@ -216,7 +217,7 @@ public class TelemetryWebsocketMsgHandler extends DefaultWebsocketMsgHandler {
log.debug("[{}] fetching timeseries data for last {} ms for keys: ({}) for device : {}", sessionId, cmd.getTimeWindow(), cmd.getKeys(), entityId); log.debug("[{}] fetching timeseries data for last {} ms for keys: ({}) for device : {}", sessionId, cmd.getTimeWindow(), cmd.getKeys(), entityId);
startTs = cmd.getStartTs(); startTs = cmd.getStartTs();
long endTs = cmd.getStartTs() + cmd.getTimeWindow(); long endTs = cmd.getStartTs() + cmd.getTimeWindow();
List<TsKvQuery> queries = keys.stream().map(key -> new BaseTsKvQuery(key, startTs, endTs, cmd.getInterval(), getLimit(cmd.getLimit()), getAggregation(cmd.getAgg()))).collect(Collectors.toList()); List<TsKvQuery> queries = keys.stream().map(key -> new BaseTsKvQuery(key, startTs, endTs, cmd.getInterval(), getLimit(cmd.getLimit()), getAggregation(cmd.getAgg()), ORDER_BY)).collect(Collectors.toList());
ctx.loadTimeseries(entityId, queries, getSubscriptionCallback(sessionRef, cmd, sessionId, entityId, startTs, keys)); ctx.loadTimeseries(entityId, queries, getSubscriptionCallback(sessionRef, cmd, sessionId, entityId, startTs, keys));
} else { } else {
List<String> keys = new ArrayList<>(getKeys(cmd).orElse(Collections.emptySet())); List<String> keys = new ArrayList<>(getKeys(cmd).orElse(Collections.emptySet()));
@ -300,7 +301,7 @@ public class TelemetryWebsocketMsgHandler extends DefaultWebsocketMsgHandler {
} }
EntityId entityId = EntityIdFactory.getByTypeAndId(cmd.getEntityType(), cmd.getEntityId()); EntityId entityId = EntityIdFactory.getByTypeAndId(cmd.getEntityType(), cmd.getEntityId());
List<String> keys = new ArrayList<>(getKeys(cmd).orElse(Collections.emptySet())); List<String> keys = new ArrayList<>(getKeys(cmd).orElse(Collections.emptySet()));
List<TsKvQuery> queries = keys.stream().map(key -> new BaseTsKvQuery(key, cmd.getStartTs(), cmd.getEndTs(), cmd.getInterval(), getLimit(cmd.getLimit()), getAggregation(cmd.getAgg()))) List<TsKvQuery> queries = keys.stream().map(key -> new BaseTsKvQuery(key, cmd.getStartTs(), cmd.getEndTs(), cmd.getInterval(), getLimit(cmd.getLimit()), getAggregation(cmd.getAgg()), ORDER_BY))
.collect(Collectors.toList()); .collect(Collectors.toList());
ctx.loadTimeseries(entityId, queries, new PluginCallback<List<TsKvEntry>>() { ctx.loadTimeseries(entityId, queries, new PluginCallback<List<TsKvEntry>>() {
@Override @Override