From 7b427d1fe6ea8c0986d0b5a4857c1de5cba1332f Mon Sep 17 00:00:00 2001 From: YevhenBondarenko Date: Wed, 16 Nov 2022 15:52:11 +0100 Subject: [PATCH] deleted removePartitions from TimeseriesDao --- ...stractChunkedAggregationTimeseriesDao.java | 5 --- .../timescale/TimescaleTimeseriesDao.java | 8 ---- .../dao/timeseries/BaseTimeseriesService.java | 2 - .../CassandraBaseTimeseriesDao.java | 40 ------------------- .../server/dao/timeseries/TimeseriesDao.java | 3 -- 5 files changed, 58 deletions(-) diff --git a/dao/src/main/java/org/thingsboard/server/dao/sqlts/AbstractChunkedAggregationTimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/sqlts/AbstractChunkedAggregationTimeseriesDao.java index cf726c32c4..4875468c09 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sqlts/AbstractChunkedAggregationTimeseriesDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sqlts/AbstractChunkedAggregationTimeseriesDao.java @@ -105,11 +105,6 @@ public abstract class AbstractChunkedAggregationTimeseriesDao extends AbstractSq return Futures.immediateFuture(null); } - @Override - public ListenableFuture removePartition(TenantId tenantId, EntityId entityId, DeleteTsKvQuery query) { - return Futures.immediateFuture(null); - } - @Override public ListenableFuture> findAllAsync(TenantId tenantId, EntityId entityId, List queries) { return processFindAllAsync(tenantId, entityId, queries); diff --git a/dao/src/main/java/org/thingsboard/server/dao/sqlts/timescale/TimescaleTimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/sqlts/timescale/TimescaleTimeseriesDao.java index c9daf1ec6d..08089a818e 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sqlts/timescale/TimescaleTimeseriesDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sqlts/timescale/TimescaleTimeseriesDao.java @@ -18,7 +18,6 @@ package org.thingsboard.server.dao.sqlts.timescale; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.MoreExecutors; -import com.google.common.util.concurrent.SettableFuture; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.domain.PageRequest; @@ -36,7 +35,6 @@ import org.thingsboard.server.common.stats.StatsFactory; import org.thingsboard.server.dao.DaoUtil; import org.thingsboard.server.dao.model.sql.AbstractTsKvEntity; import org.thingsboard.server.dao.model.sqlts.timescale.ts.TimescaleTsKvEntity; -import org.thingsboard.server.dao.model.sqlts.ts.TsKvEntity; import org.thingsboard.server.dao.sql.TbSqlBlockingQueueParams; import org.thingsboard.server.dao.sql.TbSqlBlockingQueueWrapper; import org.thingsboard.server.dao.sqlts.AbstractSqlTimeseriesDao; @@ -52,7 +50,6 @@ import java.util.Comparator; import java.util.List; import java.util.Optional; import java.util.UUID; -import java.util.concurrent.CompletableFuture; import java.util.function.Function; @Component @@ -144,11 +141,6 @@ public class TimescaleTimeseriesDao extends AbstractSqlTimeseriesDao implements }); } - @Override - public ListenableFuture removePartition(TenantId tenantId, EntityId entityId, DeleteTsKvQuery query) { - return service.submit(() -> null); - } - @Override public ListenableFuture findAllAsync(TenantId tenantId, EntityId entityId, ReadTsKvQuery query) { if (query.getAggregation() == Aggregation.NONE) { diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java index c8a5076c65..a2905a7a64 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java @@ -46,7 +46,6 @@ import org.thingsboard.server.dao.service.Validator; import java.util.Collection; import java.util.Collections; import java.util.List; -import java.util.Map; import java.util.Optional; import java.util.stream.Collectors; @@ -277,7 +276,6 @@ public class BaseTimeseriesService implements TimeseriesService { private void deleteAndRegisterFutures(TenantId tenantId, List> futures, EntityId entityId, DeleteTsKvQuery query) { futures.add(Futures.transform(timeseriesDao.remove(tenantId, entityId, query), v -> null, MoreExecutors.directExecutor())); futures.add(timeseriesLatestDao.removeLatest(tenantId, entityId, query)); - futures.add(Futures.transform(timeseriesDao.removePartition(tenantId, entityId, query), v -> null, MoreExecutors.directExecutor())); } private static void validate(EntityId entityId) { diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java index 651fb65150..ffc52787f9 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java @@ -226,46 +226,6 @@ public class CassandraBaseTimeseriesDao extends AbstractCassandraBaseTimeseriesD return resultFuture; } - @Override - public ListenableFuture removePartition(TenantId tenantId, EntityId entityId, DeleteTsKvQuery query) { - long minPartition = toPartitionTs(query.getStartTs()); - long maxPartition = toPartitionTs(query.getEndTs()); - if (minPartition == maxPartition) { - return Futures.immediateFuture(null); - } else { - TbResultSetFuture partitionsFuture = fetchPartitions(tenantId, entityId, query.getKey(), minPartition, maxPartition); - - final SimpleListenableFuture resultFuture = new SimpleListenableFuture<>(); - final ListenableFuture> partitionsListFuture = Futures.transformAsync(partitionsFuture, getPartitionsArrayFunction(), readResultsProcessingExecutor); - - Futures.addCallback(partitionsListFuture, new FutureCallback>() { - @Override - public void onSuccess(@Nullable List partitions) { - int index = 0; - if (minPartition != query.getStartTs()) { - index = 1; - } - List partitionsToDelete = new ArrayList<>(); - for (int i = index; i < partitions.size() - 1; i++) { - partitionsToDelete.add(partitions.get(i)); - } - QueryCursor cursor = new QueryCursor(entityId.getEntityType().name(), entityId.getId(), query, partitionsToDelete); - deletePartitionAsync(tenantId, cursor, resultFuture); - - for (Long partition : partitionsToDelete) { - cassandraTsPartitionsCache.invalidate(new CassandraPartitionCacheKey(entityId, query.getKey(), partition)); - } - } - - @Override - public void onFailure(Throwable t) { - log.error("[{}][{}] Failed to fetch partitions for interval {}-{}", entityId.getEntityType().name(), entityId.getId(), minPartition, maxPartition, t); - } - }, readResultsProcessingExecutor); - return resultFuture; - } - } - @Override public ListenableFuture findAllAsync(TenantId tenantId, EntityId entityId, ReadTsKvQuery query) { if (query.getAggregation() == Aggregation.NONE) { diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesDao.java index 5fd26d400a..4878fdd293 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesDao.java @@ -24,7 +24,6 @@ import org.thingsboard.server.common.data.kv.ReadTsKvQueryResult; import org.thingsboard.server.common.data.kv.TsKvEntry; import java.util.List; -import java.util.Map; /** * @author Andrew Shvayka @@ -39,7 +38,5 @@ public interface TimeseriesDao { ListenableFuture remove(TenantId tenantId, EntityId entityId, DeleteTsKvQuery query); - ListenableFuture removePartition(TenantId tenantId, EntityId entityId, DeleteTsKvQuery query); - void cleanup(long systemTtl); }