Aggregation

This commit is contained in:
Andrew Shvayka 2017-02-20 19:02:03 +02:00
parent 406438c8f2
commit f6bc0791f1
15 changed files with 498 additions and 180 deletions

View File

@ -1,12 +1,12 @@
/**
* Copyright © 2016-2017 The Thingsboard Authors
*
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@ -17,6 +17,7 @@ package org.thingsboard.server.actors.plugin;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
@ -152,7 +153,19 @@ public final class PluginProcessingContext implements PluginContext {
@Override
public List<TsKvEntry> loadTimeseries(DeviceId deviceId, TsKvQuery query) {
validate(deviceId);
return pluginCtx.tsService.find(DataConstants.DEVICE, deviceId, query);
try {
return pluginCtx.tsService.findAll(DataConstants.DEVICE, deviceId, query).get();
} catch (Exception e) {
log.error("TODO", e);
throw new RuntimeException(e);
}
}
@Override
public void loadTimeseries(DeviceId deviceId, TsKvQuery query, PluginCallback<List<TsKvEntry>> callback) {
validate(deviceId);
ListenableFuture<List<TsKvEntry>> future = pluginCtx.tsService.findAll(DataConstants.DEVICE, deviceId, query);
Futures.addCallback(future, getCallback(callback, v -> v), executor);
}
@Override
@ -235,10 +248,10 @@ public final class PluginProcessingContext implements PluginContext {
};
}
private <T> FutureCallback<ResultSet> getCallback(final PluginCallback<T> callback, Function<ResultSet, T> transformer) {
return new FutureCallback<ResultSet>() {
private <T, R> FutureCallback<R> getCallback(final PluginCallback<T> callback, Function<R, T> transformer) {
return new FutureCallback<R>() {
@Override
public void onSuccess(@Nullable ResultSet result) {
public void onSuccess(@Nullable R result) {
pluginCtx.self().tell(PluginCallbackMessage.onSuccess(callback, transformer.apply(result)), ActorRef.noSender());
}

View File

@ -0,0 +1,10 @@
package org.thingsboard.server.common.data.kv;
/**
* Created by ashvayka on 20.02.17.
*/
public enum Aggregation {
MIN, MAX, AVG, SUM, COUNT, NONE;
}

View File

