From 0c98ec78f0c3e5925a58c8dd0970208c21f73cfd Mon Sep 17 00:00:00 2001 From: Igor Khanenko Date: Thu, 1 Dec 2016 11:42:53 +0200 Subject: [PATCH] Timeseries and Attributes DAO --- .../server/dao/attributes/AttributesDao.java | 37 +++ .../dao/attributes/AttributesService.java | 40 +++ .../dao/attributes/BaseAttributesDao.java | 154 +++++++++ .../dao/attributes/BaseAttributesService.java | 87 +++++ .../dao/timeseries/BaseTimeseriesDao.java | 313 ++++++++++++++++++ .../dao/timeseries/BaseTimeseriesService.java | 167 ++++++++++ .../server/dao/timeseries/TimeseriesDao.java | 49 +++ .../dao/timeseries/TimeseriesService.java | 50 +++ .../dao/timeseries/TsPartitionDate.java | 66 ++++ 9 files changed, 963 insertions(+) create mode 100644 dao/src/main/java/org/thingsboard/server/dao/attributes/AttributesDao.java create mode 100644 dao/src/main/java/org/thingsboard/server/dao/attributes/AttributesService.java create mode 100644 dao/src/main/java/org/thingsboard/server/dao/attributes/BaseAttributesDao.java create mode 100644 dao/src/main/java/org/thingsboard/server/dao/attributes/BaseAttributesService.java create mode 100644 dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesDao.java create mode 100644 dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java create mode 100644 dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesDao.java create mode 100644 dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesService.java create mode 100644 dao/src/main/java/org/thingsboard/server/dao/timeseries/TsPartitionDate.java diff --git a/dao/src/main/java/org/thingsboard/server/dao/attributes/AttributesDao.java b/dao/src/main/java/org/thingsboard/server/dao/attributes/AttributesDao.java new file mode 100644 index 0000000000..8089db992c --- /dev/null +++ b/dao/src/main/java/org/thingsboard/server/dao/attributes/AttributesDao.java @@ -0,0 +1,37 @@ +/** + * Copyright © 2016 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.dao.attributes; + +import com.datastax.driver.core.ResultSetFuture; +import org.thingsboard.server.common.data.id.EntityId; +import org.thingsboard.server.common.data.kv.AttributeKvEntry; + +import java.util.List; +import java.util.UUID; + +/** + * @author Andrew Shvayka + */ +public interface AttributesDao { + + AttributeKvEntry find(EntityId entityId, String attributeType, String attributeKey); + + List findAll(EntityId entityId, String attributeType); + + ResultSetFuture save(EntityId entityId, String attributeType, AttributeKvEntry attribute); + + void removeAll(EntityId entityId, String scope, List keys); +} diff --git a/dao/src/main/java/org/thingsboard/server/dao/attributes/AttributesService.java b/dao/src/main/java/org/thingsboard/server/dao/attributes/AttributesService.java new file mode 100644 index 0000000000..5e72c90a92 --- /dev/null +++ b/dao/src/main/java/org/thingsboard/server/dao/attributes/AttributesService.java @@ -0,0 +1,40 @@ +/** + * Copyright © 2016 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.dao.attributes; + +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.ResultSetFuture; +import com.google.common.util.concurrent.ListenableFuture; +import org.thingsboard.server.common.data.id.DeviceId; +import org.thingsboard.server.common.data.id.EntityId; +import org.thingsboard.server.common.data.id.UUIDBased; +import org.thingsboard.server.common.data.kv.AttributeKvEntry; + +import java.util.List; + +/** + * @author Andrew Shvayka + */ +public interface AttributesService { + + AttributeKvEntry find(EntityId entityId, String scope, String attributeKey); + + List findAll(EntityId entityId, String scope); + + ListenableFuture> save(EntityId entityId, String scope, List attributes); + + void removeAll(EntityId entityId, String scope, List attributeKeys); +} diff --git a/dao/src/main/java/org/thingsboard/server/dao/attributes/BaseAttributesDao.java b/dao/src/main/java/org/thingsboard/server/dao/attributes/BaseAttributesDao.java new file mode 100644 index 0000000000..4c542e3bf2 --- /dev/null +++ b/dao/src/main/java/org/thingsboard/server/dao/attributes/BaseAttributesDao.java @@ -0,0 +1,154 @@ +/** + * Copyright © 2016 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.dao.attributes; + +import com.datastax.driver.core.*; +import com.datastax.driver.core.querybuilder.QueryBuilder; +import com.datastax.driver.core.querybuilder.Select; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; +import org.thingsboard.server.common.data.id.EntityId; +import org.thingsboard.server.common.data.kv.DataType; +import org.thingsboard.server.dao.AbstractDao; +import org.thingsboard.server.dao.model.ModelConstants; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.thingsboard.server.common.data.kv.*; +import org.thingsboard.server.dao.timeseries.BaseTimeseriesDao; + +import java.util.ArrayList; +import java.util.List; + +import static org.thingsboard.server.dao.model.ModelConstants.*; +import static com.datastax.driver.core.querybuilder.QueryBuilder.*; + +/** + * @author Andrew Shvayka + */ +@Component +@Slf4j +public class BaseAttributesDao extends AbstractDao implements AttributesDao { + + private PreparedStatement saveStmt; + + @Override + public AttributeKvEntry find(EntityId entityId, String attributeType, String attributeKey) { + Select.Where select = select().from(ATTRIBUTES_KV_CF) + .where(eq(ENTITY_TYPE_COLUMN, entityId.getEntityType())) + .and(eq(ENTITY_ID_COLUMN, entityId.getId())) + .and(eq(ATTRIBUTE_TYPE_COLUMN, attributeType)) + .and(eq(ATTRIBUTE_KEY_COLUMN, attributeKey)); + log.trace("Generated query [{}] for entityId {} and key {}", select, entityId, attributeKey); + return convertResultToAttributesKvEntry(attributeKey, executeRead(select).one()); + } + + @Override + public List findAll(EntityId entityId, String attributeType) { + Select.Where select = select().from(ATTRIBUTES_KV_CF) + .where(eq(ENTITY_TYPE_COLUMN, entityId.getEntityType())) + .and(eq(ENTITY_ID_COLUMN, entityId.getId())) + .and(eq(ATTRIBUTE_TYPE_COLUMN, attributeType)); + log.trace("Generated query [{}] for entityId {} and attributeType {}", select, entityId, attributeType); + return convertResultToAttributesKvEntryList(executeRead(select)); + } + + @Override + public ResultSetFuture save(EntityId entityId, String attributeType, AttributeKvEntry attribute) { + BoundStatement stmt = getSaveStmt().bind(); + stmt.setString(0, entityId.getEntityType().name()); + stmt.setUUID(1, entityId.getId()); + stmt.setString(2, attributeType); + stmt.setString(3, attribute.getKey()); + stmt.setLong(4, attribute.getLastUpdateTs()); + stmt.setString(5, attribute.getStrValue().orElse(null)); + if (attribute.getBooleanValue().isPresent()) { + stmt.setBool(6, attribute.getBooleanValue().get()); + } else { + stmt.setToNull(6); + } + if (attribute.getLongValue().isPresent()) { + stmt.setLong(7, attribute.getLongValue().get()); + } else { + stmt.setToNull(7); + } + if (attribute.getDoubleValue().isPresent()) { + stmt.setDouble(8, attribute.getDoubleValue().get()); + } else { + stmt.setToNull(8); + } + return executeAsyncWrite(stmt); + } + + @Override + public void removeAll(EntityId entityId, String attributeType, List keys) { + for (String key : keys) { + delete(entityId, attributeType, key); + } + } + + private void delete(EntityId entityId, String attributeType, String key) { + Statement delete = QueryBuilder.delete().all().from(ModelConstants.ATTRIBUTES_KV_CF) + .where(eq(ENTITY_TYPE_COLUMN, entityId.getEntityType())) + .and(eq(ENTITY_ID_COLUMN, entityId.getId())) + .and(eq(ATTRIBUTE_TYPE_COLUMN, attributeType)) + .and(eq(ATTRIBUTE_KEY_COLUMN, key)); + log.debug("Remove request: {}", delete.toString()); + getSession().execute(delete); + } + + private PreparedStatement getSaveStmt() { + if (saveStmt == null) { + saveStmt = getSession().prepare("INSERT INTO " + ModelConstants.ATTRIBUTES_KV_CF + + "(" + ENTITY_TYPE_COLUMN + + "," + ENTITY_ID_COLUMN + + "," + ATTRIBUTE_TYPE_COLUMN + + "," + ATTRIBUTE_KEY_COLUMN + + "," + LAST_UPDATE_TS_COLUMN + + "," + ModelConstants.STRING_VALUE_COLUMN + + "," + ModelConstants.BOOLEAN_VALUE_COLUMN + + "," + ModelConstants.LONG_VALUE_COLUMN + + "," + ModelConstants.DOUBLE_VALUE_COLUMN + + ")" + + " VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?)"); + } + return saveStmt; + } + + private AttributeKvEntry convertResultToAttributesKvEntry(String key, Row row) { + AttributeKvEntry attributeEntry = null; + if (row != null) { + long lastUpdateTs = row.get(LAST_UPDATE_TS_COLUMN, Long.class); + attributeEntry = new BaseAttributeKvEntry(BaseTimeseriesDao.toKvEntry(row, key), lastUpdateTs); + } + return attributeEntry; + } + + private List convertResultToAttributesKvEntryList(ResultSet resultSet) { + List rows = resultSet.all(); + List entries = new ArrayList<>(rows.size()); + if (!rows.isEmpty()) { + rows.stream().forEach(row -> { + String key = row.getString(ModelConstants.ATTRIBUTE_KEY_COLUMN); + AttributeKvEntry kvEntry = convertResultToAttributesKvEntry(key, row); + if (kvEntry != null) { + entries.add(kvEntry); + } + }); + } + return entries; + } + +} diff --git a/dao/src/main/java/org/thingsboard/server/dao/attributes/BaseAttributesService.java b/dao/src/main/java/org/thingsboard/server/dao/attributes/BaseAttributesService.java new file mode 100644 index 0000000000..3e9fc3ddd8 --- /dev/null +++ b/dao/src/main/java/org/thingsboard/server/dao/attributes/BaseAttributesService.java @@ -0,0 +1,87 @@ +/** + * Copyright © 2016 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.dao.attributes; + +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.ResultSetFuture; +import com.google.common.collect.Lists; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import org.thingsboard.server.common.data.id.EntityId; +import org.thingsboard.server.common.data.kv.AttributeKvEntry; +import org.thingsboard.server.dao.exception.IncorrectParameterException; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; +import org.thingsboard.server.dao.service.Validator; + +import java.util.List; + +/** + * @author Andrew Shvayka + */ +@Service +public class BaseAttributesService implements AttributesService { + + @Autowired + private AttributesDao attributesDao; + + @Override + public AttributeKvEntry find(EntityId entityId, String scope, String attributeKey) { + validate(entityId, scope); + Validator.validateString(attributeKey, "Incorrect attribute key " + attributeKey); + return attributesDao.find(entityId, scope, attributeKey); + } + + @Override + public List findAll(EntityId entityId, String scope) { + validate(entityId, scope); + return attributesDao.findAll(entityId, scope); + } + + @Override + public ListenableFuture> save(EntityId entityId, String scope, List attributes) { + validate(entityId, scope); + attributes.forEach(attribute -> validate(attribute)); + List futures = Lists.newArrayListWithExpectedSize(attributes.size()); + for(AttributeKvEntry attribute : attributes) { + futures.add(attributesDao.save(entityId, scope, attribute)); + } + return Futures.allAsList(futures); + } + + @Override + public void removeAll(EntityId entityId, String scope, List keys) { + validate(entityId, scope); + attributesDao.removeAll(entityId, scope, keys); + } + + private static void validate(EntityId id, String scope) { + Validator.validateId(id.getId(), "Incorrect id " + id); + Validator.validateString(scope, "Incorrect scope " + scope); + } + + private static void validate(AttributeKvEntry kvEntry) { + if (kvEntry == null) { + throw new IncorrectParameterException("Key value entry can't be null"); + } else if (kvEntry.getDataType() == null) { + throw new IncorrectParameterException("Incorrect kvEntry. Data type can't be null"); + } else { + Validator.validateString(kvEntry.getKey(), "Incorrect kvEntry. Key can't be empty"); + Validator.validatePositiveNumber(kvEntry.getLastUpdateTs(), "Incorrect last update ts. Ts should be positive"); + } + } + +} diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesDao.java new file mode 100644 index 0000000000..79134f7b0b --- /dev/null +++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesDao.java @@ -0,0 +1,313 @@ +/** + * Copyright © 2016 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.dao.timeseries; + +import com.datastax.driver.core.*; +import com.datastax.driver.core.querybuilder.QueryBuilder; +import com.datastax.driver.core.querybuilder.Select; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Component; +import org.thingsboard.server.common.data.kv.*; +import org.thingsboard.server.common.data.kv.DataType; +import org.thingsboard.server.dao.AbstractDao; +import org.thingsboard.server.dao.model.ModelConstants; + +import java.util.*; + +import static com.datastax.driver.core.querybuilder.QueryBuilder.eq; +import static com.datastax.driver.core.querybuilder.QueryBuilder.select; + +/** + * @author Andrew Shvayka + */ +@Component +@Slf4j +public class BaseTimeseriesDao extends AbstractDao implements TimeseriesDao { + + @Value("${cassandra.query.max_limit_per_request}") + protected Integer maxLimitPerRequest; + + private PreparedStatement partitionInsertStmt; + private PreparedStatement[] latestInsertStmts; + private PreparedStatement[] saveStmts; + private PreparedStatement findLatestStmt; + private PreparedStatement findAllLatestStmt; + + @Override + public List find(String entityType, UUID entityId, TsKvQuery query, Optional minPartition, Optional maxPartition) { + List rows = Collections.emptyList(); + Long[] parts = fetchPartitions(entityType, entityId, query.getKey(), minPartition, maxPartition); + int partsLength = parts.length; + if (parts != null && partsLength > 0) { + int limit = maxLimitPerRequest; + Optional lim = query.getLimit(); + if (lim.isPresent() && lim.get() < maxLimitPerRequest) { + limit = lim.get(); + } + + rows = new ArrayList<>(limit); + int lastIdx = partsLength - 1; + for (int i = 0; i < partsLength; i++) { + int currentLimit; + if (rows.size() >= limit) { + break; + } else { + currentLimit = limit - rows.size(); + } + Long partition = parts[i]; + Select.Where where = select().from(ModelConstants.TS_KV_CF).where(eq(ModelConstants.ENTITY_TYPE_COLUMN, entityType)) + .and(eq(ModelConstants.ENTITY_ID_COLUMN, entityId)) + .and(eq(ModelConstants.KEY_COLUMN, query.getKey())) + .and(eq(ModelConstants.PARTITION_COLUMN, partition)); + if (i == 0 && query.getStartTs().isPresent()) { + where.and(QueryBuilder.gt(ModelConstants.TS_COLUMN, query.getStartTs().get())); + } else if (i == lastIdx && query.getEndTs().isPresent()) { + where.and(QueryBuilder.lte(ModelConstants.TS_COLUMN, query.getEndTs().get())); + } + where.limit(currentLimit); + rows.addAll(executeRead(where).all()); + } + } + return convertResultToTsKvEntryList(rows); + } + + @Override + public ResultSetFuture findLatest(String entityType, UUID entityId, String key) { + BoundStatement stmt = getFindLatestStmt().bind(); + stmt.setString(0, entityType); + stmt.setUUID(1, entityId); + stmt.setString(2, key); + log.debug("Generated query [{}] for entityType {} and entityId {}", stmt, entityType, entityId); + return executeAsyncRead(stmt); + } + + @Override + public ResultSetFuture findAllLatest(String entityType, UUID entityId) { + BoundStatement stmt = getFindAllLatestStmt().bind(); + stmt.setString(0, entityType); + stmt.setUUID(1, entityId); + log.debug("Generated query [{}] for entityType {} and entityId {}", stmt, entityType, entityId); + return executeAsyncRead(stmt); + } + + @Override + public ResultSetFuture save(String entityType, UUID entityId, long partition, TsKvEntry tsKvEntry) { + DataType type = tsKvEntry.getDataType(); + BoundStatement stmt = getSaveStmt(type).bind() + .setString(0, entityType) + .setUUID(1, entityId) + .setString(2, tsKvEntry.getKey()) + .setLong(3, partition) + .setLong(4, tsKvEntry.getTs()); + addValue(tsKvEntry, stmt, 5); + return executeAsyncWrite(stmt); + } + + @Override + public ResultSetFuture saveLatest(String entityType, UUID entityId, TsKvEntry tsKvEntry) { + DataType type = tsKvEntry.getDataType(); + BoundStatement stmt = getLatestStmt(type).bind() + .setString(0, entityType) + .setUUID(1, entityId) + .setString(2, tsKvEntry.getKey()) + .setLong(3, tsKvEntry.getTs()); + addValue(tsKvEntry, stmt, 4); + return executeAsyncWrite(stmt); + } + + @Override + public ResultSetFuture savePartition(String entityType, UUID entityId, long partition, String key) { + log.debug("Saving partition {} for the entity [{}-{}] and key {}", partition, entityType, entityId, key); + return executeAsyncWrite(getPartitionInsertStmt().bind() + .setString(0, entityType) + .setUUID(1, entityId) + .setLong(2, partition) + .setString(3, key)); + } + + @Override + public List convertResultToTsKvEntryList(List rows) { + List entries = new ArrayList<>(rows.size()); + if (!rows.isEmpty()) { + rows.stream().forEach(row -> { + TsKvEntry kvEntry = convertResultToTsKvEntry(row); + if (kvEntry != null) { + entries.add(kvEntry); + } + }); + } + return entries; + } + + @Override + public TsKvEntry convertResultToTsKvEntry(Row row) { + String key = row.getString(ModelConstants.KEY_COLUMN); + long ts = row.getLong(ModelConstants.TS_COLUMN); + return new BasicTsKvEntry(ts, toKvEntry(row, key)); + } + + public static KvEntry toKvEntry(Row row, String key) { + KvEntry kvEntry = null; + String strV = row.get(ModelConstants.STRING_VALUE_COLUMN, String.class); + if (strV != null) { + kvEntry = new StringDataEntry(key, strV); + } else { + Long longV = row.get(ModelConstants.LONG_VALUE_COLUMN, Long.class); + if (longV != null) { + kvEntry = new LongDataEntry(key, longV); + } else { + Double doubleV = row.get(ModelConstants.DOUBLE_VALUE_COLUMN, Double.class); + if (doubleV != null) { + kvEntry = new DoubleDataEntry(key, doubleV); + } else { + Boolean boolV = row.get(ModelConstants.BOOLEAN_VALUE_COLUMN, Boolean.class); + if (boolV != null) { + kvEntry = new BooleanDataEntry(key, boolV); + } else { + log.warn("All values in key-value row are nullable "); + } + } + } + } + return kvEntry; + } + + /** + * Select existing partitions from the table + * {@link ModelConstants#TS_KV_PARTITIONS_CF} for the given entity + */ + private Long[] fetchPartitions(String entityType, UUID entityId, String key, Optional minPartition, Optional maxPartition) { + Select.Where select = QueryBuilder.select(ModelConstants.PARTITION_COLUMN).from(ModelConstants.TS_KV_PARTITIONS_CF).where(eq(ModelConstants.ENTITY_TYPE_COLUMN, entityType)) + .and(eq(ModelConstants.ENTITY_ID_COLUMN, entityId)).and(eq(ModelConstants.KEY_COLUMN, key)); + minPartition.ifPresent(startTs -> select.and(QueryBuilder.gte(ModelConstants.PARTITION_COLUMN, minPartition.get()))); + maxPartition.ifPresent(endTs -> select.and(QueryBuilder.lte(ModelConstants.PARTITION_COLUMN, maxPartition.get()))); + ResultSet resultSet = executeRead(select); + return resultSet.all().stream().map(row -> row.getLong(ModelConstants.PARTITION_COLUMN)).toArray(Long[]::new); + } + + private PreparedStatement getSaveStmt(DataType dataType) { + if (saveStmts == null) { + saveStmts = new PreparedStatement[DataType.values().length]; + for (DataType type : DataType.values()) { + saveStmts[type.ordinal()] = getSession().prepare("INSERT INTO " + ModelConstants.TS_KV_CF + + "(" + ModelConstants.ENTITY_TYPE_COLUMN + + "," + ModelConstants.ENTITY_ID_COLUMN + + "," + ModelConstants.KEY_COLUMN + + "," + ModelConstants.PARTITION_COLUMN + + "," + ModelConstants.TS_COLUMN + + "," + getColumnName(type) + ")" + + " VALUES(?, ?, ?, ?, ?, ?)"); + } + } + return saveStmts[dataType.ordinal()]; + } + + private PreparedStatement getLatestStmt(DataType dataType) { + if (latestInsertStmts == null) { + latestInsertStmts = new PreparedStatement[DataType.values().length]; + for (DataType type : DataType.values()) { + latestInsertStmts[type.ordinal()] = getSession().prepare("INSERT INTO " + ModelConstants.TS_KV_LATEST_CF + + "(" + ModelConstants.ENTITY_TYPE_COLUMN + + "," + ModelConstants.ENTITY_ID_COLUMN + + "," + ModelConstants.KEY_COLUMN + + "," + ModelConstants.TS_COLUMN + + "," + getColumnName(type) + ")" + + " VALUES(?, ?, ?, ?, ?)"); + } + } + return latestInsertStmts[dataType.ordinal()]; + } + + + private PreparedStatement getPartitionInsertStmt() { + if (partitionInsertStmt == null) { + partitionInsertStmt = getSession().prepare("INSERT INTO " + ModelConstants.TS_KV_PARTITIONS_CF + + "(" + ModelConstants.ENTITY_TYPE_COLUMN + + "," + ModelConstants.ENTITY_ID_COLUMN + + "," + ModelConstants.PARTITION_COLUMN + + "," + ModelConstants.KEY_COLUMN + ")" + + " VALUES(?, ?, ?, ?)"); + } + return partitionInsertStmt; + } + + private PreparedStatement getFindLatestStmt() { + if (findLatestStmt == null) { + findLatestStmt = getSession().prepare("SELECT " + + ModelConstants.KEY_COLUMN + "," + + ModelConstants.TS_COLUMN + "," + + ModelConstants.STRING_VALUE_COLUMN + "," + + ModelConstants.BOOLEAN_VALUE_COLUMN + "," + + ModelConstants.LONG_VALUE_COLUMN + "," + + ModelConstants.DOUBLE_VALUE_COLUMN + " " + + "FROM " + ModelConstants.TS_KV_LATEST_CF + " " + + "WHERE " + ModelConstants.ENTITY_TYPE_COLUMN + " = ? " + + "AND " + ModelConstants.ENTITY_ID_COLUMN + " = ? " + + "AND " + ModelConstants.KEY_COLUMN + " = ? "); + } + return findLatestStmt; + } + + private PreparedStatement getFindAllLatestStmt() { + if (findAllLatestStmt == null) { + findAllLatestStmt = getSession().prepare("SELECT " + + ModelConstants.KEY_COLUMN + "," + + ModelConstants.TS_COLUMN + "," + + ModelConstants.STRING_VALUE_COLUMN + "," + + ModelConstants.BOOLEAN_VALUE_COLUMN + "," + + ModelConstants.LONG_VALUE_COLUMN + "," + + ModelConstants.DOUBLE_VALUE_COLUMN + " " + + "FROM " + ModelConstants.TS_KV_LATEST_CF + " " + + "WHERE " + ModelConstants.ENTITY_TYPE_COLUMN + " = ? " + + "AND " + ModelConstants.ENTITY_ID_COLUMN + " = ? "); + } + return findAllLatestStmt; + } + + public static String getColumnName(DataType type) { + switch (type) { + case BOOLEAN: + return ModelConstants.BOOLEAN_VALUE_COLUMN; + case STRING: + return ModelConstants.STRING_VALUE_COLUMN; + case LONG: + return ModelConstants.LONG_VALUE_COLUMN; + case DOUBLE: + return ModelConstants.DOUBLE_VALUE_COLUMN; + default: + throw new RuntimeException("Not implemented!"); + } + } + + public static void addValue(KvEntry kvEntry, BoundStatement stmt, int column) { + switch (kvEntry.getDataType()) { + case BOOLEAN: + stmt.setBool(column, kvEntry.getBooleanValue().get().booleanValue()); + break; + case STRING: + stmt.setString(column, kvEntry.getStrValue().get()); + break; + case LONG: + stmt.setLong(column, kvEntry.getLongValue().get().longValue()); + break; + case DOUBLE: + stmt.setDouble(column, kvEntry.getDoubleValue().get().doubleValue()); + break; + } + } + +} diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java new file mode 100644 index 0000000000..49ef16c038 --- /dev/null +++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java @@ -0,0 +1,167 @@ +/** + * Copyright © 2016 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.dao.timeseries; + +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.ResultSetFuture; +import com.datastax.driver.core.Row; +import com.google.common.collect.Lists; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import lombok.extern.slf4j.Slf4j; +import org.thingsboard.server.common.data.id.UUIDBased; +import org.thingsboard.server.common.data.kv.TsKvEntry; +import org.thingsboard.server.common.data.kv.TsKvQuery; +import org.thingsboard.server.dao.exception.IncorrectParameterException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Service; +import org.thingsboard.server.dao.service.Validator; + +import javax.annotation.PostConstruct; +import java.time.Instant; +import java.time.LocalDateTime; +import java.time.ZoneOffset; +import java.util.*; + +import static org.apache.commons.lang3.StringUtils.isBlank; + +/** + * @author Andrew Shvayka + */ +@Service +@Slf4j +public class BaseTimeseriesService implements TimeseriesService { + + public static final int INSERTS_PER_ENTRY = 3; + + @Value("${cassandra.query.ts_key_value_partitioning}") + private String partitioning; + + @Autowired + private TimeseriesDao timeseriesDao; + + private TsPartitionDate tsFormat; + + @PostConstruct + public void init() { + Optional partition = TsPartitionDate.parse(partitioning); + if (partition.isPresent()) { + tsFormat = partition.get(); + } else { + log.warn("Incorrect configuration of partitioning {}", partitioning); + throw new RuntimeException("Failed to parse partitioning property: " + partitioning + "!"); + } + } + + @Override + public List find(String entityType, UUIDBased entityId, TsKvQuery query) { + validate(entityType, entityId); + validate(query); + return timeseriesDao.find(entityType, entityId.getId(), query, toPartitionTs(query.getStartTs()), toPartitionTs(query.getEndTs())); + } + + private Optional toPartitionTs(Optional ts) { + if (ts.isPresent()) { + return Optional.of(toPartitionTs(ts.get())); + } else { + return Optional.empty(); + } + } + + @Override + public ListenableFuture> findLatest(String entityType, UUIDBased entityId, Collection keys) { + validate(entityType, entityId); + List futures = Lists.newArrayListWithExpectedSize(keys.size()); + keys.forEach(key -> Validator.validateString(key, "Incorrect key " + key)); + keys.forEach(key -> futures.add(timeseriesDao.findLatest(entityType, entityId.getId(), key))); + return Futures.allAsList(futures); + } + + @Override + public ResultSetFuture findAllLatest(String entityType, UUIDBased entityId) { + validate(entityType, entityId); + return timeseriesDao.findAllLatest(entityType, entityId.getId()); + } + + @Override + public ListenableFuture> save(String entityType, UUIDBased entityId, TsKvEntry tsKvEntry) { + validate(entityType, entityId); + if (tsKvEntry == null) { + throw new IncorrectParameterException("Key value entry can't be null"); + } + UUID uid = entityId.getId(); + long partitionTs = toPartitionTs(tsKvEntry.getTs()); + + List futures = Lists.newArrayListWithExpectedSize(INSERTS_PER_ENTRY); + saveAndRegisterFutures(futures, entityType, tsKvEntry, uid, partitionTs); + return Futures.allAsList(futures); + } + + @Override + public ListenableFuture> save(String entityType, UUIDBased entityId, List tsKvEntries) { + validate(entityType, entityId); + List futures = Lists.newArrayListWithExpectedSize(tsKvEntries.size() * INSERTS_PER_ENTRY); + for (TsKvEntry tsKvEntry : tsKvEntries) { + if (tsKvEntry == null) { + throw new IncorrectParameterException("Key value entry can't be null"); + } + UUID uid = entityId.getId(); + long partitionTs = toPartitionTs(tsKvEntry.getTs()); + saveAndRegisterFutures(futures, entityType, tsKvEntry, uid, partitionTs); + } + return Futures.allAsList(futures); + } + + @Override + public TsKvEntry convertResultToTsKvEntry(Row row) { + return timeseriesDao.convertResultToTsKvEntry(row); + } + + @Override + public List convertResultSetToTsKvEntryList(ResultSet rs) { + return timeseriesDao.convertResultToTsKvEntryList(rs.all()); + } + + private void saveAndRegisterFutures(List futures, String entityType, TsKvEntry tsKvEntry, UUID uid, long partitionTs) { + futures.add(timeseriesDao.savePartition(entityType, uid, partitionTs, tsKvEntry.getKey())); + futures.add(timeseriesDao.saveLatest(entityType, uid, tsKvEntry)); + futures.add(timeseriesDao.save(entityType, uid, partitionTs, tsKvEntry)); + } + + private long toPartitionTs(long ts) { + LocalDateTime time = LocalDateTime.ofInstant(Instant.ofEpochMilli(ts), ZoneOffset.UTC); + + LocalDateTime parititonTime = tsFormat.truncatedTo(time); + + return parititonTime.toInstant(ZoneOffset.UTC).toEpochMilli(); + } + + private static void validate(String entityType, UUIDBased entityId) { + Validator.validateString(entityType, "Incorrect entityType " + entityType); + Validator.validateId(entityId, "Incorrect entityId " + entityId); + } + + private static void validate(TsKvQuery query) { + if (query == null) { + throw new IncorrectParameterException("TsKvQuery can't be null"); + } else if (isBlank(query.getKey())) { + throw new IncorrectParameterException("Incorrect TsKvQuery. Key can't be empty"); + } + } +} diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesDao.java new file mode 100644 index 0000000000..eae9ab4675 --- /dev/null +++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesDao.java @@ -0,0 +1,49 @@ +/** + * Copyright © 2016 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.dao.timeseries; + +import com.datastax.driver.core.ResultSetFuture; +import com.datastax.driver.core.Row; +import org.thingsboard.server.common.data.kv.TsKvEntry; +import org.thingsboard.server.common.data.kv.TsKvQuery; + +import java.util.List; +import java.util.Optional; +import java.util.Set; +import java.util.UUID; + +/** + * @author Andrew Shvayka + */ +public interface TimeseriesDao { + + List find(String entityType, UUID entityId, TsKvQuery query, Optional minPartition, Optional maxPartition); + + ResultSetFuture findLatest(String entityType, UUID entityId, String key); + + ResultSetFuture findAllLatest(String entityType, UUID entityId); + + ResultSetFuture save(String entityType, UUID entityId, long partition, TsKvEntry tsKvEntry); + + ResultSetFuture savePartition(String entityType, UUID entityId, long partition, String key); + + ResultSetFuture saveLatest(String entityType, UUID entityId, TsKvEntry tsKvEntry); + + TsKvEntry convertResultToTsKvEntry(Row row); + + List convertResultToTsKvEntryList(List rows); + +} diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesService.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesService.java new file mode 100644 index 0000000000..b165f79c52 --- /dev/null +++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesService.java @@ -0,0 +1,50 @@ +/** + * Copyright © 2016 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.dao.timeseries; + +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.ResultSetFuture; +import com.datastax.driver.core.Row; +import com.google.common.util.concurrent.ListenableFuture; +import org.thingsboard.server.common.data.id.UUIDBased; +import org.thingsboard.server.common.data.kv.TsKvEntry; +import org.thingsboard.server.common.data.kv.TsKvQuery; + +import java.util.Collection; +import java.util.List; +import java.util.Set; + +/** + * @author Andrew Shvayka + */ +public interface TimeseriesService { + + //TODO: Replace this with async operation + List find(String entityType, UUIDBased entityId, TsKvQuery query); + + ListenableFuture> findLatest(String entityType, UUIDBased entityId, Collection keys); + + ResultSetFuture findAllLatest(String entityType, UUIDBased entityId); + + ListenableFuture> save(String entityType, UUIDBased entityId, TsKvEntry tsKvEntry); + + ListenableFuture> save(String entityType, UUIDBased entityId, List tsKvEntry); + + TsKvEntry convertResultToTsKvEntry(Row row); + + List convertResultSetToTsKvEntryList(ResultSet rs); + +} diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/TsPartitionDate.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/TsPartitionDate.java new file mode 100644 index 0000000000..1a1637d52d --- /dev/null +++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/TsPartitionDate.java @@ -0,0 +1,66 @@ +/** + * Copyright © 2016 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.dao.timeseries; + +import java.time.LocalDateTime; +import java.time.temporal.ChronoUnit; +import java.time.temporal.TemporalUnit; +import java.util.Optional; + +public enum TsPartitionDate { + + MINUTES("yyyy-MM-dd-HH-mm", ChronoUnit.MINUTES), HOURS("yyyy-MM-dd-HH", ChronoUnit.HOURS), DAYS("yyyy-MM-dd", ChronoUnit.DAYS), MONTHS("yyyy-MM", ChronoUnit.MONTHS), YEARS("yyyy", ChronoUnit.YEARS); + + private final String pattern; + private final TemporalUnit truncateUnit; + + TsPartitionDate(String pattern, TemporalUnit truncateUnit) { + this.pattern = pattern; + this.truncateUnit = truncateUnit; + } + + public String getPattern() { + return pattern; + } + + public TemporalUnit getTruncateUnit() { + return truncateUnit; + } + + public LocalDateTime truncatedTo(LocalDateTime time) { + switch (this){ + case MONTHS: + return time.truncatedTo(ChronoUnit.DAYS).withDayOfMonth(1); + case YEARS: + return time.truncatedTo(ChronoUnit.DAYS).withDayOfYear(1); + default: + return time.truncatedTo(truncateUnit); + } + } + + public static Optional parse(String name) { + TsPartitionDate partition = null; + if (name != null) { + for (TsPartitionDate partitionDate : TsPartitionDate.values()) { + if (partitionDate.name().equalsIgnoreCase(name)) { + partition = partitionDate; + break; + } + } + } + return Optional.of(partition); + } +}