improvements after review
This commit is contained in:
parent
0de5868bc5
commit
aede1af6f9
@ -405,33 +405,29 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
|
|||||||
} else {
|
} else {
|
||||||
CassandraPartitionCacheKey partitionSearchKey = new CassandraPartitionCacheKey(entityId, key, partition);
|
CassandraPartitionCacheKey partitionSearchKey = new CassandraPartitionCacheKey(entityId, key, partition);
|
||||||
CompletableFuture<Boolean> hasFuture = cassandraTsPartitionsCache.has(partitionSearchKey);
|
CompletableFuture<Boolean> hasFuture = cassandraTsPartitionsCache.has(partitionSearchKey);
|
||||||
SettableFuture<Boolean> listenableFuture = SettableFuture.create();
|
SettableFuture<Void> listenableFuture = SettableFuture.create();
|
||||||
if (hasFuture == null) {
|
if (hasFuture == null) {
|
||||||
return processDoSavePartition(tenantId, entityId, key, partition, partitionSearchKey, ttl);
|
return processDoSavePartition(tenantId, entityId, key, partition, partitionSearchKey, ttl);
|
||||||
} else {
|
} else {
|
||||||
|
long finalTtl = ttl;
|
||||||
hasFuture.whenComplete((result, throwable) -> {
|
hasFuture.whenComplete((result, throwable) -> {
|
||||||
if (throwable != null) {
|
if (throwable != null) {
|
||||||
listenableFuture.setException(throwable);
|
listenableFuture.setException(throwable);
|
||||||
|
} else if (result) {
|
||||||
|
listenableFuture.set(null);
|
||||||
} else {
|
} else {
|
||||||
listenableFuture.set(result);
|
listenableFuture.setFuture(processDoSavePartition(tenantId, entityId, key, partition, partitionSearchKey, finalTtl));
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
long finalTtl = ttl;
|
return listenableFuture;
|
||||||
return Futures.transformAsync(listenableFuture, result -> {
|
|
||||||
if (result) {
|
|
||||||
return Futures.immediateFuture(null);
|
|
||||||
} else {
|
|
||||||
return processDoSavePartition(tenantId, entityId, key, partition, partitionSearchKey, finalTtl);
|
|
||||||
}
|
|
||||||
}, readResultsProcessingExecutor);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private ListenableFuture<Void> processDoSavePartition(TenantId tenantId, EntityId entityId, String key, long partition, CassandraPartitionCacheKey partitionSearchKey, long ttl) {
|
private ListenableFuture<Void> processDoSavePartition(TenantId tenantId, EntityId entityId, String key, long partition, CassandraPartitionCacheKey partitionSearchKey, long ttl) {
|
||||||
return Futures.transformAsync(doSavePartition(tenantId, entityId, key, ttl, partition), input -> {
|
return Futures.transform(doSavePartition(tenantId, entityId, key, ttl, partition), input -> {
|
||||||
cassandraTsPartitionsCache.put(partitionSearchKey);
|
cassandraTsPartitionsCache.put(partitionSearchKey);
|
||||||
return Futures.immediateFuture(input);
|
return input;
|
||||||
}, readResultsProcessingExecutor);
|
}, readResultsProcessingExecutor);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -103,15 +103,12 @@ public class CassandraPartitionsCacheTest {
|
|||||||
ReflectionTestUtils.setField(cassandraBaseTimeseriesDao, "cluster", cluster);
|
ReflectionTestUtils.setField(cassandraBaseTimeseriesDao, "cluster", cluster);
|
||||||
|
|
||||||
doReturn(Futures.immediateFuture(null)).when(cassandraBaseTimeseriesDao).getFuture(any(ResultSetFuture.class), any());
|
doReturn(Futures.immediateFuture(null)).when(cassandraBaseTimeseriesDao).getFuture(any(ResultSetFuture.class), any());
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testPartitionSave() throws Exception {
|
public void testPartitionSave() throws Exception {
|
||||||
|
|
||||||
cassandraBaseTimeseriesDao.init();
|
cassandraBaseTimeseriesDao.init();
|
||||||
|
|
||||||
|
|
||||||
UUID id = UUID.randomUUID();
|
UUID id = UUID.randomUUID();
|
||||||
TenantId tenantId = new TenantId(id);
|
TenantId tenantId = new TenantId(id);
|
||||||
long tsKvEntryTs = System.currentTimeMillis();
|
long tsKvEntryTs = System.currentTimeMillis();
|
||||||
@ -119,14 +116,10 @@ public class CassandraPartitionsCacheTest {
|
|||||||
for (int i = 0; i < 50000; i++) {
|
for (int i = 0; i < 50000; i++) {
|
||||||
cassandraBaseTimeseriesDao.savePartition(tenantId, tenantId, tsKvEntryTs, "test" + i, 0);
|
cassandraBaseTimeseriesDao.savePartition(tenantId, tenantId, tsKvEntryTs, "test" + i, 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
for (int i = 0; i < 60000; i++) {
|
for (int i = 0; i < 60000; i++) {
|
||||||
cassandraBaseTimeseriesDao.savePartition(tenantId, tenantId, tsKvEntryTs, "test" + i, 0);
|
cassandraBaseTimeseriesDao.savePartition(tenantId, tenantId, tsKvEntryTs, "test" + i, 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
verify(cassandraBaseTimeseriesDao, times(60000)).executeAsyncWrite(any(TenantId.class), any(Statement.class));
|
verify(cassandraBaseTimeseriesDao, times(60000)).executeAsyncWrite(any(TenantId.class), any(Statement.class));
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
Loading…
x
Reference in New Issue
Block a user