Experiments with queue vs futures
This commit is contained in:
		
							parent
							
								
									32f01438b1
								
							
						
					
					
						commit
						582c65b71e
					
				@ -79,6 +79,8 @@ public interface RelationService {
 | 
			
		||||
 | 
			
		||||
    ListenableFuture<List<EntityRelation>> findByQuery(TenantId tenantId, EntityRelationsQuery query);
 | 
			
		||||
 | 
			
		||||
    ListenableFuture<List<EntityRelation>> findByQuery2(TenantId tenantId, EntityRelationsQuery query);
 | 
			
		||||
 | 
			
		||||
    ListenableFuture<List<EntityRelationInfo>> findInfoByQuery(TenantId tenantId, EntityRelationsQuery query);
 | 
			
		||||
 | 
			
		||||
    void removeRelations(TenantId tenantId, EntityId entityId);
 | 
			
		||||
 | 
			
		||||
@ -5,7 +5,7 @@
 | 
			
		||||
 * you may not use this file except in compliance with the License.
 | 
			
		||||
 * You may obtain a copy of the License at
 | 
			
		||||
 *
 | 
			
		||||
 *     http://www.apache.org/licenses/LICENSE-2.0
 | 
			
		||||
 * http://www.apache.org/licenses/LICENSE-2.0
 | 
			
		||||
 *
 | 
			
		||||
 * Unless required by applicable law or agreed to in writing, software
 | 
			
		||||
 * distributed under the License is distributed on an "AS IS" BASIS,
 | 
			
		||||
@ -20,6 +20,9 @@ import com.google.common.collect.Lists;
 | 
			
		||||
import com.google.common.util.concurrent.Futures;
 | 
			
		||||
import com.google.common.util.concurrent.ListenableFuture;
 | 
			
		||||
import com.google.common.util.concurrent.MoreExecutors;
 | 
			
		||||
import com.google.common.util.concurrent.SettableFuture;
 | 
			
		||||
import lombok.Getter;
 | 
			
		||||
import lombok.RequiredArgsConstructor;
 | 
			
		||||
import lombok.extern.slf4j.Slf4j;
 | 
			
		||||
import org.springframework.context.ApplicationEventPublisher;
 | 
			
		||||
import org.springframework.context.annotation.Lazy;
 | 
			
		||||
@ -49,8 +52,11 @@ import java.util.ArrayList;
 | 
			
		||||
import java.util.Collections;
 | 
			
		||||
import java.util.HashSet;
 | 
			
		||||
import java.util.List;
 | 
			
		||||
import java.util.Queue;
 | 
			
		||||
import java.util.Set;
 | 
			
		||||
import java.util.concurrent.ConcurrentHashMap;
 | 
			
		||||
import java.util.concurrent.ConcurrentLinkedQueue;
 | 
			
		||||
import java.util.concurrent.LinkedBlockingQueue;
 | 
			
		||||
import java.util.function.BiConsumer;
 | 
			
		||||
import java.util.stream.Collectors;
 | 
			
		||||
 | 
			
		||||
@ -407,6 +413,39 @@ public class BaseRelationService implements RelationService {
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public ListenableFuture<List<EntityRelation>> findByQuery2(TenantId tenantId, EntityRelationsQuery query) {
 | 
			
		||||
        //boolean fetchLastLevelOnly = true;
 | 
			
		||||
        log.trace("Executing findByQuery [{}]", query);
 | 
			
		||||
        RelationsSearchParameters params = query.getParameters();
 | 
			
		||||
        final List<RelationEntityTypeFilter> filters = query.getFilters();
 | 
			
		||||
        if (filters == null || filters.isEmpty()) {
 | 
			
		||||
            log.debug("Filters are not set [{}]", query);
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        int maxLvl = params.getMaxLevel() > 0 ? params.getMaxLevel() : Integer.MAX_VALUE;
 | 
			
		||||
 | 
			
		||||
        try {
 | 
			
		||||
            ListenableFuture<Set<EntityRelation>> relationSet = findRelationsRecursively2(tenantId, params.getEntityId(), params.getDirection(), params.getRelationTypeGroup(), maxLvl, params.isFetchLastLevelOnly(), new ConcurrentHashMap<>());
 | 
			
		||||
            return Futures.transform(relationSet, input -> {
 | 
			
		||||
                List<EntityRelation> relations = new ArrayList<>();
 | 
			
		||||
                if (filters == null || filters.isEmpty()) {
 | 
			
		||||
                    relations.addAll(input);
 | 
			
		||||
                    return relations;
 | 
			
		||||
                }
 | 
			
		||||
                for (EntityRelation relation : input) {
 | 
			
		||||
                    if (matchFilters(filters, relation, params.getDirection())) {
 | 
			
		||||
                        relations.add(relation);
 | 
			
		||||
                    }
 | 
			
		||||
                }
 | 
			
		||||
                return relations;
 | 
			
		||||
            }, MoreExecutors.directExecutor());
 | 
			
		||||
        } catch (Exception e) {
 | 
			
		||||
            log.warn("Failed to query relations: [{}]", query, e);
 | 
			
		||||
            throw new RuntimeException(e);
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public ListenableFuture<List<EntityRelationInfo>> findInfoByQuery(TenantId tenantId, EntityRelationsQuery query) {
 | 
			
		||||
        log.trace("Executing findInfoByQuery [{}]", query);
 | 
			
		||||
@ -511,15 +550,75 @@ public class BaseRelationService implements RelationService {
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @RequiredArgsConstructor
 | 
			
		||||
    private static class RelationQueueCtx {
 | 
			
		||||
        final SettableFuture<Set<EntityRelation>> future = SettableFuture.create();
 | 
			
		||||
        final Set<EntityRelation> result = ConcurrentHashMap.newKeySet();
 | 
			
		||||
        final Queue<RelationTask> tasks = new ConcurrentLinkedQueue<>();
 | 
			
		||||
 | 
			
		||||
        final TenantId tenantId;
 | 
			
		||||
        final EntitySearchDirection direction;
 | 
			
		||||
        final RelationTypeGroup relationTypeGroup;
 | 
			
		||||
        final boolean fetchLastLevelOnly;
 | 
			
		||||
        final int lvl;
 | 
			
		||||
        final ConcurrentHashMap<EntityId, Boolean> uniqueMap;
 | 
			
		||||
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @RequiredArgsConstructor
 | 
			
		||||
    private static class RelationTask {
 | 
			
		||||
        private final int currentLvl;
 | 
			
		||||
        private final EntityId root;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private void processQueue(RelationQueueCtx ctx) {
 | 
			
		||||
        RelationTask task = ctx.tasks.poll();
 | 
			
		||||
        while (task != null) {
 | 
			
		||||
            List<EntityRelation> relations = findRelations(ctx.tenantId, task.root, ctx.direction, ctx.relationTypeGroup);
 | 
			
		||||
            Set<EntityId> childrenIds = new HashSet<>();
 | 
			
		||||
            for (EntityRelation childRelation : relations) {
 | 
			
		||||
                log.trace("Found Relation: {}", childRelation);
 | 
			
		||||
                EntityId childId = ctx.direction == EntitySearchDirection.FROM ? childRelation.getTo() : childRelation.getFrom();
 | 
			
		||||
                if (ctx.uniqueMap.putIfAbsent(childId, Boolean.TRUE) == null) {
 | 
			
		||||
                    log.trace("Adding Relation: {}", childId);
 | 
			
		||||
                    if (childrenIds.add(childId)) {
 | 
			
		||||
                        log.trace("Added Relation: {}", childId);
 | 
			
		||||
                    }
 | 
			
		||||
                }
 | 
			
		||||
            }
 | 
			
		||||
            for (EntityId child : childrenIds) {
 | 
			
		||||
                ctx.tasks.add(new RelationTask(task.currentLvl - 1, child));
 | 
			
		||||
                if (!ctx.fetchLastLevelOnly || task.currentLvl == 0) {
 | 
			
		||||
                    ctx.result.addAll(relations);
 | 
			
		||||
                }
 | 
			
		||||
            }
 | 
			
		||||
            task = ctx.tasks.poll();
 | 
			
		||||
        }
 | 
			
		||||
        ctx.future.set(ctx.result);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private ListenableFuture<Set<EntityRelation>> findRelationsRecursively(final TenantId tenantId, final EntityId rootId, final EntitySearchDirection direction,
 | 
			
		||||
                                                                           RelationTypeGroup relationTypeGroup, int lvl, boolean fetchLastLevelOnly,
 | 
			
		||||
                                                                           final ConcurrentHashMap<EntityId, Boolean> uniqueMap) {
 | 
			
		||||
        if (lvl == 0) {
 | 
			
		||||
            return Futures.immediateFuture(Collections.emptySet());
 | 
			
		||||
        }
 | 
			
		||||
        var relationQueueCtx = new RelationQueueCtx(tenantId, direction, relationTypeGroup, fetchLastLevelOnly, lvl, uniqueMap);
 | 
			
		||||
        relationQueueCtx.tasks.add(new RelationTask(lvl, rootId));
 | 
			
		||||
        executor.submit(() -> processQueue(relationQueueCtx));
 | 
			
		||||
        return relationQueueCtx.future;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
    private ListenableFuture<Set<EntityRelation>> findRelationsRecursively2(final TenantId tenantId, final EntityId rootId, final EntitySearchDirection direction,
 | 
			
		||||
                                                                           RelationTypeGroup relationTypeGroup, int lvl, boolean fetchLastLevelOnly,
 | 
			
		||||
                                                                           final ConcurrentHashMap<EntityId, Boolean> uniqueMap) {
 | 
			
		||||
        if (lvl == 0) {
 | 
			
		||||
            return Futures.immediateFuture(Collections.emptySet());
 | 
			
		||||
        }
 | 
			
		||||
        final int currentLvl = --lvl;
 | 
			
		||||
        final Set<EntityRelation> children = new HashSet<>();
 | 
			
		||||
        ListenableFuture<List<EntityRelation>> rootRelationsFuture = findRelations(tenantId, rootId, direction, relationTypeGroup);
 | 
			
		||||
        ListenableFuture<List<EntityRelation>> rootRelationsFuture = findRelations2(tenantId, rootId, direction, relationTypeGroup);
 | 
			
		||||
        ListenableFuture<Set<EntityId>> childrenIdsFuture = Futures.transform(rootRelationsFuture, relations -> {
 | 
			
		||||
            children.addAll(relations);
 | 
			
		||||
            Set<EntityId> childrenIds = new HashSet<>();
 | 
			
		||||
@ -539,7 +638,7 @@ public class BaseRelationService implements RelationService {
 | 
			
		||||
        ListenableFuture<List<Set<EntityRelation>>> recursiveFutures = Futures.transformAsync(childrenIdsFuture, childrenIds ->
 | 
			
		||||
                Futures.successfulAsList(childrenIds.stream()
 | 
			
		||||
                        .map(entityId -> findRelationsRecursively(tenantId, entityId, direction, relationTypeGroup, currentLvl, fetchLastLevelOnly, uniqueMap))
 | 
			
		||||
                        .collect(Collectors.toList())), MoreExecutors.directExecutor());
 | 
			
		||||
                        .collect(Collectors.toList())), executor);
 | 
			
		||||
 | 
			
		||||
        ListenableFuture<Set<EntityRelation>> relationsFuture = Futures.transform(recursiveFutures, recursiveRelations -> {
 | 
			
		||||
            if (fetchLastLevelOnly && currentLvl > 0) {
 | 
			
		||||
@ -547,11 +646,24 @@ public class BaseRelationService implements RelationService {
 | 
			
		||||
            }
 | 
			
		||||
            recursiveRelations.forEach(children::addAll);
 | 
			
		||||
            return children;
 | 
			
		||||
        }, MoreExecutors.directExecutor());
 | 
			
		||||
        }, executor);
 | 
			
		||||
        return relationsFuture;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private ListenableFuture<List<EntityRelation>> findRelations(final TenantId tenantId, final EntityId rootId, final EntitySearchDirection direction, RelationTypeGroup relationTypeGroup) {
 | 
			
		||||
    private List<EntityRelation> findRelations(final TenantId tenantId, final EntityId rootId, final EntitySearchDirection direction, RelationTypeGroup relationTypeGroup) {
 | 
			
		||||
        List<EntityRelation> relations;
 | 
			
		||||
        if (relationTypeGroup == null) {
 | 
			
		||||
            relationTypeGroup = RelationTypeGroup.COMMON;
 | 
			
		||||
        }
 | 
			
		||||
        if (direction == EntitySearchDirection.FROM) {
 | 
			
		||||
            relations = findByFrom(tenantId, rootId, relationTypeGroup);
 | 
			
		||||
        } else {
 | 
			
		||||
            relations = findByTo(tenantId, rootId, relationTypeGroup);
 | 
			
		||||
        }
 | 
			
		||||
        return relations;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private ListenableFuture<List<EntityRelation>> findRelations2(final TenantId tenantId, final EntityId rootId, final EntitySearchDirection direction, RelationTypeGroup relationTypeGroup) {
 | 
			
		||||
        ListenableFuture<List<EntityRelation>> relations;
 | 
			
		||||
        if (relationTypeGroup == null) {
 | 
			
		||||
            relationTypeGroup = RelationTypeGroup.COMMON;
 | 
			
		||||
 | 
			
		||||
@ -289,8 +289,8 @@ public abstract class BaseRelationServiceTest extends AbstractServiceTest {
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Test
 | 
			
		||||
    public void testRecursiveRelationDepth() throws ExecutionException, InterruptedException {
 | 
			
		||||
        int maxLevel = 100;
 | 
			
		||||
    public void testRecursiveRelationDepth3() throws ExecutionException, InterruptedException {
 | 
			
		||||
        int maxLevel = 1000;
 | 
			
		||||
        AssetId root = new AssetId(Uuids.timeBased());
 | 
			
		||||
        AssetId left = new AssetId(Uuids.timeBased());
 | 
			
		||||
        AssetId right = new AssetId(Uuids.timeBased());
 | 
			
		||||
@ -336,6 +336,54 @@ public abstract class BaseRelationServiceTest extends AbstractServiceTest {
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Test
 | 
			
		||||
    public void testRecursiveRelationDepth2() throws ExecutionException, InterruptedException {
 | 
			
		||||
        int maxLevel = 1000;
 | 
			
		||||
        AssetId root = new AssetId(Uuids.timeBased());
 | 
			
		||||
        AssetId left = new AssetId(Uuids.timeBased());
 | 
			
		||||
        AssetId right = new AssetId(Uuids.timeBased());
 | 
			
		||||
 | 
			
		||||
        List<EntityRelation> expected = new ArrayList<>();
 | 
			
		||||
 | 
			
		||||
        EntityRelation relationAB = new EntityRelation(root, left, EntityRelation.CONTAINS_TYPE);
 | 
			
		||||
        EntityRelation relationBC = new EntityRelation(root, right, EntityRelation.CONTAINS_TYPE);
 | 
			
		||||
        saveRelation(relationAB);
 | 
			
		||||
        expected.add(relationAB);
 | 
			
		||||
 | 
			
		||||
        saveRelation(relationBC);
 | 
			
		||||
        expected.add(relationBC);
 | 
			
		||||
 | 
			
		||||
        for (int i = 0; i < maxLevel; i++) {
 | 
			
		||||
            var newLeft = new AssetId(Uuids.timeBased());
 | 
			
		||||
            var newRight = new AssetId(Uuids.timeBased());
 | 
			
		||||
            EntityRelation relationLeft = new EntityRelation(left, newLeft, EntityRelation.CONTAINS_TYPE);
 | 
			
		||||
            EntityRelation relationRight = new EntityRelation(right, newRight, EntityRelation.CONTAINS_TYPE);
 | 
			
		||||
            saveRelation(relationLeft);
 | 
			
		||||
            expected.add(relationLeft);
 | 
			
		||||
            saveRelation(relationRight);
 | 
			
		||||
            expected.add(relationRight);
 | 
			
		||||
            left = newLeft;
 | 
			
		||||
            right = newRight;
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
        EntityRelationsQuery query = new EntityRelationsQuery();
 | 
			
		||||
        query.setParameters(new RelationsSearchParameters(root, EntitySearchDirection.FROM, -1, false));
 | 
			
		||||
        query.setFilters(Collections.singletonList(new RelationEntityTypeFilter(EntityRelation.CONTAINS_TYPE, Collections.singletonList(EntityType.ASSET))));
 | 
			
		||||
        List<EntityRelation> relations = relationService.findByQuery2(SYSTEM_TENANT_ID, query).get();
 | 
			
		||||
        Assert.assertEquals(expected.size(), relations.size());
 | 
			
		||||
        for(EntityRelation r : expected){
 | 
			
		||||
            Assert.assertTrue(relations.contains(r));
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        //Test from cache
 | 
			
		||||
        relations = relationService.findByQuery2(SYSTEM_TENANT_ID, query).get();
 | 
			
		||||
        Assert.assertEquals(expected.size(), relations.size());
 | 
			
		||||
        for(EntityRelation r : expected){
 | 
			
		||||
            Assert.assertTrue(relations.contains(r));
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Test(expected = DataValidationException.class)
 | 
			
		||||
    public void testSaveRelationWithEmptyFrom() throws ExecutionException, InterruptedException {
 | 
			
		||||
        EntityRelation relation = new EntityRelation();
 | 
			
		||||
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user