Merge pull request #13661 from thingsboard/upgrade-cleanup
Upgrade cleanup for 4.2
This commit is contained in:
		
						commit
						43e84e4e4a
					
				@ -116,7 +116,6 @@ public class ThingsboardInstallService {
 | 
			
		||||
                    entityDatabaseSchemaService.createDatabaseIndexes();
 | 
			
		||||
 | 
			
		||||
                    // TODO: cleanup update code after each release
 | 
			
		||||
                    systemDataLoaderService.updateDefaultNotificationConfigs(false);
 | 
			
		||||
 | 
			
		||||
                    // Runs upgrade scripts that are not possible in plain SQL.
 | 
			
		||||
                    dataUpdateService.updateData();
 | 
			
		||||
 | 
			
		||||
@ -32,7 +32,7 @@ public class DefaultDatabaseSchemaSettingsService implements DatabaseSchemaSetti
 | 
			
		||||
 | 
			
		||||
    // This list should include all versions which are compatible for the upgrade.
 | 
			
		||||
    // The compatibility cycle usually breaks when we have some scripts written in Java that may not work after new release.
 | 
			
		||||
    private static final List<String> SUPPORTED_VERSIONS_FOR_UPGRADE = List.of("4.0.0", "4.0.1");
 | 
			
		||||
    private static final List<String> SUPPORTED_VERSIONS_FOR_UPGRADE = List.of("4.1.0");
 | 
			
		||||
 | 
			
		||||
    private final ProjectInfo projectInfo;
 | 
			
		||||
    private final JdbcTemplate jdbcTemplate;
 | 
			
		||||
 | 
			
		||||
@ -20,25 +20,17 @@ import com.fasterxml.jackson.databind.node.ObjectNode;
 | 
			
		||||
import com.google.common.collect.Lists;
 | 
			
		||||
import com.google.common.util.concurrent.Futures;
 | 
			
		||||
import com.google.common.util.concurrent.ListenableFuture;
 | 
			
		||||
import lombok.RequiredArgsConstructor;
 | 
			
		||||
import lombok.extern.slf4j.Slf4j;
 | 
			
		||||
import org.springframework.beans.factory.annotation.Autowired;
 | 
			
		||||
import org.springframework.context.annotation.Profile;
 | 
			
		||||
import org.springframework.stereotype.Service;
 | 
			
		||||
import org.thingsboard.server.common.data.TenantProfile;
 | 
			
		||||
import org.thingsboard.server.common.data.alarm.AlarmSeverity;
 | 
			
		||||
import org.thingsboard.server.common.data.id.RuleChainId;
 | 
			
		||||
import org.thingsboard.server.common.data.id.RuleNodeId;
 | 
			
		||||
import org.thingsboard.server.common.data.id.TenantId;
 | 
			
		||||
import org.thingsboard.server.common.data.page.PageDataIterable;
 | 
			
		||||
import org.thingsboard.server.common.data.query.DynamicValue;
 | 
			
		||||
import org.thingsboard.server.common.data.query.FilterPredicateValue;
 | 
			
		||||
import org.thingsboard.server.common.data.relation.EntityRelation;
 | 
			
		||||
import org.thingsboard.server.common.data.relation.RelationTypeGroup;
 | 
			
		||||
import org.thingsboard.server.common.data.rule.RuleNode;
 | 
			
		||||
import org.thingsboard.server.common.data.tenant.profile.DefaultTenantProfileConfiguration;
 | 
			
		||||
import org.thingsboard.server.dao.relation.RelationService;
 | 
			
		||||
import org.thingsboard.server.dao.rule.RuleChainService;
 | 
			
		||||
import org.thingsboard.server.dao.tenant.TenantProfileService;
 | 
			
		||||
import org.thingsboard.server.service.component.ComponentDiscoveryService;
 | 
			
		||||
import org.thingsboard.server.service.component.RuleNodeClassInfo;
 | 
			
		||||
import org.thingsboard.server.service.install.DbUpgradeExecutorService;
 | 
			
		||||
@ -46,110 +38,29 @@ import org.thingsboard.server.utils.TbNodeUpgradeUtils;
 | 
			
		||||
 | 
			
		||||
import java.util.ArrayList;
 | 
			
		||||
import java.util.List;
 | 
			
		||||
import java.util.Optional;
 | 
			
		||||
import java.util.UUID;
 | 
			
		||||
import java.util.concurrent.ExecutionException;
 | 
			
		||||
 | 
			
		||||
import static org.thingsboard.server.dao.rule.BaseRuleChainService.TB_RULE_CHAIN_INPUT_NODE;
 | 
			
		||||
 | 
			
		||||
@Service
 | 
			
		||||
@Profile("install")
 | 
			
		||||
@Slf4j
 | 
			
		||||
@RequiredArgsConstructor
 | 
			
		||||
public class DefaultDataUpdateService implements DataUpdateService {
 | 
			
		||||
 | 
			
		||||
    private static final int MAX_PENDING_SAVE_RULE_NODE_FUTURES = 256;
 | 
			
		||||
    private static final int DEFAULT_PAGE_SIZE = 1024;
 | 
			
		||||
 | 
			
		||||
    @Autowired
 | 
			
		||||
    private RuleChainService ruleChainService;
 | 
			
		||||
 | 
			
		||||
    @Autowired
 | 
			
		||||
    private RelationService relationService;
 | 
			
		||||
 | 
			
		||||
    @Autowired
 | 
			
		||||
    private ComponentDiscoveryService componentDiscoveryService;
 | 
			
		||||
 | 
			
		||||
    @Autowired
 | 
			
		||||
    private DbUpgradeExecutorService executorService;
 | 
			
		||||
 | 
			
		||||
    @Autowired
 | 
			
		||||
    private TenantProfileService tenantProfileService;
 | 
			
		||||
    private final RuleChainService ruleChainService;
 | 
			
		||||
    private final ComponentDiscoveryService componentDiscoveryService;
 | 
			
		||||
    private final DbUpgradeExecutorService executorService;
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public void updateData() throws Exception {
 | 
			
		||||
        log.info("Updating data ...");
 | 
			
		||||
        //TODO: should be cleaned after each release
 | 
			
		||||
        updateInputNodes();
 | 
			
		||||
        deduplicateRateLimitsPerSecondsConfigurations();
 | 
			
		||||
 | 
			
		||||
        log.info("Data updated.");
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private void deduplicateRateLimitsPerSecondsConfigurations() {
 | 
			
		||||
        log.info("Starting update of tenant profiles...");
 | 
			
		||||
 | 
			
		||||
        int totalProfiles = 0;
 | 
			
		||||
        int updatedTenantProfiles = 0;
 | 
			
		||||
        int skippedProfiles = 0;
 | 
			
		||||
        int failedProfiles = 0;
 | 
			
		||||
 | 
			
		||||
        var tenantProfiles = new PageDataIterable<>(
 | 
			
		||||
                pageLink -> tenantProfileService.findTenantProfiles(TenantId.SYS_TENANT_ID, pageLink), 1024);
 | 
			
		||||
 | 
			
		||||
        for (TenantProfile tenantProfile : tenantProfiles) {
 | 
			
		||||
            totalProfiles++;
 | 
			
		||||
            String profileName = tenantProfile.getName();
 | 
			
		||||
            UUID profileId = tenantProfile.getId().getId();
 | 
			
		||||
            try {
 | 
			
		||||
                Optional<DefaultTenantProfileConfiguration> profileConfiguration = tenantProfile.getProfileConfiguration();
 | 
			
		||||
                if (profileConfiguration.isEmpty()) {
 | 
			
		||||
                    log.debug("[{}][{}] Skipping tenant profile with non-default configuration.", profileId, profileName);
 | 
			
		||||
                    skippedProfiles++;
 | 
			
		||||
                    continue;
 | 
			
		||||
                }
 | 
			
		||||
 | 
			
		||||
                DefaultTenantProfileConfiguration defaultTenantProfileConfiguration = profileConfiguration.get();
 | 
			
		||||
                defaultTenantProfileConfiguration.deduplicateRateLimitsConfigs();
 | 
			
		||||
                tenantProfileService.saveTenantProfile(TenantId.SYS_TENANT_ID, tenantProfile);
 | 
			
		||||
                updatedTenantProfiles++;
 | 
			
		||||
                log.debug("[{}][{}] Successfully updated tenant profile.", profileId, profileName);
 | 
			
		||||
            } catch (Exception e) {
 | 
			
		||||
                log.error("[{}][{}] Failed to updated tenant profile: ", profileId, profileName, e);
 | 
			
		||||
                failedProfiles++;
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        log.info("Tenant profiles update completed. Total: {}, Updated: {}, Skipped: {}, Failed: {}",
 | 
			
		||||
                totalProfiles, updatedTenantProfiles, skippedProfiles, failedProfiles);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
    private void updateInputNodes() {
 | 
			
		||||
        log.info("Creating relations for input nodes...");
 | 
			
		||||
        int n = 0;
 | 
			
		||||
        var inputNodes = new PageDataIterable<>(pageLink -> ruleChainService.findAllRuleNodesByType(TB_RULE_CHAIN_INPUT_NODE, pageLink), 1024);
 | 
			
		||||
        for (RuleNode inputNode : inputNodes) {
 | 
			
		||||
            try {
 | 
			
		||||
                RuleChainId targetRuleChainId = Optional.ofNullable(inputNode.getConfiguration().get("ruleChainId"))
 | 
			
		||||
                        .filter(JsonNode::isTextual).map(JsonNode::asText).map(id -> new RuleChainId(UUID.fromString(id)))
 | 
			
		||||
                        .orElse(null);
 | 
			
		||||
                if (targetRuleChainId == null) {
 | 
			
		||||
                    continue;
 | 
			
		||||
                }
 | 
			
		||||
 | 
			
		||||
                EntityRelation relation = new EntityRelation();
 | 
			
		||||
                relation.setFrom(inputNode.getRuleChainId());
 | 
			
		||||
                relation.setTo(targetRuleChainId);
 | 
			
		||||
                relation.setType(EntityRelation.USES_TYPE);
 | 
			
		||||
                relation.setTypeGroup(RelationTypeGroup.COMMON);
 | 
			
		||||
                relationService.saveRelation(TenantId.SYS_TENANT_ID, relation);
 | 
			
		||||
                n++;
 | 
			
		||||
            } catch (Exception e) {
 | 
			
		||||
                log.error("Failed to save relation for input node: {}", inputNode, e);
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
        log.info("Created {} relations for input nodes", n);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public void upgradeRuleNodes() {
 | 
			
		||||
        int totalRuleNodesUpgraded = 0;
 | 
			
		||||
 | 
			
		||||
@ -77,16 +77,4 @@ public class RateLimitUtil {
 | 
			
		||||
        return true;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Deprecated(forRemoval = true, since = "4.1")
 | 
			
		||||
    public static String deduplicateByDuration(String configStr) {
 | 
			
		||||
        if (configStr == null) {
 | 
			
		||||
            return null;
 | 
			
		||||
        }
 | 
			
		||||
        Set<Long> distinctDurations = new HashSet<>();
 | 
			
		||||
        return parseConfig(configStr).stream()
 | 
			
		||||
                .filter(entry -> distinctDurations.add(entry.durationSeconds()))
 | 
			
		||||
                .map(RateLimitEntry::toString)
 | 
			
		||||
                .collect(Collectors.joining(","));
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -24,7 +24,6 @@ import lombok.NoArgsConstructor;
 | 
			
		||||
import org.thingsboard.server.common.data.ApiUsageRecordKey;
 | 
			
		||||
import org.thingsboard.server.common.data.EntityType;
 | 
			
		||||
import org.thingsboard.server.common.data.TenantProfileType;
 | 
			
		||||
import org.thingsboard.server.common.data.limit.RateLimitUtil;
 | 
			
		||||
import org.thingsboard.server.common.data.validation.RateLimit;
 | 
			
		||||
 | 
			
		||||
import java.io.Serial;
 | 
			
		||||
@ -236,43 +235,4 @@ public class DefaultTenantProfileConfiguration implements TenantProfileConfigura
 | 
			
		||||
        return maxRuleNodeExecutionsPerMessage;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Deprecated(forRemoval = true, since = "4.1")
 | 
			
		||||
    public void deduplicateRateLimitsConfigs() {
 | 
			
		||||
        this.transportTenantMsgRateLimit = RateLimitUtil.deduplicateByDuration(transportTenantMsgRateLimit);
 | 
			
		||||
        this.transportTenantTelemetryMsgRateLimit = RateLimitUtil.deduplicateByDuration(transportTenantTelemetryMsgRateLimit);
 | 
			
		||||
        this.transportTenantTelemetryDataPointsRateLimit = RateLimitUtil.deduplicateByDuration(transportTenantTelemetryDataPointsRateLimit);
 | 
			
		||||
 | 
			
		||||
        this.transportDeviceMsgRateLimit = RateLimitUtil.deduplicateByDuration(transportDeviceMsgRateLimit);
 | 
			
		||||
        this.transportDeviceTelemetryMsgRateLimit = RateLimitUtil.deduplicateByDuration(transportDeviceTelemetryMsgRateLimit);
 | 
			
		||||
        this.transportDeviceTelemetryDataPointsRateLimit = RateLimitUtil.deduplicateByDuration(transportDeviceTelemetryDataPointsRateLimit);
 | 
			
		||||
 | 
			
		||||
        this.transportGatewayMsgRateLimit = RateLimitUtil.deduplicateByDuration(transportGatewayMsgRateLimit);
 | 
			
		||||
        this.transportGatewayTelemetryMsgRateLimit = RateLimitUtil.deduplicateByDuration(transportGatewayTelemetryMsgRateLimit);
 | 
			
		||||
        this.transportGatewayTelemetryDataPointsRateLimit = RateLimitUtil.deduplicateByDuration(transportGatewayTelemetryDataPointsRateLimit);
 | 
			
		||||
 | 
			
		||||
        this.transportGatewayDeviceMsgRateLimit = RateLimitUtil.deduplicateByDuration(transportGatewayDeviceMsgRateLimit);
 | 
			
		||||
        this.transportGatewayDeviceTelemetryMsgRateLimit = RateLimitUtil.deduplicateByDuration(transportGatewayDeviceTelemetryMsgRateLimit);
 | 
			
		||||
        this.transportGatewayDeviceTelemetryDataPointsRateLimit = RateLimitUtil.deduplicateByDuration(transportGatewayDeviceTelemetryDataPointsRateLimit);
 | 
			
		||||
 | 
			
		||||
        this.tenantEntityExportRateLimit = RateLimitUtil.deduplicateByDuration(tenantEntityExportRateLimit);
 | 
			
		||||
        this.tenantEntityImportRateLimit = RateLimitUtil.deduplicateByDuration(tenantEntityImportRateLimit);
 | 
			
		||||
        this.tenantNotificationRequestsRateLimit = RateLimitUtil.deduplicateByDuration(tenantNotificationRequestsRateLimit);
 | 
			
		||||
        this.tenantNotificationRequestsPerRuleRateLimit = RateLimitUtil.deduplicateByDuration(tenantNotificationRequestsPerRuleRateLimit);
 | 
			
		||||
 | 
			
		||||
        this.cassandraReadQueryTenantCoreRateLimits = RateLimitUtil.deduplicateByDuration(cassandraReadQueryTenantCoreRateLimits);
 | 
			
		||||
        this.cassandraWriteQueryTenantCoreRateLimits = RateLimitUtil.deduplicateByDuration(cassandraWriteQueryTenantCoreRateLimits);
 | 
			
		||||
        this.cassandraReadQueryTenantRuleEngineRateLimits = RateLimitUtil.deduplicateByDuration(cassandraReadQueryTenantRuleEngineRateLimits);
 | 
			
		||||
        this.cassandraWriteQueryTenantRuleEngineRateLimits = RateLimitUtil.deduplicateByDuration(cassandraWriteQueryTenantRuleEngineRateLimits);
 | 
			
		||||
 | 
			
		||||
        this.edgeEventRateLimits = RateLimitUtil.deduplicateByDuration(edgeEventRateLimits);
 | 
			
		||||
        this.edgeEventRateLimitsPerEdge = RateLimitUtil.deduplicateByDuration(edgeEventRateLimitsPerEdge);
 | 
			
		||||
        this.edgeUplinkMessagesRateLimits = RateLimitUtil.deduplicateByDuration(edgeUplinkMessagesRateLimits);
 | 
			
		||||
        this.edgeUplinkMessagesRateLimitsPerEdge = RateLimitUtil.deduplicateByDuration(edgeUplinkMessagesRateLimitsPerEdge);
 | 
			
		||||
 | 
			
		||||
        this.wsUpdatesPerSessionRateLimit = RateLimitUtil.deduplicateByDuration(wsUpdatesPerSessionRateLimit);
 | 
			
		||||
 | 
			
		||||
        this.tenantServerRestLimitsConfiguration = RateLimitUtil.deduplicateByDuration(tenantServerRestLimitsConfiguration);
 | 
			
		||||
        this.customerServerRestLimitsConfiguration = RateLimitUtil.deduplicateByDuration(customerServerRestLimitsConfiguration);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user