diff --git a/dao/src/main/java/org/thingsboard/server/dao/relation/BaseRelationService.java b/dao/src/main/java/org/thingsboard/server/dao/relation/BaseRelationService.java index 6f6a942162..c751f7334b 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/relation/BaseRelationService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/relation/BaseRelationService.java @@ -52,6 +52,7 @@ import java.util.List; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.function.BiConsumer; +import java.util.stream.Collectors; import static org.thingsboard.server.dao.service.Validator.validateId; @@ -512,40 +513,42 @@ public class BaseRelationService implements RelationService { private ListenableFuture> findRelationsRecursively(final TenantId tenantId, final EntityId rootId, final EntitySearchDirection direction, RelationTypeGroup relationTypeGroup, int lvl, boolean fetchLastLevelOnly, - final ConcurrentHashMap uniqueMap) throws Exception { + final ConcurrentHashMap uniqueMap) { if (lvl == 0) { return Futures.immediateFuture(Collections.emptySet()); } - lvl--; - //TODO: try to remove this blocking operation - Set children = new HashSet<>(findRelations(tenantId, rootId, direction, relationTypeGroup).get()); - Set childrenIds = new HashSet<>(); - for (EntityRelation childRelation : children) { - log.trace("Found Relation: {}", childRelation); - EntityId childId; - if (direction == EntitySearchDirection.FROM) { - childId = childRelation.getTo(); - } else { - childId = childRelation.getFrom(); - } - if (uniqueMap.putIfAbsent(childId, Boolean.TRUE) == null) { - log.trace("Adding Relation: {}", childId); - if (childrenIds.add(childId)) { - log.trace("Added Relation: {}", childId); + final int currentLvl = --lvl; + final Set children = new HashSet<>(); + ListenableFuture> rootRelationsFuture = findRelations(tenantId, rootId, direction, relationTypeGroup); + ListenableFuture> childrenIdsFuture = Futures.transform(rootRelationsFuture, relations -> { + children.addAll(relations); + Set childrenIds = new HashSet<>(); + for (EntityRelation childRelation : children) { + log.trace("Found Relation: {}", childRelation); + EntityId childId = direction == EntitySearchDirection.FROM ? childRelation.getTo() : childRelation.getFrom(); + if (uniqueMap.putIfAbsent(childId, Boolean.TRUE) == null) { + log.trace("Adding Relation: {}", childId); + if (childrenIds.add(childId)) { + log.trace("Added Relation: {}", childId); + } } } - } - List>> futures = new ArrayList<>(); - for (EntityId entityId : childrenIds) { - futures.add(findRelationsRecursively(tenantId, entityId, direction, relationTypeGroup, lvl, fetchLastLevelOnly, uniqueMap)); - } - //TODO: try to remove this blocking operation - List> relations = Futures.successfulAsList(futures).get(); - if (fetchLastLevelOnly && lvl > 0) { - children.clear(); - } - relations.forEach(r -> r.forEach(children::add)); - return Futures.immediateFuture(children); + return childrenIds; + }, MoreExecutors.directExecutor()); + + ListenableFuture>> recursiveFutures = Futures.transformAsync(childrenIdsFuture, childrenIds -> + Futures.successfulAsList(childrenIds.stream() + .map(entityId -> findRelationsRecursively(tenantId, entityId, direction, relationTypeGroup, currentLvl, fetchLastLevelOnly, uniqueMap)) + .collect(Collectors.toList())), MoreExecutors.directExecutor()); + + ListenableFuture> relationsFuture = Futures.transform(recursiveFutures, recursiveRelations -> { + if (fetchLastLevelOnly && currentLvl > 0) { + children.clear(); + } + recursiveRelations.forEach(children::addAll); + return children; + }, MoreExecutors.directExecutor()); + return relationsFuture; } private ListenableFuture> findRelations(final TenantId tenantId, final EntityId rootId, final EntitySearchDirection direction, RelationTypeGroup relationTypeGroup) {