Major improvement to the relation query
This commit is contained in:
		
							parent
							
								
									582c65b71e
								
							
						
					
					
						commit
						7d73c40885
					
				@ -310,7 +310,9 @@ sql:
 | 
				
			|||||||
      ttl: "${SQL_TTL_AUDIT_LOGS_SECS:0}" # Disabled by default. Accuracy of the cleanup depends on the sql.audit_logs.partition_size
 | 
					      ttl: "${SQL_TTL_AUDIT_LOGS_SECS:0}" # Disabled by default. Accuracy of the cleanup depends on the sql.audit_logs.partition_size
 | 
				
			||||||
      checking_interval_ms: "${SQL_TTL_AUDIT_LOGS_CHECKING_INTERVAL_MS:86400000}" # Default value - 1 day
 | 
					      checking_interval_ms: "${SQL_TTL_AUDIT_LOGS_CHECKING_INTERVAL_MS:86400000}" # Default value - 1 day
 | 
				
			||||||
  relations:
 | 
					  relations:
 | 
				
			||||||
    max_level: "${SQL_RELATIONS_MAX_LEVEL:50}" # //This value has to be reasonable small to prevent infinite recursion as early as possible
 | 
					    max_level: "${SQL_RELATIONS_MAX_LEVEL:50}" # This value has to be reasonable small to prevent infinite recursion as early as possible
 | 
				
			||||||
 | 
					    pool_size: "${SQL_RELATIONS_POOL_SIZE:4}" # This value has to be reasonable small to prevent relation query blocking all other DB calls
 | 
				
			||||||
 | 
					    query_timeout: "${SQL_RELATIONS_QUERY_TIMEOUT_SEC:20}" # This value has to be reasonable small to prevent relation query blocking all other DB calls
 | 
				
			||||||
 | 
					
 | 
				
			||||||
# Actor system parameters
 | 
					# Actor system parameters
 | 
				
			||||||
actors:
 | 
					actors:
 | 
				
			||||||
 | 
				
			|||||||
