deleted removePartitions from TimeseriesDao
This commit is contained in:
parent
8a27614936
commit
7b427d1fe6
@ -105,11 +105,6 @@ public abstract class AbstractChunkedAggregationTimeseriesDao extends AbstractSq
|
|||||||
return Futures.immediateFuture(null);
|
return Futures.immediateFuture(null);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public ListenableFuture<Void> removePartition(TenantId tenantId, EntityId entityId, DeleteTsKvQuery query) {
|
|
||||||
return Futures.immediateFuture(null);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ListenableFuture<List<ReadTsKvQueryResult>> findAllAsync(TenantId tenantId, EntityId entityId, List<ReadTsKvQuery> queries) {
|
public ListenableFuture<List<ReadTsKvQueryResult>> findAllAsync(TenantId tenantId, EntityId entityId, List<ReadTsKvQuery> queries) {
|
||||||
return processFindAllAsync(tenantId, entityId, queries);
|
return processFindAllAsync(tenantId, entityId, queries);
|
||||||
|
|||||||
@ -18,7 +18,6 @@ package org.thingsboard.server.dao.sqlts.timescale;
|
|||||||
import com.google.common.util.concurrent.Futures;
|
import com.google.common.util.concurrent.Futures;
|
||||||
import com.google.common.util.concurrent.ListenableFuture;
|
import com.google.common.util.concurrent.ListenableFuture;
|
||||||
import com.google.common.util.concurrent.MoreExecutors;
|
import com.google.common.util.concurrent.MoreExecutors;
|
||||||
import com.google.common.util.concurrent.SettableFuture;
|
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
import org.springframework.data.domain.PageRequest;
|
import org.springframework.data.domain.PageRequest;
|
||||||
@ -36,7 +35,6 @@ import org.thingsboard.server.common.stats.StatsFactory;
|
|||||||
import org.thingsboard.server.dao.DaoUtil;
|
import org.thingsboard.server.dao.DaoUtil;
|
||||||
import org.thingsboard.server.dao.model.sql.AbstractTsKvEntity;
|
import org.thingsboard.server.dao.model.sql.AbstractTsKvEntity;
|
||||||
import org.thingsboard.server.dao.model.sqlts.timescale.ts.TimescaleTsKvEntity;
|
import org.thingsboard.server.dao.model.sqlts.timescale.ts.TimescaleTsKvEntity;
|
||||||
import org.thingsboard.server.dao.model.sqlts.ts.TsKvEntity;
|
|
||||||
import org.thingsboard.server.dao.sql.TbSqlBlockingQueueParams;
|
import org.thingsboard.server.dao.sql.TbSqlBlockingQueueParams;
|
||||||
import org.thingsboard.server.dao.sql.TbSqlBlockingQueueWrapper;
|
import org.thingsboard.server.dao.sql.TbSqlBlockingQueueWrapper;
|
||||||
import org.thingsboard.server.dao.sqlts.AbstractSqlTimeseriesDao;
|
import org.thingsboard.server.dao.sqlts.AbstractSqlTimeseriesDao;
|
||||||
@ -52,7 +50,6 @@ import java.util.Comparator;
|
|||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
import java.util.UUID;
|
import java.util.UUID;
|
||||||
import java.util.concurrent.CompletableFuture;
|
|
||||||
import java.util.function.Function;
|
import java.util.function.Function;
|
||||||
|
|
||||||
@Component
|
@Component
|
||||||
@ -144,11 +141,6 @@ public class TimescaleTimeseriesDao extends AbstractSqlTimeseriesDao implements
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public ListenableFuture<Void> removePartition(TenantId tenantId, EntityId entityId, DeleteTsKvQuery query) {
|
|
||||||
return service.submit(() -> null);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ListenableFuture<ReadTsKvQueryResult> findAllAsync(TenantId tenantId, EntityId entityId, ReadTsKvQuery query) {
|
public ListenableFuture<ReadTsKvQueryResult> findAllAsync(TenantId tenantId, EntityId entityId, ReadTsKvQuery query) {
|
||||||
if (query.getAggregation() == Aggregation.NONE) {
|
if (query.getAggregation() == Aggregation.NONE) {
|
||||||
|
|||||||
@ -46,7 +46,6 @@ import org.thingsboard.server.dao.service.Validator;
|
|||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
|
||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
@ -277,7 +276,6 @@ public class BaseTimeseriesService implements TimeseriesService {
|
|||||||
private void deleteAndRegisterFutures(TenantId tenantId, List<ListenableFuture<TsKvLatestRemovingResult>> futures, EntityId entityId, DeleteTsKvQuery query) {
|
private void deleteAndRegisterFutures(TenantId tenantId, List<ListenableFuture<TsKvLatestRemovingResult>> futures, EntityId entityId, DeleteTsKvQuery query) {
|
||||||
futures.add(Futures.transform(timeseriesDao.remove(tenantId, entityId, query), v -> null, MoreExecutors.directExecutor()));
|
futures.add(Futures.transform(timeseriesDao.remove(tenantId, entityId, query), v -> null, MoreExecutors.directExecutor()));
|
||||||
futures.add(timeseriesLatestDao.removeLatest(tenantId, entityId, query));
|
futures.add(timeseriesLatestDao.removeLatest(tenantId, entityId, query));
|
||||||
futures.add(Futures.transform(timeseriesDao.removePartition(tenantId, entityId, query), v -> null, MoreExecutors.directExecutor()));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private static void validate(EntityId entityId) {
|
private static void validate(EntityId entityId) {
|
||||||
|
|||||||
@ -226,46 +226,6 @@ public class CassandraBaseTimeseriesDao extends AbstractCassandraBaseTimeseriesD
|
|||||||
return resultFuture;
|
return resultFuture;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public ListenableFuture<Void> removePartition(TenantId tenantId, EntityId entityId, DeleteTsKvQuery query) {
|
|
||||||
long minPartition = toPartitionTs(query.getStartTs());
|
|
||||||
long maxPartition = toPartitionTs(query.getEndTs());
|
|
||||||
if (minPartition == maxPartition) {
|
|
||||||
return Futures.immediateFuture(null);
|
|
||||||
} else {
|
|
||||||
TbResultSetFuture partitionsFuture = fetchPartitions(tenantId, entityId, query.getKey(), minPartition, maxPartition);
|
|
||||||
|
|
||||||
final SimpleListenableFuture<Void> resultFuture = new SimpleListenableFuture<>();
|
|
||||||
final ListenableFuture<List<Long>> partitionsListFuture = Futures.transformAsync(partitionsFuture, getPartitionsArrayFunction(), readResultsProcessingExecutor);
|
|
||||||
|
|
||||||
Futures.addCallback(partitionsListFuture, new FutureCallback<List<Long>>() {
|
|
||||||
@Override
|
|
||||||
public void onSuccess(@Nullable List<Long> partitions) {
|
|
||||||
int index = 0;
|
|
||||||
if (minPartition != query.getStartTs()) {
|
|
||||||
index = 1;
|
|
||||||
}
|
|
||||||
List<Long> partitionsToDelete = new ArrayList<>();
|
|
||||||
for (int i = index; i < partitions.size() - 1; i++) {
|
|
||||||
partitionsToDelete.add(partitions.get(i));
|
|
||||||
}
|
|
||||||
QueryCursor cursor = new QueryCursor(entityId.getEntityType().name(), entityId.getId(), query, partitionsToDelete);
|
|
||||||
deletePartitionAsync(tenantId, cursor, resultFuture);
|
|
||||||
|
|
||||||
for (Long partition : partitionsToDelete) {
|
|
||||||
cassandraTsPartitionsCache.invalidate(new CassandraPartitionCacheKey(entityId, query.getKey(), partition));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void onFailure(Throwable t) {
|
|
||||||
log.error("[{}][{}] Failed to fetch partitions for interval {}-{}", entityId.getEntityType().name(), entityId.getId(), minPartition, maxPartition, t);
|
|
||||||
}
|
|
||||||
}, readResultsProcessingExecutor);
|
|
||||||
return resultFuture;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ListenableFuture<ReadTsKvQueryResult> findAllAsync(TenantId tenantId, EntityId entityId, ReadTsKvQuery query) {
|
public ListenableFuture<ReadTsKvQueryResult> findAllAsync(TenantId tenantId, EntityId entityId, ReadTsKvQuery query) {
|
||||||
if (query.getAggregation() == Aggregation.NONE) {
|
if (query.getAggregation() == Aggregation.NONE) {
|
||||||
|
|||||||
@ -24,7 +24,6 @@ import org.thingsboard.server.common.data.kv.ReadTsKvQueryResult;
|
|||||||
import org.thingsboard.server.common.data.kv.TsKvEntry;
|
import org.thingsboard.server.common.data.kv.TsKvEntry;
|
||||||
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @author Andrew Shvayka
|
* @author Andrew Shvayka
|
||||||
@ -39,7 +38,5 @@ public interface TimeseriesDao {
|
|||||||
|
|
||||||
ListenableFuture<Void> remove(TenantId tenantId, EntityId entityId, DeleteTsKvQuery query);
|
ListenableFuture<Void> remove(TenantId tenantId, EntityId entityId, DeleteTsKvQuery query);
|
||||||
|
|
||||||
ListenableFuture<Void> removePartition(TenantId tenantId, EntityId entityId, DeleteTsKvQuery query);
|
|
||||||
|
|
||||||
void cleanup(long systemTtl);
|
void cleanup(long systemTtl);
|
||||||
}
|
}
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user