From d5319c9de041c785db4dc787806726c92c9ba61c Mon Sep 17 00:00:00 2001 From: Volodymyr Babak Date: Fri, 30 Jul 2021 14:36:42 +0300 Subject: [PATCH] Partitions should not be removed by custom TTL --- .../dao/sqlts/AbstractChunkedAggregationTimeseriesDao.java | 2 +- .../server/dao/sqlts/timescale/TimescaleTimeseriesDao.java | 2 +- .../server/dao/timeseries/BaseTimeseriesService.java | 2 +- .../server/dao/timeseries/CassandraBaseTimeseriesDao.java | 7 +++++-- .../thingsboard/server/dao/timeseries/TimeseriesDao.java | 2 +- .../server/dao/nosql/CassandraPartitionsCacheTest.java | 4 ++-- 6 files changed, 11 insertions(+), 8 deletions(-) diff --git a/dao/src/main/java/org/thingsboard/server/dao/sqlts/AbstractChunkedAggregationTimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/sqlts/AbstractChunkedAggregationTimeseriesDao.java index bca52d3ad4..9c2dce109b 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sqlts/AbstractChunkedAggregationTimeseriesDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sqlts/AbstractChunkedAggregationTimeseriesDao.java @@ -100,7 +100,7 @@ public abstract class AbstractChunkedAggregationTimeseriesDao extends AbstractSq } @Override - public ListenableFuture savePartition(TenantId tenantId, EntityId entityId, long tsKvEntryTs, String key, long ttl) { + public ListenableFuture savePartition(TenantId tenantId, EntityId entityId, long tsKvEntryTs, String key) { return Futures.immediateFuture(null); } diff --git a/dao/src/main/java/org/thingsboard/server/dao/sqlts/timescale/TimescaleTimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/sqlts/timescale/TimescaleTimeseriesDao.java index 31f3407fbf..435c9c5b70 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sqlts/timescale/TimescaleTimeseriesDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sqlts/timescale/TimescaleTimeseriesDao.java @@ -124,7 +124,7 @@ public class TimescaleTimeseriesDao extends AbstractSqlTimeseriesDao implements } @Override - public ListenableFuture savePartition(TenantId tenantId, EntityId entityId, long tsKvEntryTs, String key, long ttl) { + public ListenableFuture savePartition(TenantId tenantId, EntityId entityId, long tsKvEntryTs, String key) { return Futures.immediateFuture(0); } diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java index fb15af723a..3170f6c256 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java @@ -170,7 +170,7 @@ public class BaseTimeseriesService implements TimeseriesService { if (entityId.getEntityType().equals(EntityType.ENTITY_VIEW)) { throw new IncorrectParameterException("Telemetry data can't be stored for entity view. Read only"); } - futures.add(timeseriesDao.savePartition(tenantId, entityId, tsKvEntry.getTs(), tsKvEntry.getKey(), ttl)); + futures.add(timeseriesDao.savePartition(tenantId, entityId, tsKvEntry.getTs(), tsKvEntry.getKey())); futures.add(Futures.transform(timeseriesLatestDao.saveLatest(tenantId, entityId, tsKvEntry), v -> 0, MoreExecutors.directExecutor())); futures.add(timeseriesDao.save(tenantId, entityId, tsKvEntry, ttl)); } diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java index ce653e2e6e..6e7191b50c 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java @@ -181,11 +181,14 @@ public class CassandraBaseTimeseriesDao extends AbstractCassandraBaseTimeseriesD } @Override - public ListenableFuture savePartition(TenantId tenantId, EntityId entityId, long tsKvEntryTs, String key, long ttl) { + public ListenableFuture savePartition(TenantId tenantId, EntityId entityId, long tsKvEntryTs, String key) { if (isFixedPartitioning()) { return Futures.immediateFuture(null); } - ttl = computeTtl(ttl); + // DO NOT apply custom to partition, otherwise short TTL will remove partition too early + // partitions must remain in the DB forever or be removed only by systemTtl + // removal of empty partition is too expensive (we need to scan all data keys for this partitions with ALLOW FILTERING) + long ttl = computeTtl(0); long partition = toPartitionTs(tsKvEntryTs); if (cassandraTsPartitionsCache == null) { return doSavePartition(tenantId, entityId, key, ttl, partition); diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesDao.java index e9af5f0b75..5700410fbb 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesDao.java @@ -33,7 +33,7 @@ public interface TimeseriesDao { ListenableFuture save(TenantId tenantId, EntityId entityId, TsKvEntry tsKvEntry, long ttl); - ListenableFuture savePartition(TenantId tenantId, EntityId entityId, long tsKvEntryTs, String key, long ttl); + ListenableFuture savePartition(TenantId tenantId, EntityId entityId, long tsKvEntryTs, String key); ListenableFuture remove(TenantId tenantId, EntityId entityId, DeleteTsKvQuery query); diff --git a/dao/src/test/java/org/thingsboard/server/dao/nosql/CassandraPartitionsCacheTest.java b/dao/src/test/java/org/thingsboard/server/dao/nosql/CassandraPartitionsCacheTest.java index d3c6c97367..e8eae60131 100644 --- a/dao/src/test/java/org/thingsboard/server/dao/nosql/CassandraPartitionsCacheTest.java +++ b/dao/src/test/java/org/thingsboard/server/dao/nosql/CassandraPartitionsCacheTest.java @@ -100,10 +100,10 @@ public class CassandraPartitionsCacheTest { long tsKvEntryTs = System.currentTimeMillis(); for (int i = 0; i < 50000; i++) { - cassandraBaseTimeseriesDao.savePartition(tenantId, tenantId, tsKvEntryTs, "test" + i, 0); + cassandraBaseTimeseriesDao.savePartition(tenantId, tenantId, tsKvEntryTs, "test" + i); } for (int i = 0; i < 60000; i++) { - cassandraBaseTimeseriesDao.savePartition(tenantId, tenantId, tsKvEntryTs, "test" + i, 0); + cassandraBaseTimeseriesDao.savePartition(tenantId, tenantId, tsKvEntryTs, "test" + i); } verify(cassandraBaseTimeseriesDao, times(60000)).executeAsyncWrite(any(TenantId.class), any(Statement.class)); }