diff --git a/common/dao-api/src/main/java/org/thingsboard/server/dao/relation/RelationService.java b/common/dao-api/src/main/java/org/thingsboard/server/dao/relation/RelationService.java index e7df5eea93..35ca4f9ad1 100644 --- a/common/dao-api/src/main/java/org/thingsboard/server/dao/relation/RelationService.java +++ b/common/dao-api/src/main/java/org/thingsboard/server/dao/relation/RelationService.java @@ -79,6 +79,8 @@ public interface RelationService { ListenableFuture> findByQuery(TenantId tenantId, EntityRelationsQuery query); + ListenableFuture> findByQuery2(TenantId tenantId, EntityRelationsQuery query); + ListenableFuture> findInfoByQuery(TenantId tenantId, EntityRelationsQuery query); void removeRelations(TenantId tenantId, EntityId entityId); diff --git a/dao/src/main/java/org/thingsboard/server/dao/relation/BaseRelationService.java b/dao/src/main/java/org/thingsboard/server/dao/relation/BaseRelationService.java index c751f7334b..39dd207527 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/relation/BaseRelationService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/relation/BaseRelationService.java @@ -5,7 +5,7 @@ * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -20,6 +20,9 @@ import com.google.common.collect.Lists; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.MoreExecutors; +import com.google.common.util.concurrent.SettableFuture; +import lombok.Getter; +import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.context.ApplicationEventPublisher; import org.springframework.context.annotation.Lazy; @@ -49,8 +52,11 @@ import java.util.ArrayList; import java.util.Collections; import java.util.HashSet; import java.util.List; +import java.util.Queue; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.LinkedBlockingQueue; import java.util.function.BiConsumer; import java.util.stream.Collectors; @@ -407,6 +413,39 @@ public class BaseRelationService implements RelationService { } } + @Override + public ListenableFuture> findByQuery2(TenantId tenantId, EntityRelationsQuery query) { + //boolean fetchLastLevelOnly = true; + log.trace("Executing findByQuery [{}]", query); + RelationsSearchParameters params = query.getParameters(); + final List filters = query.getFilters(); + if (filters == null || filters.isEmpty()) { + log.debug("Filters are not set [{}]", query); + } + + int maxLvl = params.getMaxLevel() > 0 ? params.getMaxLevel() : Integer.MAX_VALUE; + + try { + ListenableFuture> relationSet = findRelationsRecursively2(tenantId, params.getEntityId(), params.getDirection(), params.getRelationTypeGroup(), maxLvl, params.isFetchLastLevelOnly(), new ConcurrentHashMap<>()); + return Futures.transform(relationSet, input -> { + List relations = new ArrayList<>(); + if (filters == null || filters.isEmpty()) { + relations.addAll(input); + return relations; + } + for (EntityRelation relation : input) { + if (matchFilters(filters, relation, params.getDirection())) { + relations.add(relation); + } + } + return relations; + }, MoreExecutors.directExecutor()); + } catch (Exception e) { + log.warn("Failed to query relations: [{}]", query, e); + throw new RuntimeException(e); + } + } + @Override public ListenableFuture> findInfoByQuery(TenantId tenantId, EntityRelationsQuery query) { log.trace("Executing findInfoByQuery [{}]", query); @@ -511,15 +550,75 @@ public class BaseRelationService implements RelationService { } } + @RequiredArgsConstructor + private static class RelationQueueCtx { + final SettableFuture> future = SettableFuture.create(); + final Set result = ConcurrentHashMap.newKeySet(); + final Queue tasks = new ConcurrentLinkedQueue<>(); + + final TenantId tenantId; + final EntitySearchDirection direction; + final RelationTypeGroup relationTypeGroup; + final boolean fetchLastLevelOnly; + final int lvl; + final ConcurrentHashMap uniqueMap; + + } + + @RequiredArgsConstructor + private static class RelationTask { + private final int currentLvl; + private final EntityId root; + } + + private void processQueue(RelationQueueCtx ctx) { + RelationTask task = ctx.tasks.poll(); + while (task != null) { + List relations = findRelations(ctx.tenantId, task.root, ctx.direction, ctx.relationTypeGroup); + Set childrenIds = new HashSet<>(); + for (EntityRelation childRelation : relations) { + log.trace("Found Relation: {}", childRelation); + EntityId childId = ctx.direction == EntitySearchDirection.FROM ? childRelation.getTo() : childRelation.getFrom(); + if (ctx.uniqueMap.putIfAbsent(childId, Boolean.TRUE) == null) { + log.trace("Adding Relation: {}", childId); + if (childrenIds.add(childId)) { + log.trace("Added Relation: {}", childId); + } + } + } + for (EntityId child : childrenIds) { + ctx.tasks.add(new RelationTask(task.currentLvl - 1, child)); + if (!ctx.fetchLastLevelOnly || task.currentLvl == 0) { + ctx.result.addAll(relations); + } + } + task = ctx.tasks.poll(); + } + ctx.future.set(ctx.result); + } + private ListenableFuture> findRelationsRecursively(final TenantId tenantId, final EntityId rootId, final EntitySearchDirection direction, RelationTypeGroup relationTypeGroup, int lvl, boolean fetchLastLevelOnly, final ConcurrentHashMap uniqueMap) { if (lvl == 0) { return Futures.immediateFuture(Collections.emptySet()); } + var relationQueueCtx = new RelationQueueCtx(tenantId, direction, relationTypeGroup, fetchLastLevelOnly, lvl, uniqueMap); + relationQueueCtx.tasks.add(new RelationTask(lvl, rootId)); + executor.submit(() -> processQueue(relationQueueCtx)); + return relationQueueCtx.future; + } + + + private ListenableFuture> findRelationsRecursively2(final TenantId tenantId, final EntityId rootId, final EntitySearchDirection direction, + RelationTypeGroup relationTypeGroup, int lvl, boolean fetchLastLevelOnly, + final ConcurrentHashMap uniqueMap) { + if (lvl == 0) { + return Futures.immediateFuture(Collections.emptySet()); + } final int currentLvl = --lvl; final Set children = new HashSet<>(); - ListenableFuture> rootRelationsFuture = findRelations(tenantId, rootId, direction, relationTypeGroup); + ListenableFuture> rootRelationsFuture = findRelations2(tenantId, rootId, direction, relationTypeGroup); ListenableFuture> childrenIdsFuture = Futures.transform(rootRelationsFuture, relations -> { children.addAll(relations); Set childrenIds = new HashSet<>(); @@ -539,7 +638,7 @@ public class BaseRelationService implements RelationService { ListenableFuture>> recursiveFutures = Futures.transformAsync(childrenIdsFuture, childrenIds -> Futures.successfulAsList(childrenIds.stream() .map(entityId -> findRelationsRecursively(tenantId, entityId, direction, relationTypeGroup, currentLvl, fetchLastLevelOnly, uniqueMap)) - .collect(Collectors.toList())), MoreExecutors.directExecutor()); + .collect(Collectors.toList())), executor); ListenableFuture> relationsFuture = Futures.transform(recursiveFutures, recursiveRelations -> { if (fetchLastLevelOnly && currentLvl > 0) { @@ -547,11 +646,24 @@ public class BaseRelationService implements RelationService { } recursiveRelations.forEach(children::addAll); return children; - }, MoreExecutors.directExecutor()); + }, executor); return relationsFuture; } - private ListenableFuture> findRelations(final TenantId tenantId, final EntityId rootId, final EntitySearchDirection direction, RelationTypeGroup relationTypeGroup) { + private List findRelations(final TenantId tenantId, final EntityId rootId, final EntitySearchDirection direction, RelationTypeGroup relationTypeGroup) { + List relations; + if (relationTypeGroup == null) { + relationTypeGroup = RelationTypeGroup.COMMON; + } + if (direction == EntitySearchDirection.FROM) { + relations = findByFrom(tenantId, rootId, relationTypeGroup); + } else { + relations = findByTo(tenantId, rootId, relationTypeGroup); + } + return relations; + } + + private ListenableFuture> findRelations2(final TenantId tenantId, final EntityId rootId, final EntitySearchDirection direction, RelationTypeGroup relationTypeGroup) { ListenableFuture> relations; if (relationTypeGroup == null) { relationTypeGroup = RelationTypeGroup.COMMON; diff --git a/dao/src/test/java/org/thingsboard/server/dao/service/BaseRelationServiceTest.java b/dao/src/test/java/org/thingsboard/server/dao/service/BaseRelationServiceTest.java index ca85c88337..a5b27bed32 100644 --- a/dao/src/test/java/org/thingsboard/server/dao/service/BaseRelationServiceTest.java +++ b/dao/src/test/java/org/thingsboard/server/dao/service/BaseRelationServiceTest.java @@ -289,8 +289,8 @@ public abstract class BaseRelationServiceTest extends AbstractServiceTest { } @Test - public void testRecursiveRelationDepth() throws ExecutionException, InterruptedException { - int maxLevel = 100; + public void testRecursiveRelationDepth3() throws ExecutionException, InterruptedException { + int maxLevel = 1000; AssetId root = new AssetId(Uuids.timeBased()); AssetId left = new AssetId(Uuids.timeBased()); AssetId right = new AssetId(Uuids.timeBased()); @@ -336,6 +336,54 @@ public abstract class BaseRelationServiceTest extends AbstractServiceTest { } } + @Test + public void testRecursiveRelationDepth2() throws ExecutionException, InterruptedException { + int maxLevel = 1000; + AssetId root = new AssetId(Uuids.timeBased()); + AssetId left = new AssetId(Uuids.timeBased()); + AssetId right = new AssetId(Uuids.timeBased()); + + List expected = new ArrayList<>(); + + EntityRelation relationAB = new EntityRelation(root, left, EntityRelation.CONTAINS_TYPE); + EntityRelation relationBC = new EntityRelation(root, right, EntityRelation.CONTAINS_TYPE); + saveRelation(relationAB); + expected.add(relationAB); + + saveRelation(relationBC); + expected.add(relationBC); + + for (int i = 0; i < maxLevel; i++) { + var newLeft = new AssetId(Uuids.timeBased()); + var newRight = new AssetId(Uuids.timeBased()); + EntityRelation relationLeft = new EntityRelation(left, newLeft, EntityRelation.CONTAINS_TYPE); + EntityRelation relationRight = new EntityRelation(right, newRight, EntityRelation.CONTAINS_TYPE); + saveRelation(relationLeft); + expected.add(relationLeft); + saveRelation(relationRight); + expected.add(relationRight); + left = newLeft; + right = newRight; + } + + + EntityRelationsQuery query = new EntityRelationsQuery(); + query.setParameters(new RelationsSearchParameters(root, EntitySearchDirection.FROM, -1, false)); + query.setFilters(Collections.singletonList(new RelationEntityTypeFilter(EntityRelation.CONTAINS_TYPE, Collections.singletonList(EntityType.ASSET)))); + List relations = relationService.findByQuery2(SYSTEM_TENANT_ID, query).get(); + Assert.assertEquals(expected.size(), relations.size()); + for(EntityRelation r : expected){ + Assert.assertTrue(relations.contains(r)); + } + + //Test from cache + relations = relationService.findByQuery2(SYSTEM_TENANT_ID, query).get(); + Assert.assertEquals(expected.size(), relations.size()); + for(EntityRelation r : expected){ + Assert.assertTrue(relations.contains(r)); + } + } + @Test(expected = DataValidationException.class) public void testSaveRelationWithEmptyFrom() throws ExecutionException, InterruptedException { EntityRelation relation = new EntityRelation();