SQL DAO Refactoring

This commit is contained in:
Igor Kulikov 2020-02-24 18:24:40 +02:00
parent 4b0aae896d
commit 7a2b76b8c0
27 changed files with 235 additions and 699 deletions

View File

@ -26,12 +26,12 @@ import org.thingsboard.server.dao.util.SqlTsDao;
@Configuration
@EnableAutoConfiguration
@ComponentScan({"org.thingsboard.server.dao.sqlts.hsql", "org.thingsboard.server.dao.sqlts.latest"})
@EnableJpaRepositories({"org.thingsboard.server.dao.sqlts.hsql", "org.thingsboard.server.dao.sqlts.latest", "org.thingsboard.server.dao.sqlts.dictionary"})
@EntityScan({"org.thingsboard.server.dao.model.sqlts.hsql", "org.thingsboard.server.dao.model.sqlts.latest", "org.thingsboard.server.dao.model.sqlts.dictionary"})
@ComponentScan({"org.thingsboard.server.dao.sqlts.hsql"})
@EnableJpaRepositories({"org.thingsboard.server.dao.sqlts.ts", "org.thingsboard.server.dao.sqlts.insert.hsql", "org.thingsboard.server.dao.sqlts.insert.latest.hsql", "org.thingsboard.server.dao.sqlts.latest", "org.thingsboard.server.dao.sqlts.dictionary"})
@EntityScan({"org.thingsboard.server.dao.model.sqlts.ts", "org.thingsboard.server.dao.model.sqlts.latest", "org.thingsboard.server.dao.model.sqlts.dictionary"})
@EnableTransactionManagement
@SqlTsDao
@HsqlDao
public class HsqlTsDaoConfig {
}
}

View File

@ -26,12 +26,12 @@ import org.thingsboard.server.dao.util.SqlTsDao;
@Configuration
@EnableAutoConfiguration
@ComponentScan({"org.thingsboard.server.dao.sqlts.psql", "org.thingsboard.server.dao.sqlts.latest"})
@EnableJpaRepositories({"org.thingsboard.server.dao.sqlts.psql", "org.thingsboard.server.dao.sqlts.latest", "org.thingsboard.server.dao.sqlts.dictionary"})
@EntityScan({"org.thingsboard.server.dao.model.sqlts.psql", "org.thingsboard.server.dao.model.sqlts.latest", "org.thingsboard.server.dao.model.sqlts.dictionary"})
@ComponentScan({"org.thingsboard.server.dao.sqlts.psql"})
@EnableJpaRepositories({"org.thingsboard.server.dao.sqlts.ts", "org.thingsboard.server.dao.sqlts.insert.psql", "org.thingsboard.server.dao.sqlts.insert.latest.psql", "org.thingsboard.server.dao.sqlts.latest", "org.thingsboard.server.dao.sqlts.dictionary"})
@EntityScan({"org.thingsboard.server.dao.model.sqlts.ts", "org.thingsboard.server.dao.model.sqlts.latest", "org.thingsboard.server.dao.model.sqlts.dictionary"})
@EnableTransactionManagement
@SqlTsDao
@PsqlDao
public class PsqlTsDaoConfig {
}
}

View File

@ -26,12 +26,12 @@ import org.thingsboard.server.dao.util.TimescaleDBTsDao;
@Configuration
@EnableAutoConfiguration
@ComponentScan({"org.thingsboard.server.dao.sqlts.timescale", "org.thingsboard.server.dao.sqlts.latest"})
@EnableJpaRepositories({"org.thingsboard.server.dao.sqlts.timescale", "org.thingsboard.server.dao.sqlts.dictionary", "org.thingsboard.server.dao.sqlts.latest"})
@ComponentScan({"org.thingsboard.server.dao.sqlts.timescale"})
@EnableJpaRepositories({"org.thingsboard.server.dao.sqlts.timescale", "org.thingsboard.server.dao.sqlts.insert.latest.psql", "org.thingsboard.server.dao.sqlts.insert.timescale", "org.thingsboard.server.dao.sqlts.dictionary", "org.thingsboard.server.dao.sqlts.latest"})
@EntityScan({"org.thingsboard.server.dao.model.sqlts.timescale", "org.thingsboard.server.dao.model.sqlts.dictionary", "org.thingsboard.server.dao.model.sqlts.latest"})
@EnableTransactionManagement
@TimescaleDBTsDao
@PsqlDao
public class TimescaleDaoConfig {
}
}

View File

