diff --git a/application/src/main/java/org/thingsboard/server/controller/AssetController.java b/application/src/main/java/org/thingsboard/server/controller/AssetController.java index 94c741c774..3889e6ee43 100644 --- a/application/src/main/java/org/thingsboard/server/controller/AssetController.java +++ b/application/src/main/java/org/thingsboard/server/controller/AssetController.java @@ -133,7 +133,7 @@ public class AssetController extends BaseController { Asset savedAsset = checkNotNull(assetService.saveAsset(asset)); - onAssetCreatedOrUpdated(savedAsset, asset.getId() != null); + onAssetCreatedOrUpdated(savedAsset, asset.getId() != null, getCurrentUser()); return savedAsset; } catch (Exception e) { @@ -143,9 +143,9 @@ public class AssetController extends BaseController { } } - private void onAssetCreatedOrUpdated(Asset asset, boolean updated) { + private void onAssetCreatedOrUpdated(Asset asset, boolean updated, SecurityUser user) { try { - logEntityAction(asset.getId(), asset, + logEntityAction(user, asset.getId(), asset, asset.getCustomerId(), updated ? ActionType.UPDATED : ActionType.ADDED, null); } catch (ThingsboardException e) { @@ -648,8 +648,9 @@ public class AssetController extends BaseController { @PreAuthorize("hasAnyAuthority('TENANT_ADMIN')") @PostMapping("/asset/bulk_import") public BulkImportResult processAssetsBulkImport(@RequestBody BulkImportRequest request) throws Exception { - return assetBulkImportService.processBulkImport(request, getCurrentUser(), importedAssetInfo -> { - onAssetCreatedOrUpdated(importedAssetInfo.getEntity(), importedAssetInfo.isUpdated()); + SecurityUser user = getCurrentUser(); + return assetBulkImportService.processBulkImport(request, user, importedAssetInfo -> { + onAssetCreatedOrUpdated(importedAssetInfo.getEntity(), importedAssetInfo.isUpdated(), user); }); } diff --git a/application/src/main/java/org/thingsboard/server/controller/DeviceController.java b/application/src/main/java/org/thingsboard/server/controller/DeviceController.java index f0991ff901..ef27f5f8fe 100644 --- a/application/src/main/java/org/thingsboard/server/controller/DeviceController.java +++ b/application/src/main/java/org/thingsboard/server/controller/DeviceController.java @@ -161,7 +161,7 @@ public class DeviceController extends BaseController { Device savedDevice = checkNotNull(deviceService.saveDeviceWithAccessToken(device, accessToken)); - onDeviceCreatedOrUpdated(savedDevice, oldDevice, !created); + onDeviceCreatedOrUpdated(savedDevice, oldDevice, !created, getCurrentUser()); return savedDevice; } catch (Exception e) { @@ -172,11 +172,11 @@ public class DeviceController extends BaseController { } - private void onDeviceCreatedOrUpdated(Device savedDevice, Device oldDevice, boolean updated) { + private void onDeviceCreatedOrUpdated(Device savedDevice, Device oldDevice, boolean updated, SecurityUser user) { tbClusterService.onDeviceUpdated(savedDevice, oldDevice); try { - logEntityAction(savedDevice.getId(), savedDevice, + logEntityAction(user, savedDevice.getId(), savedDevice, savedDevice.getCustomerId(), updated ? ActionType.UPDATED : ActionType.ADDED, null); } catch (ThingsboardException e) { @@ -941,8 +941,9 @@ public class DeviceController extends BaseController { @PreAuthorize("hasAnyAuthority('TENANT_ADMIN')") @PostMapping("/device/bulk_import") public BulkImportResult processDevicesBulkImport(@RequestBody BulkImportRequest request) throws Exception { - return deviceBulkImportService.processBulkImport(request, getCurrentUser(), importedDeviceInfo -> { - onDeviceCreatedOrUpdated(importedDeviceInfo.getEntity(), importedDeviceInfo.getOldEntity(), importedDeviceInfo.isUpdated()); + SecurityUser user = getCurrentUser(); + return deviceBulkImportService.processBulkImport(request, user, importedDeviceInfo -> { + onDeviceCreatedOrUpdated(importedDeviceInfo.getEntity(), importedDeviceInfo.getOldEntity(), importedDeviceInfo.isUpdated(), user); }); } diff --git a/application/src/main/java/org/thingsboard/server/controller/EdgeController.java b/application/src/main/java/org/thingsboard/server/controller/EdgeController.java index 21c39cdd58..da0e103ad2 100644 --- a/application/src/main/java/org/thingsboard/server/controller/EdgeController.java +++ b/application/src/main/java/org/thingsboard/server/controller/EdgeController.java @@ -140,7 +140,7 @@ public class EdgeController extends BaseController { edge.getId(), edge); Edge savedEdge = checkNotNull(edgeService.saveEdge(edge, true)); - onEdgeCreatedOrUpdated(tenantId, savedEdge, edgeTemplateRootRuleChain, !created); + onEdgeCreatedOrUpdated(tenantId, savedEdge, edgeTemplateRootRuleChain, !created, getCurrentUser()); return savedEdge; } catch (Exception e) { @@ -150,7 +150,7 @@ public class EdgeController extends BaseController { } } - private void onEdgeCreatedOrUpdated(TenantId tenantId, Edge edge, RuleChain edgeTemplateRootRuleChain, boolean updated) throws IOException, ThingsboardException { + private void onEdgeCreatedOrUpdated(TenantId tenantId, Edge edge, RuleChain edgeTemplateRootRuleChain, boolean updated, SecurityUser user) throws IOException, ThingsboardException { if (!updated) { ruleChainService.assignRuleChainToEdge(tenantId, edgeTemplateRootRuleChain.getId(), edge.getId()); edgeNotificationService.setEdgeRootRuleChain(tenantId, edge, edgeTemplateRootRuleChain.getId()); @@ -160,7 +160,7 @@ public class EdgeController extends BaseController { tbClusterService.broadcastEntityStateChangeEvent(edge.getTenantId(), edge.getId(), updated ? ComponentLifecycleEvent.UPDATED : ComponentLifecycleEvent.CREATED); - logEntityAction(edge.getId(), edge, null, updated ? ActionType.UPDATED : ActionType.ADDED, null); + logEntityAction(user, edge.getId(), edge, null, updated ? ActionType.UPDATED : ActionType.ADDED, null); } @PreAuthorize("hasAuthority('TENANT_ADMIN')") @@ -586,7 +586,7 @@ public class EdgeController extends BaseController { return edgeBulkImportService.processBulkImport(request, user, importedAssetInfo -> { try { - onEdgeCreatedOrUpdated(user.getTenantId(), importedAssetInfo.getEntity(), edgeTemplateRootRuleChain, importedAssetInfo.isUpdated()); + onEdgeCreatedOrUpdated(user.getTenantId(), importedAssetInfo.getEntity(), edgeTemplateRootRuleChain, importedAssetInfo.isUpdated(), user); } catch (Exception e) { throw new RuntimeException(e); } diff --git a/application/src/main/java/org/thingsboard/server/controller/RuleChainController.java b/application/src/main/java/org/thingsboard/server/controller/RuleChainController.java index a71fb70e0a..2fcd36f95b 100644 --- a/application/src/main/java/org/thingsboard/server/controller/RuleChainController.java +++ b/application/src/main/java/org/thingsboard/server/controller/RuleChainController.java @@ -25,7 +25,6 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.http.HttpStatus; import org.springframework.security.access.prepost.PreAuthorize; -import org.springframework.util.CollectionUtils; import org.springframework.util.StringUtils; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RequestBody; @@ -450,15 +449,17 @@ public class RuleChainController extends BaseController { @PreAuthorize("hasAuthority('TENANT_ADMIN')") @RequestMapping(value = "/ruleChains/import", method = RequestMethod.POST) @ResponseBody - public void importRuleChains(@RequestBody RuleChainData ruleChainData, @RequestParam(required = false, defaultValue = "false") boolean overwrite) throws ThingsboardException { + public List importRuleChains(@RequestBody RuleChainData ruleChainData, @RequestParam(required = false, defaultValue = "false") boolean overwrite) throws ThingsboardException { try { TenantId tenantId = getCurrentUser().getTenantId(); - List importResults = ruleChainService.importTenantRuleChains(tenantId, ruleChainData, RuleChainType.CORE, overwrite); - if (!CollectionUtils.isEmpty(importResults)) { - for (RuleChainImportResult importResult : importResults) { - tbClusterService.broadcastEntityStateChangeEvent(importResult.getTenantId(), importResult.getRuleChainId(), importResult.getLifecycleEvent()); + List importResults = ruleChainService.importTenantRuleChains(tenantId, ruleChainData, overwrite); + for (RuleChainImportResult importResult : importResults) { + if (importResult.getError() == null) { + tbClusterService.broadcastEntityStateChangeEvent(importResult.getTenantId(), importResult.getRuleChainId(), + importResult.isUpdated() ? ComponentLifecycleEvent.UPDATED : ComponentLifecycleEvent.CREATED); } } + return importResults; } catch (Exception e) { throw handleException(e); } diff --git a/application/src/main/java/org/thingsboard/server/service/device/DeviceBulkImportService.java b/application/src/main/java/org/thingsboard/server/service/device/DeviceBulkImportService.java index 82d4dc5571..707674f6ba 100644 --- a/application/src/main/java/org/thingsboard/server/service/device/DeviceBulkImportService.java +++ b/application/src/main/java/org/thingsboard/server/service/device/DeviceBulkImportService.java @@ -63,6 +63,8 @@ import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.Set; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; @Service @TbCoreComponent @@ -71,6 +73,8 @@ public class DeviceBulkImportService extends AbstractBulkImportService { protected final DeviceCredentialsService deviceCredentialsService; protected final DeviceProfileService deviceProfileService; + private final Lock findOrCreateDeviceProfileLock = new ReentrantLock(); + public DeviceBulkImportService(TelemetrySubscriptionService tsSubscriptionService, TbTenantProfileCache tenantProfileCache, AccessControlService accessControlService, AccessValidator accessValidator, EntityActionService entityActionService, TbClusterService clusterService, @@ -106,9 +110,13 @@ public class DeviceBulkImportService extends AbstractBulkImportService { throw new DeviceCredentialsValidationException("Invalid device credentials: " + e.getMessage()); } + DeviceProfile deviceProfile; if (deviceCredentials.getCredentialsType() == DeviceCredentialsType.LWM2M_CREDENTIALS) { - setUpLwM2mDeviceProfile(user.getTenantId(), device); + deviceProfile = setUpLwM2mDeviceProfile(user.getTenantId(), device); + } else { + deviceProfile = deviceProfileService.findOrCreateDeviceProfile(user.getTenantId(), device.getType()); } + device.setDeviceProfileId(deviceProfile.getId()); device = deviceService.saveDeviceWithCredentials(device, deviceCredentials); @@ -215,36 +223,43 @@ public class DeviceBulkImportService extends AbstractBulkImportService { credentials.setCredentialsValue(lwm2mCredentials.toString()); } - private void setUpLwM2mDeviceProfile(TenantId tenantId, Device device) { + private DeviceProfile setUpLwM2mDeviceProfile(TenantId tenantId, Device device) { DeviceProfile deviceProfile = deviceProfileService.findDeviceProfileByName(tenantId, device.getType()); if (deviceProfile != null) { if (deviceProfile.getTransportType() != DeviceTransportType.LWM2M) { deviceProfile.setTransportType(DeviceTransportType.LWM2M); deviceProfile.getProfileData().setTransportConfiguration(new Lwm2mDeviceProfileTransportConfiguration()); deviceProfile = deviceProfileService.saveDeviceProfile(deviceProfile); - device.setDeviceProfileId(deviceProfile.getId()); } } else { - deviceProfile = new DeviceProfile(); - deviceProfile.setTenantId(tenantId); - deviceProfile.setType(DeviceProfileType.DEFAULT); - deviceProfile.setName(device.getType()); - deviceProfile.setTransportType(DeviceTransportType.LWM2M); - deviceProfile.setProvisionType(DeviceProfileProvisionType.DISABLED); + findOrCreateDeviceProfileLock.lock(); + try { + deviceProfile = deviceProfileService.findDeviceProfileByName(tenantId, device.getType()); + if (deviceProfile == null) { + deviceProfile = new DeviceProfile(); + deviceProfile.setTenantId(tenantId); + deviceProfile.setType(DeviceProfileType.DEFAULT); + deviceProfile.setName(device.getType()); + deviceProfile.setTransportType(DeviceTransportType.LWM2M); + deviceProfile.setProvisionType(DeviceProfileProvisionType.DISABLED); - DeviceProfileData deviceProfileData = new DeviceProfileData(); - DefaultDeviceProfileConfiguration configuration = new DefaultDeviceProfileConfiguration(); - DeviceProfileTransportConfiguration transportConfiguration = new Lwm2mDeviceProfileTransportConfiguration(); - DisabledDeviceProfileProvisionConfiguration provisionConfiguration = new DisabledDeviceProfileProvisionConfiguration(null); + DeviceProfileData deviceProfileData = new DeviceProfileData(); + DefaultDeviceProfileConfiguration configuration = new DefaultDeviceProfileConfiguration(); + DeviceProfileTransportConfiguration transportConfiguration = new Lwm2mDeviceProfileTransportConfiguration(); + DisabledDeviceProfileProvisionConfiguration provisionConfiguration = new DisabledDeviceProfileProvisionConfiguration(null); - deviceProfileData.setConfiguration(configuration); - deviceProfileData.setTransportConfiguration(transportConfiguration); - deviceProfileData.setProvisionConfiguration(provisionConfiguration); - deviceProfile.setProfileData(deviceProfileData); + deviceProfileData.setConfiguration(configuration); + deviceProfileData.setTransportConfiguration(transportConfiguration); + deviceProfileData.setProvisionConfiguration(provisionConfiguration); + deviceProfile.setProfileData(deviceProfileData); - deviceProfile = deviceProfileService.saveDeviceProfile(deviceProfile); - device.setDeviceProfileId(deviceProfile.getId()); + deviceProfile = deviceProfileService.saveDeviceProfile(deviceProfile); + } + } finally { + findOrCreateDeviceProfileLock.unlock(); + } } + return deviceProfile; } private void setValues(ObjectNode objectNode, Map data, Collection columns) { diff --git a/application/src/main/java/org/thingsboard/server/service/importing/AbstractBulkImportService.java b/application/src/main/java/org/thingsboard/server/service/importing/AbstractBulkImportService.java index b1d0b30c9d..a1e3ff16ef 100644 --- a/application/src/main/java/org/thingsboard/server/service/importing/AbstractBulkImportService.java +++ b/application/src/main/java/org/thingsboard/server/service/importing/AbstractBulkImportService.java @@ -22,6 +22,9 @@ import lombok.Data; import lombok.RequiredArgsConstructor; import lombok.SneakyThrows; import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.exception.ExceptionUtils; +import org.thingsboard.common.util.DonAsynchron; +import org.thingsboard.common.util.ThingsBoardThreadFactory; import org.thingsboard.server.cluster.TbClusterService; import org.thingsboard.server.common.data.BaseData; import org.thingsboard.server.common.data.TenantProfile; @@ -47,11 +50,16 @@ import org.thingsboard.server.utils.CsvUtils; import org.thingsboard.server.utils.TypeCastUtil; import javax.annotation.Nullable; +import javax.annotation.PostConstruct; +import javax.annotation.PreDestroy; import java.util.ArrayList; import java.util.Arrays; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; @@ -67,39 +75,49 @@ public abstract class AbstractBulkImportService processBulkImport(BulkImportRequest request, SecurityUser user, Consumer> onEntityImported) throws Exception { - BulkImportResult result = new BulkImportResult<>(); + private static ThreadPoolExecutor executor; - AtomicInteger i = new AtomicInteger(0); - if (request.getMapping().getHeader()) { - i.incrementAndGet(); + @PostConstruct + private void initExecutor() { + if (executor == null) { + executor = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(), Runtime.getRuntime().availableProcessors(), + 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(150_000), + ThingsBoardThreadFactory.forName("bulk-import"), new ThreadPoolExecutor.CallerRunsPolicy()); + executor.allowCoreThreadTimeOut(true); } + } - parseData(request).forEach(entityData -> { - i.incrementAndGet(); - try { - ImportedEntityInfo importedEntityInfo = saveEntity(request, entityData.getFields(), user); - onEntityImported.accept(importedEntityInfo); + public final BulkImportResult processBulkImport(BulkImportRequest request, SecurityUser user, Consumer> onEntityImported) throws Exception { + List entitiesData = parseData(request); - E entity = importedEntityInfo.getEntity(); + BulkImportResult result = new BulkImportResult<>(); + CountDownLatch completionLatch = new CountDownLatch(entitiesData.size()); - saveKvs(user, entity, entityData.getKvs()); + entitiesData.forEach(entityData -> DonAsynchron.submit(() -> { + ImportedEntityInfo importedEntityInfo = saveEntity(request, entityData.getFields(), user); + E entity = importedEntityInfo.getEntity(); - if (importedEntityInfo.getRelatedError() != null) { - throw new RuntimeException(importedEntityInfo.getRelatedError()); - } + onEntityImported.accept(importedEntityInfo); + saveKvs(user, entity, entityData.getKvs()); - if (importedEntityInfo.isUpdated()) { - result.setUpdated(result.getUpdated() + 1); - } else { - result.setCreated(result.getCreated() + 1); - } - } catch (Exception e) { - result.setErrors(result.getErrors() + 1); - result.getErrorsList().add(String.format("Line %d: %s", i.get(), e.getMessage())); - } - }); + return importedEntityInfo; + }, + importedEntityInfo -> { + if (importedEntityInfo.isUpdated()) { + result.getUpdated().incrementAndGet(); + } else { + result.getCreated().incrementAndGet(); + } + completionLatch.countDown(); + }, + throwable -> { + result.getErrors().incrementAndGet(); + result.getErrorsList().add(String.format("Line %d: %s", entityData.getLineNumber(), ExceptionUtils.getRootCauseMessage(throwable))); + completionLatch.countDown(); + }, + executor)); + completionLatch.await(); return result; } @@ -186,8 +204,11 @@ public abstract class AbstractBulkImportService parseData(BulkImportRequest request) throws Exception { List> records = CsvUtils.parseCsv(request.getFile(), request.getMapping().getDelimiter()); + AtomicInteger linesCounter = new AtomicInteger(0); + if (request.getMapping().getHeader()) { records.remove(0); + linesCounter.incrementAndGet(); } List columnsMappings = request.getMapping().getColumns(); @@ -205,15 +226,24 @@ public abstract class AbstractBulkImportService fields = new LinkedHashMap<>(); private final Map kvs = new LinkedHashMap<>(); + private int lineNumber; } @Data diff --git a/application/src/main/java/org/thingsboard/server/service/importing/BulkImportResult.java b/application/src/main/java/org/thingsboard/server/service/importing/BulkImportResult.java index d6fa6ccbf9..7e937d835d 100644 --- a/application/src/main/java/org/thingsboard/server/service/importing/BulkImportResult.java +++ b/application/src/main/java/org/thingsboard/server/service/importing/BulkImportResult.java @@ -17,14 +17,14 @@ package org.thingsboard.server.service.importing; import lombok.Data; -import java.util.LinkedList; -import java.util.List; +import java.util.Collection; +import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.concurrent.atomic.AtomicInteger; @Data public class BulkImportResult { - private int created = 0; - private int updated = 0; - private int errors = 0; - private List errorsList = new LinkedList<>(); - + private AtomicInteger created = new AtomicInteger(); + private AtomicInteger updated = new AtomicInteger(); + private AtomicInteger errors = new AtomicInteger(); + private Collection errorsList = new ConcurrentLinkedDeque<>(); } diff --git a/application/src/main/java/org/thingsboard/server/service/importing/ImportedEntityInfo.java b/application/src/main/java/org/thingsboard/server/service/importing/ImportedEntityInfo.java index 958863537a..846444c1d5 100644 --- a/application/src/main/java/org/thingsboard/server/service/importing/ImportedEntityInfo.java +++ b/application/src/main/java/org/thingsboard/server/service/importing/ImportedEntityInfo.java @@ -22,5 +22,4 @@ public class ImportedEntityInfo { private E entity; private boolean isUpdated; private E oldEntity; - private String relatedError; } diff --git a/common/dao-api/src/main/java/org/thingsboard/server/dao/rule/RuleChainService.java b/common/dao-api/src/main/java/org/thingsboard/server/dao/rule/RuleChainService.java index ae994d4122..e089d7bf47 100644 --- a/common/dao-api/src/main/java/org/thingsboard/server/dao/rule/RuleChainService.java +++ b/common/dao-api/src/main/java/org/thingsboard/server/dao/rule/RuleChainService.java @@ -23,7 +23,6 @@ import org.thingsboard.server.common.data.id.RuleNodeId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.page.PageData; import org.thingsboard.server.common.data.page.PageLink; -import org.thingsboard.server.common.data.page.TimePageLink; import org.thingsboard.server.common.data.relation.EntityRelation; import org.thingsboard.server.common.data.rule.RuleChain; import org.thingsboard.server.common.data.rule.RuleChainData; @@ -71,7 +70,7 @@ public interface RuleChainService { RuleChainData exportTenantRuleChains(TenantId tenantId, PageLink pageLink) throws ThingsboardException; - List importTenantRuleChains(TenantId tenantId, RuleChainData ruleChainData, RuleChainType type, boolean overwrite); + List importTenantRuleChains(TenantId tenantId, RuleChainData ruleChainData, boolean overwrite); RuleChain assignRuleChainToEdge(TenantId tenantId, RuleChainId ruleChainId, EdgeId edgeId); diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/rule/RuleChainImportResult.java b/common/data/src/main/java/org/thingsboard/server/common/data/rule/RuleChainImportResult.java index 11a2f68a57..70c30de947 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/rule/RuleChainImportResult.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/rule/RuleChainImportResult.java @@ -15,17 +15,22 @@ */ package org.thingsboard.server.common.data.rule; -import lombok.AllArgsConstructor; +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonInclude; import lombok.Data; import org.thingsboard.server.common.data.id.RuleChainId; import org.thingsboard.server.common.data.id.TenantId; -import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent; @Data -@AllArgsConstructor public class RuleChainImportResult { + @JsonIgnore private TenantId tenantId; private RuleChainId ruleChainId; - private ComponentLifecycleEvent lifecycleEvent; + private String ruleChainName; + @JsonInclude(JsonInclude.Include.NON_DEFAULT) + private boolean updated; + @JsonInclude(JsonInclude.Include.NON_NULL) + private String error; + } diff --git a/common/util/src/main/java/org/thingsboard/common/util/DonAsynchron.java b/common/util/src/main/java/org/thingsboard/common/util/DonAsynchron.java index 5480a3bd69..d615890507 100644 --- a/common/util/src/main/java/org/thingsboard/common/util/DonAsynchron.java +++ b/common/util/src/main/java/org/thingsboard/common/util/DonAsynchron.java @@ -20,6 +20,7 @@ import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.MoreExecutors; +import java.util.concurrent.Callable; import java.util.concurrent.Executor; import java.util.function.Consumer; @@ -53,4 +54,15 @@ public class DonAsynchron { Futures.addCallback(future, callback, MoreExecutors.directExecutor()); } } + + public static ListenableFuture submit(Callable task, Consumer onSuccess, Consumer onFailure, Executor executor) { + return submit(task, onSuccess, onFailure, executor, null); + } + + public static ListenableFuture submit(Callable task, Consumer onSuccess, Consumer onFailure, Executor executor, Executor callbackExecutor) { + ListenableFuture future = Futures.submit(task, executor); + withCallback(future, onSuccess, onFailure, callbackExecutor); + return future; + } + } diff --git a/dao/src/main/java/org/thingsboard/server/dao/device/DeviceCredentialsDao.java b/dao/src/main/java/org/thingsboard/server/dao/device/DeviceCredentialsDao.java index 59ed00052e..7e07347ca4 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/device/DeviceCredentialsDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/device/DeviceCredentialsDao.java @@ -35,6 +35,8 @@ public interface DeviceCredentialsDao extends Dao { */ DeviceCredentials save(TenantId tenantId, DeviceCredentials deviceCredentials); + DeviceCredentials saveAndFlush(TenantId tenantId, DeviceCredentials deviceCredentials); + /** * Find device credentials by device id. * diff --git a/dao/src/main/java/org/thingsboard/server/dao/device/DeviceCredentialsServiceImpl.java b/dao/src/main/java/org/thingsboard/server/dao/device/DeviceCredentialsServiceImpl.java index 47af5df8fe..14539066ff 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/device/DeviceCredentialsServiceImpl.java +++ b/dao/src/main/java/org/thingsboard/server/dao/device/DeviceCredentialsServiceImpl.java @@ -96,7 +96,7 @@ public class DeviceCredentialsServiceImpl extends AbstractEntityService implemen log.trace("Executing updateDeviceCredentials [{}]", deviceCredentials); credentialsValidator.validate(deviceCredentials, id -> tenantId); try { - return deviceCredentialsDao.save(tenantId, deviceCredentials); + return deviceCredentialsDao.saveAndFlush(tenantId, deviceCredentials); } catch (Exception t) { ConstraintViolationException e = extractConstraintViolationException(t).orElse(null); if (e != null && e.getConstraintName() != null diff --git a/dao/src/main/java/org/thingsboard/server/dao/device/DeviceProfileDao.java b/dao/src/main/java/org/thingsboard/server/dao/device/DeviceProfileDao.java index 120bf0bad0..2200416e6d 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/device/DeviceProfileDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/device/DeviceProfileDao.java @@ -30,6 +30,8 @@ public interface DeviceProfileDao extends Dao { DeviceProfile save(TenantId tenantId, DeviceProfile deviceProfile); + DeviceProfile saveAndFlush(TenantId tenantId, DeviceProfile deviceProfile); + PageData findDeviceProfiles(TenantId tenantId, PageLink pageLink); PageData findDeviceProfileInfos(TenantId tenantId, PageLink pageLink, String transportType); diff --git a/dao/src/main/java/org/thingsboard/server/dao/device/DeviceProfileServiceImpl.java b/dao/src/main/java/org/thingsboard/server/dao/device/DeviceProfileServiceImpl.java index 720d50ad69..105352c8a5 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/device/DeviceProfileServiceImpl.java +++ b/dao/src/main/java/org/thingsboard/server/dao/device/DeviceProfileServiceImpl.java @@ -167,7 +167,7 @@ public class DeviceProfileServiceImpl extends AbstractEntityService implements D } DeviceProfile savedDeviceProfile; try { - savedDeviceProfile = deviceProfileDao.save(deviceProfile.getTenantId(), deviceProfile); + savedDeviceProfile = deviceProfileDao.saveAndFlush(deviceProfile.getTenantId(), deviceProfile); } catch (Exception t) { ConstraintViolationException e = extractConstraintViolationException(t).orElse(null); if (e != null && e.getConstraintName() != null && e.getConstraintName().equalsIgnoreCase("device_profile_name_unq_key")) { diff --git a/dao/src/main/java/org/thingsboard/server/dao/rule/BaseRuleChainService.java b/dao/src/main/java/org/thingsboard/server/dao/rule/BaseRuleChainService.java index 2ead3f4334..487f39f2cc 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/rule/BaseRuleChainService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/rule/BaseRuleChainService.java @@ -22,6 +22,7 @@ import com.google.common.util.concurrent.ListenableFuture; import lombok.extern.slf4j.Slf4j; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.exception.ExceptionUtils; import org.hibernate.exception.ConstraintViolationException; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Lazy; @@ -38,7 +39,6 @@ import org.thingsboard.server.common.data.id.RuleNodeId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.page.PageData; import org.thingsboard.server.common.data.page.PageLink; -import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent; import org.thingsboard.server.common.data.relation.EntityRelation; import org.thingsboard.server.common.data.relation.RelationTypeGroup; import org.thingsboard.server.common.data.rule.NodeConnectionInfo; @@ -59,6 +59,7 @@ import org.thingsboard.server.dao.tenant.TbTenantProfileCache; import org.thingsboard.server.dao.tenant.TenantDao; import java.util.ArrayList; +import java.util.Collection; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; @@ -416,41 +417,46 @@ public class BaseRuleChainService extends AbstractEntityService implements RuleC } @Override - public List importTenantRuleChains(TenantId tenantId, RuleChainData ruleChainData, RuleChainType type, boolean overwrite) { + public List importTenantRuleChains(TenantId tenantId, RuleChainData ruleChainData, boolean overwrite) { List importResults = new ArrayList<>(); + setRandomRuleChainIds(ruleChainData); resetRuleNodeIds(ruleChainData.getMetadata()); resetRuleChainMetadataTenantIds(tenantId, ruleChainData.getMetadata()); - if (overwrite) { - List persistentRuleChains = findAllTenantRuleChains(tenantId, type); - for (RuleChain ruleChain : ruleChainData.getRuleChains()) { - ComponentLifecycleEvent lifecycleEvent; - Optional persistentRuleChainOpt = persistentRuleChains.stream().filter(rc -> rc.getName().equals(ruleChain.getName())).findFirst(); - if (persistentRuleChainOpt.isPresent()) { - setNewRuleChainId(ruleChain, ruleChainData.getMetadata(), ruleChain.getId(), persistentRuleChainOpt.get().getId()); - ruleChain.setRoot(persistentRuleChainOpt.get().isRoot()); - lifecycleEvent = ComponentLifecycleEvent.UPDATED; - } else { - ruleChain.setRoot(false); - lifecycleEvent = ComponentLifecycleEvent.CREATED; + + for (RuleChain ruleChain : ruleChainData.getRuleChains()) { + RuleChainImportResult importResult = new RuleChainImportResult(); + + ruleChain.setTenantId(tenantId); + ruleChain.setRoot(false); + + if (overwrite) { + Collection existingRuleChains = ruleChainDao.findByTenantIdAndTypeAndName(tenantId, + Optional.ofNullable(ruleChain.getType()).orElse(RuleChainType.CORE), ruleChain.getName()); + Optional existingRuleChain = existingRuleChains.stream().findFirst(); + if (existingRuleChain.isPresent()) { + setNewRuleChainId(ruleChain, ruleChainData.getMetadata(), ruleChain.getId(), existingRuleChain.get().getId()); + ruleChain.setRoot(existingRuleChain.get().isRoot()); + importResult.setUpdated(true); } - ruleChain.setTenantId(tenantId); - ruleChainDao.save(tenantId, ruleChain); - importResults.add(new RuleChainImportResult(tenantId, ruleChain.getId(), lifecycleEvent)); } - } else { - if (!CollectionUtils.isEmpty(ruleChainData.getRuleChains())) { - ruleChainData.getRuleChains().forEach(rc -> { - rc.setTenantId(tenantId); - rc.setRoot(false); - RuleChain savedRc = ruleChainDao.save(tenantId, rc); - importResults.add(new RuleChainImportResult(tenantId, savedRc.getId(), ComponentLifecycleEvent.CREATED)); - }); + + try { + ruleChain = saveRuleChain(ruleChain); + } catch (Exception e) { + importResult.setError(ExceptionUtils.getRootCauseMessage(e)); } + + importResult.setTenantId(tenantId); + importResult.setRuleChainId(ruleChain.getId()); + importResult.setRuleChainName(ruleChain.getName()); + importResults.add(importResult); } - if (!CollectionUtils.isEmpty(ruleChainData.getMetadata())) { + + if (CollectionUtils.isNotEmpty(ruleChainData.getMetadata())) { ruleChainData.getMetadata().forEach(md -> saveRuleChainMetaData(tenantId, md)); } + return importResults; } @@ -475,7 +481,9 @@ public class BaseRuleChainService extends AbstractEntityService implements RuleC } if (isTenantId) { ObjectNode objNode = (ObjectNode) node; - objNode.put("id", tenantId.getId().toString()); + if (objNode.has("id")) { + objNode.put("id", tenantId.getId().toString()); + } } else { for (JsonNode jsonNode : node) { searchTenantIdRecursive(tenantId, jsonNode); @@ -723,4 +731,5 @@ public class BaseRuleChainService extends AbstractEntityService implements RuleC checkRuleNodesAndDelete(tenantId, entity.getId()); } }; + } diff --git a/dao/src/main/java/org/thingsboard/server/dao/rule/RuleChainDao.java b/dao/src/main/java/org/thingsboard/server/dao/rule/RuleChainDao.java index 98ad0b34a7..03fd25e944 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/rule/RuleChainDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/rule/RuleChainDao.java @@ -15,6 +15,7 @@ */ package org.thingsboard.server.dao.rule; +import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.page.PageData; import org.thingsboard.server.common.data.page.PageLink; import org.thingsboard.server.common.data.rule.RuleChain; @@ -22,6 +23,7 @@ import org.thingsboard.server.common.data.rule.RuleChainType; import org.thingsboard.server.dao.Dao; import org.thingsboard.server.dao.TenantEntityDao; +import java.util.Collection; import java.util.UUID; /** @@ -74,4 +76,7 @@ public interface RuleChainDao extends Dao, TenantEntityDao { * @return the list of rule chain objects */ PageData findAutoAssignToEdgeRuleChainsByTenantId(UUID tenantId, PageLink pageLink); + + Collection findByTenantIdAndTypeAndName(TenantId tenantId, RuleChainType type, String name); + } diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/device/DeviceCredentialsRepository.java b/dao/src/main/java/org/thingsboard/server/dao/sql/device/DeviceCredentialsRepository.java index 02ae32bd3e..97c9913239 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/device/DeviceCredentialsRepository.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/device/DeviceCredentialsRepository.java @@ -15,7 +15,7 @@ */ package org.thingsboard.server.dao.sql.device; -import org.springframework.data.repository.CrudRepository; +import org.springframework.data.jpa.repository.JpaRepository; import org.thingsboard.server.dao.model.sql.DeviceCredentialsEntity; import java.util.UUID; @@ -23,7 +23,7 @@ import java.util.UUID; /** * Created by Valerii Sosliuk on 5/6/2017. */ -public interface DeviceCredentialsRepository extends CrudRepository { +public interface DeviceCredentialsRepository extends JpaRepository { DeviceCredentialsEntity findByDeviceId(UUID deviceId); diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/device/DeviceProfileRepository.java b/dao/src/main/java/org/thingsboard/server/dao/sql/device/DeviceProfileRepository.java index 1dc2af4ecf..311c73100c 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/device/DeviceProfileRepository.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/device/DeviceProfileRepository.java @@ -17,6 +17,7 @@ package org.thingsboard.server.dao.sql.device; import org.springframework.data.domain.Page; import org.springframework.data.domain.Pageable; +import org.springframework.data.jpa.repository.JpaRepository; import org.springframework.data.jpa.repository.Query; import org.springframework.data.repository.PagingAndSortingRepository; import org.springframework.data.repository.query.Param; @@ -26,7 +27,7 @@ import org.thingsboard.server.dao.model.sql.DeviceProfileEntity; import java.util.UUID; -public interface DeviceProfileRepository extends PagingAndSortingRepository { +public interface DeviceProfileRepository extends JpaRepository { @Query("SELECT new org.thingsboard.server.common.data.DeviceProfileInfo(d.id, d.name, d.image, d.defaultDashboardId, d.type, d.transportType) " + "FROM DeviceProfileEntity d " + diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/device/JpaDeviceCredentialsDao.java b/dao/src/main/java/org/thingsboard/server/dao/sql/device/JpaDeviceCredentialsDao.java index 68eb7ab37a..26453a5e52 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/device/JpaDeviceCredentialsDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/device/JpaDeviceCredentialsDao.java @@ -18,6 +18,7 @@ package org.thingsboard.server.dao.sql.device; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.repository.CrudRepository; import org.springframework.stereotype.Component; +import org.springframework.transaction.annotation.Transactional; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.security.DeviceCredentials; import org.thingsboard.server.dao.DaoUtil; @@ -46,6 +47,14 @@ public class JpaDeviceCredentialsDao extends JpaAbstractDao findDeviceProfiles(TenantId tenantId, PageLink pageLink) { return DaoUtil.toPageData( diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/rule/JpaRuleChainDao.java b/dao/src/main/java/org/thingsboard/server/dao/sql/rule/JpaRuleChainDao.java index 040e60daad..483ab39f06 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/rule/JpaRuleChainDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/rule/JpaRuleChainDao.java @@ -29,6 +29,7 @@ import org.thingsboard.server.dao.model.sql.RuleChainEntity; import org.thingsboard.server.dao.rule.RuleChainDao; import org.thingsboard.server.dao.sql.JpaAbstractSearchTextDao; +import java.util.Collection; import java.util.Objects; import java.util.UUID; @@ -97,8 +98,14 @@ public class JpaRuleChainDao extends JpaAbstractSearchTextDao findByTenantIdAndTypeAndName(TenantId tenantId, RuleChainType type, String name) { + return DaoUtil.convertDataList(ruleChainRepository.findByTenantIdAndTypeAndName(tenantId.getId(), type, name)); + } + @Override public Long countByTenantId(TenantId tenantId) { return ruleChainRepository.countByTenantId(tenantId.getId()); } + } diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/rule/RuleChainRepository.java b/dao/src/main/java/org/thingsboard/server/dao/sql/rule/RuleChainRepository.java index 86a147cc57..fda0a0d7f0 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/rule/RuleChainRepository.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/rule/RuleChainRepository.java @@ -23,6 +23,7 @@ import org.springframework.data.repository.query.Param; import org.thingsboard.server.common.data.rule.RuleChainType; import org.thingsboard.server.dao.model.sql.RuleChainEntity; +import java.util.List; import java.util.UUID; public interface RuleChainRepository extends PagingAndSortingRepository { @@ -55,10 +56,13 @@ public interface RuleChainRepository extends PagingAndSortingRepository findAutoAssignByTenantId(@Param("tenantId") UUID tenantId, - @Param("searchText") String searchText, - Pageable pageable); + @Param("searchText") String searchText, + Pageable pageable); RuleChainEntity findByTenantIdAndTypeAndRootIsTrue(UUID tenantId, RuleChainType ruleChainType); Long countByTenantId(UUID tenantId); + + List findByTenantIdAndTypeAndName(UUID tenantId, RuleChainType type, String name); + }