Fix batch version load
This commit is contained in:
parent
4c8b3bd7cb
commit
2a4ab29b23
@ -19,13 +19,11 @@ import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.springframework.transaction.annotation.Transactional;
|
||||
import org.thingsboard.server.common.data.EntityType;
|
||||
import org.thingsboard.server.common.data.ExportableEntity;
|
||||
import org.thingsboard.server.common.data.exception.ThingsboardErrorCode;
|
||||
import org.thingsboard.server.common.data.exception.ThingsboardException;
|
||||
import org.thingsboard.server.common.data.id.EntityId;
|
||||
import org.thingsboard.server.common.data.sync.ThrowingRunnable;
|
||||
import org.thingsboard.server.common.data.sync.ie.EntityExportData;
|
||||
import org.thingsboard.server.common.data.sync.ie.EntityExportSettings;
|
||||
import org.thingsboard.server.common.data.sync.ie.EntityImportResult;
|
||||
@ -39,19 +37,17 @@ import org.thingsboard.server.service.sync.ie.exporting.impl.BaseEntityExportSer
|
||||
import org.thingsboard.server.service.sync.ie.exporting.impl.DefaultEntityExportService;
|
||||
import org.thingsboard.server.service.sync.ie.importing.EntityImportService;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Comparator;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
@Service
|
||||
@TbCoreComponent
|
||||
@RequiredArgsConstructor
|
||||
@Slf4j
|
||||
@SuppressWarnings("rawtypes")
|
||||
public class DefaultEntitiesExportImportService implements EntitiesExportImportService {
|
||||
|
||||
private final Map<EntityType, EntityExportService<?, ?, ?>> exportServices = new HashMap<>();
|
||||
@ -77,7 +73,6 @@ public class DefaultEntitiesExportImportService implements EntitiesExportImportS
|
||||
return exportService.getExportData(user, entityId, exportSettings);
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public <E extends ExportableEntity<I>, I extends EntityId> EntityImportResult<E> importEntity(SecurityUser user, EntityExportData<E> exportData, EntityImportSettings importSettings,
|
||||
boolean saveReferences, boolean sendEvents) throws ThingsboardException {
|
||||
@ -103,44 +98,6 @@ public class DefaultEntitiesExportImportService implements EntitiesExportImportS
|
||||
return importResult;
|
||||
}
|
||||
|
||||
@Transactional(rollbackFor = Exception.class, timeout = 120)
|
||||
@Override
|
||||
public List<EntityImportResult<?>> importEntities(SecurityUser user, List<EntityExportData<?>> exportDataList, EntityImportSettings importSettings) throws ThingsboardException {
|
||||
exportDataList.sort(getDataComparatorForImport());
|
||||
|
||||
List<EntityImportResult<?>> importResults = new ArrayList<>();
|
||||
|
||||
for (EntityExportData exportData : exportDataList) {
|
||||
EntityImportResult<?> importResult = importEntity(user, exportData, importSettings, false, false);
|
||||
importResults.add(importResult);
|
||||
}
|
||||
|
||||
for (ThrowingRunnable saveReferencesCallback : importResults.stream()
|
||||
.map(EntityImportResult::getSaveReferencesCallback)
|
||||
.filter(Objects::nonNull)
|
||||
.collect(Collectors.toList())) {
|
||||
saveReferencesCallback.run();
|
||||
}
|
||||
|
||||
importResults.stream()
|
||||
.map(EntityImportResult::getSendEventsCallback)
|
||||
.filter(Objects::nonNull)
|
||||
.forEach(sendEventsCallback -> {
|
||||
try {
|
||||
sendEventsCallback.run();
|
||||
} catch (Exception e) {
|
||||
log.error("Failed to send event for entity", e);
|
||||
}
|
||||
});
|
||||
|
||||
return importResults;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public Comparator<EntityExportData<?>> getDataComparatorForImport() {
|
||||
return Comparator.comparing(EntityExportData::getEntityType, getEntityTypeComparatorForImport());
|
||||
}
|
||||
|
||||
@Override
|
||||
public Comparator<EntityType> getEntityTypeComparatorForImport() {
|
||||
|
||||
@ -19,14 +19,13 @@ import org.thingsboard.server.common.data.EntityType;
|
||||
import org.thingsboard.server.common.data.ExportableEntity;
|
||||
import org.thingsboard.server.common.data.exception.ThingsboardException;
|
||||
import org.thingsboard.server.common.data.id.EntityId;
|
||||
import org.thingsboard.server.service.security.model.SecurityUser;
|
||||
import org.thingsboard.server.common.data.sync.ie.EntityExportData;
|
||||
import org.thingsboard.server.common.data.sync.ie.EntityExportSettings;
|
||||
import org.thingsboard.server.common.data.sync.ie.EntityImportResult;
|
||||
import org.thingsboard.server.common.data.sync.ie.EntityImportSettings;
|
||||
import org.thingsboard.server.service.security.model.SecurityUser;
|
||||
|
||||
import java.util.Comparator;
|
||||
import java.util.List;
|
||||
|
||||
public interface EntitiesExportImportService {
|
||||
|
||||
@ -35,10 +34,6 @@ public interface EntitiesExportImportService {
|
||||
<E extends ExportableEntity<I>, I extends EntityId> EntityImportResult<E> importEntity(SecurityUser user, EntityExportData<E> exportData, EntityImportSettings importSettings,
|
||||
boolean saveReferences, boolean sendEvents) throws ThingsboardException;
|
||||
|
||||
List<EntityImportResult<?>> importEntities(SecurityUser user, List<EntityExportData<?>> exportDataList, EntityImportSettings importSettings) throws ThingsboardException;
|
||||
|
||||
|
||||
Comparator<EntityExportData<?>> getDataComparatorForImport();
|
||||
|
||||
Comparator<EntityType> getEntityTypeComparatorForImport();
|
||||
|
||||
|
||||
@ -87,19 +87,23 @@ public class DefaultExportableEntitiesService implements ExportableEntitiesServi
|
||||
|
||||
@Override
|
||||
public <E extends HasId<I>, I extends EntityId> E findEntityByTenantIdAndId(TenantId tenantId, I id) {
|
||||
E entity = findEntityById(id);
|
||||
|
||||
if (entity == null || !belongsToTenant(entity, tenantId)) {
|
||||
return null;
|
||||
}
|
||||
return entity;
|
||||
}
|
||||
|
||||
@Override
|
||||
public <E extends HasId<I>, I extends EntityId> E findEntityById(I id) {
|
||||
EntityType entityType = id.getEntityType();
|
||||
Dao<E> dao = getDao(entityType);
|
||||
if (dao == null) {
|
||||
throw new IllegalArgumentException("Unsupported entity type " + entityType);
|
||||
}
|
||||
|
||||
E entity = dao.findById(tenantId, id.getId());
|
||||
|
||||
if (entity == null || !belongsToTenant(entity, tenantId)) {
|
||||
return null;
|
||||
}
|
||||
|
||||
return entity;
|
||||
return dao.findById(TenantId.SYS_TENANT_ID, id.getId());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@ -32,6 +32,8 @@ public interface ExportableEntitiesService {
|
||||
|
||||
<E extends HasId<I>, I extends EntityId> E findEntityByTenantIdAndId(TenantId tenantId, I id);
|
||||
|
||||
<E extends HasId<I>, I extends EntityId> E findEntityById(I id);
|
||||
|
||||
<E extends ExportableEntity<I>, I extends EntityId> E findEntityByTenantIdAndName(TenantId tenantId, EntityType entityType, String name);
|
||||
|
||||
<E extends ExportableEntity<I>, I extends EntityId> PageData<E> findEntitiesByTenantId(TenantId tenantId, EntityType entityType, PageLink pageLink);
|
||||
|
||||
@ -83,7 +83,8 @@ public abstract class BaseEntityImportService<I extends EntityId, E extends Expo
|
||||
|
||||
entity.setExternalId(entity.getId());
|
||||
|
||||
IdProvider idProvider = new IdProvider(user);
|
||||
EntityImportResult<E> importResult = new EntityImportResult<>();
|
||||
IdProvider idProvider = new IdProvider(user, importSettings, importResult);
|
||||
setOwner(user.getTenantId(), entity, idProvider);
|
||||
if (existingEntity == null) {
|
||||
entity.setId(null);
|
||||
@ -96,7 +97,6 @@ public abstract class BaseEntityImportService<I extends EntityId, E extends Expo
|
||||
|
||||
E savedEntity = prepareAndSave(user.getTenantId(), entity, exportData, idProvider);
|
||||
|
||||
EntityImportResult<E> importResult = new EntityImportResult<>();
|
||||
importResult.setSavedEntity(savedEntity);
|
||||
importResult.setOldEntity(existingEntity);
|
||||
importResult.setEntityType(getEntityType());
|
||||
@ -253,11 +253,27 @@ public abstract class BaseEntityImportService<I extends EntityId, E extends Expo
|
||||
@RequiredArgsConstructor
|
||||
protected class IdProvider {
|
||||
private final SecurityUser user;
|
||||
private final EntityImportSettings importSettings;
|
||||
private final EntityImportResult<E> importResult;
|
||||
|
||||
public <ID extends EntityId> ID getInternalId(ID externalId) {
|
||||
return getInternalId(externalId, true);
|
||||
}
|
||||
|
||||
public <ID extends EntityId> ID getInternalId(ID externalId, boolean throwExceptionIfNotFound) {
|
||||
if (externalId == null || externalId.isNullUid()) return null;
|
||||
|
||||
HasId<ID> entity = findInternalEntity(user.getTenantId(), externalId);
|
||||
HasId<ID> entity;
|
||||
try {
|
||||
entity = findInternalEntity(user.getTenantId(), externalId);
|
||||
} catch (Exception e) {
|
||||
if (throwExceptionIfNotFound) {
|
||||
throw e;
|
||||
} else {
|
||||
importResult.setUpdatedAllExternalIds(false);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
try {
|
||||
exportableEntitiesService.checkPermission(user, entity, entity.getId().getEntityType(), Operation.READ);
|
||||
} catch (ThingsboardException e) {
|
||||
@ -267,6 +283,8 @@ public abstract class BaseEntityImportService<I extends EntityId, E extends Expo
|
||||
}
|
||||
|
||||
public Optional<EntityId> getInternalIdByUuid(UUID externalUuid) {
|
||||
if (externalUuid.equals(EntityId.NULL_UUID)) return Optional.empty();
|
||||
|
||||
for (EntityType entityType : EntityType.values()) {
|
||||
EntityId externalId;
|
||||
try {
|
||||
@ -275,16 +293,19 @@ public abstract class BaseEntityImportService<I extends EntityId, E extends Expo
|
||||
continue;
|
||||
}
|
||||
|
||||
EntityId internalId = null;
|
||||
try {
|
||||
internalId = getInternalId(externalId);
|
||||
} catch (Exception ignored) {
|
||||
}
|
||||
|
||||
EntityId internalId = getInternalId(externalId, false);
|
||||
if (internalId != null) {
|
||||
return Optional.of(internalId);
|
||||
} else if (importSettings.isResetExternalIdsOfAnotherTenant()) {
|
||||
try {
|
||||
if (exportableEntitiesService.findEntityById(externalId) != null) {
|
||||
return Optional.of(EntityIdFactory.getByTypeAndUuid(entityType, EntityId.NULL_UUID));
|
||||
}
|
||||
} catch (Exception ignored) {}
|
||||
}
|
||||
}
|
||||
|
||||
importResult.setUpdatedAllExternalIds(false);
|
||||
return Optional.empty();
|
||||
}
|
||||
|
||||
|
||||
@ -28,12 +28,11 @@ import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent;
|
||||
import org.thingsboard.server.common.data.rule.RuleChain;
|
||||
import org.thingsboard.server.common.data.rule.RuleChainMetaData;
|
||||
import org.thingsboard.server.common.data.rule.RuleChainType;
|
||||
import org.thingsboard.server.common.data.rule.RuleChainUpdateResult;
|
||||
import org.thingsboard.server.common.data.sync.ie.EntityImportSettings;
|
||||
import org.thingsboard.server.common.data.sync.ie.RuleChainExportData;
|
||||
import org.thingsboard.server.dao.rule.RuleChainService;
|
||||
import org.thingsboard.server.queue.util.TbCoreComponent;
|
||||
import org.thingsboard.server.service.security.model.SecurityUser;
|
||||
import org.thingsboard.server.common.data.sync.ie.EntityImportSettings;
|
||||
import org.thingsboard.server.common.data.sync.ie.RuleChainExportData;
|
||||
import org.thingsboard.server.utils.RegexUtils;
|
||||
|
||||
import java.util.Collections;
|
||||
@ -80,14 +79,13 @@ public class RuleChainImportService extends BaseEntityImportService<RuleChainId,
|
||||
});
|
||||
Optional.ofNullable(metaData.getRuleChainConnections()).orElse(Collections.emptyList())
|
||||
.forEach(ruleChainConnectionInfo -> {
|
||||
ruleChainConnectionInfo.setTargetRuleChainId(idProvider.getInternalId(ruleChainConnectionInfo.getTargetRuleChainId()));
|
||||
ruleChainConnectionInfo.setTargetRuleChainId(idProvider.getInternalId(ruleChainConnectionInfo.getTargetRuleChainId(), false));
|
||||
});
|
||||
ruleChain.setFirstRuleNodeId(null);
|
||||
|
||||
ruleChain = ruleChainService.saveRuleChain(ruleChain);
|
||||
exportData.getMetaData().setRuleChainId(ruleChain.getId());
|
||||
RuleChainUpdateResult updateResult = ruleChainService.saveRuleChainMetaData(tenantId, exportData.getMetaData());
|
||||
// FIXME [viacheslav]: send events for nodes
|
||||
ruleChainService.saveRuleChainMetaData(tenantId, exportData.getMetaData());
|
||||
return ruleChainService.findRuleChainById(tenantId, ruleChain.getId());
|
||||
}
|
||||
|
||||
|
||||
@ -44,10 +44,10 @@ import org.thingsboard.server.common.data.sync.ie.EntityExportData;
|
||||
import org.thingsboard.server.common.data.sync.ie.EntityExportSettings;
|
||||
import org.thingsboard.server.common.data.sync.ie.EntityImportResult;
|
||||
import org.thingsboard.server.common.data.sync.ie.EntityImportSettings;
|
||||
import org.thingsboard.server.common.data.sync.vc.EntityDataInfo;
|
||||
import org.thingsboard.server.common.data.sync.vc.RepositorySettings;
|
||||
import org.thingsboard.server.common.data.sync.vc.EntityDataDiff;
|
||||
import org.thingsboard.server.common.data.sync.vc.EntityDataInfo;
|
||||
import org.thingsboard.server.common.data.sync.vc.EntityVersion;
|
||||
import org.thingsboard.server.common.data.sync.vc.RepositorySettings;
|
||||
import org.thingsboard.server.common.data.sync.vc.VersionCreationResult;
|
||||
import org.thingsboard.server.common.data.sync.vc.VersionLoadResult;
|
||||
import org.thingsboard.server.common.data.sync.vc.VersionedEntityInfo;
|
||||
@ -84,7 +84,6 @@ import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static com.google.common.util.concurrent.Futures.transform;
|
||||
import static com.google.common.util.concurrent.Futures.transformAsync;
|
||||
@ -232,6 +231,7 @@ public class DefaultEntitiesVersionControlService implements EntitiesVersionCont
|
||||
return executor.submit(() -> transactionTemplate.execute(status -> {
|
||||
Map<EntityType, VersionLoadResult> results = new HashMap<>();
|
||||
Map<EntityType, Set<EntityId>> importedEntities = new HashMap<>();
|
||||
Map<EntityId, EntityImportSettings> toReimport = new HashMap<>();
|
||||
List<ThrowingRunnable> saveReferencesCallbacks = new ArrayList<>();
|
||||
List<ThrowingRunnable> sendEventsCallbacks = new ArrayList<>();
|
||||
|
||||
@ -248,21 +248,27 @@ public class DefaultEntitiesVersionControlService implements EntitiesVersionCont
|
||||
List<EntityExportData> entityDataList;
|
||||
do {
|
||||
entityDataList = gitServiceQueue.getEntities(user.getTenantId(), request.getVersionId(), entityType, offset, limit).get();
|
||||
EntityImportSettings importSettings = EntityImportSettings.builder()
|
||||
.updateRelations(config.isLoadRelations())
|
||||
.saveAttributes(config.isLoadAttributes())
|
||||
.findExistingByName(config.isFindExistingEntityByName())
|
||||
.build();
|
||||
for (EntityExportData entityData : entityDataList) {
|
||||
EntityImportResult<?> importResult = exportImportService.importEntity(user, entityData, EntityImportSettings.builder()
|
||||
.updateRelations(config.isLoadRelations())
|
||||
.saveAttributes(config.isLoadAttributes())
|
||||
.findExistingByName(config.isFindExistingEntityByName())
|
||||
.build(), false, false);
|
||||
EntityImportResult<?> importResult = exportImportService.importEntity(user, entityData,
|
||||
importSettings, false, false);
|
||||
if (importResult.getUpdatedAllExternalIds() != null && !importResult.getUpdatedAllExternalIds()) {
|
||||
toReimport.put(entityData.getEntity().getExternalId(), importSettings);
|
||||
continue;
|
||||
}
|
||||
|
||||
if (importResult.getOldEntity() == null) created.incrementAndGet();
|
||||
else updated.incrementAndGet();
|
||||
saveReferencesCallbacks.add(importResult.getSaveReferencesCallback());
|
||||
sendEventsCallbacks.add(importResult.getSendEventsCallback());
|
||||
importedEntities.computeIfAbsent(entityType, t -> new HashSet<>())
|
||||
.add(importResult.getSavedEntity().getId());
|
||||
}
|
||||
offset += limit;
|
||||
importedEntities.computeIfAbsent(entityType, t -> new HashSet<>())
|
||||
.addAll(entityDataList.stream().map(entityData -> entityData.getEntity().getExternalId()).collect(Collectors.toSet()));
|
||||
} while (entityDataList.size() == limit);
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
@ -274,6 +280,25 @@ public class DefaultEntitiesVersionControlService implements EntitiesVersionCont
|
||||
.build());
|
||||
});
|
||||
|
||||
toReimport.forEach((externalId, importSettings) -> {
|
||||
try {
|
||||
EntityExportData entityData = gitServiceQueue.getEntity(user.getTenantId(), request.getVersionId(), externalId).get();
|
||||
importSettings.setResetExternalIdsOfAnotherTenant(true);
|
||||
EntityImportResult<?> importResult = exportImportService.importEntity(user, entityData,
|
||||
importSettings, false, false);
|
||||
|
||||
VersionLoadResult stats = results.get(externalId.getEntityType());
|
||||
if (importResult.getOldEntity() == null) stats.setCreated(stats.getCreated() + 1);
|
||||
else stats.setUpdated(stats.getUpdated() + 1);
|
||||
saveReferencesCallbacks.add(importResult.getSaveReferencesCallback());
|
||||
sendEventsCallbacks.add(importResult.getSendEventsCallback());
|
||||
importedEntities.computeIfAbsent(externalId.getEntityType(), t -> new HashSet<>())
|
||||
.add(importResult.getSavedEntity().getId());
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
});
|
||||
|
||||
versionLoadRequest.getEntityTypes().keySet().stream()
|
||||
.filter(entityType -> versionLoadRequest.getEntityTypes().get(entityType).isRemoveOtherEntities())
|
||||
.sorted(exportImportService.getEntityTypeComparatorForImport().reversed())
|
||||
@ -281,7 +306,7 @@ public class DefaultEntitiesVersionControlService implements EntitiesVersionCont
|
||||
DaoUtil.processInBatches(pageLink -> {
|
||||
return exportableEntitiesService.findEntitiesByTenantId(user.getTenantId(), entityType, pageLink);
|
||||
}, 100, entity -> {
|
||||
if (entity.getExternalId() == null || !importedEntities.get(entityType).contains(entity.getExternalId())) {
|
||||
if (!importedEntities.get(entityType).contains(entity.getId())) {
|
||||
try {
|
||||
exportableEntitiesService.checkPermission(user, entity, entityType, Operation.DELETE);
|
||||
} catch (ThingsboardException e) {
|
||||
@ -336,8 +361,8 @@ public class DefaultEntitiesVersionControlService implements EntitiesVersionCont
|
||||
.exportAttributes(otherVersion.getAttributes() != null)
|
||||
.build());
|
||||
return transform(gitServiceQueue.getContentsDiff(user.getTenantId(),
|
||||
JacksonUtil.toPrettyString(currentVersion.sort()),
|
||||
JacksonUtil.toPrettyString(otherVersion.sort())),
|
||||
JacksonUtil.toPrettyString(currentVersion.sort()),
|
||||
JacksonUtil.toPrettyString(otherVersion.sort())),
|
||||
rawDiff -> new EntityDataDiff(currentVersion, otherVersion, rawDiff), MoreExecutors.directExecutor());
|
||||
}, MoreExecutors.directExecutor());
|
||||
}
|
||||
|
||||
@ -31,6 +31,8 @@ public class EntityImportResult<E extends ExportableEntity<? extends EntityId>>
|
||||
private ThrowingRunnable saveReferencesCallback = () -> {};
|
||||
private ThrowingRunnable sendEventsCallback = () -> {};
|
||||
|
||||
private Boolean updatedAllExternalIds;
|
||||
|
||||
public void addSaveReferencesCallback(ThrowingRunnable callback) {
|
||||
this.saveReferencesCallback = this.saveReferencesCallback.andThen(callback);
|
||||
}
|
||||
|
||||
@ -28,4 +28,7 @@ public class EntityImportSettings {
|
||||
private boolean findExistingByName;
|
||||
private boolean updateRelations;
|
||||
private boolean saveAttributes;
|
||||
|
||||
// internal
|
||||
private boolean resetExternalIdsOfAnotherTenant;
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user