Send events after transaction finish

This commit is contained in:
Andrii Shvaika 2022-06-14 15:13:27 +03:00
parent 08dea33afc
commit 7c16904c5e
6 changed files with 99 additions and 79 deletions

View File

@ -26,6 +26,7 @@ import org.thingsboard.server.common.data.exception.ThingsboardErrorCode;
import org.thingsboard.server.common.data.exception.ThingsboardException;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.relation.EntityRelation;
import org.thingsboard.server.common.data.sync.ThrowingRunnable;
import org.thingsboard.server.common.data.sync.ie.EntityExportData;
import org.thingsboard.server.common.data.sync.ie.EntityExportSettings;
import org.thingsboard.server.common.data.sync.ie.EntityImportResult;
@ -82,8 +83,7 @@ public class DefaultEntitiesExportImportService implements EntitiesExportImportS
}
@Override
public <E extends ExportableEntity<I>, I extends EntityId> EntityImportResult<E> importEntity(EntitiesImportCtx ctx, EntityExportData<E> exportData,
boolean saveReferencesAndSendEvents) throws ThingsboardException {
public <E extends ExportableEntity<I>, I extends EntityId> EntityImportResult<E> importEntity(EntitiesImportCtx ctx, EntityExportData<E> exportData) throws ThingsboardException {
if (!rateLimitService.checkEntityImportLimit(ctx.getTenantId())) {
throw new ThingsboardException("Rate limit for entities import is exceeded", ThingsboardErrorCode.TOO_MANY_REQUESTS);
}
@ -95,19 +95,19 @@ public class DefaultEntitiesExportImportService implements EntitiesExportImportS
EntityImportService<I, E, EntityExportData<E>> importService = getImportService(entityType);
EntityImportResult<E> importResult = importService.importEntity(ctx, exportData);
if (saveReferencesAndSendEvents) {
importResult.getSaveReferencesCallback().run();
importResult.getSendEventsCallback().run();
saveRelations(ctx);
}
ctx.putInternalId(exportData.getExternalId(), importResult.getSavedEntity().getId());
ctx.addReferenceCallback(importResult.getSaveReferencesCallback());
ctx.addEventCallback(importResult.getSendEventsCallback());
return importResult;
}
@Override
public void saveRelations(EntitiesImportCtx ctx) throws ThingsboardException {
public void saveReferencesAndRelations(EntitiesImportCtx ctx) throws ThingsboardException {
for (ThrowingRunnable saveReferencesCallback : ctx.getReferenceCallbacks()) {
saveReferencesCallback.run();
}
relationService.saveRelations(ctx.getTenantId(), new ArrayList<>(ctx.getRelations()));
for (EntityRelation relation : ctx.getRelations()) {

View File

@ -32,11 +32,10 @@ public interface EntitiesExportImportService {
<E extends ExportableEntity<I>, I extends EntityId> EntityExportData<E> exportEntity(SecurityUser user, I entityId, EntityExportSettings exportSettings) throws ThingsboardException;
<E extends ExportableEntity<I>, I extends EntityId> EntityImportResult<E> importEntity(EntitiesImportCtx ctx, EntityExportData<E> exportData,
boolean saveReferencesAndSendEvents) throws ThingsboardException;
<E extends ExportableEntity<I>, I extends EntityId> EntityImportResult<E> importEntity(EntitiesImportCtx ctx, EntityExportData<E> exportData) throws ThingsboardException;
void saveRelations(EntitiesImportCtx ctx) throws ThingsboardException;
void saveReferencesAndRelations(EntitiesImportCtx ctx) throws ThingsboardException;
Comparator<EntityType> getEntityTypeComparatorForImport();

View File

@ -57,11 +57,9 @@ import org.thingsboard.server.service.telemetry.TelemetrySubscriptionService;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
@ -171,10 +169,8 @@ public abstract class BaseEntityImportService<I extends EntityId, E extends Expo
entityActionService.logEntityAction(ctx.getUser(), existingRelation.getTo(), null, null,
ActionType.RELATION_DELETED, null, existingRelation);
});
} else {
if (Objects.equal(relation.getAdditionalInfo(), existingRelation.getAdditionalInfo())) {
relationsMap.remove(relation);
}
} else if (Objects.equal(relation.getAdditionalInfo(), existingRelation.getAdditionalInfo())) {
relationsMap.remove(relation);
}
}
}

View File