@ -79,8 +79,6 @@ public interface RelationService {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
    ListenableFuture<List<EntityRelation>> findByQuery(TenantId tenantId, EntityRelationsQuery query);
 | 
					    ListenableFuture<List<EntityRelation>> findByQuery(TenantId tenantId, EntityRelationsQuery query);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    ListenableFuture<List<EntityRelation>> findByQuery2(TenantId tenantId, EntityRelationsQuery query);
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    ListenableFuture<List<EntityRelationInfo>> findInfoByQuery(TenantId tenantId, EntityRelationsQuery query);
 | 
					    ListenableFuture<List<EntityRelationInfo>> findInfoByQuery(TenantId tenantId, EntityRelationsQuery query);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    void removeRelations(TenantId tenantId, EntityId entityId);
 | 
					    void removeRelations(TenantId tenantId, EntityId entityId);
 | 
				
			||||||
 | 
				
			|||||||
@ -15,6 +15,10 @@
 | 
				
			|||||||
 */
 | 
					 */
 | 
				
			||||||
package org.thingsboard.common.util;
 | 
					package org.thingsboard.common.util;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					import com.google.common.util.concurrent.MoreExecutors;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					import javax.annotation.PostConstruct;
 | 
				
			||||||
 | 
					import javax.annotation.PreDestroy;
 | 
				
			||||||
import java.util.concurrent.ExecutorService;
 | 
					import java.util.concurrent.ExecutorService;
 | 
				
			||||||
import java.util.concurrent.ForkJoinPool;
 | 
					import java.util.concurrent.ForkJoinPool;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@ -47,4 +51,5 @@ public class ThingsBoardExecutors {
 | 
				
			|||||||
    public static ExecutorService newWorkStealingPool(int parallelism, Class clazz) {
 | 
					    public static ExecutorService newWorkStealingPool(int parallelism, Class clazz) {
 | 
				
			||||||
        return newWorkStealingPool(parallelism, clazz.getSimpleName());
 | 
					        return newWorkStealingPool(parallelism, clazz.getSimpleName());
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
				
			|||||||
@ -5,7 +5,7 @@
 | 
				
			|||||||
 * you may not use this file except in compliance with the License.
 | 
					 * you may not use this file except in compliance with the License.
 | 
				
			||||||
 * You may obtain a copy of the License at
 | 
					 * You may obtain a copy of the License at
 | 
				
			||||||
 *
 | 
					 *
 | 
				
			||||||
 * http://www.apache.org/licenses/LICENSE-2.0
 | 
					 *     http://www.apache.org/licenses/LICENSE-2.0
 | 
				
			||||||
 *
 | 
					 *
 | 
				
			||||||
 * Unless required by applicable law or agreed to in writing, software
 | 
					 * Unless required by applicable law or agreed to in writing, software
 | 
				
			||||||
 * distributed under the License is distributed on an "AS IS" BASIS,
 | 
					 * distributed under the License is distributed on an "AS IS" BASIS,
 | 
				
			||||||
@ -21,9 +21,9 @@ import com.google.common.util.concurrent.Futures;
 | 
				
			|||||||
import com.google.common.util.concurrent.ListenableFuture;
 | 
					import com.google.common.util.concurrent.ListenableFuture;
 | 
				
			||||||
import com.google.common.util.concurrent.MoreExecutors;
 | 
					import com.google.common.util.concurrent.MoreExecutors;
 | 
				
			||||||
import com.google.common.util.concurrent.SettableFuture;
 | 
					import com.google.common.util.concurrent.SettableFuture;
 | 
				
			||||||
import lombok.Getter;
 | 
					 | 
				
			||||||
import lombok.RequiredArgsConstructor;
 | 
					import lombok.RequiredArgsConstructor;
 | 
				
			||||||
import lombok.extern.slf4j.Slf4j;
 | 
					import lombok.extern.slf4j.Slf4j;
 | 
				
			||||||
 | 
					import org.springframework.beans.factory.annotation.Value;
 | 
				
			||||||
import org.springframework.context.ApplicationEventPublisher;
 | 
					import org.springframework.context.ApplicationEventPublisher;
 | 
				
			||||||
import org.springframework.context.annotation.Lazy;
 | 
					import org.springframework.context.annotation.Lazy;
 | 
				
			||||||
import org.springframework.dao.ConcurrencyFailureException;
 | 
					import org.springframework.dao.ConcurrencyFailureException;
 | 
				
			||||||
@ -31,6 +31,7 @@ import org.springframework.stereotype.Service;
 | 
				
			|||||||
import org.springframework.transaction.annotation.Transactional;
 | 
					import org.springframework.transaction.annotation.Transactional;
 | 
				
			||||||
import org.springframework.transaction.event.TransactionalEventListener;
 | 
					import org.springframework.transaction.event.TransactionalEventListener;
 | 
				
			||||||
import org.springframework.transaction.support.TransactionSynchronizationManager;
 | 
					import org.springframework.transaction.support.TransactionSynchronizationManager;
 | 
				
			||||||
 | 
					import org.thingsboard.common.util.ThingsBoardThreadFactory;
 | 
				
			||||||
import org.thingsboard.server.cache.TbTransactionalCache;
 | 
					import org.thingsboard.server.cache.TbTransactionalCache;
 | 
				
			||||||
import org.thingsboard.server.common.data.StringUtils;
 | 
					import org.thingsboard.server.common.data.StringUtils;
 | 
				
			||||||
import org.thingsboard.server.common.data.id.EntityId;
 | 
					import org.thingsboard.server.common.data.id.EntityId;
 | 
				
			||||||
@ -47,18 +48,23 @@ import org.thingsboard.server.dao.entity.EntityService;
 | 
				
			|||||||
import org.thingsboard.server.dao.exception.DataValidationException;
 | 
					import org.thingsboard.server.dao.exception.DataValidationException;
 | 
				
			||||||
import org.thingsboard.server.dao.service.ConstraintValidator;
 | 
					import org.thingsboard.server.dao.service.ConstraintValidator;
 | 
				
			||||||
import org.thingsboard.server.dao.sql.JpaExecutorService;
 | 
					import org.thingsboard.server.dao.sql.JpaExecutorService;
 | 
				
			||||||
 | 
					import org.thingsboard.server.dao.sql.relation.JpaRelationQueryExecutorService;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					import javax.annotation.PostConstruct;
 | 
				
			||||||
 | 
					import javax.annotation.PreDestroy;
 | 
				
			||||||
import java.util.ArrayList;
 | 
					import java.util.ArrayList;
 | 
				
			||||||
import java.util.Collections;
 | 
					import java.util.Collections;
 | 
				
			||||||
import java.util.HashSet;
 | 
					import java.util.HashMap;
 | 
				
			||||||
import java.util.List;
 | 
					import java.util.List;
 | 
				
			||||||
 | 
					import java.util.Map;
 | 
				
			||||||
import java.util.Queue;
 | 
					import java.util.Queue;
 | 
				
			||||||
import java.util.Set;
 | 
					import java.util.Set;
 | 
				
			||||||
import java.util.concurrent.ConcurrentHashMap;
 | 
					import java.util.concurrent.ConcurrentHashMap;
 | 
				
			||||||
import java.util.concurrent.ConcurrentLinkedQueue;
 | 
					import java.util.concurrent.ConcurrentLinkedQueue;
 | 
				
			||||||
import java.util.concurrent.LinkedBlockingQueue;
 | 
					import java.util.concurrent.Executors;
 | 
				
			||||||
 | 
					import java.util.concurrent.ScheduledExecutorService;
 | 
				
			||||||
 | 
					import java.util.concurrent.TimeUnit;
 | 
				
			||||||
import java.util.function.BiConsumer;
 | 
					import java.util.function.BiConsumer;
 | 
				
			||||||
import java.util.stream.Collectors;
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
import static org.thingsboard.server.dao.service.Validator.validateId;
 | 
					import static org.thingsboard.server.dao.service.Validator.validateId;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@ -74,15 +80,34 @@ public class BaseRelationService implements RelationService {
 | 
				
			|||||||
    private final TbTransactionalCache<RelationCacheKey, RelationCacheValue> cache;
 | 
					    private final TbTransactionalCache<RelationCacheKey, RelationCacheValue> cache;
 | 
				
			||||||
    private final ApplicationEventPublisher eventPublisher;
 | 
					    private final ApplicationEventPublisher eventPublisher;
 | 
				
			||||||
    private final JpaExecutorService executor;
 | 
					    private final JpaExecutorService executor;
 | 
				
			||||||
 | 
					    private final JpaRelationQueryExecutorService relationsExecutor;
 | 
				
			||||||
 | 
					    protected ScheduledExecutorService timeoutExecutorService;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    @Value("${sql.relations.query_timeout:20}")
 | 
				
			||||||
 | 
					    private Integer relationQueryTimeout;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    public BaseRelationService(RelationDao relationDao, @Lazy EntityService entityService,
 | 
					    public BaseRelationService(RelationDao relationDao, @Lazy EntityService entityService,
 | 
				
			||||||
                               TbTransactionalCache<RelationCacheKey, RelationCacheValue> cache,
 | 
					                               TbTransactionalCache<RelationCacheKey, RelationCacheValue> cache,
 | 
				
			||||||
                               ApplicationEventPublisher eventPublisher, JpaExecutorService executor) {
 | 
					                               ApplicationEventPublisher eventPublisher, JpaExecutorService executor,
 | 
				
			||||||
 | 
					                               JpaRelationQueryExecutorService relationsExecutor) {
 | 
				
			||||||
        this.relationDao = relationDao;
 | 
					        this.relationDao = relationDao;
 | 
				
			||||||
        this.entityService = entityService;
 | 
					        this.entityService = entityService;
 | 
				
			||||||
        this.cache = cache;
 | 
					        this.cache = cache;
 | 
				
			||||||
        this.eventPublisher = eventPublisher;
 | 
					        this.eventPublisher = eventPublisher;
 | 
				
			||||||
        this.executor = executor;
 | 
					        this.executor = executor;
 | 
				
			||||||
 | 
					        this.relationsExecutor = relationsExecutor;
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    @PostConstruct
 | 
				
			||||||
 | 
					    public void init() {
 | 
				
			||||||
 | 
					        timeoutExecutorService = Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName("relations-query-timeout"));
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    @PreDestroy
 | 
				
			||||||
 | 
					    public void destroy() {
 | 
				
			||||||
 | 
					        if (timeoutExecutorService != null) {
 | 
				
			||||||
 | 
					            timeoutExecutorService.shutdownNow();
 | 
				
			||||||
 | 
					        }
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    @TransactionalEventListener(classes = EntityRelationEvent.class)
 | 
					    @TransactionalEventListener(classes = EntityRelationEvent.class)
 | 
				
			||||||
@ -382,7 +407,6 @@ public class BaseRelationService implements RelationService {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
    @Override
 | 
					    @Override
 | 
				
			||||||
    public ListenableFuture<List<EntityRelation>> findByQuery(TenantId tenantId, EntityRelationsQuery query) {
 | 
					    public ListenableFuture<List<EntityRelation>> findByQuery(TenantId tenantId, EntityRelationsQuery query) {
 | 
				
			||||||
        //boolean fetchLastLevelOnly = true;
 | 
					 | 
				
			||||||
        log.trace("Executing findByQuery [{}]", query);
 | 
					        log.trace("Executing findByQuery [{}]", query);
 | 
				
			||||||
        RelationsSearchParameters params = query.getParameters();
 | 
					        RelationsSearchParameters params = query.getParameters();
 | 
				
			||||||
        final List<RelationEntityTypeFilter> filters = query.getFilters();
 | 
					        final List<RelationEntityTypeFilter> filters = query.getFilters();
 | 
				
			||||||
@ -393,40 +417,8 @@ public class BaseRelationService implements RelationService {
 | 
				
			|||||||
        int maxLvl = params.getMaxLevel() > 0 ? params.getMaxLevel() : Integer.MAX_VALUE;
 | 
					        int maxLvl = params.getMaxLevel() > 0 ? params.getMaxLevel() : Integer.MAX_VALUE;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        try {
 | 
					        try {
 | 
				
			||||||
            ListenableFuture<Set<EntityRelation>> relationSet = findRelationsRecursively(tenantId, params.getEntityId(), params.getDirection(), params.getRelationTypeGroup(), maxLvl, params.isFetchLastLevelOnly(), new ConcurrentHashMap<>());
 | 
					            ListenableFuture<Set<EntityRelation>> relationSet = findRelationsRecursively(tenantId, params.getEntityId(), params.getDirection(),
 | 
				
			||||||
            return Futures.transform(relationSet, input -> {
 | 
					                    params.getRelationTypeGroup(), maxLvl, params.isFetchLastLevelOnly(), new ConcurrentHashMap<>());
 | 
				
			||||||
                List<EntityRelation> relations = new ArrayList<>();
 | 
					 | 
				
			||||||
                if (filters == null || filters.isEmpty()) {
 | 
					 | 
				
			||||||
                    relations.addAll(input);
 | 
					 | 
				
			||||||
                    return relations;
 | 
					 | 
				
			||||||
                }
 | 
					 | 
				
			||||||
                for (EntityRelation relation : input) {
 | 
					 | 
				
			||||||
                    if (matchFilters(filters, relation, params.getDirection())) {
 | 
					 | 
				
			||||||
                        relations.add(relation);
 | 
					 | 
				
			||||||
                    }
 | 
					 | 
				
			||||||
                }
 | 
					 | 
				
			||||||
                return relations;
 | 
					 | 
				
			||||||
            }, MoreExecutors.directExecutor());
 | 
					 | 
				
			||||||
        } catch (Exception e) {
 | 
					 | 
				
			||||||
            log.warn("Failed to query relations: [{}]", query, e);
 | 
					 | 
				
			||||||
            throw new RuntimeException(e);
 | 
					 | 
				
			||||||
        }
 | 
					 | 
				
			||||||
    }
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    @Override
 | 
					 | 
				
			||||||
    public ListenableFuture<List<EntityRelation>> findByQuery2(TenantId tenantId, EntityRelationsQuery query) {
 | 
					 | 
				
			||||||
        //boolean fetchLastLevelOnly = true;
 | 
					 | 
				
			||||||
        log.trace("Executing findByQuery [{}]", query);
 | 
					 | 
				
			||||||
        RelationsSearchParameters params = query.getParameters();
 | 
					 | 
				
			||||||
        final List<RelationEntityTypeFilter> filters = query.getFilters();
 | 
					 | 
				
			||||||
        if (filters == null || filters.isEmpty()) {
 | 
					 | 
				
			||||||
            log.debug("Filters are not set [{}]", query);
 | 
					 | 
				
			||||||
        }
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
        int maxLvl = params.getMaxLevel() > 0 ? params.getMaxLevel() : Integer.MAX_VALUE;
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
        try {
 | 
					 | 
				
			||||||
            ListenableFuture<Set<EntityRelation>> relationSet = findRelationsRecursively2(tenantId, params.getEntityId(), params.getDirection(), params.getRelationTypeGroup(), maxLvl, params.isFetchLastLevelOnly(), new ConcurrentHashMap<>());
 | 
					 | 
				
			||||||
            return Futures.transform(relationSet, input -> {
 | 
					            return Futures.transform(relationSet, input -> {
 | 
				
			||||||
                List<EntityRelation> relations = new ArrayList<>();
 | 
					                List<EntityRelation> relations = new ArrayList<>();
 | 
				
			||||||
                if (filters == null || filters.isEmpty()) {
 | 
					                if (filters == null || filters.isEmpty()) {
 | 
				
			||||||
@ -560,7 +552,7 @@ public class BaseRelationService implements RelationService {
 | 
				
			|||||||
        final EntitySearchDirection direction;
 | 
					        final EntitySearchDirection direction;
 | 
				
			||||||
        final RelationTypeGroup relationTypeGroup;
 | 
					        final RelationTypeGroup relationTypeGroup;
 | 
				
			||||||
        final boolean fetchLastLevelOnly;
 | 
					        final boolean fetchLastLevelOnly;
 | 
				
			||||||
        final int lvl;
 | 
					        final int maxLvl;
 | 
				
			||||||
        final ConcurrentHashMap<EntityId, Boolean> uniqueMap;
 | 
					        final ConcurrentHashMap<EntityId, Boolean> uniqueMap;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
@ -569,29 +561,43 @@ public class BaseRelationService implements RelationService {
 | 
				
			|||||||
    private static class RelationTask {
 | 
					    private static class RelationTask {
 | 
				
			||||||
        private final int currentLvl;
 | 
					        private final int currentLvl;
 | 
				
			||||||
        private final EntityId root;
 | 
					        private final EntityId root;
 | 
				
			||||||
 | 
					        private final List<EntityRelation> prevRelations;
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    private void processQueue(RelationQueueCtx ctx) {
 | 
					    private void processQueue(RelationQueueCtx ctx) {
 | 
				
			||||||
        RelationTask task = ctx.tasks.poll();
 | 
					        RelationTask task = ctx.tasks.poll();
 | 
				
			||||||
        while (task != null) {
 | 
					        while (task != null) {
 | 
				
			||||||
            List<EntityRelation> relations = findRelations(ctx.tenantId, task.root, ctx.direction, ctx.relationTypeGroup);
 | 
					            List<EntityRelation> relations = findRelations(ctx.tenantId, task.root, ctx.direction, ctx.relationTypeGroup);
 | 
				
			||||||
            Set<EntityId> childrenIds = new HashSet<>();
 | 
					            Map<EntityId, List<EntityRelation>> newChildrenRelations = new HashMap<>();
 | 
				
			||||||
            for (EntityRelation childRelation : relations) {
 | 
					            for (EntityRelation childRelation : relations) {
 | 
				
			||||||
                log.trace("Found Relation: {}", childRelation);
 | 
					                log.trace("Found Relation: {}", childRelation);
 | 
				
			||||||
                EntityId childId = ctx.direction == EntitySearchDirection.FROM ? childRelation.getTo() : childRelation.getFrom();
 | 
					                EntityId childId = ctx.direction == EntitySearchDirection.FROM ? childRelation.getTo() : childRelation.getFrom();
 | 
				
			||||||
                if (ctx.uniqueMap.putIfAbsent(childId, Boolean.TRUE) == null) {
 | 
					                if (ctx.uniqueMap.putIfAbsent(childId, Boolean.TRUE) == null) {
 | 
				
			||||||
                    log.trace("Adding Relation: {}", childId);
 | 
					                    log.trace("Adding Relation: {}", childId);
 | 
				
			||||||
                    if (childrenIds.add(childId)) {
 | 
					                    newChildrenRelations.put(childId, new ArrayList<>());
 | 
				
			||||||
                        log.trace("Added Relation: {}", childId);
 | 
					                }
 | 
				
			||||||
 | 
					                if (ctx.fetchLastLevelOnly) {
 | 
				
			||||||
 | 
					                    var list = newChildrenRelations.get(childId);
 | 
				
			||||||
 | 
					                    if (list != null) {
 | 
				
			||||||
 | 
					                        list.add(childRelation);
 | 
				
			||||||
                    }
 | 
					                    }
 | 
				
			||||||
                }
 | 
					                }
 | 
				
			||||||
            }
 | 
					            }
 | 
				
			||||||
            for (EntityId child : childrenIds) {
 | 
					            if (ctx.fetchLastLevelOnly) {
 | 
				
			||||||
                ctx.tasks.add(new RelationTask(task.currentLvl - 1, child));
 | 
					                if (relations.isEmpty()) {
 | 
				
			||||||
                if (!ctx.fetchLastLevelOnly || task.currentLvl == 0) {
 | 
					                    ctx.result.addAll(task.prevRelations);
 | 
				
			||||||
 | 
					                } else if (task.currentLvl == ctx.maxLvl) {
 | 
				
			||||||
                    ctx.result.addAll(relations);
 | 
					                    ctx.result.addAll(relations);
 | 
				
			||||||
                }
 | 
					                }
 | 
				
			||||||
 | 
					            } else {
 | 
				
			||||||
 | 
					                ctx.result.addAll(relations);
 | 
				
			||||||
            }
 | 
					            }
 | 
				
			||||||
 | 
					            var finalTask = task;
 | 
				
			||||||
 | 
					            newChildrenRelations.forEach((child, childRelations) -> {
 | 
				
			||||||
 | 
					                var newLvl = finalTask.currentLvl + 1;
 | 
				
			||||||
 | 
					                if (newLvl <= ctx.maxLvl)
 | 
				
			||||||
 | 
					                    ctx.tasks.add(new RelationTask(newLvl, child, childRelations));
 | 
				
			||||||
 | 
					            });
 | 
				
			||||||
            task = ctx.tasks.poll();
 | 
					            task = ctx.tasks.poll();
 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
        ctx.future.set(ctx.result);
 | 
					        ctx.future.set(ctx.result);
 | 
				
			||||||
@ -604,52 +610,12 @@ public class BaseRelationService implements RelationService {
 | 
				
			|||||||
            return Futures.immediateFuture(Collections.emptySet());
 | 
					            return Futures.immediateFuture(Collections.emptySet());
 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
        var relationQueueCtx = new RelationQueueCtx(tenantId, direction, relationTypeGroup, fetchLastLevelOnly, lvl, uniqueMap);
 | 
					        var relationQueueCtx = new RelationQueueCtx(tenantId, direction, relationTypeGroup, fetchLastLevelOnly, lvl, uniqueMap);
 | 
				
			||||||
        relationQueueCtx.tasks.add(new RelationTask(lvl, rootId));
 | 
					        relationQueueCtx.tasks.add(new RelationTask(1, rootId, Collections.emptyList()));
 | 
				
			||||||
        executor.submit(() -> processQueue(relationQueueCtx));
 | 
					        relationsExecutor.submit(() -> processQueue(relationQueueCtx));
 | 
				
			||||||
        return relationQueueCtx.future;
 | 
					        return Futures.withTimeout(relationQueueCtx.future, relationQueryTimeout, TimeUnit.SECONDS, timeoutExecutorService);
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    private ListenableFuture<Set<EntityRelation>> findRelationsRecursively2(final TenantId tenantId, final EntityId rootId, final EntitySearchDirection direction,
 | 
					 | 
				
			||||||
                                                                           RelationTypeGroup relationTypeGroup, int lvl, boolean fetchLastLevelOnly,
 | 
					 | 
				
			||||||
                                                                           final ConcurrentHashMap<EntityId, Boolean> uniqueMap) {
 | 
					 | 
				
			||||||
        if (lvl == 0) {
 | 
					 | 
				
			||||||
            return Futures.immediateFuture(Collections.emptySet());
 | 
					 | 
				
			||||||
        }
 | 
					 | 
				
			||||||
        final int currentLvl = --lvl;
 | 
					 | 
				
			||||||
        final Set<EntityRelation> children = new HashSet<>();
 | 
					 | 
				
			||||||
        ListenableFuture<List<EntityRelation>> rootRelationsFuture = findRelations2(tenantId, rootId, direction, relationTypeGroup);
 | 
					 | 
				
			||||||
        ListenableFuture<Set<EntityId>> childrenIdsFuture = Futures.transform(rootRelationsFuture, relations -> {
 | 
					 | 
				
			||||||
            children.addAll(relations);
 | 
					 | 
				
			||||||
            Set<EntityId> childrenIds = new HashSet<>();
 | 
					 | 
				
			||||||
            for (EntityRelation childRelation : children) {
 | 
					 | 
				
			||||||
                log.trace("Found Relation: {}", childRelation);
 | 
					 | 
				
			||||||
                EntityId childId = direction == EntitySearchDirection.FROM ? childRelation.getTo() : childRelation.getFrom();
 | 
					 | 
				
			||||||
                if (uniqueMap.putIfAbsent(childId, Boolean.TRUE) == null) {
 | 
					 | 
				
			||||||
                    log.trace("Adding Relation: {}", childId);
 | 
					 | 
				
			||||||
                    if (childrenIds.add(childId)) {
 | 
					 | 
				
			||||||
                        log.trace("Added Relation: {}", childId);
 | 
					 | 
				
			||||||
                    }
 | 
					 | 
				
			||||||
                }
 | 
					 | 
				
			||||||
            }
 | 
					 | 
				
			||||||
            return childrenIds;
 | 
					 | 
				
			||||||
        }, MoreExecutors.directExecutor());
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
        ListenableFuture<List<Set<EntityRelation>>> recursiveFutures = Futures.transformAsync(childrenIdsFuture, childrenIds ->
 | 
					 | 
				
			||||||
                Futures.successfulAsList(childrenIds.stream()
 | 
					 | 
				
			||||||
                        .map(entityId -> findRelationsRecursively(tenantId, entityId, direction, relationTypeGroup, currentLvl, fetchLastLevelOnly, uniqueMap))
 | 
					 | 
				
			||||||
                        .collect(Collectors.toList())), executor);
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
        ListenableFuture<Set<EntityRelation>> relationsFuture = Futures.transform(recursiveFutures, recursiveRelations -> {
 | 
					 | 
				
			||||||
            if (fetchLastLevelOnly && currentLvl > 0) {
 | 
					 | 
				
			||||||
                children.clear();
 | 
					 | 
				
			||||||
            }
 | 
					 | 
				
			||||||
            recursiveRelations.forEach(children::addAll);
 | 
					 | 
				
			||||||
            return children;
 | 
					 | 
				
			||||||
        }, executor);
 | 
					 | 
				
			||||||
        return relationsFuture;
 | 
					 | 
				
			||||||
    }
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    private List<EntityRelation> findRelations(final TenantId tenantId, final EntityId rootId, final EntitySearchDirection direction, RelationTypeGroup relationTypeGroup) {
 | 
					    private List<EntityRelation> findRelations(final TenantId tenantId, final EntityId rootId, final EntitySearchDirection direction, RelationTypeGroup relationTypeGroup) {
 | 
				
			||||||
        List<EntityRelation> relations;
 | 
					        List<EntityRelation> relations;
 | 
				
			||||||
        if (relationTypeGroup == null) {
 | 
					        if (relationTypeGroup == null) {
 | 
				
			||||||
@ -663,19 +629,6 @@ public class BaseRelationService implements RelationService {
 | 
				
			|||||||
        return relations;
 | 
					        return relations;
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    private ListenableFuture<List<EntityRelation>> findRelations2(final TenantId tenantId, final EntityId rootId, final EntitySearchDirection direction, RelationTypeGroup relationTypeGroup) {
 | 
					 | 
				
			||||||
        ListenableFuture<List<EntityRelation>> relations;
 | 
					 | 
				
			||||||
        if (relationTypeGroup == null) {
 | 
					 | 
				
			||||||
            relationTypeGroup = RelationTypeGroup.COMMON;
 | 
					 | 
				
			||||||
        }
 | 
					 | 
				
			||||||
        if (direction == EntitySearchDirection.FROM) {
 | 
					 | 
				
			||||||
            relations = findByFromAsync(tenantId, rootId, relationTypeGroup);
 | 
					 | 
				
			||||||
        } else {
 | 
					 | 
				
			||||||
            relations = findByToAsync(tenantId, rootId, relationTypeGroup);
 | 
					 | 
				
			||||||
        }
 | 
					 | 
				
			||||||
        return relations;
 | 
					 | 
				
			||||||
    }
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    private void publishEvictEvent(EntityRelationEvent event) {
 | 
					    private void publishEvictEvent(EntityRelationEvent event) {
 | 
				
			||||||
        if (TransactionSynchronizationManager.isActualTransactionActive()) {
 | 
					        if (TransactionSynchronizationManager.isActualTransactionActive()) {
 | 
				
			||||||
            eventPublisher.publishEvent(event);
 | 
					            eventPublisher.publishEvent(event);
 | 
				
			||||||
 | 
				
			|||||||
@ -0,0 +1,33 @@
 | 
				
			|||||||
 | 
					/**
 | 
				
			||||||
 | 
					 * Copyright © 2016-2022 The Thingsboard Authors
 | 
				
			||||||
 | 
					 *
 | 
				
			||||||
 | 
					 * Licensed under the Apache License, Version 2.0 (the "License");
 | 
				
			||||||
 | 
					 * you may not use this file except in compliance with the License.
 | 
				
			||||||
 | 
					 * You may obtain a copy of the License at
 | 
				
			||||||
 | 
					 *
 | 
				
			||||||
 | 
					 *     http://www.apache.org/licenses/LICENSE-2.0
 | 
				
			||||||
 | 
					 *
 | 
				
			||||||
 | 
					 * Unless required by applicable law or agreed to in writing, software
 | 
				
			||||||
 | 
					 * distributed under the License is distributed on an "AS IS" BASIS,
 | 
				
			||||||
 | 
					 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
				
			||||||
 | 
					 * See the License for the specific language governing permissions and
 | 
				
			||||||
 | 
					 * limitations under the License.
 | 
				
			||||||
 | 
					 */
 | 
				
			||||||
 | 
					package org.thingsboard.server.dao.sql.relation;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					import org.springframework.beans.factory.annotation.Value;
 | 
				
			||||||
 | 
					import org.springframework.stereotype.Component;
 | 
				
			||||||
 | 
					import org.thingsboard.common.util.AbstractListeningExecutor;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					@Component
 | 
				
			||||||
 | 
					public class JpaRelationQueryExecutorService extends AbstractListeningExecutor {
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    @Value("${sql.relations.pool_size:4}")
 | 
				
			||||||
 | 
					    private int poolSize;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    @Override
 | 
				
			||||||
 | 
					    protected int getThreadPollSize() {
 | 
				
			||||||
 | 
					        return poolSize;
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
@ -35,6 +35,7 @@ import org.thingsboard.server.dao.exception.DataValidationException;
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
import java.util.ArrayList;
 | 
					import java.util.ArrayList;
 | 
				
			||||||
import java.util.Collections;
 | 
					import java.util.Collections;
 | 
				
			||||||
 | 
					import java.util.LinkedList;
 | 
				
			||||||
import java.util.List;
 | 
					import java.util.List;
 | 
				
			||||||
import java.util.concurrent.ExecutionException;
 | 
					import java.util.concurrent.ExecutionException;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@ -171,8 +172,8 @@ public abstract class BaseRelationServiceTest extends AbstractServiceTest {
 | 
				
			|||||||
        Assert.assertEquals(0, relations.size());
 | 
					        Assert.assertEquals(0, relations.size());
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    private Boolean saveRelation(EntityRelation relationA1) throws ExecutionException, InterruptedException {
 | 
					    private Boolean saveRelation(EntityRelation relationA1) {
 | 
				
			||||||
        return relationService.saveRelationAsync(SYSTEM_TENANT_ID, relationA1).get();
 | 
					        return relationService.saveRelation(SYSTEM_TENANT_ID, relationA1);
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    @Test
 | 
					    @Test
 | 
				
			||||||
@ -194,9 +195,6 @@ public abstract class BaseRelationServiceTest extends AbstractServiceTest {
 | 
				
			|||||||
        saveRelation(relationB1);
 | 
					        saveRelation(relationB1);
 | 
				
			||||||
        saveRelation(relationB2);
 | 
					        saveRelation(relationB2);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        // Data propagation to views is async
 | 
					 | 
				
			||||||
        Thread.sleep(3000);
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
        List<EntityRelation> relations = relationService.findByTo(SYSTEM_TENANT_ID, childA, RelationTypeGroup.COMMON);
 | 
					        List<EntityRelation> relations = relationService.findByTo(SYSTEM_TENANT_ID, childA, RelationTypeGroup.COMMON);
 | 
				
			||||||
        Assert.assertEquals(2, relations.size());
 | 
					        Assert.assertEquals(2, relations.size());
 | 
				
			||||||
        for (EntityRelation relation : relations) {
 | 
					        for (EntityRelation relation : relations) {
 | 
				
			||||||
@ -289,7 +287,7 @@ public abstract class BaseRelationServiceTest extends AbstractServiceTest {
 | 
				
			|||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    @Test
 | 
					    @Test
 | 
				
			||||||
    public void testRecursiveRelationDepth3() throws ExecutionException, InterruptedException {
 | 
					    public void testRecursiveRelationDepth() throws ExecutionException, InterruptedException {
 | 
				
			||||||
        int maxLevel = 1000;
 | 
					        int maxLevel = 1000;
 | 
				
			||||||
        AssetId root = new AssetId(Uuids.timeBased());
 | 
					        AssetId root = new AssetId(Uuids.timeBased());
 | 
				
			||||||
        AssetId left = new AssetId(Uuids.timeBased());
 | 
					        AssetId left = new AssetId(Uuids.timeBased());
 | 
				
			||||||
@ -336,54 +334,6 @@ public abstract class BaseRelationServiceTest extends AbstractServiceTest {
 | 
				
			|||||||
        }
 | 
					        }
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    @Test
 | 
					 | 
				
			||||||
    public void testRecursiveRelationDepth2() throws ExecutionException, InterruptedException {
 | 
					 | 
				
			||||||
        int maxLevel = 1000;
 | 
					 | 
				
			||||||
        AssetId root = new AssetId(Uuids.timeBased());
 | 
					 | 
				
			||||||
        AssetId left = new AssetId(Uuids.timeBased());
 | 
					 | 
				
			||||||
        AssetId right = new AssetId(Uuids.timeBased());
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
        List<EntityRelation> expected = new ArrayList<>();
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
        EntityRelation relationAB = new EntityRelation(root, left, EntityRelation.CONTAINS_TYPE);
 | 
					 | 
				
			||||||
        EntityRelation relationBC = new EntityRelation(root, right, EntityRelation.CONTAINS_TYPE);
 | 
					 | 
				
			||||||
        saveRelation(relationAB);
 | 
					 | 
				
			||||||
        expected.add(relationAB);
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
        saveRelation(relationBC);
 | 
					 | 
				
			||||||
        expected.add(relationBC);
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
        for (int i = 0; i < maxLevel; i++) {
 | 
					 | 
				
			||||||
            var newLeft = new AssetId(Uuids.timeBased());
 | 
					 | 
				
			||||||
            var newRight = new AssetId(Uuids.timeBased());
 | 
					 | 
				
			||||||
            EntityRelation relationLeft = new EntityRelation(left, newLeft, EntityRelation.CONTAINS_TYPE);
 | 
					 | 
				
			||||||
            EntityRelation relationRight = new EntityRelation(right, newRight, EntityRelation.CONTAINS_TYPE);
 | 
					 | 
				
			||||||
            saveRelation(relationLeft);
 | 
					 | 
				
			||||||
            expected.add(relationLeft);
 | 
					 | 
				
			||||||
            saveRelation(relationRight);
 | 
					 | 
				
			||||||
            expected.add(relationRight);
 | 
					 | 
				
			||||||
            left = newLeft;
 | 
					 | 
				
			||||||
            right = newRight;
 | 
					 | 
				
			||||||
        }
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
        EntityRelationsQuery query = new EntityRelationsQuery();
 | 
					 | 
				
			||||||
        query.setParameters(new RelationsSearchParameters(root, EntitySearchDirection.FROM, -1, false));
 | 
					 | 
				
			||||||
        query.setFilters(Collections.singletonList(new RelationEntityTypeFilter(EntityRelation.CONTAINS_TYPE, Collections.singletonList(EntityType.ASSET))));
 | 
					 | 
				
			||||||
        List<EntityRelation> relations = relationService.findByQuery2(SYSTEM_TENANT_ID, query).get();
 | 
					 | 
				
			||||||
        Assert.assertEquals(expected.size(), relations.size());
 | 
					 | 
				
			||||||
        for(EntityRelation r : expected){
 | 
					 | 
				
			||||||
            Assert.assertTrue(relations.contains(r));
 | 
					 | 
				
			||||||
        }
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
        //Test from cache
 | 
					 | 
				
			||||||
        relations = relationService.findByQuery2(SYSTEM_TENANT_ID, query).get();
 | 
					 | 
				
			||||||
        Assert.assertEquals(expected.size(), relations.size());
 | 
					 | 
				
			||||||
        for(EntityRelation r : expected){
 | 
					 | 
				
			||||||
            Assert.assertTrue(relations.contains(r));
 | 
					 | 
				
			||||||
        }
 | 
					 | 
				
			||||||
    }
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    @Test(expected = DataValidationException.class)
 | 
					    @Test(expected = DataValidationException.class)
 | 
				
			||||||
    public void testSaveRelationWithEmptyFrom() throws ExecutionException, InterruptedException {
 | 
					    public void testSaveRelationWithEmptyFrom() throws ExecutionException, InterruptedException {
 | 
				
			||||||
        EntityRelation relation = new EntityRelation();
 | 
					        EntityRelation relation = new EntityRelation();
 | 
				
			||||||
@ -407,4 +357,290 @@ public abstract class BaseRelationServiceTest extends AbstractServiceTest {
 | 
				
			|||||||
        relation.setTo(new AssetId(Uuids.timeBased()));
 | 
					        relation.setTo(new AssetId(Uuids.timeBased()));
 | 
				
			||||||
        Assert.assertTrue(saveRelation(relation));
 | 
					        Assert.assertTrue(saveRelation(relation));
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    @Test
 | 
				
			||||||
 | 
					    public void testFindByQueryFetchLastOnlyTreeLike() throws Exception {
 | 
				
			||||||
 | 
					        // A -> B
 | 
				
			||||||
 | 
					        // A -> C
 | 
				
			||||||
 | 
					        // C -> D
 | 
				
			||||||
 | 
					        // C -> E
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        AssetId assetA = new AssetId(Uuids.timeBased());
 | 
				
			||||||
 | 
					        AssetId assetB = new AssetId(Uuids.timeBased());
 | 
				
			||||||
 | 
					        AssetId assetC = new AssetId(Uuids.timeBased());
 | 
				
			||||||
 | 
					        AssetId assetD = new AssetId(Uuids.timeBased());
 | 
				
			||||||
 | 
					        AssetId assetE = new AssetId(Uuids.timeBased());
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        EntityRelation relationA = new EntityRelation(assetA, assetB, EntityRelation.CONTAINS_TYPE);
 | 
				
			||||||
 | 
					        EntityRelation relationB = new EntityRelation(assetA, assetC, EntityRelation.CONTAINS_TYPE);
 | 
				
			||||||
 | 
					        EntityRelation relationC = new EntityRelation(assetC, assetD, EntityRelation.CONTAINS_TYPE);
 | 
				
			||||||
 | 
					        EntityRelation relationD = new EntityRelation(assetC, assetE, EntityRelation.CONTAINS_TYPE);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        saveRelation(relationA);
 | 
				
			||||||
 | 
					        saveRelation(relationB);
 | 
				
			||||||
 | 
					        saveRelation(relationC);
 | 
				
			||||||
 | 
					        saveRelation(relationD);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        EntityRelationsQuery query = new EntityRelationsQuery();
 | 
				
			||||||
 | 
					        query.setParameters(new RelationsSearchParameters(assetA, EntitySearchDirection.FROM, -1, true));
 | 
				
			||||||
 | 
					        query.setFilters(Collections.singletonList(new RelationEntityTypeFilter(EntityRelation.CONTAINS_TYPE, Collections.singletonList(EntityType.ASSET))));
 | 
				
			||||||
 | 
					        List<EntityRelation> relations = relationService.findByQuery(SYSTEM_TENANT_ID, query).get();
 | 
				
			||||||
 | 
					        Assert.assertEquals(3, relations.size());
 | 
				
			||||||
 | 
					        Assert.assertTrue(relations.contains(relationA));
 | 
				
			||||||
 | 
					        Assert.assertTrue(relations.contains(relationC));
 | 
				
			||||||
 | 
					        Assert.assertTrue(relations.contains(relationD));
 | 
				
			||||||
 | 
					        Assert.assertFalse(relations.contains(relationB));
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        //Test from cache
 | 
				
			||||||
 | 
					        relations = relationService.findByQuery(SYSTEM_TENANT_ID, query).get();
 | 
				
			||||||
 | 
					        Assert.assertTrue(relations.contains(relationA));
 | 
				
			||||||
 | 
					        Assert.assertTrue(relations.contains(relationC));
 | 
				
			||||||
 | 
					        Assert.assertTrue(relations.contains(relationD));
 | 
				
			||||||
 | 
					        Assert.assertFalse(relations.contains(relationB));
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    @Test
 | 
				
			||||||
 | 
					    public void testFindByQueryFetchLastOnlySingleLinked() throws Exception {
 | 
				
			||||||
 | 
					        // A -> B -> C -> D
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        AssetId assetA = new AssetId(Uuids.timeBased());
 | 
				
			||||||
 | 
					        AssetId assetB = new AssetId(Uuids.timeBased());
 | 
				
			||||||
 | 
					        AssetId assetC = new AssetId(Uuids.timeBased());
 | 
				
			||||||
 | 
					        AssetId assetD = new AssetId(Uuids.timeBased());
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        EntityRelation relationA = new EntityRelation(assetA, assetB, EntityRelation.CONTAINS_TYPE);
 | 
				
			||||||
 | 
					        EntityRelation relationB = new EntityRelation(assetB, assetC, EntityRelation.CONTAINS_TYPE);
 | 
				
			||||||
 | 
					        EntityRelation relationC = new EntityRelation(assetC, assetD, EntityRelation.CONTAINS_TYPE);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        saveRelation(relationA);
 | 
				
			||||||
 | 
					        saveRelation(relationB);
 | 
				
			||||||
 | 
					        saveRelation(relationC);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        EntityRelationsQuery query = new EntityRelationsQuery();
 | 
				
			||||||
 | 
					        query.setParameters(new RelationsSearchParameters(assetA, EntitySearchDirection.FROM, -1, true));
 | 
				
			||||||
 | 
					        query.setFilters(Collections.singletonList(new RelationEntityTypeFilter(EntityRelation.CONTAINS_TYPE, Collections.singletonList(EntityType.ASSET))));
 | 
				
			||||||
 | 
					        List<EntityRelation> relations = relationService.findByQuery(SYSTEM_TENANT_ID, query).get();
 | 
				
			||||||
 | 
					        Assert.assertEquals(1, relations.size());
 | 
				
			||||||
 | 
					        Assert.assertTrue(relations.contains(relationC));
 | 
				
			||||||
 | 
					        Assert.assertFalse(relations.contains(relationA));
 | 
				
			||||||
 | 
					        Assert.assertFalse(relations.contains(relationB));
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        //Test from cache
 | 
				
			||||||
 | 
					        relations = relationService.findByQuery(SYSTEM_TENANT_ID, query).get();
 | 
				
			||||||
 | 
					        Assert.assertTrue(relations.contains(relationC));
 | 
				
			||||||
 | 
					        Assert.assertFalse(relations.contains(relationA));
 | 
				
			||||||
 | 
					        Assert.assertFalse(relations.contains(relationB));
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    @Test
 | 
				
			||||||
 | 
					    public void testFindByQueryFetchLastOnlyTreeLikeWithMaxLvl() throws Exception {
 | 
				
			||||||
 | 
					        // A -> B   A
 | 
				
			||||||
 | 
					        // A -> C   B
 | 
				
			||||||
 | 
					        // C -> D   C
 | 
				
			||||||
 | 
					        // C -> E   D
 | 
				
			||||||
 | 
					        // D -> F   E
 | 
				
			||||||
 | 
					        // D -> G   F
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        AssetId assetA = new AssetId(Uuids.timeBased());
 | 
				
			||||||
 | 
					        AssetId assetB = new AssetId(Uuids.timeBased());
 | 
				
			||||||
 | 
					        AssetId assetC = new AssetId(Uuids.timeBased());
 | 
				
			||||||
 | 
					        AssetId assetD = new AssetId(Uuids.timeBased());
 | 
				
			||||||
 | 
					        AssetId assetE = new AssetId(Uuids.timeBased());
 | 
				
			||||||
 | 
					        AssetId assetF = new AssetId(Uuids.timeBased());
 | 
				
			||||||
 | 
					        AssetId assetG = new AssetId(Uuids.timeBased());
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        EntityRelation relationA = new EntityRelation(assetA, assetB, EntityRelation.CONTAINS_TYPE);
 | 
				
			||||||
 | 
					        EntityRelation relationB = new EntityRelation(assetA, assetC, EntityRelation.CONTAINS_TYPE);
 | 
				
			||||||
 | 
					        EntityRelation relationC = new EntityRelation(assetC, assetD, EntityRelation.CONTAINS_TYPE);
 | 
				
			||||||
 | 
					        EntityRelation relationD = new EntityRelation(assetC, assetE, EntityRelation.CONTAINS_TYPE);
 | 
				
			||||||
 | 
					        EntityRelation relationE = new EntityRelation(assetD, assetF, EntityRelation.CONTAINS_TYPE);
 | 
				
			||||||
 | 
					        EntityRelation relationF = new EntityRelation(assetD, assetG, EntityRelation.CONTAINS_TYPE);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        saveRelation(relationA);
 | 
				
			||||||
 | 
					        saveRelation(relationB);
 | 
				
			||||||
 | 
					        saveRelation(relationC);
 | 
				
			||||||
 | 
					        saveRelation(relationD);
 | 
				
			||||||
 | 
					        saveRelation(relationE);
 | 
				
			||||||
 | 
					        saveRelation(relationF);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        EntityRelationsQuery query = new EntityRelationsQuery();
 | 
				
			||||||
 | 
					        query.setParameters(new RelationsSearchParameters(assetA, EntitySearchDirection.FROM, 2, true));
 | 
				
			||||||
 | 
					        query.setFilters(Collections.singletonList(new RelationEntityTypeFilter(EntityRelation.CONTAINS_TYPE, Collections.singletonList(EntityType.ASSET))));
 | 
				
			||||||
 | 
					        List<EntityRelation> relations = relationService.findByQuery(SYSTEM_TENANT_ID, query).get();
 | 
				
			||||||
 | 
					        Assert.assertEquals(3, relations.size());
 | 
				
			||||||
 | 
					        Assert.assertTrue(relations.contains(relationA));
 | 
				
			||||||
 | 
					        Assert.assertTrue(relations.contains(relationC));
 | 
				
			||||||
 | 
					        Assert.assertTrue(relations.contains(relationD));
 | 
				
			||||||
 | 
					        Assert.assertFalse(relations.contains(relationB));
 | 
				
			||||||
 | 
					        Assert.assertFalse(relations.contains(relationE));
 | 
				
			||||||
 | 
					        Assert.assertFalse(relations.contains(relationF));
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        //Test from cache
 | 
				
			||||||
 | 
					        relations = relationService.findByQuery(SYSTEM_TENANT_ID, query).get();
 | 
				
			||||||
 | 
					        Assert.assertTrue(relations.contains(relationA));
 | 
				
			||||||
 | 
					        Assert.assertTrue(relations.contains(relationC));
 | 
				
			||||||
 | 
					        Assert.assertTrue(relations.contains(relationD));
 | 
				
			||||||
 | 
					        Assert.assertFalse(relations.contains(relationB));
 | 
				
			||||||
 | 
					        Assert.assertFalse(relations.contains(relationE));
 | 
				
			||||||
 | 
					        Assert.assertFalse(relations.contains(relationF));
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    @Test
 | 
				
			||||||
 | 
					    public void testFindByQueryTreeLikeWithMaxLvl() throws Exception {
 | 
				
			||||||
 | 
					        // A -> B   A
 | 
				
			||||||
 | 
					        // A -> C   B
 | 
				
			||||||
 | 
					        // C -> D   C
 | 
				
			||||||
 | 
					        // C -> E   D
 | 
				
			||||||
 | 
					        // D -> F   E
 | 
				
			||||||
 | 
					        // D -> G   F
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        AssetId assetA = new AssetId(Uuids.timeBased());
 | 
				
			||||||
 | 
					        AssetId assetB = new AssetId(Uuids.timeBased());
 | 
				
			||||||
 | 
					        AssetId assetC = new AssetId(Uuids.timeBased());
 | 
				
			||||||
 | 
					        AssetId assetD = new AssetId(Uuids.timeBased());
 | 
				
			||||||
 | 
					        AssetId assetE = new AssetId(Uuids.timeBased());
 | 
				
			||||||
 | 
					        AssetId assetF = new AssetId(Uuids.timeBased());
 | 
				
			||||||
 | 
					        AssetId assetG = new AssetId(Uuids.timeBased());
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        EntityRelation relationA = new EntityRelation(assetA, assetB, EntityRelation.CONTAINS_TYPE);
 | 
				
			||||||
 | 
					        EntityRelation relationB = new EntityRelation(assetA, assetC, EntityRelation.CONTAINS_TYPE);
 | 
				
			||||||
 | 
					        EntityRelation relationC = new EntityRelation(assetC, assetD, EntityRelation.CONTAINS_TYPE);
 | 
				
			||||||
 | 
					        EntityRelation relationD = new EntityRelation(assetC, assetE, EntityRelation.CONTAINS_TYPE);
 | 
				
			||||||
 | 
					        EntityRelation relationE = new EntityRelation(assetD, assetF, EntityRelation.CONTAINS_TYPE);
 | 
				
			||||||
 | 
					        EntityRelation relationF = new EntityRelation(assetD, assetG, EntityRelation.CONTAINS_TYPE);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        saveRelation(relationA);
 | 
				
			||||||
 | 
					        saveRelation(relationB);
 | 
				
			||||||
 | 
					        saveRelation(relationC);
 | 
				
			||||||
 | 
					        saveRelation(relationD);
 | 
				
			||||||
 | 
					        saveRelation(relationE);
 | 
				
			||||||
 | 
					        saveRelation(relationF);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        EntityRelationsQuery query = new EntityRelationsQuery();
 | 
				
			||||||
 | 
					        query.setParameters(new RelationsSearchParameters(assetA, EntitySearchDirection.FROM, 2, false));
 | 
				
			||||||
 | 
					        query.setFilters(Collections.singletonList(new RelationEntityTypeFilter(EntityRelation.CONTAINS_TYPE, Collections.singletonList(EntityType.ASSET))));
 | 
				
			||||||
 | 
					        List<EntityRelation> relations = relationService.findByQuery(SYSTEM_TENANT_ID, query).get();
 | 
				
			||||||
 | 
					        Assert.assertEquals(4, relations.size());
 | 
				
			||||||
 | 
					        Assert.assertTrue(relations.contains(relationA));
 | 
				
			||||||
 | 
					        Assert.assertTrue(relations.contains(relationB));
 | 
				
			||||||
 | 
					        Assert.assertTrue(relations.contains(relationC));
 | 
				
			||||||
 | 
					        Assert.assertTrue(relations.contains(relationD));
 | 
				
			||||||
 | 
					        Assert.assertFalse(relations.contains(relationE));
 | 
				
			||||||
 | 
					        Assert.assertFalse(relations.contains(relationF));
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        //Test from cache
 | 
				
			||||||
 | 
					        relations = relationService.findByQuery(SYSTEM_TENANT_ID, query).get();
 | 
				
			||||||
 | 
					        Assert.assertTrue(relations.contains(relationA));
 | 
				
			||||||
 | 
					        Assert.assertTrue(relations.contains(relationB));
 | 
				
			||||||
 | 
					        Assert.assertTrue(relations.contains(relationC));
 | 
				
			||||||
 | 
					        Assert.assertTrue(relations.contains(relationD));
 | 
				
			||||||
 | 
					        Assert.assertFalse(relations.contains(relationE));
 | 
				
			||||||
 | 
					        Assert.assertFalse(relations.contains(relationF));
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    @Test
 | 
				
			||||||
 | 
					    public void testFindByQueryTreeLikeWithUnlimLvl() throws Exception {
 | 
				
			||||||
 | 
					        // A -> B   A
 | 
				
			||||||
 | 
					        // A -> C   B
 | 
				
			||||||
 | 
					        // C -> D   C
 | 
				
			||||||
 | 
					        // C -> E   D
 | 
				
			||||||
 | 
					        // D -> F   E
 | 
				
			||||||
 | 
					        // D -> G   F
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        AssetId assetA = new AssetId(Uuids.timeBased());
 | 
				
			||||||
 | 
					        AssetId assetB = new AssetId(Uuids.timeBased());
 | 
				
			||||||
 | 
					        AssetId assetC = new AssetId(Uuids.timeBased());
 | 
				
			||||||
 | 
					        AssetId assetD = new AssetId(Uuids.timeBased());
 | 
				
			||||||
 | 
					        AssetId assetE = new AssetId(Uuids.timeBased());
 | 
				
			||||||
 | 
					        AssetId assetF = new AssetId(Uuids.timeBased());
 | 
				
			||||||
 | 
					        AssetId assetG = new AssetId(Uuids.timeBased());
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        EntityRelation relationA = new EntityRelation(assetA, assetB, EntityRelation.CONTAINS_TYPE);
 | 
				
			||||||
 | 
					        EntityRelation relationB = new EntityRelation(assetA, assetC, EntityRelation.CONTAINS_TYPE);
 | 
				
			||||||
 | 
					        EntityRelation relationC = new EntityRelation(assetC, assetD, EntityRelation.CONTAINS_TYPE);
 | 
				
			||||||
 | 
					        EntityRelation relationD = new EntityRelation(assetC, assetE, EntityRelation.CONTAINS_TYPE);
 | 
				
			||||||
 | 
					        EntityRelation relationE = new EntityRelation(assetD, assetF, EntityRelation.CONTAINS_TYPE);
 | 
				
			||||||
 | 
					        EntityRelation relationF = new EntityRelation(assetD, assetG, EntityRelation.CONTAINS_TYPE);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        saveRelation(relationA);
 | 
				
			||||||
 | 
					        saveRelation(relationB);
 | 
				
			||||||
 | 
					        saveRelation(relationC);
 | 
				
			||||||
 | 
					        saveRelation(relationD);
 | 
				
			||||||
 | 
					        saveRelation(relationE);
 | 
				
			||||||
 | 
					        saveRelation(relationF);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        EntityRelationsQuery query = new EntityRelationsQuery();
 | 
				
			||||||
 | 
					        query.setParameters(new RelationsSearchParameters(assetA, EntitySearchDirection.FROM, -1, false));
 | 
				
			||||||
 | 
					        query.setFilters(Collections.singletonList(new RelationEntityTypeFilter(EntityRelation.CONTAINS_TYPE, Collections.singletonList(EntityType.ASSET))));
 | 
				
			||||||
 | 
					        List<EntityRelation> relations = relationService.findByQuery(SYSTEM_TENANT_ID, query).get();
 | 
				
			||||||
 | 
					        Assert.assertEquals(6, relations.size());
 | 
				
			||||||
 | 
					        Assert.assertTrue(relations.contains(relationA));
 | 
				
			||||||
 | 
					        Assert.assertTrue(relations.contains(relationB));
 | 
				
			||||||
 | 
					        Assert.assertTrue(relations.contains(relationC));
 | 
				
			||||||
 | 
					        Assert.assertTrue(relations.contains(relationD));
 | 
				
			||||||
 | 
					        Assert.assertTrue(relations.contains(relationE));
 | 
				
			||||||
 | 
					        Assert.assertTrue(relations.contains(relationF));
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        //Test from cache
 | 
				
			||||||
 | 
					        relations = relationService.findByQuery(SYSTEM_TENANT_ID, query).get();
 | 
				
			||||||
 | 
					        Assert.assertTrue(relations.contains(relationA));
 | 
				
			||||||
 | 
					        Assert.assertTrue(relations.contains(relationB));
 | 
				
			||||||
 | 
					        Assert.assertTrue(relations.contains(relationC));
 | 
				
			||||||
 | 
					        Assert.assertTrue(relations.contains(relationD));
 | 
				
			||||||
 | 
					        Assert.assertTrue(relations.contains(relationE));
 | 
				
			||||||
 | 
					        Assert.assertTrue(relations.contains(relationF));
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    @Test
 | 
				
			||||||
 | 
					    public void testFindByQueryLargeHierarchyFetchAllWithUnlimLvl() throws Exception {
 | 
				
			||||||
 | 
					        AssetId rootAsset = new AssetId(Uuids.timeBased());
 | 
				
			||||||
 | 
					        final int hierarchyLvl = 10;
 | 
				
			||||||
 | 
					        List<EntityRelation> expectedRelations = new LinkedList<>();
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        createAssetRelationsRecursively(rootAsset, hierarchyLvl, expectedRelations, false);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        EntityRelationsQuery query = new EntityRelationsQuery();
 | 
				
			||||||
 | 
					        query.setParameters(new RelationsSearchParameters(rootAsset, EntitySearchDirection.FROM, -1, false));
 | 
				
			||||||
 | 
					        query.setFilters(Collections.singletonList(new RelationEntityTypeFilter(EntityRelation.CONTAINS_TYPE, Collections.singletonList(EntityType.ASSET))));
 | 
				
			||||||
 | 
					        List<EntityRelation> relations = relationService.findByQuery(SYSTEM_TENANT_ID, query).get();
 | 
				
			||||||
 | 
					        Assert.assertEquals(expectedRelations.size(), relations.size());
 | 
				
			||||||
 | 
					        Assert.assertTrue(relations.containsAll(expectedRelations));
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    @Test
 | 
				
			||||||
 | 
					    public void testFindByQueryLargeHierarchyFetchLastOnlyWithUnlimLvl() throws Exception {
 | 
				
			||||||
 | 
					        AssetId rootAsset = new AssetId(Uuids.timeBased());
 | 
				
			||||||
 | 
					        final int hierarchyLvl = 10;
 | 
				
			||||||
 | 
					        List<EntityRelation> expectedRelations = new LinkedList<>();
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        createAssetRelationsRecursively(rootAsset, hierarchyLvl, expectedRelations, true);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        EntityRelationsQuery query = new EntityRelationsQuery();
 | 
				
			||||||
 | 
					        query.setParameters(new RelationsSearchParameters(rootAsset, EntitySearchDirection.FROM, -1, true));
 | 
				
			||||||
 | 
					        query.setFilters(Collections.singletonList(new RelationEntityTypeFilter(EntityRelation.CONTAINS_TYPE, Collections.singletonList(EntityType.ASSET))));
 | 
				
			||||||
 | 
					        List<EntityRelation> relations = relationService.findByQuery(SYSTEM_TENANT_ID, query).get();
 | 
				
			||||||
 | 
					        Assert.assertEquals(expectedRelations.size(), relations.size());
 | 
				
			||||||
 | 
					        Assert.assertTrue(relations.containsAll(expectedRelations));
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    private void createAssetRelationsRecursively(AssetId rootAsset, int lvl, List<EntityRelation> entityRelations, boolean lastLvlOnly) throws Exception {
 | 
				
			||||||
 | 
					        if (lvl == 0) return;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        AssetId firstAsset = new AssetId(Uuids.timeBased());
 | 
				
			||||||
 | 
					        AssetId secondAsset = new AssetId(Uuids.timeBased());
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        EntityRelation firstRelation = new EntityRelation(rootAsset, firstAsset, EntityRelation.CONTAINS_TYPE);
 | 
				
			||||||
 | 
					        EntityRelation secondRelation = new EntityRelation(rootAsset, secondAsset, EntityRelation.CONTAINS_TYPE);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        saveRelation(firstRelation);
 | 
				
			||||||
 | 
					        saveRelation(secondRelation);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        if (!lastLvlOnly || lvl == 1) {
 | 
				
			||||||
 | 
					            entityRelations.add(firstRelation);
 | 
				
			||||||
 | 
					            entityRelations.add(secondRelation);
 | 
				
			||||||
 | 
					        }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        createAssetRelationsRecursively(firstAsset, lvl - 1, entityRelations, lastLvlOnly);
 | 
				
			||||||
 | 
					        createAssetRelationsRecursively(secondAsset, lvl - 1, entityRelations, lastLvlOnly);
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
				
			|||||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user