From 7a2b76b8c0d3e50370dcb7dc282bb39fe90f6068 Mon Sep 17 00:00:00 2001 From: Igor Kulikov Date: Mon, 24 Feb 2020 18:24:40 +0200 Subject: [PATCH] SQL DAO Refactoring --- .../server/dao/HsqlTsDaoConfig.java | 8 +- .../server/dao/PsqlTsDaoConfig.java | 8 +- .../server/dao/TimescaleDaoConfig.java | 6 +- .../model/sqlts/hsql/TsKvCompositeKey.java | 39 ---- .../dao/model/sqlts/hsql/TsKvEntity.java | 105 ---------- .../{ => ts}/TimescaleTsKvCompositeKey.java | 4 +- .../{ => ts}/TimescaleTsKvEntity.java | 4 +- .../sqlts/{psql => ts}/TsKvCompositeKey.java | 4 +- .../model/sqlts/{psql => ts}/TsKvEntity.java | 2 +- ...stractChunkedAggregationTimeseriesDao.java | 183 ++++++++++++++++-- .../dao/sqlts/AbstractSqlTimeseriesDao.java | 1 + .../dao/sqlts/hsql/JpaHsqlTimeseriesDao.java | 177 +---------------- .../dao/sqlts/hsql/TsKvHsqlRepository.java | 132 ------------- .../AbstractInsertRepository.java | 4 +- .../{ => insert}/InsertTsRepository.java | 3 +- .../hsql/HsqlInsertTsRepository.java | 10 +- .../latest}/InsertLatestTsRepository.java | 2 +- .../hsql}/HsqlLatestInsertTsRepository.java | 8 +- .../psql}/PsqlLatestInsertTsRepository.java | 8 +- .../psql/PsqlInsertTsRepository.java | 10 +- .../psql/PsqlPartitioningRepository.java | 4 +- .../TimescaleInsertTsRepository.java | 10 +- .../dao/sqlts/psql/JpaPsqlTimeseriesDao.java | 178 +---------------- .../timescale/AggregationRepository.java | 4 +- .../timescale/TimescaleTimeseriesDao.java | 6 +- .../timescale/TsKvTimescaleRepository.java | 6 +- .../TsKvRepository.java} | 8 +- 27 files changed, 235 insertions(+), 699 deletions(-) delete mode 100644 dao/src/main/java/org/thingsboard/server/dao/model/sqlts/hsql/TsKvCompositeKey.java delete mode 100644 dao/src/main/java/org/thingsboard/server/dao/model/sqlts/hsql/TsKvEntity.java rename dao/src/main/java/org/thingsboard/server/dao/model/sqlts/timescale/{ => ts}/TimescaleTsKvCompositeKey.java (94%) rename dao/src/main/java/org/thingsboard/server/dao/model/sqlts/timescale/{ => ts}/TimescaleTsKvEntity.java (99%) rename dao/src/main/java/org/thingsboard/server/dao/model/sqlts/{psql => ts}/TsKvCompositeKey.java (95%) rename dao/src/main/java/org/thingsboard/server/dao/model/sqlts/{psql => ts}/TsKvEntity.java (98%) delete mode 100644 dao/src/main/java/org/thingsboard/server/dao/sqlts/hsql/TsKvHsqlRepository.java rename dao/src/main/java/org/thingsboard/server/dao/sqlts/{ => insert}/AbstractInsertRepository.java (96%) rename dao/src/main/java/org/thingsboard/server/dao/sqlts/{ => insert}/InsertTsRepository.java (88%) rename dao/src/main/java/org/thingsboard/server/dao/sqlts/{ => insert}/hsql/HsqlInsertTsRepository.java (93%) rename dao/src/main/java/org/thingsboard/server/dao/sqlts/{ => insert/latest}/InsertLatestTsRepository.java (93%) rename dao/src/main/java/org/thingsboard/server/dao/sqlts/{latest => insert/latest/hsql}/HsqlLatestInsertTsRepository.java (94%) rename dao/src/main/java/org/thingsboard/server/dao/sqlts/{latest => insert/latest/psql}/PsqlLatestInsertTsRepository.java (96%) rename dao/src/main/java/org/thingsboard/server/dao/sqlts/{ => insert}/psql/PsqlInsertTsRepository.java (94%) rename dao/src/main/java/org/thingsboard/server/dao/sqlts/{ => insert}/psql/PsqlPartitioningRepository.java (95%) rename dao/src/main/java/org/thingsboard/server/dao/sqlts/{ => insert}/timescale/TimescaleInsertTsRepository.java (92%) rename dao/src/main/java/org/thingsboard/server/dao/sqlts/{psql/TsKvPsqlRepository.java => ts/TsKvRepository.java} (96%) diff --git a/dao/src/main/java/org/thingsboard/server/dao/HsqlTsDaoConfig.java b/dao/src/main/java/org/thingsboard/server/dao/HsqlTsDaoConfig.java index cbe8571922..e2519191e9 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/HsqlTsDaoConfig.java +++ b/dao/src/main/java/org/thingsboard/server/dao/HsqlTsDaoConfig.java @@ -26,12 +26,12 @@ import org.thingsboard.server.dao.util.SqlTsDao; @Configuration @EnableAutoConfiguration -@ComponentScan({"org.thingsboard.server.dao.sqlts.hsql", "org.thingsboard.server.dao.sqlts.latest"}) -@EnableJpaRepositories({"org.thingsboard.server.dao.sqlts.hsql", "org.thingsboard.server.dao.sqlts.latest", "org.thingsboard.server.dao.sqlts.dictionary"}) -@EntityScan({"org.thingsboard.server.dao.model.sqlts.hsql", "org.thingsboard.server.dao.model.sqlts.latest", "org.thingsboard.server.dao.model.sqlts.dictionary"}) +@ComponentScan({"org.thingsboard.server.dao.sqlts.hsql"}) +@EnableJpaRepositories({"org.thingsboard.server.dao.sqlts.ts", "org.thingsboard.server.dao.sqlts.insert.hsql", "org.thingsboard.server.dao.sqlts.insert.latest.hsql", "org.thingsboard.server.dao.sqlts.latest", "org.thingsboard.server.dao.sqlts.dictionary"}) +@EntityScan({"org.thingsboard.server.dao.model.sqlts.ts", "org.thingsboard.server.dao.model.sqlts.latest", "org.thingsboard.server.dao.model.sqlts.dictionary"}) @EnableTransactionManagement @SqlTsDao @HsqlDao public class HsqlTsDaoConfig { -} \ No newline at end of file +} diff --git a/dao/src/main/java/org/thingsboard/server/dao/PsqlTsDaoConfig.java b/dao/src/main/java/org/thingsboard/server/dao/PsqlTsDaoConfig.java index e3caf5e3d3..65f17709ca 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/PsqlTsDaoConfig.java +++ b/dao/src/main/java/org/thingsboard/server/dao/PsqlTsDaoConfig.java @@ -26,12 +26,12 @@ import org.thingsboard.server.dao.util.SqlTsDao; @Configuration @EnableAutoConfiguration -@ComponentScan({"org.thingsboard.server.dao.sqlts.psql", "org.thingsboard.server.dao.sqlts.latest"}) -@EnableJpaRepositories({"org.thingsboard.server.dao.sqlts.psql", "org.thingsboard.server.dao.sqlts.latest", "org.thingsboard.server.dao.sqlts.dictionary"}) -@EntityScan({"org.thingsboard.server.dao.model.sqlts.psql", "org.thingsboard.server.dao.model.sqlts.latest", "org.thingsboard.server.dao.model.sqlts.dictionary"}) +@ComponentScan({"org.thingsboard.server.dao.sqlts.psql"}) +@EnableJpaRepositories({"org.thingsboard.server.dao.sqlts.ts", "org.thingsboard.server.dao.sqlts.insert.psql", "org.thingsboard.server.dao.sqlts.insert.latest.psql", "org.thingsboard.server.dao.sqlts.latest", "org.thingsboard.server.dao.sqlts.dictionary"}) +@EntityScan({"org.thingsboard.server.dao.model.sqlts.ts", "org.thingsboard.server.dao.model.sqlts.latest", "org.thingsboard.server.dao.model.sqlts.dictionary"}) @EnableTransactionManagement @SqlTsDao @PsqlDao public class PsqlTsDaoConfig { -} \ No newline at end of file +} diff --git a/dao/src/main/java/org/thingsboard/server/dao/TimescaleDaoConfig.java b/dao/src/main/java/org/thingsboard/server/dao/TimescaleDaoConfig.java index 99cea08d7e..19ae98c736 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/TimescaleDaoConfig.java +++ b/dao/src/main/java/org/thingsboard/server/dao/TimescaleDaoConfig.java @@ -26,12 +26,12 @@ import org.thingsboard.server.dao.util.TimescaleDBTsDao; @Configuration @EnableAutoConfiguration -@ComponentScan({"org.thingsboard.server.dao.sqlts.timescale", "org.thingsboard.server.dao.sqlts.latest"}) -@EnableJpaRepositories({"org.thingsboard.server.dao.sqlts.timescale", "org.thingsboard.server.dao.sqlts.dictionary", "org.thingsboard.server.dao.sqlts.latest"}) +@ComponentScan({"org.thingsboard.server.dao.sqlts.timescale"}) +@EnableJpaRepositories({"org.thingsboard.server.dao.sqlts.timescale", "org.thingsboard.server.dao.sqlts.insert.latest.psql", "org.thingsboard.server.dao.sqlts.insert.timescale", "org.thingsboard.server.dao.sqlts.dictionary", "org.thingsboard.server.dao.sqlts.latest"}) @EntityScan({"org.thingsboard.server.dao.model.sqlts.timescale", "org.thingsboard.server.dao.model.sqlts.dictionary", "org.thingsboard.server.dao.model.sqlts.latest"}) @EnableTransactionManagement @TimescaleDBTsDao @PsqlDao public class TimescaleDaoConfig { -} \ No newline at end of file +} diff --git a/dao/src/main/java/org/thingsboard/server/dao/model/sqlts/hsql/TsKvCompositeKey.java b/dao/src/main/java/org/thingsboard/server/dao/model/sqlts/hsql/TsKvCompositeKey.java deleted file mode 100644 index a17d1373b0..0000000000 --- a/dao/src/main/java/org/thingsboard/server/dao/model/sqlts/hsql/TsKvCompositeKey.java +++ /dev/null @@ -1,39 +0,0 @@ -/** - * Copyright © 2016-2020 The Thingsboard Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.thingsboard.server.dao.model.sqlts.hsql; - -import lombok.AllArgsConstructor; -import lombok.Data; -import lombok.NoArgsConstructor; -import org.thingsboard.server.common.data.EntityType; - -import javax.persistence.Transient; -import java.io.Serializable; -import java.util.UUID; - -@Data -@AllArgsConstructor -@NoArgsConstructor -public class TsKvCompositeKey implements Serializable { - - @Transient - private static final long serialVersionUID = -4089175869616037523L; - - private UUID entityId; - private int key; - private long ts; - -} \ No newline at end of file diff --git a/dao/src/main/java/org/thingsboard/server/dao/model/sqlts/hsql/TsKvEntity.java b/dao/src/main/java/org/thingsboard/server/dao/model/sqlts/hsql/TsKvEntity.java deleted file mode 100644 index a38dc185a8..0000000000 --- a/dao/src/main/java/org/thingsboard/server/dao/model/sqlts/hsql/TsKvEntity.java +++ /dev/null @@ -1,105 +0,0 @@ -/** - * Copyright © 2016-2020 The Thingsboard Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.thingsboard.server.dao.model.sqlts.hsql; - -import lombok.Data; -import org.thingsboard.server.common.data.kv.TsKvEntry; -import org.thingsboard.server.dao.model.ToData; -import org.thingsboard.server.dao.model.sql.AbstractTsKvEntity; - -import javax.persistence.Column; -import javax.persistence.Entity; -import javax.persistence.Id; -import javax.persistence.IdClass; -import javax.persistence.Table; - -import static org.thingsboard.server.dao.model.ModelConstants.KEY_COLUMN; - -@Data -@Entity -@Table(name = "ts_kv") -@IdClass(TsKvCompositeKey.class) -public final class TsKvEntity extends AbstractTsKvEntity implements ToData { - - @Id - @Column(name = KEY_COLUMN) - private int key; - - public TsKvEntity() { - } - - public TsKvEntity(String strValue) { - this.strValue = strValue; - } - - public TsKvEntity(Long longValue, Double doubleValue, Long longCountValue, Long doubleCountValue, String aggType) { - if (!isAllNull(longValue, doubleValue, longCountValue, doubleCountValue)) { - switch (aggType) { - case AVG: - double sum = 0.0; - if (longValue != null) { - sum += longValue; - } - if (doubleValue != null) { - sum += doubleValue; - } - long totalCount = longCountValue + doubleCountValue; - if (totalCount > 0) { - this.doubleValue = sum / (longCountValue + doubleCountValue); - } else { - this.doubleValue = 0.0; - } - break; - case SUM: - if (doubleCountValue > 0) { - this.doubleValue = doubleValue + (longValue != null ? longValue.doubleValue() : 0.0); - } else { - this.longValue = longValue; - } - break; - case MIN: - case MAX: - if (longCountValue > 0 && doubleCountValue > 0) { - this.doubleValue = MAX.equals(aggType) ? Math.max(doubleValue, longValue.doubleValue()) : Math.min(doubleValue, longValue.doubleValue()); - } else if (doubleCountValue > 0) { - this.doubleValue = doubleValue; - } else if (longCountValue > 0) { - this.longValue = longValue; - } - break; - } - } - } - - public TsKvEntity(Long booleanValueCount, Long strValueCount, Long longValueCount, Long doubleValueCount, Long jsonValueCount) { - if (!isAllNull(booleanValueCount, strValueCount, longValueCount, doubleValueCount)) { - if (booleanValueCount != 0) { - this.longValue = booleanValueCount; - } else if (strValueCount != 0) { - this.longValue = strValueCount; - } else if (jsonValueCount != 0) { - this.longValue = jsonValueCount; - } else { - this.longValue = longValueCount + doubleValueCount; - } - } - } - - @Override - public boolean isNotEmpty() { - return strValue != null || longValue != null || doubleValue != null || booleanValue != null; - } -} \ No newline at end of file diff --git a/dao/src/main/java/org/thingsboard/server/dao/model/sqlts/timescale/TimescaleTsKvCompositeKey.java b/dao/src/main/java/org/thingsboard/server/dao/model/sqlts/timescale/ts/TimescaleTsKvCompositeKey.java similarity index 94% rename from dao/src/main/java/org/thingsboard/server/dao/model/sqlts/timescale/TimescaleTsKvCompositeKey.java rename to dao/src/main/java/org/thingsboard/server/dao/model/sqlts/timescale/ts/TimescaleTsKvCompositeKey.java index 8209b4a77f..e7db0572ec 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/model/sqlts/timescale/TimescaleTsKvCompositeKey.java +++ b/dao/src/main/java/org/thingsboard/server/dao/model/sqlts/timescale/ts/TimescaleTsKvCompositeKey.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.thingsboard.server.dao.model.sqlts.timescale; +package org.thingsboard.server.dao.model.sqlts.timescale.ts; import lombok.AllArgsConstructor; import lombok.Data; @@ -35,4 +35,4 @@ public class TimescaleTsKvCompositeKey implements Serializable { private UUID entityId; private int key; private long ts; -} \ No newline at end of file +} diff --git a/dao/src/main/java/org/thingsboard/server/dao/model/sqlts/timescale/TimescaleTsKvEntity.java b/dao/src/main/java/org/thingsboard/server/dao/model/sqlts/timescale/ts/TimescaleTsKvEntity.java similarity index 99% rename from dao/src/main/java/org/thingsboard/server/dao/model/sqlts/timescale/TimescaleTsKvEntity.java rename to dao/src/main/java/org/thingsboard/server/dao/model/sqlts/timescale/ts/TimescaleTsKvEntity.java index 3bedae1563..76a95667a9 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/model/sqlts/timescale/TimescaleTsKvEntity.java +++ b/dao/src/main/java/org/thingsboard/server/dao/model/sqlts/timescale/ts/TimescaleTsKvEntity.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.thingsboard.server.dao.model.sqlts.timescale; +package org.thingsboard.server.dao.model.sqlts.timescale.ts; import lombok.Data; import lombok.EqualsAndHashCode; @@ -191,4 +191,4 @@ public final class TimescaleTsKvEntity extends AbstractTsKvEntity implements ToD public boolean isNotEmpty() { return ts != null && (strValue != null || longValue != null || doubleValue != null || booleanValue != null || jsonValue != null); } -} \ No newline at end of file +} diff --git a/dao/src/main/java/org/thingsboard/server/dao/model/sqlts/psql/TsKvCompositeKey.java b/dao/src/main/java/org/thingsboard/server/dao/model/sqlts/ts/TsKvCompositeKey.java similarity index 95% rename from dao/src/main/java/org/thingsboard/server/dao/model/sqlts/psql/TsKvCompositeKey.java rename to dao/src/main/java/org/thingsboard/server/dao/model/sqlts/ts/TsKvCompositeKey.java index f487b11414..ffc2076ecd 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/model/sqlts/psql/TsKvCompositeKey.java +++ b/dao/src/main/java/org/thingsboard/server/dao/model/sqlts/ts/TsKvCompositeKey.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.thingsboard.server.dao.model.sqlts.psql; +package org.thingsboard.server.dao.model.sqlts.ts; import lombok.AllArgsConstructor; import lombok.Data; @@ -34,4 +34,4 @@ public class TsKvCompositeKey implements Serializable { private UUID entityId; private int key; private long ts; -} \ No newline at end of file +} diff --git a/dao/src/main/java/org/thingsboard/server/dao/model/sqlts/psql/TsKvEntity.java b/dao/src/main/java/org/thingsboard/server/dao/model/sqlts/ts/TsKvEntity.java similarity index 98% rename from dao/src/main/java/org/thingsboard/server/dao/model/sqlts/psql/TsKvEntity.java rename to dao/src/main/java/org/thingsboard/server/dao/model/sqlts/ts/TsKvEntity.java index b10c5445ca..6d01b62d25 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/model/sqlts/psql/TsKvEntity.java +++ b/dao/src/main/java/org/thingsboard/server/dao/model/sqlts/ts/TsKvEntity.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.thingsboard.server.dao.model.sqlts.psql; +package org.thingsboard.server.dao.model.sqlts.ts; import lombok.Data; import org.thingsboard.server.dao.model.sql.AbstractTsKvEntity; diff --git a/dao/src/main/java/org/thingsboard/server/dao/sqlts/AbstractChunkedAggregationTimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/sqlts/AbstractChunkedAggregationTimeseriesDao.java index cacf7aea93..3841d4f9c1 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sqlts/AbstractChunkedAggregationTimeseriesDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sqlts/AbstractChunkedAggregationTimeseriesDao.java @@ -15,32 +15,45 @@ */ package org.thingsboard.server.dao.sqlts; +import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.data.domain.PageRequest; +import org.springframework.data.domain.Sort; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.kv.Aggregation; +import org.thingsboard.server.common.data.kv.DeleteTsKvQuery; +import org.thingsboard.server.common.data.kv.ReadTsKvQuery; import org.thingsboard.server.common.data.kv.TsKvEntry; -import org.thingsboard.server.dao.model.sql.AbstractTsKvEntity; +import org.thingsboard.server.dao.DaoUtil; +import org.thingsboard.server.dao.model.sqlts.ts.TsKvEntity; import org.thingsboard.server.dao.sql.TbSqlBlockingQueue; import org.thingsboard.server.dao.sql.TbSqlBlockingQueueParams; +import org.thingsboard.server.dao.sqlts.insert.InsertTsRepository; +import org.thingsboard.server.dao.sqlts.ts.TsKvRepository; +import org.thingsboard.server.dao.timeseries.TimeseriesDao; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; +import java.util.ArrayList; import java.util.List; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.stream.Collectors; @Slf4j -public abstract class AbstractChunkedAggregationTimeseriesDao extends AbstractSqlTimeseriesDao { +public abstract class AbstractChunkedAggregationTimeseriesDao extends AbstractSqlTimeseriesDao implements TimeseriesDao { @Autowired - protected InsertTsRepository insertRepository; + protected TsKvRepository tsKvRepository; - protected TbSqlBlockingQueue> tsQueue; + @Autowired + protected InsertTsRepository insertRepository; + + protected TbSqlBlockingQueue> tsQueue; @PostConstruct protected void init() { @@ -63,9 +76,102 @@ public abstract class AbstractChunkedAggregationTimeseriesDao> findAndAggregateAsync(TenantId tenantId, EntityId entityId, String key, long startTs, long endTs, long ts, Aggregation aggregation); + @Override + public ListenableFuture remove(TenantId tenantId, EntityId entityId, DeleteTsKvQuery query) { + return service.submit(() -> { + tsKvRepository.delete( + entityId.getId(), + getOrSaveKeyId(query.getKey()), + query.getStartTs(), + query.getEndTs()); + return null; + }); + } - protected void switchAggregation(TenantId tenantId, EntityId entityId, String key, long startTs, long endTs, Aggregation aggregation, List> entitiesFutures) { + @Override + public ListenableFuture saveLatest(TenantId tenantId, EntityId entityId, TsKvEntry tsKvEntry) { + return getSaveLatestFuture(entityId, tsKvEntry); + } + + @Override + public ListenableFuture removeLatest(TenantId tenantId, EntityId entityId, DeleteTsKvQuery query) { + return getRemoveLatestFuture(tenantId, entityId, query); + } + + @Override + public ListenableFuture findLatest(TenantId tenantId, EntityId entityId, String key) { + return getFindLatestFuture(entityId, key); + } + + @Override + public ListenableFuture> findAllLatest(TenantId tenantId, EntityId entityId) { + return getFindAllLatestFuture(entityId); + } + + @Override + public ListenableFuture savePartition(TenantId tenantId, EntityId entityId, long tsKvEntryTs, String key, long ttl) { + return Futures.immediateFuture(null); + } + + @Override + public ListenableFuture removePartition(TenantId tenantId, EntityId entityId, DeleteTsKvQuery query) { + return Futures.immediateFuture(null); + } + + @Override + public ListenableFuture> findAllAsync(TenantId tenantId, EntityId entityId, List queries) { + return processFindAllAsync(tenantId, entityId, queries); + } + + @Override + protected ListenableFuture> findAllAsync(TenantId tenantId, EntityId entityId, ReadTsKvQuery query) { + if (query.getAggregation() == Aggregation.NONE) { + return findAllAsyncWithLimit(tenantId, entityId, query); + } else { + long stepTs = query.getStartTs(); + List>> futures = new ArrayList<>(); + while (stepTs < query.getEndTs()) { + long startTs = stepTs; + long endTs = stepTs + query.getInterval(); + long ts = startTs + (endTs - startTs) / 2; + futures.add(findAndAggregateAsync(tenantId, entityId, query.getKey(), startTs, endTs, ts, query.getAggregation())); + stepTs = endTs; + } + return getTskvEntriesFuture(Futures.allAsList(futures)); + } + } + + @Override + protected ListenableFuture> findAllAsyncWithLimit(TenantId tenantId, EntityId entityId, ReadTsKvQuery query) { + Integer keyId = getOrSaveKeyId(query.getKey()); + List tsKvEntities = tsKvRepository.findAllWithLimit( + entityId.getId(), + keyId, + query.getStartTs(), + query.getEndTs(), + new PageRequest(0, query.getLimit(), + new Sort(Sort.Direction.fromString( + query.getOrderBy()), "ts"))); + tsKvEntities.forEach(tsKvEntity -> tsKvEntity.setStrKey(query.getKey())); + return Futures.immediateFuture(DaoUtil.convertDataList(tsKvEntities)); + } + + protected ListenableFuture> findAndAggregateAsync(TenantId tenantId, EntityId entityId, String key, long startTs, long endTs, long ts, Aggregation aggregation) { + List> entitiesFutures = new ArrayList<>(); + switchAggregation(tenantId, entityId, key, startTs, endTs, aggregation, entitiesFutures); + return Futures.transform(setFutures(entitiesFutures), entity -> { + if (entity != null && entity.isNotEmpty()) { + entity.setEntityId(entityId.getId()); + entity.setStrKey(key); + entity.setTs(ts); + return Optional.of(DaoUtil.getData(entity)); + } else { + return Optional.empty(); + } + }); + } + + protected void switchAggregation(TenantId tenantId, EntityId entityId, String key, long startTs, long endTs, Aggregation aggregation, List> entitiesFutures) { switch (aggregation) { case AVG: findAvg(tenantId, entityId, key, startTs, endTs, entitiesFutures); @@ -87,19 +193,64 @@ public abstract class AbstractChunkedAggregationTimeseriesDao> entitiesFutures); + protected void findCount(TenantId tenantId, EntityId entityId, String key, long startTs, long endTs, List> entitiesFutures) { + Integer keyId = getOrSaveKeyId(key); + entitiesFutures.add(tsKvRepository.findCount( + entityId.getId(), + keyId, + startTs, + endTs)); + } - protected abstract void findSum(TenantId tenantId, EntityId entityId, String key, long startTs, long endTs, List> entitiesFutures); + protected void findSum(TenantId tenantId, EntityId entityId, String key, long startTs, long endTs, List> entitiesFutures) { + Integer keyId = getOrSaveKeyId(key); + entitiesFutures.add(tsKvRepository.findSum( + entityId.getId(), + keyId, + startTs, + endTs)); + } - protected abstract void findMin(TenantId tenantId, EntityId entityId, String key, long startTs, long endTs, List> entitiesFutures); + protected void findMin(TenantId tenantId, EntityId entityId, String key, long startTs, long endTs, List> entitiesFutures) { + Integer keyId = getOrSaveKeyId(key); + entitiesFutures.add(tsKvRepository.findStringMin( + entityId.getId(), + keyId, + startTs, + endTs)); + entitiesFutures.add(tsKvRepository.findNumericMin( + entityId.getId(), + keyId, + startTs, + endTs)); + } - protected abstract void findMax(TenantId tenantId, EntityId entityId, String key, long startTs, long endTs, List> entitiesFutures); + protected void findMax(TenantId tenantId, EntityId entityId, String key, long startTs, long endTs, List> entitiesFutures) { + Integer keyId = getOrSaveKeyId(key); + entitiesFutures.add(tsKvRepository.findStringMax( + entityId.getId(), + keyId, + startTs, + endTs)); + entitiesFutures.add(tsKvRepository.findNumericMax( + entityId.getId(), + keyId, + startTs, + endTs)); + } - protected abstract void findAvg(TenantId tenantId, EntityId entityId, String key, long startTs, long endTs, List> entitiesFutures); + protected void findAvg(TenantId tenantId, EntityId entityId, String key, long startTs, long endTs, List> entitiesFutures) { + Integer keyId = getOrSaveKeyId(key); + entitiesFutures.add(tsKvRepository.findAvg( + entityId.getId(), + keyId, + startTs, + endTs)); + } - protected SettableFuture setFutures(List> entitiesFutures) { - SettableFuture listenableFuture = SettableFuture.create(); - CompletableFuture> entities = + protected SettableFuture setFutures(List> entitiesFutures) { + SettableFuture listenableFuture = SettableFuture.create(); + CompletableFuture> entities = CompletableFuture.allOf(entitiesFutures.toArray(new CompletableFuture[entitiesFutures.size()])) .thenApply(v -> entitiesFutures.stream() .map(CompletableFuture::join) @@ -109,8 +260,8 @@ public abstract class AbstractChunkedAggregationTimeseriesDao implements TimeseriesDao { - - @Autowired - private TsKvHsqlRepository tsKvRepository; - - @Override - public ListenableFuture> findAllAsync(TenantId tenantId, EntityId entityId, List queries) { - return processFindAllAsync(tenantId, entityId, queries); - } +public class JpaHsqlTimeseriesDao extends AbstractChunkedAggregationTimeseriesDao implements TimeseriesDao { @Override public ListenableFuture save(TenantId tenantId, EntityId entityId, TsKvEntry tsKvEntry, long ttl) { @@ -72,154 +51,4 @@ public class JpaHsqlTimeseriesDao extends AbstractChunkedAggregationTimeseriesDa return tsQueue.add(new EntityContainer(entity, null)); } - @Override - public ListenableFuture remove(TenantId tenantId, EntityId entityId, DeleteTsKvQuery query) { - return service.submit(() -> { - tsKvRepository.delete( - entityId.getId(), - getOrSaveKeyId(query.getKey()), - query.getStartTs(), - query.getEndTs()); - return null; - }); - } - - @Override - public ListenableFuture saveLatest(TenantId tenantId, EntityId entityId, TsKvEntry tsKvEntry) { - return getSaveLatestFuture(entityId, tsKvEntry); - } - - @Override - public ListenableFuture removeLatest(TenantId tenantId, EntityId entityId, DeleteTsKvQuery query) { - return getRemoveLatestFuture(tenantId, entityId, query); - } - - @Override - public ListenableFuture findLatest(TenantId tenantId, EntityId entityId, String key) { - return getFindLatestFuture(entityId, key); - } - - @Override - public ListenableFuture> findAllLatest(TenantId tenantId, EntityId entityId) { - return getFindAllLatestFuture(entityId); - } - - @Override - public ListenableFuture savePartition(TenantId tenantId, EntityId entityId, long tsKvEntryTs, String key, long ttl) { - return Futures.immediateFuture(null); - } - - @Override - public ListenableFuture removePartition(TenantId tenantId, EntityId entityId, DeleteTsKvQuery query) { - return Futures.immediateFuture(null); - } - - protected ListenableFuture> findAllAsync(TenantId tenantId, EntityId entityId, ReadTsKvQuery query) { - if (query.getAggregation() == Aggregation.NONE) { - return findAllAsyncWithLimit(tenantId, entityId, query); - } else { - long stepTs = query.getStartTs(); - List>> futures = new ArrayList<>(); - while (stepTs < query.getEndTs()) { - long startTs = stepTs; - long endTs = stepTs + query.getInterval(); - long ts = startTs + (endTs - startTs) / 2; - futures.add(findAndAggregateAsync(tenantId, entityId, query.getKey(), startTs, endTs, ts, query.getAggregation())); - stepTs = endTs; - } - return getTskvEntriesFuture(Futures.allAsList(futures)); - } - } - - @Override - protected ListenableFuture> findAllAsyncWithLimit(TenantId tenantId, EntityId entityId, ReadTsKvQuery query) { - List tsKvEntities = tsKvRepository.findAllWithLimit( - entityId.getId(), - getOrSaveKeyId(query.getKey()), - query.getStartTs(), - query.getEndTs(), - new PageRequest(0, query.getLimit(), - new Sort(Sort.Direction.fromString( - query.getOrderBy()), "ts"))); - tsKvEntities.forEach(tsKvEntity -> tsKvEntity.setStrKey(query.getKey())); - return Futures.immediateFuture( - DaoUtil.convertDataList( - tsKvEntities)); - } - - @Override - protected ListenableFuture> findAndAggregateAsync(TenantId tenantId, EntityId entityId, String key, long startTs, long endTs, long ts, Aggregation aggregation) { - List> entitiesFutures = new ArrayList<>(); - switchAggregation(tenantId, entityId, key, startTs, endTs, aggregation, entitiesFutures); - return Futures.transform(setFutures(entitiesFutures), entity -> { - if (entity != null && entity.isNotEmpty()) { - entity.setEntityId(entityId.getId()); - entity.setKey(getOrSaveKeyId(key)); - entity.setTs(ts); - return Optional.of(DaoUtil.getData(entity)); - } else { - return Optional.empty(); - } - }); - } - - @Override - protected void findCount(TenantId tenantId, EntityId entityId, String key, long startTs, long endTs, List> entitiesFutures) { - Integer keyId = getOrSaveKeyId(key); - entitiesFutures.add(tsKvRepository.findCount( - entityId.getId(), - keyId, - startTs, - endTs)); - } - - @Override - protected void findSum(TenantId tenantId, EntityId entityId, String key, long startTs, long endTs, List> entitiesFutures) { - Integer keyId = getOrSaveKeyId(key); - entitiesFutures.add(tsKvRepository.findSum( - entityId.getId(), - keyId, - startTs, - endTs)); - } - - @Override - protected void findMin(TenantId tenantId, EntityId entityId, String key, long startTs, long endTs, List> entitiesFutures) { - Integer keyId = getOrSaveKeyId(key); - entitiesFutures.add(tsKvRepository.findStringMin( - entityId.getId(), - keyId, - startTs, - endTs)); - entitiesFutures.add(tsKvRepository.findNumericMin( - entityId.getId(), - keyId, - startTs, - endTs)); - } - - @Override - protected void findMax(TenantId tenantId, EntityId entityId, String key, long startTs, long endTs, List> entitiesFutures) { - Integer keyId = getOrSaveKeyId(key); - entitiesFutures.add(tsKvRepository.findStringMax( - entityId.getId(), - keyId, - startTs, - endTs)); - entitiesFutures.add(tsKvRepository.findNumericMax( - entityId.getId(), - keyId, - startTs, - endTs)); - } - - @Override - protected void findAvg(TenantId tenantId, EntityId entityId, String key, long startTs, long endTs, List> entitiesFutures) { - Integer keyId = getOrSaveKeyId(key); - entitiesFutures.add(tsKvRepository.findAvg( - entityId.getId(), - keyId, - startTs, - endTs)); - } -} \ No newline at end of file +} diff --git a/dao/src/main/java/org/thingsboard/server/dao/sqlts/hsql/TsKvHsqlRepository.java b/dao/src/main/java/org/thingsboard/server/dao/sqlts/hsql/TsKvHsqlRepository.java deleted file mode 100644 index a7c0effb97..0000000000 --- a/dao/src/main/java/org/thingsboard/server/dao/sqlts/hsql/TsKvHsqlRepository.java +++ /dev/null @@ -1,132 +0,0 @@ -/** - * Copyright © 2016-2020 The Thingsboard Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.thingsboard.server.dao.sqlts.hsql; - -import org.springframework.data.domain.Pageable; -import org.springframework.data.jpa.repository.Modifying; -import org.springframework.data.jpa.repository.Query; -import org.springframework.data.repository.CrudRepository; -import org.springframework.data.repository.query.Param; -import org.springframework.scheduling.annotation.Async; -import org.springframework.transaction.annotation.Transactional; -import org.thingsboard.server.dao.model.sqlts.hsql.TsKvCompositeKey; -import org.thingsboard.server.dao.model.sqlts.hsql.TsKvEntity; -import org.thingsboard.server.dao.util.SqlDao; - -import java.util.List; -import java.util.UUID; -import java.util.concurrent.CompletableFuture; - -@SqlDao -public interface TsKvHsqlRepository extends CrudRepository { - - @Query("SELECT tskv FROM TsKvEntity tskv WHERE tskv.entityId = :entityId " + - "AND tskv.key = :entityKey AND tskv.ts > :startTs AND tskv.ts <= :endTs") - List findAllWithLimit(@Param("entityId") UUID entityId, - @Param("entityKey") int key, - @Param("startTs") long startTs, - @Param("endTs") long endTs, - Pageable pageable); - - @Transactional - @Modifying - @Query("DELETE FROM TsKvEntity tskv WHERE tskv.entityId = :entityId " + - "AND tskv.key = :entityKey AND tskv.ts > :startTs AND tskv.ts <= :endTs") - void delete(@Param("entityId") UUID entityId, - @Param("entityKey") int key, - @Param("startTs") long startTs, - @Param("endTs") long endTs); - - @Async - @Query("SELECT new TsKvEntity(MAX(tskv.strValue)) FROM TsKvEntity tskv " + - "WHERE tskv.strValue IS NOT NULL AND tskv.entityId = :entityId AND tskv.key = :entityKey" + - " AND tskv.ts > :startTs AND tskv.ts <= :endTs") - CompletableFuture findStringMax(@Param("entityId") UUID entityId, - @Param("entityKey") int entityKey, - @Param("startTs") long startTs, - @Param("endTs") long endTs); - - @Async - @Query("SELECT new TsKvEntity(MAX(COALESCE(tskv.longValue, -9223372036854775807)), " + - "MAX(COALESCE(tskv.doubleValue, -1.79769E+308)), " + - "SUM(CASE WHEN tskv.longValue IS NULL THEN 0 ELSE 1 END), " + - "SUM(CASE WHEN tskv.doubleValue IS NULL THEN 0 ELSE 1 END), " + - "'MAX') FROM TsKvEntity tskv WHERE tskv.entityId = :entityId " + - "AND tskv.key = :entityKey AND tskv.ts > :startTs AND tskv.ts <= :endTs") - CompletableFuture findNumericMax(@Param("entityId") UUID entityId, - @Param("entityKey") int entityKey, - @Param("startTs") long startTs, - @Param("endTs") long endTs); - - - @Async - @Query("SELECT new TsKvEntity(MIN(tskv.strValue)) FROM TsKvEntity tskv " + - "WHERE tskv.strValue IS NOT NULL AND tskv.entityId = :entityId " + - "AND tskv.key = :entityKey AND tskv.ts > :startTs AND tskv.ts <= :endTs") - CompletableFuture findStringMin(@Param("entityId") UUID entityId, - @Param("entityKey") int entityKey, - @Param("startTs") long startTs, - @Param("endTs") long endTs); - - @Async - @Query("SELECT new TsKvEntity(MIN(COALESCE(tskv.longValue, 9223372036854775807)), " + - "MIN(COALESCE(tskv.doubleValue, 1.79769E+308)), " + - "SUM(CASE WHEN tskv.longValue IS NULL THEN 0 ELSE 1 END), " + - "SUM(CASE WHEN tskv.doubleValue IS NULL THEN 0 ELSE 1 END), " + - "'MIN') FROM TsKvEntity tskv WHERE tskv.entityId = :entityId " + - "AND tskv.key = :entityKey AND tskv.ts > :startTs AND tskv.ts <= :endTs") - CompletableFuture findNumericMin(@Param("entityId") UUID entityId, - @Param("entityKey") int entityKey, - @Param("startTs") long startTs, - @Param("endTs") long endTs); - - @Async - @Query("SELECT new TsKvEntity(SUM(CASE WHEN tskv.booleanValue IS NULL THEN 0 ELSE 1 END), " + - "SUM(CASE WHEN tskv.strValue IS NULL THEN 0 ELSE 1 END), " + - "SUM(CASE WHEN tskv.longValue IS NULL THEN 0 ELSE 1 END), " + - "SUM(CASE WHEN tskv.doubleValue IS NULL THEN 0 ELSE 1 END), " + - "SUM(CASE WHEN tskv.jsonValue IS NULL THEN 0 ELSE 1 END)) FROM TsKvEntity tskv " + - "WHERE tskv.entityId = :entityId AND tskv.key = :entityKey AND tskv.ts > :startTs AND tskv.ts <= :endTs") - CompletableFuture findCount(@Param("entityId") UUID entityId, - @Param("entityKey") int entityKey, - @Param("startTs") long startTs, - @Param("endTs") long endTs); - - @Async - @Query("SELECT new TsKvEntity(SUM(COALESCE(tskv.longValue, 0)), " + - "SUM(COALESCE(tskv.doubleValue, 0.0)), " + - "SUM(CASE WHEN tskv.longValue IS NULL THEN 0 ELSE 1 END), " + - "SUM(CASE WHEN tskv.doubleValue IS NULL THEN 0 ELSE 1 END), " + - "'AVG') FROM TsKvEntity tskv WHERE tskv.entityId = :entityId " + - "AND tskv.key = :entityKey AND tskv.ts > :startTs AND tskv.ts <= :endTs") - CompletableFuture findAvg(@Param("entityId") UUID entityId, - @Param("entityKey") int entityKey, - @Param("startTs") long startTs, - @Param("endTs") long endTs); - - @Async - @Query("SELECT new TsKvEntity(SUM(COALESCE(tskv.longValue, 0)), " + - "SUM(COALESCE(tskv.doubleValue, 0.0)), " + - "SUM(CASE WHEN tskv.longValue IS NULL THEN 0 ELSE 1 END), " + - "SUM(CASE WHEN tskv.doubleValue IS NULL THEN 0 ELSE 1 END), " + - "'SUM') FROM TsKvEntity tskv WHERE tskv.entityId = :entityId " + - "AND tskv.key = :entityKey AND tskv.ts > :startTs AND tskv.ts <= :endTs") - CompletableFuture findSum(@Param("entityId") UUID entityId, - @Param("entityKey") int entityKey, - @Param("startTs") long startTs, - @Param("endTs") long endTs); - -} \ No newline at end of file diff --git a/dao/src/main/java/org/thingsboard/server/dao/sqlts/AbstractInsertRepository.java b/dao/src/main/java/org/thingsboard/server/dao/sqlts/insert/AbstractInsertRepository.java similarity index 96% rename from dao/src/main/java/org/thingsboard/server/dao/sqlts/AbstractInsertRepository.java rename to dao/src/main/java/org/thingsboard/server/dao/sqlts/insert/AbstractInsertRepository.java index 7c40ad8421..7116df9fb3 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sqlts/AbstractInsertRepository.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sqlts/insert/AbstractInsertRepository.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.thingsboard.server.dao.sqlts; +package org.thingsboard.server.dao.sqlts.insert; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; @@ -44,4 +44,4 @@ public abstract class AbstractInsertRepository { } return strValue; } -} \ No newline at end of file +} diff --git a/dao/src/main/java/org/thingsboard/server/dao/sqlts/InsertTsRepository.java b/dao/src/main/java/org/thingsboard/server/dao/sqlts/insert/InsertTsRepository.java similarity index 88% rename from dao/src/main/java/org/thingsboard/server/dao/sqlts/InsertTsRepository.java rename to dao/src/main/java/org/thingsboard/server/dao/sqlts/insert/InsertTsRepository.java index 6ab11618f0..a2c066322a 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sqlts/InsertTsRepository.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sqlts/insert/InsertTsRepository.java @@ -13,9 +13,10 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.thingsboard.server.dao.sqlts; +package org.thingsboard.server.dao.sqlts.insert; import org.thingsboard.server.dao.model.sql.AbstractTsKvEntity; +import org.thingsboard.server.dao.sqlts.EntityContainer; import java.util.List; diff --git a/dao/src/main/java/org/thingsboard/server/dao/sqlts/hsql/HsqlInsertTsRepository.java b/dao/src/main/java/org/thingsboard/server/dao/sqlts/insert/hsql/HsqlInsertTsRepository.java similarity index 93% rename from dao/src/main/java/org/thingsboard/server/dao/sqlts/hsql/HsqlInsertTsRepository.java rename to dao/src/main/java/org/thingsboard/server/dao/sqlts/insert/hsql/HsqlInsertTsRepository.java index d1e5294309..189758a947 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sqlts/hsql/HsqlInsertTsRepository.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sqlts/insert/hsql/HsqlInsertTsRepository.java @@ -13,15 +13,15 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.thingsboard.server.dao.sqlts.hsql; +package org.thingsboard.server.dao.sqlts.insert.hsql; import org.springframework.jdbc.core.BatchPreparedStatementSetter; import org.springframework.stereotype.Repository; import org.springframework.transaction.annotation.Transactional; -import org.thingsboard.server.dao.model.sqlts.hsql.TsKvEntity; -import org.thingsboard.server.dao.sqlts.AbstractInsertRepository; +import org.thingsboard.server.dao.model.sqlts.ts.TsKvEntity; import org.thingsboard.server.dao.sqlts.EntityContainer; -import org.thingsboard.server.dao.sqlts.InsertTsRepository; +import org.thingsboard.server.dao.sqlts.insert.AbstractInsertRepository; +import org.thingsboard.server.dao.sqlts.insert.InsertTsRepository; import org.thingsboard.server.dao.util.HsqlDao; import org.thingsboard.server.dao.util.SqlTsDao; @@ -86,4 +86,4 @@ public class HsqlInsertTsRepository extends AbstractInsertRepository implements } }); } -} \ No newline at end of file +} diff --git a/dao/src/main/java/org/thingsboard/server/dao/sqlts/InsertLatestTsRepository.java b/dao/src/main/java/org/thingsboard/server/dao/sqlts/insert/latest/InsertLatestTsRepository.java similarity index 93% rename from dao/src/main/java/org/thingsboard/server/dao/sqlts/InsertLatestTsRepository.java rename to dao/src/main/java/org/thingsboard/server/dao/sqlts/insert/latest/InsertLatestTsRepository.java index c7b0f68b7e..539ce2d6f7 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sqlts/InsertLatestTsRepository.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sqlts/insert/latest/InsertLatestTsRepository.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.thingsboard.server.dao.sqlts; +package org.thingsboard.server.dao.sqlts.insert.latest; import org.thingsboard.server.dao.model.sqlts.latest.TsKvLatestEntity; diff --git a/dao/src/main/java/org/thingsboard/server/dao/sqlts/latest/HsqlLatestInsertTsRepository.java b/dao/src/main/java/org/thingsboard/server/dao/sqlts/insert/latest/hsql/HsqlLatestInsertTsRepository.java similarity index 94% rename from dao/src/main/java/org/thingsboard/server/dao/sqlts/latest/HsqlLatestInsertTsRepository.java rename to dao/src/main/java/org/thingsboard/server/dao/sqlts/insert/latest/hsql/HsqlLatestInsertTsRepository.java index 65ac6257f0..224dc52805 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sqlts/latest/HsqlLatestInsertTsRepository.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sqlts/insert/latest/hsql/HsqlLatestInsertTsRepository.java @@ -13,14 +13,14 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.thingsboard.server.dao.sqlts.latest; +package org.thingsboard.server.dao.sqlts.insert.latest.hsql; import org.springframework.jdbc.core.BatchPreparedStatementSetter; import org.springframework.stereotype.Repository; import org.springframework.transaction.annotation.Transactional; import org.thingsboard.server.dao.model.sqlts.latest.TsKvLatestEntity; -import org.thingsboard.server.dao.sqlts.AbstractInsertRepository; -import org.thingsboard.server.dao.sqlts.InsertLatestTsRepository; +import org.thingsboard.server.dao.sqlts.insert.AbstractInsertRepository; +import org.thingsboard.server.dao.sqlts.insert.latest.InsertLatestTsRepository; import org.thingsboard.server.dao.util.HsqlDao; import org.thingsboard.server.dao.util.SqlTsDao; @@ -82,4 +82,4 @@ public class HsqlLatestInsertTsRepository extends AbstractInsertRepository imple } }); } -} \ No newline at end of file +} diff --git a/dao/src/main/java/org/thingsboard/server/dao/sqlts/latest/PsqlLatestInsertTsRepository.java b/dao/src/main/java/org/thingsboard/server/dao/sqlts/insert/latest/psql/PsqlLatestInsertTsRepository.java similarity index 96% rename from dao/src/main/java/org/thingsboard/server/dao/sqlts/latest/PsqlLatestInsertTsRepository.java rename to dao/src/main/java/org/thingsboard/server/dao/sqlts/insert/latest/psql/PsqlLatestInsertTsRepository.java index d367f44620..41abae52f8 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sqlts/latest/PsqlLatestInsertTsRepository.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sqlts/insert/latest/psql/PsqlLatestInsertTsRepository.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.thingsboard.server.dao.sqlts.latest; +package org.thingsboard.server.dao.sqlts.insert.latest.psql; import org.springframework.jdbc.core.BatchPreparedStatementSetter; import org.springframework.stereotype.Repository; @@ -21,8 +21,8 @@ import org.springframework.transaction.TransactionStatus; import org.springframework.transaction.annotation.Transactional; import org.springframework.transaction.support.TransactionCallbackWithoutResult; import org.thingsboard.server.dao.model.sqlts.latest.TsKvLatestEntity; -import org.thingsboard.server.dao.sqlts.AbstractInsertRepository; -import org.thingsboard.server.dao.sqlts.InsertLatestTsRepository; +import org.thingsboard.server.dao.sqlts.insert.AbstractInsertRepository; +import org.thingsboard.server.dao.sqlts.insert.latest.InsertLatestTsRepository; import org.thingsboard.server.dao.util.PsqlTsAnyDao; import java.sql.PreparedStatement; @@ -151,4 +151,4 @@ public class PsqlLatestInsertTsRepository extends AbstractInsertRepository imple } }); } -} \ No newline at end of file +} diff --git a/dao/src/main/java/org/thingsboard/server/dao/sqlts/psql/PsqlInsertTsRepository.java b/dao/src/main/java/org/thingsboard/server/dao/sqlts/insert/psql/PsqlInsertTsRepository.java similarity index 94% rename from dao/src/main/java/org/thingsboard/server/dao/sqlts/psql/PsqlInsertTsRepository.java rename to dao/src/main/java/org/thingsboard/server/dao/sqlts/insert/psql/PsqlInsertTsRepository.java index 00be466027..d4a5dd25b0 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sqlts/psql/PsqlInsertTsRepository.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sqlts/insert/psql/PsqlInsertTsRepository.java @@ -13,15 +13,15 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.thingsboard.server.dao.sqlts.psql; +package org.thingsboard.server.dao.sqlts.insert.psql; import org.springframework.jdbc.core.BatchPreparedStatementSetter; import org.springframework.stereotype.Repository; import org.springframework.transaction.annotation.Transactional; -import org.thingsboard.server.dao.model.sqlts.psql.TsKvEntity; -import org.thingsboard.server.dao.sqlts.AbstractInsertRepository; +import org.thingsboard.server.dao.model.sqlts.ts.TsKvEntity; +import org.thingsboard.server.dao.sqlts.insert.AbstractInsertRepository; import org.thingsboard.server.dao.sqlts.EntityContainer; -import org.thingsboard.server.dao.sqlts.InsertTsRepository; +import org.thingsboard.server.dao.sqlts.insert.InsertTsRepository; import org.thingsboard.server.dao.util.PsqlDao; import org.thingsboard.server.dao.util.SqlTsDao; @@ -101,4 +101,4 @@ public class PsqlInsertTsRepository extends AbstractInsertRepository implements private String getInsertOrUpdateQuery(String partitionDate) { return INSERT_INTO_TS_KV + partitionDate + VALUES_ON_CONFLICT_DO_UPDATE; } -} \ No newline at end of file +} diff --git a/dao/src/main/java/org/thingsboard/server/dao/sqlts/psql/PsqlPartitioningRepository.java b/dao/src/main/java/org/thingsboard/server/dao/sqlts/insert/psql/PsqlPartitioningRepository.java similarity index 95% rename from dao/src/main/java/org/thingsboard/server/dao/sqlts/psql/PsqlPartitioningRepository.java rename to dao/src/main/java/org/thingsboard/server/dao/sqlts/insert/psql/PsqlPartitioningRepository.java index 0e22cb26ba..a3def06a4d 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sqlts/psql/PsqlPartitioningRepository.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sqlts/insert/psql/PsqlPartitioningRepository.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.thingsboard.server.dao.sqlts.psql; +package org.thingsboard.server.dao.sqlts.insert.psql; import org.springframework.stereotype.Repository; import org.springframework.transaction.annotation.Transactional; @@ -38,4 +38,4 @@ public class PsqlPartitioningRepository { .executeUpdate(); } -} \ No newline at end of file +} diff --git a/dao/src/main/java/org/thingsboard/server/dao/sqlts/timescale/TimescaleInsertTsRepository.java b/dao/src/main/java/org/thingsboard/server/dao/sqlts/insert/timescale/TimescaleInsertTsRepository.java similarity index 92% rename from dao/src/main/java/org/thingsboard/server/dao/sqlts/timescale/TimescaleInsertTsRepository.java rename to dao/src/main/java/org/thingsboard/server/dao/sqlts/insert/timescale/TimescaleInsertTsRepository.java index 6d863af105..4cd0a4ab59 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sqlts/timescale/TimescaleInsertTsRepository.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sqlts/insert/timescale/TimescaleInsertTsRepository.java @@ -13,15 +13,15 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.thingsboard.server.dao.sqlts.timescale; +package org.thingsboard.server.dao.sqlts.insert.timescale; import org.springframework.jdbc.core.BatchPreparedStatementSetter; import org.springframework.stereotype.Repository; import org.springframework.transaction.annotation.Transactional; -import org.thingsboard.server.dao.model.sqlts.timescale.TimescaleTsKvEntity; -import org.thingsboard.server.dao.sqlts.AbstractInsertRepository; +import org.thingsboard.server.dao.model.sqlts.timescale.ts.TimescaleTsKvEntity; +import org.thingsboard.server.dao.sqlts.insert.AbstractInsertRepository; import org.thingsboard.server.dao.sqlts.EntityContainer; -import org.thingsboard.server.dao.sqlts.InsertTsRepository; +import org.thingsboard.server.dao.sqlts.insert.InsertTsRepository; import org.thingsboard.server.dao.util.PsqlDao; import org.thingsboard.server.dao.util.TimescaleDBTsDao; @@ -89,4 +89,4 @@ public class TimescaleInsertTsRepository extends AbstractInsertRepository implem } }); } -} \ No newline at end of file +} diff --git a/dao/src/main/java/org/thingsboard/server/dao/sqlts/psql/JpaPsqlTimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/sqlts/psql/JpaPsqlTimeseriesDao.java index 6d60e843ec..29cdd64918 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sqlts/psql/JpaPsqlTimeseriesDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sqlts/psql/JpaPsqlTimeseriesDao.java @@ -15,27 +15,20 @@ */ package org.thingsboard.server.dao.sqlts.psql; -import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; -import org.springframework.data.domain.PageRequest; -import org.springframework.data.domain.Sort; import org.springframework.stereotype.Component; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.TenantId; -import org.thingsboard.server.common.data.kv.Aggregation; -import org.thingsboard.server.common.data.kv.DeleteTsKvQuery; -import org.thingsboard.server.common.data.kv.ReadTsKvQuery; import org.thingsboard.server.common.data.kv.TsKvEntry; -import org.thingsboard.server.dao.DaoUtil; -import org.thingsboard.server.dao.model.sqlts.psql.TsKvEntity; +import org.thingsboard.server.dao.model.sqlts.ts.TsKvEntity; import org.thingsboard.server.dao.sqlts.AbstractChunkedAggregationTimeseriesDao; import org.thingsboard.server.dao.sqlts.EntityContainer; +import org.thingsboard.server.dao.sqlts.insert.psql.PsqlPartitioningRepository; import org.thingsboard.server.dao.timeseries.PsqlPartition; import org.thingsboard.server.dao.timeseries.SqlTsPartitionDate; -import org.thingsboard.server.dao.timeseries.TimeseriesDao; import org.thingsboard.server.dao.util.PsqlDao; import org.thingsboard.server.dao.util.SqlTsDao; @@ -44,11 +37,8 @@ import java.time.LocalDateTime; import java.time.ZoneOffset; import java.time.ZonedDateTime; import java.time.format.DateTimeFormatter; -import java.util.ArrayList; -import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.locks.ReentrantLock; @@ -59,14 +49,11 @@ import static org.thingsboard.server.dao.timeseries.SqlTsPartitionDate.EPOCH_STA @Slf4j @SqlTsDao @PsqlDao -public class JpaPsqlTimeseriesDao extends AbstractChunkedAggregationTimeseriesDao implements TimeseriesDao { +public class JpaPsqlTimeseriesDao extends AbstractChunkedAggregationTimeseriesDao { private final Map partitions = new ConcurrentHashMap<>(); private static final ReentrantLock partitionCreationLock = new ReentrantLock(); - @Autowired - private TsKvPsqlRepository tsKvRepository; - @Autowired private PsqlPartitioningRepository partitioningRepository; @@ -110,163 +97,6 @@ public class JpaPsqlTimeseriesDao extends AbstractChunkedAggregationTimeseriesDa return tsQueue.add(new EntityContainer(entity, psqlPartition.getPartitionDate())); } - @Override - public ListenableFuture remove(TenantId tenantId, EntityId entityId, DeleteTsKvQuery query) { - return service.submit(() -> { - String strKey = query.getKey(); - Integer keyId = getOrSaveKeyId(strKey); - tsKvRepository.delete( - entityId.getId(), - keyId, - query.getStartTs(), - query.getEndTs()); - return null; - }); - } - - @Override - public ListenableFuture removeLatest(TenantId tenantId, EntityId entityId, DeleteTsKvQuery query) { - return getRemoveLatestFuture(tenantId, entityId, query); - } - - @Override - public ListenableFuture saveLatest(TenantId tenantId, EntityId entityId, TsKvEntry tsKvEntry) { - return getSaveLatestFuture(entityId, tsKvEntry); - } - - @Override - public ListenableFuture findLatest(TenantId tenantId, EntityId entityId, String key) { - return getFindLatestFuture(entityId, key); - } - - @Override - public ListenableFuture> findAllLatest(TenantId tenantId, EntityId entityId) { - return getFindAllLatestFuture(entityId); - } - - @Override - public ListenableFuture savePartition(TenantId tenantId, EntityId entityId, long tsKvEntryTs, String key, long ttl) { - return Futures.immediateFuture(null); - } - - @Override - public ListenableFuture removePartition(TenantId tenantId, EntityId entityId, DeleteTsKvQuery query) { - return Futures.immediateFuture(null); - } - - protected ListenableFuture> findAllAsync(TenantId tenantId, EntityId entityId, ReadTsKvQuery query) { - if (query.getAggregation() == Aggregation.NONE) { - return findAllAsyncWithLimit(tenantId, entityId, query); - } else { - long stepTs = query.getStartTs(); - List>> futures = new ArrayList<>(); - while (stepTs < query.getEndTs()) { - long startTs = stepTs; - long endTs = stepTs + query.getInterval(); - long ts = startTs + (endTs - startTs) / 2; - futures.add(findAndAggregateAsync(tenantId, entityId, query.getKey(), startTs, endTs, ts, query.getAggregation())); - stepTs = endTs; - } - return getTskvEntriesFuture(Futures.allAsList(futures)); - } - } - - @Override - protected ListenableFuture> findAllAsyncWithLimit(TenantId tenantId, EntityId entityId, ReadTsKvQuery query) { - Integer keyId = getOrSaveKeyId(query.getKey()); - List tsKvEntities = tsKvRepository.findAllWithLimit( - entityId.getId(), - keyId, - query.getStartTs(), - query.getEndTs(), - new PageRequest(0, query.getLimit(), - new Sort(Sort.Direction.fromString( - query.getOrderBy()), "ts"))); - tsKvEntities.forEach(tsKvEntity -> tsKvEntity.setStrKey(query.getKey())); - return Futures.immediateFuture(DaoUtil.convertDataList(tsKvEntities)); - } - - @Override - protected ListenableFuture> findAndAggregateAsync(TenantId tenantId, EntityId entityId, String key, long startTs, long endTs, long ts, Aggregation aggregation) { - List> entitiesFutures = new ArrayList<>(); - switchAggregation(tenantId, entityId, key, startTs, endTs, aggregation, entitiesFutures); - return Futures.transform(setFutures(entitiesFutures), entity -> { - if (entity != null && entity.isNotEmpty()) { - entity.setEntityId(entityId.getId()); - entity.setStrKey(key); - entity.setTs(ts); - return Optional.of(DaoUtil.getData(entity)); - } else { - return Optional.empty(); - } - }); - } - - @Override - protected void findCount(TenantId tenantId, EntityId entityId, String key, long startTs, long endTs, List> entitiesFutures) { - Integer keyId = getOrSaveKeyId(key); - entitiesFutures.add(tsKvRepository.findCount( - entityId.getId(), - keyId, - startTs, - endTs)); - } - - @Override - protected void findSum(TenantId tenantId, EntityId entityId, String key, long startTs, long endTs, List> entitiesFutures) { - Integer keyId = getOrSaveKeyId(key); - entitiesFutures.add(tsKvRepository.findSum( - entityId.getId(), - keyId, - startTs, - endTs)); - } - - @Override - protected void findMin(TenantId tenantId, EntityId entityId, String key, long startTs, long endTs, List> entitiesFutures) { - Integer keyId = getOrSaveKeyId(key); - entitiesFutures.add(tsKvRepository.findStringMin( - entityId.getId(), - keyId, - startTs, - endTs)); - entitiesFutures.add(tsKvRepository.findNumericMin( - entityId.getId(), - keyId, - startTs, - endTs)); - } - - @Override - protected void findMax(TenantId tenantId, EntityId entityId, String key, long startTs, long endTs, List> entitiesFutures) { - Integer keyId = getOrSaveKeyId(key); - entitiesFutures.add(tsKvRepository.findStringMax( - entityId.getId(), - keyId, - startTs, - endTs)); - entitiesFutures.add(tsKvRepository.findNumericMax( - entityId.getId(), - keyId, - startTs, - endTs)); - } - - @Override - protected void findAvg(TenantId tenantId, EntityId entityId, String key, long startTs, long endTs, List> entitiesFutures) { - Integer keyId = getOrSaveKeyId(key); - entitiesFutures.add(tsKvRepository.findAvg( - entityId.getId(), - keyId, - startTs, - endTs)); - } - - @Override - public ListenableFuture> findAllAsync(TenantId tenantId, EntityId entityId, List queries) { - return processFindAllAsync(tenantId, entityId, queries); - } - private void savePartition(PsqlPartition psqlPartition) { if (!partitions.containsKey(psqlPartition.getStart())) { partitionCreationLock.lock(); @@ -306,4 +136,4 @@ public class JpaPsqlTimeseriesDao extends AbstractChunkedAggregationTimeseriesDa private static long toMills(LocalDateTime time) { return time.toInstant(ZoneOffset.UTC).toEpochMilli(); } -} \ No newline at end of file +} diff --git a/dao/src/main/java/org/thingsboard/server/dao/sqlts/timescale/AggregationRepository.java b/dao/src/main/java/org/thingsboard/server/dao/sqlts/timescale/AggregationRepository.java index 5a0b9c6a59..bb0b13f2a6 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sqlts/timescale/AggregationRepository.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sqlts/timescale/AggregationRepository.java @@ -17,7 +17,7 @@ package org.thingsboard.server.dao.sqlts.timescale; import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Repository; -import org.thingsboard.server.dao.model.sqlts.timescale.TimescaleTsKvEntity; +import org.thingsboard.server.dao.model.sqlts.timescale.ts.TimescaleTsKvEntity; import org.thingsboard.server.dao.util.TimescaleDBTsDao; import javax.persistence.EntityManager; @@ -98,4 +98,4 @@ public class AggregationRepository { } -} \ No newline at end of file +} diff --git a/dao/src/main/java/org/thingsboard/server/dao/sqlts/timescale/TimescaleTimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/sqlts/timescale/TimescaleTimeseriesDao.java index 9f8f5c6f74..176bc712e8 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sqlts/timescale/TimescaleTimeseriesDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sqlts/timescale/TimescaleTimeseriesDao.java @@ -31,12 +31,12 @@ import org.thingsboard.server.common.data.kv.DeleteTsKvQuery; import org.thingsboard.server.common.data.kv.ReadTsKvQuery; import org.thingsboard.server.common.data.kv.TsKvEntry; import org.thingsboard.server.dao.DaoUtil; -import org.thingsboard.server.dao.model.sqlts.timescale.TimescaleTsKvEntity; +import org.thingsboard.server.dao.model.sqlts.timescale.ts.TimescaleTsKvEntity; import org.thingsboard.server.dao.sql.TbSqlBlockingQueue; import org.thingsboard.server.dao.sql.TbSqlBlockingQueueParams; import org.thingsboard.server.dao.sqlts.AbstractSqlTimeseriesDao; import org.thingsboard.server.dao.sqlts.EntityContainer; -import org.thingsboard.server.dao.sqlts.InsertTsRepository; +import org.thingsboard.server.dao.sqlts.insert.InsertTsRepository; import org.thingsboard.server.dao.timeseries.TimeseriesDao; import org.thingsboard.server.dao.util.TimescaleDBTsDao; @@ -286,4 +286,4 @@ public class TimescaleTimeseriesDao extends AbstractSqlTimeseriesDao implements startTs, endTs); } -} \ No newline at end of file +} diff --git a/dao/src/main/java/org/thingsboard/server/dao/sqlts/timescale/TsKvTimescaleRepository.java b/dao/src/main/java/org/thingsboard/server/dao/sqlts/timescale/TsKvTimescaleRepository.java index a4b15abd26..fb9cb6f7fe 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sqlts/timescale/TsKvTimescaleRepository.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sqlts/timescale/TsKvTimescaleRepository.java @@ -21,8 +21,8 @@ import org.springframework.data.jpa.repository.Query; import org.springframework.data.repository.CrudRepository; import org.springframework.data.repository.query.Param; import org.springframework.transaction.annotation.Transactional; -import org.thingsboard.server.dao.model.sqlts.timescale.TimescaleTsKvCompositeKey; -import org.thingsboard.server.dao.model.sqlts.timescale.TimescaleTsKvEntity; +import org.thingsboard.server.dao.model.sqlts.timescale.ts.TimescaleTsKvCompositeKey; +import org.thingsboard.server.dao.model.sqlts.timescale.ts.TimescaleTsKvEntity; import org.thingsboard.server.dao.util.TimescaleDBTsDao; import java.util.List; @@ -54,4 +54,4 @@ public interface TsKvTimescaleRepository extends CrudRepository { +public interface TsKvRepository extends CrudRepository { @Query("SELECT tskv FROM TsKvEntity tskv WHERE tskv.entityId = :entityId " + "AND tskv.key = :entityKey AND tskv.ts > :startTs AND tskv.ts <= :endTs")