added tenant profile upgrade

This commit is contained in:
YevhenBondarenko 2022-05-19 13:09:40 +02:00
parent e32e161745
commit b8f2d6ee9c

View File

@ -25,6 +25,7 @@ import org.springframework.context.annotation.Profile;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.thingsboard.server.common.data.EntitySubtype; import org.thingsboard.server.common.data.EntitySubtype;
import org.thingsboard.server.common.data.Tenant; import org.thingsboard.server.common.data.Tenant;
import org.thingsboard.server.common.data.TenantProfile;
import org.thingsboard.server.common.data.id.QueueId; import org.thingsboard.server.common.data.id.QueueId;
import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.page.PageData; import org.thingsboard.server.common.data.page.PageData;
@ -35,13 +36,16 @@ import org.thingsboard.server.common.data.queue.Queue;
import org.thingsboard.server.common.data.queue.SubmitStrategy; import org.thingsboard.server.common.data.queue.SubmitStrategy;
import org.thingsboard.server.common.data.queue.SubmitStrategyType; import org.thingsboard.server.common.data.queue.SubmitStrategyType;
import org.thingsboard.server.common.data.rule.RuleNode; import org.thingsboard.server.common.data.rule.RuleNode;
import org.thingsboard.server.common.data.tenant.profile.TenantProfileQueueConfiguration;
import org.thingsboard.server.dao.dashboard.DashboardService; import org.thingsboard.server.dao.dashboard.DashboardService;
import org.thingsboard.server.dao.device.DeviceProfileService; import org.thingsboard.server.dao.device.DeviceProfileService;
import org.thingsboard.server.dao.device.DeviceService; import org.thingsboard.server.dao.device.DeviceService;
import org.thingsboard.server.dao.queue.QueueService; import org.thingsboard.server.dao.queue.QueueService;
import org.thingsboard.server.dao.rule.RuleChainService; import org.thingsboard.server.dao.rule.RuleChainService;
import org.thingsboard.server.dao.tenant.TenantProfileService;
import org.thingsboard.server.dao.tenant.TenantService; import org.thingsboard.server.dao.tenant.TenantService;
import org.thingsboard.server.dao.usagerecord.ApiUsageStateService; import org.thingsboard.server.dao.usagerecord.ApiUsageStateService;
import org.thingsboard.server.queue.settings.TbRuleEngineQueueConfiguration;
import org.thingsboard.server.service.install.sql.SqlDbHelper; import org.thingsboard.server.service.install.sql.SqlDbHelper;
import java.nio.charset.Charset; import java.nio.charset.Charset;
@ -55,6 +59,7 @@ import java.sql.SQLException;
import java.sql.SQLSyntaxErrorException; import java.sql.SQLSyntaxErrorException;
import java.sql.SQLWarning; import java.sql.SQLWarning;
import java.sql.Statement; import java.sql.Statement;
import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -125,6 +130,9 @@ public class SqlDatabaseUpgradeService implements DatabaseEntitiesUpgradeService
@Autowired @Autowired
private RuleChainService ruleChainService; private RuleChainService ruleChainService;
@Autowired
private TenantProfileService tenantProfileService;
@Override @Override
public void upgradeDatabase(String fromVersion) throws Exception { public void upgradeDatabase(String fromVersion) throws Exception {
switch (fromVersion) { switch (fromVersion) {
@ -567,26 +575,7 @@ public class SqlDatabaseUpgradeService implements DatabaseEntitiesUpgradeService
try { try {
if (!CollectionUtils.isEmpty(queueConfig.getQueues())) { if (!CollectionUtils.isEmpty(queueConfig.getQueues())) {
queueConfig.getQueues().forEach(queueSettings -> { queueConfig.getQueues().forEach(queueSettings -> {
Queue queue = new Queue(); queueService.saveQueue(queueConfigToQueue(queueSettings));
queue.setTenantId(TenantId.SYS_TENANT_ID);
queue.setName(queueSettings.getName());
queue.setTopic(queueSettings.getTopic());
queue.setPollInterval(queueSettings.getPollInterval());
queue.setPartitions(queueSettings.getPartitions());
queue.setPackProcessingTimeout(queueSettings.getPackProcessingTimeout());
SubmitStrategy submitStrategy = new SubmitStrategy();
submitStrategy.setBatchSize(queueSettings.getSubmitStrategy().getBatchSize());
submitStrategy.setType(SubmitStrategyType.valueOf(queueSettings.getSubmitStrategy().getType()));
queue.setSubmitStrategy(submitStrategy);
ProcessingStrategy processingStrategy = new ProcessingStrategy();
processingStrategy.setType(ProcessingStrategyType.valueOf(queueSettings.getProcessingStrategy().getType()));
processingStrategy.setRetries(queueSettings.getProcessingStrategy().getRetries());
processingStrategy.setFailurePercentage(queueSettings.getProcessingStrategy().getFailurePercentage());
processingStrategy.setPauseBetweenRetries(queueSettings.getProcessingStrategy().getPauseBetweenRetries());
processingStrategy.setMaxPauseBetweenRetries(queueSettings.getProcessingStrategy().getMaxPauseBetweenRetries());
queue.setProcessingStrategy(processingStrategy);
queue.setConsumerPerPartition(queueSettings.isConsumerPerPartition());
queueService.saveQueue(queue);
}); });
} else { } else {
systemDataLoaderService.createQueues(); systemDataLoaderService.createQueues();
@ -625,6 +614,32 @@ public class SqlDatabaseUpgradeService implements DatabaseEntitiesUpgradeService
pageLink = pageLink.nextPageLink(); pageLink = pageLink.nextPageLink();
} while (pageData.hasNext()); } while (pageData.hasNext());
log.info("Updating tenant profiles...");
PageLink profilePageLink = new PageLink(100);
PageData<TenantProfile> profilePageData;
do {
profilePageData = tenantProfileService.findTenantProfiles(TenantId.SYS_TENANT_ID, profilePageLink);
profilePageData.getData().forEach(profile -> {
try {
List<TenantProfileQueueConfiguration> queueConfiguration = profile.getProfileData().getQueueConfiguration();
if (profile.isIsolatedTbRuleEngine() && (queueConfiguration == null || queueConfiguration.isEmpty())) {
TenantProfileQueueConfiguration mainQueueConfig = getMainQueueConfiguration();
profile.getProfileData().setQueueConfiguration(Collections.singletonList((mainQueueConfig)));
tenantProfileService.saveTenantProfile(TenantId.SYS_TENANT_ID, profile);
List<TenantId> isolatedTenants = tenantService.findTenantIdsByTenantProfileId(profile.getId());
isolatedTenants.forEach(tenantId -> {
queueService.saveQueue(new Queue(tenantId, mainQueueConfig));
});
}
} catch (Exception e) {
}
});
profilePageLink = profilePageLink.nextPageLink();
} while (profilePageData.hasNext());
log.info("Updating schema settings..."); log.info("Updating schema settings...");
conn.createStatement().execute("UPDATE tb_schema_settings SET schema_version = 3004000;"); conn.createStatement().execute("UPDATE tb_schema_settings SET schema_version = 3004000;");
log.info("Schema updated."); log.info("Schema updated.");
@ -677,4 +692,49 @@ public class SqlDatabaseUpgradeService implements DatabaseEntitiesUpgradeService
} }
return isOldSchema; return isOldSchema;
} }
private Queue queueConfigToQueue(TbRuleEngineQueueConfiguration queueSettings) {
Queue queue = new Queue();
queue.setTenantId(TenantId.SYS_TENANT_ID);
queue.setName(queueSettings.getName());
queue.setTopic(queueSettings.getTopic());
queue.setPollInterval(queueSettings.getPollInterval());
queue.setPartitions(queueSettings.getPartitions());
queue.setPackProcessingTimeout(queueSettings.getPackProcessingTimeout());
SubmitStrategy submitStrategy = new SubmitStrategy();
submitStrategy.setBatchSize(queueSettings.getSubmitStrategy().getBatchSize());
submitStrategy.setType(SubmitStrategyType.valueOf(queueSettings.getSubmitStrategy().getType()));
queue.setSubmitStrategy(submitStrategy);
ProcessingStrategy processingStrategy = new ProcessingStrategy();
processingStrategy.setType(ProcessingStrategyType.valueOf(queueSettings.getProcessingStrategy().getType()));
processingStrategy.setRetries(queueSettings.getProcessingStrategy().getRetries());
processingStrategy.setFailurePercentage(queueSettings.getProcessingStrategy().getFailurePercentage());
processingStrategy.setPauseBetweenRetries(queueSettings.getProcessingStrategy().getPauseBetweenRetries());
processingStrategy.setMaxPauseBetweenRetries(queueSettings.getProcessingStrategy().getMaxPauseBetweenRetries());
queue.setProcessingStrategy(processingStrategy);
queue.setConsumerPerPartition(queueSettings.isConsumerPerPartition());
return queue;
}
private TenantProfileQueueConfiguration getMainQueueConfiguration() {
TenantProfileQueueConfiguration mainQueueConfiguration = new TenantProfileQueueConfiguration();
mainQueueConfiguration.setName("Main");
mainQueueConfiguration.setTopic("tb_rule_engine.main");
mainQueueConfiguration.setPollInterval(25);
mainQueueConfiguration.setPartitions(10);
mainQueueConfiguration.setConsumerPerPartition(true);
mainQueueConfiguration.setPackProcessingTimeout(2000);
SubmitStrategy mainQueueSubmitStrategy = new SubmitStrategy();
mainQueueSubmitStrategy.setType(SubmitStrategyType.BURST);
mainQueueSubmitStrategy.setBatchSize(1000);
mainQueueConfiguration.setSubmitStrategy(mainQueueSubmitStrategy);
ProcessingStrategy mainQueueProcessingStrategy = new ProcessingStrategy();
mainQueueProcessingStrategy.setType(ProcessingStrategyType.SKIP_ALL_FAILURES);
mainQueueProcessingStrategy.setRetries(3);
mainQueueProcessingStrategy.setFailurePercentage(0);
mainQueueProcessingStrategy.setPauseBetweenRetries(3);
mainQueueProcessingStrategy.setMaxPauseBetweenRetries(3);
mainQueueConfiguration.setProcessingStrategy(mainQueueProcessingStrategy);
return mainQueueConfiguration;
}
} }