findRelationsRecursively refactoring - removed blocking operations
This commit is contained in:
		
							parent
							
								
									323a47a744
								
							
						
					
					
						commit
						e1534721c5
					
				@ -52,6 +52,7 @@ import java.util.List;
 | 
			
		||||
import java.util.Set;
 | 
			
		||||
import java.util.concurrent.ConcurrentHashMap;
 | 
			
		||||
import java.util.function.BiConsumer;
 | 
			
		||||
import java.util.stream.Collectors;
 | 
			
		||||
 | 
			
		||||
import static org.thingsboard.server.dao.service.Validator.validateId;
 | 
			
		||||
 | 
			
		||||
@ -512,40 +513,42 @@ public class BaseRelationService implements RelationService {
 | 
			
		||||
 | 
			
		||||
    private ListenableFuture<Set<EntityRelation>> findRelationsRecursively(final TenantId tenantId, final EntityId rootId, final EntitySearchDirection direction,
 | 
			
		||||
                                                                           RelationTypeGroup relationTypeGroup, int lvl, boolean fetchLastLevelOnly,
 | 
			
		||||
                                                                           final ConcurrentHashMap<EntityId, Boolean> uniqueMap) throws Exception {
 | 
			
		||||
                                                                           final ConcurrentHashMap<EntityId, Boolean> uniqueMap) {
 | 
			
		||||
        if (lvl == 0) {
 | 
			
		||||
            return Futures.immediateFuture(Collections.emptySet());
 | 
			
		||||
        }
 | 
			
		||||
        lvl--;
 | 
			
		||||
        //TODO: try to remove this blocking operation
 | 
			
		||||
        Set<EntityRelation> children = new HashSet<>(findRelations(tenantId, rootId, direction, relationTypeGroup).get());
 | 
			
		||||
        Set<EntityId> childrenIds = new HashSet<>();
 | 
			
		||||
        for (EntityRelation childRelation : children) {
 | 
			
		||||
            log.trace("Found Relation: {}", childRelation);
 | 
			
		||||
            EntityId childId;
 | 
			
		||||
            if (direction == EntitySearchDirection.FROM) {
 | 
			
		||||
                childId = childRelation.getTo();
 | 
			
		||||
            } else {
 | 
			
		||||
                childId = childRelation.getFrom();
 | 
			
		||||
            }
 | 
			
		||||
            if (uniqueMap.putIfAbsent(childId, Boolean.TRUE) == null) {
 | 
			
		||||
                log.trace("Adding Relation: {}", childId);
 | 
			
		||||
                if (childrenIds.add(childId)) {
 | 
			
		||||
                    log.trace("Added Relation: {}", childId);
 | 
			
		||||
        final int currentLvl = --lvl;
 | 
			
		||||
        final Set<EntityRelation> children = new HashSet<>();
 | 
			
		||||
        ListenableFuture<List<EntityRelation>> rootRelationsFuture = findRelations(tenantId, rootId, direction, relationTypeGroup);
 | 
			
		||||
        ListenableFuture<Set<EntityId>> childrenIdsFuture = Futures.transform(rootRelationsFuture, relations -> {
 | 
			
		||||
            children.addAll(relations);
 | 
			
		||||
            Set<EntityId> childrenIds = new HashSet<>();
 | 
			
		||||
            for (EntityRelation childRelation : children) {
 | 
			
		||||
                log.trace("Found Relation: {}", childRelation);
 | 
			
		||||
                EntityId childId = direction == EntitySearchDirection.FROM ? childRelation.getTo() : childRelation.getFrom();
 | 
			
		||||
                if (uniqueMap.putIfAbsent(childId, Boolean.TRUE) == null) {
 | 
			
		||||
                    log.trace("Adding Relation: {}", childId);
 | 
			
		||||
                    if (childrenIds.add(childId)) {
 | 
			
		||||
                        log.trace("Added Relation: {}", childId);
 | 
			
		||||
                    }
 | 
			
		||||
                }
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
        List<ListenableFuture<Set<EntityRelation>>> futures = new ArrayList<>();
 | 
			
		||||
        for (EntityId entityId : childrenIds) {
 | 
			
		||||
            futures.add(findRelationsRecursively(tenantId, entityId, direction, relationTypeGroup, lvl, fetchLastLevelOnly, uniqueMap));
 | 
			
		||||
        }
 | 
			
		||||
        //TODO: try to remove this blocking operation
 | 
			
		||||
        List<Set<EntityRelation>> relations = Futures.successfulAsList(futures).get();
 | 
			
		||||
        if (fetchLastLevelOnly && lvl > 0) {
 | 
			
		||||
            children.clear();
 | 
			
		||||
        }
 | 
			
		||||
        relations.forEach(r -> r.forEach(children::add));
 | 
			
		||||
        return Futures.immediateFuture(children);
 | 
			
		||||
            return childrenIds;
 | 
			
		||||
        }, MoreExecutors.directExecutor());
 | 
			
		||||
 | 
			
		||||
        ListenableFuture<List<Set<EntityRelation>>> recursiveFutures = Futures.transformAsync(childrenIdsFuture, childrenIds ->
 | 
			
		||||
                Futures.successfulAsList(childrenIds.stream()
 | 
			
		||||
                        .map(entityId -> findRelationsRecursively(tenantId, entityId, direction, relationTypeGroup, currentLvl, fetchLastLevelOnly, uniqueMap))
 | 
			
		||||
                        .collect(Collectors.toList())), MoreExecutors.directExecutor());
 | 
			
		||||
 | 
			
		||||
        ListenableFuture<Set<EntityRelation>> relationsFuture = Futures.transform(recursiveFutures, recursiveRelations -> {
 | 
			
		||||
            if (fetchLastLevelOnly && currentLvl > 0) {
 | 
			
		||||
                children.clear();
 | 
			
		||||
            }
 | 
			
		||||
            recursiveRelations.forEach(children::addAll);
 | 
			
		||||
            return children;
 | 
			
		||||
        }, MoreExecutors.directExecutor());
 | 
			
		||||
        return relationsFuture;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private ListenableFuture<List<EntityRelation>> findRelations(final TenantId tenantId, final EntityId rootId, final EntitySearchDirection direction, RelationTypeGroup relationTypeGroup) {
 | 
			
		||||
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user