upgrade asset-profiles using ExecutorService

This commit is contained in:
YevhenBondarenko 2022-11-07 11:29:27 +01:00
parent ef82c00c49
commit 9b54f009e6
2 changed files with 43 additions and 13 deletions

View File

@ -0,0 +1,26 @@
/**
* Copyright © 2016-2022 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.service.install;
import org.springframework.context.annotation.Profile;
import org.springframework.stereotype.Component;
import org.thingsboard.server.service.executors.DbCallbackExecutorService;
@Component
@Profile("install")
public class DbUpgradeExecutorService extends DbCallbackExecutorService {
}

View File

@ -15,6 +15,8 @@
*/ */
package org.thingsboard.server.service.install; package org.thingsboard.server.service.install;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils; import org.apache.commons.collections.CollectionUtils;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
@ -33,13 +35,11 @@ import org.thingsboard.server.common.data.queue.Queue;
import org.thingsboard.server.common.data.queue.SubmitStrategy; import org.thingsboard.server.common.data.queue.SubmitStrategy;
import org.thingsboard.server.common.data.queue.SubmitStrategyType; import org.thingsboard.server.common.data.queue.SubmitStrategyType;
import org.thingsboard.server.dao.asset.AssetProfileService; import org.thingsboard.server.dao.asset.AssetProfileService;
import org.thingsboard.server.dao.asset.AssetService;
import org.thingsboard.server.dao.dashboard.DashboardService; import org.thingsboard.server.dao.dashboard.DashboardService;
import org.thingsboard.server.dao.device.DeviceProfileService; import org.thingsboard.server.dao.device.DeviceProfileService;
import org.thingsboard.server.dao.device.DeviceService; import org.thingsboard.server.dao.device.DeviceService;
import org.thingsboard.server.dao.queue.QueueService; import org.thingsboard.server.dao.queue.QueueService;
import org.thingsboard.server.dao.rule.RuleChainService; import org.thingsboard.server.dao.sql.asset.AssetRepository;
import org.thingsboard.server.dao.tenant.TenantProfileService;
import org.thingsboard.server.dao.tenant.TenantService; import org.thingsboard.server.dao.tenant.TenantService;
import org.thingsboard.server.dao.usagerecord.ApiUsageStateService; import org.thingsboard.server.dao.usagerecord.ApiUsageStateService;
import org.thingsboard.server.queue.settings.TbRuleEngineQueueConfiguration; import org.thingsboard.server.queue.settings.TbRuleEngineQueueConfiguration;
@ -56,7 +56,10 @@ import java.sql.SQLException;
import java.sql.SQLSyntaxErrorException; import java.sql.SQLSyntaxErrorException;
import java.sql.SQLWarning; import java.sql.SQLWarning;
import java.sql.Statement; import java.sql.Statement;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import static org.thingsboard.server.service.install.DatabaseHelper.ADDITIONAL_INFO; import static org.thingsboard.server.service.install.DatabaseHelper.ADDITIONAL_INFO;
@ -110,7 +113,7 @@ public class SqlDatabaseUpgradeService implements DatabaseEntitiesUpgradeService
private DeviceService deviceService; private DeviceService deviceService;
@Autowired @Autowired
private AssetService assetService; private AssetRepository assetRepository;
@Autowired @Autowired
private DeviceProfileService deviceProfileService; private DeviceProfileService deviceProfileService;
@ -129,10 +132,7 @@ public class SqlDatabaseUpgradeService implements DatabaseEntitiesUpgradeService
private TbRuleEngineQueueConfigService queueConfig; private TbRuleEngineQueueConfigService queueConfig;
@Autowired @Autowired
private RuleChainService ruleChainService; private DbUpgradeExecutorService dbUpgradeExecutor;
@Autowired
private TenantProfileService tenantProfileService;
@Override @Override
public void upgradeDatabase(String fromVersion) throws Exception { public void upgradeDatabase(String fromVersion) throws Exception {
@ -623,17 +623,20 @@ public class SqlDatabaseUpgradeService implements DatabaseEntitiesUpgradeService
log.info("Creating default asset profiles..."); log.info("Creating default asset profiles...");
PageLink pageLink = new PageLink(100); PageLink pageLink = new PageLink(100);
PageData<Tenant> pageData; PageData<Tenant> pageData;
List<ListenableFuture<?>> futures = new ArrayList<>();
do { do {
pageData = tenantService.findTenants(pageLink); pageData = tenantService.findTenants(pageLink);
for (Tenant tenant : pageData.getData()) { for (Tenant tenant : pageData.getData()) {
List<EntitySubtype> assetTypes = assetService.findAssetTypesByTenantId(tenant.getId()).get(); Set<String> assetTypes = new HashSet<>(assetRepository.findTenantAssetTypes(tenant.getUuidId()));
assetTypes.remove("default");
try { try {
assetProfileService.createDefaultAssetProfile(tenant.getId()); futures.add(dbUpgradeExecutor.submit(() -> assetProfileService.createDefaultAssetProfile(tenant.getId())));
} catch (Exception e) { } catch (Exception e) {
} }
for (EntitySubtype assetType : assetTypes) { for (String assetType : assetTypes) {
try { try {
assetProfileService.findOrCreateAssetProfile(tenant.getId(), assetType.getType()); futures.add(dbUpgradeExecutor.submit(() -> assetProfileService.findOrCreateAssetProfile(tenant.getId(), assetType)));
} catch (Exception e) { } catch (Exception e) {
} }
} }
@ -641,6 +644,8 @@ public class SqlDatabaseUpgradeService implements DatabaseEntitiesUpgradeService
pageLink = pageLink.nextPageLink(); pageLink = pageLink.nextPageLink();
} while (pageData.hasNext()); } while (pageData.hasNext());
Futures.allAsList(futures).get();
log.info("Updating asset profiles..."); log.info("Updating asset profiles...");
conn.createStatement().execute("call update_asset_profiles()"); conn.createStatement().execute("call update_asset_profiles()");
@ -728,5 +733,4 @@ public class SqlDatabaseUpgradeService implements DatabaseEntitiesUpgradeService
return queue; return queue;
} }
} }