diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java index d39c6c5afb..7d834a8eea 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java @@ -405,33 +405,29 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem } else { CassandraPartitionCacheKey partitionSearchKey = new CassandraPartitionCacheKey(entityId, key, partition); CompletableFuture hasFuture = cassandraTsPartitionsCache.has(partitionSearchKey); - SettableFuture listenableFuture = SettableFuture.create(); + SettableFuture listenableFuture = SettableFuture.create(); if (hasFuture == null) { return processDoSavePartition(tenantId, entityId, key, partition, partitionSearchKey, ttl); } else { + long finalTtl = ttl; hasFuture.whenComplete((result, throwable) -> { if (throwable != null) { listenableFuture.setException(throwable); + } else if (result) { + listenableFuture.set(null); } else { - listenableFuture.set(result); + listenableFuture.setFuture(processDoSavePartition(tenantId, entityId, key, partition, partitionSearchKey, finalTtl)); } }); - long finalTtl = ttl; - return Futures.transformAsync(listenableFuture, result -> { - if (result) { - return Futures.immediateFuture(null); - } else { - return processDoSavePartition(tenantId, entityId, key, partition, partitionSearchKey, finalTtl); - } - }, readResultsProcessingExecutor); + return listenableFuture; } } } private ListenableFuture processDoSavePartition(TenantId tenantId, EntityId entityId, String key, long partition, CassandraPartitionCacheKey partitionSearchKey, long ttl) { - return Futures.transformAsync(doSavePartition(tenantId, entityId, key, ttl, partition), input -> { + return Futures.transform(doSavePartition(tenantId, entityId, key, ttl, partition), input -> { cassandraTsPartitionsCache.put(partitionSearchKey); - return Futures.immediateFuture(input); + return input; }, readResultsProcessingExecutor); } diff --git a/dao/src/test/java/org/thingsboard/server/dao/nosql/CassandraPartitionsCacheTest.java b/dao/src/test/java/org/thingsboard/server/dao/nosql/CassandraPartitionsCacheTest.java index 82c0e0d6d2..a67d84964a 100644 --- a/dao/src/test/java/org/thingsboard/server/dao/nosql/CassandraPartitionsCacheTest.java +++ b/dao/src/test/java/org/thingsboard/server/dao/nosql/CassandraPartitionsCacheTest.java @@ -103,15 +103,12 @@ public class CassandraPartitionsCacheTest { ReflectionTestUtils.setField(cassandraBaseTimeseriesDao, "cluster", cluster); doReturn(Futures.immediateFuture(null)).when(cassandraBaseTimeseriesDao).getFuture(any(ResultSetFuture.class), any()); - } @Test public void testPartitionSave() throws Exception { - cassandraBaseTimeseriesDao.init(); - UUID id = UUID.randomUUID(); TenantId tenantId = new TenantId(id); long tsKvEntryTs = System.currentTimeMillis(); @@ -119,14 +116,10 @@ public class CassandraPartitionsCacheTest { for (int i = 0; i < 50000; i++) { cassandraBaseTimeseriesDao.savePartition(tenantId, tenantId, tsKvEntryTs, "test" + i, 0); } - for (int i = 0; i < 60000; i++) { cassandraBaseTimeseriesDao.savePartition(tenantId, tenantId, tsKvEntryTs, "test" + i, 0); } verify(cassandraBaseTimeseriesDao, times(60000)).executeAsyncWrite(any(TenantId.class), any(Statement.class)); - - } - } \ No newline at end of file