partitions cache improvements
This commit is contained in:
parent
0a2255d055
commit
2ea3b18738
@ -404,31 +404,32 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
|
|||||||
return doSavePartition(tenantId, entityId, key, ttl, partition);
|
return doSavePartition(tenantId, entityId, key, ttl, partition);
|
||||||
} else {
|
} else {
|
||||||
CassandraPartitionCacheKey partitionSearchKey = new CassandraPartitionCacheKey(entityId, key, partition);
|
CassandraPartitionCacheKey partitionSearchKey = new CassandraPartitionCacheKey(entityId, key, partition);
|
||||||
CompletableFuture<Boolean> hasInCacheFuture = cassandraTsPartitionsCache.has(partitionSearchKey);
|
if (!cassandraTsPartitionsCache.has(partitionSearchKey)) {
|
||||||
if (hasInCacheFuture == null) {
|
ListenableFuture<Void> result = doSavePartition(tenantId, entityId, key, ttl, partition);
|
||||||
return doSavePartitionWithCache(tenantId, entityId, key, partition, partitionSearchKey, ttl);
|
Futures.addCallback(result, new CacheCallback<>(partitionSearchKey), MoreExecutors.directExecutor());
|
||||||
|
return result;
|
||||||
} else {
|
} else {
|
||||||
long finalTtl = ttl;
|
return Futures.immediateFuture(null);
|
||||||
SettableFuture<Void> futureResult = SettableFuture.create();
|
|
||||||
hasInCacheFuture.whenComplete((result, throwable) -> {
|
|
||||||
if (throwable != null) {
|
|
||||||
futureResult.setException(throwable);
|
|
||||||
} else if (result) {
|
|
||||||
futureResult.set(null);
|
|
||||||
} else {
|
|
||||||
futureResult.setFuture(doSavePartitionWithCache(tenantId, entityId, key, partition, partitionSearchKey, finalTtl));
|
|
||||||
}
|
|
||||||
});
|
|
||||||
return futureResult;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private ListenableFuture<Void> doSavePartitionWithCache(TenantId tenantId, EntityId entityId, String key, long partition, CassandraPartitionCacheKey partitionSearchKey, long ttl) {
|
private class CacheCallback<Void> implements FutureCallback<Void> {
|
||||||
return Futures.transform(doSavePartition(tenantId, entityId, key, ttl, partition), input -> {
|
private final CassandraPartitionCacheKey key;
|
||||||
cassandraTsPartitionsCache.put(partitionSearchKey);
|
|
||||||
return input;
|
private CacheCallback(CassandraPartitionCacheKey key) {
|
||||||
}, readResultsProcessingExecutor);
|
this.key = key;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onSuccess(Void result) {
|
||||||
|
cassandraTsPartitionsCache.put(key);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onFailure(Throwable t) {
|
||||||
|
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private ListenableFuture<Void> doSavePartition(TenantId tenantId, EntityId entityId, String key, long ttl, long partition) {
|
private ListenableFuture<Void> doSavePartition(TenantId tenantId, EntityId entityId, String key, long ttl, long partition) {
|
||||||
|
|||||||
@ -32,8 +32,8 @@ public class CassandraTsPartitionsCache {
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
public CompletableFuture<Boolean> has(CassandraPartitionCacheKey key) {
|
public boolean has(CassandraPartitionCacheKey key) {
|
||||||
return partitionsCache.getIfPresent(key);
|
return partitionsCache.getIfPresent(key) != null;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void put(CassandraPartitionCacheKey key) {
|
public void put(CassandraPartitionCacheKey key) {
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user