From 5935de07b3aa94deecbd3c58fe2bc6420b033778 Mon Sep 17 00:00:00 2001 From: Volodymyr Babak Date: Fri, 23 Jun 2017 19:49:58 +0300 Subject: [PATCH] Added get ts async implementation --- .../dao/model/sql/AttributeKvEntity.java | 4 +- .../dao/model/sql/TsKvCompositeKey.java | 3 +- .../server/dao/model/sql/TsKvEntity.java | 49 ++++++++++- .../dao/model/sql/TsKvLatestCompositeKey.java | 3 +- .../dao/model/sql/TsKvLatestEntity.java | 4 +- .../dao/sql/attributes/JpaAttributeDao.java | 4 +- .../dao/sql/timeseries/JpaTimeseriesDao.java | 86 +++++++++++++++++-- .../dao/sql/timeseries/TsKvRepository.java | 65 ++++++++++++++ 8 files changed, 204 insertions(+), 14 deletions(-) diff --git a/dao/src/main/java/org/thingsboard/server/dao/model/sql/AttributeKvEntity.java b/dao/src/main/java/org/thingsboard/server/dao/model/sql/AttributeKvEntity.java index 2ea58afcf2..c7ba821773 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/model/sql/AttributeKvEntity.java +++ b/dao/src/main/java/org/thingsboard/server/dao/model/sql/AttributeKvEntity.java @@ -16,6 +16,7 @@ package org.thingsboard.server.dao.model.sql; import lombok.Data; +import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.kv.*; import org.thingsboard.server.dao.model.ToData; @@ -32,8 +33,9 @@ import static org.thingsboard.server.dao.model.ModelConstants.*; public class AttributeKvEntity implements ToData, Serializable { @Id + @Enumerated(EnumType.STRING) @Column(name = ENTITY_TYPE_COLUMN) - private String entityType; + private EntityType entityType; @Id @Column(name = ENTITY_ID_COLUMN) diff --git a/dao/src/main/java/org/thingsboard/server/dao/model/sql/TsKvCompositeKey.java b/dao/src/main/java/org/thingsboard/server/dao/model/sql/TsKvCompositeKey.java index 48fc75c152..57df750f79 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/model/sql/TsKvCompositeKey.java +++ b/dao/src/main/java/org/thingsboard/server/dao/model/sql/TsKvCompositeKey.java @@ -18,6 +18,7 @@ package org.thingsboard.server.dao.model.sql; import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; +import org.thingsboard.server.common.data.EntityType; import javax.persistence.Transient; import java.io.Serializable; @@ -31,7 +32,7 @@ public class TsKvCompositeKey implements Serializable{ @Transient private static final long serialVersionUID = -4089175869616037523L; - private String entityType; + private EntityType entityType; private UUID entityId; private String key; private long ts; diff --git a/dao/src/main/java/org/thingsboard/server/dao/model/sql/TsKvEntity.java b/dao/src/main/java/org/thingsboard/server/dao/model/sql/TsKvEntity.java index 603da6f3a4..356f3bd4a3 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/model/sql/TsKvEntity.java +++ b/dao/src/main/java/org/thingsboard/server/dao/model/sql/TsKvEntity.java @@ -16,7 +16,8 @@ package org.thingsboard.server.dao.model.sql; import lombok.Data; -import org.thingsboard.server.common.data.kv.TsKvEntry; +import org.thingsboard.server.common.data.EntityType; +import org.thingsboard.server.common.data.kv.*; import org.thingsboard.server.dao.model.ToData; import javax.persistence.*; @@ -30,9 +31,41 @@ import static org.thingsboard.server.dao.model.ModelConstants.*; @IdClass(TsKvCompositeKey.class) public final class TsKvEntity implements ToData { + public TsKvEntity() { + } + + public TsKvEntity(Double avgLongValue, Double avgDoubleValue) { + this.longValue = avgLongValue.longValue(); + this.doubleValue = avgDoubleValue; + } + + public TsKvEntity(Long sumLongValue, Double sumDoubleValue) { + this.longValue = sumLongValue; + this.doubleValue = sumDoubleValue; + } + + public TsKvEntity(String strValue, Long longValue, Double doubleValue) { + this.strValue = strValue; + this.longValue = longValue; + this.doubleValue = doubleValue; + } + + public TsKvEntity(Long booleanValueCount, Long strValueCount, Long longValueCount, Long doubleValueCount) { + if (booleanValueCount != 0) { + this.longValue = booleanValueCount; + } else if (strValueCount != 0) { + this.longValue = strValueCount; + } else if (longValueCount != 0) { + this.longValue = longValueCount; + } else if (doubleValueCount != 0) { + this.longValue = doubleValueCount; + } + } + @Id + @Enumerated(EnumType.STRING) @Column(name = ENTITY_TYPE_COLUMN) - private String entityType; + private EntityType entityType; @Id @Column(name = ENTITY_ID_COLUMN) @@ -60,6 +93,16 @@ public final class TsKvEntity implements ToData { @Override public TsKvEntry toData() { - return null; + KvEntry kvEntry = null; + if (strValue != null) { + kvEntry = new StringDataEntry(key, strValue); + } else if (longValue != null) { + kvEntry = new LongDataEntry(key, longValue); + } else if (doubleValue != null) { + kvEntry = new DoubleDataEntry(key, doubleValue); + } else if (booleanValue != null) { + kvEntry = new BooleanDataEntry(key, booleanValue); + } + return new BasicTsKvEntry(ts, kvEntry); } } diff --git a/dao/src/main/java/org/thingsboard/server/dao/model/sql/TsKvLatestCompositeKey.java b/dao/src/main/java/org/thingsboard/server/dao/model/sql/TsKvLatestCompositeKey.java index ab6006f610..f7bf3b1281 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/model/sql/TsKvLatestCompositeKey.java +++ b/dao/src/main/java/org/thingsboard/server/dao/model/sql/TsKvLatestCompositeKey.java @@ -16,6 +16,7 @@ package org.thingsboard.server.dao.model.sql; import lombok.*; +import org.thingsboard.server.common.data.EntityType; import javax.persistence.Transient; import java.io.Serializable; @@ -29,7 +30,7 @@ public class TsKvLatestCompositeKey implements Serializable{ @Transient private static final long serialVersionUID = -4089175869616037523L; - private String entityType; + private EntityType entityType; private UUID entityId; private String key; } \ No newline at end of file diff --git a/dao/src/main/java/org/thingsboard/server/dao/model/sql/TsKvLatestEntity.java b/dao/src/main/java/org/thingsboard/server/dao/model/sql/TsKvLatestEntity.java index b9a4543e23..47a5e006f7 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/model/sql/TsKvLatestEntity.java +++ b/dao/src/main/java/org/thingsboard/server/dao/model/sql/TsKvLatestEntity.java @@ -16,6 +16,7 @@ package org.thingsboard.server.dao.model.sql; import lombok.Data; +import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.kv.*; import org.thingsboard.server.dao.model.ToData; @@ -31,8 +32,9 @@ import static org.thingsboard.server.dao.model.ModelConstants.*; public final class TsKvLatestEntity implements ToData { @Id + @Enumerated(EnumType.STRING) @Column(name = ENTITY_TYPE_COLUMN) - private String entityType; + private EntityType entityType; @Id @Column(name = ENTITY_ID_COLUMN) diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/attributes/JpaAttributeDao.java b/dao/src/main/java/org/thingsboard/server/dao/sql/attributes/JpaAttributeDao.java index cf69004050..c77d339bd2 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/attributes/JpaAttributeDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/attributes/JpaAttributeDao.java @@ -83,7 +83,7 @@ public class JpaAttributeDao extends JpaAbstractDaoListeningExecutorService impl @Override public ListenableFuture save(EntityId entityId, String attributeType, AttributeKvEntry attribute) { AttributeKvEntity entity = new AttributeKvEntity(); - entity.setEntityType(entityId.getEntityType().name()); + entity.setEntityType(entityId.getEntityType()); entity.setEntityId(entityId.getId()); entity.setAttributeType(attributeType); entity.setAttributeKey(attribute.getKey()); @@ -104,7 +104,7 @@ public class JpaAttributeDao extends JpaAbstractDaoListeningExecutorService impl .stream() .map(key -> { AttributeKvEntity entityToDelete = new AttributeKvEntity(); - entityToDelete.setEntityType(entityId.getEntityType().name()); + entityToDelete.setEntityType(entityId.getEntityType()); entityToDelete.setEntityId(entityId.getId()); entityToDelete.setAttributeType(attributeType); entityToDelete.setAttributeKey(key); diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/timeseries/JpaTimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/sql/timeseries/JpaTimeseriesDao.java index bc056e1c64..7b64a4fdb0 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/timeseries/JpaTimeseriesDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/timeseries/JpaTimeseriesDao.java @@ -21,6 +21,7 @@ import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.data.domain.PageRequest; import org.springframework.stereotype.Component; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.kv.Aggregation; @@ -35,6 +36,7 @@ import org.thingsboard.server.dao.sql.JpaAbstractDaoListeningExecutorService; import org.thingsboard.server.dao.timeseries.TimeseriesDao; import javax.annotation.Nullable; +import java.util.ArrayList; import java.util.List; import java.util.stream.Collectors; @@ -73,19 +75,93 @@ public class JpaTimeseriesDao extends JpaAbstractDaoListeningExecutorService imp if (query.getAggregation() == Aggregation.NONE) { return findAllAsyncWithLimit(entityId, query); } else { - return service.submit(() -> null); + long stepTs = query.getStartTs(); + List> futures = new ArrayList<>(); + while (stepTs < query.getEndTs()) { + long startTs = stepTs; + long endTs = stepTs + query.getInterval(); + long ts = startTs + (endTs - startTs) / 2; + futures.add(findAndAggregateAsync(entityId, query.getKey(), startTs, endTs, ts, query.getAggregation())); + stepTs = endTs; + } + return Futures.allAsList(futures); } } + private ListenableFuture findAndAggregateAsync(EntityId entityId, String key, long startTs, long endTs, long ts, Aggregation aggregation) { + TsKvEntity entity; + switch (aggregation) { + case AVG: + entity = tsKvRepository.findAvg( + entityId.getId(), + entityId.getEntityType(), + key, + startTs, + endTs); + + break; + case MAX: + entity = tsKvRepository.findMax( + entityId.getId(), + entityId.getEntityType(), + key, + startTs, + endTs); + + break; + case MIN: + entity = tsKvRepository.findMin( + entityId.getId(), + entityId.getEntityType(), + key, + startTs, + endTs); + + break; + case SUM: + entity = tsKvRepository.findSum( + entityId.getId(), + entityId.getEntityType(), + key, + startTs, + endTs); + + break; + case COUNT: + entity = tsKvRepository.findCount( + entityId.getId(), + entityId.getEntityType(), + key, + startTs, + endTs); + + break; + default: + entity = null; + } + if (entity != null){ + entity.setTs(ts); + } + return service.submit(() -> DaoUtil.getData(entity)); + } + private ListenableFuture> findAllAsyncWithLimit(EntityId entityId, TsKvQuery query) { - return service.submit(() -> null); + return service.submit(() -> + DaoUtil.convertDataList( + tsKvRepository.findAllWithLimit( + entityId.getId(), + entityId.getEntityType(), + query.getKey(), + query.getStartTs(), + query.getEndTs(), + new PageRequest(0, query.getLimit())))); } @Override public ListenableFuture findLatest(EntityId entityId, String key) { TsKvLatestCompositeKey compositeKey = new TsKvLatestCompositeKey( - entityId.getEntityType().name(), + entityId.getEntityType(), entityId.getId(), key); return service.submit(() -> @@ -104,7 +180,7 @@ public class JpaTimeseriesDao extends JpaAbstractDaoListeningExecutorService imp @Override public ListenableFuture save(EntityId entityId, TsKvEntry tsKvEntry, long ttl) { TsKvEntity entity = new TsKvEntity(); - entity.setEntityType(entityId.getEntityType().name()); + entity.setEntityType(entityId.getEntityType()); entity.setEntityId(entityId.getId()); entity.setTs(tsKvEntry.getTs()); entity.setKey(tsKvEntry.getKey()); @@ -126,7 +202,7 @@ public class JpaTimeseriesDao extends JpaAbstractDaoListeningExecutorService imp @Override public ListenableFuture saveLatest(EntityId entityId, TsKvEntry tsKvEntry) { TsKvLatestEntity latestEntity = new TsKvLatestEntity(); - latestEntity.setEntityType(entityId.getEntityType().name()); + latestEntity.setEntityType(entityId.getEntityType()); latestEntity.setEntityId(entityId.getId()); latestEntity.setTs(tsKvEntry.getTs()); latestEntity.setKey(tsKvEntry.getKey()); diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/timeseries/TsKvRepository.java b/dao/src/main/java/org/thingsboard/server/dao/sql/timeseries/TsKvRepository.java index 786184e157..a3f59cd260 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/timeseries/TsKvRepository.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/timeseries/TsKvRepository.java @@ -15,11 +15,76 @@ */ package org.thingsboard.server.dao.sql.timeseries; +import org.springframework.data.domain.Pageable; +import org.springframework.data.jpa.repository.Query; import org.springframework.data.repository.CrudRepository; +import org.springframework.data.repository.query.Param; +import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.dao.annotation.SqlDao; import org.thingsboard.server.dao.model.sql.TsKvCompositeKey; import org.thingsboard.server.dao.model.sql.TsKvEntity; +import java.util.List; +import java.util.UUID; + @SqlDao public interface TsKvRepository extends CrudRepository { + + @Query("SELECT tskv FROM TsKvEntity tskv WHERE tskv.entityId = :entityId " + + "AND tskv.entityType = :entityType AND tskv.key = :entityKey " + + "AND tskv.ts > :startTs AND tskv.ts < :endTs ORDER BY tskv.ts DESC") + List findAllWithLimit(@Param("entityId") UUID entityId, + @Param("entityType") EntityType entityType, + @Param("entityKey") String key, + @Param("startTs") long startTs, + @Param("endTs") long endTs, + Pageable pageable); + + @Query("SELECT new TsKvEntity(MAX(tskv.strValue), MAX(tskv.longValue), MAX(tskv.doubleValue)) FROM TsKvEntity tskv " + + "WHERE tskv.entityId = :entityId AND tskv.entityType = :entityType " + + "AND tskv.key = :entityKey AND tskv.ts > :startTs AND tskv.ts < :endTs") + TsKvEntity findMax(@Param("entityId") UUID entityId, + @Param("entityType") EntityType entityType, + @Param("entityKey") String entityKey, + @Param("startTs") long startTs, + @Param("endTs") long endTs); + + @Query("SELECT new TsKvEntity(MIN(tskv.strValue), MIN(tskv.longValue), MIN(tskv.doubleValue)) FROM TsKvEntity tskv " + + "WHERE tskv.entityId = :entityId AND tskv.entityType = :entityType " + + "AND tskv.key = :entityKey AND tskv.ts > :startTs AND tskv.ts < :endTs") + TsKvEntity findMin(@Param("entityId") UUID entityId, + @Param("entityType") EntityType entityType, + @Param("entityKey") String entityKey, + @Param("startTs") long startTs, + @Param("endTs") long endTs); + + + @Query("SELECT new TsKvEntity(COUNT(tskv.booleanValue), COUNT(tskv.strValue), COUNT(tskv.longValue), COUNT(tskv.doubleValue)) FROM TsKvEntity tskv " + + "WHERE tskv.entityId = :entityId AND tskv.entityType = :entityType " + + "AND tskv.key = :entityKey AND tskv.ts > :startTs AND tskv.ts < :endTs") + TsKvEntity findCount(@Param("entityId") UUID entityId, + @Param("entityType") EntityType entityType, + @Param("entityKey") String entityKey, + @Param("startTs") long startTs, + @Param("endTs") long endTs); + + + @Query("SELECT new TsKvEntity(AVG(tskv.longValue), AVG(tskv.doubleValue)) FROM TsKvEntity tskv " + + "WHERE tskv.entityId = :entityId AND tskv.entityType = :entityType " + + "AND tskv.key = :entityKey AND tskv.ts > :startTs AND tskv.ts < :endTs") + TsKvEntity findAvg(@Param("entityId") UUID entityId, + @Param("entityType") EntityType entityType, + @Param("entityKey") String entityKey, + @Param("startTs") long startTs, + @Param("endTs") long endTs); + + + @Query("SELECT new TsKvEntity(SUM(tskv.longValue), SUM(tskv.doubleValue)) FROM TsKvEntity tskv " + + "WHERE tskv.entityId = :entityId AND tskv.entityType = :entityType " + + "AND tskv.key = :entityKey AND tskv.ts > :startTs AND tskv.ts < :endTs") + TsKvEntity findSum(@Param("entityId") UUID entityId, + @Param("entityType") EntityType entityType, + @Param("entityKey") String entityKey, + @Param("startTs") long startTs, + @Param("endTs") long endTs); }