From f6bc0791f10dc20260ea04dec20429c93ed18840 Mon Sep 17 00:00:00 2001 From: Andrew Shvayka Date: Mon, 20 Feb 2017 19:02:03 +0200 Subject: [PATCH] Aggregation --- .../plugin/PluginProcessingContext.java | 29 ++- .../server/common/data/kv/Aggregation.java | 10 + .../server/common/data/kv/BaseTsKvQuery.java | 62 ++---- .../server/common/data/kv/TsKvQuery.java | 8 +- .../server/dao/model/ModelConstants.java | 74 +++++-- .../AggregatePartitionsFunction.java | 175 +++++++++++++++ .../dao/timeseries/BaseTimeseriesDao.java | 205 ++++++++++++++---- .../dao/timeseries/BaseTimeseriesService.java | 55 ++--- .../server/dao/timeseries/TimeseriesDao.java | 7 +- .../dao/timeseries/TimeseriesService.java | 4 +- .../thingsboard/server/dao/DaoTestSuite.java | 10 +- .../dao/timeseries/TimeseriesServiceTest.java | 26 ++- .../test/resources/cassandra-test.properties | 6 +- .../extensions/api/plugins/PluginContext.java | 2 + .../handlers/TelemetryRestMsgHandler.java | 5 +- 15 files changed, 498 insertions(+), 180 deletions(-) create mode 100644 common/data/src/main/java/org/thingsboard/server/common/data/kv/Aggregation.java create mode 100644 dao/src/main/java/org/thingsboard/server/dao/timeseries/AggregatePartitionsFunction.java diff --git a/application/src/main/java/org/thingsboard/server/actors/plugin/PluginProcessingContext.java b/application/src/main/java/org/thingsboard/server/actors/plugin/PluginProcessingContext.java index 92e0ec4101..6490124df9 100644 --- a/application/src/main/java/org/thingsboard/server/actors/plugin/PluginProcessingContext.java +++ b/application/src/main/java/org/thingsboard/server/actors/plugin/PluginProcessingContext.java @@ -1,12 +1,12 @@ /** * Copyright © 2016-2017 The Thingsboard Authors - * + *

* Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

* Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -17,6 +17,7 @@ package org.thingsboard.server.actors.plugin; import java.io.IOException; import java.util.*; +import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; import java.util.concurrent.Executors; import java.util.stream.Collectors; @@ -152,7 +153,19 @@ public final class PluginProcessingContext implements PluginContext { @Override public List loadTimeseries(DeviceId deviceId, TsKvQuery query) { validate(deviceId); - return pluginCtx.tsService.find(DataConstants.DEVICE, deviceId, query); + try { + return pluginCtx.tsService.findAll(DataConstants.DEVICE, deviceId, query).get(); + } catch (Exception e) { + log.error("TODO", e); + throw new RuntimeException(e); + } + } + + @Override + public void loadTimeseries(DeviceId deviceId, TsKvQuery query, PluginCallback> callback) { + validate(deviceId); + ListenableFuture> future = pluginCtx.tsService.findAll(DataConstants.DEVICE, deviceId, query); + Futures.addCallback(future, getCallback(callback, v -> v), executor); } @Override @@ -235,10 +248,10 @@ public final class PluginProcessingContext implements PluginContext { }; } - private FutureCallback getCallback(final PluginCallback callback, Function transformer) { - return new FutureCallback() { + private FutureCallback getCallback(final PluginCallback callback, Function transformer) { + return new FutureCallback() { @Override - public void onSuccess(@Nullable ResultSet result) { + public void onSuccess(@Nullable R result) { pluginCtx.self().tell(PluginCallbackMessage.onSuccess(callback, transformer.apply(result)), ActorRef.noSender()); } diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/kv/Aggregation.java b/common/data/src/main/java/org/thingsboard/server/common/data/kv/Aggregation.java new file mode 100644 index 0000000000..f8fad6c874 --- /dev/null +++ b/common/data/src/main/java/org/thingsboard/server/common/data/kv/Aggregation.java @@ -0,0 +1,10 @@ +package org.thingsboard.server.common.data.kv; + +/** + * Created by ashvayka on 20.02.17. + */ +public enum Aggregation { + + MIN, MAX, AVG, SUM, COUNT, NONE; + +} diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/kv/BaseTsKvQuery.java b/common/data/src/main/java/org/thingsboard/server/common/data/kv/BaseTsKvQuery.java index 7bb1a3fca5..78c887fe85 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/kv/BaseTsKvQuery.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/kv/BaseTsKvQuery.java @@ -1,12 +1,12 @@ /** * Copyright © 2016-2017 The Thingsboard Authors - * + *

* Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

* Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -15,59 +15,27 @@ */ package org.thingsboard.server.common.data.kv; -import java.util.Optional; +import lombok.Data; +@Data public class BaseTsKvQuery implements TsKvQuery { - private String key; - private Optional startTs; - private Optional endTs; - private Optional limit; + private final String key; + private final long startTs; + private final long endTs; + private final int limit; + private final Aggregation aggregation; - public BaseTsKvQuery(String key, Optional startTs, Optional endTs, Optional limit) { + public BaseTsKvQuery(String key, long startTs, long endTs, int limit, Aggregation aggregation) { this.key = key; this.startTs = startTs; this.endTs = endTs; this.limit = limit; - } - - public BaseTsKvQuery(String key, Long startTs, Long endTs, Integer limit) { - this(key, Optional.ofNullable(startTs), Optional.ofNullable(endTs), Optional.ofNullable(limit)); + this.aggregation = aggregation; } - public BaseTsKvQuery(String key, Long startTs, Integer limit) { - this(key, startTs, null, limit); + public BaseTsKvQuery(String key, long startTs, long endTs) { + this(key, startTs, endTs, 1, Aggregation.AVG); } - public BaseTsKvQuery(String key, Long startTs, Long endTs) { - this(key, startTs, endTs, null); - } - - public BaseTsKvQuery(String key, Long startTs) { - this(key, startTs, null, null); - } - - public BaseTsKvQuery(String key, Integer limit) { - this(key, null, null, limit); - } - - @Override - public String getKey() { - return key; - } - - @Override - public Optional getStartTs() { - return startTs; - } - - @Override - public Optional getEndTs() { - return endTs; - } - - @Override - public Optional getLimit() { - return limit; - } } diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/kv/TsKvQuery.java b/common/data/src/main/java/org/thingsboard/server/common/data/kv/TsKvQuery.java index 1303117d63..10a13ce797 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/kv/TsKvQuery.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/kv/TsKvQuery.java @@ -21,10 +21,12 @@ public interface TsKvQuery { String getKey(); - Optional getStartTs(); + long getStartTs(); - Optional getEndTs(); + long getEndTs(); - Optional getLimit(); + int getLimit(); + + Aggregation getAggregation(); } diff --git a/dao/src/main/java/org/thingsboard/server/dao/model/ModelConstants.java b/dao/src/main/java/org/thingsboard/server/dao/model/ModelConstants.java index b68fb75b5a..0f8418abbe 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/model/ModelConstants.java +++ b/dao/src/main/java/org/thingsboard/server/dao/model/ModelConstants.java @@ -1,12 +1,12 @@ /** * Copyright © 2016-2017 The Thingsboard Authors - * + *

* Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

* Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -18,14 +18,15 @@ package org.thingsboard.server.dao.model; import java.util.UUID; import com.datastax.driver.core.utils.UUIDs; +import org.apache.commons.lang3.ArrayUtils; public class ModelConstants { private ModelConstants() { } - + public static UUID NULL_UUID = UUIDs.startOf(0); - + /** * Generic constants. */ @@ -38,7 +39,7 @@ public class ModelConstants { public static final String ALIAS_PROPERTY = "alias"; public static final String SEARCH_TEXT_PROPERTY = "search_text"; public static final String ADDITIONAL_INFO_PROPERTY = "additional_info"; - + /** * Cassandra user constants. */ @@ -50,11 +51,11 @@ public class ModelConstants { public static final String USER_FIRST_NAME_PROPERTY = "first_name"; public static final String USER_LAST_NAME_PROPERTY = "last_name"; public static final String USER_ADDITIONAL_INFO_PROPERTY = ADDITIONAL_INFO_PROPERTY; - + public static final String USER_BY_EMAIL_COLUMN_FAMILY_NAME = "user_by_email"; public static final String USER_BY_TENANT_AND_SEARCH_TEXT_COLUMN_FAMILY_NAME = "user_by_tenant_and_search_text"; public static final String USER_BY_CUSTOMER_AND_SEARCH_TEXT_COLUMN_FAMILY_NAME = "user_by_customer_and_search_text"; - + /** * Cassandra user_credentials constants. */ @@ -64,20 +65,20 @@ public class ModelConstants { public static final String USER_CREDENTIALS_PASSWORD_PROPERTY = "password"; public static final String USER_CREDENTIALS_ACTIVATE_TOKEN_PROPERTY = "activate_token"; public static final String USER_CREDENTIALS_RESET_TOKEN_PROPERTY = "reset_token"; - + public static final String USER_CREDENTIALS_BY_USER_COLUMN_FAMILY_NAME = "user_credentials_by_user"; public static final String USER_CREDENTIALS_BY_ACTIVATE_TOKEN_COLUMN_FAMILY_NAME = "user_credentials_by_activate_token"; public static final String USER_CREDENTIALS_BY_RESET_TOKEN_COLUMN_FAMILY_NAME = "user_credentials_by_reset_token"; - + /** * Cassandra admin_settings constants. */ public static final String ADMIN_SETTINGS_COLUMN_FAMILY_NAME = "admin_settings"; public static final String ADMIN_SETTINGS_KEY_PROPERTY = "key"; public static final String ADMIN_SETTINGS_JSON_VALUE_PROPERTY = "json_value"; - + public static final String ADMIN_SETTINGS_BY_KEY_COLUMN_FAMILY_NAME = "admin_settings_by_key"; - + /** * Cassandra contact constants. */ @@ -97,9 +98,9 @@ public class ModelConstants { public static final String TENANT_TITLE_PROPERTY = TITLE_PROPERTY; public static final String TENANT_REGION_PROPERTY = "region"; public static final String TENANT_ADDITIONAL_INFO_PROPERTY = ADDITIONAL_INFO_PROPERTY; - + public static final String TENANT_BY_REGION_AND_SEARCH_TEXT_COLUMN_FAMILY_NAME = "tenant_by_region_and_search_text"; - + /** * Cassandra customer constants. */ @@ -107,9 +108,9 @@ public class ModelConstants { public static final String CUSTOMER_TENANT_ID_PROPERTY = TENTANT_ID_PROPERTY; public static final String CUSTOMER_TITLE_PROPERTY = TITLE_PROPERTY; public static final String CUSTOMER_ADDITIONAL_INFO_PROPERTY = ADDITIONAL_INFO_PROPERTY; - + public static final String CUSTOMER_BY_TENANT_AND_SEARCH_TEXT_COLUMN_FAMILY_NAME = "customer_by_tenant_and_search_text"; - + /** * Cassandra device constants. */ @@ -118,12 +119,12 @@ public class ModelConstants { public static final String DEVICE_CUSTOMER_ID_PROPERTY = CUSTOMER_ID_PROPERTY; public static final String DEVICE_NAME_PROPERTY = "name"; public static final String DEVICE_ADDITIONAL_INFO_PROPERTY = ADDITIONAL_INFO_PROPERTY; - + public static final String DEVICE_BY_TENANT_AND_SEARCH_TEXT_COLUMN_FAMILY_NAME = "device_by_tenant_and_search_text"; public static final String DEVICE_BY_CUSTOMER_AND_SEARCH_TEXT_COLUMN_FAMILY_NAME = "device_by_customer_and_search_text"; public static final String DEVICE_BY_TENANT_AND_NAME_VIEW_NAME = "device_by_tenant_and_name"; - + /** * Cassandra device_credentials constants. */ @@ -132,7 +133,7 @@ public class ModelConstants { public static final String DEVICE_CREDENTIALS_CREDENTIALS_TYPE_PROPERTY = "credentials_type"; public static final String DEVICE_CREDENTIALS_CREDENTIALS_ID_PROPERTY = "credentials_id"; public static final String DEVICE_CREDENTIALS_CREDENTIALS_VALUE_PROPERTY = "credentials_value"; - + public static final String DEVICE_CREDENTIALS_BY_DEVICE_COLUMN_FAMILY_NAME = "device_credentials_by_device"; public static final String DEVICE_CREDENTIALS_BY_CREDENTIALS_ID_COLUMN_FAMILY_NAME = "device_credentials_by_credentials_id"; @@ -203,9 +204,9 @@ public class ModelConstants { public static final String COMPONENT_DESCRIPTOR_BY_SCOPE_TYPE_AND_SEARCH_TEXT_COLUMN_FAMILY_NAME = "component_desc_by_scope_type_search_text"; public static final String COMPONENT_DESCRIPTOR_BY_ID = "component_desc_by_id"; - /** - * Cassandra rule metadata constants. - */ + /** + * Cassandra rule metadata constants. + */ public static final String RULE_COLUMN_FAMILY_NAME = "rule"; public static final String RULE_TENANT_ID_PROPERTY = TENTANT_ID_PROPERTY; public static final String RULE_NAME_PROPERTY = "name"; @@ -259,4 +260,31 @@ public class ModelConstants { public static final String STRING_VALUE_COLUMN = "str_v"; public static final String LONG_VALUE_COLUMN = "long_v"; public static final String DOUBLE_VALUE_COLUMN = "dbl_v"; + + public static final String[] COUNT_AGGREGATION_COLUMNS = new String[]{count(LONG_VALUE_COLUMN), count(DOUBLE_VALUE_COLUMN), count(BOOLEAN_VALUE_COLUMN), count(STRING_VALUE_COLUMN)}; + + public static final String[] NONE_AGGREGATION_COLUMNS = new String[]{LONG_VALUE_COLUMN, DOUBLE_VALUE_COLUMN, BOOLEAN_VALUE_COLUMN, STRING_VALUE_COLUMN,}; + public static final String[] MIN_AGGREGATION_COLUMNS = ArrayUtils.addAll(COUNT_AGGREGATION_COLUMNS, + new String[]{min(LONG_VALUE_COLUMN), min(DOUBLE_VALUE_COLUMN), min(BOOLEAN_VALUE_COLUMN), min(STRING_VALUE_COLUMN)}); + public static final String[] MAX_AGGREGATION_COLUMNS = ArrayUtils.addAll(COUNT_AGGREGATION_COLUMNS, + new String[]{max(LONG_VALUE_COLUMN), max(DOUBLE_VALUE_COLUMN), max(BOOLEAN_VALUE_COLUMN), max(STRING_VALUE_COLUMN)}); + public static final String[] SUM_AGGREGATION_COLUMNS = ArrayUtils.addAll(COUNT_AGGREGATION_COLUMNS, + new String[]{sum(LONG_VALUE_COLUMN), sum(DOUBLE_VALUE_COLUMN)}); + public static final String[] AVG_AGGREGATION_COLUMNS = ArrayUtils.addAll(COUNT_AGGREGATION_COLUMNS, SUM_AGGREGATION_COLUMNS); + + public static String min(String s) { + return "min(" + s + ")"; + } + + public static String max(String s) { + return "max(" + s + ")"; + } + + public static String sum(String s) { + return "sum(" + s + ")"; + } + + public static String count(String s) { + return "count(" + s + ")"; + } } diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/AggregatePartitionsFunction.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/AggregatePartitionsFunction.java new file mode 100644 index 0000000000..9ee902213b --- /dev/null +++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/AggregatePartitionsFunction.java @@ -0,0 +1,175 @@ +package org.thingsboard.server.dao.timeseries; + +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.Row; +import org.thingsboard.server.common.data.kv.*; + +import javax.annotation.Nullable; +import java.util.List; +import java.util.Optional; + +/** + * Created by ashvayka on 20.02.17. + */ +public class AggregatePartitionsFunction implements com.google.common.base.Function, Optional> { + + private static final int LONG_CNT_POS = 0; + private static final int DOUBLE_CNT_POS = 1; + private static final int BOOL_CNT_POS = 2; + private static final int STR_CNT_POS = 3; + private static final int LONG_POS = 4; + private static final int DOUBLE_POS = 5; + private static final int BOOL_POS = 6; + private static final int STR_POS = 7; + + private final Aggregation aggregation; + private final String key; + private final long ts; + + public AggregatePartitionsFunction(Aggregation aggregation, String key, long ts) { + this.aggregation = aggregation; + this.key = key; + this.ts = ts; + } + + @Nullable + @Override + public Optional apply(@Nullable List rsList) { + if (rsList == null || rsList.size() == 0) { + return Optional.empty(); + } + long count = 0; + DataType dataType = null; + + Boolean bValue = null; + String sValue = null; + Double dValue = null; + Long lValue = null; + + for (ResultSet rs : rsList) { + for (Row row : rs.all()) { + long curCount; + + Long curLValue = null; + Double curDValue = null; + Boolean curBValue = null; + String curSValue = null; + + long longCount = row.getLong(LONG_CNT_POS); + long doubleCount = row.getLong(DOUBLE_CNT_POS); + long boolCount = row.getLong(BOOL_CNT_POS); + long strCount = row.getLong(STR_CNT_POS); + + if (longCount > 0) { + dataType = DataType.LONG; + curCount = longCount; + curLValue = getLongValue(row); + } else if (doubleCount > 0) { + dataType = DataType.DOUBLE; + curCount = doubleCount; + curDValue = getDoubleValue(row); + } else if (boolCount > 0) { + dataType = DataType.BOOLEAN; + curCount = boolCount; + curBValue = getBooleanValue(row); + } else if (strCount > 0) { + dataType = DataType.STRING; + curCount = strCount; + curSValue = getStringValue(row); + } else { + continue; + } + + if (aggregation == Aggregation.COUNT) { + count += curCount; + } else if (aggregation == Aggregation.AVG || aggregation == Aggregation.SUM) { + count += curCount; + dValue = dValue == null ? curDValue : dValue + curDValue; + lValue = lValue == null ? curLValue : lValue + curLValue; + } else if (aggregation == Aggregation.MIN) { + if (curDValue != null) { + dValue = dValue == null ? curDValue : Math.min(dValue, curDValue); + } else if (curLValue != null) { + lValue = lValue == null ? curLValue : Math.min(lValue, curLValue); + } else if (curBValue != null) { + bValue = bValue == null ? curBValue : bValue && curBValue; + } else if (curSValue != null) { + if (sValue == null || curSValue.compareTo(sValue) < 0) { + sValue = curSValue; + } + } + } else if (aggregation == Aggregation.MAX) { + if (curDValue != null) { + dValue = dValue == null ? curDValue : Math.max(dValue, curDValue); + } else if (curLValue != null) { + lValue = lValue == null ? curLValue : Math.max(lValue, curLValue); + } else if (curBValue != null) { + bValue = bValue == null ? curBValue : bValue || curBValue; + } else if (curSValue != null) { + if (sValue == null || curSValue.compareTo(sValue) > 0) { + sValue = curSValue; + } + } + } + } + } + if (dataType == null) { + return Optional.empty(); + } else if (aggregation == Aggregation.COUNT) { + return Optional.of(new BasicTsKvEntry(ts, new LongDataEntry(key, (long) count))); + } else if (aggregation == Aggregation.AVG || aggregation == Aggregation.SUM) { + if (count == 0 || (dataType == DataType.DOUBLE && dValue == null) || (dataType == DataType.LONG && lValue == null)) { + return Optional.empty(); + } else if (dataType == DataType.DOUBLE) { + return Optional.of(new BasicTsKvEntry(ts, new DoubleDataEntry(key, aggregation == Aggregation.SUM ? dValue : (dValue / count)))); + } else if (dataType == DataType.LONG) { + return Optional.of(new BasicTsKvEntry(ts, new LongDataEntry(key, aggregation == Aggregation.SUM ? lValue : (lValue / count)))); + } + } else if (aggregation == Aggregation.MIN || aggregation == Aggregation.MAX) { + if (dataType == DataType.DOUBLE) { + return Optional.of(new BasicTsKvEntry(ts, new DoubleDataEntry(key, dValue))); + } else if (dataType == DataType.LONG) { + return Optional.of(new BasicTsKvEntry(ts, new LongDataEntry(key, lValue))); + } else if (dataType == DataType.STRING) { + return Optional.of(new BasicTsKvEntry(ts, new StringDataEntry(key, sValue))); + } else { + return Optional.of(new BasicTsKvEntry(ts, new BooleanDataEntry(key, bValue))); + } + } + return null; + } + + private Boolean getBooleanValue(Row row) { + if (aggregation == Aggregation.MIN || aggregation == Aggregation.MAX) { + return row.getBool(BOOL_POS); + } else { + return null; + } + } + + private String getStringValue(Row row) { + if (aggregation == Aggregation.MIN || aggregation == Aggregation.MAX) { + return row.getString(STR_POS); + } else { + return null; + } + } + + private Long getLongValue(Row row) { + if (aggregation == Aggregation.MIN || aggregation == Aggregation.MAX + || aggregation == Aggregation.SUM || aggregation == Aggregation.AVG) { + return row.getLong(LONG_POS); + } else { + return null; + } + } + + private Double getDoubleValue(Row row) { + if (aggregation == Aggregation.MIN || aggregation == Aggregation.MAX + || aggregation == Aggregation.SUM || aggregation == Aggregation.AVG) { + return row.getDouble(DOUBLE_POS); + } else { + return null; + } + } +} diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesDao.java index 09c415c0eb..341972902c 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesDao.java @@ -1,12 +1,12 @@ /** * Copyright © 2016-2017 The Thingsboard Authors - * + *

* Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

* Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -18,6 +18,10 @@ package org.thingsboard.server.dao.timeseries; import com.datastax.driver.core.*; import com.datastax.driver.core.querybuilder.QueryBuilder; import com.datastax.driver.core.querybuilder.Select; +import com.google.common.base.Function; +import com.google.common.util.concurrent.AsyncFunction; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; @@ -26,7 +30,16 @@ import org.thingsboard.server.common.data.kv.DataType; import org.thingsboard.server.dao.AbstractDao; import org.thingsboard.server.dao.model.ModelConstants; +import javax.annotation.Nullable; +import javax.annotation.PostConstruct; +import javax.annotation.PreDestroy; +import java.time.Instant; +import java.time.LocalDateTime; +import java.time.ZoneOffset; import java.util.*; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.stream.Collectors; import static com.datastax.driver.core.querybuilder.QueryBuilder.eq; import static com.datastax.driver.core.querybuilder.QueryBuilder.select; @@ -41,48 +54,136 @@ public class BaseTimeseriesDao extends AbstractDao implements TimeseriesDao { @Value("${cassandra.query.max_limit_per_request}") protected Integer maxLimitPerRequest; + @Value("${cassandra.query.read_result_processing_threads}") + private int readResultsProcessingThreads; + + @Value("${cassandra.query.min_read_step}") + private int minReadStep; + + @Value("${cassandra.query.ts_key_value_partitioning}") + private String partitioning; + + private TsPartitionDate tsFormat; + + private ExecutorService readResultsProcessingExecutor; + private PreparedStatement partitionInsertStmt; private PreparedStatement[] latestInsertStmts; private PreparedStatement[] saveStmts; + private PreparedStatement[] fetchStmts; private PreparedStatement findLatestStmt; private PreparedStatement findAllLatestStmt; - @Override - public List find(String entityType, UUID entityId, TsKvQuery query, Optional minPartition, Optional maxPartition) { - List rows = Collections.emptyList(); - Long[] parts = fetchPartitions(entityType, entityId, query.getKey(), minPartition, maxPartition); - int partsLength = parts.length; - if (parts != null && partsLength > 0) { - int limit = maxLimitPerRequest; - Optional lim = query.getLimit(); - if (lim.isPresent() && lim.get() < maxLimitPerRequest) { - limit = lim.get(); - } - - rows = new ArrayList<>(limit); - int lastIdx = partsLength - 1; - for (int i = 0; i < partsLength; i++) { - int currentLimit; - if (rows.size() >= limit) { - break; - } else { - currentLimit = limit - rows.size(); - } - Long partition = parts[i]; - Select.Where where = select().from(ModelConstants.TS_KV_CF).where(eq(ModelConstants.ENTITY_TYPE_COLUMN, entityType)) - .and(eq(ModelConstants.ENTITY_ID_COLUMN, entityId)) - .and(eq(ModelConstants.KEY_COLUMN, query.getKey())) - .and(eq(ModelConstants.PARTITION_COLUMN, partition)); - if (i == 0 && query.getStartTs().isPresent()) { - where.and(QueryBuilder.gt(ModelConstants.TS_COLUMN, query.getStartTs().get())); - } else if (i == lastIdx && query.getEndTs().isPresent()) { - where.and(QueryBuilder.lte(ModelConstants.TS_COLUMN, query.getEndTs().get())); - } - where.limit(currentLimit); - rows.addAll(executeRead(where).all()); - } + @PostConstruct + public void init() { + getFetchStmt(Aggregation.NONE); + readResultsProcessingExecutor = Executors.newFixedThreadPool(readResultsProcessingThreads); + Optional partition = TsPartitionDate.parse(partitioning); + if (partition.isPresent()) { + tsFormat = partition.get(); + } else { + log.warn("Incorrect configuration of partitioning {}", partitioning); + throw new RuntimeException("Failed to parse partitioning property: " + partitioning + "!"); } - return convertResultToTsKvEntryList(rows); + } + + @PreDestroy + public void stop() { + if (readResultsProcessingExecutor != null) { + readResultsProcessingExecutor.shutdownNow(); + } + } + + @Override + public long toPartitionTs(long ts) { + LocalDateTime time = LocalDateTime.ofInstant(Instant.ofEpochMilli(ts), ZoneOffset.UTC); + return tsFormat.truncatedTo(time).toInstant(ZoneOffset.UTC).toEpochMilli(); + } + + + private static String[] getFetchColumnNames(Aggregation aggregation) { + switch (aggregation) { + case NONE: + return ModelConstants.NONE_AGGREGATION_COLUMNS; + case MIN: + return ModelConstants.MIN_AGGREGATION_COLUMNS; + case MAX: + return ModelConstants.MAX_AGGREGATION_COLUMNS; + case SUM: + return ModelConstants.SUM_AGGREGATION_COLUMNS; + case COUNT: + return ModelConstants.COUNT_AGGREGATION_COLUMNS; + case AVG: + return ModelConstants.AVG_AGGREGATION_COLUMNS; + default: + throw new RuntimeException("Aggregation type: " + aggregation + " is not supported!"); + } + } + + @Override + public ListenableFuture> findAllAsync(String entityType, UUID entityId, TsKvQuery query, long minPartition, long maxPartition) { + if (query.getAggregation() == Aggregation.NONE) { + //TODO: + return null; + } else { + long step = Math.max((query.getEndTs() - query.getStartTs()) / query.getLimit(), minReadStep); + long stepTs = query.getStartTs(); + List>> futures = new ArrayList<>(); + while (stepTs < query.getEndTs()) { + long startTs = stepTs; + long endTs = stepTs + step; + TsKvQuery subQuery = new BaseTsKvQuery(query.getKey(), startTs, endTs, 1, query.getAggregation()); + futures.add(findAndAggregateAsync(entityType, entityId, subQuery, toPartitionTs(startTs), toPartitionTs(endTs))); + stepTs = endTs; + } + ListenableFuture>> future = Futures.allAsList(futures); + return Futures.transform(future, new Function>, List>() { + @Nullable + @Override + public List apply(@Nullable List> input) { + return input.stream().filter(v -> v.isPresent()).map(v -> v.get()).collect(Collectors.toList()); + } + }); + } + } + + private ListenableFuture> findAndAggregateAsync(String entityType, UUID entityId, TsKvQuery query, long minPartition, long maxPartition) { + final Aggregation aggregation = query.getAggregation(); + final long startTs = query.getStartTs(); + final long endTs = query.getEndTs(); + final long ts = startTs + (endTs - startTs) / 2; + + ResultSetFuture partitionsFuture = fetchPartitions(entityType, entityId, query.getKey(), minPartition, maxPartition); + com.google.common.base.Function> toArrayFunction = rows -> rows.all().stream() + .map(row -> row.getLong(ModelConstants.PARTITION_COLUMN)).collect(Collectors.toList()); + + ListenableFuture> partitionsListFuture = Futures.transform(partitionsFuture, toArrayFunction, readResultsProcessingExecutor); + + AsyncFunction, List> fetchChunksFunction = partitions -> { + try { + PreparedStatement proto = getFetchStmt(aggregation); + List futures = new ArrayList<>(partitions.size()); + for (Long partition : partitions) { + BoundStatement stmt = proto.bind(); + stmt.setString(0, entityType); + stmt.setUUID(1, entityId); + stmt.setString(2, query.getKey()); + stmt.setLong(3, partition); + stmt.setLong(4, startTs); + stmt.setLong(5, endTs); + log.debug("Generated query [{}] for entityType {} and entityId {}", stmt, entityType, entityId); + futures.add(executeAsyncRead(stmt)); + } + return Futures.allAsList(futures); + } catch (Throwable e) { + log.error("Failed to fetch data", e); + throw e; + } + }; + + ListenableFuture> aggregationChunks = Futures.transform(partitionsListFuture, fetchChunksFunction, readResultsProcessingExecutor); + + return Futures.transform(aggregationChunks, new AggregatePartitionsFunction(aggregation, query.getKey(), ts), readResultsProcessingExecutor); } @Override @@ -190,13 +291,12 @@ public class BaseTimeseriesDao extends AbstractDao implements TimeseriesDao { * Select existing partitions from the table * {@link ModelConstants#TS_KV_PARTITIONS_CF} for the given entity */ - private Long[] fetchPartitions(String entityType, UUID entityId, String key, Optional minPartition, Optional maxPartition) { + private ResultSetFuture fetchPartitions(String entityType, UUID entityId, String key, long minPartition, long maxPartition) { Select.Where select = QueryBuilder.select(ModelConstants.PARTITION_COLUMN).from(ModelConstants.TS_KV_PARTITIONS_CF).where(eq(ModelConstants.ENTITY_TYPE_COLUMN, entityType)) .and(eq(ModelConstants.ENTITY_ID_COLUMN, entityId)).and(eq(ModelConstants.KEY_COLUMN, key)); - minPartition.ifPresent(startTs -> select.and(QueryBuilder.gte(ModelConstants.PARTITION_COLUMN, minPartition.get()))); - maxPartition.ifPresent(endTs -> select.and(QueryBuilder.lte(ModelConstants.PARTITION_COLUMN, maxPartition.get()))); - ResultSet resultSet = executeRead(select); - return resultSet.all().stream().map(row -> row.getLong(ModelConstants.PARTITION_COLUMN)).toArray(Long[]::new); + select.and(QueryBuilder.gte(ModelConstants.PARTITION_COLUMN, minPartition)); + select.and(QueryBuilder.lte(ModelConstants.PARTITION_COLUMN, maxPartition)); + return executeAsyncRead(select); } private PreparedStatement getSaveStmt(DataType dataType) { @@ -216,6 +316,23 @@ public class BaseTimeseriesDao extends AbstractDao implements TimeseriesDao { return saveStmts[dataType.ordinal()]; } + private PreparedStatement getFetchStmt(Aggregation aggType) { + if (fetchStmts == null) { + fetchStmts = new PreparedStatement[Aggregation.values().length]; + for (Aggregation type : Aggregation.values()) { + fetchStmts[type.ordinal()] = getSession().prepare("SELECT " + + String.join(", ", getFetchColumnNames(type)) + " FROM " + ModelConstants.TS_KV_CF + + " WHERE " + ModelConstants.ENTITY_TYPE_COLUMN + " = ? " + + "AND " + ModelConstants.ENTITY_ID_COLUMN + " = ? " + + "AND " + ModelConstants.KEY_COLUMN + " = ? " + + "AND " + ModelConstants.PARTITION_COLUMN + " = ? " + + "AND " + ModelConstants.TS_COLUMN + " > ? " + + "AND " + ModelConstants.TS_COLUMN + " <= ?"); + } + } + return fetchStmts[aggType.ordinal()]; + } + private PreparedStatement getLatestStmt(DataType dataType) { if (latestInsertStmts == null) { latestInsertStmts = new PreparedStatement[DataType.values().length]; diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java index 419b53447e..a8b4ef5111 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java @@ -1,12 +1,12 @@ /** * Copyright © 2016-2017 The Thingsboard Authors - * + *

* Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

* Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -23,21 +23,23 @@ import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import lombok.extern.slf4j.Slf4j; import org.thingsboard.server.common.data.id.UUIDBased; +import org.thingsboard.server.common.data.kv.BaseTsKvQuery; import org.thingsboard.server.common.data.kv.TsKvEntry; import org.thingsboard.server.common.data.kv.TsKvQuery; import org.thingsboard.server.dao.exception.IncorrectParameterException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; import org.thingsboard.server.dao.service.Validator; import javax.annotation.PostConstruct; +import javax.annotation.PreDestroy; import java.time.Instant; import java.time.LocalDateTime; import java.time.ZoneOffset; import java.util.*; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import static org.apache.commons.lang3.StringUtils.isBlank; @@ -50,38 +52,14 @@ public class BaseTimeseriesService implements TimeseriesService { public static final int INSERTS_PER_ENTRY = 3; - @Value("${cassandra.query.ts_key_value_partitioning}") - private String partitioning; - @Autowired private TimeseriesDao timeseriesDao; - private TsPartitionDate tsFormat; - - @PostConstruct - public void init() { - Optional partition = TsPartitionDate.parse(partitioning); - if (partition.isPresent()) { - tsFormat = partition.get(); - } else { - log.warn("Incorrect configuration of partitioning {}", partitioning); - throw new RuntimeException("Failed to parse partitioning property: " + partitioning + "!"); - } - } - @Override - public List find(String entityType, UUIDBased entityId, TsKvQuery query) { + public ListenableFuture> findAll(String entityType, UUIDBased entityId, TsKvQuery query) { validate(entityType, entityId); validate(query); - return timeseriesDao.find(entityType, entityId.getId(), query, toPartitionTs(query.getStartTs()), toPartitionTs(query.getEndTs())); - } - - private Optional toPartitionTs(Optional ts) { - if (ts.isPresent()) { - return Optional.of(toPartitionTs(ts.get())); - } else { - return Optional.empty(); - } + return timeseriesDao.findAllAsync(entityType, entityId.getId(), query, timeseriesDao.toPartitionTs(query.getStartTs()), timeseriesDao.toPartitionTs(query.getEndTs())); } @Override @@ -106,7 +84,7 @@ public class BaseTimeseriesService implements TimeseriesService { throw new IncorrectParameterException("Key value entry can't be null"); } UUID uid = entityId.getId(); - long partitionTs = toPartitionTs(tsKvEntry.getTs()); + long partitionTs = timeseriesDao.toPartitionTs(tsKvEntry.getTs()); List futures = Lists.newArrayListWithExpectedSize(INSERTS_PER_ENTRY); saveAndRegisterFutures(futures, entityType, tsKvEntry, uid, partitionTs); @@ -122,7 +100,7 @@ public class BaseTimeseriesService implements TimeseriesService { throw new IncorrectParameterException("Key value entry can't be null"); } UUID uid = entityId.getId(); - long partitionTs = toPartitionTs(tsKvEntry.getTs()); + long partitionTs = timeseriesDao.toPartitionTs(tsKvEntry.getTs()); saveAndRegisterFutures(futures, entityType, tsKvEntry, uid, partitionTs); } return Futures.allAsList(futures); @@ -144,14 +122,6 @@ public class BaseTimeseriesService implements TimeseriesService { futures.add(timeseriesDao.save(entityType, uid, partitionTs, tsKvEntry)); } - private long toPartitionTs(long ts) { - LocalDateTime time = LocalDateTime.ofInstant(Instant.ofEpochMilli(ts), ZoneOffset.UTC); - - LocalDateTime parititonTime = tsFormat.truncatedTo(time); - - return parititonTime.toInstant(ZoneOffset.UTC).toEpochMilli(); - } - private static void validate(String entityType, UUIDBased entityId) { Validator.validateString(entityType, "Incorrect entityType " + entityType); Validator.validateId(entityId, "Incorrect entityId " + entityId); @@ -163,5 +133,6 @@ public class BaseTimeseriesService implements TimeseriesService { } else if (isBlank(query.getKey())) { throw new IncorrectParameterException("Incorrect TsKvQuery. Key can't be empty"); } + //TODO: add validation of all params } } diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesDao.java index 294f57445c..83b78da646 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesDao.java @@ -17,6 +17,7 @@ package org.thingsboard.server.dao.timeseries; import com.datastax.driver.core.ResultSetFuture; import com.datastax.driver.core.Row; +import com.google.common.util.concurrent.ListenableFuture; import org.thingsboard.server.common.data.kv.TsKvEntry; import org.thingsboard.server.common.data.kv.TsKvQuery; @@ -30,7 +31,11 @@ import java.util.UUID; */ public interface TimeseriesDao { - List find(String entityType, UUID entityId, TsKvQuery query, Optional minPartition, Optional maxPartition); + long toPartitionTs(long ts); + + ListenableFuture> findAllAsync(String entityType, UUID entityId, TsKvQuery query, long minPartition, long maxPartition); + +// List find(String entityType, UUID entityId, TsKvQuery query, Optional minPartition, Optional maxPartition); ResultSetFuture findLatest(String entityType, UUID entityId, String key); diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesService.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesService.java index d8b31af5d2..1bafdea37f 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesService.java @@ -19,6 +19,7 @@ import com.datastax.driver.core.ResultSet; import com.datastax.driver.core.ResultSetFuture; import com.datastax.driver.core.Row; import com.google.common.util.concurrent.ListenableFuture; +import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.UUIDBased; import org.thingsboard.server.common.data.kv.TsKvEntry; import org.thingsboard.server.common.data.kv.TsKvQuery; @@ -32,8 +33,7 @@ import java.util.Set; */ public interface TimeseriesService { - //TODO: Replace this with async operation - List find(String entityType, UUIDBased entityId, TsKvQuery query); + ListenableFuture> findAll(String entityType, UUIDBased entityId, TsKvQuery query); ListenableFuture> findLatest(String entityType, UUIDBased entityId, Collection keys); diff --git a/dao/src/test/java/org/thingsboard/server/dao/DaoTestSuite.java b/dao/src/test/java/org/thingsboard/server/dao/DaoTestSuite.java index b1502595cb..ac48536793 100644 --- a/dao/src/test/java/org/thingsboard/server/dao/DaoTestSuite.java +++ b/dao/src/test/java/org/thingsboard/server/dao/DaoTestSuite.java @@ -25,11 +25,11 @@ import java.util.Arrays; @RunWith(ClasspathSuite.class) @ClassnameFilters({ - "org.thingsboard.server.dao.service.*Test", - "org.thingsboard.server.dao.kv.*Test", - "org.thingsboard.server.dao.plugin.*Test", - "org.thingsboard.server.dao.rule.*Test", - "org.thingsboard.server.dao.attributes.*Test", +// "org.thingsboard.server.dao.service.*Test", +// "org.thingsboard.server.dao.kv.*Test", +// "org.thingsboard.server.dao.plugin.*Test", +// "org.thingsboard.server.dao.rule.*Test", +// "org.thingsboard.server.dao.attributes.*Test", "org.thingsboard.server.dao.timeseries.*Test" }) public class DaoTestSuite { diff --git a/dao/src/test/java/org/thingsboard/server/dao/timeseries/TimeseriesServiceTest.java b/dao/src/test/java/org/thingsboard/server/dao/timeseries/TimeseriesServiceTest.java index 9e4f492dcc..51fce6fbb0 100644 --- a/dao/src/test/java/org/thingsboard/server/dao/timeseries/TimeseriesServiceTest.java +++ b/dao/src/test/java/org/thingsboard/server/dao/timeseries/TimeseriesServiceTest.java @@ -116,14 +116,36 @@ public class TimeseriesServiceTest extends AbstractServiceTest { entries.add(tsKvEntry); } log.debug("Saved all records {}", localDateTime); - List list = tsService.find(DataConstants.DEVICE, deviceId, new BaseTsKvQuery(STRING_KEY, entries.get(599).getTs(), - LocalDateTime.now(ZoneOffset.UTC).toInstant(ZoneOffset.UTC).toEpochMilli())); + List list = tsService.findAll(DataConstants.DEVICE, deviceId, new BaseTsKvQuery(STRING_KEY, entries.get(599).getTs(), + LocalDateTime.now(ZoneOffset.UTC).toInstant(ZoneOffset.UTC).toEpochMilli(), PARTITION_MINUTES - 599, Aggregation.MIN)).get(); log.debug("Fetched records {}", localDateTime); List expected = entries.subList(600, PARTITION_MINUTES); assertEquals(expected.size(), list.size()); assertEquals(expected, list); } +// @Test +// public void testFindDeviceTsDataByQuery() throws Exception { +// DeviceId deviceId = new DeviceId(UUIDs.timeBased()); +// LocalDateTime localDateTime = LocalDateTime.now(ZoneOffset.UTC).minusMinutes(PARTITION_MINUTES); +// log.debug("Start event time is {}", localDateTime); +// List entries = new ArrayList<>(PARTITION_MINUTES); +// +// for (int i = 0; i < PARTITION_MINUTES; i++) { +// long time = localDateTime.plusMinutes(i).toInstant(ZoneOffset.UTC).toEpochMilli(); +// BasicTsKvEntry tsKvEntry = new BasicTsKvEntry(time, stringKvEntry); +// tsService.save(DataConstants.DEVICE, deviceId, tsKvEntry).get(); +// entries.add(tsKvEntry); +// } +// log.debug("Saved all records {}", localDateTime); +// List list = tsService.findAll(DataConstants.DEVICE, deviceId, new BaseTsKvQuery(STRING_KEY, entries.get(599).getTs(), +// LocalDateTime.now(ZoneOffset.UTC).toInstant(ZoneOffset.UTC).toEpochMilli(), PARTITION_MINUTES - 599, Aggregation.MIN)).get(); +// log.debug("Fetched records {}", localDateTime); +// List expected = entries.subList(600, PARTITION_MINUTES); +// assertEquals(expected.size(), list.size()); +// assertEquals(expected, list); +// } + private void saveEntries(DeviceId deviceId, long ts) throws ExecutionException, InterruptedException { tsService.save(DataConstants.DEVICE, deviceId, toTsEntry(ts, stringKvEntry)).get(); diff --git a/dao/src/test/resources/cassandra-test.properties b/dao/src/test/resources/cassandra-test.properties index 82fcbe1949..0a207e7068 100644 --- a/dao/src/test/resources/cassandra-test.properties +++ b/dao/src/test/resources/cassandra-test.properties @@ -2,7 +2,7 @@ cassandra.cluster_name=Thingsboard Cluster cassandra.keyspace_name=thingsboard -cassandra.url=127.0.0.1:9142 +cassandra.url=127.0.0.1:9042 cassandra.ssl=false @@ -47,3 +47,7 @@ cassandra.query.default_fetch_size=2000 cassandra.query.ts_key_value_partitioning=HOURS cassandra.query.max_limit_per_request=1000 + +cassandra.query.read_result_processing_threads=3 + +cassandra.query.min_read_step=100 \ No newline at end of file diff --git a/extensions-api/src/main/java/org/thingsboard/server/extensions/api/plugins/PluginContext.java b/extensions-api/src/main/java/org/thingsboard/server/extensions/api/plugins/PluginContext.java index fb6ae07962..b5f27a1e5a 100644 --- a/extensions-api/src/main/java/org/thingsboard/server/extensions/api/plugins/PluginContext.java +++ b/extensions-api/src/main/java/org/thingsboard/server/extensions/api/plugins/PluginContext.java @@ -84,6 +84,8 @@ public interface PluginContext { List loadTimeseries(DeviceId deviceId, TsKvQuery query); + void loadTimeseries(DeviceId deviceId, TsKvQuery query, PluginCallback> callback); + void loadLatestTimeseries(DeviceId deviceId, Collection keys, PluginCallback> callback); void loadLatestTimeseries(DeviceId deviceId, PluginCallback> callback); diff --git a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRestMsgHandler.java b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRestMsgHandler.java index d441e81e60..e93f0b5012 100644 --- a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRestMsgHandler.java +++ b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRestMsgHandler.java @@ -95,8 +95,9 @@ public class TelemetryRestMsgHandler extends DefaultRestMsgHandler { Optional limit = request.getIntParamValue("limit"); Map> data = new LinkedHashMap<>(); for (String key : keys.split(",")) { - List entries = ctx.loadTimeseries(deviceId, new BaseTsKvQuery(key, startTs, endTs, limit)); - data.put(key, entries.stream().map(v -> new TsData(v.getTs(), v.getValueAsString())).collect(Collectors.toList())); + //TODO: refactoring +// List entries = ctx.loadTimeseries(deviceId, new BaseTsKvQuery(key, startTs, endTs, limit)); +// data.put(key, entries.stream().map(v -> new TsData(v.getTs(), v.getValueAsString())).collect(Collectors.toList())); } msg.getResponseHolder().setResult(new ResponseEntity<>(data, HttpStatus.OK)); } else if ("attributes".equals(entity)) {