Batch Relations save for import performance improvement
This commit is contained in:
parent
43cc99b9aa
commit
1838e147a6
@ -21,15 +21,19 @@ import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.thingsboard.server.common.data.EntityType;
|
||||
import org.thingsboard.server.common.data.ExportableEntity;
|
||||
import org.thingsboard.server.common.data.audit.ActionType;
|
||||
import org.thingsboard.server.common.data.exception.ThingsboardErrorCode;
|
||||
import org.thingsboard.server.common.data.exception.ThingsboardException;
|
||||
import org.thingsboard.server.common.data.id.EntityId;
|
||||
import org.thingsboard.server.common.data.relation.EntityRelation;
|
||||
import org.thingsboard.server.common.data.sync.ie.EntityExportData;
|
||||
import org.thingsboard.server.common.data.sync.ie.EntityExportSettings;
|
||||
import org.thingsboard.server.common.data.sync.ie.EntityImportResult;
|
||||
import org.thingsboard.server.common.data.sync.ie.EntityImportSettings;
|
||||
import org.thingsboard.server.dao.exception.DataValidationException;
|
||||
import org.thingsboard.server.dao.relation.RelationService;
|
||||
import org.thingsboard.server.queue.util.TbCoreComponent;
|
||||
import org.thingsboard.server.service.action.EntityActionService;
|
||||
import org.thingsboard.server.service.apiusage.RateLimitService;
|
||||
import org.thingsboard.server.service.security.model.SecurityUser;
|
||||
import org.thingsboard.server.service.sync.ie.exporting.EntityExportService;
|
||||
@ -38,6 +42,7 @@ import org.thingsboard.server.service.sync.ie.exporting.impl.DefaultEntityExport
|
||||
import org.thingsboard.server.service.sync.ie.importing.EntityImportService;
|
||||
import org.thingsboard.server.service.sync.vc.data.EntitiesImportCtx;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Comparator;
|
||||
import java.util.HashMap;
|
||||
@ -53,6 +58,8 @@ public class DefaultEntitiesExportImportService implements EntitiesExportImportS
|
||||
private final Map<EntityType, EntityExportService<?, ?, ?>> exportServices = new HashMap<>();
|
||||
private final Map<EntityType, EntityImportService<?, ?, ?>> importServices = new HashMap<>();
|
||||
|
||||
private final EntityActionService entityActionService;
|
||||
private final RelationService relationService;
|
||||
private final RateLimitService rateLimitService;
|
||||
|
||||
protected static final List<EntityType> SUPPORTED_ENTITY_TYPES = List.of(
|
||||
@ -76,7 +83,7 @@ public class DefaultEntitiesExportImportService implements EntitiesExportImportS
|
||||
|
||||
@Override
|
||||
public <E extends ExportableEntity<I>, I extends EntityId> EntityImportResult<E> importEntity(EntitiesImportCtx ctx, EntityExportData<E> exportData,
|
||||
boolean saveReferences, boolean sendEvents) throws ThingsboardException {
|
||||
boolean saveReferencesAndSendEvents) throws ThingsboardException {
|
||||
if (!rateLimitService.checkEntityImportLimit(ctx.getTenantId())) {
|
||||
throw new ThingsboardException("Rate limit for entities import is exceeded", ThingsboardErrorCode.TOO_MANY_REQUESTS);
|
||||
}
|
||||
@ -89,17 +96,28 @@ public class DefaultEntitiesExportImportService implements EntitiesExportImportS
|
||||
|
||||
EntityImportResult<E> importResult = importService.importEntity(ctx, exportData);
|
||||
|
||||
if (saveReferences) {
|
||||
if (saveReferencesAndSendEvents) {
|
||||
importResult.getSaveReferencesCallback().run();
|
||||
}
|
||||
if (sendEvents) {
|
||||
importResult.getSendEventsCallback().run();
|
||||
saveRelations(ctx);
|
||||
}
|
||||
|
||||
ctx.putInternalId(exportData.getExternalId(), importResult.getSavedEntity().getId());
|
||||
return importResult;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void saveRelations(EntitiesImportCtx ctx) throws ThingsboardException {
|
||||
relationService.saveRelations(ctx.getTenantId(), new ArrayList<>(ctx.getRelations()));
|
||||
|
||||
for (EntityRelation relation : ctx.getRelations()) {
|
||||
entityActionService.logEntityAction(ctx.getUser(), relation.getFrom(), null, null,
|
||||
ActionType.RELATION_ADD_OR_UPDATE, null, relation);
|
||||
entityActionService.logEntityAction(ctx.getUser(), relation.getTo(), null, null,
|
||||
ActionType.RELATION_ADD_OR_UPDATE, null, relation);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public Comparator<EntityType> getEntityTypeComparatorForImport() {
|
||||
|
||||
@ -33,9 +33,11 @@ public interface EntitiesExportImportService {
|
||||
<E extends ExportableEntity<I>, I extends EntityId> EntityExportData<E> exportEntity(SecurityUser user, I entityId, EntityExportSettings exportSettings) throws ThingsboardException;
|
||||
|
||||
<E extends ExportableEntity<I>, I extends EntityId> EntityImportResult<E> importEntity(EntitiesImportCtx ctx, EntityExportData<E> exportData,
|
||||
boolean saveReferences, boolean sendEvents) throws ThingsboardException;
|
||||
boolean saveReferencesAndSendEvents) throws ThingsboardException;
|
||||
|
||||
|
||||
void saveRelations(EntitiesImportCtx ctx) throws ThingsboardException;
|
||||
|
||||
Comparator<EntityType> getEntityTypeComparatorForImport();
|
||||
|
||||
}
|
||||
|
||||
@ -15,6 +15,7 @@
|
||||
*/
|
||||
package org.thingsboard.server.service.sync.ie.importing.impl;
|
||||
|
||||
import com.google.api.client.util.Objects;
|
||||
import com.google.common.util.concurrent.FutureCallback;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
@ -55,9 +56,12 @@ import org.thingsboard.server.service.sync.vc.data.EntitiesImportCtx;
|
||||
import org.thingsboard.server.service.telemetry.TelemetrySubscriptionService;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.LinkedHashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
import java.util.UUID;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
@ -110,6 +114,11 @@ public abstract class BaseEntityImportService<I extends EntityId, E extends Expo
|
||||
return importResult;
|
||||
}
|
||||
|
||||
@Override
|
||||
public EntityType getEntityType() {
|
||||
return null;
|
||||
}
|
||||
|
||||
protected abstract void setOwner(TenantId tenantId, E entity, IdProvider idProvider);
|
||||
|
||||
protected abstract E prepareAndSave(EntitiesImportCtx ctx, E entity, D exportData, IdProvider idProvider);
|
||||
@ -144,13 +153,17 @@ public abstract class BaseEntityImportService<I extends EntityId, E extends Expo
|
||||
}
|
||||
}
|
||||
|
||||
Map<EntityRelation, EntityRelation> relationsMap = new LinkedHashMap<>();
|
||||
relations.forEach(r -> relationsMap.put(r, r));
|
||||
|
||||
if (importResult.getOldEntity() != null) {
|
||||
List<EntityRelation> existingRelations = new ArrayList<>();
|
||||
existingRelations.addAll(relationService.findByTo(tenantId, entity.getId(), RelationTypeGroup.COMMON));
|
||||
existingRelations.addAll(relationService.findByFrom(tenantId, entity.getId(), RelationTypeGroup.COMMON));
|
||||
|
||||
for (EntityRelation existingRelation : existingRelations) {
|
||||
if (!relations.contains(existingRelation)) {
|
||||
EntityRelation relation = relationsMap.get(existingRelation);
|
||||
if (relation == null) {
|
||||
relationService.deleteRelation(tenantId, existingRelation);
|
||||
importResult.addSendEventsCallback(() -> {
|
||||
entityActionService.logEntityAction(ctx.getUser(), existingRelation.getFrom(), null, null,
|
||||
@ -158,19 +171,15 @@ public abstract class BaseEntityImportService<I extends EntityId, E extends Expo
|
||||
entityActionService.logEntityAction(ctx.getUser(), existingRelation.getTo(), null, null,
|
||||
ActionType.RELATION_DELETED, null, existingRelation);
|
||||
});
|
||||
} else {
|
||||
if (Objects.equal(relation.getAdditionalInfo(), existingRelation.getAdditionalInfo())) {
|
||||
relationsMap.remove(relation);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for (EntityRelation relation : relations) {
|
||||
relationService.saveRelation(tenantId, relation);
|
||||
importResult.addSendEventsCallback(() -> {
|
||||
entityActionService.logEntityAction(ctx.getUser(), relation.getFrom(), null, null,
|
||||
ActionType.RELATION_ADD_OR_UPDATE, null, relation);
|
||||
entityActionService.logEntityAction(ctx.getUser(), relation.getTo(), null, null,
|
||||
ActionType.RELATION_ADD_OR_UPDATE, null, relation);
|
||||
});
|
||||
}
|
||||
ctx.addRelations(relationsMap.values());
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@ -41,6 +41,7 @@ import org.thingsboard.server.common.data.id.HasId;
|
||||
import org.thingsboard.server.common.data.id.TenantId;
|
||||
import org.thingsboard.server.common.data.page.PageData;
|
||||
import org.thingsboard.server.common.data.page.PageLink;
|
||||
import org.thingsboard.server.common.data.relation.EntityRelation;
|
||||
import org.thingsboard.server.common.data.sync.ThrowingRunnable;
|
||||
import org.thingsboard.server.common.data.sync.ie.EntityExportData;
|
||||
import org.thingsboard.server.common.data.sync.ie.EntityExportSettings;
|
||||
@ -69,6 +70,7 @@ import org.thingsboard.server.common.data.sync.vc.request.load.VersionLoadConfig
|
||||
import org.thingsboard.server.common.data.sync.vc.request.load.VersionLoadRequest;
|
||||
import org.thingsboard.server.dao.DaoUtil;
|
||||
import org.thingsboard.server.dao.exception.DeviceCredentialsValidationException;
|
||||
import org.thingsboard.server.dao.relation.RelationService;
|
||||
import org.thingsboard.server.queue.util.TbCoreComponent;
|
||||
import org.thingsboard.server.service.entitiy.TbNotificationEntityService;
|
||||
import org.thingsboard.server.service.security.model.SecurityUser;
|
||||
@ -112,6 +114,7 @@ public class DefaultEntitiesVersionControlService implements EntitiesVersionCont
|
||||
private final EntitiesExportImportService exportImportService;
|
||||
private final ExportableEntitiesService exportableEntitiesService;
|
||||
private final TbNotificationEntityService entityNotificationService;
|
||||
private final RelationService relationService;
|
||||
private final TransactionTemplate transactionTemplate;
|
||||
|
||||
private ListeningExecutorService executor;
|
||||
@ -252,7 +255,8 @@ public class DefaultEntitiesVersionControlService implements EntitiesVersionCont
|
||||
.saveCredentials(config.isLoadCredentials())
|
||||
.findExistingByName(false)
|
||||
.build());
|
||||
EntityImportResult<?> importResult = exportImportService.importEntity(ctx, entityData, true, true);
|
||||
EntityImportResult<?> importResult = exportImportService.importEntity(ctx, entityData, true);
|
||||
|
||||
return VersionLoadResult.success(EntityTypeLoadResult.builder()
|
||||
.entityType(importResult.getEntityType())
|
||||
.created(importResult.getOldEntity() == null ? 1 : 0)
|
||||
@ -300,7 +304,7 @@ public class DefaultEntitiesVersionControlService implements EntitiesVersionCont
|
||||
log.debug("[{}] Loading {} entities", ctx.getTenantId(), entityType);
|
||||
EntityImportResult<?> importResult;
|
||||
try {
|
||||
importResult = exportImportService.importEntity(ctx, entityData, false, false);
|
||||
importResult = exportImportService.importEntity(ctx, entityData, false);
|
||||
} catch (Exception e) {
|
||||
throw new LoadEntityException(entityData, e);
|
||||
}
|
||||
@ -332,7 +336,7 @@ public class DefaultEntitiesVersionControlService implements EntitiesVersionCont
|
||||
EntityExportData entityData = gitServiceQueue.getEntity(user.getTenantId(), request.getVersionId(), externalId).get();
|
||||
importSettings.setResetExternalIdsOfAnotherTenant(true);
|
||||
ctx.setSettings(importSettings);
|
||||
EntityImportResult<?> importResult = exportImportService.importEntity(ctx, entityData, false, false);
|
||||
EntityImportResult<?> importResult = exportImportService.importEntity(ctx, entityData, false);
|
||||
|
||||
EntityTypeLoadResult stats = results.get(externalId.getEntityType());
|
||||
if (importResult.getOldEntity() == null) stats.setCreated(stats.getCreated() + 1);
|
||||
@ -369,19 +373,17 @@ public class DefaultEntitiesVersionControlService implements EntitiesVersionCont
|
||||
|
||||
sw.startNew("Callbacks");
|
||||
|
||||
for (ThrowingRunnable saveReferencesCallback : saveReferencesCallbacks) {
|
||||
try {
|
||||
try {
|
||||
for (ThrowingRunnable saveReferencesCallback : saveReferencesCallbacks) {
|
||||
saveReferencesCallback.run();
|
||||
} catch (ThingsboardException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
for (ThrowingRunnable sendEventsCallback : sendEventsCallbacks) {
|
||||
try {
|
||||
for (ThrowingRunnable sendEventsCallback : sendEventsCallbacks) {
|
||||
sendEventsCallback.run();
|
||||
} catch (Exception e) {
|
||||
log.error("Failed to send events for entity", e);
|
||||
}
|
||||
sw.startNew("Relations");
|
||||
exportImportService.saveRelations(ctx);
|
||||
} catch (ThingsboardException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
|
||||
sw.stop();
|
||||
|
||||
@ -17,13 +17,18 @@ package org.thingsboard.server.service.sync.vc.data;
|
||||
|
||||
import lombok.Data;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.thingsboard.server.common.data.audit.ActionType;
|
||||
import org.thingsboard.server.common.data.id.EntityId;
|
||||
import org.thingsboard.server.common.data.id.TenantId;
|
||||
import org.thingsboard.server.common.data.relation.EntityRelation;
|
||||
import org.thingsboard.server.common.data.sync.ie.EntityImportSettings;
|
||||
import org.thingsboard.server.service.security.model.SecurityUser;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.HashMap;
|
||||
import java.util.LinkedHashSet;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
@Slf4j
|
||||
@Data
|
||||
@ -33,6 +38,7 @@ public class EntitiesImportCtx {
|
||||
private EntityImportSettings settings;
|
||||
|
||||
private final Map<EntityId, EntityId> externalToInternalIdMap = new HashMap<>();
|
||||
private final Set<EntityRelation> relations = new LinkedHashSet<>();
|
||||
|
||||
public EntitiesImportCtx(SecurityUser user) {
|
||||
this(user, null);
|
||||
@ -77,4 +83,9 @@ public class EntitiesImportCtx {
|
||||
log.debug("[{}][{}] Local cache put: {}", externalId.getEntityType(), externalId.getId(), internalId);
|
||||
externalToInternalIdMap.put(externalId, internalId);
|
||||
}
|
||||
|
||||
public void addRelations(Collection<EntityRelation> values) {
|
||||
relations.addAll(values);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@ -27,6 +27,7 @@ import org.thingsboard.server.common.data.relation.EntityRelationsQuery;
|
||||
import org.thingsboard.server.common.data.relation.RelationTypeGroup;
|
||||
import org.thingsboard.server.common.data.rule.RuleChainType;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
@ -40,6 +41,8 @@ public interface RelationService {
|
||||
|
||||
boolean saveRelation(TenantId tenantId, EntityRelation relation);
|
||||
|
||||
void saveRelations(TenantId tenantId, List<EntityRelation> relations);
|
||||
|
||||
ListenableFuture<Boolean> saveRelationAsync(TenantId tenantId, EntityRelation relation);
|
||||
|
||||
boolean deleteRelation(TenantId tenantId, EntityRelation relation);
|
||||
|
||||
@ -16,6 +16,7 @@
|
||||
package org.thingsboard.server.dao.relation;
|
||||
|
||||
import com.google.common.base.Function;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.util.concurrent.Futures;
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
import com.google.common.util.concurrent.MoreExecutors;
|
||||
@ -40,12 +41,14 @@ import org.thingsboard.server.common.data.relation.RelationEntityTypeFilter;
|
||||
import org.thingsboard.server.common.data.relation.RelationTypeGroup;
|
||||
import org.thingsboard.server.common.data.relation.RelationsSearchParameters;
|
||||
import org.thingsboard.server.common.data.rule.RuleChainType;
|
||||
import org.thingsboard.server.dao.DaoUtil;
|
||||
import org.thingsboard.server.dao.entity.EntityService;
|
||||
import org.thingsboard.server.dao.exception.DataValidationException;
|
||||
import org.thingsboard.server.dao.service.ConstraintValidator;
|
||||
import org.thingsboard.server.dao.sql.JpaExecutorService;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
@ -119,6 +122,20 @@ public class BaseRelationService implements RelationService {
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void saveRelations(TenantId tenantId, List<EntityRelation> relations) {
|
||||
log.trace("Executing saveRelations [{}]", relations);
|
||||
for (EntityRelation relation : relations) {
|
||||
validate(relation);
|
||||
}
|
||||
for (List<EntityRelation> partition : Lists.partition(relations, 1024)) {
|
||||
relationDao.saveRelations(tenantId, partition);
|
||||
}
|
||||
for (EntityRelation relation : relations) {
|
||||
publishEvictEvent(EntityRelationEvent.from(relation));
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public ListenableFuture<Boolean> saveRelationAsync(TenantId tenantId, EntityRelation relation) {
|
||||
log.trace("Executing saveRelationAsync [{}]", relation);
|
||||
|
||||
@ -22,6 +22,7 @@ import org.thingsboard.server.common.data.relation.EntityRelation;
|
||||
import org.thingsboard.server.common.data.relation.RelationTypeGroup;
|
||||
import org.thingsboard.server.common.data.rule.RuleChainType;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
@ -43,6 +44,8 @@ public interface RelationDao {
|
||||
|
||||
boolean saveRelation(TenantId tenantId, EntityRelation relation);
|
||||
|
||||
void saveRelations(TenantId tenantId, Collection<EntityRelation> relations);
|
||||
|
||||
ListenableFuture<Boolean> saveRelationAsync(TenantId tenantId, EntityRelation relation);
|
||||
|
||||
boolean deleteRelation(TenantId tenantId, EntityRelation relation);
|
||||
|
||||
@ -1,51 +0,0 @@
|
||||
/**
|
||||
* Copyright © 2016-2022 The Thingsboard Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.thingsboard.server.dao.sql.relation;
|
||||
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.data.jpa.repository.Modifying;
|
||||
import org.thingsboard.server.dao.model.sql.RelationEntity;
|
||||
|
||||
import javax.persistence.EntityManager;
|
||||
import javax.persistence.PersistenceContext;
|
||||
import javax.persistence.Query;
|
||||
|
||||
@Slf4j
|
||||
public abstract class AbstractRelationInsertRepository implements RelationInsertRepository {
|
||||
|
||||
@PersistenceContext
|
||||
protected EntityManager entityManager;
|
||||
|
||||
protected Query getQuery(RelationEntity entity, String query) {
|
||||
Query nativeQuery = entityManager.createNativeQuery(query, RelationEntity.class);
|
||||
if (entity.getAdditionalInfo() == null) {
|
||||
nativeQuery.setParameter("additionalInfo", null);
|
||||
} else {
|
||||
nativeQuery.setParameter("additionalInfo", entity.getAdditionalInfo().toString());
|
||||
}
|
||||
return nativeQuery
|
||||
.setParameter("fromId", entity.getFromId())
|
||||
.setParameter("fromType", entity.getFromType())
|
||||
.setParameter("toId", entity.getToId())
|
||||
.setParameter("toType", entity.getToType())
|
||||
.setParameter("relationTypeGroup", entity.getRelationTypeGroup())
|
||||
.setParameter("relationType", entity.getRelationType());
|
||||
}
|
||||
|
||||
@Modifying
|
||||
protected abstract RelationEntity processSaveOrUpdate(RelationEntity entity);
|
||||
|
||||
}
|
||||
@ -1,63 +0,0 @@
|
||||
/**
|
||||
* Copyright © 2016-2022 The Thingsboard Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.thingsboard.server.dao.sql.relation;
|
||||
|
||||
import org.springframework.stereotype.Repository;
|
||||
import org.springframework.transaction.annotation.Transactional;
|
||||
import org.thingsboard.server.dao.model.sql.RelationCompositeKey;
|
||||
import org.thingsboard.server.dao.model.sql.RelationEntity;
|
||||
import org.thingsboard.server.dao.util.HsqlDao;
|
||||
|
||||
import javax.persistence.Query;
|
||||
|
||||
@HsqlDao
|
||||
@Repository
|
||||
@Transactional
|
||||
public class HsqlRelationInsertRepository extends AbstractRelationInsertRepository implements RelationInsertRepository {
|
||||
|
||||
private static final String INSERT_ON_CONFLICT_DO_UPDATE = "MERGE INTO relation USING (VALUES :fromId, :fromType, :toId, :toType, :relationTypeGroup, :relationType, :additionalInfo) R " +
|
||||
"(from_id, from_type, to_id, to_type, relation_type_group, relation_type, additional_info) " +
|
||||
"ON (relation.from_id = UUID(R.from_id) AND relation.from_type = R.from_type AND relation.relation_type_group = R.relation_type_group AND relation.relation_type = R.relation_type AND relation.to_id = UUID(R.to_id) AND relation.to_type = R.to_type) " +
|
||||
"WHEN MATCHED THEN UPDATE SET relation.additional_info = R.additional_info " +
|
||||
"WHEN NOT MATCHED THEN INSERT (from_id, from_type, to_id, to_type, relation_type_group, relation_type, additional_info) VALUES (UUID(R.from_id), R.from_type, UUID(R.to_id), R.to_type, R.relation_type_group, R.relation_type, R.additional_info)";
|
||||
|
||||
protected Query getQuery(RelationEntity entity, String query) {
|
||||
Query nativeQuery = entityManager.createNativeQuery(query, RelationEntity.class);
|
||||
if (entity.getAdditionalInfo() == null) {
|
||||
nativeQuery.setParameter("additionalInfo", null);
|
||||
} else {
|
||||
nativeQuery.setParameter("additionalInfo", entity.getAdditionalInfo().toString());
|
||||
}
|
||||
return nativeQuery
|
||||
.setParameter("fromId", entity.getFromId().toString())
|
||||
.setParameter("fromType", entity.getFromType())
|
||||
.setParameter("toId", entity.getToId().toString())
|
||||
.setParameter("toType", entity.getToType())
|
||||
.setParameter("relationTypeGroup", entity.getRelationTypeGroup())
|
||||
.setParameter("relationType", entity.getRelationType());
|
||||
}
|
||||
|
||||
@Override
|
||||
public RelationEntity saveOrUpdate(RelationEntity entity) {
|
||||
return processSaveOrUpdate(entity);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected RelationEntity processSaveOrUpdate(RelationEntity entity) {
|
||||
getQuery(entity, INSERT_ON_CONFLICT_DO_UPDATE).executeUpdate();
|
||||
return entityManager.find(RelationEntity.class, new RelationCompositeKey(entity.toData()));
|
||||
}
|
||||
}
|
||||
@ -33,7 +33,9 @@ import org.thingsboard.server.dao.model.sql.RelationEntity;
|
||||
import org.thingsboard.server.dao.relation.RelationDao;
|
||||
import org.thingsboard.server.dao.sql.JpaAbstractDaoListeningExecutorService;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
* Created by Valerii Sosliuk on 5/29/2017.
|
||||
@ -112,6 +114,12 @@ public class JpaRelationDao extends JpaAbstractDaoListeningExecutorService imple
|
||||
return relationInsertRepository.saveOrUpdate(new RelationEntity(relation)) != null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void saveRelations(TenantId tenantId, Collection<EntityRelation> relations) {
|
||||
List<RelationEntity> entities = relations.stream().map(RelationEntity::new).collect(Collectors.toList());
|
||||
relationInsertRepository.saveOrUpdate(entities);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ListenableFuture<Boolean> saveRelationAsync(TenantId tenantId, EntityRelation relation) {
|
||||
return service.submit(() -> relationInsertRepository.saveOrUpdate(new RelationEntity(relation)) != null);
|
||||
|
||||
@ -1,41 +0,0 @@
|
||||
/**
|
||||
* Copyright © 2016-2022 The Thingsboard Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.thingsboard.server.dao.sql.relation;
|
||||
|
||||
import org.springframework.stereotype.Repository;
|
||||
import org.springframework.transaction.annotation.Transactional;
|
||||
import org.thingsboard.server.dao.model.sql.RelationEntity;
|
||||
import org.thingsboard.server.dao.util.PsqlDao;
|
||||
|
||||
@PsqlDao
|
||||
@Repository
|
||||
@Transactional
|
||||
public class PsqlRelationInsertRepository extends AbstractRelationInsertRepository implements RelationInsertRepository {
|
||||
|
||||
private static final String INSERT_ON_CONFLICT_DO_UPDATE = "INSERT INTO relation (from_id, from_type, to_id, to_type, relation_type_group, relation_type, additional_info)" +
|
||||
" VALUES (:fromId, :fromType, :toId, :toType, :relationTypeGroup, :relationType, :additionalInfo) " +
|
||||
"ON CONFLICT (from_id, from_type, relation_type_group, relation_type, to_id, to_type) DO UPDATE SET additional_info = :additionalInfo returning *";
|
||||
|
||||
@Override
|
||||
public RelationEntity saveOrUpdate(RelationEntity entity) {
|
||||
return processSaveOrUpdate(entity);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected RelationEntity processSaveOrUpdate(RelationEntity entity) {
|
||||
return (RelationEntity) getQuery(entity, INSERT_ON_CONFLICT_DO_UPDATE).getSingleResult();
|
||||
}
|
||||
}
|
||||
@ -17,8 +17,12 @@ package org.thingsboard.server.dao.sql.relation;
|
||||
|
||||
import org.thingsboard.server.dao.model.sql.RelationEntity;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
public interface RelationInsertRepository {
|
||||
|
||||
RelationEntity saveOrUpdate(RelationEntity entity);
|
||||
|
||||
void saveOrUpdate(List<RelationEntity> entities);
|
||||
|
||||
}
|
||||
@ -0,0 +1,118 @@
|
||||
/**
|
||||
* Copyright © 2016-2022 The Thingsboard Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.thingsboard.server.dao.sql.relation;
|
||||
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.jdbc.core.BatchPreparedStatementSetter;
|
||||
import org.springframework.jdbc.core.JdbcTemplate;
|
||||
import org.springframework.stereotype.Repository;
|
||||
import org.springframework.transaction.TransactionStatus;
|
||||
import org.springframework.transaction.annotation.Transactional;
|
||||
import org.springframework.transaction.support.TransactionCallbackWithoutResult;
|
||||
import org.springframework.transaction.support.TransactionTemplate;
|
||||
import org.thingsboard.common.util.JacksonUtil;
|
||||
import org.thingsboard.server.dao.model.sql.RelationEntity;
|
||||
import org.thingsboard.server.dao.util.PsqlDao;
|
||||
|
||||
import javax.persistence.EntityManager;
|
||||
import javax.persistence.PersistenceContext;
|
||||
import javax.persistence.Query;
|
||||
import java.sql.PreparedStatement;
|
||||
import java.sql.SQLException;
|
||||
import java.sql.Types;
|
||||
import java.util.List;
|
||||
|
||||
@PsqlDao
|
||||
@Repository
|
||||
@Transactional
|
||||
public class SqlRelationInsertRepository implements RelationInsertRepository {
|
||||
|
||||
private static final String INSERT_ON_CONFLICT_DO_UPDATE_JPA = "INSERT INTO relation (from_id, from_type, to_id, to_type, relation_type_group, relation_type, additional_info)" +
|
||||
" VALUES (:fromId, :fromType, :toId, :toType, :relationTypeGroup, :relationType, :additionalInfo) " +
|
||||
"ON CONFLICT (from_id, from_type, relation_type_group, relation_type, to_id, to_type) DO UPDATE SET additional_info = :additionalInfo returning *";
|
||||
|
||||
private static final String INSERT_ON_CONFLICT_DO_UPDATE_JDBC = "INSERT INTO relation (from_id, from_type, to_id, to_type, relation_type_group, relation_type, additional_info)" +
|
||||
" VALUES (?, ?, ?, ?, ?, ?, ?) " +
|
||||
"ON CONFLICT (from_id, from_type, relation_type_group, relation_type, to_id, to_type) DO UPDATE SET additional_info = ?";
|
||||
|
||||
|
||||
@PersistenceContext
|
||||
protected EntityManager entityManager;
|
||||
|
||||
@Autowired
|
||||
protected JdbcTemplate jdbcTemplate;
|
||||
|
||||
@Autowired
|
||||
private TransactionTemplate transactionTemplate;
|
||||
|
||||
protected Query getQuery(RelationEntity entity, String query) {
|
||||
Query nativeQuery = entityManager.createNativeQuery(query, RelationEntity.class);
|
||||
if (entity.getAdditionalInfo() == null) {
|
||||
nativeQuery.setParameter("additionalInfo", null);
|
||||
} else {
|
||||
nativeQuery.setParameter("additionalInfo", JacksonUtil.toString(entity.getAdditionalInfo()));
|
||||
}
|
||||
return nativeQuery
|
||||
.setParameter("fromId", entity.getFromId())
|
||||
.setParameter("fromType", entity.getFromType())
|
||||
.setParameter("toId", entity.getToId())
|
||||
.setParameter("toType", entity.getToType())
|
||||
.setParameter("relationTypeGroup", entity.getRelationTypeGroup())
|
||||
.setParameter("relationType", entity.getRelationType());
|
||||
}
|
||||
|
||||
@Override
|
||||
public RelationEntity saveOrUpdate(RelationEntity entity) {
|
||||
return (RelationEntity) getQuery(entity, INSERT_ON_CONFLICT_DO_UPDATE_JPA).getSingleResult();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void saveOrUpdate(List<RelationEntity> entities) {
|
||||
transactionTemplate.execute(new TransactionCallbackWithoutResult() {
|
||||
@Override
|
||||
protected void doInTransactionWithoutResult(TransactionStatus status) {
|
||||
jdbcTemplate.batchUpdate(INSERT_ON_CONFLICT_DO_UPDATE_JDBC, new BatchPreparedStatementSetter() {
|
||||
@Override
|
||||
public void setValues(PreparedStatement ps, int i) throws SQLException {
|
||||
RelationEntity relation = entities.get(i);
|
||||
ps.setObject(1, relation.getFromId());
|
||||
ps.setString(2, relation.getFromType());
|
||||
ps.setObject(3, relation.getToId());
|
||||
ps.setString(4, relation.getToType());
|
||||
|
||||
ps.setString(5, relation.getRelationTypeGroup());
|
||||
ps.setString(6, relation.getRelationType());
|
||||
|
||||
if (relation.getAdditionalInfo() == null) {
|
||||
ps.setString(7, null);
|
||||
ps.setString(8, null);
|
||||
} else {
|
||||
String json = JacksonUtil.toString(relation.getAdditionalInfo());
|
||||
ps.setString(7, json);
|
||||
ps.setString(8, json);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getBatchSize() {
|
||||
return entities.size();
|
||||
}
|
||||
});
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
}
|
||||
Loading…
x
Reference in New Issue
Block a user