diff --git a/dao/src/main/java/org/thingsboard/server/dao/sqlts/SqlTimeseriesLatestDao.java b/dao/src/main/java/org/thingsboard/server/dao/sqlts/SqlTimeseriesLatestDao.java index 112daf0637..f1a58aa417 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sqlts/SqlTimeseriesLatestDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sqlts/SqlTimeseriesLatestDao.java @@ -151,12 +151,12 @@ public class SqlTimeseriesLatestDao extends BaseAbstractSqlTimeseriesDao impleme @Override public ListenableFuture> findLatestOpt(TenantId tenantId, EntityId entityId, String key) { - return Futures.immediateFuture(Optional.ofNullable(doFindLatest(entityId, key))); + return service.submit(() -> Optional.ofNullable(doFindLatest(entityId, key))); } @Override public ListenableFuture findLatest(TenantId tenantId, EntityId entityId, String key) { - return Futures.immediateFuture(getLatestTsKvEntry(entityId, key)); + return service.submit(() -> getLatestTsKvEntry(entityId, key)); } @Override @@ -221,36 +221,29 @@ public class SqlTimeseriesLatestDao extends BaseAbstractSqlTimeseriesDao impleme } protected ListenableFuture getRemoveLatestFuture(TenantId tenantId, EntityId entityId, DeleteTsKvQuery query) { - TsKvEntry latest = doFindLatest(entityId, query.getKey()); - - if (latest == null) { - return Futures.immediateFuture(new TsKvLatestRemovingResult(query.getKey(), false)); - } - - long ts = latest.getTs(); - ListenableFuture removedLatestFuture; - if (ts >= query.getStartTs() && ts < query.getEndTs()) { - TsKvLatestEntity latestEntity = new TsKvLatestEntity(); - latestEntity.setEntityId(entityId.getId()); - latestEntity.setKey(getOrSaveKeyId(query.getKey())); - removedLatestFuture = service.submit(() -> { + ListenableFuture latestFuture = service.submit(() -> doFindLatest(entityId, query.getKey())); + return Futures.transformAsync(latestFuture, latest -> { + if (latest == null) { + return Futures.immediateFuture(new TsKvLatestRemovingResult(query.getKey(), false)); + } + boolean isRemoved = false; + long ts = latest.getTs(); + if (ts >= query.getStartTs() && ts < query.getEndTs()) { + TsKvLatestEntity latestEntity = new TsKvLatestEntity(); + latestEntity.setEntityId(entityId.getId()); + latestEntity.setKey(getOrSaveKeyId(query.getKey())); tsKvLatestRepository.delete(latestEntity); - return true; - }); - } else { - removedLatestFuture = Futures.immediateFuture(false); - } - - return Futures.transformAsync(removedLatestFuture, isRemoved -> { - if (isRemoved && query.getRewriteLatestIfDeleted()) { - return getNewLatestEntryFuture(tenantId, entityId, query); + isRemoved = true; + if (query.getRewriteLatestIfDeleted()) { + return getNewLatestEntryFuture(tenantId, entityId, query); + } } return Futures.immediateFuture(new TsKvLatestRemovingResult(query.getKey(), isRemoved)); }, MoreExecutors.directExecutor()); } protected ListenableFuture> getFindAllLatestFuture(EntityId entityId) { - return Futures.immediateFuture( + return service.submit(() -> DaoUtil.convertDataList(Lists.newArrayList( searchTsKvLatestRepository.findAllByEntityId(entityId.getId())))); }