Merge pull request #7560 from YevhenBondarenko/feature/asset-profile-upgrade
[3.4.2] upgrade asset-profiles using ExecutorService
This commit is contained in:
commit
ef1965ed44
@ -37,9 +37,10 @@ CREATE OR REPLACE PROCEDURE update_asset_profiles()
|
||||
LANGUAGE plpgsql AS
|
||||
$$
|
||||
BEGIN
|
||||
UPDATE asset as a SET asset_profile_id = p.id
|
||||
FROM
|
||||
(SELECT id, tenant_id, name from asset_profile) as p
|
||||
WHERE a.asset_profile_id IS NULL AND p.tenant_id = a.tenant_id AND a.type = p.name;
|
||||
UPDATE asset a SET asset_profile_id = COALESCE(
|
||||
(SELECT id from asset_profile p WHERE p.tenant_id = a.tenant_id AND a.type = p.name),
|
||||
(SELECT id from asset_profile p WHERE p.tenant_id = a.tenant_id AND p.name = 'default')
|
||||
)
|
||||
WHERE a.asset_profile_id IS NULL;
|
||||
END;
|
||||
$$;
|
||||
|
||||
@ -38,7 +38,6 @@ import org.thingsboard.server.common.data.asset.Asset;
|
||||
import org.thingsboard.server.common.data.asset.AssetInfo;
|
||||
import org.thingsboard.server.common.data.asset.AssetSearchQuery;
|
||||
import org.thingsboard.server.common.data.edge.Edge;
|
||||
import org.thingsboard.server.common.data.exception.ThingsboardErrorCode;
|
||||
import org.thingsboard.server.common.data.exception.ThingsboardException;
|
||||
import org.thingsboard.server.common.data.id.AssetId;
|
||||
import org.thingsboard.server.common.data.id.AssetProfileId;
|
||||
@ -86,7 +85,6 @@ import static org.thingsboard.server.controller.ControllerConstants.TENANT_AUTHO
|
||||
import static org.thingsboard.server.controller.ControllerConstants.TENANT_OR_CUSTOMER_AUTHORITY_PARAGRAPH;
|
||||
import static org.thingsboard.server.controller.ControllerConstants.UUID_WIKI_LINK;
|
||||
import static org.thingsboard.server.controller.EdgeController.EDGE_ID;
|
||||
import static org.thingsboard.server.dao.asset.BaseAssetService.TB_SERVICE_QUEUE;
|
||||
|
||||
@RestController
|
||||
@TbCoreComponent
|
||||
@ -148,9 +146,6 @@ public class AssetController extends BaseController {
|
||||
@RequestMapping(value = "/asset", method = RequestMethod.POST)
|
||||
@ResponseBody
|
||||
public Asset saveAsset(@ApiParam(value = "A JSON value representing the asset.") @RequestBody Asset asset) throws Exception {
|
||||
if (TB_SERVICE_QUEUE.equals(asset.getType())) {
|
||||
throw new ThingsboardException("Unable to save asset with type " + TB_SERVICE_QUEUE, ThingsboardErrorCode.BAD_REQUEST_PARAMS);
|
||||
}
|
||||
asset.setTenantId(getTenantId());
|
||||
checkEntity(asset.getId(), asset, Resource.ASSET);
|
||||
return tbAssetService.save(asset, getCurrentUser());
|
||||
|
||||
@ -22,8 +22,10 @@ import org.thingsboard.server.common.data.Customer;
|
||||
import org.thingsboard.server.common.data.EntityType;
|
||||
import org.thingsboard.server.common.data.User;
|
||||
import org.thingsboard.server.common.data.asset.Asset;
|
||||
import org.thingsboard.server.common.data.asset.AssetProfile;
|
||||
import org.thingsboard.server.common.data.audit.ActionType;
|
||||
import org.thingsboard.server.common.data.edge.Edge;
|
||||
import org.thingsboard.server.common.data.exception.ThingsboardErrorCode;
|
||||
import org.thingsboard.server.common.data.exception.ThingsboardException;
|
||||
import org.thingsboard.server.common.data.id.AssetId;
|
||||
import org.thingsboard.server.common.data.id.CustomerId;
|
||||
@ -31,20 +33,32 @@ import org.thingsboard.server.common.data.id.EdgeId;
|
||||
import org.thingsboard.server.common.data.id.TenantId;
|
||||
import org.thingsboard.server.dao.asset.AssetService;
|
||||
import org.thingsboard.server.service.entitiy.AbstractTbEntityService;
|
||||
import org.thingsboard.server.service.profile.TbAssetProfileCache;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
import static org.thingsboard.server.dao.asset.BaseAssetService.TB_SERVICE_QUEUE;
|
||||
|
||||
@Service
|
||||
@AllArgsConstructor
|
||||
public class DefaultTbAssetService extends AbstractTbEntityService implements TbAssetService {
|
||||
|
||||
private final AssetService assetService;
|
||||
private final TbAssetProfileCache assetProfileCache;
|
||||
|
||||
@Override
|
||||
public Asset save(Asset asset, User user) throws Exception {
|
||||
ActionType actionType = asset.getId() == null ? ActionType.ADDED : ActionType.UPDATED;
|
||||
TenantId tenantId = asset.getTenantId();
|
||||
try {
|
||||
if (TB_SERVICE_QUEUE.equals(asset.getType())) {
|
||||
throw new ThingsboardException("Unable to save asset with type " + TB_SERVICE_QUEUE, ThingsboardErrorCode.BAD_REQUEST_PARAMS);
|
||||
} else if (asset.getAssetProfileId() != null) {
|
||||
AssetProfile assetProfile = assetProfileCache.get(tenantId, asset.getAssetProfileId());
|
||||
if (assetProfile != null && TB_SERVICE_QUEUE.equals(assetProfile.getName())) {
|
||||
throw new ThingsboardException("Unable to save asset with profile " + TB_SERVICE_QUEUE, ThingsboardErrorCode.BAD_REQUEST_PARAMS);
|
||||
}
|
||||
}
|
||||
Asset savedAsset = checkNotNull(assetService.saveAsset(asset));
|
||||
autoCommit(user, savedAsset.getId());
|
||||
notificationEntityService.notifyCreateOrUpdateEntity(tenantId, savedAsset.getId(), savedAsset,
|
||||
|
||||
@ -0,0 +1,26 @@
|
||||
/**
|
||||
* Copyright © 2016-2022 The Thingsboard Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.thingsboard.server.service.install;
|
||||
|
||||
import org.springframework.context.annotation.Profile;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.thingsboard.server.service.executors.DbCallbackExecutorService;
|
||||
|
||||
@Component
|
||||
@Profile("install")
|
||||
public class DbUpgradeExecutorService extends DbCallbackExecutorService {
|
||||
|
||||
}
|
||||
@ -15,6 +15,8 @@
|
||||
*/
|
||||
package org.thingsboard.server.service.install;
|
||||
|
||||
import com.google.common.util.concurrent.Futures;
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.commons.collections.CollectionUtils;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
@ -32,14 +34,14 @@ import org.thingsboard.server.common.data.queue.ProcessingStrategyType;
|
||||
import org.thingsboard.server.common.data.queue.Queue;
|
||||
import org.thingsboard.server.common.data.queue.SubmitStrategy;
|
||||
import org.thingsboard.server.common.data.queue.SubmitStrategyType;
|
||||
import org.thingsboard.server.dao.asset.AssetDao;
|
||||
import org.thingsboard.server.dao.asset.AssetProfileService;
|
||||
import org.thingsboard.server.dao.asset.AssetService;
|
||||
import org.thingsboard.server.dao.dashboard.DashboardService;
|
||||
import org.thingsboard.server.dao.device.DeviceProfileService;
|
||||
import org.thingsboard.server.dao.device.DeviceService;
|
||||
import org.thingsboard.server.common.data.util.TbPair;
|
||||
import org.thingsboard.server.dao.queue.QueueService;
|
||||
import org.thingsboard.server.dao.rule.RuleChainService;
|
||||
import org.thingsboard.server.dao.tenant.TenantProfileService;
|
||||
import org.thingsboard.server.dao.sql.tenant.TenantRepository;
|
||||
import org.thingsboard.server.dao.tenant.TenantService;
|
||||
import org.thingsboard.server.dao.usagerecord.ApiUsageStateService;
|
||||
import org.thingsboard.server.queue.settings.TbRuleEngineQueueConfiguration;
|
||||
@ -56,7 +58,9 @@ import java.sql.SQLException;
|
||||
import java.sql.SQLSyntaxErrorException;
|
||||
import java.sql.SQLWarning;
|
||||
import java.sql.Statement;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import static org.thingsboard.server.service.install.DatabaseHelper.ADDITIONAL_INFO;
|
||||
@ -106,11 +110,14 @@ public class SqlDatabaseUpgradeService implements DatabaseEntitiesUpgradeService
|
||||
@Autowired
|
||||
private TenantService tenantService;
|
||||
|
||||
@Autowired
|
||||
private TenantRepository tenantRepository;
|
||||
|
||||
@Autowired
|
||||
private DeviceService deviceService;
|
||||
|
||||
@Autowired
|
||||
private AssetService assetService;
|
||||
private AssetDao assetDao;
|
||||
|
||||
@Autowired
|
||||
private DeviceProfileService deviceProfileService;
|
||||
@ -129,10 +136,7 @@ public class SqlDatabaseUpgradeService implements DatabaseEntitiesUpgradeService
|
||||
private TbRuleEngineQueueConfigService queueConfig;
|
||||
|
||||
@Autowired
|
||||
private RuleChainService ruleChainService;
|
||||
|
||||
@Autowired
|
||||
private TenantProfileService tenantProfileService;
|
||||
private DbUpgradeExecutorService dbUpgradeExecutor;
|
||||
|
||||
@Override
|
||||
public void upgradeDatabase(String fromVersion) throws Exception {
|
||||
@ -620,26 +624,45 @@ public class SqlDatabaseUpgradeService implements DatabaseEntitiesUpgradeService
|
||||
schemaUpdateFile = Paths.get(installScripts.getDataDir(), "upgrade", "3.4.1", "schema_update_before.sql");
|
||||
loadSql(schemaUpdateFile, conn);
|
||||
|
||||
conn.createStatement().execute("DELETE FROM asset a WHERE NOT exists(SELECT id FROM tenant WHERE id = a.tenant_id);");
|
||||
|
||||
log.info("Creating default asset profiles...");
|
||||
PageLink pageLink = new PageLink(100);
|
||||
PageData<Tenant> pageData;
|
||||
|
||||
PageLink pageLink = new PageLink(1000);
|
||||
PageData<TenantId> tenantIds;
|
||||
do {
|
||||
pageData = tenantService.findTenants(pageLink);
|
||||
for (Tenant tenant : pageData.getData()) {
|
||||
List<EntitySubtype> assetTypes = assetService.findAssetTypesByTenantId(tenant.getId()).get();
|
||||
try {
|
||||
assetProfileService.createDefaultAssetProfile(tenant.getId());
|
||||
} catch (Exception e) {
|
||||
}
|
||||
for (EntitySubtype assetType : assetTypes) {
|
||||
List<ListenableFuture<?>> futures = new ArrayList<>();
|
||||
tenantIds = tenantService.findTenantsIds(pageLink);
|
||||
for (TenantId tenantId : tenantIds.getData()) {
|
||||
futures.add(dbUpgradeExecutor.submit(() -> {
|
||||
try {
|
||||
assetProfileService.findOrCreateAssetProfile(tenant.getId(), assetType.getType());
|
||||
} catch (Exception e) {
|
||||
}
|
||||
assetProfileService.createDefaultAssetProfile(tenantId);
|
||||
} catch (Exception e) {}
|
||||
}));
|
||||
}
|
||||
Futures.allAsList(futures).get();
|
||||
pageLink = pageLink.nextPageLink();
|
||||
} while (tenantIds.hasNext());
|
||||
|
||||
pageLink = new PageLink(1000);
|
||||
PageData<TbPair<UUID, String>> pairs;
|
||||
do {
|
||||
List<ListenableFuture<?>> futures = new ArrayList<>();
|
||||
pairs = assetDao.getAllAssetTypes(pageLink);
|
||||
for (TbPair<UUID, String> pair : pairs.getData()) {
|
||||
TenantId tenantId = new TenantId(pair.getFirst());
|
||||
String assetType = pair.getSecond();
|
||||
if (!"default".equals(assetType)) {
|
||||
futures.add(dbUpgradeExecutor.submit(() -> {
|
||||
try {
|
||||
assetProfileService.findOrCreateAssetProfile(tenantId, assetType);
|
||||
} catch (Exception e) {}
|
||||
}));
|
||||
}
|
||||
}
|
||||
Futures.allAsList(futures).get();
|
||||
pageLink = pageLink.nextPageLink();
|
||||
} while (pageData.hasNext());
|
||||
} while (pairs.hasNext());
|
||||
|
||||
log.info("Updating asset profiles...");
|
||||
conn.createStatement().execute("call update_asset_profiles()");
|
||||
@ -728,5 +751,4 @@ public class SqlDatabaseUpgradeService implements DatabaseEntitiesUpgradeService
|
||||
return queue;
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
||||
@ -0,0 +1,26 @@
|
||||
/**
|
||||
* Copyright © 2016-2022 The Thingsboard Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.thingsboard.server.common.data.util;
|
||||
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Data;
|
||||
|
||||
@Data
|
||||
@AllArgsConstructor
|
||||
public class TbPair<S, T> {
|
||||
private S first;
|
||||
private T second;
|
||||
}
|
||||
@ -26,6 +26,7 @@ import org.thingsboard.server.common.data.page.PageLink;
|
||||
import org.thingsboard.server.dao.Dao;
|
||||
import org.thingsboard.server.dao.ExportableEntityDao;
|
||||
import org.thingsboard.server.dao.TenantEntityDao;
|
||||
import org.thingsboard.server.common.data.util.TbPair;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
@ -222,4 +223,6 @@ public interface AssetDao extends Dao<Asset>, TenantEntityDao, ExportableEntityD
|
||||
* @return the list of asset objects
|
||||
*/
|
||||
PageData<Asset> findAssetsByTenantIdAndEdgeIdAndType(UUID tenantId, UUID edgeId, String type, PageLink pageLink);
|
||||
|
||||
PageData<TbPair<UUID, String>> getAllAssetTypes(PageLink pageLink);
|
||||
}
|
||||
|
||||
@ -23,6 +23,7 @@ import org.springframework.data.repository.query.Param;
|
||||
import org.thingsboard.server.dao.ExportableEntityRepository;
|
||||
import org.thingsboard.server.dao.model.sql.AssetEntity;
|
||||
import org.thingsboard.server.dao.model.sql.AssetInfoEntity;
|
||||
import org.thingsboard.server.common.data.util.TbPair;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.UUID;
|
||||
@ -70,9 +71,9 @@ public interface AssetRepository extends JpaRepository<AssetEntity, UUID>, Expor
|
||||
"AND a.assetProfileId = :profileId " +
|
||||
"AND LOWER(a.searchText) LIKE LOWER(CONCAT('%', :searchText, '%'))")
|
||||
Page<AssetEntity> findByTenantIdAndProfileId(@Param("tenantId") UUID tenantId,
|
||||
@Param("profileId") UUID profileId,
|
||||
@Param("searchText") String searchText,
|
||||
Pageable pageable);
|
||||
@Param("profileId") UUID profileId,
|
||||
@Param("searchText") String searchText,
|
||||
Pageable pageable);
|
||||
|
||||
@Query("SELECT new org.thingsboard.server.dao.model.sql.AssetInfoEntity(a, c.title, c.additionalInfo, p.name) " +
|
||||
"FROM AssetEntity a " +
|
||||
@ -186,14 +187,17 @@ public interface AssetRepository extends JpaRepository<AssetEntity, UUID>, Expor
|
||||
"AND a.type = :type " +
|
||||
"AND LOWER(a.searchText) LIKE LOWER(CONCAT('%', :searchText, '%'))")
|
||||
Page<AssetEntity> findByTenantIdAndEdgeIdAndType(@Param("tenantId") UUID tenantId,
|
||||
@Param("edgeId") UUID edgeId,
|
||||
@Param("type") String type,
|
||||
@Param("searchText") String searchText,
|
||||
Pageable pageable);
|
||||
@Param("edgeId") UUID edgeId,
|
||||
@Param("type") String type,
|
||||
@Param("searchText") String searchText,
|
||||
Pageable pageable);
|
||||
|
||||
Long countByTenantIdAndTypeIsNot(UUID tenantId, String type);
|
||||
|
||||
@Query("SELECT externalId FROM AssetEntity WHERE id = :id")
|
||||
UUID getExternalIdById(@Param("id") UUID id);
|
||||
|
||||
@Query(value = "SELECT DISTINCT new org.thingsboard.server.common.data.util.TbPair(a.tenantId , a.type) FROM AssetEntity a")
|
||||
Page<TbPair<UUID, String>> getAllAssetTypes(Pageable pageable);
|
||||
|
||||
}
|
||||
|
||||
@ -28,14 +28,17 @@ import org.thingsboard.server.common.data.id.AssetId;
|
||||
import org.thingsboard.server.common.data.id.TenantId;
|
||||
import org.thingsboard.server.common.data.page.PageData;
|
||||
import org.thingsboard.server.common.data.page.PageLink;
|
||||
import org.thingsboard.server.common.data.page.SortOrder;
|
||||
import org.thingsboard.server.dao.DaoUtil;
|
||||
import org.thingsboard.server.dao.asset.AssetDao;
|
||||
import org.thingsboard.server.dao.model.sql.AssetEntity;
|
||||
import org.thingsboard.server.dao.model.sql.AssetInfoEntity;
|
||||
import org.thingsboard.server.common.data.util.TbPair;
|
||||
import org.thingsboard.server.dao.sql.JpaAbstractSearchTextDao;
|
||||
import org.thingsboard.server.dao.util.SqlDao;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
@ -243,6 +246,12 @@ public class JpaAssetDao extends JpaAbstractSearchTextDao<AssetEntity, Asset> im
|
||||
DaoUtil.toPageable(pageLink)));
|
||||
}
|
||||
|
||||
public PageData<TbPair<UUID, String>> getAllAssetTypes(PageLink pageLink) {
|
||||
log.debug("Try to find all asset types and pageLink [{}]", pageLink);
|
||||
return DaoUtil.pageToPageData(assetRepository.getAllAssetTypes(
|
||||
DaoUtil.toPageable(pageLink, Arrays.asList(new SortOrder("tenantId"), new SortOrder("type")))));
|
||||
}
|
||||
|
||||
@Override
|
||||
public Long countByTenantId(TenantId tenantId) {
|
||||
return assetRepository.countByTenantIdAndTypeIsNot(tenantId.getId(), TB_SERVICE_QUEUE);
|
||||
|
||||
@ -53,5 +53,4 @@ public interface TenantRepository extends JpaRepository<TenantEntity, UUID> {
|
||||
|
||||
@Query("SELECT t.id FROM TenantEntity t where t.tenantProfileId = :tenantProfileId")
|
||||
List<UUID> findTenantIdsByTenantProfileId(@Param("tenantProfileId") UUID tenantProfileId);
|
||||
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user