From 2ea3b18738e95e646b6220fb898781d53d9f2e8b Mon Sep 17 00:00:00 2001 From: Andrii Shvaika Date: Mon, 21 Dec 2020 17:05:12 +0200 Subject: [PATCH] partitions cache improvements --- .../CassandraBaseTimeseriesDao.java | 41 ++++++++++--------- .../CassandraTsPartitionsCache.java | 4 +- 2 files changed, 23 insertions(+), 22 deletions(-) diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java index 46f8b4a5cb..7bcae3b740 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java @@ -404,31 +404,32 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem return doSavePartition(tenantId, entityId, key, ttl, partition); } else { CassandraPartitionCacheKey partitionSearchKey = new CassandraPartitionCacheKey(entityId, key, partition); - CompletableFuture hasInCacheFuture = cassandraTsPartitionsCache.has(partitionSearchKey); - if (hasInCacheFuture == null) { - return doSavePartitionWithCache(tenantId, entityId, key, partition, partitionSearchKey, ttl); + if (!cassandraTsPartitionsCache.has(partitionSearchKey)) { + ListenableFuture result = doSavePartition(tenantId, entityId, key, ttl, partition); + Futures.addCallback(result, new CacheCallback<>(partitionSearchKey), MoreExecutors.directExecutor()); + return result; } else { - long finalTtl = ttl; - SettableFuture futureResult = SettableFuture.create(); - hasInCacheFuture.whenComplete((result, throwable) -> { - if (throwable != null) { - futureResult.setException(throwable); - } else if (result) { - futureResult.set(null); - } else { - futureResult.setFuture(doSavePartitionWithCache(tenantId, entityId, key, partition, partitionSearchKey, finalTtl)); - } - }); - return futureResult; + return Futures.immediateFuture(null); } } } - private ListenableFuture doSavePartitionWithCache(TenantId tenantId, EntityId entityId, String key, long partition, CassandraPartitionCacheKey partitionSearchKey, long ttl) { - return Futures.transform(doSavePartition(tenantId, entityId, key, ttl, partition), input -> { - cassandraTsPartitionsCache.put(partitionSearchKey); - return input; - }, readResultsProcessingExecutor); + private class CacheCallback implements FutureCallback { + private final CassandraPartitionCacheKey key; + + private CacheCallback(CassandraPartitionCacheKey key) { + this.key = key; + } + + @Override + public void onSuccess(Void result) { + cassandraTsPartitionsCache.put(key); + } + + @Override + public void onFailure(Throwable t) { + + } } private ListenableFuture doSavePartition(TenantId tenantId, EntityId entityId, String key, long ttl, long partition) { diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraTsPartitionsCache.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraTsPartitionsCache.java index b467b5446f..bafc00c872 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraTsPartitionsCache.java +++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraTsPartitionsCache.java @@ -32,8 +32,8 @@ public class CassandraTsPartitionsCache { }); } - public CompletableFuture has(CassandraPartitionCacheKey key) { - return partitionsCache.getIfPresent(key); + public boolean has(CassandraPartitionCacheKey key) { + return partitionsCache.getIfPresent(key) != null; } public void put(CassandraPartitionCacheKey key) {