diff --git a/application/src/main/java/org/thingsboard/server/ThingsboardServerApplication.java b/application/src/main/java/org/thingsboard/server/ThingsboardServerApplication.java index 2994810d7c..9463b08198 100644 --- a/application/src/main/java/org/thingsboard/server/ThingsboardServerApplication.java +++ b/application/src/main/java/org/thingsboard/server/ThingsboardServerApplication.java @@ -18,11 +18,13 @@ package org.thingsboard.server; import org.springframework.boot.SpringApplication; import org.springframework.boot.SpringBootConfiguration; import org.springframework.context.annotation.ComponentScan; +import org.springframework.scheduling.annotation.EnableAsync; import springfox.documentation.swagger2.annotations.EnableSwagger2; import java.util.Arrays; @SpringBootConfiguration +@EnableAsync @EnableSwagger2 @ComponentScan({"org.thingsboard.server"}) public class ThingsboardServerApplication { diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/timeseries/JpaTimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/sql/timeseries/JpaTimeseriesDao.java index 2b77d085a7..3219d58ac9 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/timeseries/JpaTimeseriesDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/timeseries/JpaTimeseriesDao.java @@ -19,6 +19,7 @@ import com.google.common.base.Function; import com.google.common.collect.Lists; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.domain.PageRequest; @@ -39,6 +40,8 @@ import org.thingsboard.server.dao.util.SqlDao; import javax.annotation.Nullable; import java.util.ArrayList; import java.util.List; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; import java.util.stream.Collectors; import static org.thingsboard.server.common.data.UUIDConverter.fromTimeUUID; @@ -80,7 +83,7 @@ public class JpaTimeseriesDao extends JpaAbstractDaoListeningExecutorService imp return findAllAsyncWithLimit(entityId, query); } else { long stepTs = query.getStartTs(); - List> futures = new ArrayList<>(); + List>> futures = new ArrayList<>(); while (stepTs < query.getEndTs()) { long startTs = stepTs; long endTs = stepTs + query.getInterval(); @@ -88,16 +91,30 @@ public class JpaTimeseriesDao extends JpaAbstractDaoListeningExecutorService imp futures.add(findAndAggregateAsync(entityId, query.getKey(), startTs, endTs, ts, query.getAggregation())); stepTs = endTs; } - return Futures.allAsList(futures); + ListenableFuture>> future = Futures.allAsList(futures); + return Futures.transform(future, new Function>, List>() { + @Nullable + @Override + public List apply(@Nullable List> results) { + if (results == null || results.isEmpty()) { + return null; + } + return results.stream() + .filter(Optional::isPresent) + .map(Optional::get) + .collect(Collectors.toList()); + } + }, service); } } - private ListenableFuture findAndAggregateAsync(EntityId entityId, String key, long startTs, long endTs, long ts, Aggregation aggregation) { - TsKvEntity entity; + private ListenableFuture> findAndAggregateAsync(EntityId entityId, String key, long startTs, long endTs, long ts, Aggregation aggregation) { + CompletableFuture entity; + String entityIdStr = fromTimeUUID(entityId.getId()); switch (aggregation) { case AVG: entity = tsKvRepository.findAvg( - fromTimeUUID(entityId.getId()), + entityIdStr, entityId.getEntityType(), key, startTs, @@ -106,7 +123,7 @@ public class JpaTimeseriesDao extends JpaAbstractDaoListeningExecutorService imp break; case MAX: entity = tsKvRepository.findMax( - fromTimeUUID(entityId.getId()), + entityIdStr, entityId.getEntityType(), key, startTs, @@ -115,7 +132,7 @@ public class JpaTimeseriesDao extends JpaAbstractDaoListeningExecutorService imp break; case MIN: entity = tsKvRepository.findMin( - fromTimeUUID(entityId.getId()), + entityIdStr, entityId.getEntityType(), key, startTs, @@ -124,7 +141,7 @@ public class JpaTimeseriesDao extends JpaAbstractDaoListeningExecutorService imp break; case SUM: entity = tsKvRepository.findSum( - fromTimeUUID(entityId.getId()), + entityIdStr, entityId.getEntityType(), key, startTs, @@ -133,7 +150,7 @@ public class JpaTimeseriesDao extends JpaAbstractDaoListeningExecutorService imp break; case COUNT: entity = tsKvRepository.findCount( - fromTimeUUID(entityId.getId()), + entityIdStr, entityId.getEntityType(), key, startTs, @@ -141,12 +158,32 @@ public class JpaTimeseriesDao extends JpaAbstractDaoListeningExecutorService imp break; default: - entity = null; + throw new IllegalArgumentException("Not supported aggregation type: " + aggregation); } - if (entity != null) { - entity.setTs(ts); - } - return service.submit(() -> DaoUtil.getData(entity)); + + SettableFuture listenableFuture = SettableFuture.create(); + entity.whenComplete((tsKvEntity, throwable) -> { + if (throwable != null) { + listenableFuture.setException(throwable); + } else { + listenableFuture.set(tsKvEntity); + } + }); + return Futures.transform(listenableFuture, new Function>() { + @Nullable + @Override + public Optional apply(@Nullable TsKvEntity entity) { + if (entity != null && entity.isNotEmpty()) { + entity.setEntityId(entityIdStr); + entity.setEntityType(entityId.getEntityType()); + entity.setKey(key); + entity.setTs(ts); + return Optional.of(DaoUtil.getData(entity)); + } else { + return Optional.empty(); + } + } + }); } private ListenableFuture> findAllAsyncWithLimit(EntityId entityId, TsKvQuery query) { diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/timeseries/TsKvRepository.java b/dao/src/main/java/org/thingsboard/server/dao/sql/timeseries/TsKvRepository.java index b37d27218b..d1e206ecf8 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/timeseries/TsKvRepository.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/timeseries/TsKvRepository.java @@ -26,7 +26,7 @@ import org.thingsboard.server.dao.model.sql.TsKvEntity; import org.thingsboard.server.dao.util.SqlDao; import java.util.List; -import java.util.concurrent.Future; +import java.util.concurrent.CompletableFuture; @SqlDao public interface TsKvRepository extends CrudRepository { @@ -45,17 +45,17 @@ public interface TsKvRepository extends CrudRepository :startTs AND tskv.ts < :endTs") - Future findMax(@Param("entityId") String entityId, - @Param("entityType") EntityType entityType, - @Param("entityKey") String entityKey, - @Param("startTs") long startTs, - @Param("endTs") long endTs); + CompletableFuture findMax(@Param("entityId") String entityId, + @Param("entityType") EntityType entityType, + @Param("entityKey") String entityKey, + @Param("startTs") long startTs, + @Param("endTs") long endTs); @Async @Query("SELECT new TsKvEntity(MIN(tskv.strValue), MIN(tskv.longValue), MIN(tskv.doubleValue)) FROM TsKvEntity tskv " + "WHERE tskv.entityId = :entityId AND tskv.entityType = :entityType " + "AND tskv.key = :entityKey AND tskv.ts > :startTs AND tskv.ts < :endTs") - Future findMin(@Param("entityId") String entityId, + CompletableFuture findMin(@Param("entityId") String entityId, @Param("entityType") EntityType entityType, @Param("entityKey") String entityKey, @Param("startTs") long startTs, @@ -65,7 +65,7 @@ public interface TsKvRepository extends CrudRepository :startTs AND tskv.ts < :endTs") - Future findCount(@Param("entityId") String entityId, + CompletableFuture findCount(@Param("entityId") String entityId, @Param("entityType") EntityType entityType, @Param("entityKey") String entityKey, @Param("startTs") long startTs, @@ -75,7 +75,7 @@ public interface TsKvRepository extends CrudRepository :startTs AND tskv.ts < :endTs") - Future findAvg(@Param("entityId") String entityId, + CompletableFuture findAvg(@Param("entityId") String entityId, @Param("entityType") EntityType entityType, @Param("entityKey") String entityKey, @Param("startTs") long startTs, @@ -86,7 +86,7 @@ public interface TsKvRepository extends CrudRepository :startTs AND tskv.ts < :endTs") - Future findSum(@Param("entityId") String entityId, + CompletableFuture findSum(@Param("entityId") String entityId, @Param("entityType") EntityType entityType, @Param("entityKey") String entityKey, @Param("startTs") long startTs,