diff --git a/application/src/main/java/org/thingsboard/server/service/sync/ie/DefaultEntitiesExportImportService.java b/application/src/main/java/org/thingsboard/server/service/sync/ie/DefaultEntitiesExportImportService.java index 57404a958e..4794a659ab 100644 --- a/application/src/main/java/org/thingsboard/server/service/sync/ie/DefaultEntitiesExportImportService.java +++ b/application/src/main/java/org/thingsboard/server/service/sync/ie/DefaultEntitiesExportImportService.java @@ -26,6 +26,7 @@ import org.thingsboard.server.common.data.exception.ThingsboardErrorCode; import org.thingsboard.server.common.data.exception.ThingsboardException; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.relation.EntityRelation; +import org.thingsboard.server.common.data.sync.ThrowingRunnable; import org.thingsboard.server.common.data.sync.ie.EntityExportData; import org.thingsboard.server.common.data.sync.ie.EntityExportSettings; import org.thingsboard.server.common.data.sync.ie.EntityImportResult; @@ -82,8 +83,7 @@ public class DefaultEntitiesExportImportService implements EntitiesExportImportS } @Override - public , I extends EntityId> EntityImportResult importEntity(EntitiesImportCtx ctx, EntityExportData exportData, - boolean saveReferencesAndSendEvents) throws ThingsboardException { + public , I extends EntityId> EntityImportResult importEntity(EntitiesImportCtx ctx, EntityExportData exportData) throws ThingsboardException { if (!rateLimitService.checkEntityImportLimit(ctx.getTenantId())) { throw new ThingsboardException("Rate limit for entities import is exceeded", ThingsboardErrorCode.TOO_MANY_REQUESTS); } @@ -95,19 +95,19 @@ public class DefaultEntitiesExportImportService implements EntitiesExportImportS EntityImportService> importService = getImportService(entityType); EntityImportResult importResult = importService.importEntity(ctx, exportData); - - if (saveReferencesAndSendEvents) { - importResult.getSaveReferencesCallback().run(); - importResult.getSendEventsCallback().run(); - saveRelations(ctx); - } - ctx.putInternalId(exportData.getExternalId(), importResult.getSavedEntity().getId()); + + ctx.addReferenceCallback(importResult.getSaveReferencesCallback()); + ctx.addEventCallback(importResult.getSendEventsCallback()); return importResult; } @Override - public void saveRelations(EntitiesImportCtx ctx) throws ThingsboardException { + public void saveReferencesAndRelations(EntitiesImportCtx ctx) throws ThingsboardException { + for (ThrowingRunnable saveReferencesCallback : ctx.getReferenceCallbacks()) { + saveReferencesCallback.run(); + } + relationService.saveRelations(ctx.getTenantId(), new ArrayList<>(ctx.getRelations())); for (EntityRelation relation : ctx.getRelations()) { diff --git a/application/src/main/java/org/thingsboard/server/service/sync/ie/EntitiesExportImportService.java b/application/src/main/java/org/thingsboard/server/service/sync/ie/EntitiesExportImportService.java index f0f41c7cdf..05e554f114 100644 --- a/application/src/main/java/org/thingsboard/server/service/sync/ie/EntitiesExportImportService.java +++ b/application/src/main/java/org/thingsboard/server/service/sync/ie/EntitiesExportImportService.java @@ -32,11 +32,10 @@ public interface EntitiesExportImportService { , I extends EntityId> EntityExportData exportEntity(SecurityUser user, I entityId, EntityExportSettings exportSettings) throws ThingsboardException; - , I extends EntityId> EntityImportResult importEntity(EntitiesImportCtx ctx, EntityExportData exportData, - boolean saveReferencesAndSendEvents) throws ThingsboardException; + , I extends EntityId> EntityImportResult importEntity(EntitiesImportCtx ctx, EntityExportData exportData) throws ThingsboardException; - void saveRelations(EntitiesImportCtx ctx) throws ThingsboardException; + void saveReferencesAndRelations(EntitiesImportCtx ctx) throws ThingsboardException; Comparator getEntityTypeComparatorForImport(); diff --git a/application/src/main/java/org/thingsboard/server/service/sync/ie/importing/impl/BaseEntityImportService.java b/application/src/main/java/org/thingsboard/server/service/sync/ie/importing/impl/BaseEntityImportService.java index 2c1fd77248..df6b770d1f 100644 --- a/application/src/main/java/org/thingsboard/server/service/sync/ie/importing/impl/BaseEntityImportService.java +++ b/application/src/main/java/org/thingsboard/server/service/sync/ie/importing/impl/BaseEntityImportService.java @@ -57,11 +57,9 @@ import org.thingsboard.server.service.telemetry.TelemetrySubscriptionService; import java.util.ArrayList; import java.util.LinkedHashMap; -import java.util.LinkedHashSet; import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.Set; import java.util.UUID; import java.util.stream.Collectors; @@ -171,10 +169,8 @@ public abstract class BaseEntityImportService future = gitServiceQueue.getEntity(user.getTenantId(), request.getVersionId(), versionLoadRequest.getExternalEntityId()); - return Futures.transform(future, entityData -> doInTemplate(status -> loadSingleEntity(user, config, entityData)), executor); + return Futures.transform(future, entityData -> doInTemplate(user, ctx -> loadSingleEntity(ctx, config, entityData)), executor); } case ENTITY_TYPE: { EntityTypeVersionLoadRequest versionLoadRequest = (EntityTypeVersionLoadRequest) request; - return executor.submit(() -> doInTemplate(status -> loadMultipleEntities(user, versionLoadRequest))); + return executor.submit(() -> doInTemplate(user, ctx -> loadMultipleEntities(ctx, versionLoadRequest))); } default: throw new IllegalArgumentException("Unsupported version load request"); } } - private VersionLoadResult doInTemplate(TransactionCallback result) { + private VersionLoadResult doInTemplate(SecurityUser user, Function function) { try { - return transactionTemplate.execute(result); + EntitiesImportCtx ctx = new EntitiesImportCtx(user); + VersionLoadResult result = transactionTemplate.execute(status -> function.apply(ctx)); + try { + for (ThrowingRunnable throwingRunnable : ctx.getEventCallbacks()) { + throwingRunnable.run(); + } + } catch (ThingsboardException e) { + throw new RuntimeException(e); + } + return result; } catch (LoadEntityException e) { return onError(e.getData(), e.getCause()); } } - private VersionLoadResult loadSingleEntity(SecurityUser user, VersionLoadConfig config, EntityExportData entityData) { + private VersionLoadResult loadSingleEntity(EntitiesImportCtx ctx, VersionLoadConfig config, EntityExportData entityData) { try { - var ctx = new EntitiesImportCtx(user, EntityImportSettings.builder() + ctx.setSettings(EntityImportSettings.builder() .updateRelations(config.isLoadRelations()) .saveAttributes(config.isLoadAttributes()) .saveCredentials(config.isLoadCredentials()) .findExistingByName(false) .build()); - EntityImportResult importResult = exportImportService.importEntity(ctx, entityData, true); + EntityImportResult importResult = exportImportService.importEntity(ctx, entityData); + + exportImportService.saveReferencesAndRelations(ctx); return VersionLoadResult.success(EntityTypeLoadResult.builder() .entityType(importResult.getEntityType()) @@ -268,29 +282,19 @@ public class DefaultEntitiesVersionControlService implements EntitiesVersionCont } } - private VersionLoadResult loadMultipleEntities(SecurityUser user, EntityTypeVersionLoadRequest request) { - Map results = new HashMap<>(); - Map> importedEntities = new HashMap<>(); - Map toReimport = new HashMap<>(); - List saveReferencesCallbacks = new ArrayList<>(); - List sendEventsCallbacks = new ArrayList<>(); - - EntitiesImportCtx ctx = new EntitiesImportCtx(user); + private VersionLoadResult loadMultipleEntities(EntitiesImportCtx ctx, EntityTypeVersionLoadRequest request) { var sw = TbStopWatch.create("before"); List entityTypes = request.getEntityTypes().keySet().stream() .sorted(exportImportService.getEntityTypeComparatorForImport()).collect(Collectors.toList()); for (EntityType entityType : entityTypes) { EntityTypeVersionLoadConfig config = request.getEntityTypes().get(entityType); - AtomicInteger created = new AtomicInteger(); - AtomicInteger updated = new AtomicInteger(); - int limit = 100; int offset = 0; List entityDataList; do { try { - entityDataList = gitServiceQueue.getEntities(user.getTenantId(), request.getVersionId(), entityType, offset, limit).get(); + entityDataList = gitServiceQueue.getEntities(ctx.getTenantId(), request.getVersionId(), entityType, offset, limit).get(); } catch (InterruptedException | ExecutionException e) { throw new RuntimeException(e); } @@ -304,46 +308,33 @@ public class DefaultEntitiesVersionControlService implements EntitiesVersionCont log.debug("[{}] Loading {} entities", ctx.getTenantId(), entityType); EntityImportResult importResult; try { - importResult = exportImportService.importEntity(ctx, entityData, false); + importResult = exportImportService.importEntity(ctx, entityData); } catch (Exception e) { throw new LoadEntityException(entityData, e); } if (importResult.getUpdatedAllExternalIds() != null && !importResult.getUpdatedAllExternalIds()) { - toReimport.put(entityData.getEntity().getExternalId(), ctx.getSettings()); + ctx.getToReimport().put(entityData.getEntity().getExternalId(), ctx.getSettings()); continue; } - if (importResult.getOldEntity() == null) created.incrementAndGet(); - else updated.incrementAndGet(); - saveReferencesCallbacks.add(importResult.getSaveReferencesCallback()); - sendEventsCallbacks.add(importResult.getSendEventsCallback()); - - importedEntities.computeIfAbsent(entityType, t -> new HashSet<>()) + ctx.registerResult(entityType, importResult.getOldEntity() == null); + ctx.getImportedEntities().computeIfAbsent(entityType, t -> new HashSet<>()) .add(importResult.getSavedEntity().getId()); } offset += limit; } while (entityDataList.size() == limit); - results.put(entityType, EntityTypeLoadResult.builder() - .entityType(entityType) - .created(created.get()) - .updated(updated.get()) - .build()); } sw.startNew("Reimport"); - toReimport.forEach((externalId, importSettings) -> { + ctx.getToReimport().forEach((externalId, importSettings) -> { try { - EntityExportData entityData = gitServiceQueue.getEntity(user.getTenantId(), request.getVersionId(), externalId).get(); + EntityExportData entityData = gitServiceQueue.getEntity(ctx.getTenantId(), request.getVersionId(), externalId).get(); importSettings.setResetExternalIdsOfAnotherTenant(true); ctx.setSettings(importSettings); - EntityImportResult importResult = exportImportService.importEntity(ctx, entityData, false); + EntityImportResult importResult = exportImportService.importEntity(ctx, entityData); - EntityTypeLoadResult stats = results.get(externalId.getEntityType()); - if (importResult.getOldEntity() == null) stats.setCreated(stats.getCreated() + 1); - else stats.setUpdated(stats.getUpdated() + 1); - saveReferencesCallbacks.add(importResult.getSaveReferencesCallback()); - sendEventsCallbacks.add(importResult.getSendEventsCallback()); - importedEntities.computeIfAbsent(externalId.getEntityType(), t -> new HashSet<>()) + ctx.registerResult(externalId.getEntityType(), importResult.getOldEntity() == null); + ctx.getImportedEntities().computeIfAbsent(externalId.getEntityType(), t -> new HashSet<>()) .add(importResult.getSavedEntity().getId()); } catch (Exception e) { throw new RuntimeException(e); @@ -356,17 +347,16 @@ public class DefaultEntitiesVersionControlService implements EntitiesVersionCont .sorted(exportImportService.getEntityTypeComparatorForImport().reversed()) .forEach(entityType -> { DaoUtil.processInBatches(pageLink -> { - return exportableEntitiesService.findEntitiesByTenantId(user.getTenantId(), entityType, pageLink); + return exportableEntitiesService.findEntitiesByTenantId(ctx.getTenantId(), entityType, pageLink); }, 100, entity -> { - if (importedEntities.get(entityType) == null || !importedEntities.get(entityType).contains(entity.getId())) { - exportableEntitiesService.removeById(user.getTenantId(), entity.getId()); + if (ctx.getImportedEntities().get(entityType) == null || !ctx.getImportedEntities().get(entityType).contains(entity.getId())) { + exportableEntitiesService.removeById(ctx.getTenantId(), entity.getId()); - sendEventsCallbacks.add(() -> { - entityNotificationService.notifyDeleteEntity(user.getTenantId(), entity.getId(), - entity, null, ActionType.DELETED, null, user); + ctx.addEventCallback(() -> { + entityNotificationService.notifyDeleteEntity(ctx.getTenantId(), entity.getId(), + entity, null, ActionType.DELETED, null, ctx.getUser()); }); - EntityTypeLoadResult result = results.get(entityType); - result.setDeleted(result.getDeleted() + 1); + ctx.registerDeleted(entityType); } }); }); @@ -374,14 +364,7 @@ public class DefaultEntitiesVersionControlService implements EntitiesVersionCont sw.startNew("Callbacks"); try { - for (ThrowingRunnable saveReferencesCallback : saveReferencesCallbacks) { - saveReferencesCallback.run(); - } - for (ThrowingRunnable sendEventsCallback : sendEventsCallbacks) { - sendEventsCallback.run(); - } - sw.startNew("Relations"); - exportImportService.saveRelations(ctx); + exportImportService.saveReferencesAndRelations(ctx); } catch (ThingsboardException e) { throw new RuntimeException(e); } @@ -391,7 +374,7 @@ public class DefaultEntitiesVersionControlService implements EntitiesVersionCont log.debug("[{}] Executed: {} in {}ms", ctx.getTenantId(), task.getTaskName(), task.getTimeMillis()); } log.info("[{}] Total time: {}ms", ctx.getTenantId(), sw.getTotalTimeMillis()); - return VersionLoadResult.success(new ArrayList<>(results.values())); + return VersionLoadResult.success(new ArrayList<>(ctx.getResults().values())); } private VersionLoadResult onError(EntityExportData entityData, Throwable e) { diff --git a/application/src/main/java/org/thingsboard/server/service/sync/vc/data/EntitiesImportCtx.java b/application/src/main/java/org/thingsboard/server/service/sync/vc/data/EntitiesImportCtx.java index ec7c9c4fdf..f37e49ceed 100644 --- a/application/src/main/java/org/thingsboard/server/service/sync/vc/data/EntitiesImportCtx.java +++ b/application/src/main/java/org/thingsboard/server/service/sync/vc/data/EntitiesImportCtx.java @@ -17,16 +17,20 @@ package org.thingsboard.server.service.sync.vc.data; import lombok.Data; import lombok.extern.slf4j.Slf4j; -import org.thingsboard.server.common.data.audit.ActionType; +import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.relation.EntityRelation; +import org.thingsboard.server.common.data.sync.ThrowingRunnable; import org.thingsboard.server.common.data.sync.ie.EntityImportSettings; +import org.thingsboard.server.common.data.sync.vc.EntityTypeLoadResult; import org.thingsboard.server.service.security.model.SecurityUser; +import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; import java.util.LinkedHashSet; +import java.util.List; import java.util.Map; import java.util.Set; @@ -37,7 +41,14 @@ public class EntitiesImportCtx { private final SecurityUser user; private EntityImportSettings settings; + Map results = new HashMap<>(); + Map> importedEntities = new HashMap<>(); + Map toReimport = new HashMap<>(); + private final List referenceCallbacks = new ArrayList<>(); + private final List eventCallbacks = new ArrayList<>(); + private final Map externalToInternalIdMap = new HashMap<>(); + private final Set relations = new LinkedHashSet<>(); public EntitiesImportCtx(SecurityUser user) { @@ -84,8 +95,35 @@ public class EntitiesImportCtx { externalToInternalIdMap.put(externalId, internalId); } + public void registerResult(EntityType entityType, boolean created) { + EntityTypeLoadResult result = results.computeIfAbsent(entityType, EntityTypeLoadResult::new); + if (created) { + result.setCreated(result.getCreated() + 1); + } else { + result.setUpdated(result.getUpdated() + 1); + } + } + + public void registerDeleted(EntityType entityType) { + EntityTypeLoadResult result = results.computeIfAbsent(entityType, EntityTypeLoadResult::new); + result.setDeleted(result.getDeleted() + 1); + } + public void addRelations(Collection values) { relations.addAll(values); } + public void addReferenceCallback(ThrowingRunnable tr) { + if (tr != null) { + referenceCallbacks.add(tr); + } + } + + public void addEventCallback(ThrowingRunnable tr) { + if (tr != null) { + eventCallbacks.add(tr); + } + } + + } diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/sync/vc/EntityTypeLoadResult.java b/common/data/src/main/java/org/thingsboard/server/common/data/sync/vc/EntityTypeLoadResult.java index cc0a8a642a..84a28d0770 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/sync/vc/EntityTypeLoadResult.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/sync/vc/EntityTypeLoadResult.java @@ -30,4 +30,8 @@ public class EntityTypeLoadResult { private int created; private int updated; private int deleted; + + public EntityTypeLoadResult(EntityType entityType) { + this.entityType = entityType; + } }