From 87d05f7c845d2f64e6fe93607e3a946d9146497f Mon Sep 17 00:00:00 2001 From: Andrew Shvayka Date: Tue, 21 Feb 2017 12:26:15 +0200 Subject: [PATCH] Implementation --- .../plugin/PluginProcessingContext.java | 8 +- .../src/main/resources/thingsboard.yml | 2 +- .../server/common/data/kv/Aggregation.java | 15 ++ .../server/common/data/kv/BaseTsKvQuery.java | 8 +- .../server/dao/model/ModelConstants.java | 33 +++- .../AggregatePartitionsFunction.java | 22 ++- .../dao/timeseries/BaseTimeseriesDao.java | 154 +++++++++++------- .../dao/timeseries/BaseTimeseriesService.java | 13 +- .../timeseries/SimpleListenableFuture.java | 33 ++++ .../server/dao/timeseries/TimeseriesDao.java | 2 +- .../dao/timeseries/TsKvQueryCursor.java | 82 ++++++++++ .../thingsboard/server/dao/DaoTestSuite.java | 10 +- .../dao/timeseries/TimeseriesServiceTest.java | 132 ++++++++++----- .../test/resources/cassandra-test.properties | 4 +- 14 files changed, 389 insertions(+), 129 deletions(-) create mode 100644 dao/src/main/java/org/thingsboard/server/dao/timeseries/SimpleListenableFuture.java create mode 100644 dao/src/main/java/org/thingsboard/server/dao/timeseries/TsKvQueryCursor.java diff --git a/application/src/main/java/org/thingsboard/server/actors/plugin/PluginProcessingContext.java b/application/src/main/java/org/thingsboard/server/actors/plugin/PluginProcessingContext.java index 6490124df9..fbd0cce408 100644 --- a/application/src/main/java/org/thingsboard/server/actors/plugin/PluginProcessingContext.java +++ b/application/src/main/java/org/thingsboard/server/actors/plugin/PluginProcessingContext.java @@ -1,12 +1,12 @@ /** * Copyright © 2016-2017 The Thingsboard Authors - *

+ * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

+ * + * http://www.apache.org/licenses/LICENSE-2.0 + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index d2cac80101..42b37db793 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -140,7 +140,7 @@ cassandra: # Specify partitioning size for timestamp key-value storage. Example MINUTES, HOURS, DAYS, MONTHS ts_key_value_partitioning: "${TS_KV_PARTITIONING:MONTHS}" # Specify max data points per request - max_limit_per_request: "${TS_KV_MAX_LIMIT_PER_REQUEST:86400}" + min_aggregation_step_ms: "${TS_KV_MIN_AGGREGATION_STEP_MS:100}" # Actor system parameters actors: diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/kv/Aggregation.java b/common/data/src/main/java/org/thingsboard/server/common/data/kv/Aggregation.java index f8fad6c874..479a49ac91 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/kv/Aggregation.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/kv/Aggregation.java @@ -1,3 +1,18 @@ +/** + * Copyright © 2016-2017 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.thingsboard.server.common.data.kv; /** diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/kv/BaseTsKvQuery.java b/common/data/src/main/java/org/thingsboard/server/common/data/kv/BaseTsKvQuery.java index 78c887fe85..ed48206340 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/kv/BaseTsKvQuery.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/kv/BaseTsKvQuery.java @@ -1,12 +1,12 @@ /** * Copyright © 2016-2017 The Thingsboard Authors - *

+ * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

+ * + * http://www.apache.org/licenses/LICENSE-2.0 + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. diff --git a/dao/src/main/java/org/thingsboard/server/dao/model/ModelConstants.java b/dao/src/main/java/org/thingsboard/server/dao/model/ModelConstants.java index 0f8418abbe..d3ed5d1dd1 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/model/ModelConstants.java +++ b/dao/src/main/java/org/thingsboard/server/dao/model/ModelConstants.java @@ -1,12 +1,12 @@ /** * Copyright © 2016-2017 The Thingsboard Authors - *

+ * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

+ * + * http://www.apache.org/licenses/LICENSE-2.0 + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -19,6 +19,7 @@ import java.util.UUID; import com.datastax.driver.core.utils.UUIDs; import org.apache.commons.lang3.ArrayUtils; +import org.thingsboard.server.common.data.kv.Aggregation; public class ModelConstants { @@ -261,16 +262,17 @@ public class ModelConstants { public static final String LONG_VALUE_COLUMN = "long_v"; public static final String DOUBLE_VALUE_COLUMN = "dbl_v"; + public static final String[] NONE_AGGREGATION_COLUMNS = new String[]{LONG_VALUE_COLUMN, DOUBLE_VALUE_COLUMN, BOOLEAN_VALUE_COLUMN, STRING_VALUE_COLUMN, KEY_COLUMN, TS_COLUMN}; + public static final String[] COUNT_AGGREGATION_COLUMNS = new String[]{count(LONG_VALUE_COLUMN), count(DOUBLE_VALUE_COLUMN), count(BOOLEAN_VALUE_COLUMN), count(STRING_VALUE_COLUMN)}; - public static final String[] NONE_AGGREGATION_COLUMNS = new String[]{LONG_VALUE_COLUMN, DOUBLE_VALUE_COLUMN, BOOLEAN_VALUE_COLUMN, STRING_VALUE_COLUMN,}; public static final String[] MIN_AGGREGATION_COLUMNS = ArrayUtils.addAll(COUNT_AGGREGATION_COLUMNS, new String[]{min(LONG_VALUE_COLUMN), min(DOUBLE_VALUE_COLUMN), min(BOOLEAN_VALUE_COLUMN), min(STRING_VALUE_COLUMN)}); public static final String[] MAX_AGGREGATION_COLUMNS = ArrayUtils.addAll(COUNT_AGGREGATION_COLUMNS, new String[]{max(LONG_VALUE_COLUMN), max(DOUBLE_VALUE_COLUMN), max(BOOLEAN_VALUE_COLUMN), max(STRING_VALUE_COLUMN)}); public static final String[] SUM_AGGREGATION_COLUMNS = ArrayUtils.addAll(COUNT_AGGREGATION_COLUMNS, new String[]{sum(LONG_VALUE_COLUMN), sum(DOUBLE_VALUE_COLUMN)}); - public static final String[] AVG_AGGREGATION_COLUMNS = ArrayUtils.addAll(COUNT_AGGREGATION_COLUMNS, SUM_AGGREGATION_COLUMNS); + public static final String[] AVG_AGGREGATION_COLUMNS = SUM_AGGREGATION_COLUMNS; public static String min(String s) { return "min(" + s + ")"; @@ -287,4 +289,23 @@ public class ModelConstants { public static String count(String s) { return "count(" + s + ")"; } + + public static String[] getFetchColumnNames(Aggregation aggregation) { + switch (aggregation) { + case NONE: + return NONE_AGGREGATION_COLUMNS; + case MIN: + return MIN_AGGREGATION_COLUMNS; + case MAX: + return MAX_AGGREGATION_COLUMNS; + case SUM: + return SUM_AGGREGATION_COLUMNS; + case COUNT: + return COUNT_AGGREGATION_COLUMNS; + case AVG: + return AVG_AGGREGATION_COLUMNS; + default: + throw new RuntimeException("Aggregation type: " + aggregation + " is not supported!"); + } + } } diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/AggregatePartitionsFunction.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/AggregatePartitionsFunction.java index 9ee902213b..f099eec004 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/AggregatePartitionsFunction.java +++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/AggregatePartitionsFunction.java @@ -1,3 +1,18 @@ +/** + * Copyright © 2016-2017 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.thingsboard.server.dao.timeseries; import com.datastax.driver.core.ResultSet; @@ -84,8 +99,11 @@ public class AggregatePartitionsFunction implements com.google.common.base.Funct count += curCount; } else if (aggregation == Aggregation.AVG || aggregation == Aggregation.SUM) { count += curCount; - dValue = dValue == null ? curDValue : dValue + curDValue; - lValue = lValue == null ? curLValue : lValue + curLValue; + if (curDValue != null) { + dValue = dValue == null ? curDValue : dValue + curDValue; + } else if (curLValue != null) { + lValue = lValue == null ? curLValue : lValue + curLValue; + } } else if (aggregation == Aggregation.MIN) { if (curDValue != null) { dValue = dValue == null ? curDValue : Math.min(dValue, curDValue); diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesDao.java index 341972902c..5cf71fcce8 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesDao.java @@ -1,12 +1,12 @@ /** * Copyright © 2016-2017 The Thingsboard Authors - *

+ * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

+ * + * http://www.apache.org/licenses/LICENSE-2.0 + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -20,6 +20,7 @@ import com.datastax.driver.core.querybuilder.QueryBuilder; import com.datastax.driver.core.querybuilder.Select; import com.google.common.base.Function; import com.google.common.util.concurrent.AsyncFunction; +import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import lombok.extern.slf4j.Slf4j; @@ -51,14 +52,8 @@ import static com.datastax.driver.core.querybuilder.QueryBuilder.select; @Slf4j public class BaseTimeseriesDao extends AbstractDao implements TimeseriesDao { - @Value("${cassandra.query.max_limit_per_request}") - protected Integer maxLimitPerRequest; - - @Value("${cassandra.query.read_result_processing_threads}") - private int readResultsProcessingThreads; - - @Value("${cassandra.query.min_read_step}") - private int minReadStep; + @Value("${cassandra.query.min_aggregation_step_ms}") + private int minAggregationStepMs; @Value("${cassandra.query.ts_key_value_partitioning}") private String partitioning; @@ -77,7 +72,7 @@ public class BaseTimeseriesDao extends AbstractDao implements TimeseriesDao { @PostConstruct public void init() { getFetchStmt(Aggregation.NONE); - readResultsProcessingExecutor = Executors.newFixedThreadPool(readResultsProcessingThreads); + readResultsProcessingExecutor = Executors.newCachedThreadPool(); Optional partition = TsPartitionDate.parse(partitioning); if (partition.isPresent()) { tsFormat = partition.get(); @@ -100,33 +95,12 @@ public class BaseTimeseriesDao extends AbstractDao implements TimeseriesDao { return tsFormat.truncatedTo(time).toInstant(ZoneOffset.UTC).toEpochMilli(); } - - private static String[] getFetchColumnNames(Aggregation aggregation) { - switch (aggregation) { - case NONE: - return ModelConstants.NONE_AGGREGATION_COLUMNS; - case MIN: - return ModelConstants.MIN_AGGREGATION_COLUMNS; - case MAX: - return ModelConstants.MAX_AGGREGATION_COLUMNS; - case SUM: - return ModelConstants.SUM_AGGREGATION_COLUMNS; - case COUNT: - return ModelConstants.COUNT_AGGREGATION_COLUMNS; - case AVG: - return ModelConstants.AVG_AGGREGATION_COLUMNS; - default: - throw new RuntimeException("Aggregation type: " + aggregation + " is not supported!"); - } - } - @Override - public ListenableFuture> findAllAsync(String entityType, UUID entityId, TsKvQuery query, long minPartition, long maxPartition) { + public ListenableFuture> findAllAsync(String entityType, UUID entityId, TsKvQuery query) { if (query.getAggregation() == Aggregation.NONE) { - //TODO: - return null; + return findAllAsyncWithLimit(entityType, entityId, query); } else { - long step = Math.max((query.getEndTs() - query.getStartTs()) / query.getLimit(), minReadStep); + long step = Math.max((query.getEndTs() - query.getStartTs()) / query.getLimit(), minAggregationStepMs); long stepTs = query.getStartTs(); List>> futures = new ArrayList<>(); while (stepTs < query.getEndTs()) { @@ -143,23 +117,88 @@ public class BaseTimeseriesDao extends AbstractDao implements TimeseriesDao { public List apply(@Nullable List> input) { return input.stream().filter(v -> v.isPresent()).map(v -> v.get()).collect(Collectors.toList()); } - }); + }, readResultsProcessingExecutor); + } + } + + private ListenableFuture> findAllAsyncWithLimit(String entityType, UUID entityId, TsKvQuery query) { + long minPartition = query.getStartTs(); + long maxPartition = query.getEndTs(); + + ResultSetFuture partitionsFuture = fetchPartitions(entityType, entityId, query.getKey(), minPartition, maxPartition); + + final SimpleListenableFuture> resultFuture = new SimpleListenableFuture<>(); + final ListenableFuture> partitionsListFuture = Futures.transform(partitionsFuture, getPartitionsArrayFunction(), readResultsProcessingExecutor); + + Futures.addCallback(partitionsListFuture, new FutureCallback>() { + @Override + public void onSuccess(@Nullable List partitions) { + TsKvQueryCursor cursor = new TsKvQueryCursor(entityType, entityId, query, partitions); + findAllAsyncSequentiallyWithLimit(cursor, resultFuture); + } + + @Override + public void onFailure(Throwable t) { + log.error("[{}][{}] Failed to fetch partitions for interval {}-{}", entityType, entityId, minPartition, maxPartition, t); + } + }, readResultsProcessingExecutor); + + return resultFuture; + } + + private void findAllAsyncSequentiallyWithLimit(final TsKvQueryCursor cursor, final SimpleListenableFuture> resultFuture) { + if (cursor.isFull() || !cursor.hasNextPartition()) { + resultFuture.set(cursor.getData()); + } else { + PreparedStatement proto = getFetchStmt(Aggregation.NONE); + BoundStatement stmt = proto.bind(); + stmt.setString(0, cursor.getEntityType()); + stmt.setUUID(1, cursor.getEntityId()); + stmt.setString(2, cursor.getKey()); + stmt.setLong(3, cursor.getNextPartition()); + stmt.setLong(4, cursor.getStartTs()); + stmt.setLong(5, cursor.getEndTs()); + stmt.setInt(6, cursor.getCurrentLimit()); + + Futures.addCallback(executeAsyncRead(stmt), new FutureCallback() { + @Override + public void onSuccess(@Nullable ResultSet result) { + cursor.addData(convertResultToTsKvEntryList(result.all())); + findAllAsyncSequentiallyWithLimit(cursor, resultFuture); + } + + @Override + public void onFailure(Throwable t) { + log.error("[{}][{}] Failed to fetch data for query {}-{}", stmt, t); + } + }, readResultsProcessingExecutor); } } private ListenableFuture> findAndAggregateAsync(String entityType, UUID entityId, TsKvQuery query, long minPartition, long maxPartition) { final Aggregation aggregation = query.getAggregation(); + final String key = query.getKey(); final long startTs = query.getStartTs(); final long endTs = query.getEndTs(); final long ts = startTs + (endTs - startTs) / 2; - ResultSetFuture partitionsFuture = fetchPartitions(entityType, entityId, query.getKey(), minPartition, maxPartition); - com.google.common.base.Function> toArrayFunction = rows -> rows.all().stream() + ResultSetFuture partitionsFuture = fetchPartitions(entityType, entityId, key, minPartition, maxPartition); + + ListenableFuture> partitionsListFuture = Futures.transform(partitionsFuture, getPartitionsArrayFunction(), readResultsProcessingExecutor); + + ListenableFuture> aggregationChunks = Futures.transform(partitionsListFuture, + getFetchChunksAsyncFunction(entityType, entityId, key, aggregation, startTs, endTs), readResultsProcessingExecutor); + + return Futures.transform(aggregationChunks, new AggregatePartitionsFunction(aggregation, key, ts), readResultsProcessingExecutor); + } + + private Function> getPartitionsArrayFunction() { + return rows -> rows.all().stream() .map(row -> row.getLong(ModelConstants.PARTITION_COLUMN)).collect(Collectors.toList()); + } - ListenableFuture> partitionsListFuture = Futures.transform(partitionsFuture, toArrayFunction, readResultsProcessingExecutor); - - AsyncFunction, List> fetchChunksFunction = partitions -> { + private AsyncFunction, List> getFetchChunksAsyncFunction(String entityType, UUID entityId, String key, Aggregation aggregation, long startTs, long endTs) { + return partitions -> { try { PreparedStatement proto = getFetchStmt(aggregation); List futures = new ArrayList<>(partitions.size()); @@ -167,7 +206,7 @@ public class BaseTimeseriesDao extends AbstractDao implements TimeseriesDao { BoundStatement stmt = proto.bind(); stmt.setString(0, entityType); stmt.setUUID(1, entityId); - stmt.setString(2, query.getKey()); + stmt.setString(2, key); stmt.setLong(3, partition); stmt.setLong(4, startTs); stmt.setLong(5, endTs); @@ -180,10 +219,6 @@ public class BaseTimeseriesDao extends AbstractDao implements TimeseriesDao { throw e; } }; - - ListenableFuture> aggregationChunks = Futures.transform(partitionsListFuture, fetchChunksFunction, readResultsProcessingExecutor); - - return Futures.transform(aggregationChunks, new AggregatePartitionsFunction(aggregation, query.getKey(), ts), readResultsProcessingExecutor); } @Override @@ -320,14 +355,21 @@ public class BaseTimeseriesDao extends AbstractDao implements TimeseriesDao { if (fetchStmts == null) { fetchStmts = new PreparedStatement[Aggregation.values().length]; for (Aggregation type : Aggregation.values()) { - fetchStmts[type.ordinal()] = getSession().prepare("SELECT " + - String.join(", ", getFetchColumnNames(type)) + " FROM " + ModelConstants.TS_KV_CF - + " WHERE " + ModelConstants.ENTITY_TYPE_COLUMN + " = ? " - + "AND " + ModelConstants.ENTITY_ID_COLUMN + " = ? " - + "AND " + ModelConstants.KEY_COLUMN + " = ? " - + "AND " + ModelConstants.PARTITION_COLUMN + " = ? " - + "AND " + ModelConstants.TS_COLUMN + " > ? " - + "AND " + ModelConstants.TS_COLUMN + " <= ?"); + if (type == Aggregation.SUM && fetchStmts[Aggregation.AVG.ordinal()] != null) { + fetchStmts[type.ordinal()] = fetchStmts[Aggregation.AVG.ordinal()]; + } else if (type == Aggregation.AVG && fetchStmts[Aggregation.SUM.ordinal()] != null) { + fetchStmts[type.ordinal()] = fetchStmts[Aggregation.SUM.ordinal()]; + } else { + fetchStmts[type.ordinal()] = getSession().prepare("SELECT " + + String.join(", ", ModelConstants.getFetchColumnNames(type)) + " FROM " + ModelConstants.TS_KV_CF + + " WHERE " + ModelConstants.ENTITY_TYPE_COLUMN + " = ? " + + "AND " + ModelConstants.ENTITY_ID_COLUMN + " = ? " + + "AND " + ModelConstants.KEY_COLUMN + " = ? " + + "AND " + ModelConstants.PARTITION_COLUMN + " = ? " + + "AND " + ModelConstants.TS_COLUMN + " > ? " + + "AND " + ModelConstants.TS_COLUMN + " <= ?" + + (type == Aggregation.NONE ? " ORDER BY " + ModelConstants.TS_COLUMN + " DESC LIMIT ?" : "")); + } } } return fetchStmts[aggType.ordinal()]; diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java index a8b4ef5111..1d8c3dfd4e 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java @@ -1,12 +1,12 @@ /** * Copyright © 2016-2017 The Thingsboard Authors - *

+ * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

+ * + * http://www.apache.org/licenses/LICENSE-2.0 + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -59,7 +59,7 @@ public class BaseTimeseriesService implements TimeseriesService { public ListenableFuture> findAll(String entityType, UUIDBased entityId, TsKvQuery query) { validate(entityType, entityId); validate(query); - return timeseriesDao.findAllAsync(entityType, entityId.getId(), query, timeseriesDao.toPartitionTs(query.getStartTs()), timeseriesDao.toPartitionTs(query.getEndTs())); + return timeseriesDao.findAllAsync(entityType, entityId.getId(), query); } @Override @@ -132,7 +132,8 @@ public class BaseTimeseriesService implements TimeseriesService { throw new IncorrectParameterException("TsKvQuery can't be null"); } else if (isBlank(query.getKey())) { throw new IncorrectParameterException("Incorrect TsKvQuery. Key can't be empty"); + } else if (query.getAggregation() == null){ + throw new IncorrectParameterException("Incorrect TsKvQuery. Aggregation can't be empty"); } - //TODO: add validation of all params } } diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/SimpleListenableFuture.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/SimpleListenableFuture.java new file mode 100644 index 0000000000..e10a40de5f --- /dev/null +++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/SimpleListenableFuture.java @@ -0,0 +1,33 @@ +/** + * Copyright © 2016-2017 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.dao.timeseries; + +import com.google.common.util.concurrent.AbstractFuture; + +/** + * Created by ashvayka on 21.02.17. + */ +public class SimpleListenableFuture extends AbstractFuture { + + public SimpleListenableFuture() { + + } + + public boolean set(V value) { + return super.set(value); + } + +} diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesDao.java index 83b78da646..7a6eed7eb0 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesDao.java @@ -33,7 +33,7 @@ public interface TimeseriesDao { long toPartitionTs(long ts); - ListenableFuture> findAllAsync(String entityType, UUID entityId, TsKvQuery query, long minPartition, long maxPartition); + ListenableFuture> findAllAsync(String entityType, UUID entityId, TsKvQuery query); // List find(String entityType, UUID entityId, TsKvQuery query, Optional minPartition, Optional maxPartition); diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/TsKvQueryCursor.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/TsKvQueryCursor.java new file mode 100644 index 0000000000..cad1232d68 --- /dev/null +++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/TsKvQueryCursor.java @@ -0,0 +1,82 @@ +/** + * Copyright © 2016-2017 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.dao.timeseries; + +import lombok.Data; +import lombok.Getter; +import org.thingsboard.server.common.data.kv.TsKvEntry; +import org.thingsboard.server.common.data.kv.TsKvQuery; + +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; + +/** + * Created by ashvayka on 21.02.17. + */ +public class TsKvQueryCursor { + @Getter + private final String entityType; + @Getter + private final UUID entityId; + @Getter + private final String key; + @Getter + private final long startTs; + @Getter + private final long endTs; + private final List partitions; + @Getter + private final List data; + + private int partitionIndex; + private int currentLimit; + + public TsKvQueryCursor(String entityType, UUID entityId, TsKvQuery baseQuery, List partitions) { + this.entityType = entityType; + this.entityId = entityId; + this.key = baseQuery.getKey(); + this.startTs = baseQuery.getStartTs(); + this.endTs = baseQuery.getEndTs(); + this.partitions = partitions; + this.partitionIndex = partitions.size() - 1; + this.data = new ArrayList<>(); + this.currentLimit = baseQuery.getLimit(); + } + + public boolean hasNextPartition() { + return partitionIndex >= 0; + } + + public boolean isFull() { + return currentLimit <= 0; + } + + public long getNextPartition() { + long partition = partitions.get(partitionIndex); + partitionIndex--; + return partition; + } + + public int getCurrentLimit() { + return currentLimit; + } + + public void addData(List newData) { + currentLimit -= newData.size(); + data.addAll(newData); + } +} diff --git a/dao/src/test/java/org/thingsboard/server/dao/DaoTestSuite.java b/dao/src/test/java/org/thingsboard/server/dao/DaoTestSuite.java index ac48536793..b1502595cb 100644 --- a/dao/src/test/java/org/thingsboard/server/dao/DaoTestSuite.java +++ b/dao/src/test/java/org/thingsboard/server/dao/DaoTestSuite.java @@ -25,11 +25,11 @@ import java.util.Arrays; @RunWith(ClasspathSuite.class) @ClassnameFilters({ -// "org.thingsboard.server.dao.service.*Test", -// "org.thingsboard.server.dao.kv.*Test", -// "org.thingsboard.server.dao.plugin.*Test", -// "org.thingsboard.server.dao.rule.*Test", -// "org.thingsboard.server.dao.attributes.*Test", + "org.thingsboard.server.dao.service.*Test", + "org.thingsboard.server.dao.kv.*Test", + "org.thingsboard.server.dao.plugin.*Test", + "org.thingsboard.server.dao.rule.*Test", + "org.thingsboard.server.dao.attributes.*Test", "org.thingsboard.server.dao.timeseries.*Test" }) public class DaoTestSuite { diff --git a/dao/src/test/java/org/thingsboard/server/dao/timeseries/TimeseriesServiceTest.java b/dao/src/test/java/org/thingsboard/server/dao/timeseries/TimeseriesServiceTest.java index 51fce6fbb0..13c25c3213 100644 --- a/dao/src/test/java/org/thingsboard/server/dao/timeseries/TimeseriesServiceTest.java +++ b/dao/src/test/java/org/thingsboard/server/dao/timeseries/TimeseriesServiceTest.java @@ -51,8 +51,6 @@ public class TimeseriesServiceTest extends AbstractServiceTest { private static final String DOUBLE_KEY = "doubleKey"; private static final String BOOLEAN_KEY = "booleanKey"; - public static final int PARTITION_MINUTES = 1100; - private static final long TS = 42L; KvEntry stringKvEntry = new StringDataEntry(STRING_KEY, "value"); @@ -103,49 +101,101 @@ public class TimeseriesServiceTest extends AbstractServiceTest { } @Test - public void testFindDeviceTsDataByQuery() throws Exception { + public void testFindDeviceTsData() throws Exception { DeviceId deviceId = new DeviceId(UUIDs.timeBased()); - LocalDateTime localDateTime = LocalDateTime.now(ZoneOffset.UTC).minusMinutes(PARTITION_MINUTES); - log.debug("Start event time is {}", localDateTime); - List entries = new ArrayList<>(PARTITION_MINUTES); + List entries = new ArrayList<>(); - for (int i = 0; i < PARTITION_MINUTES; i++) { - long time = localDateTime.plusMinutes(i).toInstant(ZoneOffset.UTC).toEpochMilli(); - BasicTsKvEntry tsKvEntry = new BasicTsKvEntry(time, stringKvEntry); - tsService.save(DataConstants.DEVICE, deviceId, tsKvEntry).get(); - entries.add(tsKvEntry); - } - log.debug("Saved all records {}", localDateTime); - List list = tsService.findAll(DataConstants.DEVICE, deviceId, new BaseTsKvQuery(STRING_KEY, entries.get(599).getTs(), - LocalDateTime.now(ZoneOffset.UTC).toInstant(ZoneOffset.UTC).toEpochMilli(), PARTITION_MINUTES - 599, Aggregation.MIN)).get(); - log.debug("Fetched records {}", localDateTime); - List expected = entries.subList(600, PARTITION_MINUTES); - assertEquals(expected.size(), list.size()); - assertEquals(expected, list); + entries.add(save(deviceId, 5000, 100)); + entries.add(save(deviceId, 15000, 200)); + + entries.add(save(deviceId, 25000, 300)); + entries.add(save(deviceId, 35000, 400)); + + entries.add(save(deviceId, 45000, 500)); + entries.add(save(deviceId, 55000, 600)); + + List list = tsService.findAll(DataConstants.DEVICE, deviceId, new BaseTsKvQuery(LONG_KEY, 0, + 60000, 3, Aggregation.NONE)).get(); + assertEquals(3, list.size()); + assertEquals(55000, list.get(0).getTs()); + assertEquals(java.util.Optional.of(600L), list.get(0).getLongValue()); + + assertEquals(45000, list.get(1).getTs()); + assertEquals(java.util.Optional.of(500L), list.get(1).getLongValue()); + + assertEquals(35000, list.get(2).getTs()); + assertEquals(java.util.Optional.of(400L), list.get(2).getLongValue()); + + list = tsService.findAll(DataConstants.DEVICE, deviceId, new BaseTsKvQuery(LONG_KEY, 0, + 60000, 3, Aggregation.AVG)).get(); + assertEquals(3, list.size()); + assertEquals(10000, list.get(0).getTs()); + assertEquals(java.util.Optional.of(150L), list.get(0).getLongValue()); + + assertEquals(30000, list.get(1).getTs()); + assertEquals(java.util.Optional.of(350L), list.get(1).getLongValue()); + + assertEquals(50000, list.get(2).getTs()); + assertEquals(java.util.Optional.of(550L), list.get(2).getLongValue()); + + list = tsService.findAll(DataConstants.DEVICE, deviceId, new BaseTsKvQuery(LONG_KEY, 0, + 60000, 3, Aggregation.SUM)).get(); + + assertEquals(3, list.size()); + assertEquals(10000, list.get(0).getTs()); + assertEquals(java.util.Optional.of(300L), list.get(0).getLongValue()); + + assertEquals(30000, list.get(1).getTs()); + assertEquals(java.util.Optional.of(700L), list.get(1).getLongValue()); + + assertEquals(50000, list.get(2).getTs()); + assertEquals(java.util.Optional.of(1100L), list.get(2).getLongValue()); + + list = tsService.findAll(DataConstants.DEVICE, deviceId, new BaseTsKvQuery(LONG_KEY, 0, + 60000, 3, Aggregation.MIN)).get(); + + assertEquals(3, list.size()); + assertEquals(10000, list.get(0).getTs()); + assertEquals(java.util.Optional.of(100L), list.get(0).getLongValue()); + + assertEquals(30000, list.get(1).getTs()); + assertEquals(java.util.Optional.of(300L), list.get(1).getLongValue()); + + assertEquals(50000, list.get(2).getTs()); + assertEquals(java.util.Optional.of(500L), list.get(2).getLongValue()); + + list = tsService.findAll(DataConstants.DEVICE, deviceId, new BaseTsKvQuery(LONG_KEY, 0, + 60000, 3, Aggregation.MAX)).get(); + + assertEquals(3, list.size()); + assertEquals(10000, list.get(0).getTs()); + assertEquals(java.util.Optional.of(200L), list.get(0).getLongValue()); + + assertEquals(30000, list.get(1).getTs()); + assertEquals(java.util.Optional.of(400L), list.get(1).getLongValue()); + + assertEquals(50000, list.get(2).getTs()); + assertEquals(java.util.Optional.of(600L), list.get(2).getLongValue()); + + list = tsService.findAll(DataConstants.DEVICE, deviceId, new BaseTsKvQuery(LONG_KEY, 0, + 60000, 3, Aggregation.COUNT)).get(); + + assertEquals(3, list.size()); + assertEquals(10000, list.get(0).getTs()); + assertEquals(java.util.Optional.of(2L), list.get(0).getLongValue()); + + assertEquals(30000, list.get(1).getTs()); + assertEquals(java.util.Optional.of(2L), list.get(1).getLongValue()); + + assertEquals(50000, list.get(2).getTs()); + assertEquals(java.util.Optional.of(2L), list.get(2).getLongValue()); } -// @Test -// public void testFindDeviceTsDataByQuery() throws Exception { -// DeviceId deviceId = new DeviceId(UUIDs.timeBased()); -// LocalDateTime localDateTime = LocalDateTime.now(ZoneOffset.UTC).minusMinutes(PARTITION_MINUTES); -// log.debug("Start event time is {}", localDateTime); -// List entries = new ArrayList<>(PARTITION_MINUTES); -// -// for (int i = 0; i < PARTITION_MINUTES; i++) { -// long time = localDateTime.plusMinutes(i).toInstant(ZoneOffset.UTC).toEpochMilli(); -// BasicTsKvEntry tsKvEntry = new BasicTsKvEntry(time, stringKvEntry); -// tsService.save(DataConstants.DEVICE, deviceId, tsKvEntry).get(); -// entries.add(tsKvEntry); -// } -// log.debug("Saved all records {}", localDateTime); -// List list = tsService.findAll(DataConstants.DEVICE, deviceId, new BaseTsKvQuery(STRING_KEY, entries.get(599).getTs(), -// LocalDateTime.now(ZoneOffset.UTC).toInstant(ZoneOffset.UTC).toEpochMilli(), PARTITION_MINUTES - 599, Aggregation.MIN)).get(); -// log.debug("Fetched records {}", localDateTime); -// List expected = entries.subList(600, PARTITION_MINUTES); -// assertEquals(expected.size(), list.size()); -// assertEquals(expected, list); -// } - + private TsKvEntry save(DeviceId deviceId, long ts, long value) throws Exception { + TsKvEntry entry = new BasicTsKvEntry(ts, new LongDataEntry(LONG_KEY, value)); + tsService.save(DataConstants.DEVICE, deviceId, entry).get(); + return entry; + } private void saveEntries(DeviceId deviceId, long ts) throws ExecutionException, InterruptedException { tsService.save(DataConstants.DEVICE, deviceId, toTsEntry(ts, stringKvEntry)).get(); diff --git a/dao/src/test/resources/cassandra-test.properties b/dao/src/test/resources/cassandra-test.properties index 0a207e7068..210d2c0c9e 100644 --- a/dao/src/test/resources/cassandra-test.properties +++ b/dao/src/test/resources/cassandra-test.properties @@ -48,6 +48,4 @@ cassandra.query.ts_key_value_partitioning=HOURS cassandra.query.max_limit_per_request=1000 -cassandra.query.read_result_processing_threads=3 - -cassandra.query.min_read_step=100 \ No newline at end of file +cassandra.query.min_aggregation_step_ms=100 \ No newline at end of file