diff --git a/application/src/main/java/org/thingsboard/server/actors/plugin/PluginProcessingContext.java b/application/src/main/java/org/thingsboard/server/actors/plugin/PluginProcessingContext.java index 6490124df9..fbd0cce408 100644 --- a/application/src/main/java/org/thingsboard/server/actors/plugin/PluginProcessingContext.java +++ b/application/src/main/java/org/thingsboard/server/actors/plugin/PluginProcessingContext.java @@ -1,12 +1,12 @@ /** * Copyright © 2016-2017 The Thingsboard Authors - *
+ * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - *
- * http://www.apache.org/licenses/LICENSE-2.0 - *
+ * + * http://www.apache.org/licenses/LICENSE-2.0 + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index d2cac80101..42b37db793 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -140,7 +140,7 @@ cassandra: # Specify partitioning size for timestamp key-value storage. Example MINUTES, HOURS, DAYS, MONTHS ts_key_value_partitioning: "${TS_KV_PARTITIONING:MONTHS}" # Specify max data points per request - max_limit_per_request: "${TS_KV_MAX_LIMIT_PER_REQUEST:86400}" + min_aggregation_step_ms: "${TS_KV_MIN_AGGREGATION_STEP_MS:100}" # Actor system parameters actors: diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/kv/Aggregation.java b/common/data/src/main/java/org/thingsboard/server/common/data/kv/Aggregation.java index f8fad6c874..479a49ac91 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/kv/Aggregation.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/kv/Aggregation.java @@ -1,3 +1,18 @@ +/** + * Copyright © 2016-2017 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.thingsboard.server.common.data.kv; /** diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/kv/BaseTsKvQuery.java b/common/data/src/main/java/org/thingsboard/server/common/data/kv/BaseTsKvQuery.java index 78c887fe85..ed48206340 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/kv/BaseTsKvQuery.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/kv/BaseTsKvQuery.java @@ -1,12 +1,12 @@ /** * Copyright © 2016-2017 The Thingsboard Authors - *
+ * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - *
- * http://www.apache.org/licenses/LICENSE-2.0 - *
+ * + * http://www.apache.org/licenses/LICENSE-2.0 + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. diff --git a/dao/src/main/java/org/thingsboard/server/dao/model/ModelConstants.java b/dao/src/main/java/org/thingsboard/server/dao/model/ModelConstants.java index 0f8418abbe..d3ed5d1dd1 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/model/ModelConstants.java +++ b/dao/src/main/java/org/thingsboard/server/dao/model/ModelConstants.java @@ -1,12 +1,12 @@ /** * Copyright © 2016-2017 The Thingsboard Authors - *
+ * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - *
- * http://www.apache.org/licenses/LICENSE-2.0 - *
+ * + * http://www.apache.org/licenses/LICENSE-2.0 + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -19,6 +19,7 @@ import java.util.UUID; import com.datastax.driver.core.utils.UUIDs; import org.apache.commons.lang3.ArrayUtils; +import org.thingsboard.server.common.data.kv.Aggregation; public class ModelConstants { @@ -261,16 +262,17 @@ public class ModelConstants { public static final String LONG_VALUE_COLUMN = "long_v"; public static final String DOUBLE_VALUE_COLUMN = "dbl_v"; + public static final String[] NONE_AGGREGATION_COLUMNS = new String[]{LONG_VALUE_COLUMN, DOUBLE_VALUE_COLUMN, BOOLEAN_VALUE_COLUMN, STRING_VALUE_COLUMN, KEY_COLUMN, TS_COLUMN}; + public static final String[] COUNT_AGGREGATION_COLUMNS = new String[]{count(LONG_VALUE_COLUMN), count(DOUBLE_VALUE_COLUMN), count(BOOLEAN_VALUE_COLUMN), count(STRING_VALUE_COLUMN)}; - public static final String[] NONE_AGGREGATION_COLUMNS = new String[]{LONG_VALUE_COLUMN, DOUBLE_VALUE_COLUMN, BOOLEAN_VALUE_COLUMN, STRING_VALUE_COLUMN,}; public static final String[] MIN_AGGREGATION_COLUMNS = ArrayUtils.addAll(COUNT_AGGREGATION_COLUMNS, new String[]{min(LONG_VALUE_COLUMN), min(DOUBLE_VALUE_COLUMN), min(BOOLEAN_VALUE_COLUMN), min(STRING_VALUE_COLUMN)}); public static final String[] MAX_AGGREGATION_COLUMNS = ArrayUtils.addAll(COUNT_AGGREGATION_COLUMNS, new String[]{max(LONG_VALUE_COLUMN), max(DOUBLE_VALUE_COLUMN), max(BOOLEAN_VALUE_COLUMN), max(STRING_VALUE_COLUMN)}); public static final String[] SUM_AGGREGATION_COLUMNS = ArrayUtils.addAll(COUNT_AGGREGATION_COLUMNS, new String[]{sum(LONG_VALUE_COLUMN), sum(DOUBLE_VALUE_COLUMN)}); - public static final String[] AVG_AGGREGATION_COLUMNS = ArrayUtils.addAll(COUNT_AGGREGATION_COLUMNS, SUM_AGGREGATION_COLUMNS); + public static final String[] AVG_AGGREGATION_COLUMNS = SUM_AGGREGATION_COLUMNS; public static String min(String s) { return "min(" + s + ")"; @@ -287,4 +289,23 @@ public class ModelConstants { public static String count(String s) { return "count(" + s + ")"; } + + public static String[] getFetchColumnNames(Aggregation aggregation) { + switch (aggregation) { + case NONE: + return NONE_AGGREGATION_COLUMNS; + case MIN: + return MIN_AGGREGATION_COLUMNS; + case MAX: + return MAX_AGGREGATION_COLUMNS; + case SUM: + return SUM_AGGREGATION_COLUMNS; + case COUNT: + return COUNT_AGGREGATION_COLUMNS; + case AVG: + return AVG_AGGREGATION_COLUMNS; + default: + throw new RuntimeException("Aggregation type: " + aggregation + " is not supported!"); + } + } } diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/AggregatePartitionsFunction.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/AggregatePartitionsFunction.java index 9ee902213b..f099eec004 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/AggregatePartitionsFunction.java +++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/AggregatePartitionsFunction.java @@ -1,3 +1,18 @@ +/** + * Copyright © 2016-2017 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.thingsboard.server.dao.timeseries; import com.datastax.driver.core.ResultSet; @@ -84,8 +99,11 @@ public class AggregatePartitionsFunction implements com.google.common.base.Funct count += curCount; } else if (aggregation == Aggregation.AVG || aggregation == Aggregation.SUM) { count += curCount; - dValue = dValue == null ? curDValue : dValue + curDValue; - lValue = lValue == null ? curLValue : lValue + curLValue; + if (curDValue != null) { + dValue = dValue == null ? curDValue : dValue + curDValue; + } else if (curLValue != null) { + lValue = lValue == null ? curLValue : lValue + curLValue; + } } else if (aggregation == Aggregation.MIN) { if (curDValue != null) { dValue = dValue == null ? curDValue : Math.min(dValue, curDValue); diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesDao.java index 341972902c..5cf71fcce8 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesDao.java @@ -1,12 +1,12 @@ /** * Copyright © 2016-2017 The Thingsboard Authors - *
+ * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - *
- * http://www.apache.org/licenses/LICENSE-2.0 - *
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -20,6 +20,7 @@ import com.datastax.driver.core.querybuilder.QueryBuilder;
import com.datastax.driver.core.querybuilder.Select;
import com.google.common.base.Function;
import com.google.common.util.concurrent.AsyncFunction;
+import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import lombok.extern.slf4j.Slf4j;
@@ -51,14 +52,8 @@ import static com.datastax.driver.core.querybuilder.QueryBuilder.select;
@Slf4j
public class BaseTimeseriesDao extends AbstractDao implements TimeseriesDao {
- @Value("${cassandra.query.max_limit_per_request}")
- protected Integer maxLimitPerRequest;
-
- @Value("${cassandra.query.read_result_processing_threads}")
- private int readResultsProcessingThreads;
-
- @Value("${cassandra.query.min_read_step}")
- private int minReadStep;
+ @Value("${cassandra.query.min_aggregation_step_ms}")
+ private int minAggregationStepMs;
@Value("${cassandra.query.ts_key_value_partitioning}")
private String partitioning;
@@ -77,7 +72,7 @@ public class BaseTimeseriesDao extends AbstractDao implements TimeseriesDao {
@PostConstruct
public void init() {
getFetchStmt(Aggregation.NONE);
- readResultsProcessingExecutor = Executors.newFixedThreadPool(readResultsProcessingThreads);
+ readResultsProcessingExecutor = Executors.newCachedThreadPool();
Optional
+ *
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -59,7 +59,7 @@ public class BaseTimeseriesService implements TimeseriesService {
public ListenableFuture> findAllAsync(String entityType, UUID entityId, TsKvQuery query, long minPartition, long maxPartition) {
+ public ListenableFuture
> findAllAsync(String entityType, UUID entityId, TsKvQuery query) {
if (query.getAggregation() == Aggregation.NONE) {
- //TODO:
- return null;
+ return findAllAsyncWithLimit(entityType, entityId, query);
} else {
- long step = Math.max((query.getEndTs() - query.getStartTs()) / query.getLimit(), minReadStep);
+ long step = Math.max((query.getEndTs() - query.getStartTs()) / query.getLimit(), minAggregationStepMs);
long stepTs = query.getStartTs();
List
> findAllAsyncWithLimit(String entityType, UUID entityId, TsKvQuery query) {
+ long minPartition = query.getStartTs();
+ long maxPartition = query.getEndTs();
+
+ ResultSetFuture partitionsFuture = fetchPartitions(entityType, entityId, query.getKey(), minPartition, maxPartition);
+
+ final SimpleListenableFuture
> resultFuture = new SimpleListenableFuture<>();
+ final ListenableFuture
> partitionsListFuture = Futures.transform(partitionsFuture, getPartitionsArrayFunction(), readResultsProcessingExecutor);
+
+ Futures.addCallback(partitionsListFuture, new FutureCallback
>() {
+ @Override
+ public void onSuccess(@Nullable List
> resultFuture) {
+ if (cursor.isFull() || !cursor.hasNextPartition()) {
+ resultFuture.set(cursor.getData());
+ } else {
+ PreparedStatement proto = getFetchStmt(Aggregation.NONE);
+ BoundStatement stmt = proto.bind();
+ stmt.setString(0, cursor.getEntityType());
+ stmt.setUUID(1, cursor.getEntityId());
+ stmt.setString(2, cursor.getKey());
+ stmt.setLong(3, cursor.getNextPartition());
+ stmt.setLong(4, cursor.getStartTs());
+ stmt.setLong(5, cursor.getEndTs());
+ stmt.setInt(6, cursor.getCurrentLimit());
+
+ Futures.addCallback(executeAsyncRead(stmt), new FutureCallback
> partitionsListFuture = Futures.transform(partitionsFuture, getPartitionsArrayFunction(), readResultsProcessingExecutor);
+
+ ListenableFuture
> aggregationChunks = Futures.transform(partitionsListFuture,
+ getFetchChunksAsyncFunction(entityType, entityId, key, aggregation, startTs, endTs), readResultsProcessingExecutor);
+
+ return Futures.transform(aggregationChunks, new AggregatePartitionsFunction(aggregation, key, ts), readResultsProcessingExecutor);
+ }
+
+ private Function
> partitionsListFuture = Futures.transform(partitionsFuture, toArrayFunction, readResultsProcessingExecutor);
-
- AsyncFunction
, List
, List
> aggregationChunks = Futures.transform(partitionsListFuture, fetchChunksFunction, readResultsProcessingExecutor);
-
- return Futures.transform(aggregationChunks, new AggregatePartitionsFunction(aggregation, query.getKey(), ts), readResultsProcessingExecutor);
}
@Override
@@ -320,14 +355,21 @@ public class BaseTimeseriesDao extends AbstractDao implements TimeseriesDao {
if (fetchStmts == null) {
fetchStmts = new PreparedStatement[Aggregation.values().length];
for (Aggregation type : Aggregation.values()) {
- fetchStmts[type.ordinal()] = getSession().prepare("SELECT " +
- String.join(", ", getFetchColumnNames(type)) + " FROM " + ModelConstants.TS_KV_CF
- + " WHERE " + ModelConstants.ENTITY_TYPE_COLUMN + " = ? "
- + "AND " + ModelConstants.ENTITY_ID_COLUMN + " = ? "
- + "AND " + ModelConstants.KEY_COLUMN + " = ? "
- + "AND " + ModelConstants.PARTITION_COLUMN + " = ? "
- + "AND " + ModelConstants.TS_COLUMN + " > ? "
- + "AND " + ModelConstants.TS_COLUMN + " <= ?");
+ if (type == Aggregation.SUM && fetchStmts[Aggregation.AVG.ordinal()] != null) {
+ fetchStmts[type.ordinal()] = fetchStmts[Aggregation.AVG.ordinal()];
+ } else if (type == Aggregation.AVG && fetchStmts[Aggregation.SUM.ordinal()] != null) {
+ fetchStmts[type.ordinal()] = fetchStmts[Aggregation.SUM.ordinal()];
+ } else {
+ fetchStmts[type.ordinal()] = getSession().prepare("SELECT " +
+ String.join(", ", ModelConstants.getFetchColumnNames(type)) + " FROM " + ModelConstants.TS_KV_CF
+ + " WHERE " + ModelConstants.ENTITY_TYPE_COLUMN + " = ? "
+ + "AND " + ModelConstants.ENTITY_ID_COLUMN + " = ? "
+ + "AND " + ModelConstants.KEY_COLUMN + " = ? "
+ + "AND " + ModelConstants.PARTITION_COLUMN + " = ? "
+ + "AND " + ModelConstants.TS_COLUMN + " > ? "
+ + "AND " + ModelConstants.TS_COLUMN + " <= ?"
+ + (type == Aggregation.NONE ? " ORDER BY " + ModelConstants.TS_COLUMN + " DESC LIMIT ?" : ""));
+ }
}
}
return fetchStmts[aggType.ordinal()];
diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java
index a8b4ef5111..1d8c3dfd4e 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java
@@ -1,12 +1,12 @@
/**
* Copyright © 2016-2017 The Thingsboard Authors
- *
> findAll(String entityType, UUIDBased entityId, TsKvQuery query) {
validate(entityType, entityId);
validate(query);
- return timeseriesDao.findAllAsync(entityType, entityId.getId(), query, timeseriesDao.toPartitionTs(query.getStartTs()), timeseriesDao.toPartitionTs(query.getEndTs()));
+ return timeseriesDao.findAllAsync(entityType, entityId.getId(), query);
}
@Override
@@ -132,7 +132,8 @@ public class BaseTimeseriesService implements TimeseriesService {
throw new IncorrectParameterException("TsKvQuery can't be null");
} else if (isBlank(query.getKey())) {
throw new IncorrectParameterException("Incorrect TsKvQuery. Key can't be empty");
+ } else if (query.getAggregation() == null){
+ throw new IncorrectParameterException("Incorrect TsKvQuery. Aggregation can't be empty");
}
- //TODO: add validation of all params
}
}
diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/SimpleListenableFuture.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/SimpleListenableFuture.java
new file mode 100644
index 0000000000..e10a40de5f
--- /dev/null
+++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/SimpleListenableFuture.java
@@ -0,0 +1,33 @@
+/**
+ * Copyright © 2016-2017 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.dao.timeseries;
+
+import com.google.common.util.concurrent.AbstractFuture;
+
+/**
+ * Created by ashvayka on 21.02.17.
+ */
+public class SimpleListenableFuture
> findAllAsync(String entityType, UUID entityId, TsKvQuery query, long minPartition, long maxPartition);
+ ListenableFuture
> findAllAsync(String entityType, UUID entityId, TsKvQuery query);
// List