@ -1,39 +0,0 @@
/**
* Copyright © 2016-2020 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.dao.model.sqlts.hsql;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.thingsboard.server.common.data.EntityType;
import javax.persistence.Transient;
import java.io.Serializable;
import java.util.UUID;
@Data
@AllArgsConstructor
@NoArgsConstructor
public class TsKvCompositeKey implements Serializable {
@Transient
private static final long serialVersionUID = -4089175869616037523L;
private UUID entityId;
private int key;
private long ts;
}

View File

@ -1,105 +0,0 @@
/**
* Copyright © 2016-2020 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.dao.model.sqlts.hsql;
import lombok.Data;
import org.thingsboard.server.common.data.kv.TsKvEntry;
import org.thingsboard.server.dao.model.ToData;
import org.thingsboard.server.dao.model.sql.AbstractTsKvEntity;
import javax.persistence.Column;
import javax.persistence.Entity;
import javax.persistence.Id;
import javax.persistence.IdClass;
import javax.persistence.Table;
import static org.thingsboard.server.dao.model.ModelConstants.KEY_COLUMN;
@Data
@Entity
@Table(name = "ts_kv")
@IdClass(TsKvCompositeKey.class)
public final class TsKvEntity extends AbstractTsKvEntity implements ToData<TsKvEntry> {
@Id
@Column(name = KEY_COLUMN)
private int key;
public TsKvEntity() {
}
public TsKvEntity(String strValue) {
this.strValue = strValue;
}
public TsKvEntity(Long longValue, Double doubleValue, Long longCountValue, Long doubleCountValue, String aggType) {
if (!isAllNull(longValue, doubleValue, longCountValue, doubleCountValue)) {
switch (aggType) {
case AVG:
double sum = 0.0;
if (longValue != null) {
sum += longValue;
}
if (doubleValue != null) {
sum += doubleValue;
}
long totalCount = longCountValue + doubleCountValue;
if (totalCount > 0) {
this.doubleValue = sum / (longCountValue + doubleCountValue);
} else {
this.doubleValue = 0.0;
}
break;
case SUM:
if (doubleCountValue > 0) {
this.doubleValue = doubleValue + (longValue != null ? longValue.doubleValue() : 0.0);
} else {
this.longValue = longValue;
}
break;
case MIN:
case MAX:
if (longCountValue > 0 && doubleCountValue > 0) {
this.doubleValue = MAX.equals(aggType) ? Math.max(doubleValue, longValue.doubleValue()) : Math.min(doubleValue, longValue.doubleValue());
} else if (doubleCountValue > 0) {
this.doubleValue = doubleValue;
} else if (longCountValue > 0) {
this.longValue = longValue;
}
break;
}
}
}
public TsKvEntity(Long booleanValueCount, Long strValueCount, Long longValueCount, Long doubleValueCount, Long jsonValueCount) {
if (!isAllNull(booleanValueCount, strValueCount, longValueCount, doubleValueCount)) {
if (booleanValueCount != 0) {
this.longValue = booleanValueCount;
} else if (strValueCount != 0) {
this.longValue = strValueCount;
} else if (jsonValueCount != 0) {
this.longValue = jsonValueCount;
} else {
this.longValue = longValueCount + doubleValueCount;
}
}
}
@Override
public boolean isNotEmpty() {
return strValue != null || longValue != null || doubleValue != null || booleanValue != null;
}
}

View File

@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.dao.model.sqlts.timescale;
package org.thingsboard.server.dao.model.sqlts.timescale.ts;
import lombok.AllArgsConstructor;
import lombok.Data;
@ -35,4 +35,4 @@ public class TimescaleTsKvCompositeKey implements Serializable {
private UUID entityId;
private int key;
private long ts;
}
}

View File

@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.dao.model.sqlts.timescale;
package org.thingsboard.server.dao.model.sqlts.timescale.ts;
import lombok.Data;
import lombok.EqualsAndHashCode;
@ -191,4 +191,4 @@ public final class TimescaleTsKvEntity extends AbstractTsKvEntity implements ToD
public boolean isNotEmpty() {
return ts != null && (strValue != null || longValue != null || doubleValue != null || booleanValue != null || jsonValue != null);
}
}
}

View File

@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.dao.model.sqlts.psql;
package org.thingsboard.server.dao.model.sqlts.ts;
import lombok.AllArgsConstructor;
import lombok.Data;
@ -34,4 +34,4 @@ public class TsKvCompositeKey implements Serializable {
private UUID entityId;
private int key;
private long ts;
}
}

View File

@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.dao.model.sqlts.psql;
package org.thingsboard.server.dao.model.sqlts.ts;
import lombok.Data;
import org.thingsboard.server.dao.model.sql.AbstractTsKvEntity;

View File

@ -15,32 +15,45 @@
*/
package org.thingsboard.server.dao.sqlts;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Sort;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.kv.Aggregation;
import org.thingsboard.server.common.data.kv.DeleteTsKvQuery;
import org.thingsboard.server.common.data.kv.ReadTsKvQuery;
import org.thingsboard.server.common.data.kv.TsKvEntry;
import org.thingsboard.server.dao.model.sql.AbstractTsKvEntity;
import org.thingsboard.server.dao.DaoUtil;
import org.thingsboard.server.dao.model.sqlts.ts.TsKvEntity;
import org.thingsboard.server.dao.sql.TbSqlBlockingQueue;
import org.thingsboard.server.dao.sql.TbSqlBlockingQueueParams;
import org.thingsboard.server.dao.sqlts.insert.InsertTsRepository;
import org.thingsboard.server.dao.sqlts.ts.TsKvRepository;
import org.thingsboard.server.dao.timeseries.TimeseriesDao;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
@Slf4j
public abstract class AbstractChunkedAggregationTimeseriesDao<T extends AbstractTsKvEntity> extends AbstractSqlTimeseriesDao {
public abstract class AbstractChunkedAggregationTimeseriesDao extends AbstractSqlTimeseriesDao implements TimeseriesDao {
@Autowired
protected InsertTsRepository<T> insertRepository;
protected TsKvRepository tsKvRepository;
protected TbSqlBlockingQueue<EntityContainer<T>> tsQueue;
@Autowired
protected InsertTsRepository<TsKvEntity> insertRepository;
protected TbSqlBlockingQueue<EntityContainer<TsKvEntity>> tsQueue;
@PostConstruct
protected void init() {
@ -63,9 +76,102 @@ public abstract class AbstractChunkedAggregationTimeseriesDao<T extends Abstract
}
}
protected abstract ListenableFuture<Optional<TsKvEntry>> findAndAggregateAsync(TenantId tenantId, EntityId entityId, String key, long startTs, long endTs, long ts, Aggregation aggregation);
@Override
public ListenableFuture<Void> remove(TenantId tenantId, EntityId entityId, DeleteTsKvQuery query) {
return service.submit(() -> {
tsKvRepository.delete(
entityId.getId(),
getOrSaveKeyId(query.getKey()),
query.getStartTs(),
query.getEndTs());
return null;
});
}
protected void switchAggregation(TenantId tenantId, EntityId entityId, String key, long startTs, long endTs, Aggregation aggregation, List<CompletableFuture<T>> entitiesFutures) {
@Override
public ListenableFuture<Void> saveLatest(TenantId tenantId, EntityId entityId, TsKvEntry tsKvEntry) {
return getSaveLatestFuture(entityId, tsKvEntry);
}
@Override
public ListenableFuture<Void> removeLatest(TenantId tenantId, EntityId entityId, DeleteTsKvQuery query) {
return getRemoveLatestFuture(tenantId, entityId, query);
}
@Override
public ListenableFuture<TsKvEntry> findLatest(TenantId tenantId, EntityId entityId, String key) {
return getFindLatestFuture(entityId, key);
}
@Override
public ListenableFuture<List<TsKvEntry>> findAllLatest(TenantId tenantId, EntityId entityId) {
return getFindAllLatestFuture(entityId);
}
@Override
public ListenableFuture<Void> savePartition(TenantId tenantId, EntityId entityId, long tsKvEntryTs, String key, long ttl) {
return Futures.immediateFuture(null);
}
@Override
public ListenableFuture<Void> removePartition(TenantId tenantId, EntityId entityId, DeleteTsKvQuery query) {
return Futures.immediateFuture(null);
}
@Override
public ListenableFuture<List<TsKvEntry>> findAllAsync(TenantId tenantId, EntityId entityId, List<ReadTsKvQuery> queries) {
return processFindAllAsync(tenantId, entityId, queries);
}
@Override
protected ListenableFuture<List<TsKvEntry>> findAllAsync(TenantId tenantId, EntityId entityId, ReadTsKvQuery query) {
if (query.getAggregation() == Aggregation.NONE) {
return findAllAsyncWithLimit(tenantId, entityId, query);
} else {
long stepTs = query.getStartTs();
List<ListenableFuture<Optional<TsKvEntry>>> futures = new ArrayList<>();
while (stepTs < query.getEndTs()) {
long startTs = stepTs;
long endTs = stepTs + query.getInterval();
long ts = startTs + (endTs - startTs) / 2;
futures.add(findAndAggregateAsync(tenantId, entityId, query.getKey(), startTs, endTs, ts, query.getAggregation()));
stepTs = endTs;
}
return getTskvEntriesFuture(Futures.allAsList(futures));
}
}
@Override
protected ListenableFuture<List<TsKvEntry>> findAllAsyncWithLimit(TenantId tenantId, EntityId entityId, ReadTsKvQuery query) {
Integer keyId = getOrSaveKeyId(query.getKey());
List<TsKvEntity> tsKvEntities = tsKvRepository.findAllWithLimit(
entityId.getId(),
keyId,
query.getStartTs(),
query.getEndTs(),
new PageRequest(0, query.getLimit(),
new Sort(Sort.Direction.fromString(
query.getOrderBy()), "ts")));
tsKvEntities.forEach(tsKvEntity -> tsKvEntity.setStrKey(query.getKey()));
return Futures.immediateFuture(DaoUtil.convertDataList(tsKvEntities));
}
protected ListenableFuture<Optional<TsKvEntry>> findAndAggregateAsync(TenantId tenantId, EntityId entityId, String key, long startTs, long endTs, long ts, Aggregation aggregation) {
List<CompletableFuture<TsKvEntity>> entitiesFutures = new ArrayList<>();
switchAggregation(tenantId, entityId, key, startTs, endTs, aggregation, entitiesFutures);
return Futures.transform(setFutures(entitiesFutures), entity -> {
if (entity != null && entity.isNotEmpty()) {
entity.setEntityId(entityId.getId());
entity.setStrKey(key);
entity.setTs(ts);
return Optional.of(DaoUtil.getData(entity));
} else {
return Optional.empty();
}
});
}
protected void switchAggregation(TenantId tenantId, EntityId entityId, String key, long startTs, long endTs, Aggregation aggregation, List<CompletableFuture<TsKvEntity>> entitiesFutures) {
switch (aggregation) {
case AVG:
findAvg(tenantId, entityId, key, startTs, endTs, entitiesFutures);
@ -87,19 +193,64 @@ public abstract class AbstractChunkedAggregationTimeseriesDao<T extends Abstract
}
}
protected abstract void findCount(TenantId tenantId, EntityId entityId, String key, long startTs, long endTs, List<CompletableFuture<T>> entitiesFutures);
protected void findCount(TenantId tenantId, EntityId entityId, String key, long startTs, long endTs, List<CompletableFuture<TsKvEntity>> entitiesFutures) {
Integer keyId = getOrSaveKeyId(key);
entitiesFutures.add(tsKvRepository.findCount(
entityId.getId(),
keyId,
startTs,
endTs));
}
protected abstract void findSum(TenantId tenantId, EntityId entityId, String key, long startTs, long endTs, List<CompletableFuture<T>> entitiesFutures);
protected void findSum(TenantId tenantId, EntityId entityId, String key, long startTs, long endTs, List<CompletableFuture<TsKvEntity>> entitiesFutures) {
Integer keyId = getOrSaveKeyId(key);
entitiesFutures.add(tsKvRepository.findSum(
entityId.getId(),
keyId,
startTs,
endTs));
}
protected abstract void findMin(TenantId tenantId, EntityId entityId, String key, long startTs, long endTs, List<CompletableFuture<T>> entitiesFutures);
protected void findMin(TenantId tenantId, EntityId entityId, String key, long startTs, long endTs, List<CompletableFuture<TsKvEntity>> entitiesFutures) {
Integer keyId = getOrSaveKeyId(key);
entitiesFutures.add(tsKvRepository.findStringMin(
entityId.getId(),
keyId,
startTs,
endTs));
entitiesFutures.add(tsKvRepository.findNumericMin(
entityId.getId(),
keyId,
startTs,
endTs));
}
protected abstract void findMax(TenantId tenantId, EntityId entityId, String key, long startTs, long endTs, List<CompletableFuture<T>> entitiesFutures);
protected void findMax(TenantId tenantId, EntityId entityId, String key, long startTs, long endTs, List<CompletableFuture<TsKvEntity>> entitiesFutures) {
Integer keyId = getOrSaveKeyId(key);
entitiesFutures.add(tsKvRepository.findStringMax(
entityId.getId(),
keyId,
startTs,
endTs));
entitiesFutures.add(tsKvRepository.findNumericMax(
entityId.getId(),
keyId,
startTs,
endTs));
}
protected abstract void findAvg(TenantId tenantId, EntityId entityId, String key, long startTs, long endTs, List<CompletableFuture<T>> entitiesFutures);
protected void findAvg(TenantId tenantId, EntityId entityId, String key, long startTs, long endTs, List<CompletableFuture<TsKvEntity>> entitiesFutures) {
Integer keyId = getOrSaveKeyId(key);
entitiesFutures.add(tsKvRepository.findAvg(
entityId.getId(),
keyId,
startTs,
endTs));
}
protected SettableFuture<T> setFutures(List<CompletableFuture<T>> entitiesFutures) {
SettableFuture<T> listenableFuture = SettableFuture.create();
CompletableFuture<List<T>> entities =
protected SettableFuture<TsKvEntity> setFutures(List<CompletableFuture<TsKvEntity>> entitiesFutures) {
SettableFuture<TsKvEntity> listenableFuture = SettableFuture.create();
CompletableFuture<List<TsKvEntity>> entities =
CompletableFuture.allOf(entitiesFutures.toArray(new CompletableFuture[entitiesFutures.size()]))
.thenApply(v -> entitiesFutures.stream()
.map(CompletableFuture::join)
@ -109,8 +260,8 @@ public abstract class AbstractChunkedAggregationTimeseriesDao<T extends Abstract
if (throwable != null) {
listenableFuture.setException(throwable);
} else {
T result = null;
for (T entity : tsKvEntities) {
TsKvEntity result = null;
for (TsKvEntity entity : tsKvEntities) {
if (entity.isNotEmpty()) {
result = entity;
break;

View File

@ -43,6 +43,7 @@ import org.thingsboard.server.dao.sql.ScheduledLogExecutorComponent;
import org.thingsboard.server.dao.sql.TbSqlBlockingQueue;
import org.thingsboard.server.dao.sql.TbSqlBlockingQueueParams;
import org.thingsboard.server.dao.sqlts.dictionary.TsKvDictionaryRepository;
import org.thingsboard.server.dao.sqlts.insert.latest.InsertLatestTsRepository;
import org.thingsboard.server.dao.sqlts.latest.SearchTsKvLatestRepository;
import org.thingsboard.server.dao.sqlts.latest.TsKvLatestRepository;
import org.thingsboard.server.dao.timeseries.SimpleListenableFuture;

View File

@ -15,46 +15,25 @@
*/
package org.thingsboard.server.dao.sqlts.hsql;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Sort;
import org.springframework.stereotype.Component;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.kv.Aggregation;
import org.thingsboard.server.common.data.kv.DeleteTsKvQuery;
import org.thingsboard.server.common.data.kv.ReadTsKvQuery;
import org.thingsboard.server.common.data.kv.TsKvEntry;
import org.thingsboard.server.dao.DaoUtil;
import org.thingsboard.server.dao.model.sqlts.hsql.TsKvEntity;
import org.thingsboard.server.dao.model.sqlts.ts.TsKvEntity;
import org.thingsboard.server.dao.sqlts.AbstractChunkedAggregationTimeseriesDao;
import org.thingsboard.server.dao.sqlts.EntityContainer;
import org.thingsboard.server.dao.timeseries.TimeseriesDao;
import org.thingsboard.server.dao.util.HsqlDao;
import org.thingsboard.server.dao.util.SqlTsDao;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
@Component
@Slf4j
@SqlTsDao
@HsqlDao
public class JpaHsqlTimeseriesDao extends AbstractChunkedAggregationTimeseriesDao<TsKvEntity> implements TimeseriesDao {
@Autowired
private TsKvHsqlRepository tsKvRepository;
@Override
public ListenableFuture<List<TsKvEntry>> findAllAsync(TenantId tenantId, EntityId entityId, List<ReadTsKvQuery> queries) {
return processFindAllAsync(tenantId, entityId, queries);
}
public class JpaHsqlTimeseriesDao extends AbstractChunkedAggregationTimeseriesDao implements TimeseriesDao {
@Override
public ListenableFuture<Void> save(TenantId tenantId, EntityId entityId, TsKvEntry tsKvEntry, long ttl) {
@ -72,154 +51,4 @@ public class JpaHsqlTimeseriesDao extends AbstractChunkedAggregationTimeseriesDa
return tsQueue.add(new EntityContainer(entity, null));
}
@Override
public ListenableFuture<Void> remove(TenantId tenantId, EntityId entityId, DeleteTsKvQuery query) {
return service.submit(() -> {
tsKvRepository.delete(
entityId.getId(),
getOrSaveKeyId(query.getKey()),
query.getStartTs(),
query.getEndTs());
return null;
});
}
@Override
public ListenableFuture<Void> saveLatest(TenantId tenantId, EntityId entityId, TsKvEntry tsKvEntry) {
return getSaveLatestFuture(entityId, tsKvEntry);
}
@Override
public ListenableFuture<Void> removeLatest(TenantId tenantId, EntityId entityId, DeleteTsKvQuery query) {
return getRemoveLatestFuture(tenantId, entityId, query);
}
@Override
public ListenableFuture<TsKvEntry> findLatest(TenantId tenantId, EntityId entityId, String key) {
return getFindLatestFuture(entityId, key);
}
@Override
public ListenableFuture<List<TsKvEntry>> findAllLatest(TenantId tenantId, EntityId entityId) {
return getFindAllLatestFuture(entityId);
}
@Override
public ListenableFuture<Void> savePartition(TenantId tenantId, EntityId entityId, long tsKvEntryTs, String key, long ttl) {
return Futures.immediateFuture(null);
}
@Override
public ListenableFuture<Void> removePartition(TenantId tenantId, EntityId entityId, DeleteTsKvQuery query) {
return Futures.immediateFuture(null);
}
protected ListenableFuture<List<TsKvEntry>> findAllAsync(TenantId tenantId, EntityId entityId, ReadTsKvQuery query) {
if (query.getAggregation() == Aggregation.NONE) {
return findAllAsyncWithLimit(tenantId, entityId, query);
} else {
long stepTs = query.getStartTs();
List<ListenableFuture<Optional<TsKvEntry>>> futures = new ArrayList<>();
while (stepTs < query.getEndTs()) {
long startTs = stepTs;
long endTs = stepTs + query.getInterval();
long ts = startTs + (endTs - startTs) / 2;
futures.add(findAndAggregateAsync(tenantId, entityId, query.getKey(), startTs, endTs, ts, query.getAggregation()));
stepTs = endTs;
}
return getTskvEntriesFuture(Futures.allAsList(futures));
}
}
@Override
protected ListenableFuture<List<TsKvEntry>> findAllAsyncWithLimit(TenantId tenantId, EntityId entityId, ReadTsKvQuery query) {
List<TsKvEntity> tsKvEntities = tsKvRepository.findAllWithLimit(
entityId.getId(),
getOrSaveKeyId(query.getKey()),
query.getStartTs(),
query.getEndTs(),
new PageRequest(0, query.getLimit(),
new Sort(Sort.Direction.fromString(
query.getOrderBy()), "ts")));
tsKvEntities.forEach(tsKvEntity -> tsKvEntity.setStrKey(query.getKey()));
return Futures.immediateFuture(
DaoUtil.convertDataList(
tsKvEntities));
}
@Override
protected ListenableFuture<Optional<TsKvEntry>> findAndAggregateAsync(TenantId tenantId, EntityId entityId, String key, long startTs, long endTs, long ts, Aggregation aggregation) {
List<CompletableFuture<TsKvEntity>> entitiesFutures = new ArrayList<>();
switchAggregation(tenantId, entityId, key, startTs, endTs, aggregation, entitiesFutures);
return Futures.transform(setFutures(entitiesFutures), entity -> {
if (entity != null && entity.isNotEmpty()) {
entity.setEntityId(entityId.getId());
entity.setKey(getOrSaveKeyId(key));
entity.setTs(ts);
return Optional.of(DaoUtil.getData(entity));
} else {
return Optional.empty();
}
});
}
@Override
protected void findCount(TenantId tenantId, EntityId entityId, String key, long startTs, long endTs, List<CompletableFuture<TsKvEntity>> entitiesFutures) {
Integer keyId = getOrSaveKeyId(key);
entitiesFutures.add(tsKvRepository.findCount(
entityId.getId(),
keyId,
startTs,
endTs));
}
@Override
protected void findSum(TenantId tenantId, EntityId entityId, String key, long startTs, long endTs, List<CompletableFuture<TsKvEntity>> entitiesFutures) {
Integer keyId = getOrSaveKeyId(key);
entitiesFutures.add(tsKvRepository.findSum(
entityId.getId(),
keyId,
startTs,
endTs));
}
@Override
protected void findMin(TenantId tenantId, EntityId entityId, String key, long startTs, long endTs, List<CompletableFuture<TsKvEntity>> entitiesFutures) {
Integer keyId = getOrSaveKeyId(key);
entitiesFutures.add(tsKvRepository.findStringMin(
entityId.getId(),
keyId,
startTs,
endTs));
entitiesFutures.add(tsKvRepository.findNumericMin(
entityId.getId(),
keyId,
startTs,
endTs));
}
@Override
protected void findMax(TenantId tenantId, EntityId entityId, String key, long startTs, long endTs, List<CompletableFuture<TsKvEntity>> entitiesFutures) {
Integer keyId = getOrSaveKeyId(key);
entitiesFutures.add(tsKvRepository.findStringMax(
entityId.getId(),
keyId,
startTs,
endTs));
entitiesFutures.add(tsKvRepository.findNumericMax(
entityId.getId(),
keyId,
startTs,
endTs));
}
@Override
protected void findAvg(TenantId tenantId, EntityId entityId, String key, long startTs, long endTs, List<CompletableFuture<TsKvEntity>> entitiesFutures) {
Integer keyId = getOrSaveKeyId(key);
entitiesFutures.add(tsKvRepository.findAvg(
entityId.getId(),
keyId,
startTs,
endTs));
}
}
}

View File

@ -1,132 +0,0 @@
/**
* Copyright © 2016-2020 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.dao.sqlts.hsql;
import org.springframework.data.domain.Pageable;
import org.springframework.data.jpa.repository.Modifying;
import org.springframework.data.jpa.repository.Query;
import org.springframework.data.repository.CrudRepository;
import org.springframework.data.repository.query.Param;
import org.springframework.scheduling.annotation.Async;
import org.springframework.transaction.annotation.Transactional;
import org.thingsboard.server.dao.model.sqlts.hsql.TsKvCompositeKey;
import org.thingsboard.server.dao.model.sqlts.hsql.TsKvEntity;
import org.thingsboard.server.dao.util.SqlDao;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
@SqlDao
public interface TsKvHsqlRepository extends CrudRepository<TsKvEntity, TsKvCompositeKey> {
@Query("SELECT tskv FROM TsKvEntity tskv WHERE tskv.entityId = :entityId " +
"AND tskv.key = :entityKey AND tskv.ts > :startTs AND tskv.ts <= :endTs")
List<TsKvEntity> findAllWithLimit(@Param("entityId") UUID entityId,
@Param("entityKey") int key,
@Param("startTs") long startTs,
@Param("endTs") long endTs,
Pageable pageable);
@Transactional
@Modifying
@Query("DELETE FROM TsKvEntity tskv WHERE tskv.entityId = :entityId " +
"AND tskv.key = :entityKey AND tskv.ts > :startTs AND tskv.ts <= :endTs")
void delete(@Param("entityId") UUID entityId,
@Param("entityKey") int key,
@Param("startTs") long startTs,
@Param("endTs") long endTs);
@Async
@Query("SELECT new TsKvEntity(MAX(tskv.strValue)) FROM TsKvEntity tskv " +
"WHERE tskv.strValue IS NOT NULL AND tskv.entityId = :entityId AND tskv.key = :entityKey" +
" AND tskv.ts > :startTs AND tskv.ts <= :endTs")
CompletableFuture<TsKvEntity> findStringMax(@Param("entityId") UUID entityId,
@Param("entityKey") int entityKey,
@Param("startTs") long startTs,
@Param("endTs") long endTs);
@Async
@Query("SELECT new TsKvEntity(MAX(COALESCE(tskv.longValue, -9223372036854775807)), " +
"MAX(COALESCE(tskv.doubleValue, -1.79769E+308)), " +
"SUM(CASE WHEN tskv.longValue IS NULL THEN 0 ELSE 1 END), " +
"SUM(CASE WHEN tskv.doubleValue IS NULL THEN 0 ELSE 1 END), " +
"'MAX') FROM TsKvEntity tskv WHERE tskv.entityId = :entityId " +
"AND tskv.key = :entityKey AND tskv.ts > :startTs AND tskv.ts <= :endTs")
CompletableFuture<TsKvEntity> findNumericMax(@Param("entityId") UUID entityId,
@Param("entityKey") int entityKey,
@Param("startTs") long startTs,
@Param("endTs") long endTs);
@Async
@Query("SELECT new TsKvEntity(MIN(tskv.strValue)) FROM TsKvEntity tskv " +
"WHERE tskv.strValue IS NOT NULL AND tskv.entityId = :entityId " +
"AND tskv.key = :entityKey AND tskv.ts > :startTs AND tskv.ts <= :endTs")
CompletableFuture<TsKvEntity> findStringMin(@Param("entityId") UUID entityId,
@Param("entityKey") int entityKey,
@Param("startTs") long startTs,
@Param("endTs") long endTs);
@Async
@Query("SELECT new TsKvEntity(MIN(COALESCE(tskv.longValue, 9223372036854775807)), " +
"MIN(COALESCE(tskv.doubleValue, 1.79769E+308)), " +
"SUM(CASE WHEN tskv.longValue IS NULL THEN 0 ELSE 1 END), " +
"SUM(CASE WHEN tskv.doubleValue IS NULL THEN 0 ELSE 1 END), " +
"'MIN') FROM TsKvEntity tskv WHERE tskv.entityId = :entityId " +
"AND tskv.key = :entityKey AND tskv.ts > :startTs AND tskv.ts <= :endTs")
CompletableFuture<TsKvEntity> findNumericMin(@Param("entityId") UUID entityId,
@Param("entityKey") int entityKey,
@Param("startTs") long startTs,
@Param("endTs") long endTs);
@Async
@Query("SELECT new TsKvEntity(SUM(CASE WHEN tskv.booleanValue IS NULL THEN 0 ELSE 1 END), " +
"SUM(CASE WHEN tskv.strValue IS NULL THEN 0 ELSE 1 END), " +
"SUM(CASE WHEN tskv.longValue IS NULL THEN 0 ELSE 1 END), " +
"SUM(CASE WHEN tskv.doubleValue IS NULL THEN 0 ELSE 1 END), " +
"SUM(CASE WHEN tskv.jsonValue IS NULL THEN 0 ELSE 1 END)) FROM TsKvEntity tskv " +
"WHERE tskv.entityId = :entityId AND tskv.key = :entityKey AND tskv.ts > :startTs AND tskv.ts <= :endTs")
CompletableFuture<TsKvEntity> findCount(@Param("entityId") UUID entityId,
@Param("entityKey") int entityKey,
@Param("startTs") long startTs,
@Param("endTs") long endTs);
@Async
@Query("SELECT new TsKvEntity(SUM(COALESCE(tskv.longValue, 0)), " +
"SUM(COALESCE(tskv.doubleValue, 0.0)), " +
"SUM(CASE WHEN tskv.longValue IS NULL THEN 0 ELSE 1 END), " +
"SUM(CASE WHEN tskv.doubleValue IS NULL THEN 0 ELSE 1 END), " +
"'AVG') FROM TsKvEntity tskv WHERE tskv.entityId = :entityId " +
"AND tskv.key = :entityKey AND tskv.ts > :startTs AND tskv.ts <= :endTs")
CompletableFuture<TsKvEntity> findAvg(@Param("entityId") UUID entityId,
@Param("entityKey") int entityKey,
@Param("startTs") long startTs,
@Param("endTs") long endTs);
@Async
@Query("SELECT new TsKvEntity(SUM(COALESCE(tskv.longValue, 0)), " +
"SUM(COALESCE(tskv.doubleValue, 0.0)), " +
"SUM(CASE WHEN tskv.longValue IS NULL THEN 0 ELSE 1 END), " +
"SUM(CASE WHEN tskv.doubleValue IS NULL THEN 0 ELSE 1 END), " +
"'SUM') FROM TsKvEntity tskv WHERE tskv.entityId = :entityId " +
"AND tskv.key = :entityKey AND tskv.ts > :startTs AND tskv.ts <= :endTs")
CompletableFuture<TsKvEntity> findSum(@Param("entityId") UUID entityId,
@Param("entityKey") int entityKey,
@Param("startTs") long startTs,
@Param("endTs") long endTs);
}

View File

@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.dao.sqlts;
package org.thingsboard.server.dao.sqlts.insert;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
@ -44,4 +44,4 @@ public abstract class AbstractInsertRepository {
}
return strValue;
}
}
}

View File

@ -13,9 +13,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.dao.sqlts;
package org.thingsboard.server.dao.sqlts.insert;
import org.thingsboard.server.dao.model.sql.AbstractTsKvEntity;
import org.thingsboard.server.dao.sqlts.EntityContainer;
import java.util.List;

View File

@ -13,15 +13,15 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.dao.sqlts.hsql;
package org.thingsboard.server.dao.sqlts.insert.hsql;
import org.springframework.jdbc.core.BatchPreparedStatementSetter;
import org.springframework.stereotype.Repository;
import org.springframework.transaction.annotation.Transactional;
import org.thingsboard.server.dao.model.sqlts.hsql.TsKvEntity;
import org.thingsboard.server.dao.sqlts.AbstractInsertRepository;
import org.thingsboard.server.dao.model.sqlts.ts.TsKvEntity;
import org.thingsboard.server.dao.sqlts.EntityContainer;
import org.thingsboard.server.dao.sqlts.InsertTsRepository;
import org.thingsboard.server.dao.sqlts.insert.AbstractInsertRepository;
import org.thingsboard.server.dao.sqlts.insert.InsertTsRepository;
import org.thingsboard.server.dao.util.HsqlDao;
import org.thingsboard.server.dao.util.SqlTsDao;
@ -86,4 +86,4 @@ public class HsqlInsertTsRepository extends AbstractInsertRepository implements
}
});
}
}
}

View File

@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.dao.sqlts;
package org.thingsboard.server.dao.sqlts.insert.latest;
import org.thingsboard.server.dao.model.sqlts.latest.TsKvLatestEntity;

View File

@ -13,14 +13,14 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.dao.sqlts.latest;
package org.thingsboard.server.dao.sqlts.insert.latest.hsql;
import org.springframework.jdbc.core.BatchPreparedStatementSetter;
import org.springframework.stereotype.Repository;
import org.springframework.transaction.annotation.Transactional;
import org.thingsboard.server.dao.model.sqlts.latest.TsKvLatestEntity;
import org.thingsboard.server.dao.sqlts.AbstractInsertRepository;
import org.thingsboard.server.dao.sqlts.InsertLatestTsRepository;
import org.thingsboard.server.dao.sqlts.insert.AbstractInsertRepository;
import org.thingsboard.server.dao.sqlts.insert.latest.InsertLatestTsRepository;
import org.thingsboard.server.dao.util.HsqlDao;
import org.thingsboard.server.dao.util.SqlTsDao;
@ -82,4 +82,4 @@ public class HsqlLatestInsertTsRepository extends AbstractInsertRepository imple
}
});
}
}
}

View File

@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.dao.sqlts.latest;
package org.thingsboard.server.dao.sqlts.insert.latest.psql;
import org.springframework.jdbc.core.BatchPreparedStatementSetter;
import org.springframework.stereotype.Repository;
@ -21,8 +21,8 @@ import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.transaction.support.TransactionCallbackWithoutResult;
import org.thingsboard.server.dao.model.sqlts.latest.TsKvLatestEntity;
import org.thingsboard.server.dao.sqlts.AbstractInsertRepository;
import org.thingsboard.server.dao.sqlts.InsertLatestTsRepository;
import org.thingsboard.server.dao.sqlts.insert.AbstractInsertRepository;
import org.thingsboard.server.dao.sqlts.insert.latest.InsertLatestTsRepository;
import org.thingsboard.server.dao.util.PsqlTsAnyDao;
import java.sql.PreparedStatement;
@ -151,4 +151,4 @@ public class PsqlLatestInsertTsRepository extends AbstractInsertRepository imple
}
});
}
}
}

View File

@ -13,15 +13,15 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.dao.sqlts.psql;
package org.thingsboard.server.dao.sqlts.insert.psql;
import org.springframework.jdbc.core.BatchPreparedStatementSetter;
import org.springframework.stereotype.Repository;
import org.springframework.transaction.annotation.Transactional;
import org.thingsboard.server.dao.model.sqlts.psql.TsKvEntity;
import org.thingsboard.server.dao.sqlts.AbstractInsertRepository;
import org.thingsboard.server.dao.model.sqlts.ts.TsKvEntity;
import org.thingsboard.server.dao.sqlts.insert.AbstractInsertRepository;
import org.thingsboard.server.dao.sqlts.EntityContainer;
import org.thingsboard.server.dao.sqlts.InsertTsRepository;
import org.thingsboard.server.dao.sqlts.insert.InsertTsRepository;
import org.thingsboard.server.dao.util.PsqlDao;
import org.thingsboard.server.dao.util.SqlTsDao;
@ -101,4 +101,4 @@ public class PsqlInsertTsRepository extends AbstractInsertRepository implements
private String getInsertOrUpdateQuery(String partitionDate) {
return INSERT_INTO_TS_KV + partitionDate + VALUES_ON_CONFLICT_DO_UPDATE;
}
}
}

View File

@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.dao.sqlts.psql;
package org.thingsboard.server.dao.sqlts.insert.psql;
import org.springframework.stereotype.Repository;
import org.springframework.transaction.annotation.Transactional;
@ -38,4 +38,4 @@ public class PsqlPartitioningRepository {
.executeUpdate();
}
}
}

View File

@ -13,15 +13,15 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.dao.sqlts.timescale;
package org.thingsboard.server.dao.sqlts.insert.timescale;
import org.springframework.jdbc.core.BatchPreparedStatementSetter;
import org.springframework.stereotype.Repository;
import org.springframework.transaction.annotation.Transactional;
import org.thingsboard.server.dao.model.sqlts.timescale.TimescaleTsKvEntity;
import org.thingsboard.server.dao.sqlts.AbstractInsertRepository;
import org.thingsboard.server.dao.model.sqlts.timescale.ts.TimescaleTsKvEntity;
import org.thingsboard.server.dao.sqlts.insert.AbstractInsertRepository;
import org.thingsboard.server.dao.sqlts.EntityContainer;
import org.thingsboard.server.dao.sqlts.InsertTsRepository;
import org.thingsboard.server.dao.sqlts.insert.InsertTsRepository;
import org.thingsboard.server.dao.util.PsqlDao;
import org.thingsboard.server.dao.util.TimescaleDBTsDao;
@ -89,4 +89,4 @@ public class TimescaleInsertTsRepository extends AbstractInsertRepository implem
}
});
}
}
}

View File

@ -15,27 +15,20 @@
*/
package org.thingsboard.server.dao.sqlts.psql;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Sort;
import org.springframework.stereotype.Component;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.kv.Aggregation;
import org.thingsboard.server.common.data.kv.DeleteTsKvQuery;
import org.thingsboard.server.common.data.kv.ReadTsKvQuery;
import org.thingsboard.server.common.data.kv.TsKvEntry;
import org.thingsboard.server.dao.DaoUtil;
import org.thingsboard.server.dao.model.sqlts.psql.TsKvEntity;
import org.thingsboard.server.dao.model.sqlts.ts.TsKvEntity;
import org.thingsboard.server.dao.sqlts.AbstractChunkedAggregationTimeseriesDao;
import org.thingsboard.server.dao.sqlts.EntityContainer;
import org.thingsboard.server.dao.sqlts.insert.psql.PsqlPartitioningRepository;
import org.thingsboard.server.dao.timeseries.PsqlPartition;
import org.thingsboard.server.dao.timeseries.SqlTsPartitionDate;
import org.thingsboard.server.dao.timeseries.TimeseriesDao;
import org.thingsboard.server.dao.util.PsqlDao;
import org.thingsboard.server.dao.util.SqlTsDao;
@ -44,11 +37,8 @@ import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;
@ -59,14 +49,11 @@ import static org.thingsboard.server.dao.timeseries.SqlTsPartitionDate.EPOCH_STA
@Slf4j
@SqlTsDao
@PsqlDao
public class JpaPsqlTimeseriesDao extends AbstractChunkedAggregationTimeseriesDao<TsKvEntity> implements TimeseriesDao {
public class JpaPsqlTimeseriesDao extends AbstractChunkedAggregationTimeseriesDao {
private final Map<Long, PsqlPartition> partitions = new ConcurrentHashMap<>();
private static final ReentrantLock partitionCreationLock = new ReentrantLock();
@Autowired
private TsKvPsqlRepository tsKvRepository;
@Autowired
private PsqlPartitioningRepository partitioningRepository;
@ -110,163 +97,6 @@ public class JpaPsqlTimeseriesDao extends AbstractChunkedAggregationTimeseriesDa
return tsQueue.add(new EntityContainer(entity, psqlPartition.getPartitionDate()));
}
@Override
public ListenableFuture<Void> remove(TenantId tenantId, EntityId entityId, DeleteTsKvQuery query) {
return service.submit(() -> {
String strKey = query.getKey();
Integer keyId = getOrSaveKeyId(strKey);
tsKvRepository.delete(
entityId.getId(),
keyId,
query.getStartTs(),
query.getEndTs());
return null;
});
}
@Override
public ListenableFuture<Void> removeLatest(TenantId tenantId, EntityId entityId, DeleteTsKvQuery query) {
return getRemoveLatestFuture(tenantId, entityId, query);
}
@Override
public ListenableFuture<Void> saveLatest(TenantId tenantId, EntityId entityId, TsKvEntry tsKvEntry) {
return getSaveLatestFuture(entityId, tsKvEntry);
}
@Override
public ListenableFuture<TsKvEntry> findLatest(TenantId tenantId, EntityId entityId, String key) {
return getFindLatestFuture(entityId, key);
}
@Override
public ListenableFuture<List<TsKvEntry>> findAllLatest(TenantId tenantId, EntityId entityId) {
return getFindAllLatestFuture(entityId);
}
@Override
public ListenableFuture<Void> savePartition(TenantId tenantId, EntityId entityId, long tsKvEntryTs, String key, long ttl) {
return Futures.immediateFuture(null);
}
@Override
public ListenableFuture<Void> removePartition(TenantId tenantId, EntityId entityId, DeleteTsKvQuery query) {
return Futures.immediateFuture(null);
}
protected ListenableFuture<List<TsKvEntry>> findAllAsync(TenantId tenantId, EntityId entityId, ReadTsKvQuery query) {
if (query.getAggregation() == Aggregation.NONE) {
return findAllAsyncWithLimit(tenantId, entityId, query);
} else {
long stepTs = query.getStartTs();
List<ListenableFuture<Optional<TsKvEntry>>> futures = new ArrayList<>();
while (stepTs < query.getEndTs()) {
long startTs = stepTs;
long endTs = stepTs + query.getInterval();
long ts = startTs + (endTs - startTs) / 2;
futures.add(findAndAggregateAsync(tenantId, entityId, query.getKey(), startTs, endTs, ts, query.getAggregation()));
stepTs = endTs;
}
return getTskvEntriesFuture(Futures.allAsList(futures));
}
}
@Override
protected ListenableFuture<List<TsKvEntry>> findAllAsyncWithLimit(TenantId tenantId, EntityId entityId, ReadTsKvQuery query) {
Integer keyId = getOrSaveKeyId(query.getKey());
List<TsKvEntity> tsKvEntities = tsKvRepository.findAllWithLimit(
entityId.getId(),
keyId,
query.getStartTs(),
query.getEndTs(),
new PageRequest(0, query.getLimit(),
new Sort(Sort.Direction.fromString(
query.getOrderBy()), "ts")));
tsKvEntities.forEach(tsKvEntity -> tsKvEntity.setStrKey(query.getKey()));
return Futures.immediateFuture(DaoUtil.convertDataList(tsKvEntities));
}
@Override
protected ListenableFuture<Optional<TsKvEntry>> findAndAggregateAsync(TenantId tenantId, EntityId entityId, String key, long startTs, long endTs, long ts, Aggregation aggregation) {
List<CompletableFuture<TsKvEntity>> entitiesFutures = new ArrayList<>();
switchAggregation(tenantId, entityId, key, startTs, endTs, aggregation, entitiesFutures);
return Futures.transform(setFutures(entitiesFutures), entity -> {
if (entity != null && entity.isNotEmpty()) {
entity.setEntityId(entityId.getId());
entity.setStrKey(key);
entity.setTs(ts);
return Optional.of(DaoUtil.getData(entity));
} else {
return Optional.empty();
}
});
}
@Override
protected void findCount(TenantId tenantId, EntityId entityId, String key, long startTs, long endTs, List<CompletableFuture<TsKvEntity>> entitiesFutures) {
Integer keyId = getOrSaveKeyId(key);
entitiesFutures.add(tsKvRepository.findCount(
entityId.getId(),
keyId,
startTs,
endTs));
}
@Override
protected void findSum(TenantId tenantId, EntityId entityId, String key, long startTs, long endTs, List<CompletableFuture<TsKvEntity>> entitiesFutures) {
Integer keyId = getOrSaveKeyId(key);
entitiesFutures.add(tsKvRepository.findSum(
entityId.getId(),
keyId,
startTs,
endTs));
}
@Override
protected void findMin(TenantId tenantId, EntityId entityId, String key, long startTs, long endTs, List<CompletableFuture<TsKvEntity>> entitiesFutures) {
Integer keyId = getOrSaveKeyId(key);
entitiesFutures.add(tsKvRepository.findStringMin(
entityId.getId(),
keyId,
startTs,
endTs));
entitiesFutures.add(tsKvRepository.findNumericMin(
entityId.getId(),
keyId,
startTs,
endTs));
}
@Override
protected void findMax(TenantId tenantId, EntityId entityId, String key, long startTs, long endTs, List<CompletableFuture<TsKvEntity>> entitiesFutures) {
Integer keyId = getOrSaveKeyId(key);
entitiesFutures.add(tsKvRepository.findStringMax(
entityId.getId(),
keyId,
startTs,
endTs));
entitiesFutures.add(tsKvRepository.findNumericMax(
entityId.getId(),
keyId,
startTs,
endTs));
}
@Override
protected void findAvg(TenantId tenantId, EntityId entityId, String key, long startTs, long endTs, List<CompletableFuture<TsKvEntity>> entitiesFutures) {
Integer keyId = getOrSaveKeyId(key);
entitiesFutures.add(tsKvRepository.findAvg(
entityId.getId(),
keyId,
startTs,
endTs));
}
@Override
public ListenableFuture<List<TsKvEntry>> findAllAsync(TenantId tenantId, EntityId entityId, List<ReadTsKvQuery> queries) {
return processFindAllAsync(tenantId, entityId, queries);
}
private void savePartition(PsqlPartition psqlPartition) {
if (!partitions.containsKey(psqlPartition.getStart())) {
partitionCreationLock.lock();
@ -306,4 +136,4 @@ public class JpaPsqlTimeseriesDao extends AbstractChunkedAggregationTimeseriesDa
private static long toMills(LocalDateTime time) {
return time.toInstant(ZoneOffset.UTC).toEpochMilli();
}
}
}

View File

@ -17,7 +17,7 @@ package org.thingsboard.server.dao.sqlts.timescale;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Repository;
import org.thingsboard.server.dao.model.sqlts.timescale.TimescaleTsKvEntity;
import org.thingsboard.server.dao.model.sqlts.timescale.ts.TimescaleTsKvEntity;
import org.thingsboard.server.dao.util.TimescaleDBTsDao;
import javax.persistence.EntityManager;
@ -98,4 +98,4 @@ public class AggregationRepository {
}
}
}

View File

@ -31,12 +31,12 @@ import org.thingsboard.server.common.data.kv.DeleteTsKvQuery;
import org.thingsboard.server.common.data.kv.ReadTsKvQuery;
import org.thingsboard.server.common.data.kv.TsKvEntry;
import org.thingsboard.server.dao.DaoUtil;
import org.thingsboard.server.dao.model.sqlts.timescale.TimescaleTsKvEntity;
import org.thingsboard.server.dao.model.sqlts.timescale.ts.TimescaleTsKvEntity;
import org.thingsboard.server.dao.sql.TbSqlBlockingQueue;
import org.thingsboard.server.dao.sql.TbSqlBlockingQueueParams;
import org.thingsboard.server.dao.sqlts.AbstractSqlTimeseriesDao;
import org.thingsboard.server.dao.sqlts.EntityContainer;
import org.thingsboard.server.dao.sqlts.InsertTsRepository;
import org.thingsboard.server.dao.sqlts.insert.InsertTsRepository;
import org.thingsboard.server.dao.timeseries.TimeseriesDao;
import org.thingsboard.server.dao.util.TimescaleDBTsDao;
@ -286,4 +286,4 @@ public class TimescaleTimeseriesDao extends AbstractSqlTimeseriesDao implements
startTs,
endTs);
}
}
}

View File

@ -21,8 +21,8 @@ import org.springframework.data.jpa.repository.Query;
import org.springframework.data.repository.CrudRepository;
import org.springframework.data.repository.query.Param;
import org.springframework.transaction.annotation.Transactional;
import org.thingsboard.server.dao.model.sqlts.timescale.TimescaleTsKvCompositeKey;
import org.thingsboard.server.dao.model.sqlts.timescale.TimescaleTsKvEntity;
import org.thingsboard.server.dao.model.sqlts.timescale.ts.TimescaleTsKvCompositeKey;
import org.thingsboard.server.dao.model.sqlts.timescale.ts.TimescaleTsKvEntity;
import org.thingsboard.server.dao.util.TimescaleDBTsDao;
import java.util.List;
@ -54,4 +54,4 @@ public interface TsKvTimescaleRepository extends CrudRepository<TimescaleTsKvEnt
@Param("startTs") long startTs,
@Param("endTs") long endTs);
}
}

View File

@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.dao.sqlts.psql;
package org.thingsboard.server.dao.sqlts.ts;
import org.springframework.data.domain.Pageable;
import org.springframework.data.jpa.repository.Modifying;
@ -22,8 +22,8 @@ import org.springframework.data.repository.CrudRepository;
import org.springframework.data.repository.query.Param;
import org.springframework.scheduling.annotation.Async;
import org.springframework.transaction.annotation.Transactional;
import org.thingsboard.server.dao.model.sqlts.psql.TsKvCompositeKey;
import org.thingsboard.server.dao.model.sqlts.psql.TsKvEntity;
import org.thingsboard.server.dao.model.sqlts.ts.TsKvCompositeKey;
import org.thingsboard.server.dao.model.sqlts.ts.TsKvEntity;
import org.thingsboard.server.dao.util.SqlDao;
import java.util.List;
@ -31,7 +31,7 @@ import java.util.UUID;
import java.util.concurrent.CompletableFuture;
@SqlDao
public interface TsKvPsqlRepository extends CrudRepository<TsKvEntity, TsKvCompositeKey> {
public interface TsKvRepository extends CrudRepository<TsKvEntity, TsKvCompositeKey> {
@Query("SELECT tskv FROM TsKvEntity tskv WHERE tskv.entityId = :entityId " +
"AND tskv.key = :entityKey AND tskv.ts > :startTs AND tskv.ts <= :endTs")