Improve upgrade scripts
This commit is contained in:
		
							parent
							
								
									309ac0b1ec
								
							
						
					
					
						commit
						0e9acbec7a
					
				@ -220,6 +220,7 @@ public class ThingsboardInstallService {
 | 
			
		||||
                        case "3.3.4":
 | 
			
		||||
                            log.info("Upgrading ThingsBoard from version 3.3.4 to 3.4.0 ...");
 | 
			
		||||
                            databaseEntitiesUpgradeService.upgradeDatabase("3.3.4");
 | 
			
		||||
                            dataUpdateService.updateData("3.3.4");
 | 
			
		||||
                            log.info("Updating system data...");
 | 
			
		||||
                            systemDataLoaderService.updateSystemWidgets();
 | 
			
		||||
                            break;
 | 
			
		||||
 | 
			
		||||
@ -611,67 +611,76 @@ public class DefaultSystemDataLoaderService implements SystemDataLoaderService {
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public void createQueues() {
 | 
			
		||||
        Queue mainQueue = new Queue();
 | 
			
		||||
        mainQueue.setTenantId(TenantId.SYS_TENANT_ID);
 | 
			
		||||
        mainQueue.setName("Main");
 | 
			
		||||
        mainQueue.setTopic("tb_rule_engine.main");
 | 
			
		||||
        mainQueue.setPollInterval(25);
 | 
			
		||||
        mainQueue.setPartitions(10);
 | 
			
		||||
        mainQueue.setConsumerPerPartition(true);
 | 
			
		||||
        mainQueue.setPackProcessingTimeout(2000);
 | 
			
		||||
        SubmitStrategy mainQueueSubmitStrategy = new SubmitStrategy();
 | 
			
		||||
        mainQueueSubmitStrategy.setType(SubmitStrategyType.BURST);
 | 
			
		||||
        mainQueueSubmitStrategy.setBatchSize(1000);
 | 
			
		||||
        mainQueue.setSubmitStrategy(mainQueueSubmitStrategy);
 | 
			
		||||
        ProcessingStrategy mainQueueProcessingStrategy = new ProcessingStrategy();
 | 
			
		||||
        mainQueueProcessingStrategy.setType(ProcessingStrategyType.SKIP_ALL_FAILURES);
 | 
			
		||||
        mainQueueProcessingStrategy.setRetries(3);
 | 
			
		||||
        mainQueueProcessingStrategy.setFailurePercentage(0);
 | 
			
		||||
        mainQueueProcessingStrategy.setPauseBetweenRetries(3);
 | 
			
		||||
        mainQueueProcessingStrategy.setMaxPauseBetweenRetries(3);
 | 
			
		||||
        mainQueue.setProcessingStrategy(mainQueueProcessingStrategy);
 | 
			
		||||
        queueService.saveQueue(mainQueue);
 | 
			
		||||
        Queue mainQueue = queueService.findQueueByTenantIdAndName(TenantId.SYS_TENANT_ID, "Main");
 | 
			
		||||
        if (mainQueue == null) {
 | 
			
		||||
            mainQueue = new Queue();
 | 
			
		||||
            mainQueue.setTenantId(TenantId.SYS_TENANT_ID);
 | 
			
		||||
            mainQueue.setName("Main");
 | 
			
		||||
            mainQueue.setTopic("tb_rule_engine.main");
 | 
			
		||||
            mainQueue.setPollInterval(25);
 | 
			
		||||
            mainQueue.setPartitions(10);
 | 
			
		||||
            mainQueue.setConsumerPerPartition(true);
 | 
			
		||||
            mainQueue.setPackProcessingTimeout(2000);
 | 
			
		||||
            SubmitStrategy mainQueueSubmitStrategy = new SubmitStrategy();
 | 
			
		||||
            mainQueueSubmitStrategy.setType(SubmitStrategyType.BURST);
 | 
			
		||||
            mainQueueSubmitStrategy.setBatchSize(1000);
 | 
			
		||||
            mainQueue.setSubmitStrategy(mainQueueSubmitStrategy);
 | 
			
		||||
            ProcessingStrategy mainQueueProcessingStrategy = new ProcessingStrategy();
 | 
			
		||||
            mainQueueProcessingStrategy.setType(ProcessingStrategyType.SKIP_ALL_FAILURES);
 | 
			
		||||
            mainQueueProcessingStrategy.setRetries(3);
 | 
			
		||||
            mainQueueProcessingStrategy.setFailurePercentage(0);
 | 
			
		||||
            mainQueueProcessingStrategy.setPauseBetweenRetries(3);
 | 
			
		||||
            mainQueueProcessingStrategy.setMaxPauseBetweenRetries(3);
 | 
			
		||||
            mainQueue.setProcessingStrategy(mainQueueProcessingStrategy);
 | 
			
		||||
            queueService.saveQueue(mainQueue);
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        Queue highPriorityQueue = new Queue();
 | 
			
		||||
        highPriorityQueue.setTenantId(TenantId.SYS_TENANT_ID);
 | 
			
		||||
        highPriorityQueue.setName("HighPriority");
 | 
			
		||||
        highPriorityQueue.setTopic("tb_rule_engine.hp");
 | 
			
		||||
        highPriorityQueue.setPollInterval(25);
 | 
			
		||||
        highPriorityQueue.setPartitions(10);
 | 
			
		||||
        highPriorityQueue.setConsumerPerPartition(true);
 | 
			
		||||
        highPriorityQueue.setPackProcessingTimeout(2000);
 | 
			
		||||
        SubmitStrategy highPriorityQueueSubmitStrategy = new SubmitStrategy();
 | 
			
		||||
        highPriorityQueueSubmitStrategy.setType(SubmitStrategyType.BURST);
 | 
			
		||||
        highPriorityQueueSubmitStrategy.setBatchSize(100);
 | 
			
		||||
        highPriorityQueue.setSubmitStrategy(highPriorityQueueSubmitStrategy);
 | 
			
		||||
        ProcessingStrategy highPriorityQueueProcessingStrategy = new ProcessingStrategy();
 | 
			
		||||
        highPriorityQueueProcessingStrategy.setType(ProcessingStrategyType.RETRY_FAILED_AND_TIMED_OUT);
 | 
			
		||||
        highPriorityQueueProcessingStrategy.setRetries(0);
 | 
			
		||||
        highPriorityQueueProcessingStrategy.setFailurePercentage(0);
 | 
			
		||||
        highPriorityQueueProcessingStrategy.setPauseBetweenRetries(5);
 | 
			
		||||
        highPriorityQueueProcessingStrategy.setMaxPauseBetweenRetries(5);
 | 
			
		||||
        highPriorityQueue.setProcessingStrategy(highPriorityQueueProcessingStrategy);
 | 
			
		||||
        queueService.saveQueue(highPriorityQueue);
 | 
			
		||||
        Queue highPriorityQueue = queueService.findQueueByTenantIdAndName(TenantId.SYS_TENANT_ID, "HighPriority");
 | 
			
		||||
        if (highPriorityQueue == null) {
 | 
			
		||||
            highPriorityQueue = new Queue();
 | 
			
		||||
            highPriorityQueue.setTenantId(TenantId.SYS_TENANT_ID);
 | 
			
		||||
            highPriorityQueue.setName("HighPriority");
 | 
			
		||||
            highPriorityQueue.setTopic("tb_rule_engine.hp");
 | 
			
		||||
            highPriorityQueue.setPollInterval(25);
 | 
			
		||||
            highPriorityQueue.setPartitions(10);
 | 
			
		||||
            highPriorityQueue.setConsumerPerPartition(true);
 | 
			
		||||
            highPriorityQueue.setPackProcessingTimeout(2000);
 | 
			
		||||
            SubmitStrategy highPriorityQueueSubmitStrategy = new SubmitStrategy();
 | 
			
		||||
            highPriorityQueueSubmitStrategy.setType(SubmitStrategyType.BURST);
 | 
			
		||||
            highPriorityQueueSubmitStrategy.setBatchSize(100);
 | 
			
		||||
            highPriorityQueue.setSubmitStrategy(highPriorityQueueSubmitStrategy);
 | 
			
		||||
            ProcessingStrategy highPriorityQueueProcessingStrategy = new ProcessingStrategy();
 | 
			
		||||
            highPriorityQueueProcessingStrategy.setType(ProcessingStrategyType.RETRY_FAILED_AND_TIMED_OUT);
 | 
			
		||||
            highPriorityQueueProcessingStrategy.setRetries(0);
 | 
			
		||||
            highPriorityQueueProcessingStrategy.setFailurePercentage(0);
 | 
			
		||||
            highPriorityQueueProcessingStrategy.setPauseBetweenRetries(5);
 | 
			
		||||
            highPriorityQueueProcessingStrategy.setMaxPauseBetweenRetries(5);
 | 
			
		||||
            highPriorityQueue.setProcessingStrategy(highPriorityQueueProcessingStrategy);
 | 
			
		||||
            queueService.saveQueue(highPriorityQueue);
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        Queue sequentialByOriginatorQueue = new Queue();
 | 
			
		||||
        sequentialByOriginatorQueue.setTenantId(TenantId.SYS_TENANT_ID);
 | 
			
		||||
        sequentialByOriginatorQueue.setName("SequentialByOriginator");
 | 
			
		||||
        sequentialByOriginatorQueue.setTopic("tb_rule_engine.sq");
 | 
			
		||||
        sequentialByOriginatorQueue.setPollInterval(25);
 | 
			
		||||
        sequentialByOriginatorQueue.setPartitions(10);
 | 
			
		||||
        sequentialByOriginatorQueue.setPackProcessingTimeout(2000);
 | 
			
		||||
        sequentialByOriginatorQueue.setConsumerPerPartition(true);
 | 
			
		||||
        SubmitStrategy sequentialByOriginatorQueueSubmitStrategy = new SubmitStrategy();
 | 
			
		||||
        sequentialByOriginatorQueueSubmitStrategy.setType(SubmitStrategyType.SEQUENTIAL_BY_ORIGINATOR);
 | 
			
		||||
        sequentialByOriginatorQueueSubmitStrategy.setBatchSize(100);
 | 
			
		||||
        sequentialByOriginatorQueue.setSubmitStrategy(sequentialByOriginatorQueueSubmitStrategy);
 | 
			
		||||
        ProcessingStrategy sequentialByOriginatorQueueProcessingStrategy = new ProcessingStrategy();
 | 
			
		||||
        sequentialByOriginatorQueueProcessingStrategy.setType(ProcessingStrategyType.RETRY_FAILED_AND_TIMED_OUT);
 | 
			
		||||
        sequentialByOriginatorQueueProcessingStrategy.setRetries(3);
 | 
			
		||||
        sequentialByOriginatorQueueProcessingStrategy.setFailurePercentage(0);
 | 
			
		||||
        sequentialByOriginatorQueueProcessingStrategy.setPauseBetweenRetries(5);
 | 
			
		||||
        sequentialByOriginatorQueueProcessingStrategy.setMaxPauseBetweenRetries(5);
 | 
			
		||||
        sequentialByOriginatorQueue.setProcessingStrategy(sequentialByOriginatorQueueProcessingStrategy);
 | 
			
		||||
        queueService.saveQueue(sequentialByOriginatorQueue);
 | 
			
		||||
        Queue sequentialByOriginatorQueue = queueService.findQueueByTenantIdAndName(TenantId.SYS_TENANT_ID, "SequentialByOriginator");
 | 
			
		||||
        if (sequentialByOriginatorQueue == null) {
 | 
			
		||||
            sequentialByOriginatorQueue = new Queue();
 | 
			
		||||
            sequentialByOriginatorQueue.setTenantId(TenantId.SYS_TENANT_ID);
 | 
			
		||||
            sequentialByOriginatorQueue.setName("SequentialByOriginator");
 | 
			
		||||
            sequentialByOriginatorQueue.setTopic("tb_rule_engine.sq");
 | 
			
		||||
            sequentialByOriginatorQueue.setPollInterval(25);
 | 
			
		||||
            sequentialByOriginatorQueue.setPartitions(10);
 | 
			
		||||
            sequentialByOriginatorQueue.setPackProcessingTimeout(2000);
 | 
			
		||||
            sequentialByOriginatorQueue.setConsumerPerPartition(true);
 | 
			
		||||
            SubmitStrategy sequentialByOriginatorQueueSubmitStrategy = new SubmitStrategy();
 | 
			
		||||
            sequentialByOriginatorQueueSubmitStrategy.setType(SubmitStrategyType.SEQUENTIAL_BY_ORIGINATOR);
 | 
			
		||||
            sequentialByOriginatorQueueSubmitStrategy.setBatchSize(100);
 | 
			
		||||
            sequentialByOriginatorQueue.setSubmitStrategy(sequentialByOriginatorQueueSubmitStrategy);
 | 
			
		||||
            ProcessingStrategy sequentialByOriginatorQueueProcessingStrategy = new ProcessingStrategy();
 | 
			
		||||
            sequentialByOriginatorQueueProcessingStrategy.setType(ProcessingStrategyType.RETRY_FAILED_AND_TIMED_OUT);
 | 
			
		||||
            sequentialByOriginatorQueueProcessingStrategy.setRetries(3);
 | 
			
		||||
            sequentialByOriginatorQueueProcessingStrategy.setFailurePercentage(0);
 | 
			
		||||
            sequentialByOriginatorQueueProcessingStrategy.setPauseBetweenRetries(5);
 | 
			
		||||
            sequentialByOriginatorQueueProcessingStrategy.setMaxPauseBetweenRetries(5);
 | 
			
		||||
            sequentialByOriginatorQueue.setProcessingStrategy(sequentialByOriginatorQueueProcessingStrategy);
 | 
			
		||||
            queueService.saveQueue(sequentialByOriginatorQueue);
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -571,73 +571,10 @@ public class SqlDatabaseUpgradeService implements DatabaseEntitiesUpgradeService
 | 
			
		||||
                    schemaUpdateFile = Paths.get(installScripts.getDataDir(), "upgrade", "3.3.4", SCHEMA_UPDATE_SQL);
 | 
			
		||||
                    loadSql(schemaUpdateFile, conn);
 | 
			
		||||
 | 
			
		||||
                    log.info("Loading queues...");
 | 
			
		||||
                    try {
 | 
			
		||||
                        if (!CollectionUtils.isEmpty(queueConfig.getQueues())) {
 | 
			
		||||
                            queueConfig.getQueues().forEach(queueSettings -> {
 | 
			
		||||
                                queueService.saveQueue(queueConfigToQueue(queueSettings));
 | 
			
		||||
                            });
 | 
			
		||||
                        } else {
 | 
			
		||||
                            systemDataLoaderService.createQueues();
 | 
			
		||||
                        }
 | 
			
		||||
                    } catch (Exception e) {
 | 
			
		||||
                    }
 | 
			
		||||
 | 
			
		||||
                    log.info("Updating device profiles...");
 | 
			
		||||
                    schemaUpdateFile = Paths.get(installScripts.getDataDir(), "upgrade", "3.3.4", "schema_update_device_profile.sql");
 | 
			
		||||
                    loadSql(schemaUpdateFile, conn);
 | 
			
		||||
 | 
			
		||||
                    log.info("Updating checkpoint rule nodes...");
 | 
			
		||||
                    PageLink pageLink = new PageLink(100);
 | 
			
		||||
                    PageData<Tenant> pageData;
 | 
			
		||||
                    do {
 | 
			
		||||
                        pageData = tenantService.findTenants(pageLink);
 | 
			
		||||
                        for (Tenant tenant : pageData.getData()) {
 | 
			
		||||
                            TenantId tenantId = tenant.getId();
 | 
			
		||||
                            Map<String, QueueId> queues =
 | 
			
		||||
                                    queueService.findQueuesByTenantId(tenantId).stream().collect(Collectors.toMap(Queue::getName, Queue::getId));
 | 
			
		||||
                            try {
 | 
			
		||||
                                List<RuleNode> checkpointNodes =
 | 
			
		||||
                                        ruleChainService.findRuleNodesByTenantIdAndType(tenantId, "org.thingsboard.rule.engine.flow.TbCheckpointNode");
 | 
			
		||||
                                checkpointNodes.forEach(node -> {
 | 
			
		||||
                                    ObjectNode configuration = (ObjectNode) node.getConfiguration();
 | 
			
		||||
                                    JsonNode queueNameNode = configuration.remove("queueName");
 | 
			
		||||
                                    if (queueNameNode != null) {
 | 
			
		||||
                                        String queueName = queueNameNode.asText();
 | 
			
		||||
                                        configuration.put("queueId", queues.get(queueName).toString());
 | 
			
		||||
                                        ruleChainService.saveRuleNode(tenantId, node);
 | 
			
		||||
                                    }
 | 
			
		||||
                                });
 | 
			
		||||
                            } catch (Exception e) {
 | 
			
		||||
                            }
 | 
			
		||||
                        }
 | 
			
		||||
                        pageLink = pageLink.nextPageLink();
 | 
			
		||||
                    } while (pageData.hasNext());
 | 
			
		||||
 | 
			
		||||
                    log.info("Updating tenant profiles...");
 | 
			
		||||
                    PageLink profilePageLink = new PageLink(100);
 | 
			
		||||
                    PageData<TenantProfile> profilePageData;
 | 
			
		||||
                    do {
 | 
			
		||||
                        profilePageData = tenantProfileService.findTenantProfiles(TenantId.SYS_TENANT_ID, profilePageLink);
 | 
			
		||||
 | 
			
		||||
                        profilePageData.getData().forEach(profile -> {
 | 
			
		||||
                            try {
 | 
			
		||||
                                List<TenantProfileQueueConfiguration> queueConfiguration = profile.getProfileData().getQueueConfiguration();
 | 
			
		||||
                                if (profile.isIsolatedTbRuleEngine() && (queueConfiguration == null || queueConfiguration.isEmpty())) {
 | 
			
		||||
                                    TenantProfileQueueConfiguration mainQueueConfig = getMainQueueConfiguration();
 | 
			
		||||
                                    profile.getProfileData().setQueueConfiguration(Collections.singletonList((mainQueueConfig)));
 | 
			
		||||
                                    tenantProfileService.saveTenantProfile(TenantId.SYS_TENANT_ID, profile);
 | 
			
		||||
                                    List<TenantId> isolatedTenants = tenantService.findTenantIdsByTenantProfileId(profile.getId());
 | 
			
		||||
                                    isolatedTenants.forEach(tenantId -> {
 | 
			
		||||
                                        queueService.saveQueue(new Queue(tenantId, mainQueueConfig));
 | 
			
		||||
                                    });
 | 
			
		||||
                                }
 | 
			
		||||
                            } catch (Exception e) {
 | 
			
		||||
                            }
 | 
			
		||||
 | 
			
		||||
                        });
 | 
			
		||||
                        profilePageLink = profilePageLink.nextPageLink();
 | 
			
		||||
                    } while (profilePageData.hasNext());
 | 
			
		||||
                    log.info("Updating schema settings...");
 | 
			
		||||
                    conn.createStatement().execute("UPDATE tb_schema_settings SET schema_version = 3004000;");
 | 
			
		||||
                    log.info("Schema updated.");
 | 
			
		||||
@ -691,48 +628,4 @@ public class SqlDatabaseUpgradeService implements DatabaseEntitiesUpgradeService
 | 
			
		||||
        return isOldSchema;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private Queue queueConfigToQueue(TbRuleEngineQueueConfiguration queueSettings) {
 | 
			
		||||
        Queue queue = new Queue();
 | 
			
		||||
        queue.setTenantId(TenantId.SYS_TENANT_ID);
 | 
			
		||||
        queue.setName(queueSettings.getName());
 | 
			
		||||
        queue.setTopic(queueSettings.getTopic());
 | 
			
		||||
        queue.setPollInterval(queueSettings.getPollInterval());
 | 
			
		||||
        queue.setPartitions(queueSettings.getPartitions());
 | 
			
		||||
        queue.setPackProcessingTimeout(queueSettings.getPackProcessingTimeout());
 | 
			
		||||
        SubmitStrategy submitStrategy = new SubmitStrategy();
 | 
			
		||||
        submitStrategy.setBatchSize(queueSettings.getSubmitStrategy().getBatchSize());
 | 
			
		||||
        submitStrategy.setType(SubmitStrategyType.valueOf(queueSettings.getSubmitStrategy().getType()));
 | 
			
		||||
        queue.setSubmitStrategy(submitStrategy);
 | 
			
		||||
        ProcessingStrategy processingStrategy = new ProcessingStrategy();
 | 
			
		||||
        processingStrategy.setType(ProcessingStrategyType.valueOf(queueSettings.getProcessingStrategy().getType()));
 | 
			
		||||
        processingStrategy.setRetries(queueSettings.getProcessingStrategy().getRetries());
 | 
			
		||||
        processingStrategy.setFailurePercentage(queueSettings.getProcessingStrategy().getFailurePercentage());
 | 
			
		||||
        processingStrategy.setPauseBetweenRetries(queueSettings.getProcessingStrategy().getPauseBetweenRetries());
 | 
			
		||||
        processingStrategy.setMaxPauseBetweenRetries(queueSettings.getProcessingStrategy().getMaxPauseBetweenRetries());
 | 
			
		||||
        queue.setProcessingStrategy(processingStrategy);
 | 
			
		||||
        queue.setConsumerPerPartition(queueSettings.isConsumerPerPartition());
 | 
			
		||||
        return queue;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private TenantProfileQueueConfiguration getMainQueueConfiguration() {
 | 
			
		||||
        TenantProfileQueueConfiguration mainQueueConfiguration = new TenantProfileQueueConfiguration();
 | 
			
		||||
        mainQueueConfiguration.setName("Main");
 | 
			
		||||
        mainQueueConfiguration.setTopic("tb_rule_engine.main");
 | 
			
		||||
        mainQueueConfiguration.setPollInterval(25);
 | 
			
		||||
        mainQueueConfiguration.setPartitions(10);
 | 
			
		||||
        mainQueueConfiguration.setConsumerPerPartition(true);
 | 
			
		||||
        mainQueueConfiguration.setPackProcessingTimeout(2000);
 | 
			
		||||
        SubmitStrategy mainQueueSubmitStrategy = new SubmitStrategy();
 | 
			
		||||
        mainQueueSubmitStrategy.setType(SubmitStrategyType.BURST);
 | 
			
		||||
        mainQueueSubmitStrategy.setBatchSize(1000);
 | 
			
		||||
        mainQueueConfiguration.setSubmitStrategy(mainQueueSubmitStrategy);
 | 
			
		||||
        ProcessingStrategy mainQueueProcessingStrategy = new ProcessingStrategy();
 | 
			
		||||
        mainQueueProcessingStrategy.setType(ProcessingStrategyType.SKIP_ALL_FAILURES);
 | 
			
		||||
        mainQueueProcessingStrategy.setRetries(3);
 | 
			
		||||
        mainQueueProcessingStrategy.setFailurePercentage(0);
 | 
			
		||||
        mainQueueProcessingStrategy.setPauseBetweenRetries(3);
 | 
			
		||||
        mainQueueProcessingStrategy.setMaxPauseBetweenRetries(3);
 | 
			
		||||
        mainQueueConfiguration.setProcessingStrategy(mainQueueProcessingStrategy);
 | 
			
		||||
        return mainQueueConfiguration;
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -21,6 +21,7 @@ import com.google.common.util.concurrent.Futures;
 | 
			
		||||
import com.google.common.util.concurrent.ListenableFuture;
 | 
			
		||||
import com.google.common.util.concurrent.MoreExecutors;
 | 
			
		||||
import lombok.extern.slf4j.Slf4j;
 | 
			
		||||
import org.apache.commons.collections.CollectionUtils;
 | 
			
		||||
import org.springframework.beans.factory.annotation.Autowired;
 | 
			
		||||
import org.springframework.context.annotation.Profile;
 | 
			
		||||
import org.springframework.stereotype.Service;
 | 
			
		||||
@ -31,14 +32,12 @@ import org.thingsboard.rule.engine.profile.TbDeviceProfileNode;
 | 
			
		||||
import org.thingsboard.rule.engine.profile.TbDeviceProfileNodeConfiguration;
 | 
			
		||||
import org.thingsboard.server.common.data.EntityView;
 | 
			
		||||
import org.thingsboard.server.common.data.Tenant;
 | 
			
		||||
import org.thingsboard.server.common.data.TenantProfile;
 | 
			
		||||
import org.thingsboard.server.common.data.alarm.Alarm;
 | 
			
		||||
import org.thingsboard.server.common.data.alarm.AlarmInfo;
 | 
			
		||||
import org.thingsboard.server.common.data.alarm.AlarmQuery;
 | 
			
		||||
import org.thingsboard.server.common.data.alarm.AlarmSeverity;
 | 
			
		||||
import org.thingsboard.server.common.data.id.EntityViewId;
 | 
			
		||||
import org.thingsboard.server.common.data.id.RuleChainId;
 | 
			
		||||
import org.thingsboard.server.common.data.id.RuleNodeId;
 | 
			
		||||
import org.thingsboard.server.common.data.id.TenantId;
 | 
			
		||||
import org.thingsboard.server.common.data.id.*;
 | 
			
		||||
import org.thingsboard.server.common.data.kv.BaseReadTsKvQuery;
 | 
			
		||||
import org.thingsboard.server.common.data.kv.ReadTsKvQuery;
 | 
			
		||||
import org.thingsboard.server.common.data.kv.TsKvEntry;
 | 
			
		||||
@ -47,12 +46,14 @@ import org.thingsboard.server.common.data.page.PageLink;
 | 
			
		||||
import org.thingsboard.server.common.data.page.TimePageLink;
 | 
			
		||||
import org.thingsboard.server.common.data.query.DynamicValue;
 | 
			
		||||
import org.thingsboard.server.common.data.query.FilterPredicateValue;
 | 
			
		||||
import org.thingsboard.server.common.data.queue.*;
 | 
			
		||||
import org.thingsboard.server.common.data.relation.EntityRelation;
 | 
			
		||||
import org.thingsboard.server.common.data.relation.RelationTypeGroup;
 | 
			
		||||
import org.thingsboard.server.common.data.rule.RuleChain;
 | 
			
		||||
import org.thingsboard.server.common.data.rule.RuleChainMetaData;
 | 
			
		||||
import org.thingsboard.server.common.data.rule.RuleChainType;
 | 
			
		||||
import org.thingsboard.server.common.data.rule.RuleNode;
 | 
			
		||||
import org.thingsboard.server.common.data.tenant.profile.TenantProfileQueueConfiguration;
 | 
			
		||||
import org.thingsboard.server.dao.DaoUtil;
 | 
			
		||||
import org.thingsboard.server.dao.alarm.AlarmDao;
 | 
			
		||||
import org.thingsboard.server.dao.alarm.AlarmService;
 | 
			
		||||
@ -61,16 +62,22 @@ import org.thingsboard.server.dao.entityview.EntityViewService;
 | 
			
		||||
import org.thingsboard.server.dao.model.sql.DeviceProfileEntity;
 | 
			
		||||
import org.thingsboard.server.dao.model.sql.RelationEntity;
 | 
			
		||||
import org.thingsboard.server.dao.oauth2.OAuth2Service;
 | 
			
		||||
import org.thingsboard.server.dao.queue.QueueService;
 | 
			
		||||
import org.thingsboard.server.dao.relation.RelationService;
 | 
			
		||||
import org.thingsboard.server.dao.rule.RuleChainService;
 | 
			
		||||
import org.thingsboard.server.dao.sql.device.DeviceProfileRepository;
 | 
			
		||||
import org.thingsboard.server.dao.tenant.TenantProfileService;
 | 
			
		||||
import org.thingsboard.server.dao.tenant.TenantService;
 | 
			
		||||
import org.thingsboard.server.dao.timeseries.TimeseriesService;
 | 
			
		||||
import org.thingsboard.server.queue.settings.TbRuleEngineQueueConfiguration;
 | 
			
		||||
import org.thingsboard.server.service.install.InstallScripts;
 | 
			
		||||
import org.thingsboard.server.service.install.SystemDataLoaderService;
 | 
			
		||||
import org.thingsboard.server.service.install.TbRuleEngineQueueConfigService;
 | 
			
		||||
 | 
			
		||||
import java.util.ArrayList;
 | 
			
		||||
import java.util.Collections;
 | 
			
		||||
import java.util.List;
 | 
			
		||||
import java.util.Map;
 | 
			
		||||
import java.util.concurrent.ExecutionException;
 | 
			
		||||
import java.util.concurrent.atomic.AtomicLong;
 | 
			
		||||
import java.util.stream.Collectors;
 | 
			
		||||
@ -115,6 +122,18 @@ public class DefaultDataUpdateService implements DataUpdateService {
 | 
			
		||||
    @Autowired
 | 
			
		||||
    private OAuth2Service oAuth2Service;
 | 
			
		||||
 | 
			
		||||
    @Autowired
 | 
			
		||||
    private TenantProfileService tenantProfileService;
 | 
			
		||||
 | 
			
		||||
    @Autowired
 | 
			
		||||
    private QueueService queueService;
 | 
			
		||||
 | 
			
		||||
    @Autowired
 | 
			
		||||
    private TbRuleEngineQueueConfigService queueConfig;
 | 
			
		||||
 | 
			
		||||
    @Autowired
 | 
			
		||||
    private SystemDataLoaderService systemDataLoaderService;
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public void updateData(String fromVersion) throws Exception {
 | 
			
		||||
        switch (fromVersion) {
 | 
			
		||||
@ -141,6 +160,26 @@ public class DefaultDataUpdateService implements DataUpdateService {
 | 
			
		||||
                log.info("Updating data from version 3.3.2 to 3.3.3 ...");
 | 
			
		||||
                updateNestedRuleChains();
 | 
			
		||||
                break;
 | 
			
		||||
            case "3.3.4":
 | 
			
		||||
                log.info("Updating data from version 3.3.4 to 3.4.0 ...");
 | 
			
		||||
                log.info("Loading queues...");
 | 
			
		||||
                try {
 | 
			
		||||
                    if (!CollectionUtils.isEmpty(queueConfig.getQueues())) {
 | 
			
		||||
                        queueConfig.getQueues().forEach(queueSettings -> {
 | 
			
		||||
                            Queue queue = queueConfigToQueue(queueSettings);
 | 
			
		||||
                            Queue existing = queueService.findQueueByTenantIdAndName(queue.getTenantId(), queue.getName());
 | 
			
		||||
                            if (existing == null) {
 | 
			
		||||
                                queueService.saveQueue(queue);
 | 
			
		||||
                            }
 | 
			
		||||
                        });
 | 
			
		||||
                    } else {
 | 
			
		||||
                        systemDataLoaderService.createQueues();
 | 
			
		||||
                    }
 | 
			
		||||
                } catch (Exception e) {
 | 
			
		||||
                }
 | 
			
		||||
                tenantsProfileQueueConfigurationUpdater.updateEntities(null);
 | 
			
		||||
                checkPointRuleNodesUpdater.updateEntities(null);
 | 
			
		||||
                break;
 | 
			
		||||
            default:
 | 
			
		||||
                throw new RuntimeException("Unable to update data, unsupported fromVersion: " + fromVersion);
 | 
			
		||||
        }
 | 
			
		||||
@ -547,4 +586,133 @@ public class DefaultDataUpdateService implements DataUpdateService {
 | 
			
		||||
        log.warn("CAUTION: Update of Oauth2 parameters from 3.2.2 to 3.3.0 available only in ThingsBoard versions 3.3.0/3.3.1");
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private final PaginatedUpdater<String, TenantProfile> tenantsProfileQueueConfigurationUpdater =
 | 
			
		||||
            new PaginatedUpdater<>() {
 | 
			
		||||
 | 
			
		||||
                @Override
 | 
			
		||||
                protected String getName() {
 | 
			
		||||
                    return "Tenant profiles queue configuration updater";
 | 
			
		||||
                }
 | 
			
		||||
 | 
			
		||||
                @Override
 | 
			
		||||
                protected boolean forceReportTotal() {
 | 
			
		||||
                    return true;
 | 
			
		||||
                }
 | 
			
		||||
 | 
			
		||||
                @Override
 | 
			
		||||
                protected PageData<TenantProfile> findEntities(String id, PageLink pageLink) {
 | 
			
		||||
                    return tenantProfileService.findTenantProfiles(TenantId.SYS_TENANT_ID, pageLink);
 | 
			
		||||
                }
 | 
			
		||||
 | 
			
		||||
                @Override
 | 
			
		||||
                protected void updateEntity(TenantProfile tenantProfile) {
 | 
			
		||||
                    updateTenantProfileQueueConfiguration(tenantProfile);
 | 
			
		||||
                }
 | 
			
		||||
            };
 | 
			
		||||
 | 
			
		||||
    private void updateTenantProfileQueueConfiguration(TenantProfile profile) {
 | 
			
		||||
        try {
 | 
			
		||||
            List<TenantProfileQueueConfiguration> queueConfiguration = profile.getProfileData().getQueueConfiguration();
 | 
			
		||||
            if (profile.isIsolatedTbRuleEngine() && (queueConfiguration == null || queueConfiguration.isEmpty())) {
 | 
			
		||||
                TenantProfileQueueConfiguration mainQueueConfig = getMainQueueConfiguration();
 | 
			
		||||
                profile.getProfileData().setQueueConfiguration(Collections.singletonList((mainQueueConfig)));
 | 
			
		||||
                tenantProfileService.saveTenantProfile(TenantId.SYS_TENANT_ID, profile);
 | 
			
		||||
                List<TenantId> isolatedTenants = tenantService.findTenantIdsByTenantProfileId(profile.getId());
 | 
			
		||||
                isolatedTenants.forEach(tenantId -> {
 | 
			
		||||
                    queueService.saveQueue(new Queue(tenantId, mainQueueConfig));
 | 
			
		||||
                });
 | 
			
		||||
            }
 | 
			
		||||
        } catch (Exception e) {
 | 
			
		||||
            log.error("Failed to update tenant profile queue configuration name=["+profile.getName()+"], id=["+ profile.getId().getId() +"]", e);
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private TenantProfileQueueConfiguration getMainQueueConfiguration() {
 | 
			
		||||
        TenantProfileQueueConfiguration mainQueueConfiguration = new TenantProfileQueueConfiguration();
 | 
			
		||||
        mainQueueConfiguration.setName("Main");
 | 
			
		||||
        mainQueueConfiguration.setTopic("tb_rule_engine.main");
 | 
			
		||||
        mainQueueConfiguration.setPollInterval(25);
 | 
			
		||||
        mainQueueConfiguration.setPartitions(10);
 | 
			
		||||
        mainQueueConfiguration.setConsumerPerPartition(true);
 | 
			
		||||
        mainQueueConfiguration.setPackProcessingTimeout(2000);
 | 
			
		||||
        SubmitStrategy mainQueueSubmitStrategy = new SubmitStrategy();
 | 
			
		||||
        mainQueueSubmitStrategy.setType(SubmitStrategyType.BURST);
 | 
			
		||||
        mainQueueSubmitStrategy.setBatchSize(1000);
 | 
			
		||||
        mainQueueConfiguration.setSubmitStrategy(mainQueueSubmitStrategy);
 | 
			
		||||
        ProcessingStrategy mainQueueProcessingStrategy = new ProcessingStrategy();
 | 
			
		||||
        mainQueueProcessingStrategy.setType(ProcessingStrategyType.SKIP_ALL_FAILURES);
 | 
			
		||||
        mainQueueProcessingStrategy.setRetries(3);
 | 
			
		||||
        mainQueueProcessingStrategy.setFailurePercentage(0);
 | 
			
		||||
        mainQueueProcessingStrategy.setPauseBetweenRetries(3);
 | 
			
		||||
        mainQueueProcessingStrategy.setMaxPauseBetweenRetries(3);
 | 
			
		||||
        mainQueueConfiguration.setProcessingStrategy(mainQueueProcessingStrategy);
 | 
			
		||||
        return mainQueueConfiguration;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private Queue queueConfigToQueue(TbRuleEngineQueueConfiguration queueSettings) {
 | 
			
		||||
        Queue queue = new Queue();
 | 
			
		||||
        queue.setTenantId(TenantId.SYS_TENANT_ID);
 | 
			
		||||
        queue.setName(queueSettings.getName());
 | 
			
		||||
        queue.setTopic(queueSettings.getTopic());
 | 
			
		||||
        queue.setPollInterval(queueSettings.getPollInterval());
 | 
			
		||||
        queue.setPartitions(queueSettings.getPartitions());
 | 
			
		||||
        queue.setPackProcessingTimeout(queueSettings.getPackProcessingTimeout());
 | 
			
		||||
        SubmitStrategy submitStrategy = new SubmitStrategy();
 | 
			
		||||
        submitStrategy.setBatchSize(queueSettings.getSubmitStrategy().getBatchSize());
 | 
			
		||||
        submitStrategy.setType(SubmitStrategyType.valueOf(queueSettings.getSubmitStrategy().getType()));
 | 
			
		||||
        queue.setSubmitStrategy(submitStrategy);
 | 
			
		||||
        ProcessingStrategy processingStrategy = new ProcessingStrategy();
 | 
			
		||||
        processingStrategy.setType(ProcessingStrategyType.valueOf(queueSettings.getProcessingStrategy().getType()));
 | 
			
		||||
        processingStrategy.setRetries(queueSettings.getProcessingStrategy().getRetries());
 | 
			
		||||
        processingStrategy.setFailurePercentage(queueSettings.getProcessingStrategy().getFailurePercentage());
 | 
			
		||||
        processingStrategy.setPauseBetweenRetries(queueSettings.getProcessingStrategy().getPauseBetweenRetries());
 | 
			
		||||
        processingStrategy.setMaxPauseBetweenRetries(queueSettings.getProcessingStrategy().getMaxPauseBetweenRetries());
 | 
			
		||||
        queue.setProcessingStrategy(processingStrategy);
 | 
			
		||||
        queue.setConsumerPerPartition(queueSettings.isConsumerPerPartition());
 | 
			
		||||
        return queue;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private final PaginatedUpdater<String, RuleNode> checkPointRuleNodesUpdater =
 | 
			
		||||
            new PaginatedUpdater<>() {
 | 
			
		||||
 | 
			
		||||
                @Override
 | 
			
		||||
                protected String getName() {
 | 
			
		||||
                    return "Checkpoint rule nodes updater";
 | 
			
		||||
                }
 | 
			
		||||
 | 
			
		||||
                @Override
 | 
			
		||||
                protected boolean forceReportTotal() {
 | 
			
		||||
                    return true;
 | 
			
		||||
                }
 | 
			
		||||
 | 
			
		||||
                @Override
 | 
			
		||||
                protected PageData<RuleNode> findEntities(String id, PageLink pageLink) {
 | 
			
		||||
                    return ruleChainService.findAllRuleNodesByType("org.thingsboard.rule.engine.flow.TbCheckpointNode", pageLink);
 | 
			
		||||
                }
 | 
			
		||||
 | 
			
		||||
                @Override
 | 
			
		||||
                protected void updateEntity(RuleNode ruleNode) {
 | 
			
		||||
                    updateCheckPointRuleNodeConfiguration(ruleNode);
 | 
			
		||||
                }
 | 
			
		||||
            };
 | 
			
		||||
 | 
			
		||||
    private void updateCheckPointRuleNodeConfiguration(RuleNode node) {
 | 
			
		||||
        try {
 | 
			
		||||
            ObjectNode configuration = (ObjectNode) node.getConfiguration();
 | 
			
		||||
            JsonNode queueNameNode = configuration.remove("queueName");
 | 
			
		||||
            if (queueNameNode != null) {
 | 
			
		||||
                RuleChain ruleChain = this.ruleChainService.findRuleChainById(TenantId.SYS_TENANT_ID, node.getRuleChainId());
 | 
			
		||||
                TenantId tenantId = ruleChain.getTenantId();
 | 
			
		||||
                Map<String, QueueId> queues =
 | 
			
		||||
                        queueService.findQueuesByTenantId(tenantId).stream().collect(Collectors.toMap(Queue::getName, Queue::getId));
 | 
			
		||||
                String queueName = queueNameNode.asText();
 | 
			
		||||
                QueueId queueId = queues.get(queueName);
 | 
			
		||||
                configuration.put("queueId", queueId != null ? queueId.toString() : "");
 | 
			
		||||
                ruleChainService.saveRuleNode(tenantId, node);
 | 
			
		||||
            }
 | 
			
		||||
        } catch (Exception e) {
 | 
			
		||||
            log.error("Failed to update checkpoint rule node configuration name=["+node.getName()+"], id=["+ node.getId().getId() +"]", e);
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -94,5 +94,7 @@ public interface RuleChainService {
 | 
			
		||||
 | 
			
		||||
    List<RuleNode> findRuleNodesByTenantIdAndType(TenantId tenantId, String type);
 | 
			
		||||
 | 
			
		||||
    PageData<RuleNode> findAllRuleNodesByType(String type, PageLink pageLink);
 | 
			
		||||
 | 
			
		||||
    RuleNode saveRuleNode(TenantId tenantId, RuleNode ruleNode);
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -68,8 +68,7 @@ import java.util.Set;
 | 
			
		||||
import java.util.stream.Collectors;
 | 
			
		||||
 | 
			
		||||
import static org.thingsboard.server.common.data.DataConstants.TENANT;
 | 
			
		||||
import static org.thingsboard.server.dao.service.Validator.validateId;
 | 
			
		||||
import static org.thingsboard.server.dao.service.Validator.validateString;
 | 
			
		||||
import static org.thingsboard.server.dao.service.Validator.*;
 | 
			
		||||
 | 
			
		||||
/**
 | 
			
		||||
 * Created by igor on 3/12/18.
 | 
			
		||||
@ -676,6 +675,14 @@ public class BaseRuleChainService extends AbstractEntityService implements RuleC
 | 
			
		||||
        return ruleNodeDao.findRuleNodesByTenantIdAndType(tenantId, type, "");
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public PageData<RuleNode> findAllRuleNodesByType(String type, PageLink pageLink) {
 | 
			
		||||
        log.trace("Executing findAllRuleNodesByType, type {}, pageLink {}", type, pageLink);
 | 
			
		||||
        validateString(type, "Incorrect type of the rule node");
 | 
			
		||||
        validatePageLink(pageLink);
 | 
			
		||||
        return ruleNodeDao.findAllRuleNodesByType(type, pageLink);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public RuleNode saveRuleNode(TenantId tenantId, RuleNode ruleNode) {
 | 
			
		||||
        return ruleNodeDao.save(tenantId, ruleNode);
 | 
			
		||||
 | 
			
		||||
@ -16,6 +16,8 @@
 | 
			
		||||
package org.thingsboard.server.dao.rule;
 | 
			
		||||
 | 
			
		||||
import org.thingsboard.server.common.data.id.TenantId;
 | 
			
		||||
import org.thingsboard.server.common.data.page.PageData;
 | 
			
		||||
import org.thingsboard.server.common.data.page.PageLink;
 | 
			
		||||
import org.thingsboard.server.common.data.rule.RuleNode;
 | 
			
		||||
import org.thingsboard.server.dao.Dao;
 | 
			
		||||
 | 
			
		||||
@ -27,4 +29,6 @@ import java.util.List;
 | 
			
		||||
public interface RuleNodeDao extends Dao<RuleNode> {
 | 
			
		||||
 | 
			
		||||
    List<RuleNode> findRuleNodesByTenantIdAndType(TenantId tenantId, String type, String search);
 | 
			
		||||
 | 
			
		||||
    PageData<RuleNode> findAllRuleNodesByType(String type, PageLink pageLink);
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -20,6 +20,8 @@ import org.springframework.beans.factory.annotation.Autowired;
 | 
			
		||||
import org.springframework.data.jpa.repository.JpaRepository;
 | 
			
		||||
import org.springframework.stereotype.Component;
 | 
			
		||||
import org.thingsboard.server.common.data.id.TenantId;
 | 
			
		||||
import org.thingsboard.server.common.data.page.PageData;
 | 
			
		||||
import org.thingsboard.server.common.data.page.PageLink;
 | 
			
		||||
import org.thingsboard.server.common.data.rule.RuleNode;
 | 
			
		||||
import org.thingsboard.server.dao.DaoUtil;
 | 
			
		||||
import org.thingsboard.server.dao.model.sql.RuleNodeEntity;
 | 
			
		||||
@ -27,6 +29,7 @@ import org.thingsboard.server.dao.rule.RuleNodeDao;
 | 
			
		||||
import org.thingsboard.server.dao.sql.JpaAbstractSearchTextDao;
 | 
			
		||||
 | 
			
		||||
import java.util.List;
 | 
			
		||||
import java.util.Objects;
 | 
			
		||||
import java.util.UUID;
 | 
			
		||||
 | 
			
		||||
@Slf4j
 | 
			
		||||
@ -50,4 +53,14 @@ public class JpaRuleNodeDao extends JpaAbstractSearchTextDao<RuleNodeEntity, Rul
 | 
			
		||||
    public List<RuleNode> findRuleNodesByTenantIdAndType(TenantId tenantId, String type, String search) {
 | 
			
		||||
        return DaoUtil.convertDataList(ruleNodeRepository.findRuleNodesByTenantIdAndType(tenantId.getId(), type, search));
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public PageData<RuleNode> findAllRuleNodesByType(String type, PageLink pageLink) {
 | 
			
		||||
        return DaoUtil.toPageData(ruleNodeRepository
 | 
			
		||||
                .findAllRuleNodesByType(
 | 
			
		||||
                        type,
 | 
			
		||||
                        Objects.toString(pageLink.getTextSearch(), ""),
 | 
			
		||||
                        DaoUtil.toPageable(pageLink)));
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -15,6 +15,8 @@
 | 
			
		||||
 */
 | 
			
		||||
package org.thingsboard.server.dao.sql.rule;
 | 
			
		||||
 | 
			
		||||
import org.springframework.data.domain.Page;
 | 
			
		||||
import org.springframework.data.domain.Pageable;
 | 
			
		||||
import org.springframework.data.jpa.repository.JpaRepository;
 | 
			
		||||
import org.springframework.data.jpa.repository.Query;
 | 
			
		||||
import org.springframework.data.repository.query.Param;
 | 
			
		||||
@ -32,4 +34,9 @@ public interface RuleNodeRepository extends JpaRepository<RuleNodeEntity, UUID>
 | 
			
		||||
                                                    @Param("ruleType") String ruleType,
 | 
			
		||||
                                                    @Param("searchText") String searchText);
 | 
			
		||||
 | 
			
		||||
    @Query("SELECT r FROM RuleNodeEntity r WHERE r.type = :ruleType AND LOWER(r.configuration) LIKE LOWER(CONCAT('%', :searchText, '%')) ")
 | 
			
		||||
    Page<RuleNodeEntity> findAllRuleNodesByType(@Param("ruleType") String ruleType,
 | 
			
		||||
                                                @Param("searchText") String searchText,
 | 
			
		||||
                                                Pageable pageable);
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user