@ -1,12 +1,12 @@
/**
* Copyright © 2016-2017 The Thingsboard Authors
*
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@ -15,59 +15,27 @@
*/
package org.thingsboard.server.common.data.kv;
import java.util.Optional;
import lombok.Data;
@Data
public class BaseTsKvQuery implements TsKvQuery {
private String key;
private Optional<Long> startTs;
private Optional<Long> endTs;
private Optional<Integer> limit;
private final String key;
private final long startTs;
private final long endTs;
private final int limit;
private final Aggregation aggregation;
public BaseTsKvQuery(String key, Optional<Long> startTs, Optional<Long> endTs, Optional<Integer> limit) {
public BaseTsKvQuery(String key, long startTs, long endTs, int limit, Aggregation aggregation) {
this.key = key;
this.startTs = startTs;
this.endTs = endTs;
this.limit = limit;
}
public BaseTsKvQuery(String key, Long startTs, Long endTs, Integer limit) {
this(key, Optional.ofNullable(startTs), Optional.ofNullable(endTs), Optional.ofNullable(limit));
this.aggregation = aggregation;
}
public BaseTsKvQuery(String key, Long startTs, Integer limit) {
this(key, startTs, null, limit);
public BaseTsKvQuery(String key, long startTs, long endTs) {
this(key, startTs, endTs, 1, Aggregation.AVG);
}
public BaseTsKvQuery(String key, Long startTs, Long endTs) {
this(key, startTs, endTs, null);
}
public BaseTsKvQuery(String key, Long startTs) {
this(key, startTs, null, null);
}
public BaseTsKvQuery(String key, Integer limit) {
this(key, null, null, limit);
}
@Override
public String getKey() {
return key;
}
@Override
public Optional<Long> getStartTs() {
return startTs;
}
@Override
public Optional<Long> getEndTs() {
return endTs;
}
@Override
public Optional<Integer> getLimit() {
return limit;
}
}

View File

@ -21,10 +21,12 @@ public interface TsKvQuery {
String getKey();
Optional<Long> getStartTs();
long getStartTs();
Optional<Long> getEndTs();
long getEndTs();
Optional<Integer> getLimit();
int getLimit();
Aggregation getAggregation();
}

View File

@ -1,12 +1,12 @@
/**
* Copyright © 2016-2017 The Thingsboard Authors
*
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@ -18,14 +18,15 @@ package org.thingsboard.server.dao.model;
import java.util.UUID;
import com.datastax.driver.core.utils.UUIDs;
import org.apache.commons.lang3.ArrayUtils;
public class ModelConstants {
private ModelConstants() {
}
public static UUID NULL_UUID = UUIDs.startOf(0);
/**
* Generic constants.
*/
@ -38,7 +39,7 @@ public class ModelConstants {
public static final String ALIAS_PROPERTY = "alias";
public static final String SEARCH_TEXT_PROPERTY = "search_text";
public static final String ADDITIONAL_INFO_PROPERTY = "additional_info";
/**
* Cassandra user constants.
*/
@ -50,11 +51,11 @@ public class ModelConstants {
public static final String USER_FIRST_NAME_PROPERTY = "first_name";
public static final String USER_LAST_NAME_PROPERTY = "last_name";
public static final String USER_ADDITIONAL_INFO_PROPERTY = ADDITIONAL_INFO_PROPERTY;
public static final String USER_BY_EMAIL_COLUMN_FAMILY_NAME = "user_by_email";
public static final String USER_BY_TENANT_AND_SEARCH_TEXT_COLUMN_FAMILY_NAME = "user_by_tenant_and_search_text";
public static final String USER_BY_CUSTOMER_AND_SEARCH_TEXT_COLUMN_FAMILY_NAME = "user_by_customer_and_search_text";
/**
* Cassandra user_credentials constants.
*/
@ -64,20 +65,20 @@ public class ModelConstants {
public static final String USER_CREDENTIALS_PASSWORD_PROPERTY = "password";
public static final String USER_CREDENTIALS_ACTIVATE_TOKEN_PROPERTY = "activate_token";
public static final String USER_CREDENTIALS_RESET_TOKEN_PROPERTY = "reset_token";
public static final String USER_CREDENTIALS_BY_USER_COLUMN_FAMILY_NAME = "user_credentials_by_user";
public static final String USER_CREDENTIALS_BY_ACTIVATE_TOKEN_COLUMN_FAMILY_NAME = "user_credentials_by_activate_token";
public static final String USER_CREDENTIALS_BY_RESET_TOKEN_COLUMN_FAMILY_NAME = "user_credentials_by_reset_token";
/**
* Cassandra admin_settings constants.
*/
public static final String ADMIN_SETTINGS_COLUMN_FAMILY_NAME = "admin_settings";
public static final String ADMIN_SETTINGS_KEY_PROPERTY = "key";
public static final String ADMIN_SETTINGS_JSON_VALUE_PROPERTY = "json_value";
public static final String ADMIN_SETTINGS_BY_KEY_COLUMN_FAMILY_NAME = "admin_settings_by_key";
/**
* Cassandra contact constants.
*/
@ -97,9 +98,9 @@ public class ModelConstants {
public static final String TENANT_TITLE_PROPERTY = TITLE_PROPERTY;
public static final String TENANT_REGION_PROPERTY = "region";
public static final String TENANT_ADDITIONAL_INFO_PROPERTY = ADDITIONAL_INFO_PROPERTY;
public static final String TENANT_BY_REGION_AND_SEARCH_TEXT_COLUMN_FAMILY_NAME = "tenant_by_region_and_search_text";
/**
* Cassandra customer constants.
*/
@ -107,9 +108,9 @@ public class ModelConstants {
public static final String CUSTOMER_TENANT_ID_PROPERTY = TENTANT_ID_PROPERTY;
public static final String CUSTOMER_TITLE_PROPERTY = TITLE_PROPERTY;
public static final String CUSTOMER_ADDITIONAL_INFO_PROPERTY = ADDITIONAL_INFO_PROPERTY;
public static final String CUSTOMER_BY_TENANT_AND_SEARCH_TEXT_COLUMN_FAMILY_NAME = "customer_by_tenant_and_search_text";
/**
* Cassandra device constants.
*/
@ -118,12 +119,12 @@ public class ModelConstants {
public static final String DEVICE_CUSTOMER_ID_PROPERTY = CUSTOMER_ID_PROPERTY;
public static final String DEVICE_NAME_PROPERTY = "name";
public static final String DEVICE_ADDITIONAL_INFO_PROPERTY = ADDITIONAL_INFO_PROPERTY;
public static final String DEVICE_BY_TENANT_AND_SEARCH_TEXT_COLUMN_FAMILY_NAME = "device_by_tenant_and_search_text";
public static final String DEVICE_BY_CUSTOMER_AND_SEARCH_TEXT_COLUMN_FAMILY_NAME = "device_by_customer_and_search_text";
public static final String DEVICE_BY_TENANT_AND_NAME_VIEW_NAME = "device_by_tenant_and_name";
/**
* Cassandra device_credentials constants.
*/
@ -132,7 +133,7 @@ public class ModelConstants {
public static final String DEVICE_CREDENTIALS_CREDENTIALS_TYPE_PROPERTY = "credentials_type";
public static final String DEVICE_CREDENTIALS_CREDENTIALS_ID_PROPERTY = "credentials_id";
public static final String DEVICE_CREDENTIALS_CREDENTIALS_VALUE_PROPERTY = "credentials_value";
public static final String DEVICE_CREDENTIALS_BY_DEVICE_COLUMN_FAMILY_NAME = "device_credentials_by_device";
public static final String DEVICE_CREDENTIALS_BY_CREDENTIALS_ID_COLUMN_FAMILY_NAME = "device_credentials_by_credentials_id";
@ -203,9 +204,9 @@ public class ModelConstants {
public static final String COMPONENT_DESCRIPTOR_BY_SCOPE_TYPE_AND_SEARCH_TEXT_COLUMN_FAMILY_NAME = "component_desc_by_scope_type_search_text";
public static final String COMPONENT_DESCRIPTOR_BY_ID = "component_desc_by_id";
/**
* Cassandra rule metadata constants.
*/
/**
* Cassandra rule metadata constants.
*/
public static final String RULE_COLUMN_FAMILY_NAME = "rule";
public static final String RULE_TENANT_ID_PROPERTY = TENTANT_ID_PROPERTY;
public static final String RULE_NAME_PROPERTY = "name";
@ -259,4 +260,31 @@ public class ModelConstants {
public static final String STRING_VALUE_COLUMN = "str_v";
public static final String LONG_VALUE_COLUMN = "long_v";
public static final String DOUBLE_VALUE_COLUMN = "dbl_v";
public static final String[] COUNT_AGGREGATION_COLUMNS = new String[]{count(LONG_VALUE_COLUMN), count(DOUBLE_VALUE_COLUMN), count(BOOLEAN_VALUE_COLUMN), count(STRING_VALUE_COLUMN)};
public static final String[] NONE_AGGREGATION_COLUMNS = new String[]{LONG_VALUE_COLUMN, DOUBLE_VALUE_COLUMN, BOOLEAN_VALUE_COLUMN, STRING_VALUE_COLUMN,};
public static final String[] MIN_AGGREGATION_COLUMNS = ArrayUtils.addAll(COUNT_AGGREGATION_COLUMNS,
new String[]{min(LONG_VALUE_COLUMN), min(DOUBLE_VALUE_COLUMN), min(BOOLEAN_VALUE_COLUMN), min(STRING_VALUE_COLUMN)});
public static final String[] MAX_AGGREGATION_COLUMNS = ArrayUtils.addAll(COUNT_AGGREGATION_COLUMNS,
new String[]{max(LONG_VALUE_COLUMN), max(DOUBLE_VALUE_COLUMN), max(BOOLEAN_VALUE_COLUMN), max(STRING_VALUE_COLUMN)});
public static final String[] SUM_AGGREGATION_COLUMNS = ArrayUtils.addAll(COUNT_AGGREGATION_COLUMNS,
new String[]{sum(LONG_VALUE_COLUMN), sum(DOUBLE_VALUE_COLUMN)});
public static final String[] AVG_AGGREGATION_COLUMNS = ArrayUtils.addAll(COUNT_AGGREGATION_COLUMNS, SUM_AGGREGATION_COLUMNS);
public static String min(String s) {
return "min(" + s + ")";
}
public static String max(String s) {
return "max(" + s + ")";
}
public static String sum(String s) {
return "sum(" + s + ")";
}
public static String count(String s) {
return "count(" + s + ")";
}
}

View File

@ -0,0 +1,175 @@
package org.thingsboard.server.dao.timeseries;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import org.thingsboard.server.common.data.kv.*;
import javax.annotation.Nullable;
import java.util.List;
import java.util.Optional;
/**
* Created by ashvayka on 20.02.17.
*/
public class AggregatePartitionsFunction implements com.google.common.base.Function<List<ResultSet>, Optional<TsKvEntry>> {
private static final int LONG_CNT_POS = 0;
private static final int DOUBLE_CNT_POS = 1;
private static final int BOOL_CNT_POS = 2;
private static final int STR_CNT_POS = 3;
private static final int LONG_POS = 4;
private static final int DOUBLE_POS = 5;
private static final int BOOL_POS = 6;
private static final int STR_POS = 7;
private final Aggregation aggregation;
private final String key;
private final long ts;
public AggregatePartitionsFunction(Aggregation aggregation, String key, long ts) {
this.aggregation = aggregation;
this.key = key;
this.ts = ts;
}
@Nullable
@Override
public Optional<TsKvEntry> apply(@Nullable List<ResultSet> rsList) {
if (rsList == null || rsList.size() == 0) {
return Optional.empty();
}
long count = 0;
DataType dataType = null;
Boolean bValue = null;
String sValue = null;
Double dValue = null;
Long lValue = null;
for (ResultSet rs : rsList) {
for (Row row : rs.all()) {
long curCount;
Long curLValue = null;
Double curDValue = null;
Boolean curBValue = null;
String curSValue = null;
long longCount = row.getLong(LONG_CNT_POS);
long doubleCount = row.getLong(DOUBLE_CNT_POS);
long boolCount = row.getLong(BOOL_CNT_POS);
long strCount = row.getLong(STR_CNT_POS);
if (longCount > 0) {
dataType = DataType.LONG;
curCount = longCount;
curLValue = getLongValue(row);
} else if (doubleCount > 0) {
dataType = DataType.DOUBLE;
curCount = doubleCount;
curDValue = getDoubleValue(row);
} else if (boolCount > 0) {
dataType = DataType.BOOLEAN;
curCount = boolCount;
curBValue = getBooleanValue(row);
} else if (strCount > 0) {
dataType = DataType.STRING;
curCount = strCount;
curSValue = getStringValue(row);
} else {
continue;
}
if (aggregation == Aggregation.COUNT) {
count += curCount;
} else if (aggregation == Aggregation.AVG || aggregation == Aggregation.SUM) {
count += curCount;
dValue = dValue == null ? curDValue : dValue + curDValue;
lValue = lValue == null ? curLValue : lValue + curLValue;
} else if (aggregation == Aggregation.MIN) {
if (curDValue != null) {
dValue = dValue == null ? curDValue : Math.min(dValue, curDValue);
} else if (curLValue != null) {
lValue = lValue == null ? curLValue : Math.min(lValue, curLValue);
} else if (curBValue != null) {
bValue = bValue == null ? curBValue : bValue && curBValue;
} else if (curSValue != null) {
if (sValue == null || curSValue.compareTo(sValue) < 0) {
sValue = curSValue;
}
}
} else if (aggregation == Aggregation.MAX) {
if (curDValue != null) {
dValue = dValue == null ? curDValue : Math.max(dValue, curDValue);
} else if (curLValue != null) {
lValue = lValue == null ? curLValue : Math.max(lValue, curLValue);
} else if (curBValue != null) {
bValue = bValue == null ? curBValue : bValue || curBValue;
} else if (curSValue != null) {
if (sValue == null || curSValue.compareTo(sValue) > 0) {
sValue = curSValue;
}
}
}
}
}
if (dataType == null) {
return Optional.empty();
} else if (aggregation == Aggregation.COUNT) {
return Optional.of(new BasicTsKvEntry(ts, new LongDataEntry(key, (long) count)));
} else if (aggregation == Aggregation.AVG || aggregation == Aggregation.SUM) {
if (count == 0 || (dataType == DataType.DOUBLE && dValue == null) || (dataType == DataType.LONG && lValue == null)) {
return Optional.empty();
} else if (dataType == DataType.DOUBLE) {
return Optional.of(new BasicTsKvEntry(ts, new DoubleDataEntry(key, aggregation == Aggregation.SUM ? dValue : (dValue / count))));
} else if (dataType == DataType.LONG) {
return Optional.of(new BasicTsKvEntry(ts, new LongDataEntry(key, aggregation == Aggregation.SUM ? lValue : (lValue / count))));
}
} else if (aggregation == Aggregation.MIN || aggregation == Aggregation.MAX) {
if (dataType == DataType.DOUBLE) {
return Optional.of(new BasicTsKvEntry(ts, new DoubleDataEntry(key, dValue)));
} else if (dataType == DataType.LONG) {
return Optional.of(new BasicTsKvEntry(ts, new LongDataEntry(key, lValue)));
} else if (dataType == DataType.STRING) {
return Optional.of(new BasicTsKvEntry(ts, new StringDataEntry(key, sValue)));
} else {
return Optional.of(new BasicTsKvEntry(ts, new BooleanDataEntry(key, bValue)));
}
}
return null;
}
private Boolean getBooleanValue(Row row) {
if (aggregation == Aggregation.MIN || aggregation == Aggregation.MAX) {
return row.getBool(BOOL_POS);
} else {
return null;
}
}
private String getStringValue(Row row) {
if (aggregation == Aggregation.MIN || aggregation == Aggregation.MAX) {
return row.getString(STR_POS);
} else {
return null;
}
}
private Long getLongValue(Row row) {
if (aggregation == Aggregation.MIN || aggregation == Aggregation.MAX
|| aggregation == Aggregation.SUM || aggregation == Aggregation.AVG) {
return row.getLong(LONG_POS);
} else {
return null;
}
}
private Double getDoubleValue(Row row) {
if (aggregation == Aggregation.MIN || aggregation == Aggregation.MAX
|| aggregation == Aggregation.SUM || aggregation == Aggregation.AVG) {
return row.getDouble(DOUBLE_POS);
} else {
return null;
}
}
}

View File

@ -1,12 +1,12 @@
/**
* Copyright © 2016-2017 The Thingsboard Authors
*
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@ -18,6 +18,10 @@ package org.thingsboard.server.dao.timeseries;
import com.datastax.driver.core.*;
import com.datastax.driver.core.querybuilder.QueryBuilder;
import com.datastax.driver.core.querybuilder.Select;
import com.google.common.base.Function;
import com.google.common.util.concurrent.AsyncFunction;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
@ -26,7 +30,16 @@ import org.thingsboard.server.common.data.kv.DataType;
import org.thingsboard.server.dao.AbstractDao;
import org.thingsboard.server.dao.model.ModelConstants;
import javax.annotation.Nullable;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import static com.datastax.driver.core.querybuilder.QueryBuilder.eq;
import static com.datastax.driver.core.querybuilder.QueryBuilder.select;
@ -41,48 +54,136 @@ public class BaseTimeseriesDao extends AbstractDao implements TimeseriesDao {
@Value("${cassandra.query.max_limit_per_request}")
protected Integer maxLimitPerRequest;
@Value("${cassandra.query.read_result_processing_threads}")
private int readResultsProcessingThreads;
@Value("${cassandra.query.min_read_step}")
private int minReadStep;
@Value("${cassandra.query.ts_key_value_partitioning}")
private String partitioning;
private TsPartitionDate tsFormat;
private ExecutorService readResultsProcessingExecutor;
private PreparedStatement partitionInsertStmt;
private PreparedStatement[] latestInsertStmts;
private PreparedStatement[] saveStmts;
private PreparedStatement[] fetchStmts;
private PreparedStatement findLatestStmt;
private PreparedStatement findAllLatestStmt;
@Override
public List<TsKvEntry> find(String entityType, UUID entityId, TsKvQuery query, Optional<Long> minPartition, Optional<Long> maxPartition) {
List<Row> rows = Collections.emptyList();
Long[] parts = fetchPartitions(entityType, entityId, query.getKey(), minPartition, maxPartition);
int partsLength = parts.length;
if (parts != null && partsLength > 0) {
int limit = maxLimitPerRequest;
Optional<Integer> lim = query.getLimit();
if (lim.isPresent() && lim.get() < maxLimitPerRequest) {
limit = lim.get();
}
rows = new ArrayList<>(limit);
int lastIdx = partsLength - 1;
for (int i = 0; i < partsLength; i++) {
int currentLimit;
if (rows.size() >= limit) {
break;
} else {
currentLimit = limit - rows.size();
}
Long partition = parts[i];
Select.Where where = select().from(ModelConstants.TS_KV_CF).where(eq(ModelConstants.ENTITY_TYPE_COLUMN, entityType))
.and(eq(ModelConstants.ENTITY_ID_COLUMN, entityId))
.and(eq(ModelConstants.KEY_COLUMN, query.getKey()))
.and(eq(ModelConstants.PARTITION_COLUMN, partition));
if (i == 0 && query.getStartTs().isPresent()) {
where.and(QueryBuilder.gt(ModelConstants.TS_COLUMN, query.getStartTs().get()));
} else if (i == lastIdx && query.getEndTs().isPresent()) {
where.and(QueryBuilder.lte(ModelConstants.TS_COLUMN, query.getEndTs().get()));
}
where.limit(currentLimit);
rows.addAll(executeRead(where).all());
}
@PostConstruct
public void init() {
getFetchStmt(Aggregation.NONE);
readResultsProcessingExecutor = Executors.newFixedThreadPool(readResultsProcessingThreads);
Optional<TsPartitionDate> partition = TsPartitionDate.parse(partitioning);
if (partition.isPresent()) {
tsFormat = partition.get();
} else {
log.warn("Incorrect configuration of partitioning {}", partitioning);
throw new RuntimeException("Failed to parse partitioning property: " + partitioning + "!");
}
return convertResultToTsKvEntryList(rows);
}
@PreDestroy
public void stop() {
if (readResultsProcessingExecutor != null) {
readResultsProcessingExecutor.shutdownNow();
}
}
@Override
public long toPartitionTs(long ts) {
LocalDateTime time = LocalDateTime.ofInstant(Instant.ofEpochMilli(ts), ZoneOffset.UTC);
return tsFormat.truncatedTo(time).toInstant(ZoneOffset.UTC).toEpochMilli();
}
private static String[] getFetchColumnNames(Aggregation aggregation) {
switch (aggregation) {
case NONE:
return ModelConstants.NONE_AGGREGATION_COLUMNS;
case MIN:
return ModelConstants.MIN_AGGREGATION_COLUMNS;
case MAX:
return ModelConstants.MAX_AGGREGATION_COLUMNS;
case SUM:
return ModelConstants.SUM_AGGREGATION_COLUMNS;
case COUNT:
return ModelConstants.COUNT_AGGREGATION_COLUMNS;
case AVG:
return ModelConstants.AVG_AGGREGATION_COLUMNS;
default:
throw new RuntimeException("Aggregation type: " + aggregation + " is not supported!");
}
}
@Override
public ListenableFuture<List<TsKvEntry>> findAllAsync(String entityType, UUID entityId, TsKvQuery query, long minPartition, long maxPartition) {
if (query.getAggregation() == Aggregation.NONE) {
//TODO:
return null;
} else {
long step = Math.max((query.getEndTs() - query.getStartTs()) / query.getLimit(), minReadStep);
long stepTs = query.getStartTs();
List<ListenableFuture<Optional<TsKvEntry>>> futures = new ArrayList<>();
while (stepTs < query.getEndTs()) {
long startTs = stepTs;
long endTs = stepTs + step;
TsKvQuery subQuery = new BaseTsKvQuery(query.getKey(), startTs, endTs, 1, query.getAggregation());
futures.add(findAndAggregateAsync(entityType, entityId, subQuery, toPartitionTs(startTs), toPartitionTs(endTs)));
stepTs = endTs;
}
ListenableFuture<List<Optional<TsKvEntry>>> future = Futures.allAsList(futures);
return Futures.transform(future, new Function<List<Optional<TsKvEntry>>, List<TsKvEntry>>() {
@Nullable
@Override
public List<TsKvEntry> apply(@Nullable List<Optional<TsKvEntry>> input) {
return input.stream().filter(v -> v.isPresent()).map(v -> v.get()).collect(Collectors.toList());
}
});
}
}
private ListenableFuture<Optional<TsKvEntry>> findAndAggregateAsync(String entityType, UUID entityId, TsKvQuery query, long minPartition, long maxPartition) {
final Aggregation aggregation = query.getAggregation();
final long startTs = query.getStartTs();
final long endTs = query.getEndTs();
final long ts = startTs + (endTs - startTs) / 2;
ResultSetFuture partitionsFuture = fetchPartitions(entityType, entityId, query.getKey(), minPartition, maxPartition);
com.google.common.base.Function<ResultSet, List<Long>> toArrayFunction = rows -> rows.all().stream()
.map(row -> row.getLong(ModelConstants.PARTITION_COLUMN)).collect(Collectors.toList());
ListenableFuture<List<Long>> partitionsListFuture = Futures.transform(partitionsFuture, toArrayFunction, readResultsProcessingExecutor);
AsyncFunction<List<Long>, List<ResultSet>> fetchChunksFunction = partitions -> {
try {
PreparedStatement proto = getFetchStmt(aggregation);
List<ResultSetFuture> futures = new ArrayList<>(partitions.size());
for (Long partition : partitions) {
BoundStatement stmt = proto.bind();
stmt.setString(0, entityType);
stmt.setUUID(1, entityId);
stmt.setString(2, query.getKey());
stmt.setLong(3, partition);
stmt.setLong(4, startTs);
stmt.setLong(5, endTs);
log.debug("Generated query [{}] for entityType {} and entityId {}", stmt, entityType, entityId);
futures.add(executeAsyncRead(stmt));
}
return Futures.allAsList(futures);
} catch (Throwable e) {
log.error("Failed to fetch data", e);
throw e;
}
};
ListenableFuture<List<ResultSet>> aggregationChunks = Futures.transform(partitionsListFuture, fetchChunksFunction, readResultsProcessingExecutor);
return Futures.transform(aggregationChunks, new AggregatePartitionsFunction(aggregation, query.getKey(), ts), readResultsProcessingExecutor);
}
@Override
@ -190,13 +291,12 @@ public class BaseTimeseriesDao extends AbstractDao implements TimeseriesDao {
* Select existing partitions from the table
* <code>{@link ModelConstants#TS_KV_PARTITIONS_CF}</code> for the given entity
*/
private Long[] fetchPartitions(String entityType, UUID entityId, String key, Optional<Long> minPartition, Optional<Long> maxPartition) {
private ResultSetFuture fetchPartitions(String entityType, UUID entityId, String key, long minPartition, long maxPartition) {
Select.Where select = QueryBuilder.select(ModelConstants.PARTITION_COLUMN).from(ModelConstants.TS_KV_PARTITIONS_CF).where(eq(ModelConstants.ENTITY_TYPE_COLUMN, entityType))
.and(eq(ModelConstants.ENTITY_ID_COLUMN, entityId)).and(eq(ModelConstants.KEY_COLUMN, key));
minPartition.ifPresent(startTs -> select.and(QueryBuilder.gte(ModelConstants.PARTITION_COLUMN, minPartition.get())));
maxPartition.ifPresent(endTs -> select.and(QueryBuilder.lte(ModelConstants.PARTITION_COLUMN, maxPartition.get())));
ResultSet resultSet = executeRead(select);
return resultSet.all().stream().map(row -> row.getLong(ModelConstants.PARTITION_COLUMN)).toArray(Long[]::new);
select.and(QueryBuilder.gte(ModelConstants.PARTITION_COLUMN, minPartition));
select.and(QueryBuilder.lte(ModelConstants.PARTITION_COLUMN, maxPartition));
return executeAsyncRead(select);
}
private PreparedStatement getSaveStmt(DataType dataType) {
@ -216,6 +316,23 @@ public class BaseTimeseriesDao extends AbstractDao implements TimeseriesDao {
return saveStmts[dataType.ordinal()];
}
private PreparedStatement getFetchStmt(Aggregation aggType) {
if (fetchStmts == null) {
fetchStmts = new PreparedStatement[Aggregation.values().length];
for (Aggregation type : Aggregation.values()) {
fetchStmts[type.ordinal()] = getSession().prepare("SELECT " +
String.join(", ", getFetchColumnNames(type)) + " FROM " + ModelConstants.TS_KV_CF
+ " WHERE " + ModelConstants.ENTITY_TYPE_COLUMN + " = ? "
+ "AND " + ModelConstants.ENTITY_ID_COLUMN + " = ? "
+ "AND " + ModelConstants.KEY_COLUMN + " = ? "
+ "AND " + ModelConstants.PARTITION_COLUMN + " = ? "
+ "AND " + ModelConstants.TS_COLUMN + " > ? "
+ "AND " + ModelConstants.TS_COLUMN + " <= ?");
}
}
return fetchStmts[aggType.ordinal()];
}
private PreparedStatement getLatestStmt(DataType dataType) {
if (latestInsertStmts == null) {
latestInsertStmts = new PreparedStatement[DataType.values().length];

View File

@ -1,12 +1,12 @@
/**
* Copyright © 2016-2017 The Thingsboard Authors
*
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@ -23,21 +23,23 @@ import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.server.common.data.id.UUIDBased;
import org.thingsboard.server.common.data.kv.BaseTsKvQuery;
import org.thingsboard.server.common.data.kv.TsKvEntry;
import org.thingsboard.server.common.data.kv.TsKvQuery;
import org.thingsboard.server.dao.exception.IncorrectParameterException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.thingsboard.server.dao.service.Validator;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import static org.apache.commons.lang3.StringUtils.isBlank;
@ -50,38 +52,14 @@ public class BaseTimeseriesService implements TimeseriesService {
public static final int INSERTS_PER_ENTRY = 3;
@Value("${cassandra.query.ts_key_value_partitioning}")
private String partitioning;
@Autowired
private TimeseriesDao timeseriesDao;
private TsPartitionDate tsFormat;
@PostConstruct
public void init() {
Optional<TsPartitionDate> partition = TsPartitionDate.parse(partitioning);
if (partition.isPresent()) {
tsFormat = partition.get();
} else {
log.warn("Incorrect configuration of partitioning {}", partitioning);
throw new RuntimeException("Failed to parse partitioning property: " + partitioning + "!");
}
}
@Override
public List<TsKvEntry> find(String entityType, UUIDBased entityId, TsKvQuery query) {
public ListenableFuture<List<TsKvEntry>> findAll(String entityType, UUIDBased entityId, TsKvQuery query) {
validate(entityType, entityId);
validate(query);
return timeseriesDao.find(entityType, entityId.getId(), query, toPartitionTs(query.getStartTs()), toPartitionTs(query.getEndTs()));
}
private Optional<Long> toPartitionTs(Optional<Long> ts) {
if (ts.isPresent()) {
return Optional.of(toPartitionTs(ts.get()));
} else {
return Optional.empty();
}
return timeseriesDao.findAllAsync(entityType, entityId.getId(), query, timeseriesDao.toPartitionTs(query.getStartTs()), timeseriesDao.toPartitionTs(query.getEndTs()));
}
@Override
@ -106,7 +84,7 @@ public class BaseTimeseriesService implements TimeseriesService {
throw new IncorrectParameterException("Key value entry can't be null");
}
UUID uid = entityId.getId();
long partitionTs = toPartitionTs(tsKvEntry.getTs());
long partitionTs = timeseriesDao.toPartitionTs(tsKvEntry.getTs());
List<ResultSetFuture> futures = Lists.newArrayListWithExpectedSize(INSERTS_PER_ENTRY);
saveAndRegisterFutures(futures, entityType, tsKvEntry, uid, partitionTs);
@ -122,7 +100,7 @@ public class BaseTimeseriesService implements TimeseriesService {
throw new IncorrectParameterException("Key value entry can't be null");
}
UUID uid = entityId.getId();
long partitionTs = toPartitionTs(tsKvEntry.getTs());
long partitionTs = timeseriesDao.toPartitionTs(tsKvEntry.getTs());
saveAndRegisterFutures(futures, entityType, tsKvEntry, uid, partitionTs);
}
return Futures.allAsList(futures);
@ -144,14 +122,6 @@ public class BaseTimeseriesService implements TimeseriesService {
futures.add(timeseriesDao.save(entityType, uid, partitionTs, tsKvEntry));
}
private long toPartitionTs(long ts) {
LocalDateTime time = LocalDateTime.ofInstant(Instant.ofEpochMilli(ts), ZoneOffset.UTC);
LocalDateTime parititonTime = tsFormat.truncatedTo(time);
return parititonTime.toInstant(ZoneOffset.UTC).toEpochMilli();
}
private static void validate(String entityType, UUIDBased entityId) {
Validator.validateString(entityType, "Incorrect entityType " + entityType);
Validator.validateId(entityId, "Incorrect entityId " + entityId);
@ -163,5 +133,6 @@ public class BaseTimeseriesService implements TimeseriesService {
} else if (isBlank(query.getKey())) {
throw new IncorrectParameterException("Incorrect TsKvQuery. Key can't be empty");
}
//TODO: add validation of all params
}
}

View File

@ -17,6 +17,7 @@ package org.thingsboard.server.dao.timeseries;
import com.datastax.driver.core.ResultSetFuture;
import com.datastax.driver.core.Row;
import com.google.common.util.concurrent.ListenableFuture;
import org.thingsboard.server.common.data.kv.TsKvEntry;
import org.thingsboard.server.common.data.kv.TsKvQuery;
@ -30,7 +31,11 @@ import java.util.UUID;
*/
public interface TimeseriesDao {
List<TsKvEntry> find(String entityType, UUID entityId, TsKvQuery query, Optional<Long> minPartition, Optional<Long> maxPartition);
long toPartitionTs(long ts);
ListenableFuture<List<TsKvEntry>> findAllAsync(String entityType, UUID entityId, TsKvQuery query, long minPartition, long maxPartition);
// List<TsKvEntry> find(String entityType, UUID entityId, TsKvQuery query, Optional<Long> minPartition, Optional<Long> maxPartition);
ResultSetFuture findLatest(String entityType, UUID entityId, String key);

View File

@ -19,6 +19,7 @@ import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.ResultSetFuture;
import com.datastax.driver.core.Row;
import com.google.common.util.concurrent.ListenableFuture;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.UUIDBased;
import org.thingsboard.server.common.data.kv.TsKvEntry;
import org.thingsboard.server.common.data.kv.TsKvQuery;
@ -32,8 +33,7 @@ import java.util.Set;
*/
public interface TimeseriesService {
//TODO: Replace this with async operation
List<TsKvEntry> find(String entityType, UUIDBased entityId, TsKvQuery query);
ListenableFuture<List<TsKvEntry>> findAll(String entityType, UUIDBased entityId, TsKvQuery query);
ListenableFuture<List<ResultSet>> findLatest(String entityType, UUIDBased entityId, Collection<String> keys);

View File

@ -25,11 +25,11 @@ import java.util.Arrays;
@RunWith(ClasspathSuite.class)
@ClassnameFilters({
"org.thingsboard.server.dao.service.*Test",
"org.thingsboard.server.dao.kv.*Test",
"org.thingsboard.server.dao.plugin.*Test",
"org.thingsboard.server.dao.rule.*Test",
"org.thingsboard.server.dao.attributes.*Test",
// "org.thingsboard.server.dao.service.*Test",
// "org.thingsboard.server.dao.kv.*Test",
// "org.thingsboard.server.dao.plugin.*Test",
// "org.thingsboard.server.dao.rule.*Test",
// "org.thingsboard.server.dao.attributes.*Test",
"org.thingsboard.server.dao.timeseries.*Test"
})
public class DaoTestSuite {

View File

@ -116,14 +116,36 @@ public class TimeseriesServiceTest extends AbstractServiceTest {
entries.add(tsKvEntry);
}
log.debug("Saved all records {}", localDateTime);
List<TsKvEntry> list = tsService.find(DataConstants.DEVICE, deviceId, new BaseTsKvQuery(STRING_KEY, entries.get(599).getTs(),
LocalDateTime.now(ZoneOffset.UTC).toInstant(ZoneOffset.UTC).toEpochMilli()));
List<TsKvEntry> list = tsService.findAll(DataConstants.DEVICE, deviceId, new BaseTsKvQuery(STRING_KEY, entries.get(599).getTs(),
LocalDateTime.now(ZoneOffset.UTC).toInstant(ZoneOffset.UTC).toEpochMilli(), PARTITION_MINUTES - 599, Aggregation.MIN)).get();
log.debug("Fetched records {}", localDateTime);
List<TsKvEntry> expected = entries.subList(600, PARTITION_MINUTES);
assertEquals(expected.size(), list.size());
assertEquals(expected, list);
}
// @Test
// public void testFindDeviceTsDataByQuery() throws Exception {
// DeviceId deviceId = new DeviceId(UUIDs.timeBased());
// LocalDateTime localDateTime = LocalDateTime.now(ZoneOffset.UTC).minusMinutes(PARTITION_MINUTES);
// log.debug("Start event time is {}", localDateTime);
// List<TsKvEntry> entries = new ArrayList<>(PARTITION_MINUTES);
//
// for (int i = 0; i < PARTITION_MINUTES; i++) {
// long time = localDateTime.plusMinutes(i).toInstant(ZoneOffset.UTC).toEpochMilli();
// BasicTsKvEntry tsKvEntry = new BasicTsKvEntry(time, stringKvEntry);
// tsService.save(DataConstants.DEVICE, deviceId, tsKvEntry).get();
// entries.add(tsKvEntry);
// }
// log.debug("Saved all records {}", localDateTime);
// List<TsKvEntry> list = tsService.findAll(DataConstants.DEVICE, deviceId, new BaseTsKvQuery(STRING_KEY, entries.get(599).getTs(),
// LocalDateTime.now(ZoneOffset.UTC).toInstant(ZoneOffset.UTC).toEpochMilli(), PARTITION_MINUTES - 599, Aggregation.MIN)).get();
// log.debug("Fetched records {}", localDateTime);
// List<TsKvEntry> expected = entries.subList(600, PARTITION_MINUTES);
// assertEquals(expected.size(), list.size());
// assertEquals(expected, list);
// }
private void saveEntries(DeviceId deviceId, long ts) throws ExecutionException, InterruptedException {
tsService.save(DataConstants.DEVICE, deviceId, toTsEntry(ts, stringKvEntry)).get();

View File

@ -2,7 +2,7 @@ cassandra.cluster_name=Thingsboard Cluster
cassandra.keyspace_name=thingsboard
cassandra.url=127.0.0.1:9142
cassandra.url=127.0.0.1:9042
cassandra.ssl=false
@ -47,3 +47,7 @@ cassandra.query.default_fetch_size=2000
cassandra.query.ts_key_value_partitioning=HOURS
cassandra.query.max_limit_per_request=1000
cassandra.query.read_result_processing_threads=3
cassandra.query.min_read_step=100

View File

@ -84,6 +84,8 @@ public interface PluginContext {
List<TsKvEntry> loadTimeseries(DeviceId deviceId, TsKvQuery query);
void loadTimeseries(DeviceId deviceId, TsKvQuery query, PluginCallback<List<TsKvEntry>> callback);
void loadLatestTimeseries(DeviceId deviceId, Collection<String> keys, PluginCallback<List<TsKvEntry>> callback);
void loadLatestTimeseries(DeviceId deviceId, PluginCallback<List<TsKvEntry>> callback);

View File

@ -95,8 +95,9 @@ public class TelemetryRestMsgHandler extends DefaultRestMsgHandler {
Optional<Integer> limit = request.getIntParamValue("limit");
Map<String, List<TsData>> data = new LinkedHashMap<>();
for (String key : keys.split(",")) {
List<TsKvEntry> entries = ctx.loadTimeseries(deviceId, new BaseTsKvQuery(key, startTs, endTs, limit));
data.put(key, entries.stream().map(v -> new TsData(v.getTs(), v.getValueAsString())).collect(Collectors.toList()));
//TODO: refactoring
// List<TsKvEntry> entries = ctx.loadTimeseries(deviceId, new BaseTsKvQuery(key, startTs, endTs, limit));
// data.put(key, entries.stream().map(v -> new TsData(v.getTs(), v.getValueAsString())).collect(Collectors.toList()));
}
msg.getResponseHolder().setResult(new ResponseEntity<>(data, HttpStatus.OK));
} else if ("attributes".equals(entity)) {