Merge remote-tracking branch 'origin/feature/entities-version-control' into feature/vc-microservice

This commit is contained in:
Andrii Shvaika 2022-05-19 11:20:44 +03:00
commit a75e02dd79
5 changed files with 93 additions and 111 deletions

View File

@ -95,7 +95,7 @@ public class DefaultEntitiesExportImportService implements EntitiesExportImportS
@Transactional(rollbackFor = Exception.class, timeout = 120)
@Override
public List<EntityImportResult<?>> importEntities(SecurityUser user, List<EntityExportData<?>> exportDataList, EntityImportSettings importSettings) throws ThingsboardException {
fixDataOrderForImport(exportDataList);
exportDataList.sort(getDataComparatorForImport());
List<EntityImportResult<?>> importResults = new ArrayList<>();
@ -125,9 +125,15 @@ public class DefaultEntitiesExportImportService implements EntitiesExportImportS
return importResults;
}
@Override
public void fixDataOrderForImport(List<EntityExportData<?>> exportDataList) {
exportDataList.sort(Comparator.comparing(exportData -> SUPPORTED_ENTITY_TYPES.indexOf(exportData.getEntityType())));
public Comparator<EntityExportData<?>> getDataComparatorForImport() {
return Comparator.comparing(EntityExportData::getEntityType, getEntityTypeComparatorForImport());
}
@Override
public Comparator<EntityType> getEntityTypeComparatorForImport() {
return Comparator.comparing(SUPPORTED_ENTITY_TYPES::indexOf);
}

View File

@ -15,6 +15,7 @@
*/
package org.thingsboard.server.service.sync.ie;
import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.ExportableEntity;
import org.thingsboard.server.common.data.exception.ThingsboardException;
import org.thingsboard.server.common.data.id.EntityId;
@ -24,6 +25,7 @@ import org.thingsboard.server.common.data.sync.ie.EntityExportSettings;
import org.thingsboard.server.common.data.sync.ie.EntityImportResult;
import org.thingsboard.server.common.data.sync.ie.EntityImportSettings;
import java.util.Comparator;
import java.util.List;
public interface EntitiesExportImportService {
@ -35,6 +37,9 @@ public interface EntitiesExportImportService {
List<EntityImportResult<?>> importEntities(SecurityUser user, List<EntityExportData<?>> exportDataList, EntityImportSettings importSettings) throws ThingsboardException;
void fixDataOrderForImport(List<EntityExportData<?>> exportDataList);
Comparator<EntityExportData<?>> getDataComparatorForImport();
Comparator<EntityType> getEntityTypeComparatorForImport();
}

View File

@ -15,18 +15,10 @@
*/
package org.thingsboard.server.service.sync.vc;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectWriter;
import com.fasterxml.jackson.databind.SerializationFeature;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.ObjectUtils;
import org.apache.commons.lang3.StringUtils;
import org.eclipse.jgit.api.errors.JGitInternalException;
import org.eclipse.jgit.api.errors.RefAlreadyExistsException;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.springframework.transaction.support.TransactionTemplate;
import org.thingsboard.common.util.JacksonUtil;
@ -34,24 +26,15 @@ import org.thingsboard.server.common.data.DataConstants;
import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.ExportableEntity;
import org.thingsboard.server.common.data.exception.ThingsboardException;
import org.thingsboard.server.common.data.id.CustomerId;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.EntityIdFactory;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry;
import org.thingsboard.server.common.data.kv.JsonDataEntry;
import org.thingsboard.server.common.data.kv.KvEntry;
import org.thingsboard.server.common.data.query.EntityDataPageLink;
import org.thingsboard.server.common.data.query.EntityDataQuery;
import org.thingsboard.server.common.data.query.EntityDataSortOrder;
import org.thingsboard.server.common.data.query.EntityKey;
import org.thingsboard.server.common.data.query.EntityKeyType;
import org.thingsboard.server.common.data.query.EntityTypeFilter;
import org.thingsboard.server.common.data.sync.vc.request.load.EntityTypeVersionLoadConfig;
import org.thingsboard.server.dao.DaoUtil;
import org.thingsboard.server.dao.attributes.AttributesService;
import org.thingsboard.server.dao.entity.EntityService;
import org.thingsboard.server.dao.tenant.TenantDao;
import org.thingsboard.server.queue.util.AfterStartUp;
import org.thingsboard.server.queue.util.TbCoreComponent;
import org.thingsboard.server.service.security.model.SecurityUser;
import org.thingsboard.server.service.security.permission.Operation;
@ -77,27 +60,16 @@ import org.thingsboard.server.common.data.sync.vc.request.load.VersionLoadConfig
import org.thingsboard.server.common.data.sync.vc.request.load.VersionLoadRequest;
import org.thingsboard.server.common.data.sync.ThrowingRunnable;
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import static org.thingsboard.server.dao.sql.query.EntityKeyMapping.CREATED_TIME;
@Service
@TbCoreComponent
@RequiredArgsConstructor
@ -115,7 +87,6 @@ public class DefaultEntitiesVersionControlService implements EntitiesVersionCont
@Override
public VersionCreationResult saveEntitiesVersion(SecurityUser user, VersionCreateRequest request) throws Exception {
var commit = gitService.prepareCommit(user.getTenantId(), request);
switch (request.getType()) {
@ -132,23 +103,11 @@ public class DefaultEntitiesVersionControlService implements EntitiesVersionCont
}
if (config.isAllEntities()) {
EntityTypeFilter entityTypeFilter = new EntityTypeFilter();
entityTypeFilter.setEntityType(entityType);
EntityDataPageLink entityDataPageLink = new EntityDataPageLink();
entityDataPageLink.setPage(-1);
entityDataPageLink.setPageSize(-1);
EntityKey sortProperty = new EntityKey(EntityKeyType.ENTITY_FIELD, CREATED_TIME);
entityDataPageLink.setSortOrder(new EntityDataSortOrder(sortProperty, EntityDataSortOrder.Direction.DESC));
EntityDataQuery query = new EntityDataQuery(entityTypeFilter, entityDataPageLink, List.of(sortProperty), Collections.emptyList(), Collections.emptyList());
DaoUtil.processInBatches(pageLink -> {
entityDataPageLink.setPage(pageLink.getPage());
entityDataPageLink.setPageSize(pageLink.getPageSize());
return entityService.findEntityDataByQuery(user.getTenantId(), new CustomerId(EntityId.NULL_UUID), query);
}, 100, data -> {
EntityId entityId = data.getEntityId();
return exportableEntitiesService.findEntitiesByTenantId(user.getTenantId(), entityType, pageLink);
}, 100, entity -> {
try {
saveEntityData(user, commit, entityId, config);
saveEntityData(user, commit, entity.getId(), config);
} catch (Exception e) {
throw new RuntimeException(e);
}
@ -209,22 +168,20 @@ public class DefaultEntitiesVersionControlService implements EntitiesVersionCont
switch (request.getType()) {
case SINGLE_ENTITY: {
SingleEntityVersionLoadRequest versionLoadRequest = (SingleEntityVersionLoadRequest) request;
VersionLoadConfig config = versionLoadRequest.getConfig();
EntityImportResult<?> importResult = transactionTemplate.execute(status -> {
try {
EntityImportResult<?> result = loadEntity(user, request, versionLoadRequest.getConfig(), versionLoadRequest.getExternalEntityId());
result.getSaveReferencesCallback().run();
return result;
EntityExportData entityData = gitService.getEntity(user.getTenantId(), request.getVersionId(), versionLoadRequest.getExternalEntityId());
return exportImportService.importEntity(user, entityData, EntityImportSettings.builder()
.updateRelations(config.isLoadRelations())
.findExistingByName(config.isFindExistingEntityByName())
.build(), true, true);
} catch (Exception e) {
throw new RuntimeException(e);
}
});
try {
importResult.getSendEventsCallback().run();
} catch (Exception e) {
log.error("Failed to send events for entity", e);
}
return List.of(VersionLoadResult.builder()
.entityType(importResult.getEntityType())
.created(importResult.getOldEntity() == null ? 1 : 0)
.updated(importResult.getOldEntity() != null ? 1 : 0)
.deleted(0)
@ -234,56 +191,68 @@ public class DefaultEntitiesVersionControlService implements EntitiesVersionCont
EntityTypeVersionLoadRequest versionLoadRequest = (EntityTypeVersionLoadRequest) request;
return transactionTemplate.execute(status -> {
Map<EntityType, VersionLoadResult> results = new HashMap<>();
Map<EntityType, Set<EntityId>> importedEntities = new HashMap<>();
List<ThrowingRunnable> saveReferencesCallbacks = new ArrayList<>();
List<ThrowingRunnable> sendEventsCallbacks = new ArrayList<>();
// order entity types..
// or what
versionLoadRequest.getEntityTypes().forEach((entityType, config) -> {
AtomicInteger created = new AtomicInteger();
AtomicInteger updated = new AtomicInteger();
AtomicInteger deleted = new AtomicInteger();
Set<EntityId> remoteEntities;
try {
remoteEntities = listEntitiesAtVersion(user.getTenantId(), request.getBranch(), request.getVersionId(), entityType).stream()
.map(VersionedEntityInfo::getExternalId)
.collect(Collectors.toSet());
for (EntityId externalEntityId : remoteEntities) {
EntityImportResult<?> importResult = loadEntity(user, request, config, externalEntityId);
versionLoadRequest.getEntityTypes().keySet().stream()
.sorted(exportImportService.getEntityTypeComparatorForImport())
.forEach(entityType -> {
EntityTypeVersionLoadConfig config = versionLoadRequest.getEntityTypes().get(entityType);
AtomicInteger created = new AtomicInteger();
AtomicInteger updated = new AtomicInteger();
if (importResult.getOldEntity() == null) created.incrementAndGet();
else updated.incrementAndGet();
try {
int limit = 100;
int offset = 0;
List<EntityExportData<?>> entityDataList;
do {
entityDataList = gitService.getEntities(user.getTenantId(), request.getBranch(), request.getVersionId(), entityType, offset, limit);
for (EntityExportData<?> entityData : entityDataList) {
EntityImportResult<?> importResult = exportImportService.importEntity(user, entityData, EntityImportSettings.builder()
.updateRelations(config.isLoadRelations())
.findExistingByName(config.isFindExistingEntityByName())
.build(), false, false);
saveReferencesCallbacks.add(importResult.getSaveReferencesCallback());
sendEventsCallbacks.add(importResult.getSendEventsCallback());
}
} catch (Exception e) {
throw new RuntimeException(e);
}
if (config.isRemoveOtherEntities()) {
DaoUtil.processInBatches(pageLink -> {
return exportableEntitiesService.findEntitiesByTenantId(user.getTenantId(), entityType, pageLink);
}, 100, entity -> {
if (entity.getExternalId() == null || !remoteEntities.contains(entity.getExternalId())) {
try {
exportableEntitiesService.checkPermission(user, entity, entityType, Operation.DELETE);
} catch (ThingsboardException e) {
throw new RuntimeException(e);
}
// need to delete entity types in a specific order?
exportableEntitiesService.deleteByTenantIdAndId(user.getTenantId(), entity.getId());
deleted.getAndIncrement();
if (importResult.getOldEntity() == null) created.incrementAndGet();
else updated.incrementAndGet();
saveReferencesCallbacks.add(importResult.getSaveReferencesCallback());
sendEventsCallbacks.add(importResult.getSendEventsCallback());
}
offset += limit;
importedEntities.computeIfAbsent(entityType, t -> new HashSet<>())
.addAll(entityDataList.stream().map(entityData -> entityData.getEntity().getId()).collect(Collectors.toSet()));
} while (entityDataList.size() == limit);
} catch (Exception e) {
throw new RuntimeException(e);
}
results.put(entityType, VersionLoadResult.builder()
.entityType(entityType)
.created(created.get())
.updated(updated.get())
.build());
});
}
results.put(entityType, VersionLoadResult.builder()
.created(created.get())
.updated(updated.get())
.deleted(deleted.get())
.build());
});
versionLoadRequest.getEntityTypes().keySet().stream()
.filter(entityType -> versionLoadRequest.getEntityTypes().get(entityType).isRemoveOtherEntities())
.sorted(exportImportService.getEntityTypeComparatorForImport().reversed())
.forEach(entityType -> {
DaoUtil.processInBatches(pageLink -> {
return exportableEntitiesService.findEntitiesByTenantId(user.getTenantId(), entityType, pageLink);
}, 100, entity -> {
if (entity.getExternalId() == null || !importedEntities.get(entityType).contains(entity.getExternalId())) {
try {
exportableEntitiesService.checkPermission(user, entity, entityType, Operation.DELETE);
} catch (ThingsboardException e) {
throw new RuntimeException(e);
}
exportableEntitiesService.deleteByTenantIdAndId(user.getTenantId(), entity.getId());
VersionLoadResult result = results.get(entityType);
result.setDeleted(result.getDeleted() + 1);
}
});
});
for (ThrowingRunnable saveReferencesCallback : saveReferencesCallbacks) {
try {
@ -307,14 +276,6 @@ public class DefaultEntitiesVersionControlService implements EntitiesVersionCont
}
}
private EntityImportResult<?> loadEntity(SecurityUser user, VersionLoadRequest request, VersionLoadConfig config, EntityId entityId) throws Exception {
EntityExportData entityData = gitService.getEntity(user.getTenantId(), request.getVersionId(), entityId);
return exportImportService.importEntity(user, entityData, EntityImportSettings.builder()
.updateRelations(config.isLoadRelations())
.findExistingByName(config.isFindExistingEntityByName())
.build(), false, false);
}
@Override
public List<String> listBranches(TenantId tenantId) throws Exception {

View File

@ -52,6 +52,7 @@ import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
@Slf4j
@RequiredArgsConstructor
@ -193,6 +194,14 @@ public class LocalGitVersionControlService implements GitVersionControlService {
return gitRepositoryService.listBranches(tenantId);
}
@Override
public List<EntityExportData<?>> getEntities(TenantId tenantId, String branch, String versionId, EntityType entityType, int offset, int limit) {
return listEntitiesAtVersion(tenantId, branch, versionId, entityType).stream()
.skip(offset).limit(limit)
.map(entityInfo -> getEntity(tenantId, versionId, entityInfo.getExternalId()))
.collect(Collectors.toList());
}
@Override
public EntityExportData<?> getEntity(TenantId tenantId, String versionId, EntityId entityId) {
try {

View File

@ -15,7 +15,6 @@
*/
package org.thingsboard.server.service.sync.vc;
import lombok.SneakyThrows;
import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.ExportableEntity;
import org.thingsboard.server.common.data.id.EntityId;
@ -31,7 +30,6 @@ import java.util.List;
public interface GitVersionControlService {
@SneakyThrows
EntitiesVersionControlSettings getSettings(TenantId tenantId);
void initRepository(TenantId tenantId, EntitiesVersionControlSettings settings);
@ -57,4 +55,7 @@ public interface GitVersionControlService {
List<String> listBranches(TenantId tenantId);
EntityExportData<?> getEntity(TenantId tenantId, String versionId, EntityId entityId);
List<EntityExportData<?>> getEntities(TenantId tenantId, String branch, String versionId, EntityType entityType, int offset, int limit);
}