SqlTimeseriesLatestDao async refactoring: replaced Futures.immediateFuture() blocking call with true async using service.submit() and Futures.transformAsync()

This commit is contained in:
Sergey Matvienko 2024-03-08 18:20:13 +01:00
parent 59d0f36697
commit c4b243ef10

View File

@ -151,12 +151,12 @@ public class SqlTimeseriesLatestDao extends BaseAbstractSqlTimeseriesDao impleme
@Override @Override
public ListenableFuture<Optional<TsKvEntry>> findLatestOpt(TenantId tenantId, EntityId entityId, String key) { public ListenableFuture<Optional<TsKvEntry>> findLatestOpt(TenantId tenantId, EntityId entityId, String key) {
return Futures.immediateFuture(Optional.ofNullable(doFindLatest(entityId, key))); return service.submit(() -> Optional.ofNullable(doFindLatest(entityId, key)));
} }
@Override @Override
public ListenableFuture<TsKvEntry> findLatest(TenantId tenantId, EntityId entityId, String key) { public ListenableFuture<TsKvEntry> findLatest(TenantId tenantId, EntityId entityId, String key) {
return Futures.immediateFuture(getLatestTsKvEntry(entityId, key)); return service.submit(() -> getLatestTsKvEntry(entityId, key));
} }
@Override @Override
@ -221,36 +221,29 @@ public class SqlTimeseriesLatestDao extends BaseAbstractSqlTimeseriesDao impleme
} }
protected ListenableFuture<TsKvLatestRemovingResult> getRemoveLatestFuture(TenantId tenantId, EntityId entityId, DeleteTsKvQuery query) { protected ListenableFuture<TsKvLatestRemovingResult> getRemoveLatestFuture(TenantId tenantId, EntityId entityId, DeleteTsKvQuery query) {
TsKvEntry latest = doFindLatest(entityId, query.getKey()); ListenableFuture<TsKvEntry> latestFuture = service.submit(() -> doFindLatest(entityId, query.getKey()));
return Futures.transformAsync(latestFuture, latest -> {
if (latest == null) { if (latest == null) {
return Futures.immediateFuture(new TsKvLatestRemovingResult(query.getKey(), false)); return Futures.immediateFuture(new TsKvLatestRemovingResult(query.getKey(), false));
} }
boolean isRemoved = false;
long ts = latest.getTs(); long ts = latest.getTs();
ListenableFuture<Boolean> removedLatestFuture;
if (ts >= query.getStartTs() && ts < query.getEndTs()) { if (ts >= query.getStartTs() && ts < query.getEndTs()) {
TsKvLatestEntity latestEntity = new TsKvLatestEntity(); TsKvLatestEntity latestEntity = new TsKvLatestEntity();
latestEntity.setEntityId(entityId.getId()); latestEntity.setEntityId(entityId.getId());
latestEntity.setKey(getOrSaveKeyId(query.getKey())); latestEntity.setKey(getOrSaveKeyId(query.getKey()));
removedLatestFuture = service.submit(() -> {
tsKvLatestRepository.delete(latestEntity); tsKvLatestRepository.delete(latestEntity);
return true; isRemoved = true;
}); if (query.getRewriteLatestIfDeleted()) {
} else {
removedLatestFuture = Futures.immediateFuture(false);
}
return Futures.transformAsync(removedLatestFuture, isRemoved -> {
if (isRemoved && query.getRewriteLatestIfDeleted()) {
return getNewLatestEntryFuture(tenantId, entityId, query); return getNewLatestEntryFuture(tenantId, entityId, query);
} }
}
return Futures.immediateFuture(new TsKvLatestRemovingResult(query.getKey(), isRemoved)); return Futures.immediateFuture(new TsKvLatestRemovingResult(query.getKey(), isRemoved));
}, MoreExecutors.directExecutor()); }, MoreExecutors.directExecutor());
} }
protected ListenableFuture<List<TsKvEntry>> getFindAllLatestFuture(EntityId entityId) { protected ListenableFuture<List<TsKvEntry>> getFindAllLatestFuture(EntityId entityId) {
return Futures.immediateFuture( return service.submit(() ->
DaoUtil.convertDataList(Lists.newArrayList( DaoUtil.convertDataList(Lists.newArrayList(
searchTsKvLatestRepository.findAllByEntityId(entityId.getId())))); searchTsKvLatestRepository.findAllByEntityId(entityId.getId()))));
} }