diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbCreateRelationNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbCreateRelationNode.java index 5c06183106..28eafabfce 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbCreateRelationNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbCreateRelationNode.java @@ -18,6 +18,7 @@ package org.thingsboard.rule.engine.action; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import lombok.extern.slf4j.Slf4j; +import org.springframework.util.CollectionUtils; import org.thingsboard.rule.engine.api.RuleNode; import org.thingsboard.rule.engine.api.TbContext; import org.thingsboard.rule.engine.api.TbNodeConfiguration; @@ -69,41 +70,31 @@ public class TbCreateRelationNode extends TbAbstractRelationActionNode doProcessEntityRelationAction(TbContext ctx, TbMsg msg, EntityContainer entity, String relationType) { - ListenableFuture future = createIfAbsent(ctx, msg, entity, relationType); + ListenableFuture future = createRelationIfAbsent(ctx, msg, entity, relationType); return Futures.transform(future, result -> { - RelationContainer container = new RelationContainer(); if (result && config.isChangeOriginatorToRelatedEntity()) { TbMsg tbMsg = ctx.transformMsg(msg, msg.getType(), entity.getEntityId(), msg.getMetaData(), msg.getData()); - container.setMsg(tbMsg); - } else { - container.setMsg(msg); + return new RelationContainer(tbMsg, result); } - container.setResult(result); - return container; + return new RelationContainer(msg, result); }, ctx.getDbCallbackExecutor()); } - private ListenableFuture createIfAbsent(TbContext ctx, TbMsg msg, EntityContainer entityContainer, String relationType) { + private ListenableFuture createRelationIfAbsent(TbContext ctx, TbMsg msg, EntityContainer entityContainer, String relationType) { SearchDirectionIds sdId = processSingleSearchDirection(msg, entityContainer); - ListenableFuture checkRelationFuture = Futures.transformAsync(ctx.getRelationService().checkRelation(ctx.getTenantId(), sdId.getFromId(), sdId.getToId(), relationType, RelationTypeGroup.COMMON), result -> { - if (!result) { - if (config.isRemoveCurrentRelations()) { - return processDeleteRelations(ctx, processFindRelations(ctx, msg, sdId, relationType)); - } - return Futures.immediateFuture(false); - } - return Futures.immediateFuture(true); - }, ctx.getDbCallbackExecutor()); - - return Futures.transformAsync(checkRelationFuture, result -> { - if (!result) { - return processCreateRelation(ctx, entityContainer, sdId, relationType); - } - return Futures.immediateFuture(true); - }, ctx.getDbCallbackExecutor()); + return Futures.transformAsync(deleteCurrentRelationsIfNeeded(ctx, msg, sdId, relationType), v -> + checkRelationAndCreateIfAbsent(ctx, entityContainer, relationType, sdId), + ctx.getDbCallbackExecutor()); } - private ListenableFuture> processFindRelations(TbContext ctx, TbMsg msg, SearchDirectionIds sdId, String relationType) { + private ListenableFuture deleteCurrentRelationsIfNeeded(TbContext ctx, TbMsg msg, SearchDirectionIds sdId, String relationType) { + if (config.isRemoveCurrentRelations()) { + return deleteOriginatorRelations(ctx, findOriginatorRelations(ctx, msg, sdId, relationType)); + } + return Futures.immediateFuture(null); + } + + private ListenableFuture> findOriginatorRelations(TbContext ctx, TbMsg msg, SearchDirectionIds sdId, String relationType) { if (sdId.isOriginatorDirectionFrom()) { return ctx.getRelationService().findByFromAndTypeAsync(ctx.getTenantId(), msg.getOriginator(), relationType, RelationTypeGroup.COMMON); } else { @@ -111,19 +102,31 @@ public class TbCreateRelationNode extends TbAbstractRelationActionNode processDeleteRelations(TbContext ctx, ListenableFuture> listListenableFuture) { - return Futures.transformAsync(listListenableFuture, entityRelations -> { - if (!entityRelations.isEmpty()) { - List> list = new ArrayList<>(); - for (EntityRelation relation : entityRelations) { + private ListenableFuture deleteOriginatorRelations(TbContext ctx, ListenableFuture> originatorRelationsFuture) { + return Futures.transformAsync(originatorRelationsFuture, originatorRelations -> { + List> list = new ArrayList<>(); + if (!CollectionUtils.isEmpty(originatorRelations)) { + for (EntityRelation relation : originatorRelations) { list.add(ctx.getRelationService().deleteRelationAsync(ctx.getTenantId(), relation)); } - return Futures.transform(Futures.allAsList(list), result -> false, ctx.getDbCallbackExecutor()); } - return Futures.immediateFuture(false); + return Futures.transform(Futures.allAsList(list), result -> null, ctx.getDbCallbackExecutor()); }, ctx.getDbCallbackExecutor()); } + private ListenableFuture checkRelationAndCreateIfAbsent(TbContext ctx, EntityContainer entityContainer, String relationType, SearchDirectionIds sdId) { + return Futures.transformAsync(checkRelation(ctx, sdId, relationType), relationPresent -> { + if (relationPresent) { + return Futures.immediateFuture(true); + } + return processCreateRelation(ctx, entityContainer, sdId, relationType); + }, ctx.getDbCallbackExecutor()); + } + + private ListenableFuture checkRelation(TbContext ctx, SearchDirectionIds sdId, String relationType) { + return ctx.getRelationService().checkRelation(ctx.getTenantId(), sdId.getFromId(), sdId.getToId(), relationType, RelationTypeGroup.COMMON); + } + private ListenableFuture processCreateRelation(TbContext ctx, EntityContainer entityContainer, SearchDirectionIds sdId, String relationType) { switch (entityContainer.getEntityType()) { case ASSET: