diff --git a/application/src/main/java/org/thingsboard/server/service/install/SqlDatabaseUpgradeService.java b/application/src/main/java/org/thingsboard/server/service/install/SqlDatabaseUpgradeService.java index 473f3accda..db2352b706 100644 --- a/application/src/main/java/org/thingsboard/server/service/install/SqlDatabaseUpgradeService.java +++ b/application/src/main/java/org/thingsboard/server/service/install/SqlDatabaseUpgradeService.java @@ -25,6 +25,7 @@ import org.springframework.context.annotation.Profile; import org.springframework.stereotype.Service; import org.thingsboard.server.common.data.EntitySubtype; import org.thingsboard.server.common.data.Tenant; +import org.thingsboard.server.common.data.TenantProfile; import org.thingsboard.server.common.data.id.QueueId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.page.PageData; @@ -35,13 +36,16 @@ import org.thingsboard.server.common.data.queue.Queue; import org.thingsboard.server.common.data.queue.SubmitStrategy; import org.thingsboard.server.common.data.queue.SubmitStrategyType; import org.thingsboard.server.common.data.rule.RuleNode; +import org.thingsboard.server.common.data.tenant.profile.TenantProfileQueueConfiguration; import org.thingsboard.server.dao.dashboard.DashboardService; import org.thingsboard.server.dao.device.DeviceProfileService; import org.thingsboard.server.dao.device.DeviceService; import org.thingsboard.server.dao.queue.QueueService; import org.thingsboard.server.dao.rule.RuleChainService; +import org.thingsboard.server.dao.tenant.TenantProfileService; import org.thingsboard.server.dao.tenant.TenantService; import org.thingsboard.server.dao.usagerecord.ApiUsageStateService; +import org.thingsboard.server.queue.settings.TbRuleEngineQueueConfiguration; import org.thingsboard.server.service.install.sql.SqlDbHelper; import java.nio.charset.Charset; @@ -55,6 +59,7 @@ import java.sql.SQLException; import java.sql.SQLSyntaxErrorException; import java.sql.SQLWarning; import java.sql.Statement; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; @@ -125,6 +130,9 @@ public class SqlDatabaseUpgradeService implements DatabaseEntitiesUpgradeService @Autowired private RuleChainService ruleChainService; + @Autowired + private TenantProfileService tenantProfileService; + @Override public void upgradeDatabase(String fromVersion) throws Exception { switch (fromVersion) { @@ -567,26 +575,7 @@ public class SqlDatabaseUpgradeService implements DatabaseEntitiesUpgradeService try { if (!CollectionUtils.isEmpty(queueConfig.getQueues())) { queueConfig.getQueues().forEach(queueSettings -> { - Queue queue = new Queue(); - queue.setTenantId(TenantId.SYS_TENANT_ID); - queue.setName(queueSettings.getName()); - queue.setTopic(queueSettings.getTopic()); - queue.setPollInterval(queueSettings.getPollInterval()); - queue.setPartitions(queueSettings.getPartitions()); - queue.setPackProcessingTimeout(queueSettings.getPackProcessingTimeout()); - SubmitStrategy submitStrategy = new SubmitStrategy(); - submitStrategy.setBatchSize(queueSettings.getSubmitStrategy().getBatchSize()); - submitStrategy.setType(SubmitStrategyType.valueOf(queueSettings.getSubmitStrategy().getType())); - queue.setSubmitStrategy(submitStrategy); - ProcessingStrategy processingStrategy = new ProcessingStrategy(); - processingStrategy.setType(ProcessingStrategyType.valueOf(queueSettings.getProcessingStrategy().getType())); - processingStrategy.setRetries(queueSettings.getProcessingStrategy().getRetries()); - processingStrategy.setFailurePercentage(queueSettings.getProcessingStrategy().getFailurePercentage()); - processingStrategy.setPauseBetweenRetries(queueSettings.getProcessingStrategy().getPauseBetweenRetries()); - processingStrategy.setMaxPauseBetweenRetries(queueSettings.getProcessingStrategy().getMaxPauseBetweenRetries()); - queue.setProcessingStrategy(processingStrategy); - queue.setConsumerPerPartition(queueSettings.isConsumerPerPartition()); - queueService.saveQueue(queue); + queueService.saveQueue(queueConfigToQueue(queueSettings)); }); } else { systemDataLoaderService.createQueues(); @@ -625,6 +614,32 @@ public class SqlDatabaseUpgradeService implements DatabaseEntitiesUpgradeService pageLink = pageLink.nextPageLink(); } while (pageData.hasNext()); + log.info("Updating tenant profiles..."); + PageLink profilePageLink = new PageLink(100); + PageData profilePageData; + do { + profilePageData = tenantProfileService.findTenantProfiles(TenantId.SYS_TENANT_ID, profilePageLink); + + profilePageData.getData().forEach(profile -> { + try { + List queueConfiguration = profile.getProfileData().getQueueConfiguration(); + if (profile.isIsolatedTbRuleEngine() && (queueConfiguration == null || queueConfiguration.isEmpty())) { + TenantProfileQueueConfiguration mainQueueConfig = getMainQueueConfiguration(); + profile.getProfileData().setQueueConfiguration(Collections.singletonList((mainQueueConfig))); + tenantProfileService.saveTenantProfile(TenantId.SYS_TENANT_ID, profile); + List isolatedTenants = tenantService.findTenantIdsByTenantProfileId(profile.getId()); + isolatedTenants.forEach(tenantId -> { + queueService.saveQueue(new Queue(tenantId, mainQueueConfig)); + }); + } + } catch (Exception e) { + } + + }); + profilePageLink = profilePageLink.nextPageLink(); + } while (profilePageData.hasNext()); + + log.info("Updating schema settings..."); conn.createStatement().execute("UPDATE tb_schema_settings SET schema_version = 3004000;"); log.info("Schema updated."); @@ -677,4 +692,49 @@ public class SqlDatabaseUpgradeService implements DatabaseEntitiesUpgradeService } return isOldSchema; } + + private Queue queueConfigToQueue(TbRuleEngineQueueConfiguration queueSettings) { + Queue queue = new Queue(); + queue.setTenantId(TenantId.SYS_TENANT_ID); + queue.setName(queueSettings.getName()); + queue.setTopic(queueSettings.getTopic()); + queue.setPollInterval(queueSettings.getPollInterval()); + queue.setPartitions(queueSettings.getPartitions()); + queue.setPackProcessingTimeout(queueSettings.getPackProcessingTimeout()); + SubmitStrategy submitStrategy = new SubmitStrategy(); + submitStrategy.setBatchSize(queueSettings.getSubmitStrategy().getBatchSize()); + submitStrategy.setType(SubmitStrategyType.valueOf(queueSettings.getSubmitStrategy().getType())); + queue.setSubmitStrategy(submitStrategy); + ProcessingStrategy processingStrategy = new ProcessingStrategy(); + processingStrategy.setType(ProcessingStrategyType.valueOf(queueSettings.getProcessingStrategy().getType())); + processingStrategy.setRetries(queueSettings.getProcessingStrategy().getRetries()); + processingStrategy.setFailurePercentage(queueSettings.getProcessingStrategy().getFailurePercentage()); + processingStrategy.setPauseBetweenRetries(queueSettings.getProcessingStrategy().getPauseBetweenRetries()); + processingStrategy.setMaxPauseBetweenRetries(queueSettings.getProcessingStrategy().getMaxPauseBetweenRetries()); + queue.setProcessingStrategy(processingStrategy); + queue.setConsumerPerPartition(queueSettings.isConsumerPerPartition()); + return queue; + } + + private TenantProfileQueueConfiguration getMainQueueConfiguration() { + TenantProfileQueueConfiguration mainQueueConfiguration = new TenantProfileQueueConfiguration(); + mainQueueConfiguration.setName("Main"); + mainQueueConfiguration.setTopic("tb_rule_engine.main"); + mainQueueConfiguration.setPollInterval(25); + mainQueueConfiguration.setPartitions(10); + mainQueueConfiguration.setConsumerPerPartition(true); + mainQueueConfiguration.setPackProcessingTimeout(2000); + SubmitStrategy mainQueueSubmitStrategy = new SubmitStrategy(); + mainQueueSubmitStrategy.setType(SubmitStrategyType.BURST); + mainQueueSubmitStrategy.setBatchSize(1000); + mainQueueConfiguration.setSubmitStrategy(mainQueueSubmitStrategy); + ProcessingStrategy mainQueueProcessingStrategy = new ProcessingStrategy(); + mainQueueProcessingStrategy.setType(ProcessingStrategyType.SKIP_ALL_FAILURES); + mainQueueProcessingStrategy.setRetries(3); + mainQueueProcessingStrategy.setFailurePercentage(0); + mainQueueProcessingStrategy.setPauseBetweenRetries(3); + mainQueueProcessingStrategy.setMaxPauseBetweenRetries(3); + mainQueueConfiguration.setProcessingStrategy(mainQueueProcessingStrategy); + return mainQueueConfiguration; + } }