From f87aa0b1425f03ce9ec8ce64b34a7ad6b0ff5466 Mon Sep 17 00:00:00 2001 From: YevhenBondarenko Date: Tue, 28 Jan 2025 14:35:21 +0100 Subject: [PATCH 1/6] Added new relation between rule chains if RuleChainInput node is used --- .../common/data/relation/EntityRelation.java | 1 + .../server/dao/rule/BaseRuleChainService.java | 35 +++++++++++++++++-- 2 files changed, 34 insertions(+), 2 deletions(-) diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/relation/EntityRelation.java b/common/data/src/main/java/org/thingsboard/server/common/data/relation/EntityRelation.java index 62b5caafd3..ff45c01500 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/relation/EntityRelation.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/relation/EntityRelation.java @@ -41,6 +41,7 @@ public class EntityRelation implements HasVersion, Serializable { public static final String EDGE_TYPE = "ManagedByEdge"; public static final String CONTAINS_TYPE = "Contains"; public static final String MANAGES_TYPE = "Manages"; + public static final String USES_TYPE = "Uses"; @Setter private EntityId from; diff --git a/dao/src/main/java/org/thingsboard/server/dao/rule/BaseRuleChainService.java b/dao/src/main/java/org/thingsboard/server/dao/rule/BaseRuleChainService.java index 0e51cf256a..1ccc5fa95f 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/rule/BaseRuleChainService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/rule/BaseRuleChainService.java @@ -71,11 +71,13 @@ import java.util.Collection; import java.util.Collections; import java.util.Comparator; import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.UUID; import java.util.function.Function; import java.util.stream.Collectors; @@ -196,11 +198,18 @@ public class BaseRuleChainService extends AbstractEntityService implements RuleC } } } - + RuleChainId ruleChainId = ruleChain.getId(); List updatedRuleNodes = new ArrayList<>(); List existingRuleNodes = getRuleChainNodes(tenantId, ruleChainMetaData.getRuleChainId()); for (RuleNode existingNode : existingRuleNodes) { relationService.deleteEntityRelations(tenantId, existingNode.getId()); + if (existingNode.getType().equals("org.thingsboard.rule.engine.flow.TbRuleChainInputNode")) { + if (existingNode.getConfiguration().has("ruleChainId")) { + RuleChainId targetRuleChainId = extractRuleChainIdFromInputNode(existingNode); + var relation = createRuleChainInputRelation(ruleChainId, targetRuleChainId); + relationService.deleteRelation(tenantId, relation); + } + } Integer index = ruleNodeIndexMap.get(existingNode.getId()); RuleNode newRuleNode = null; if (index != null) { @@ -212,7 +221,7 @@ public class BaseRuleChainService extends AbstractEntityService implements RuleC } updatedRuleNodes.add(new RuleNodeUpdateResult(existingNode, newRuleNode)); } - RuleChainId ruleChainId = ruleChain.getId(); + if (nodes != null) { long now = System.currentTimeMillis(); for (RuleNode node : toAddOrUpdate) { @@ -225,6 +234,13 @@ public class BaseRuleChainService extends AbstractEntityService implements RuleC RuleNode savedNode = ruleNodeDao.save(tenantId, node); relations.add(new EntityRelation(ruleChainMetaData.getRuleChainId(), savedNode.getId(), EntityRelation.CONTAINS_TYPE, RelationTypeGroup.RULE_CHAIN)); + if (node.getType().equals("org.thingsboard.rule.engine.flow.TbRuleChainInputNode")) { + if (node.getConfiguration().has("ruleChainId")) { + RuleChainId targetRuleChainId = extractRuleChainIdFromInputNode(node); + var relation = createRuleChainInputRelation(ruleChainId, targetRuleChainId); + relations.add(relation); + } + } int index = nodes.indexOf(node); nodes.set(index, savedNode); ruleNodeIndexMap.put(savedNode.getId(), index); @@ -295,6 +311,21 @@ public class BaseRuleChainService extends AbstractEntityService implements RuleC return RuleChainUpdateResult.successful(updatedRuleNodes); } + private EntityRelation createRuleChainInputRelation(RuleChainId ruleChainId, RuleChainId targetRuleChainId) { + EntityRelation relation = new EntityRelation(); + relation.setFrom(ruleChainId); + relation.setTo(targetRuleChainId); + relation.setType(EntityRelation.USES_TYPE); + relation.setTypeGroup(RelationTypeGroup.COMMON); + return relation; + } + + private RuleChainId extractRuleChainIdFromInputNode(RuleNode node) { + JsonNode configuration = node.getConfiguration(); + UUID targetUuid = UUID.fromString(configuration.get("ruleChainId").asText()); + return new RuleChainId(targetUuid); + } + @Override public RuleChainMetaData loadRuleChainMetaData(TenantId tenantId, RuleChainId ruleChainId) { Validator.validateId(ruleChainId, "Incorrect rule chain id."); From f06fc5b27597500919545e09ea2b7b6d1a26b2d5 Mon Sep 17 00:00:00 2001 From: YevhenBondarenko Date: Tue, 28 Jan 2025 14:38:35 +0100 Subject: [PATCH 2/6] imports optimization --- .../org/thingsboard/server/dao/rule/BaseRuleChainService.java | 1 - 1 file changed, 1 deletion(-) diff --git a/dao/src/main/java/org/thingsboard/server/dao/rule/BaseRuleChainService.java b/dao/src/main/java/org/thingsboard/server/dao/rule/BaseRuleChainService.java index 1ccc5fa95f..eb78cf316e 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/rule/BaseRuleChainService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/rule/BaseRuleChainService.java @@ -71,7 +71,6 @@ import java.util.Collection; import java.util.Collections; import java.util.Comparator; import java.util.HashMap; -import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; From bbdeda95580c5903c1cff4981ed90671168c96c6 Mon Sep 17 00:00:00 2001 From: YevhenBondarenko Date: Sun, 9 Mar 2025 19:48:38 +0100 Subject: [PATCH 3/6] added tests --- .../dao/service/RuleChainServiceTest.java | 66 +++++++++++++++++++ 1 file changed, 66 insertions(+) diff --git a/dao/src/test/java/org/thingsboard/server/dao/service/RuleChainServiceTest.java b/dao/src/test/java/org/thingsboard/server/dao/service/RuleChainServiceTest.java index 86b76bd5a5..8b5ebd4306 100644 --- a/dao/src/test/java/org/thingsboard/server/dao/service/RuleChainServiceTest.java +++ b/dao/src/test/java/org/thingsboard/server/dao/service/RuleChainServiceTest.java @@ -16,6 +16,7 @@ package org.thingsboard.server.dao.service; import com.datastax.oss.driver.api.core.uuid.Uuids; +import com.fasterxml.jackson.databind.node.ObjectNode; import org.junit.Assert; import org.junit.Test; import org.junit.jupiter.api.Assertions; @@ -28,12 +29,14 @@ import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.page.PageData; import org.thingsboard.server.common.data.page.PageLink; import org.thingsboard.server.common.data.relation.EntityRelation; +import org.thingsboard.server.common.data.relation.RelationTypeGroup; import org.thingsboard.server.common.data.rule.RuleChain; import org.thingsboard.server.common.data.rule.RuleChainMetaData; import org.thingsboard.server.common.data.rule.RuleChainType; import org.thingsboard.server.common.data.rule.RuleNode; import org.thingsboard.server.dao.edge.EdgeService; import org.thingsboard.server.dao.exception.DataValidationException; +import org.thingsboard.server.dao.relation.RelationService; import org.thingsboard.server.dao.rule.RuleChainService; import java.io.IOException; @@ -44,6 +47,7 @@ import java.util.UUID; import java.util.function.Function; import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.thingsboard.server.common.data.relation.EntityRelation.USES_TYPE; /** * Created by igor on 3/13/18. @@ -55,6 +59,8 @@ public class RuleChainServiceTest extends AbstractServiceTest { EdgeService edgeService; @Autowired RuleChainService ruleChainService; + @Autowired + RelationService relationService; private IdComparator idComparator = new IdComparator<>(); private IdComparator ruleNodeIdComparator = new IdComparator<>(); @@ -354,6 +360,66 @@ public class RuleChainServiceTest extends AbstractServiceTest { Assert.assertTrue(ruleChainById.isRoot()); } + @Test + public void testSaveRuleChainWithInputNode() { + RuleChain toRuleChain = new RuleChain(); + toRuleChain.setName("To Rule Chain"); + toRuleChain.setTenantId(tenantId); + RuleChain savedToRuleChain = ruleChainService.saveRuleChain(toRuleChain); + + RuleChain fromRuleChain = new RuleChain(); + fromRuleChain.setName("From RuleChain"); + fromRuleChain.setTenantId(tenantId); + RuleChain savedFromRuleChain = ruleChainService.saveRuleChain(fromRuleChain); + + RuleChainMetaData ruleChainMetaData = new RuleChainMetaData(); + ruleChainMetaData.setRuleChainId(savedFromRuleChain.getId()); + + RuleNode ruleNode = new RuleNode(); + ruleNode.setName("Input node"); + ruleNode.setType("org.thingsboard.rule.engine.flow.TbRuleChainInputNode"); + ObjectNode configuration = JacksonUtil.newObjectNode(); + configuration.put("ruleChainId", savedToRuleChain.getId().toString()); + ruleNode.setConfiguration(configuration); + + List ruleNodes = new ArrayList<>(); + ruleNodes.add(ruleNode); + ruleChainMetaData.setFirstNodeIndex(0); + ruleChainMetaData.setNodes(ruleNodes); + + ruleChainService.saveRuleChainMetaData(tenantId, ruleChainMetaData, Function.identity()); + + List relations = relationService.findByFromAndType(tenantId, savedFromRuleChain.getId(), USES_TYPE, RelationTypeGroup.COMMON); + Assert.assertEquals(1, relations.size()); + EntityRelation usesRelation = relations.get(0); + Assert.assertEquals(savedFromRuleChain.getId(), usesRelation.getFrom()); + Assert.assertEquals(savedToRuleChain.getId(), usesRelation.getTo()); + + RuleChain newToRuleChain = new RuleChain(); + newToRuleChain.setName("New To Rule Chain"); + newToRuleChain.setTenantId(tenantId); + RuleChain savedNewToRuleChain = ruleChainService.saveRuleChain(newToRuleChain); + + RuleNode newRuleNode = new RuleNode(); + newRuleNode.setName("Input node"); + newRuleNode.setType("org.thingsboard.rule.engine.flow.TbRuleChainInputNode"); + ObjectNode newConfiguration = JacksonUtil.newObjectNode(); + configuration.put("ruleChainId", savedNewToRuleChain.getId().toString()); + newRuleNode.setConfiguration(newConfiguration); + + List newRuleNodes = new ArrayList<>(); + newRuleNodes.add(newRuleNode); + RuleChainMetaData foundRuleChainMetaData = ruleChainService.loadRuleChainMetaData(tenantId, ruleChainMetaData.getRuleChainId()); + foundRuleChainMetaData.setNodes(newRuleNodes); + ruleChainService.saveRuleChainMetaData(tenantId, ruleChainMetaData, Function.identity()); + + List newRelations = relationService.findByFromAndType(tenantId, savedFromRuleChain.getId(), USES_TYPE, RelationTypeGroup.COMMON); + Assert.assertEquals(1, relations.size()); + EntityRelation newUsesRelation = newRelations.get(0); + Assert.assertEquals(savedFromRuleChain.getId(), newUsesRelation.getFrom()); + Assert.assertEquals(savedNewToRuleChain.getId(), newUsesRelation.getTo()); + } + private RuleChainId saveRuleChainAndSetAutoAssignToEdge(String name) { RuleChain edgeRuleChain = new RuleChain(); edgeRuleChain.setTenantId(tenantId); From 37482b51f33f7e4003ffddc534c98aa10c1f31e5 Mon Sep 17 00:00:00 2001 From: YevhenBondarenko Date: Tue, 11 Mar 2025 12:33:05 +0100 Subject: [PATCH 4/6] added upgrade script --- .../update/DefaultDataUpdateService.java | 72 +++++++++++++++++++ 1 file changed, 72 insertions(+) diff --git a/application/src/main/java/org/thingsboard/server/service/install/update/DefaultDataUpdateService.java b/application/src/main/java/org/thingsboard/server/service/install/update/DefaultDataUpdateService.java index e8622fae37..27e0f6adc2 100644 --- a/application/src/main/java/org/thingsboard/server/service/install/update/DefaultDataUpdateService.java +++ b/application/src/main/java/org/thingsboard/server/service/install/update/DefaultDataUpdateService.java @@ -24,23 +24,36 @@ import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Profile; import org.springframework.stereotype.Service; +import org.thingsboard.server.common.data.Tenant; import org.thingsboard.server.common.data.alarm.AlarmSeverity; +import org.thingsboard.server.common.data.id.RuleChainId; import org.thingsboard.server.common.data.id.RuleNodeId; import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.page.PageData; import org.thingsboard.server.common.data.page.PageDataIterable; +import org.thingsboard.server.common.data.page.PageLink; import org.thingsboard.server.common.data.query.DynamicValue; import org.thingsboard.server.common.data.query.FilterPredicateValue; +import org.thingsboard.server.common.data.relation.EntityRelation; +import org.thingsboard.server.common.data.relation.RelationTypeGroup; +import org.thingsboard.server.common.data.rule.RuleNode; +import org.thingsboard.server.dao.relation.RelationService; import org.thingsboard.server.dao.rule.RuleChainService; import org.thingsboard.server.dao.sql.JpaExecutorService; +import org.thingsboard.server.dao.tenant.TenantService; import org.thingsboard.server.service.component.ComponentDiscoveryService; import org.thingsboard.server.service.component.RuleNodeClassInfo; +import org.thingsboard.server.service.install.DbUpgradeExecutorService; import org.thingsboard.server.service.install.InstallScripts; import org.thingsboard.server.utils.TbNodeUpgradeUtils; import java.util.ArrayList; import java.util.List; +import java.util.UUID; import java.util.concurrent.ExecutionException; +import static org.thingsboard.server.common.data.relation.EntityRelation.USES_TYPE; + @Service @Profile("install") @Slf4j @@ -52,6 +65,12 @@ public class DefaultDataUpdateService implements DataUpdateService { @Autowired private RuleChainService ruleChainService; + @Autowired + private RelationService relationService; + + @Autowired + private TenantService tenantService; + @Autowired private ComponentDiscoveryService componentDiscoveryService; @@ -61,13 +80,66 @@ public class DefaultDataUpdateService implements DataUpdateService { @Autowired private InstallScripts installScripts; + @Autowired + private DbUpgradeExecutorService executorService; + @Override public void updateData() throws Exception { log.info("Updating data ..."); //TODO: should be cleaned after each release + inputNodesUpdater.updateEntities(); log.info("Data updated."); } + //TODO: should be removed after release + private final PaginatedUpdater inputNodesUpdater = new PaginatedUpdater<>() { + @Override + protected String getName() { + return "Input nodes updater"; + } + + @Override + protected PageData findEntities(String type, PageLink pageLink) { + return tenantService.findTenants(pageLink); + } + + @Override + protected void updateEntity(Tenant tenant) { + TenantId tenantId = tenant.getId(); + try { + var inputNodes = ruleChainService.findRuleNodesByTenantIdAndType(tenantId, "org.thingsboard.rule.engine.flow.TbRuleChainInputNode"); + var resultFutures = inputNodes.stream().map(ruleNode -> { + try { + JsonNode id = ruleNode.getConfiguration().get("ruleChainId"); + if (id != null) { + RuleChainId toRuleChainId = new RuleChainId(UUID.fromString(id.asText())); + RuleChainId fromRuleChainId = ruleNode.getRuleChainId(); + var isExistFuture = relationService.checkRelationAsync(null, fromRuleChainId, toRuleChainId, USES_TYPE, RelationTypeGroup.COMMON); + Futures.transformAsync(isExistFuture, isExist -> { + if (!isExist) { + EntityRelation relation = new EntityRelation(); + relation.setFrom(fromRuleChainId); + relation.setTo(toRuleChainId); + relation.setType(EntityRelation.USES_TYPE); + relation.setTypeGroup(RelationTypeGroup.COMMON); + return relationService.saveRelationAsync(tenantId, relation); + } + return Futures.immediateFuture(null); + }, executorService); + } + } catch (Exception e) { + log.error("[{}] Create relation for input node: [{}]", tenantId, ruleNode, e); + } + return Futures.immediateFuture(null); + }).toList(); + + Futures.allAsList(resultFutures).get(); + } catch (Exception e) { + log.error("[{}] Unable to update Tenant input nodes", tenantId, e); + } + } + }; + @Override public void upgradeRuleNodes() { int totalRuleNodesUpgraded = 0; From 753071ea17fbb4eb74af7d88839f525f459683b8 Mon Sep 17 00:00:00 2001 From: YevhenBondarenko Date: Fri, 14 Mar 2025 14:48:32 +0100 Subject: [PATCH 5/6] minor refactoring --- .../update/DefaultDataUpdateService.java | 24 +++++++------------ .../server/dao/rule/BaseRuleChainService.java | 6 ++--- .../dao/service/RuleChainServiceTest.java | 5 ++-- 3 files changed, 15 insertions(+), 20 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/install/update/DefaultDataUpdateService.java b/application/src/main/java/org/thingsboard/server/service/install/update/DefaultDataUpdateService.java index b1f80cb793..4c90258330 100644 --- a/application/src/main/java/org/thingsboard/server/service/install/update/DefaultDataUpdateService.java +++ b/application/src/main/java/org/thingsboard/server/service/install/update/DefaultDataUpdateService.java @@ -49,7 +49,7 @@ import java.util.List; import java.util.UUID; import java.util.concurrent.ExecutionException; -import static org.thingsboard.server.common.data.relation.EntityRelation.USES_TYPE; +import static org.thingsboard.server.dao.rule.BaseRuleChainService.TB_RULE_CHAIN_INPUT_NODE; @Service @Profile("install") @@ -98,28 +98,22 @@ public class DefaultDataUpdateService implements DataUpdateService { protected void updateEntity(Tenant tenant) { TenantId tenantId = tenant.getId(); try { - var inputNodes = ruleChainService.findRuleNodesByTenantIdAndType(tenantId, "org.thingsboard.rule.engine.flow.TbRuleChainInputNode"); + var inputNodes = ruleChainService.findRuleNodesByTenantIdAndType(tenantId, TB_RULE_CHAIN_INPUT_NODE); var resultFutures = inputNodes.stream().map(ruleNode -> { try { JsonNode id = ruleNode.getConfiguration().get("ruleChainId"); if (id != null) { RuleChainId toRuleChainId = new RuleChainId(UUID.fromString(id.asText())); RuleChainId fromRuleChainId = ruleNode.getRuleChainId(); - var isExistFuture = relationService.checkRelationAsync(null, fromRuleChainId, toRuleChainId, USES_TYPE, RelationTypeGroup.COMMON); - Futures.transformAsync(isExistFuture, isExist -> { - if (!isExist) { - EntityRelation relation = new EntityRelation(); - relation.setFrom(fromRuleChainId); - relation.setTo(toRuleChainId); - relation.setType(EntityRelation.USES_TYPE); - relation.setTypeGroup(RelationTypeGroup.COMMON); - return relationService.saveRelationAsync(tenantId, relation); - } - return Futures.immediateFuture(null); - }, executorService); + EntityRelation relation = new EntityRelation(); + relation.setFrom(fromRuleChainId); + relation.setTo(toRuleChainId); + relation.setType(EntityRelation.USES_TYPE); + relation.setTypeGroup(RelationTypeGroup.COMMON); + return relationService.saveRelationAsync(tenantId, relation); } } catch (Exception e) { - log.error("[{}] Create relation for input node: [{}]", tenantId, ruleNode, e); + log.error("[{}] Failed to save relation for input node: [{}]", tenantId, ruleNode, e); } return Futures.immediateFuture(null); }).toList(); diff --git a/dao/src/main/java/org/thingsboard/server/dao/rule/BaseRuleChainService.java b/dao/src/main/java/org/thingsboard/server/dao/rule/BaseRuleChainService.java index 9d61408199..1ce9bd555b 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/rule/BaseRuleChainService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/rule/BaseRuleChainService.java @@ -210,7 +210,7 @@ public class BaseRuleChainService extends AbstractEntityService implements RuleC List existingRuleNodes = getRuleChainNodes(tenantId, ruleChainMetaData.getRuleChainId()); for (RuleNode existingNode : existingRuleNodes) { relationService.deleteEntityRelations(tenantId, existingNode.getId()); - if (existingNode.getType().equals("org.thingsboard.rule.engine.flow.TbRuleChainInputNode")) { + if (existingNode.getType().equals(TB_RULE_CHAIN_INPUT_NODE)) { if (existingNode.getConfiguration().has("ruleChainId")) { RuleChainId targetRuleChainId = extractRuleChainIdFromInputNode(existingNode); var relation = createRuleChainInputRelation(ruleChainId, targetRuleChainId); @@ -241,7 +241,7 @@ public class BaseRuleChainService extends AbstractEntityService implements RuleC RuleNode savedNode = ruleNodeDao.save(tenantId, node); relations.add(new EntityRelation(ruleChainMetaData.getRuleChainId(), savedNode.getId(), EntityRelation.CONTAINS_TYPE, RelationTypeGroup.RULE_CHAIN)); - if (node.getType().equals("org.thingsboard.rule.engine.flow.TbRuleChainInputNode")) { + if (node.getType().equals(TB_RULE_CHAIN_INPUT_NODE)) { if (node.getConfiguration().has("ruleChainId")) { RuleChainId targetRuleChainId = extractRuleChainIdFromInputNode(node); var relation = createRuleChainInputRelation(ruleChainId, targetRuleChainId); @@ -280,7 +280,7 @@ public class BaseRuleChainService extends AbstractEntityService implements RuleC RuleNode targetNode = new RuleNode(); targetNode.setName(targetRuleChain != null ? targetRuleChain.getName() : "Rule Chain Input"); targetNode.setRuleChainId(ruleChainId); - targetNode.setType("org.thingsboard.rule.engine.flow.TbRuleChainInputNode"); + targetNode.setType(TB_RULE_CHAIN_INPUT_NODE); var configuration = JacksonUtil.newObjectNode(); configuration.put("ruleChainId", targetRuleChainId.getId().toString()); targetNode.setConfiguration(configuration); diff --git a/dao/src/test/java/org/thingsboard/server/dao/service/RuleChainServiceTest.java b/dao/src/test/java/org/thingsboard/server/dao/service/RuleChainServiceTest.java index 54144d5192..a2ca663619 100644 --- a/dao/src/test/java/org/thingsboard/server/dao/service/RuleChainServiceTest.java +++ b/dao/src/test/java/org/thingsboard/server/dao/service/RuleChainServiceTest.java @@ -48,6 +48,7 @@ import java.util.function.Function; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.thingsboard.server.common.data.relation.EntityRelation.USES_TYPE; +import static org.thingsboard.server.dao.rule.BaseRuleChainService.TB_RULE_CHAIN_INPUT_NODE; /** * Created by igor on 3/13/18. @@ -377,7 +378,7 @@ public class RuleChainServiceTest extends AbstractServiceTest { RuleNode ruleNode = new RuleNode(); ruleNode.setName("Input node"); - ruleNode.setType("org.thingsboard.rule.engine.flow.TbRuleChainInputNode"); + ruleNode.setType(TB_RULE_CHAIN_INPUT_NODE); ObjectNode configuration = JacksonUtil.newObjectNode(); configuration.put("ruleChainId", savedToRuleChain.getId().toString()); ruleNode.setConfiguration(configuration); @@ -402,7 +403,7 @@ public class RuleChainServiceTest extends AbstractServiceTest { RuleNode newRuleNode = new RuleNode(); newRuleNode.setName("Input node"); - newRuleNode.setType("org.thingsboard.rule.engine.flow.TbRuleChainInputNode"); + newRuleNode.setType(TB_RULE_CHAIN_INPUT_NODE); ObjectNode newConfiguration = JacksonUtil.newObjectNode(); configuration.put("ruleChainId", savedNewToRuleChain.getId().toString()); newRuleNode.setConfiguration(newConfiguration); From 381e976d87c252722e488dfd19e0416721823794 Mon Sep 17 00:00:00 2001 From: ViacheslavKlimov Date: Wed, 7 May 2025 12:17:24 +0300 Subject: [PATCH 6/6] Refactoring for input node relations --- .../update/DefaultDataUpdateService.java | 69 ++++----- .../server/dao/rule/BaseRuleChainService.java | 39 +++-- .../dao/service/RuleChainServiceTest.java | 134 ++++++++++-------- 3 files changed, 117 insertions(+), 125 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/install/update/DefaultDataUpdateService.java b/application/src/main/java/org/thingsboard/server/service/install/update/DefaultDataUpdateService.java index 4c90258330..d8ddd7cd9f 100644 --- a/application/src/main/java/org/thingsboard/server/service/install/update/DefaultDataUpdateService.java +++ b/application/src/main/java/org/thingsboard/server/service/install/update/DefaultDataUpdateService.java @@ -24,21 +24,18 @@ import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Profile; import org.springframework.stereotype.Service; -import org.thingsboard.server.common.data.Tenant; import org.thingsboard.server.common.data.alarm.AlarmSeverity; import org.thingsboard.server.common.data.id.RuleChainId; import org.thingsboard.server.common.data.id.RuleNodeId; import org.thingsboard.server.common.data.id.TenantId; -import org.thingsboard.server.common.data.page.PageData; import org.thingsboard.server.common.data.page.PageDataIterable; -import org.thingsboard.server.common.data.page.PageLink; import org.thingsboard.server.common.data.query.DynamicValue; import org.thingsboard.server.common.data.query.FilterPredicateValue; import org.thingsboard.server.common.data.relation.EntityRelation; import org.thingsboard.server.common.data.relation.RelationTypeGroup; +import org.thingsboard.server.common.data.rule.RuleNode; import org.thingsboard.server.dao.relation.RelationService; import org.thingsboard.server.dao.rule.RuleChainService; -import org.thingsboard.server.dao.tenant.TenantService; import org.thingsboard.server.service.component.ComponentDiscoveryService; import org.thingsboard.server.service.component.RuleNodeClassInfo; import org.thingsboard.server.service.install.DbUpgradeExecutorService; @@ -46,6 +43,7 @@ import org.thingsboard.server.utils.TbNodeUpgradeUtils; import java.util.ArrayList; import java.util.List; +import java.util.Optional; import java.util.UUID; import java.util.concurrent.ExecutionException; @@ -65,9 +63,6 @@ public class DefaultDataUpdateService implements DataUpdateService { @Autowired private RelationService relationService; - @Autowired - private TenantService tenantService; - @Autowired private ComponentDiscoveryService componentDiscoveryService; @@ -78,52 +73,36 @@ public class DefaultDataUpdateService implements DataUpdateService { public void updateData() throws Exception { log.info("Updating data ..."); //TODO: should be cleaned after each release - inputNodesUpdater.updateEntities(); + updateInputNodes(); log.info("Data updated."); } - //TODO: should be removed after release - private final PaginatedUpdater inputNodesUpdater = new PaginatedUpdater<>() { - @Override - protected String getName() { - return "Input nodes updater"; - } - - @Override - protected PageData findEntities(String type, PageLink pageLink) { - return tenantService.findTenants(pageLink); - } - - @Override - protected void updateEntity(Tenant tenant) { - TenantId tenantId = tenant.getId(); + private void updateInputNodes() { + log.info("Creating relations for input nodes..."); + int n = 0; + var inputNodes = new PageDataIterable<>(pageLink -> ruleChainService.findAllRuleNodesByType(TB_RULE_CHAIN_INPUT_NODE, pageLink), 1024); + for (RuleNode inputNode : inputNodes) { try { - var inputNodes = ruleChainService.findRuleNodesByTenantIdAndType(tenantId, TB_RULE_CHAIN_INPUT_NODE); - var resultFutures = inputNodes.stream().map(ruleNode -> { - try { - JsonNode id = ruleNode.getConfiguration().get("ruleChainId"); - if (id != null) { - RuleChainId toRuleChainId = new RuleChainId(UUID.fromString(id.asText())); - RuleChainId fromRuleChainId = ruleNode.getRuleChainId(); - EntityRelation relation = new EntityRelation(); - relation.setFrom(fromRuleChainId); - relation.setTo(toRuleChainId); - relation.setType(EntityRelation.USES_TYPE); - relation.setTypeGroup(RelationTypeGroup.COMMON); - return relationService.saveRelationAsync(tenantId, relation); - } - } catch (Exception e) { - log.error("[{}] Failed to save relation for input node: [{}]", tenantId, ruleNode, e); - } - return Futures.immediateFuture(null); - }).toList(); + RuleChainId targetRuleChainId = Optional.ofNullable(inputNode.getConfiguration().get("ruleChainId")) + .filter(JsonNode::isTextual).map(JsonNode::asText).map(id -> new RuleChainId(UUID.fromString(id))) + .orElse(null); + if (targetRuleChainId == null) { + continue; + } - Futures.allAsList(resultFutures).get(); + EntityRelation relation = new EntityRelation(); + relation.setFrom(inputNode.getRuleChainId()); + relation.setTo(targetRuleChainId); + relation.setType(EntityRelation.USES_TYPE); + relation.setTypeGroup(RelationTypeGroup.COMMON); + relationService.saveRelation(TenantId.SYS_TENANT_ID, relation); + n++; } catch (Exception e) { - log.error("[{}] Unable to update Tenant input nodes", tenantId, e); + log.error("Failed to save relation for input node: {}", inputNode, e); } } - }; + log.info("Created {} relations for input nodes", n); + } @Override public void upgradeRuleNodes() { diff --git a/dao/src/main/java/org/thingsboard/server/dao/rule/BaseRuleChainService.java b/dao/src/main/java/org/thingsboard/server/dao/rule/BaseRuleChainService.java index 1ce9bd555b..8538bd9492 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/rule/BaseRuleChainService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/rule/BaseRuleChainService.java @@ -211,9 +211,8 @@ public class BaseRuleChainService extends AbstractEntityService implements RuleC for (RuleNode existingNode : existingRuleNodes) { relationService.deleteEntityRelations(tenantId, existingNode.getId()); if (existingNode.getType().equals(TB_RULE_CHAIN_INPUT_NODE)) { - if (existingNode.getConfiguration().has("ruleChainId")) { - RuleChainId targetRuleChainId = extractRuleChainIdFromInputNode(existingNode); - var relation = createRuleChainInputRelation(ruleChainId, targetRuleChainId); + EntityRelation relation = getRuleChainInputRelation(ruleChainId, existingNode); + if (relation != null) { relationService.deleteRelation(tenantId, relation); } } @@ -242,9 +241,8 @@ public class BaseRuleChainService extends AbstractEntityService implements RuleC relations.add(new EntityRelation(ruleChainMetaData.getRuleChainId(), savedNode.getId(), EntityRelation.CONTAINS_TYPE, RelationTypeGroup.RULE_CHAIN)); if (node.getType().equals(TB_RULE_CHAIN_INPUT_NODE)) { - if (node.getConfiguration().has("ruleChainId")) { - RuleChainId targetRuleChainId = extractRuleChainIdFromInputNode(node); - var relation = createRuleChainInputRelation(ruleChainId, targetRuleChainId); + EntityRelation relation = getRuleChainInputRelation(ruleChainId, node); + if (relation != null) { relations.add(relation); } } @@ -262,7 +260,7 @@ public class BaseRuleChainService extends AbstractEntityService implements RuleC firstRuleNodeId = nodes.get(ruleChainMetaData.getFirstNodeIndex()).getId(); } if ((ruleChain.getFirstRuleNodeId() != null && !ruleChain.getFirstRuleNodeId().equals(firstRuleNodeId)) - || (ruleChain.getFirstRuleNodeId() == null && firstRuleNodeId != null)) { + || (ruleChain.getFirstRuleNodeId() == null && firstRuleNodeId != null)) { ruleChain.setFirstRuleNodeId(firstRuleNodeId); } if (ruleChainMetaData.getConnections() != null) { @@ -317,19 +315,20 @@ public class BaseRuleChainService extends AbstractEntityService implements RuleC return RuleChainUpdateResult.successful(updatedRuleNodes); } - private EntityRelation createRuleChainInputRelation(RuleChainId ruleChainId, RuleChainId targetRuleChainId) { - EntityRelation relation = new EntityRelation(); - relation.setFrom(ruleChainId); - relation.setTo(targetRuleChainId); - relation.setType(EntityRelation.USES_TYPE); - relation.setTypeGroup(RelationTypeGroup.COMMON); - return relation; - } - - private RuleChainId extractRuleChainIdFromInputNode(RuleNode node) { - JsonNode configuration = node.getConfiguration(); - UUID targetUuid = UUID.fromString(configuration.get("ruleChainId").asText()); - return new RuleChainId(targetUuid); + private EntityRelation getRuleChainInputRelation(RuleChainId ruleChainId, RuleNode inputNode) { + RuleChainId targetRuleChainId = Optional.ofNullable(inputNode.getConfiguration().get("ruleChainId")) + .filter(JsonNode::isTextual).map(JsonNode::asText).map(id -> new RuleChainId(UUID.fromString(id))) + .orElse(null); + if (targetRuleChainId != null) { + EntityRelation relation = new EntityRelation(); + relation.setFrom(ruleChainId); + relation.setTo(targetRuleChainId); + relation.setType(EntityRelation.USES_TYPE); + relation.setTypeGroup(RelationTypeGroup.COMMON); + return relation; + } else { + return null; + } } @Override diff --git a/dao/src/test/java/org/thingsboard/server/dao/service/RuleChainServiceTest.java b/dao/src/test/java/org/thingsboard/server/dao/service/RuleChainServiceTest.java index a2ca663619..11fa38dc19 100644 --- a/dao/src/test/java/org/thingsboard/server/dao/service/RuleChainServiceTest.java +++ b/dao/src/test/java/org/thingsboard/server/dao/service/RuleChainServiceTest.java @@ -16,7 +16,6 @@ package org.thingsboard.server.dao.service; import com.datastax.oss.driver.api.core.uuid.Uuids; -import com.fasterxml.jackson.databind.node.ObjectNode; import org.junit.Assert; import org.junit.Test; import org.junit.jupiter.api.Assertions; @@ -46,6 +45,7 @@ import java.util.List; import java.util.UUID; import java.util.function.Function; +import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.thingsboard.server.common.data.relation.EntityRelation.USES_TYPE; import static org.thingsboard.server.dao.rule.BaseRuleChainService.TB_RULE_CHAIN_INPUT_NODE; @@ -277,7 +277,7 @@ public class RuleChainServiceTest extends AbstractServiceTest { List ruleNodes = savedRuleChainMetaData.getNodes(); int name3Index = -1; - for (int i=0;i ruleNodes = new ArrayList<>(); - ruleNodes.add(ruleNode); + ruleNodes.add(toRuleChain1Node); + ruleNodes.add(toRuleChain1Node2); ruleChainMetaData.setFirstNodeIndex(0); ruleChainMetaData.setNodes(ruleNodes); ruleChainService.saveRuleChainMetaData(tenantId, ruleChainMetaData, Function.identity()); - List relations = relationService.findByFromAndType(tenantId, savedFromRuleChain.getId(), USES_TYPE, RelationTypeGroup.COMMON); - Assert.assertEquals(1, relations.size()); - EntityRelation usesRelation = relations.get(0); - Assert.assertEquals(savedFromRuleChain.getId(), usesRelation.getFrom()); - Assert.assertEquals(savedToRuleChain.getId(), usesRelation.getTo()); + List relations = relationService.findByFromAndType(tenantId, fromRuleChainId, USES_TYPE, RelationTypeGroup.COMMON); + assertThat(relations).singleElement().satisfies(relationToRuleChain1 -> { + assertThat(relationToRuleChain1.getFrom()).isEqualTo(fromRuleChainId); + assertThat(relationToRuleChain1.getTo()).isEqualTo(toRuleChain1Id); + }); - RuleChain newToRuleChain = new RuleChain(); - newToRuleChain.setName("New To Rule Chain"); - newToRuleChain.setTenantId(tenantId); - RuleChain savedNewToRuleChain = ruleChainService.saveRuleChain(newToRuleChain); + RuleChain toRuleChain2 = new RuleChain(); + toRuleChain2.setName("To Rule Chain 2"); + toRuleChain2.setTenantId(tenantId); + toRuleChain2 = ruleChainService.saveRuleChain(toRuleChain2); + RuleChainId toRuleChain2Id = toRuleChain2.getId(); - RuleNode newRuleNode = new RuleNode(); - newRuleNode.setName("Input node"); - newRuleNode.setType(TB_RULE_CHAIN_INPUT_NODE); - ObjectNode newConfiguration = JacksonUtil.newObjectNode(); - configuration.put("ruleChainId", savedNewToRuleChain.getId().toString()); - newRuleNode.setConfiguration(newConfiguration); + RuleNode toRuleChain2Node = new RuleNode(); + toRuleChain2Node.setName("To Rule Chain 2"); + toRuleChain2Node.setType(TB_RULE_CHAIN_INPUT_NODE); + toRuleChain2Node.setConfiguration(JacksonUtil.newObjectNode() + .put("ruleChainId", toRuleChain2Id.toString())); List newRuleNodes = new ArrayList<>(); - newRuleNodes.add(newRuleNode); - RuleChainMetaData foundRuleChainMetaData = ruleChainService.loadRuleChainMetaData(tenantId, ruleChainMetaData.getRuleChainId()); - foundRuleChainMetaData.setNodes(newRuleNodes); + newRuleNodes.add(toRuleChain2Node); + newRuleNodes.add(toRuleChain1Node); + ruleChainMetaData = ruleChainService.loadRuleChainMetaData(tenantId, ruleChainMetaData.getRuleChainId()); + ruleChainMetaData.setNodes(newRuleNodes); ruleChainService.saveRuleChainMetaData(tenantId, ruleChainMetaData, Function.identity()); - List newRelations = relationService.findByFromAndType(tenantId, savedFromRuleChain.getId(), USES_TYPE, RelationTypeGroup.COMMON); - Assert.assertEquals(1, relations.size()); - EntityRelation newUsesRelation = newRelations.get(0); - Assert.assertEquals(savedFromRuleChain.getId(), newUsesRelation.getFrom()); - Assert.assertEquals(savedNewToRuleChain.getId(), newUsesRelation.getTo()); + List newRelations = relationService.findByFromAndType(tenantId, fromRuleChainId, USES_TYPE, RelationTypeGroup.COMMON); + assertThat(newRelations).hasSize(2); + assertThat(newRelations).anySatisfy(relationToRuleChain1 -> { + assertThat(relationToRuleChain1.getFrom()).isEqualTo(fromRuleChainId); + assertThat(relationToRuleChain1.getTo()).isEqualTo(toRuleChain1Id); + }); + assertThat(newRelations).anySatisfy(relationToRuleChain2 -> { + assertThat(relationToRuleChain2.getFrom()).isEqualTo(fromRuleChainId); + assertThat(relationToRuleChain2.getTo()).isEqualTo(toRuleChain2Id); + }); } private RuleChainId saveRuleChainAndSetAutoAssignToEdge(String name) { @@ -462,9 +476,9 @@ public class RuleChainServiceTest extends AbstractServiceTest { ruleChainMetaData.setFirstNodeIndex(0); ruleChainMetaData.setNodes(ruleNodes); - ruleChainMetaData.addConnectionInfo(0,1,"success"); - ruleChainMetaData.addConnectionInfo(0,2,"fail"); - ruleChainMetaData.addConnectionInfo(1,2,"success"); + ruleChainMetaData.addConnectionInfo(0, 1, "success"); + ruleChainMetaData.addConnectionInfo(0, 2, "fail"); + ruleChainMetaData.addConnectionInfo(1, 2, "success"); Assert.assertTrue(ruleChainService.saveRuleChainMetaData(tenantId, ruleChainMetaData, Function.identity()).isSuccess()); return ruleChainService.loadRuleChainMetaData(tenantId, ruleChainMetaData.getRuleChainId()); @@ -501,10 +515,10 @@ public class RuleChainServiceTest extends AbstractServiceTest { ruleChainMetaData.setFirstNodeIndex(0); ruleChainMetaData.setNodes(ruleNodes); - ruleChainMetaData.addConnectionInfo(0,1,"success"); - ruleChainMetaData.addConnectionInfo(0,2,"fail"); - ruleChainMetaData.addConnectionInfo(1,2,"success"); - ruleChainMetaData.addConnectionInfo(2,2,"success"); + ruleChainMetaData.addConnectionInfo(0, 1, "success"); + ruleChainMetaData.addConnectionInfo(0, 2, "fail"); + ruleChainMetaData.addConnectionInfo(1, 2, "success"); + ruleChainMetaData.addConnectionInfo(2, 2, "success"); return ruleChainMetaData; } @@ -540,10 +554,10 @@ public class RuleChainServiceTest extends AbstractServiceTest { ruleChainMetaData.setFirstNodeIndex(0); ruleChainMetaData.setNodes(ruleNodes); - ruleChainMetaData.addConnectionInfo(0,1,"success"); - ruleChainMetaData.addConnectionInfo(0,2,"fail"); - ruleChainMetaData.addConnectionInfo(1,2,"success"); - ruleChainMetaData.addConnectionInfo(2,0,"success"); + ruleChainMetaData.addConnectionInfo(0, 1, "success"); + ruleChainMetaData.addConnectionInfo(0, 2, "fail"); + ruleChainMetaData.addConnectionInfo(1, 2, "success"); + ruleChainMetaData.addConnectionInfo(2, 0, "success"); return ruleChainMetaData; } @@ -649,16 +663,16 @@ public class RuleChainServiceTest extends AbstractServiceTest { private RuleChain getRuleChain() { String ruleChainStr = "{\n" + - " \"name\": \"Root Rule Chain\",\n" + - " \"type\": \"CORE\",\n" + - " \"firstRuleNodeId\": {\n" + - " \"entityType\": \"RULE_NODE\",\n" + - " \"id\": \"91ad0b00-e779-11ee-9cf0-15d8b6079fdb\"\n" + - " },\n" + - " \"debugMode\": false,\n" + - " \"configuration\": null,\n" + - " \"additionalInfo\": null\n" + - "}"; + " \"name\": \"Root Rule Chain\",\n" + + " \"type\": \"CORE\",\n" + + " \"firstRuleNodeId\": {\n" + + " \"entityType\": \"RULE_NODE\",\n" + + " \"id\": \"91ad0b00-e779-11ee-9cf0-15d8b6079fdb\"\n" + + " },\n" + + " \"debugMode\": false,\n" + + " \"configuration\": null,\n" + + " \"additionalInfo\": null\n" + + "}"; return JacksonUtil.fromString(ruleChainStr, RuleChain.class); } }