Merge pull request #9441 from ShvaykaD/bugfix/rule-nodes-upgrade-script
fixed rule nodes upgrade script
This commit is contained in:
commit
362e99c3fc
@ -17,7 +17,7 @@ package org.thingsboard.server.service.install.update;
|
||||
|
||||
import com.fasterxml.jackson.databind.JsonNode;
|
||||
import com.fasterxml.jackson.databind.node.ObjectNode;
|
||||
import com.google.common.collect.Iterables;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.util.concurrent.Futures;
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
import com.google.common.util.concurrent.MoreExecutors;
|
||||
@ -84,6 +84,7 @@ import org.thingsboard.server.dao.tenant.TenantProfileService;
|
||||
import org.thingsboard.server.dao.tenant.TenantService;
|
||||
import org.thingsboard.server.dao.timeseries.TimeseriesService;
|
||||
import org.thingsboard.server.service.component.ComponentDiscoveryService;
|
||||
import org.thingsboard.server.service.component.RuleNodeClassInfo;
|
||||
import org.thingsboard.server.service.install.InstallScripts;
|
||||
import org.thingsboard.server.service.install.SystemDataLoaderService;
|
||||
|
||||
@ -102,7 +103,8 @@ import static org.thingsboard.server.common.data.StringUtils.isBlank;
|
||||
@Slf4j
|
||||
public class DefaultDataUpdateService implements DataUpdateService {
|
||||
|
||||
private static final int MAX_PENDING_SAVE_RULE_NODE_FUTURES = 100;
|
||||
private static final int MAX_PENDING_SAVE_RULE_NODE_FUTURES = 256;
|
||||
private static final int DEFAULT_PAGE_SIZE = 1024;
|
||||
|
||||
@Autowired
|
||||
private TenantService tenantService;
|
||||
@ -231,67 +233,77 @@ public class DefaultDataUpdateService implements DataUpdateService {
|
||||
@Override
|
||||
public void upgradeRuleNodes() {
|
||||
try {
|
||||
var futures = new ArrayList<ListenableFuture<?>>(100);
|
||||
int totalRuleNodesUpgraded = 0;
|
||||
log.info("Starting rule nodes upgrade ...");
|
||||
var nodeClassToVersionMap = componentDiscoveryService.getVersionedNodes();
|
||||
log.debug("Found {} versioned nodes to check for upgrade!", nodeClassToVersionMap.size());
|
||||
for (var ruleNodeClassInfo : nodeClassToVersionMap) {
|
||||
var ruleNodeType = ruleNodeClassInfo.getClassName();
|
||||
var ruleNodeTypeForLogs = ruleNodeClassInfo.getSimpleName();
|
||||
var toVersion = ruleNodeClassInfo.getCurrentVersion();
|
||||
log.debug("Going to check for nodes with type: {} to upgrade to version: {}.", ruleNodeTypeForLogs, toVersion);
|
||||
var ruleNodesToUpdate = new PageDataIterable<>(
|
||||
pageLink -> ruleChainService.findAllRuleNodesByTypeAndVersionLessThan(ruleNodeType, toVersion, pageLink), 1024
|
||||
);
|
||||
if (Iterables.isEmpty(ruleNodesToUpdate)) {
|
||||
log.debug("There are no active nodes with type: {}, or all nodes with this type already set to latest version!", ruleNodeTypeForLogs);
|
||||
} else {
|
||||
for (var ruleNode : ruleNodesToUpdate) {
|
||||
var ruleNodeId = ruleNode.getId();
|
||||
var oldConfiguration = ruleNode.getConfiguration();
|
||||
int fromVersion = ruleNode.getConfigurationVersion();
|
||||
log.debug("Going to upgrade rule node with id: {} type: {} fromVersion: {} toVersion: {}",
|
||||
ruleNodeId, ruleNodeTypeForLogs, fromVersion, toVersion);
|
||||
try {
|
||||
var tbVersionedNode = (TbNode) ruleNodeClassInfo.getClazz().getDeclaredConstructor().newInstance();
|
||||
TbPair<Boolean, JsonNode> upgradeRuleNodeConfigurationResult = tbVersionedNode.upgrade(fromVersion, oldConfiguration);
|
||||
if (upgradeRuleNodeConfigurationResult.getFirst()) {
|
||||
ruleNode.setConfiguration(upgradeRuleNodeConfigurationResult.getSecond());
|
||||
}
|
||||
ruleNode.setConfigurationVersion(toVersion);
|
||||
futures.add(jpaExecutorService.submit(() -> {
|
||||
ruleChainService.saveRuleNode(TenantId.SYS_TENANT_ID, ruleNode);
|
||||
log.debug("Successfully upgrade rule node with id: {} type: {} fromVersion: {} toVersion: {}",
|
||||
ruleNodeId, ruleNodeTypeForLogs, fromVersion, toVersion);
|
||||
}));
|
||||
if (futures.size() >= MAX_PENDING_SAVE_RULE_NODE_FUTURES) {
|
||||
log.info("{} upgraded rule nodes so far ...",
|
||||
totalRuleNodesUpgraded += awaitFuturesToCompleteAndGetCount(futures));
|
||||
futures.clear();
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.warn("Failed to upgrade rule node with id: {} type: {} fromVersion: {} toVersion: {} due to: ",
|
||||
ruleNodeId, ruleNodeTypeForLogs, fromVersion, toVersion, e);
|
||||
}
|
||||
}
|
||||
var ruleNodesIdsToUpgrade = getRuleNodesIdsWithTypeAndVersionLessThan(ruleNodeClassInfo.getClassName(), toVersion);
|
||||
if (ruleNodesIdsToUpgrade.isEmpty()) {
|
||||
log.debug("There are no active nodes with type {}, or all nodes with this type already set to latest version!", ruleNodeTypeForLogs);
|
||||
continue;
|
||||
}
|
||||
var ruleNodeIdsPartitions = Lists.partition(ruleNodesIdsToUpgrade, MAX_PENDING_SAVE_RULE_NODE_FUTURES);
|
||||
for (var ruleNodePack : ruleNodeIdsPartitions) {
|
||||
totalRuleNodesUpgraded += processRuleNodePack(ruleNodePack, ruleNodeClassInfo);
|
||||
log.info("{} upgraded rule nodes so far ...", totalRuleNodesUpgraded);
|
||||
}
|
||||
}
|
||||
log.info("Finished rule nodes upgrade. Upgraded rule nodes count: {}",
|
||||
totalRuleNodesUpgraded + awaitFuturesToCompleteAndGetCount(futures));
|
||||
log.info("Finished rule nodes upgrade. Upgraded rule nodes count: {}", totalRuleNodesUpgraded);
|
||||
} catch (Exception e) {
|
||||
log.error("Unexpected error during rule nodes upgrade: ", e);
|
||||
}
|
||||
}
|
||||
|
||||
private int awaitFuturesToCompleteAndGetCount(List<ListenableFuture<?>> futures) {
|
||||
private int processRuleNodePack(List<RuleNodeId> ruleNodeIdsBatch, RuleNodeClassInfo ruleNodeClassInfo) {
|
||||
var saveFutures = new ArrayList<ListenableFuture<?>>(MAX_PENDING_SAVE_RULE_NODE_FUTURES);
|
||||
String ruleNodeType = ruleNodeClassInfo.getSimpleName();
|
||||
int toVersion = ruleNodeClassInfo.getCurrentVersion();
|
||||
var ruleNodesPack = ruleChainService.findAllRuleNodesByIds(ruleNodeIdsBatch);
|
||||
for (var ruleNode : ruleNodesPack) {
|
||||
if (ruleNode == null) {
|
||||
continue;
|
||||
}
|
||||
var ruleNodeId = ruleNode.getId();
|
||||
var oldConfiguration = ruleNode.getConfiguration();
|
||||
int fromVersion = ruleNode.getConfigurationVersion();
|
||||
log.debug("Going to upgrade rule node with id: {} type: {} fromVersion: {} toVersion: {}",
|
||||
ruleNodeId, ruleNodeType, fromVersion, toVersion);
|
||||
try {
|
||||
var tbVersionedNode = (TbNode) ruleNodeClassInfo.getClazz().getDeclaredConstructor().newInstance();
|
||||
TbPair<Boolean, JsonNode> upgradeRuleNodeConfigurationResult = tbVersionedNode.upgrade(fromVersion, oldConfiguration);
|
||||
if (upgradeRuleNodeConfigurationResult.getFirst()) {
|
||||
ruleNode.setConfiguration(upgradeRuleNodeConfigurationResult.getSecond());
|
||||
}
|
||||
ruleNode.setConfigurationVersion(toVersion);
|
||||
saveFutures.add(jpaExecutorService.submit(() -> {
|
||||
ruleChainService.saveRuleNode(TenantId.SYS_TENANT_ID, ruleNode);
|
||||
log.debug("Successfully upgrade rule node with id: {} type: {} fromVersion: {} toVersion: {}",
|
||||
ruleNodeId, ruleNodeType, fromVersion, toVersion);
|
||||
}));
|
||||
} catch (Exception e) {
|
||||
log.warn("Failed to upgrade rule node with id: {} type: {} fromVersion: {} toVersion: {} due to: ",
|
||||
ruleNodeId, ruleNodeType, fromVersion, toVersion, e);
|
||||
}
|
||||
}
|
||||
try {
|
||||
return Futures.allAsList(futures).get().size();
|
||||
return Futures.allAsList(saveFutures).get().size();
|
||||
} catch (ExecutionException | InterruptedException e) {
|
||||
throw new RuntimeException("Failed to process save rule nodes requests due to: ", e);
|
||||
}
|
||||
}
|
||||
|
||||
private List<RuleNodeId> getRuleNodesIdsWithTypeAndVersionLessThan(String type, int toVersion) {
|
||||
var ruleNodeIds = new ArrayList<RuleNodeId>();
|
||||
new PageDataIterable<>(pageLink ->
|
||||
ruleChainService.findAllRuleNodeIdsByTypeAndVersionLessThan(type, toVersion, pageLink), DEFAULT_PAGE_SIZE
|
||||
).forEach(ruleNodeIds::add);
|
||||
return ruleNodeIds;
|
||||
}
|
||||
|
||||
private final PaginatedUpdater<String, DeviceProfileEntity> deviceProfileEntityDynamicConditionsUpdater =
|
||||
new PaginatedUpdater<>() {
|
||||
|
||||
@ -373,12 +385,11 @@ public class DefaultDataUpdateService implements DataUpdateService {
|
||||
|
||||
private void updateNestedRuleChains() {
|
||||
try {
|
||||
var packSize = 1024;
|
||||
var updated = 0;
|
||||
boolean hasNext = true;
|
||||
while (hasNext) {
|
||||
List<EntityRelation> relations = relationService.findRuleNodeToRuleChainRelations(TenantId.SYS_TENANT_ID, RuleChainType.CORE, packSize);
|
||||
hasNext = relations.size() == packSize;
|
||||
List<EntityRelation> relations = relationService.findRuleNodeToRuleChainRelations(TenantId.SYS_TENANT_ID, RuleChainType.CORE, DEFAULT_PAGE_SIZE);
|
||||
hasNext = relations.size() == DEFAULT_PAGE_SIZE;
|
||||
for (EntityRelation relation : relations) {
|
||||
try {
|
||||
RuleNodeId sourceNodeId = new RuleNodeId(relation.getFrom().getId());
|
||||
|
||||
@ -102,6 +102,10 @@ public interface RuleChainService extends EntityDaoService {
|
||||
|
||||
PageData<RuleNode> findAllRuleNodesByTypeAndVersionLessThan(String type, int version, PageLink pageLink);
|
||||
|
||||
PageData<RuleNodeId> findAllRuleNodeIdsByTypeAndVersionLessThan(String type, int version, PageLink pageLink);
|
||||
|
||||
List<RuleNode> findAllRuleNodesByIds(List<RuleNodeId> ruleNodeIds);
|
||||
|
||||
RuleNode saveRuleNode(TenantId tenantId, RuleNode ruleNode);
|
||||
|
||||
void deleteRuleNodes(TenantId tenantId, RuleChainId ruleChainId);
|
||||
|
||||
@ -78,6 +78,7 @@ import java.util.stream.Collectors;
|
||||
|
||||
import static org.thingsboard.server.common.data.DataConstants.TENANT;
|
||||
import static org.thingsboard.server.dao.service.Validator.validateId;
|
||||
import static org.thingsboard.server.dao.service.Validator.validateIds;
|
||||
import static org.thingsboard.server.dao.service.Validator.validatePageLink;
|
||||
import static org.thingsboard.server.dao.service.Validator.validatePositiveNumber;
|
||||
import static org.thingsboard.server.dao.service.Validator.validateString;
|
||||
@ -726,6 +727,23 @@ public class BaseRuleChainService extends AbstractEntityService implements RuleC
|
||||
return ruleNodeDao.findAllRuleNodesByTypeAndVersionLessThan(type, version, pageLink);
|
||||
}
|
||||
|
||||
@Override
|
||||
public PageData<RuleNodeId> findAllRuleNodeIdsByTypeAndVersionLessThan(String type, int version, PageLink pageLink) {
|
||||
log.trace("Executing findAllRuleNodeIdsByTypeAndVersionLessThan, type {}, pageLink {}, version {}", type, pageLink, version);
|
||||
validateString(type, "Incorrect type of the rule node");
|
||||
validatePositiveNumber(version, "Incorrect version to compare with. Version should be greater than 0!");
|
||||
validatePageLink(pageLink);
|
||||
return ruleNodeDao.findAllRuleNodeIdsByTypeAndVersionLessThan(type, version, pageLink);
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<RuleNode> findAllRuleNodesByIds(List<RuleNodeId> ruleNodeIds) {
|
||||
log.trace("Executing findAllRuleNodesByIds, ruleNodeIds {}", ruleNodeIds);
|
||||
validateIds(ruleNodeIds, "Incorrect ruleNodeIds " + ruleNodeIds);
|
||||
assert ruleNodeIds.size() <= 1024;
|
||||
return ruleNodeDao.findAllRuleNodeByIds(ruleNodeIds);
|
||||
}
|
||||
|
||||
@Override
|
||||
public RuleNode saveRuleNode(TenantId tenantId, RuleNode ruleNode) {
|
||||
return ruleNodeDao.save(tenantId, ruleNode);
|
||||
|
||||
@ -36,7 +36,12 @@ public interface RuleNodeDao extends Dao<RuleNode> {
|
||||
|
||||
PageData<RuleNode> findAllRuleNodesByTypeAndVersionLessThan(String type, int version, PageLink pageLink);
|
||||
|
||||
PageData<RuleNodeId> findAllRuleNodeIdsByTypeAndVersionLessThan(String type, int version, PageLink pageLink);
|
||||
|
||||
List<RuleNode> findAllRuleNodeByIds(List<RuleNodeId> ruleNodeIds);
|
||||
|
||||
List<RuleNode> findByExternalIds(RuleChainId ruleChainId, List<RuleNodeId> externalIds);
|
||||
|
||||
void deleteByIdIn(List<RuleNodeId> ruleNodeIds);
|
||||
|
||||
}
|
||||
|
||||
@ -79,6 +79,22 @@ public class JpaRuleNodeDao extends JpaAbstractDao<RuleNodeEntity, RuleNode> imp
|
||||
DaoUtil.toPageable(pageLink)));
|
||||
}
|
||||
|
||||
@Override
|
||||
public PageData<RuleNodeId> findAllRuleNodeIdsByTypeAndVersionLessThan(String type, int version, PageLink pageLink) {
|
||||
return DaoUtil.pageToPageData(ruleNodeRepository
|
||||
.findAllRuleNodeIdsByTypeAndVersionLessThan(
|
||||
type,
|
||||
version,
|
||||
DaoUtil.toPageable(pageLink)))
|
||||
.mapData(RuleNodeId::new);
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<RuleNode> findAllRuleNodeByIds(List<RuleNodeId> ruleNodeIds) {
|
||||
return DaoUtil.convertDataList(ruleNodeRepository.findAllById(
|
||||
ruleNodeIds.stream().map(RuleNodeId::getId).collect(Collectors.toList())));
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<RuleNode> findByExternalIds(RuleChainId ruleChainId, List<RuleNodeId> externalIds) {
|
||||
return DaoUtil.convertDataList(ruleNodeRepository.findRuleNodesByRuleChainIdAndExternalIdIn(ruleChainId.getId(),
|
||||
|
||||
@ -50,6 +50,11 @@ public interface RuleNodeRepository extends JpaRepository<RuleNodeEntity, UUID>
|
||||
@Param("searchText") String searchText,
|
||||
Pageable pageable);
|
||||
|
||||
@Query("SELECT r.id FROM RuleNodeEntity r WHERE r.type = :ruleType AND r.configurationVersion < :version")
|
||||
Page<UUID> findAllRuleNodeIdsByTypeAndVersionLessThan(@Param("ruleType") String ruleType,
|
||||
@Param("version") int version,
|
||||
Pageable pageable);
|
||||
|
||||
List<RuleNodeEntity> findRuleNodesByRuleChainIdAndExternalIdIn(UUID ruleChainId, List<UUID> externalIds);
|
||||
|
||||
@Transactional
|
||||
|
||||
@ -22,6 +22,7 @@ import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.thingsboard.common.util.JacksonUtil;
|
||||
import org.thingsboard.server.common.data.StringUtils;
|
||||
import org.thingsboard.server.common.data.id.RuleChainId;
|
||||
import org.thingsboard.server.common.data.id.RuleNodeId;
|
||||
import org.thingsboard.server.common.data.id.TenantId;
|
||||
@ -36,10 +37,10 @@ import org.thingsboard.server.dao.rule.RuleNodeDao;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.UUID;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThatThrownBy;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
|
||||
public class JpaRuleNodeDaoTest extends AbstractJpaDaoTest {
|
||||
|
||||
@ -124,7 +125,39 @@ public class JpaRuleNodeDaoTest extends AbstractJpaDaoTest {
|
||||
assertEquals(10, ruleNodes.getData().size());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFindRuleNodeIdsByTypeAndVersionLessThan() {
|
||||
PageData<RuleNodeId> ruleNodeIds = ruleNodeDao.findAllRuleNodeIdsByTypeAndVersionLessThan( "A", 1, new PageLink(10, 0, PREFIX_FOR_RULE_NODE_NAME));
|
||||
assertEquals(20, ruleNodeIds.getTotalElements());
|
||||
assertEquals(2, ruleNodeIds.getTotalPages());
|
||||
assertEquals(10, ruleNodeIds.getData().size());
|
||||
|
||||
ruleNodeIds = ruleNodeDao.findAllRuleNodeIdsByTypeAndVersionLessThan( "A", 1, new PageLink(10, 0));
|
||||
assertEquals(20, ruleNodeIds.getTotalElements());
|
||||
assertEquals(2, ruleNodeIds.getTotalPages());
|
||||
assertEquals(10, ruleNodeIds.getData().size());
|
||||
|
||||
// test - search text ignored
|
||||
ruleNodeIds = ruleNodeDao.findAllRuleNodeIdsByTypeAndVersionLessThan( "A", 1, new PageLink(10, 0, StringUtils.randomAlphabetic(5)));
|
||||
assertEquals(20, ruleNodeIds.getTotalElements());
|
||||
assertEquals(2, ruleNodeIds.getTotalPages());
|
||||
assertEquals(10, ruleNodeIds.getData().size());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFindAllRuleNodeByIds() {
|
||||
var fromUUIDs = ruleNodeIds.stream().map(RuleNodeId::new).collect(Collectors.toList());
|
||||
var ruleNodes = ruleNodeDao.findAllRuleNodeByIds(fromUUIDs);
|
||||
assertEquals(40, ruleNodes.size());
|
||||
}
|
||||
|
||||
private List<UUID> createRuleNodes(TenantId tenantId1, TenantId tenantId2, RuleChainId ruleChainId1, RuleChainId ruleChainId2, int count) {
|
||||
return createRuleNodes(tenantId1, tenantId2, ruleChainId1, ruleChainId2, "A", "B", count);
|
||||
}
|
||||
|
||||
private List<UUID> createRuleNodes(TenantId tenantId1, TenantId tenantId2,
|
||||
RuleChainId ruleChainId1, RuleChainId ruleChainId2,
|
||||
String typeA, String typeB, int count) {
|
||||
var chain1 = new RuleChain(ruleChainId1);
|
||||
chain1.setTenantId(tenantId1);
|
||||
chain1.setName(ruleChainId1.toString());
|
||||
@ -135,8 +168,8 @@ public class JpaRuleNodeDaoTest extends AbstractJpaDaoTest {
|
||||
ruleChainDao.save(tenantId2, chain2);
|
||||
List<UUID> savedRuleNodeIds = new ArrayList<>();
|
||||
for (int i = 0; i < count / 2; i++) {
|
||||
savedRuleNodeIds.add(ruleNodeDao.save(tenantId1, getRuleNode(ruleChainId1, "A", Integer.toString(i))).getUuidId());
|
||||
savedRuleNodeIds.add(ruleNodeDao.save(tenantId2, getRuleNode(ruleChainId2, "B", Integer.toString(i + count / 2))).getUuidId());
|
||||
savedRuleNodeIds.add(ruleNodeDao.save(tenantId1, getRuleNode(ruleChainId1, typeA, Integer.toString(i))).getUuidId());
|
||||
savedRuleNodeIds.add(ruleNodeDao.save(tenantId2, getRuleNode(ruleChainId2, typeB, Integer.toString(i + count / 2))).getUuidId());
|
||||
}
|
||||
return savedRuleNodeIds;
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user