diff --git a/application/src/main/java/org/thingsboard/server/service/install/update/DefaultDataUpdateService.java b/application/src/main/java/org/thingsboard/server/service/install/update/DefaultDataUpdateService.java index 8c3b78040b..d16b8c362b 100644 --- a/application/src/main/java/org/thingsboard/server/service/install/update/DefaultDataUpdateService.java +++ b/application/src/main/java/org/thingsboard/server/service/install/update/DefaultDataUpdateService.java @@ -17,7 +17,7 @@ package org.thingsboard.server.service.install.update; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.ObjectNode; -import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.MoreExecutors; @@ -84,6 +84,7 @@ import org.thingsboard.server.dao.tenant.TenantProfileService; import org.thingsboard.server.dao.tenant.TenantService; import org.thingsboard.server.dao.timeseries.TimeseriesService; import org.thingsboard.server.service.component.ComponentDiscoveryService; +import org.thingsboard.server.service.component.RuleNodeClassInfo; import org.thingsboard.server.service.install.InstallScripts; import org.thingsboard.server.service.install.SystemDataLoaderService; @@ -102,7 +103,8 @@ import static org.thingsboard.server.common.data.StringUtils.isBlank; @Slf4j public class DefaultDataUpdateService implements DataUpdateService { - private static final int MAX_PENDING_SAVE_RULE_NODE_FUTURES = 100; + private static final int MAX_PENDING_SAVE_RULE_NODE_FUTURES = 256; + private static final int DEFAULT_PAGE_SIZE = 1024; @Autowired private TenantService tenantService; @@ -231,67 +233,77 @@ public class DefaultDataUpdateService implements DataUpdateService { @Override public void upgradeRuleNodes() { try { - var futures = new ArrayList>(100); int totalRuleNodesUpgraded = 0; log.info("Starting rule nodes upgrade ..."); var nodeClassToVersionMap = componentDiscoveryService.getVersionedNodes(); log.debug("Found {} versioned nodes to check for upgrade!", nodeClassToVersionMap.size()); for (var ruleNodeClassInfo : nodeClassToVersionMap) { - var ruleNodeType = ruleNodeClassInfo.getClassName(); var ruleNodeTypeForLogs = ruleNodeClassInfo.getSimpleName(); var toVersion = ruleNodeClassInfo.getCurrentVersion(); log.debug("Going to check for nodes with type: {} to upgrade to version: {}.", ruleNodeTypeForLogs, toVersion); - var ruleNodesToUpdate = new PageDataIterable<>( - pageLink -> ruleChainService.findAllRuleNodesByTypeAndVersionLessThan(ruleNodeType, toVersion, pageLink), 1024 - ); - if (Iterables.isEmpty(ruleNodesToUpdate)) { - log.debug("There are no active nodes with type: {}, or all nodes with this type already set to latest version!", ruleNodeTypeForLogs); - } else { - for (var ruleNode : ruleNodesToUpdate) { - var ruleNodeId = ruleNode.getId(); - var oldConfiguration = ruleNode.getConfiguration(); - int fromVersion = ruleNode.getConfigurationVersion(); - log.debug("Going to upgrade rule node with id: {} type: {} fromVersion: {} toVersion: {}", - ruleNodeId, ruleNodeTypeForLogs, fromVersion, toVersion); - try { - var tbVersionedNode = (TbNode) ruleNodeClassInfo.getClazz().getDeclaredConstructor().newInstance(); - TbPair upgradeRuleNodeConfigurationResult = tbVersionedNode.upgrade(fromVersion, oldConfiguration); - if (upgradeRuleNodeConfigurationResult.getFirst()) { - ruleNode.setConfiguration(upgradeRuleNodeConfigurationResult.getSecond()); - } - ruleNode.setConfigurationVersion(toVersion); - futures.add(jpaExecutorService.submit(() -> { - ruleChainService.saveRuleNode(TenantId.SYS_TENANT_ID, ruleNode); - log.debug("Successfully upgrade rule node with id: {} type: {} fromVersion: {} toVersion: {}", - ruleNodeId, ruleNodeTypeForLogs, fromVersion, toVersion); - })); - if (futures.size() >= MAX_PENDING_SAVE_RULE_NODE_FUTURES) { - log.info("{} upgraded rule nodes so far ...", - totalRuleNodesUpgraded += awaitFuturesToCompleteAndGetCount(futures)); - futures.clear(); - } - } catch (Exception e) { - log.warn("Failed to upgrade rule node with id: {} type: {} fromVersion: {} toVersion: {} due to: ", - ruleNodeId, ruleNodeTypeForLogs, fromVersion, toVersion, e); - } - } + var ruleNodesIdsToUpgrade = getRuleNodesIdsWithTypeAndVersionLessThan(ruleNodeClassInfo.getClassName(), toVersion); + if (ruleNodesIdsToUpgrade.isEmpty()) { + log.debug("There are no active nodes with type {}, or all nodes with this type already set to latest version!", ruleNodeTypeForLogs); + continue; + } + var ruleNodeIdsPartitions = Lists.partition(ruleNodesIdsToUpgrade, MAX_PENDING_SAVE_RULE_NODE_FUTURES); + for (var ruleNodePack : ruleNodeIdsPartitions) { + totalRuleNodesUpgraded += processRuleNodePack(ruleNodePack, ruleNodeClassInfo); + log.info("{} upgraded rule nodes so far ...", totalRuleNodesUpgraded); } } - log.info("Finished rule nodes upgrade. Upgraded rule nodes count: {}", - totalRuleNodesUpgraded + awaitFuturesToCompleteAndGetCount(futures)); + log.info("Finished rule nodes upgrade. Upgraded rule nodes count: {}", totalRuleNodesUpgraded); } catch (Exception e) { log.error("Unexpected error during rule nodes upgrade: ", e); } } - private int awaitFuturesToCompleteAndGetCount(List> futures) { + private int processRuleNodePack(List ruleNodeIdsBatch, RuleNodeClassInfo ruleNodeClassInfo) { + var saveFutures = new ArrayList>(MAX_PENDING_SAVE_RULE_NODE_FUTURES); + String ruleNodeType = ruleNodeClassInfo.getSimpleName(); + int toVersion = ruleNodeClassInfo.getCurrentVersion(); + var ruleNodesPack = ruleChainService.findAllRuleNodesByIds(ruleNodeIdsBatch); + for (var ruleNode : ruleNodesPack) { + if (ruleNode == null) { + continue; + } + var ruleNodeId = ruleNode.getId(); + var oldConfiguration = ruleNode.getConfiguration(); + int fromVersion = ruleNode.getConfigurationVersion(); + log.debug("Going to upgrade rule node with id: {} type: {} fromVersion: {} toVersion: {}", + ruleNodeId, ruleNodeType, fromVersion, toVersion); + try { + var tbVersionedNode = (TbNode) ruleNodeClassInfo.getClazz().getDeclaredConstructor().newInstance(); + TbPair upgradeRuleNodeConfigurationResult = tbVersionedNode.upgrade(fromVersion, oldConfiguration); + if (upgradeRuleNodeConfigurationResult.getFirst()) { + ruleNode.setConfiguration(upgradeRuleNodeConfigurationResult.getSecond()); + } + ruleNode.setConfigurationVersion(toVersion); + saveFutures.add(jpaExecutorService.submit(() -> { + ruleChainService.saveRuleNode(TenantId.SYS_TENANT_ID, ruleNode); + log.debug("Successfully upgrade rule node with id: {} type: {} fromVersion: {} toVersion: {}", + ruleNodeId, ruleNodeType, fromVersion, toVersion); + })); + } catch (Exception e) { + log.warn("Failed to upgrade rule node with id: {} type: {} fromVersion: {} toVersion: {} due to: ", + ruleNodeId, ruleNodeType, fromVersion, toVersion, e); + } + } try { - return Futures.allAsList(futures).get().size(); + return Futures.allAsList(saveFutures).get().size(); } catch (ExecutionException | InterruptedException e) { throw new RuntimeException("Failed to process save rule nodes requests due to: ", e); } } + private List getRuleNodesIdsWithTypeAndVersionLessThan(String type, int toVersion) { + var ruleNodeIds = new ArrayList(); + new PageDataIterable<>(pageLink -> + ruleChainService.findAllRuleNodeIdsByTypeAndVersionLessThan(type, toVersion, pageLink), DEFAULT_PAGE_SIZE + ).forEach(ruleNodeIds::add); + return ruleNodeIds; + } + private final PaginatedUpdater deviceProfileEntityDynamicConditionsUpdater = new PaginatedUpdater<>() { @@ -373,12 +385,11 @@ public class DefaultDataUpdateService implements DataUpdateService { private void updateNestedRuleChains() { try { - var packSize = 1024; var updated = 0; boolean hasNext = true; while (hasNext) { - List relations = relationService.findRuleNodeToRuleChainRelations(TenantId.SYS_TENANT_ID, RuleChainType.CORE, packSize); - hasNext = relations.size() == packSize; + List relations = relationService.findRuleNodeToRuleChainRelations(TenantId.SYS_TENANT_ID, RuleChainType.CORE, DEFAULT_PAGE_SIZE); + hasNext = relations.size() == DEFAULT_PAGE_SIZE; for (EntityRelation relation : relations) { try { RuleNodeId sourceNodeId = new RuleNodeId(relation.getFrom().getId()); diff --git a/common/dao-api/src/main/java/org/thingsboard/server/dao/rule/RuleChainService.java b/common/dao-api/src/main/java/org/thingsboard/server/dao/rule/RuleChainService.java index 95cdc0e02d..532da4ac00 100644 --- a/common/dao-api/src/main/java/org/thingsboard/server/dao/rule/RuleChainService.java +++ b/common/dao-api/src/main/java/org/thingsboard/server/dao/rule/RuleChainService.java @@ -102,6 +102,10 @@ public interface RuleChainService extends EntityDaoService { PageData findAllRuleNodesByTypeAndVersionLessThan(String type, int version, PageLink pageLink); + PageData findAllRuleNodeIdsByTypeAndVersionLessThan(String type, int version, PageLink pageLink); + + List findAllRuleNodesByIds(List ruleNodeIds); + RuleNode saveRuleNode(TenantId tenantId, RuleNode ruleNode); void deleteRuleNodes(TenantId tenantId, RuleChainId ruleChainId); diff --git a/dao/src/main/java/org/thingsboard/server/dao/rule/BaseRuleChainService.java b/dao/src/main/java/org/thingsboard/server/dao/rule/BaseRuleChainService.java index 5e6b1e8009..956e4b83f6 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/rule/BaseRuleChainService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/rule/BaseRuleChainService.java @@ -78,6 +78,7 @@ import java.util.stream.Collectors; import static org.thingsboard.server.common.data.DataConstants.TENANT; import static org.thingsboard.server.dao.service.Validator.validateId; +import static org.thingsboard.server.dao.service.Validator.validateIds; import static org.thingsboard.server.dao.service.Validator.validatePageLink; import static org.thingsboard.server.dao.service.Validator.validatePositiveNumber; import static org.thingsboard.server.dao.service.Validator.validateString; @@ -726,6 +727,23 @@ public class BaseRuleChainService extends AbstractEntityService implements RuleC return ruleNodeDao.findAllRuleNodesByTypeAndVersionLessThan(type, version, pageLink); } + @Override + public PageData findAllRuleNodeIdsByTypeAndVersionLessThan(String type, int version, PageLink pageLink) { + log.trace("Executing findAllRuleNodeIdsByTypeAndVersionLessThan, type {}, pageLink {}, version {}", type, pageLink, version); + validateString(type, "Incorrect type of the rule node"); + validatePositiveNumber(version, "Incorrect version to compare with. Version should be greater than 0!"); + validatePageLink(pageLink); + return ruleNodeDao.findAllRuleNodeIdsByTypeAndVersionLessThan(type, version, pageLink); + } + + @Override + public List findAllRuleNodesByIds(List ruleNodeIds) { + log.trace("Executing findAllRuleNodesByIds, ruleNodeIds {}", ruleNodeIds); + validateIds(ruleNodeIds, "Incorrect ruleNodeIds " + ruleNodeIds); + assert ruleNodeIds.size() <= 1024; + return ruleNodeDao.findAllRuleNodeByIds(ruleNodeIds); + } + @Override public RuleNode saveRuleNode(TenantId tenantId, RuleNode ruleNode) { return ruleNodeDao.save(tenantId, ruleNode); diff --git a/dao/src/main/java/org/thingsboard/server/dao/rule/RuleNodeDao.java b/dao/src/main/java/org/thingsboard/server/dao/rule/RuleNodeDao.java index cd8fbb0a3c..b6f2dd097e 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/rule/RuleNodeDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/rule/RuleNodeDao.java @@ -36,7 +36,12 @@ public interface RuleNodeDao extends Dao { PageData findAllRuleNodesByTypeAndVersionLessThan(String type, int version, PageLink pageLink); + PageData findAllRuleNodeIdsByTypeAndVersionLessThan(String type, int version, PageLink pageLink); + + List findAllRuleNodeByIds(List ruleNodeIds); + List findByExternalIds(RuleChainId ruleChainId, List externalIds); void deleteByIdIn(List ruleNodeIds); + } diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/rule/JpaRuleNodeDao.java b/dao/src/main/java/org/thingsboard/server/dao/sql/rule/JpaRuleNodeDao.java index 827b73a0c1..de5ff109d3 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/rule/JpaRuleNodeDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/rule/JpaRuleNodeDao.java @@ -79,6 +79,22 @@ public class JpaRuleNodeDao extends JpaAbstractDao imp DaoUtil.toPageable(pageLink))); } + @Override + public PageData findAllRuleNodeIdsByTypeAndVersionLessThan(String type, int version, PageLink pageLink) { + return DaoUtil.pageToPageData(ruleNodeRepository + .findAllRuleNodeIdsByTypeAndVersionLessThan( + type, + version, + DaoUtil.toPageable(pageLink))) + .mapData(RuleNodeId::new); + } + + @Override + public List findAllRuleNodeByIds(List ruleNodeIds) { + return DaoUtil.convertDataList(ruleNodeRepository.findAllById( + ruleNodeIds.stream().map(RuleNodeId::getId).collect(Collectors.toList()))); + } + @Override public List findByExternalIds(RuleChainId ruleChainId, List externalIds) { return DaoUtil.convertDataList(ruleNodeRepository.findRuleNodesByRuleChainIdAndExternalIdIn(ruleChainId.getId(), diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/rule/RuleNodeRepository.java b/dao/src/main/java/org/thingsboard/server/dao/sql/rule/RuleNodeRepository.java index c8ecb8000b..f02072b356 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/rule/RuleNodeRepository.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/rule/RuleNodeRepository.java @@ -50,6 +50,11 @@ public interface RuleNodeRepository extends JpaRepository @Param("searchText") String searchText, Pageable pageable); + @Query("SELECT r.id FROM RuleNodeEntity r WHERE r.type = :ruleType AND r.configurationVersion < :version") + Page findAllRuleNodeIdsByTypeAndVersionLessThan(@Param("ruleType") String ruleType, + @Param("version") int version, + Pageable pageable); + List findRuleNodesByRuleChainIdAndExternalIdIn(UUID ruleChainId, List externalIds); @Transactional diff --git a/dao/src/test/java/org/thingsboard/server/dao/sql/rule/JpaRuleNodeDaoTest.java b/dao/src/test/java/org/thingsboard/server/dao/sql/rule/JpaRuleNodeDaoTest.java index 6e77927233..2f1c73da01 100644 --- a/dao/src/test/java/org/thingsboard/server/dao/sql/rule/JpaRuleNodeDaoTest.java +++ b/dao/src/test/java/org/thingsboard/server/dao/sql/rule/JpaRuleNodeDaoTest.java @@ -22,6 +22,7 @@ import org.junit.Before; import org.junit.Test; import org.springframework.beans.factory.annotation.Autowired; import org.thingsboard.common.util.JacksonUtil; +import org.thingsboard.server.common.data.StringUtils; import org.thingsboard.server.common.data.id.RuleChainId; import org.thingsboard.server.common.data.id.RuleNodeId; import org.thingsboard.server.common.data.id.TenantId; @@ -36,10 +37,10 @@ import org.thingsboard.server.dao.rule.RuleNodeDao; import java.util.ArrayList; import java.util.List; import java.util.UUID; +import java.util.stream.Collectors; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; public class JpaRuleNodeDaoTest extends AbstractJpaDaoTest { @@ -124,7 +125,39 @@ public class JpaRuleNodeDaoTest extends AbstractJpaDaoTest { assertEquals(10, ruleNodes.getData().size()); } + @Test + public void testFindRuleNodeIdsByTypeAndVersionLessThan() { + PageData ruleNodeIds = ruleNodeDao.findAllRuleNodeIdsByTypeAndVersionLessThan( "A", 1, new PageLink(10, 0, PREFIX_FOR_RULE_NODE_NAME)); + assertEquals(20, ruleNodeIds.getTotalElements()); + assertEquals(2, ruleNodeIds.getTotalPages()); + assertEquals(10, ruleNodeIds.getData().size()); + + ruleNodeIds = ruleNodeDao.findAllRuleNodeIdsByTypeAndVersionLessThan( "A", 1, new PageLink(10, 0)); + assertEquals(20, ruleNodeIds.getTotalElements()); + assertEquals(2, ruleNodeIds.getTotalPages()); + assertEquals(10, ruleNodeIds.getData().size()); + + // test - search text ignored + ruleNodeIds = ruleNodeDao.findAllRuleNodeIdsByTypeAndVersionLessThan( "A", 1, new PageLink(10, 0, StringUtils.randomAlphabetic(5))); + assertEquals(20, ruleNodeIds.getTotalElements()); + assertEquals(2, ruleNodeIds.getTotalPages()); + assertEquals(10, ruleNodeIds.getData().size()); + } + + @Test + public void testFindAllRuleNodeByIds() { + var fromUUIDs = ruleNodeIds.stream().map(RuleNodeId::new).collect(Collectors.toList()); + var ruleNodes = ruleNodeDao.findAllRuleNodeByIds(fromUUIDs); + assertEquals(40, ruleNodes.size()); + } + private List createRuleNodes(TenantId tenantId1, TenantId tenantId2, RuleChainId ruleChainId1, RuleChainId ruleChainId2, int count) { + return createRuleNodes(tenantId1, tenantId2, ruleChainId1, ruleChainId2, "A", "B", count); + } + + private List createRuleNodes(TenantId tenantId1, TenantId tenantId2, + RuleChainId ruleChainId1, RuleChainId ruleChainId2, + String typeA, String typeB, int count) { var chain1 = new RuleChain(ruleChainId1); chain1.setTenantId(tenantId1); chain1.setName(ruleChainId1.toString()); @@ -135,8 +168,8 @@ public class JpaRuleNodeDaoTest extends AbstractJpaDaoTest { ruleChainDao.save(tenantId2, chain2); List savedRuleNodeIds = new ArrayList<>(); for (int i = 0; i < count / 2; i++) { - savedRuleNodeIds.add(ruleNodeDao.save(tenantId1, getRuleNode(ruleChainId1, "A", Integer.toString(i))).getUuidId()); - savedRuleNodeIds.add(ruleNodeDao.save(tenantId2, getRuleNode(ruleChainId2, "B", Integer.toString(i + count / 2))).getUuidId()); + savedRuleNodeIds.add(ruleNodeDao.save(tenantId1, getRuleNode(ruleChainId1, typeA, Integer.toString(i))).getUuidId()); + savedRuleNodeIds.add(ruleNodeDao.save(tenantId2, getRuleNode(ruleChainId2, typeB, Integer.toString(i + count / 2))).getUuidId()); } return savedRuleNodeIds; }