@ -24,6 +24,7 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.ObjectUtils;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.TransactionCallback;
import org.springframework.transaction.support.TransactionTemplate;
import org.thingsboard.common.util.JacksonUtil;
@ -32,6 +33,7 @@ import org.thingsboard.common.util.ThingsBoardExecutors;
import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.ExportableEntity;
import org.thingsboard.server.common.data.StringUtils;
import org.thingsboard.server.common.data.User;
import org.thingsboard.server.common.data.audit.ActionType;
import org.thingsboard.server.common.data.exception.ThingsboardErrorCode;
import org.thingsboard.server.common.data.exception.ThingsboardException;
@ -97,6 +99,8 @@ import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Collectors;
import static com.google.common.util.concurrent.Futures.transform;
@ -114,7 +118,6 @@ public class DefaultEntitiesVersionControlService implements EntitiesVersionCont
private final EntitiesExportImportService exportImportService;
private final ExportableEntitiesService exportableEntitiesService;
private final TbNotificationEntityService entityNotificationService;
private final RelationService relationService;
private final TransactionTemplate transactionTemplate;
private ListeningExecutorService executor;
@ -228,34 +231,45 @@ public class DefaultEntitiesVersionControlService implements EntitiesVersionCont
SingleEntityVersionLoadRequest versionLoadRequest = (SingleEntityVersionLoadRequest) request;
VersionLoadConfig config = versionLoadRequest.getConfig();
ListenableFuture<EntityExportData> future = gitServiceQueue.getEntity(user.getTenantId(), request.getVersionId(), versionLoadRequest.getExternalEntityId());
return Futures.transform(future, entityData -> doInTemplate(status -> loadSingleEntity(user, config, entityData)), executor);
return Futures.transform(future, entityData -> doInTemplate(user, ctx -> loadSingleEntity(ctx, config, entityData)), executor);
}
case ENTITY_TYPE: {
EntityTypeVersionLoadRequest versionLoadRequest = (EntityTypeVersionLoadRequest) request;
return executor.submit(() -> doInTemplate(status -> loadMultipleEntities(user, versionLoadRequest)));
return executor.submit(() -> doInTemplate(user, ctx -> loadMultipleEntities(ctx, versionLoadRequest)));
}
default:
throw new IllegalArgumentException("Unsupported version load request");
}
}
private VersionLoadResult doInTemplate(TransactionCallback<VersionLoadResult> result) {
private <R> VersionLoadResult doInTemplate(SecurityUser user, Function<EntitiesImportCtx, VersionLoadResult> function) {
try {
return transactionTemplate.execute(result);
EntitiesImportCtx ctx = new EntitiesImportCtx(user);
VersionLoadResult result = transactionTemplate.execute(status -> function.apply(ctx));
try {
for (ThrowingRunnable throwingRunnable : ctx.getEventCallbacks()) {
throwingRunnable.run();
}
} catch (ThingsboardException e) {
throw new RuntimeException(e);
}
return result;
} catch (LoadEntityException e) {
return onError(e.getData(), e.getCause());
}
}
private VersionLoadResult loadSingleEntity(SecurityUser user, VersionLoadConfig config, EntityExportData entityData) {
private VersionLoadResult loadSingleEntity(EntitiesImportCtx ctx, VersionLoadConfig config, EntityExportData entityData) {
try {
var ctx = new EntitiesImportCtx(user, EntityImportSettings.builder()
ctx.setSettings(EntityImportSettings.builder()
.updateRelations(config.isLoadRelations())
.saveAttributes(config.isLoadAttributes())
.saveCredentials(config.isLoadCredentials())
.findExistingByName(false)
.build());
EntityImportResult<?> importResult = exportImportService.importEntity(ctx, entityData, true);
EntityImportResult<?> importResult = exportImportService.importEntity(ctx, entityData);
exportImportService.saveReferencesAndRelations(ctx);
return VersionLoadResult.success(EntityTypeLoadResult.builder()
.entityType(importResult.getEntityType())
@ -268,29 +282,19 @@ public class DefaultEntitiesVersionControlService implements EntitiesVersionCont
}
}
private VersionLoadResult loadMultipleEntities(SecurityUser user, EntityTypeVersionLoadRequest request) {
Map<EntityType, EntityTypeLoadResult> results = new HashMap<>();
Map<EntityType, Set<EntityId>> importedEntities = new HashMap<>();
Map<EntityId, EntityImportSettings> toReimport = new HashMap<>();
List<ThrowingRunnable> saveReferencesCallbacks = new ArrayList<>();
List<ThrowingRunnable> sendEventsCallbacks = new ArrayList<>();
EntitiesImportCtx ctx = new EntitiesImportCtx(user);
private VersionLoadResult loadMultipleEntities(EntitiesImportCtx ctx, EntityTypeVersionLoadRequest request) {
var sw = TbStopWatch.create("before");
List<EntityType> entityTypes = request.getEntityTypes().keySet().stream()
.sorted(exportImportService.getEntityTypeComparatorForImport()).collect(Collectors.toList());
for (EntityType entityType : entityTypes) {
EntityTypeVersionLoadConfig config = request.getEntityTypes().get(entityType);
AtomicInteger created = new AtomicInteger();
AtomicInteger updated = new AtomicInteger();
int limit = 100;
int offset = 0;
List<EntityExportData> entityDataList;
do {
try {
entityDataList = gitServiceQueue.getEntities(user.getTenantId(), request.getVersionId(), entityType, offset, limit).get();
entityDataList = gitServiceQueue.getEntities(ctx.getTenantId(), request.getVersionId(), entityType, offset, limit).get();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
@ -304,46 +308,33 @@ public class DefaultEntitiesVersionControlService implements EntitiesVersionCont
log.debug("[{}] Loading {} entities", ctx.getTenantId(), entityType);
EntityImportResult<?> importResult;
try {
importResult = exportImportService.importEntity(ctx, entityData, false);
importResult = exportImportService.importEntity(ctx, entityData);
} catch (Exception e) {
throw new LoadEntityException(entityData, e);
}
if (importResult.getUpdatedAllExternalIds() != null && !importResult.getUpdatedAllExternalIds()) {
toReimport.put(entityData.getEntity().getExternalId(), ctx.getSettings());
ctx.getToReimport().put(entityData.getEntity().getExternalId(), ctx.getSettings());
continue;
}
if (importResult.getOldEntity() == null) created.incrementAndGet();
else updated.incrementAndGet();
saveReferencesCallbacks.add(importResult.getSaveReferencesCallback());
sendEventsCallbacks.add(importResult.getSendEventsCallback());
importedEntities.computeIfAbsent(entityType, t -> new HashSet<>())
ctx.registerResult(entityType, importResult.getOldEntity() == null);
ctx.getImportedEntities().computeIfAbsent(entityType, t -> new HashSet<>())
.add(importResult.getSavedEntity().getId());
}
offset += limit;
} while (entityDataList.size() == limit);
results.put(entityType, EntityTypeLoadResult.builder()
.entityType(entityType)
.created(created.get())
.updated(updated.get())
.build());
}
sw.startNew("Reimport");
toReimport.forEach((externalId, importSettings) -> {
ctx.getToReimport().forEach((externalId, importSettings) -> {
try {
EntityExportData entityData = gitServiceQueue.getEntity(user.getTenantId(), request.getVersionId(), externalId).get();
EntityExportData entityData = gitServiceQueue.getEntity(ctx.getTenantId(), request.getVersionId(), externalId).get();
importSettings.setResetExternalIdsOfAnotherTenant(true);
ctx.setSettings(importSettings);
EntityImportResult<?> importResult = exportImportService.importEntity(ctx, entityData, false);
EntityImportResult<?> importResult = exportImportService.importEntity(ctx, entityData);
EntityTypeLoadResult stats = results.get(externalId.getEntityType());
if (importResult.getOldEntity() == null) stats.setCreated(stats.getCreated() + 1);
else stats.setUpdated(stats.getUpdated() + 1);
saveReferencesCallbacks.add(importResult.getSaveReferencesCallback());
sendEventsCallbacks.add(importResult.getSendEventsCallback());
importedEntities.computeIfAbsent(externalId.getEntityType(), t -> new HashSet<>())
ctx.registerResult(externalId.getEntityType(), importResult.getOldEntity() == null);
ctx.getImportedEntities().computeIfAbsent(externalId.getEntityType(), t -> new HashSet<>())
.add(importResult.getSavedEntity().getId());
} catch (Exception e) {
throw new RuntimeException(e);
@ -356,17 +347,16 @@ public class DefaultEntitiesVersionControlService implements EntitiesVersionCont
.sorted(exportImportService.getEntityTypeComparatorForImport().reversed())
.forEach(entityType -> {
DaoUtil.processInBatches(pageLink -> {
return exportableEntitiesService.findEntitiesByTenantId(user.getTenantId(), entityType, pageLink);
return exportableEntitiesService.findEntitiesByTenantId(ctx.getTenantId(), entityType, pageLink);
}, 100, entity -> {
if (importedEntities.get(entityType) == null || !importedEntities.get(entityType).contains(entity.getId())) {
exportableEntitiesService.removeById(user.getTenantId(), entity.getId());
if (ctx.getImportedEntities().get(entityType) == null || !ctx.getImportedEntities().get(entityType).contains(entity.getId())) {
exportableEntitiesService.removeById(ctx.getTenantId(), entity.getId());
sendEventsCallbacks.add(() -> {
entityNotificationService.notifyDeleteEntity(user.getTenantId(), entity.getId(),
entity, null, ActionType.DELETED, null, user);
ctx.addEventCallback(() -> {
entityNotificationService.notifyDeleteEntity(ctx.getTenantId(), entity.getId(),
entity, null, ActionType.DELETED, null, ctx.getUser());
});
EntityTypeLoadResult result = results.get(entityType);
result.setDeleted(result.getDeleted() + 1);
ctx.registerDeleted(entityType);
}
});
});
@ -374,14 +364,7 @@ public class DefaultEntitiesVersionControlService implements EntitiesVersionCont
sw.startNew("Callbacks");
try {
for (ThrowingRunnable saveReferencesCallback : saveReferencesCallbacks) {
saveReferencesCallback.run();
}
for (ThrowingRunnable sendEventsCallback : sendEventsCallbacks) {
sendEventsCallback.run();
}
sw.startNew("Relations");
exportImportService.saveRelations(ctx);
exportImportService.saveReferencesAndRelations(ctx);
} catch (ThingsboardException e) {
throw new RuntimeException(e);
}
@ -391,7 +374,7 @@ public class DefaultEntitiesVersionControlService implements EntitiesVersionCont
log.debug("[{}] Executed: {} in {}ms", ctx.getTenantId(), task.getTaskName(), task.getTimeMillis());
}
log.info("[{}] Total time: {}ms", ctx.getTenantId(), sw.getTotalTimeMillis());
return VersionLoadResult.success(new ArrayList<>(results.values()));
return VersionLoadResult.success(new ArrayList<>(ctx.getResults().values()));
}
private VersionLoadResult onError(EntityExportData<?> entityData, Throwable e) {

View File

@ -17,16 +17,20 @@ package org.thingsboard.server.service.sync.vc.data;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.server.common.data.audit.ActionType;
import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.relation.EntityRelation;
import org.thingsboard.server.common.data.sync.ThrowingRunnable;
import org.thingsboard.server.common.data.sync.ie.EntityImportSettings;
import org.thingsboard.server.common.data.sync.vc.EntityTypeLoadResult;
import org.thingsboard.server.service.security.model.SecurityUser;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@ -37,7 +41,14 @@ public class EntitiesImportCtx {
private final SecurityUser user;
private EntityImportSettings settings;
Map<EntityType, EntityTypeLoadResult> results = new HashMap<>();
Map<EntityType, Set<EntityId>> importedEntities = new HashMap<>();
Map<EntityId, EntityImportSettings> toReimport = new HashMap<>();
private final List<ThrowingRunnable> referenceCallbacks = new ArrayList<>();
private final List<ThrowingRunnable> eventCallbacks = new ArrayList<>();
private final Map<EntityId, EntityId> externalToInternalIdMap = new HashMap<>();
private final Set<EntityRelation> relations = new LinkedHashSet<>();
public EntitiesImportCtx(SecurityUser user) {
@ -84,8 +95,35 @@ public class EntitiesImportCtx {
externalToInternalIdMap.put(externalId, internalId);
}
public void registerResult(EntityType entityType, boolean created) {
EntityTypeLoadResult result = results.computeIfAbsent(entityType, EntityTypeLoadResult::new);
if (created) {
result.setCreated(result.getCreated() + 1);
} else {
result.setUpdated(result.getUpdated() + 1);
}
}
public void registerDeleted(EntityType entityType) {
EntityTypeLoadResult result = results.computeIfAbsent(entityType, EntityTypeLoadResult::new);
result.setDeleted(result.getDeleted() + 1);
}
public void addRelations(Collection<EntityRelation> values) {
relations.addAll(values);
}
public void addReferenceCallback(ThrowingRunnable tr) {
if (tr != null) {
referenceCallbacks.add(tr);
}
}
public void addEventCallback(ThrowingRunnable tr) {
if (tr != null) {
eventCallbacks.add(tr);
}
}
}

View File

@ -30,4 +30,8 @@ public class EntityTypeLoadResult {
private int created;
private int updated;
private int deleted;
public EntityTypeLoadResult(EntityType entityType) {
this.entityType = entityType;
}
}