Merge branch 'master' of github.com:thingsboard/thingsboard

This commit is contained in:
Igor Kulikov 2021-10-19 22:22:19 +03:00
commit 37a71a57c6
23 changed files with 223 additions and 112 deletions

View File

@ -133,7 +133,7 @@ public class AssetController extends BaseController {
Asset savedAsset = checkNotNull(assetService.saveAsset(asset)); Asset savedAsset = checkNotNull(assetService.saveAsset(asset));
onAssetCreatedOrUpdated(savedAsset, asset.getId() != null); onAssetCreatedOrUpdated(savedAsset, asset.getId() != null, getCurrentUser());
return savedAsset; return savedAsset;
} catch (Exception e) { } catch (Exception e) {
@ -143,9 +143,9 @@ public class AssetController extends BaseController {
} }
} }
private void onAssetCreatedOrUpdated(Asset asset, boolean updated) { private void onAssetCreatedOrUpdated(Asset asset, boolean updated, SecurityUser user) {
try { try {
logEntityAction(asset.getId(), asset, logEntityAction(user, asset.getId(), asset,
asset.getCustomerId(), asset.getCustomerId(),
updated ? ActionType.UPDATED : ActionType.ADDED, null); updated ? ActionType.UPDATED : ActionType.ADDED, null);
} catch (ThingsboardException e) { } catch (ThingsboardException e) {
@ -648,8 +648,9 @@ public class AssetController extends BaseController {
@PreAuthorize("hasAnyAuthority('TENANT_ADMIN')") @PreAuthorize("hasAnyAuthority('TENANT_ADMIN')")
@PostMapping("/asset/bulk_import") @PostMapping("/asset/bulk_import")
public BulkImportResult<Asset> processAssetsBulkImport(@RequestBody BulkImportRequest request) throws Exception { public BulkImportResult<Asset> processAssetsBulkImport(@RequestBody BulkImportRequest request) throws Exception {
return assetBulkImportService.processBulkImport(request, getCurrentUser(), importedAssetInfo -> { SecurityUser user = getCurrentUser();
onAssetCreatedOrUpdated(importedAssetInfo.getEntity(), importedAssetInfo.isUpdated()); return assetBulkImportService.processBulkImport(request, user, importedAssetInfo -> {
onAssetCreatedOrUpdated(importedAssetInfo.getEntity(), importedAssetInfo.isUpdated(), user);
}); });
} }

View File

@ -161,7 +161,7 @@ public class DeviceController extends BaseController {
Device savedDevice = checkNotNull(deviceService.saveDeviceWithAccessToken(device, accessToken)); Device savedDevice = checkNotNull(deviceService.saveDeviceWithAccessToken(device, accessToken));
onDeviceCreatedOrUpdated(savedDevice, oldDevice, !created); onDeviceCreatedOrUpdated(savedDevice, oldDevice, !created, getCurrentUser());
return savedDevice; return savedDevice;
} catch (Exception e) { } catch (Exception e) {
@ -172,11 +172,11 @@ public class DeviceController extends BaseController {
} }
private void onDeviceCreatedOrUpdated(Device savedDevice, Device oldDevice, boolean updated) { private void onDeviceCreatedOrUpdated(Device savedDevice, Device oldDevice, boolean updated, SecurityUser user) {
tbClusterService.onDeviceUpdated(savedDevice, oldDevice); tbClusterService.onDeviceUpdated(savedDevice, oldDevice);
try { try {
logEntityAction(savedDevice.getId(), savedDevice, logEntityAction(user, savedDevice.getId(), savedDevice,
savedDevice.getCustomerId(), savedDevice.getCustomerId(),
updated ? ActionType.UPDATED : ActionType.ADDED, null); updated ? ActionType.UPDATED : ActionType.ADDED, null);
} catch (ThingsboardException e) { } catch (ThingsboardException e) {
@ -941,8 +941,9 @@ public class DeviceController extends BaseController {
@PreAuthorize("hasAnyAuthority('TENANT_ADMIN')") @PreAuthorize("hasAnyAuthority('TENANT_ADMIN')")
@PostMapping("/device/bulk_import") @PostMapping("/device/bulk_import")
public BulkImportResult<Device> processDevicesBulkImport(@RequestBody BulkImportRequest request) throws Exception { public BulkImportResult<Device> processDevicesBulkImport(@RequestBody BulkImportRequest request) throws Exception {
return deviceBulkImportService.processBulkImport(request, getCurrentUser(), importedDeviceInfo -> { SecurityUser user = getCurrentUser();
onDeviceCreatedOrUpdated(importedDeviceInfo.getEntity(), importedDeviceInfo.getOldEntity(), importedDeviceInfo.isUpdated()); return deviceBulkImportService.processBulkImport(request, user, importedDeviceInfo -> {
onDeviceCreatedOrUpdated(importedDeviceInfo.getEntity(), importedDeviceInfo.getOldEntity(), importedDeviceInfo.isUpdated(), user);
}); });
} }

View File

@ -140,7 +140,7 @@ public class EdgeController extends BaseController {
edge.getId(), edge); edge.getId(), edge);
Edge savedEdge = checkNotNull(edgeService.saveEdge(edge, true)); Edge savedEdge = checkNotNull(edgeService.saveEdge(edge, true));
onEdgeCreatedOrUpdated(tenantId, savedEdge, edgeTemplateRootRuleChain, !created); onEdgeCreatedOrUpdated(tenantId, savedEdge, edgeTemplateRootRuleChain, !created, getCurrentUser());
return savedEdge; return savedEdge;
} catch (Exception e) { } catch (Exception e) {
@ -150,7 +150,7 @@ public class EdgeController extends BaseController {
} }
} }
private void onEdgeCreatedOrUpdated(TenantId tenantId, Edge edge, RuleChain edgeTemplateRootRuleChain, boolean updated) throws IOException, ThingsboardException { private void onEdgeCreatedOrUpdated(TenantId tenantId, Edge edge, RuleChain edgeTemplateRootRuleChain, boolean updated, SecurityUser user) throws IOException, ThingsboardException {
if (!updated) { if (!updated) {
ruleChainService.assignRuleChainToEdge(tenantId, edgeTemplateRootRuleChain.getId(), edge.getId()); ruleChainService.assignRuleChainToEdge(tenantId, edgeTemplateRootRuleChain.getId(), edge.getId());
edgeNotificationService.setEdgeRootRuleChain(tenantId, edge, edgeTemplateRootRuleChain.getId()); edgeNotificationService.setEdgeRootRuleChain(tenantId, edge, edgeTemplateRootRuleChain.getId());
@ -160,7 +160,7 @@ public class EdgeController extends BaseController {
tbClusterService.broadcastEntityStateChangeEvent(edge.getTenantId(), edge.getId(), tbClusterService.broadcastEntityStateChangeEvent(edge.getTenantId(), edge.getId(),
updated ? ComponentLifecycleEvent.UPDATED : ComponentLifecycleEvent.CREATED); updated ? ComponentLifecycleEvent.UPDATED : ComponentLifecycleEvent.CREATED);
logEntityAction(edge.getId(), edge, null, updated ? ActionType.UPDATED : ActionType.ADDED, null); logEntityAction(user, edge.getId(), edge, null, updated ? ActionType.UPDATED : ActionType.ADDED, null);
} }
@PreAuthorize("hasAuthority('TENANT_ADMIN')") @PreAuthorize("hasAuthority('TENANT_ADMIN')")
@ -586,7 +586,7 @@ public class EdgeController extends BaseController {
return edgeBulkImportService.processBulkImport(request, user, importedAssetInfo -> { return edgeBulkImportService.processBulkImport(request, user, importedAssetInfo -> {
try { try {
onEdgeCreatedOrUpdated(user.getTenantId(), importedAssetInfo.getEntity(), edgeTemplateRootRuleChain, importedAssetInfo.isUpdated()); onEdgeCreatedOrUpdated(user.getTenantId(), importedAssetInfo.getEntity(), edgeTemplateRootRuleChain, importedAssetInfo.isUpdated(), user);
} catch (Exception e) { } catch (Exception e) {
throw new RuntimeException(e); throw new RuntimeException(e);
} }

View File

@ -25,7 +25,6 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.HttpStatus; import org.springframework.http.HttpStatus;
import org.springframework.security.access.prepost.PreAuthorize; import org.springframework.security.access.prepost.PreAuthorize;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils; import org.springframework.util.StringUtils;
import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RequestBody;
@ -450,15 +449,17 @@ public class RuleChainController extends BaseController {
@PreAuthorize("hasAuthority('TENANT_ADMIN')") @PreAuthorize("hasAuthority('TENANT_ADMIN')")
@RequestMapping(value = "/ruleChains/import", method = RequestMethod.POST) @RequestMapping(value = "/ruleChains/import", method = RequestMethod.POST)
@ResponseBody @ResponseBody
public void importRuleChains(@RequestBody RuleChainData ruleChainData, @RequestParam(required = false, defaultValue = "false") boolean overwrite) throws ThingsboardException { public List<RuleChainImportResult> importRuleChains(@RequestBody RuleChainData ruleChainData, @RequestParam(required = false, defaultValue = "false") boolean overwrite) throws ThingsboardException {
try { try {
TenantId tenantId = getCurrentUser().getTenantId(); TenantId tenantId = getCurrentUser().getTenantId();
List<RuleChainImportResult> importResults = ruleChainService.importTenantRuleChains(tenantId, ruleChainData, RuleChainType.CORE, overwrite); List<RuleChainImportResult> importResults = ruleChainService.importTenantRuleChains(tenantId, ruleChainData, overwrite);
if (!CollectionUtils.isEmpty(importResults)) { for (RuleChainImportResult importResult : importResults) {
for (RuleChainImportResult importResult : importResults) { if (importResult.getError() == null) {
tbClusterService.broadcastEntityStateChangeEvent(importResult.getTenantId(), importResult.getRuleChainId(), importResult.getLifecycleEvent()); tbClusterService.broadcastEntityStateChangeEvent(importResult.getTenantId(), importResult.getRuleChainId(),
importResult.isUpdated() ? ComponentLifecycleEvent.UPDATED : ComponentLifecycleEvent.CREATED);
} }
} }
return importResults;
} catch (Exception e) { } catch (Exception e) {
throw handleException(e); throw handleException(e);
} }

View File

@ -63,6 +63,8 @@ import java.util.Map;
import java.util.Objects; import java.util.Objects;
import java.util.Optional; import java.util.Optional;
import java.util.Set; import java.util.Set;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@Service @Service
@TbCoreComponent @TbCoreComponent
@ -71,6 +73,8 @@ public class DeviceBulkImportService extends AbstractBulkImportService<Device> {
protected final DeviceCredentialsService deviceCredentialsService; protected final DeviceCredentialsService deviceCredentialsService;
protected final DeviceProfileService deviceProfileService; protected final DeviceProfileService deviceProfileService;
private final Lock findOrCreateDeviceProfileLock = new ReentrantLock();
public DeviceBulkImportService(TelemetrySubscriptionService tsSubscriptionService, TbTenantProfileCache tenantProfileCache, public DeviceBulkImportService(TelemetrySubscriptionService tsSubscriptionService, TbTenantProfileCache tenantProfileCache,
AccessControlService accessControlService, AccessValidator accessValidator, AccessControlService accessControlService, AccessValidator accessValidator,
EntityActionService entityActionService, TbClusterService clusterService, EntityActionService entityActionService, TbClusterService clusterService,
@ -106,9 +110,13 @@ public class DeviceBulkImportService extends AbstractBulkImportService<Device> {
throw new DeviceCredentialsValidationException("Invalid device credentials: " + e.getMessage()); throw new DeviceCredentialsValidationException("Invalid device credentials: " + e.getMessage());
} }
DeviceProfile deviceProfile;
if (deviceCredentials.getCredentialsType() == DeviceCredentialsType.LWM2M_CREDENTIALS) { if (deviceCredentials.getCredentialsType() == DeviceCredentialsType.LWM2M_CREDENTIALS) {
setUpLwM2mDeviceProfile(user.getTenantId(), device); deviceProfile = setUpLwM2mDeviceProfile(user.getTenantId(), device);
} else {
deviceProfile = deviceProfileService.findOrCreateDeviceProfile(user.getTenantId(), device.getType());
} }
device.setDeviceProfileId(deviceProfile.getId());
device = deviceService.saveDeviceWithCredentials(device, deviceCredentials); device = deviceService.saveDeviceWithCredentials(device, deviceCredentials);
@ -215,36 +223,43 @@ public class DeviceBulkImportService extends AbstractBulkImportService<Device> {
credentials.setCredentialsValue(lwm2mCredentials.toString()); credentials.setCredentialsValue(lwm2mCredentials.toString());
} }
private void setUpLwM2mDeviceProfile(TenantId tenantId, Device device) { private DeviceProfile setUpLwM2mDeviceProfile(TenantId tenantId, Device device) {
DeviceProfile deviceProfile = deviceProfileService.findDeviceProfileByName(tenantId, device.getType()); DeviceProfile deviceProfile = deviceProfileService.findDeviceProfileByName(tenantId, device.getType());
if (deviceProfile != null) { if (deviceProfile != null) {
if (deviceProfile.getTransportType() != DeviceTransportType.LWM2M) { if (deviceProfile.getTransportType() != DeviceTransportType.LWM2M) {
deviceProfile.setTransportType(DeviceTransportType.LWM2M); deviceProfile.setTransportType(DeviceTransportType.LWM2M);
deviceProfile.getProfileData().setTransportConfiguration(new Lwm2mDeviceProfileTransportConfiguration()); deviceProfile.getProfileData().setTransportConfiguration(new Lwm2mDeviceProfileTransportConfiguration());
deviceProfile = deviceProfileService.saveDeviceProfile(deviceProfile); deviceProfile = deviceProfileService.saveDeviceProfile(deviceProfile);
device.setDeviceProfileId(deviceProfile.getId());
} }
} else { } else {
deviceProfile = new DeviceProfile(); findOrCreateDeviceProfileLock.lock();
deviceProfile.setTenantId(tenantId); try {
deviceProfile.setType(DeviceProfileType.DEFAULT); deviceProfile = deviceProfileService.findDeviceProfileByName(tenantId, device.getType());
deviceProfile.setName(device.getType()); if (deviceProfile == null) {
deviceProfile.setTransportType(DeviceTransportType.LWM2M); deviceProfile = new DeviceProfile();
deviceProfile.setProvisionType(DeviceProfileProvisionType.DISABLED); deviceProfile.setTenantId(tenantId);
deviceProfile.setType(DeviceProfileType.DEFAULT);
deviceProfile.setName(device.getType());
deviceProfile.setTransportType(DeviceTransportType.LWM2M);
deviceProfile.setProvisionType(DeviceProfileProvisionType.DISABLED);
DeviceProfileData deviceProfileData = new DeviceProfileData(); DeviceProfileData deviceProfileData = new DeviceProfileData();
DefaultDeviceProfileConfiguration configuration = new DefaultDeviceProfileConfiguration(); DefaultDeviceProfileConfiguration configuration = new DefaultDeviceProfileConfiguration();
DeviceProfileTransportConfiguration transportConfiguration = new Lwm2mDeviceProfileTransportConfiguration(); DeviceProfileTransportConfiguration transportConfiguration = new Lwm2mDeviceProfileTransportConfiguration();
DisabledDeviceProfileProvisionConfiguration provisionConfiguration = new DisabledDeviceProfileProvisionConfiguration(null); DisabledDeviceProfileProvisionConfiguration provisionConfiguration = new DisabledDeviceProfileProvisionConfiguration(null);
deviceProfileData.setConfiguration(configuration); deviceProfileData.setConfiguration(configuration);
deviceProfileData.setTransportConfiguration(transportConfiguration); deviceProfileData.setTransportConfiguration(transportConfiguration);
deviceProfileData.setProvisionConfiguration(provisionConfiguration); deviceProfileData.setProvisionConfiguration(provisionConfiguration);
deviceProfile.setProfileData(deviceProfileData); deviceProfile.setProfileData(deviceProfileData);
deviceProfile = deviceProfileService.saveDeviceProfile(deviceProfile); deviceProfile = deviceProfileService.saveDeviceProfile(deviceProfile);
device.setDeviceProfileId(deviceProfile.getId()); }
} finally {
findOrCreateDeviceProfileLock.unlock();
}
} }
return deviceProfile;
} }
private void setValues(ObjectNode objectNode, Map<BulkImportColumnType, String> data, Collection<BulkImportColumnType> columns) { private void setValues(ObjectNode objectNode, Map<BulkImportColumnType, String> data, Collection<BulkImportColumnType> columns) {

View File

@ -22,6 +22,9 @@ import lombok.Data;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows; import lombok.SneakyThrows;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.thingsboard.common.util.DonAsynchron;
import org.thingsboard.common.util.ThingsBoardThreadFactory;
import org.thingsboard.server.cluster.TbClusterService; import org.thingsboard.server.cluster.TbClusterService;
import org.thingsboard.server.common.data.BaseData; import org.thingsboard.server.common.data.BaseData;
import org.thingsboard.server.common.data.TenantProfile; import org.thingsboard.server.common.data.TenantProfile;
@ -47,11 +50,16 @@ import org.thingsboard.server.utils.CsvUtils;
import org.thingsboard.server.utils.TypeCastUtil; import org.thingsboard.server.utils.TypeCastUtil;
import javax.annotation.Nullable; import javax.annotation.Nullable;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.LinkedHashMap; import java.util.LinkedHashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer; import java.util.function.Consumer;
@ -67,39 +75,49 @@ public abstract class AbstractBulkImportService<E extends BaseData<? extends Ent
protected final EntityActionService entityActionService; protected final EntityActionService entityActionService;
protected final TbClusterService clusterService; protected final TbClusterService clusterService;
public final BulkImportResult<E> processBulkImport(BulkImportRequest request, SecurityUser user, Consumer<ImportedEntityInfo<E>> onEntityImported) throws Exception { private static ThreadPoolExecutor executor;
BulkImportResult<E> result = new BulkImportResult<>();
AtomicInteger i = new AtomicInteger(0); @PostConstruct
if (request.getMapping().getHeader()) { private void initExecutor() {
i.incrementAndGet(); if (executor == null) {
executor = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(), Runtime.getRuntime().availableProcessors(),
60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(150_000),
ThingsBoardThreadFactory.forName("bulk-import"), new ThreadPoolExecutor.CallerRunsPolicy());
executor.allowCoreThreadTimeOut(true);
} }
}
parseData(request).forEach(entityData -> { public final BulkImportResult<E> processBulkImport(BulkImportRequest request, SecurityUser user, Consumer<ImportedEntityInfo<E>> onEntityImported) throws Exception {
i.incrementAndGet(); List<EntityData> entitiesData = parseData(request);
try {
ImportedEntityInfo<E> importedEntityInfo = saveEntity(request, entityData.getFields(), user);
onEntityImported.accept(importedEntityInfo);
E entity = importedEntityInfo.getEntity(); BulkImportResult<E> result = new BulkImportResult<>();
CountDownLatch completionLatch = new CountDownLatch(entitiesData.size());
saveKvs(user, entity, entityData.getKvs()); entitiesData.forEach(entityData -> DonAsynchron.submit(() -> {
ImportedEntityInfo<E> importedEntityInfo = saveEntity(request, entityData.getFields(), user);
E entity = importedEntityInfo.getEntity();
if (importedEntityInfo.getRelatedError() != null) { onEntityImported.accept(importedEntityInfo);
throw new RuntimeException(importedEntityInfo.getRelatedError()); saveKvs(user, entity, entityData.getKvs());
}
if (importedEntityInfo.isUpdated()) { return importedEntityInfo;
result.setUpdated(result.getUpdated() + 1); },
} else { importedEntityInfo -> {
result.setCreated(result.getCreated() + 1); if (importedEntityInfo.isUpdated()) {
} result.getUpdated().incrementAndGet();
} catch (Exception e) { } else {
result.setErrors(result.getErrors() + 1); result.getCreated().incrementAndGet();
result.getErrorsList().add(String.format("Line %d: %s", i.get(), e.getMessage())); }
} completionLatch.countDown();
}); },
throwable -> {
result.getErrors().incrementAndGet();
result.getErrorsList().add(String.format("Line %d: %s", entityData.getLineNumber(), ExceptionUtils.getRootCauseMessage(throwable)));
completionLatch.countDown();
},
executor));
completionLatch.await();
return result; return result;
} }
@ -186,8 +204,11 @@ public abstract class AbstractBulkImportService<E extends BaseData<? extends Ent
private List<EntityData> parseData(BulkImportRequest request) throws Exception { private List<EntityData> parseData(BulkImportRequest request) throws Exception {
List<List<String>> records = CsvUtils.parseCsv(request.getFile(), request.getMapping().getDelimiter()); List<List<String>> records = CsvUtils.parseCsv(request.getFile(), request.getMapping().getDelimiter());
AtomicInteger linesCounter = new AtomicInteger(0);
if (request.getMapping().getHeader()) { if (request.getMapping().getHeader()) {
records.remove(0); records.remove(0);
linesCounter.incrementAndGet();
} }
List<ColumnMapping> columnsMappings = request.getMapping().getColumns(); List<ColumnMapping> columnsMappings = request.getMapping().getColumns();
@ -205,15 +226,24 @@ public abstract class AbstractBulkImportService<E extends BaseData<? extends Ent
entityData.getKvs().put(entry.getKey(), new ParsedValue(castResult.getValue(), castResult.getKey())); entityData.getKvs().put(entry.getKey(), new ParsedValue(castResult.getValue(), castResult.getKey()));
} }
}); });
entityData.setLineNumber(linesCounter.incrementAndGet());
return entityData; return entityData;
}) })
.collect(Collectors.toList()); .collect(Collectors.toList());
} }
@PreDestroy
private void shutdownExecutor() {
if (!executor.isTerminating()) {
executor.shutdown();
}
}
@Data @Data
protected static class EntityData { protected static class EntityData {
private final Map<BulkImportColumnType, String> fields = new LinkedHashMap<>(); private final Map<BulkImportColumnType, String> fields = new LinkedHashMap<>();
private final Map<ColumnMapping, ParsedValue> kvs = new LinkedHashMap<>(); private final Map<ColumnMapping, ParsedValue> kvs = new LinkedHashMap<>();
private int lineNumber;
} }
@Data @Data

View File

@ -17,14 +17,14 @@ package org.thingsboard.server.service.importing;
import lombok.Data; import lombok.Data;
import java.util.LinkedList; import java.util.Collection;
import java.util.List; import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.atomic.AtomicInteger;
@Data @Data
public class BulkImportResult<E> { public class BulkImportResult<E> {
private int created = 0; private AtomicInteger created = new AtomicInteger();
private int updated = 0; private AtomicInteger updated = new AtomicInteger();
private int errors = 0; private AtomicInteger errors = new AtomicInteger();
private List<String> errorsList = new LinkedList<>(); private Collection<String> errorsList = new ConcurrentLinkedDeque<>();
} }

View File

@ -22,5 +22,4 @@ public class ImportedEntityInfo<E> {
private E entity; private E entity;
private boolean isUpdated; private boolean isUpdated;
private E oldEntity; private E oldEntity;
private String relatedError;
} }

View File

@ -23,7 +23,6 @@ import org.thingsboard.server.common.data.id.RuleNodeId;
import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.page.PageData; import org.thingsboard.server.common.data.page.PageData;
import org.thingsboard.server.common.data.page.PageLink; import org.thingsboard.server.common.data.page.PageLink;
import org.thingsboard.server.common.data.page.TimePageLink;
import org.thingsboard.server.common.data.relation.EntityRelation; import org.thingsboard.server.common.data.relation.EntityRelation;
import org.thingsboard.server.common.data.rule.RuleChain; import org.thingsboard.server.common.data.rule.RuleChain;
import org.thingsboard.server.common.data.rule.RuleChainData; import org.thingsboard.server.common.data.rule.RuleChainData;
@ -71,7 +70,7 @@ public interface RuleChainService {
RuleChainData exportTenantRuleChains(TenantId tenantId, PageLink pageLink) throws ThingsboardException; RuleChainData exportTenantRuleChains(TenantId tenantId, PageLink pageLink) throws ThingsboardException;
List<RuleChainImportResult> importTenantRuleChains(TenantId tenantId, RuleChainData ruleChainData, RuleChainType type, boolean overwrite); List<RuleChainImportResult> importTenantRuleChains(TenantId tenantId, RuleChainData ruleChainData, boolean overwrite);
RuleChain assignRuleChainToEdge(TenantId tenantId, RuleChainId ruleChainId, EdgeId edgeId); RuleChain assignRuleChainToEdge(TenantId tenantId, RuleChainId ruleChainId, EdgeId edgeId);

View File

@ -15,17 +15,22 @@
*/ */
package org.thingsboard.server.common.data.rule; package org.thingsboard.server.common.data.rule;
import lombok.AllArgsConstructor; import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonInclude;
import lombok.Data; import lombok.Data;
import org.thingsboard.server.common.data.id.RuleChainId; import org.thingsboard.server.common.data.id.RuleChainId;
import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent;
@Data @Data
@AllArgsConstructor
public class RuleChainImportResult { public class RuleChainImportResult {
@JsonIgnore
private TenantId tenantId; private TenantId tenantId;
private RuleChainId ruleChainId; private RuleChainId ruleChainId;
private ComponentLifecycleEvent lifecycleEvent; private String ruleChainName;
@JsonInclude(JsonInclude.Include.NON_DEFAULT)
private boolean updated;
@JsonInclude(JsonInclude.Include.NON_NULL)
private String error;
} }

View File

@ -20,6 +20,7 @@ import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.MoreExecutors;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.function.Consumer; import java.util.function.Consumer;
@ -53,4 +54,15 @@ public class DonAsynchron {
Futures.addCallback(future, callback, MoreExecutors.directExecutor()); Futures.addCallback(future, callback, MoreExecutors.directExecutor());
} }
} }
public static <T> ListenableFuture<T> submit(Callable<T> task, Consumer<T> onSuccess, Consumer<Throwable> onFailure, Executor executor) {
return submit(task, onSuccess, onFailure, executor, null);
}
public static <T> ListenableFuture<T> submit(Callable<T> task, Consumer<T> onSuccess, Consumer<Throwable> onFailure, Executor executor, Executor callbackExecutor) {
ListenableFuture<T> future = Futures.submit(task, executor);
withCallback(future, onSuccess, onFailure, callbackExecutor);
return future;
}
} }

View File

@ -35,6 +35,8 @@ public interface DeviceCredentialsDao extends Dao<DeviceCredentials> {
*/ */
DeviceCredentials save(TenantId tenantId, DeviceCredentials deviceCredentials); DeviceCredentials save(TenantId tenantId, DeviceCredentials deviceCredentials);
DeviceCredentials saveAndFlush(TenantId tenantId, DeviceCredentials deviceCredentials);
/** /**
* Find device credentials by device id. * Find device credentials by device id.
* *

View File

@ -96,7 +96,7 @@ public class DeviceCredentialsServiceImpl extends AbstractEntityService implemen
log.trace("Executing updateDeviceCredentials [{}]", deviceCredentials); log.trace("Executing updateDeviceCredentials [{}]", deviceCredentials);
credentialsValidator.validate(deviceCredentials, id -> tenantId); credentialsValidator.validate(deviceCredentials, id -> tenantId);
try { try {
return deviceCredentialsDao.save(tenantId, deviceCredentials); return deviceCredentialsDao.saveAndFlush(tenantId, deviceCredentials);
} catch (Exception t) { } catch (Exception t) {
ConstraintViolationException e = extractConstraintViolationException(t).orElse(null); ConstraintViolationException e = extractConstraintViolationException(t).orElse(null);
if (e != null && e.getConstraintName() != null if (e != null && e.getConstraintName() != null

View File

@ -30,6 +30,8 @@ public interface DeviceProfileDao extends Dao<DeviceProfile> {
DeviceProfile save(TenantId tenantId, DeviceProfile deviceProfile); DeviceProfile save(TenantId tenantId, DeviceProfile deviceProfile);
DeviceProfile saveAndFlush(TenantId tenantId, DeviceProfile deviceProfile);
PageData<DeviceProfile> findDeviceProfiles(TenantId tenantId, PageLink pageLink); PageData<DeviceProfile> findDeviceProfiles(TenantId tenantId, PageLink pageLink);
PageData<DeviceProfileInfo> findDeviceProfileInfos(TenantId tenantId, PageLink pageLink, String transportType); PageData<DeviceProfileInfo> findDeviceProfileInfos(TenantId tenantId, PageLink pageLink, String transportType);

View File

@ -167,7 +167,7 @@ public class DeviceProfileServiceImpl extends AbstractEntityService implements D
} }
DeviceProfile savedDeviceProfile; DeviceProfile savedDeviceProfile;
try { try {
savedDeviceProfile = deviceProfileDao.save(deviceProfile.getTenantId(), deviceProfile); savedDeviceProfile = deviceProfileDao.saveAndFlush(deviceProfile.getTenantId(), deviceProfile);
} catch (Exception t) { } catch (Exception t) {
ConstraintViolationException e = extractConstraintViolationException(t).orElse(null); ConstraintViolationException e = extractConstraintViolationException(t).orElse(null);
if (e != null && e.getConstraintName() != null && e.getConstraintName().equalsIgnoreCase("device_profile_name_unq_key")) { if (e != null && e.getConstraintName() != null && e.getConstraintName().equalsIgnoreCase("device_profile_name_unq_key")) {

View File

@ -22,6 +22,7 @@ import com.google.common.util.concurrent.ListenableFuture;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils; import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.hibernate.exception.ConstraintViolationException; import org.hibernate.exception.ConstraintViolationException;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy; import org.springframework.context.annotation.Lazy;
@ -38,7 +39,6 @@ import org.thingsboard.server.common.data.id.RuleNodeId;
import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.page.PageData; import org.thingsboard.server.common.data.page.PageData;
import org.thingsboard.server.common.data.page.PageLink; import org.thingsboard.server.common.data.page.PageLink;
import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent;
import org.thingsboard.server.common.data.relation.EntityRelation; import org.thingsboard.server.common.data.relation.EntityRelation;
import org.thingsboard.server.common.data.relation.RelationTypeGroup; import org.thingsboard.server.common.data.relation.RelationTypeGroup;
import org.thingsboard.server.common.data.rule.NodeConnectionInfo; import org.thingsboard.server.common.data.rule.NodeConnectionInfo;
@ -59,6 +59,7 @@ import org.thingsboard.server.dao.tenant.TbTenantProfileCache;
import org.thingsboard.server.dao.tenant.TenantDao; import org.thingsboard.server.dao.tenant.TenantDao;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.Iterator; import java.util.Iterator;
@ -416,41 +417,46 @@ public class BaseRuleChainService extends AbstractEntityService implements RuleC
} }
@Override @Override
public List<RuleChainImportResult> importTenantRuleChains(TenantId tenantId, RuleChainData ruleChainData, RuleChainType type, boolean overwrite) { public List<RuleChainImportResult> importTenantRuleChains(TenantId tenantId, RuleChainData ruleChainData, boolean overwrite) {
List<RuleChainImportResult> importResults = new ArrayList<>(); List<RuleChainImportResult> importResults = new ArrayList<>();
setRandomRuleChainIds(ruleChainData); setRandomRuleChainIds(ruleChainData);
resetRuleNodeIds(ruleChainData.getMetadata()); resetRuleNodeIds(ruleChainData.getMetadata());
resetRuleChainMetadataTenantIds(tenantId, ruleChainData.getMetadata()); resetRuleChainMetadataTenantIds(tenantId, ruleChainData.getMetadata());
if (overwrite) {
List<RuleChain> persistentRuleChains = findAllTenantRuleChains(tenantId, type); for (RuleChain ruleChain : ruleChainData.getRuleChains()) {
for (RuleChain ruleChain : ruleChainData.getRuleChains()) { RuleChainImportResult importResult = new RuleChainImportResult();
ComponentLifecycleEvent lifecycleEvent;
Optional<RuleChain> persistentRuleChainOpt = persistentRuleChains.stream().filter(rc -> rc.getName().equals(ruleChain.getName())).findFirst(); ruleChain.setTenantId(tenantId);
if (persistentRuleChainOpt.isPresent()) { ruleChain.setRoot(false);
setNewRuleChainId(ruleChain, ruleChainData.getMetadata(), ruleChain.getId(), persistentRuleChainOpt.get().getId());
ruleChain.setRoot(persistentRuleChainOpt.get().isRoot()); if (overwrite) {
lifecycleEvent = ComponentLifecycleEvent.UPDATED; Collection<RuleChain> existingRuleChains = ruleChainDao.findByTenantIdAndTypeAndName(tenantId,
} else { Optional.ofNullable(ruleChain.getType()).orElse(RuleChainType.CORE), ruleChain.getName());
ruleChain.setRoot(false); Optional<RuleChain> existingRuleChain = existingRuleChains.stream().findFirst();
lifecycleEvent = ComponentLifecycleEvent.CREATED; if (existingRuleChain.isPresent()) {
setNewRuleChainId(ruleChain, ruleChainData.getMetadata(), ruleChain.getId(), existingRuleChain.get().getId());
ruleChain.setRoot(existingRuleChain.get().isRoot());
importResult.setUpdated(true);
} }
ruleChain.setTenantId(tenantId);
ruleChainDao.save(tenantId, ruleChain);
importResults.add(new RuleChainImportResult(tenantId, ruleChain.getId(), lifecycleEvent));
} }
} else {
if (!CollectionUtils.isEmpty(ruleChainData.getRuleChains())) { try {
ruleChainData.getRuleChains().forEach(rc -> { ruleChain = saveRuleChain(ruleChain);
rc.setTenantId(tenantId); } catch (Exception e) {
rc.setRoot(false); importResult.setError(ExceptionUtils.getRootCauseMessage(e));
RuleChain savedRc = ruleChainDao.save(tenantId, rc);
importResults.add(new RuleChainImportResult(tenantId, savedRc.getId(), ComponentLifecycleEvent.CREATED));
});
} }
importResult.setTenantId(tenantId);
importResult.setRuleChainId(ruleChain.getId());
importResult.setRuleChainName(ruleChain.getName());
importResults.add(importResult);
} }
if (!CollectionUtils.isEmpty(ruleChainData.getMetadata())) {
if (CollectionUtils.isNotEmpty(ruleChainData.getMetadata())) {
ruleChainData.getMetadata().forEach(md -> saveRuleChainMetaData(tenantId, md)); ruleChainData.getMetadata().forEach(md -> saveRuleChainMetaData(tenantId, md));
} }
return importResults; return importResults;
} }
@ -475,7 +481,9 @@ public class BaseRuleChainService extends AbstractEntityService implements RuleC
} }
if (isTenantId) { if (isTenantId) {
ObjectNode objNode = (ObjectNode) node; ObjectNode objNode = (ObjectNode) node;
objNode.put("id", tenantId.getId().toString()); if (objNode.has("id")) {
objNode.put("id", tenantId.getId().toString());
}
} else { } else {
for (JsonNode jsonNode : node) { for (JsonNode jsonNode : node) {
searchTenantIdRecursive(tenantId, jsonNode); searchTenantIdRecursive(tenantId, jsonNode);
@ -723,4 +731,5 @@ public class BaseRuleChainService extends AbstractEntityService implements RuleC
checkRuleNodesAndDelete(tenantId, entity.getId()); checkRuleNodesAndDelete(tenantId, entity.getId());
} }
}; };
} }

View File

@ -15,6 +15,7 @@
*/ */
package org.thingsboard.server.dao.rule; package org.thingsboard.server.dao.rule;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.page.PageData; import org.thingsboard.server.common.data.page.PageData;
import org.thingsboard.server.common.data.page.PageLink; import org.thingsboard.server.common.data.page.PageLink;
import org.thingsboard.server.common.data.rule.RuleChain; import org.thingsboard.server.common.data.rule.RuleChain;
@ -22,6 +23,7 @@ import org.thingsboard.server.common.data.rule.RuleChainType;
import org.thingsboard.server.dao.Dao; import org.thingsboard.server.dao.Dao;
import org.thingsboard.server.dao.TenantEntityDao; import org.thingsboard.server.dao.TenantEntityDao;
import java.util.Collection;
import java.util.UUID; import java.util.UUID;
/** /**
@ -74,4 +76,7 @@ public interface RuleChainDao extends Dao<RuleChain>, TenantEntityDao {
* @return the list of rule chain objects * @return the list of rule chain objects
*/ */
PageData<RuleChain> findAutoAssignToEdgeRuleChainsByTenantId(UUID tenantId, PageLink pageLink); PageData<RuleChain> findAutoAssignToEdgeRuleChainsByTenantId(UUID tenantId, PageLink pageLink);
Collection<RuleChain> findByTenantIdAndTypeAndName(TenantId tenantId, RuleChainType type, String name);
} }

View File

@ -15,7 +15,7 @@
*/ */
package org.thingsboard.server.dao.sql.device; package org.thingsboard.server.dao.sql.device;
import org.springframework.data.repository.CrudRepository; import org.springframework.data.jpa.repository.JpaRepository;
import org.thingsboard.server.dao.model.sql.DeviceCredentialsEntity; import org.thingsboard.server.dao.model.sql.DeviceCredentialsEntity;
import java.util.UUID; import java.util.UUID;
@ -23,7 +23,7 @@ import java.util.UUID;
/** /**
* Created by Valerii Sosliuk on 5/6/2017. * Created by Valerii Sosliuk on 5/6/2017.
*/ */
public interface DeviceCredentialsRepository extends CrudRepository<DeviceCredentialsEntity, UUID> { public interface DeviceCredentialsRepository extends JpaRepository<DeviceCredentialsEntity, UUID> {
DeviceCredentialsEntity findByDeviceId(UUID deviceId); DeviceCredentialsEntity findByDeviceId(UUID deviceId);

View File

@ -17,6 +17,7 @@ package org.thingsboard.server.dao.sql.device;
import org.springframework.data.domain.Page; import org.springframework.data.domain.Page;
import org.springframework.data.domain.Pageable; import org.springframework.data.domain.Pageable;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.Query; import org.springframework.data.jpa.repository.Query;
import org.springframework.data.repository.PagingAndSortingRepository; import org.springframework.data.repository.PagingAndSortingRepository;
import org.springframework.data.repository.query.Param; import org.springframework.data.repository.query.Param;
@ -26,7 +27,7 @@ import org.thingsboard.server.dao.model.sql.DeviceProfileEntity;
import java.util.UUID; import java.util.UUID;
public interface DeviceProfileRepository extends PagingAndSortingRepository<DeviceProfileEntity, UUID> { public interface DeviceProfileRepository extends JpaRepository<DeviceProfileEntity, UUID> {
@Query("SELECT new org.thingsboard.server.common.data.DeviceProfileInfo(d.id, d.name, d.image, d.defaultDashboardId, d.type, d.transportType) " + @Query("SELECT new org.thingsboard.server.common.data.DeviceProfileInfo(d.id, d.name, d.image, d.defaultDashboardId, d.type, d.transportType) " +
"FROM DeviceProfileEntity d " + "FROM DeviceProfileEntity d " +

View File

@ -18,6 +18,7 @@ package org.thingsboard.server.dao.sql.device;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.repository.CrudRepository; import org.springframework.data.repository.CrudRepository;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.security.DeviceCredentials; import org.thingsboard.server.common.data.security.DeviceCredentials;
import org.thingsboard.server.dao.DaoUtil; import org.thingsboard.server.dao.DaoUtil;
@ -46,6 +47,14 @@ public class JpaDeviceCredentialsDao extends JpaAbstractDao<DeviceCredentialsEnt
return deviceCredentialsRepository; return deviceCredentialsRepository;
} }
@Transactional
@Override
public DeviceCredentials saveAndFlush(TenantId tenantId, DeviceCredentials deviceCredentials) {
DeviceCredentials result = save(tenantId, deviceCredentials);
deviceCredentialsRepository.flush();
return result;
}
@Override @Override
public DeviceCredentials findByDeviceId(TenantId tenantId, UUID deviceId) { public DeviceCredentials findByDeviceId(TenantId tenantId, UUID deviceId) {
return DaoUtil.getData(deviceCredentialsRepository.findByDeviceId(deviceId)); return DaoUtil.getData(deviceCredentialsRepository.findByDeviceId(deviceId));

View File

@ -19,6 +19,7 @@ import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.repository.CrudRepository; import org.springframework.data.repository.CrudRepository;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import org.thingsboard.server.common.data.DeviceProfile; import org.thingsboard.server.common.data.DeviceProfile;
import org.thingsboard.server.common.data.DeviceProfileInfo; import org.thingsboard.server.common.data.DeviceProfileInfo;
import org.thingsboard.server.common.data.DeviceTransportType; import org.thingsboard.server.common.data.DeviceTransportType;
@ -54,6 +55,14 @@ public class JpaDeviceProfileDao extends JpaAbstractSearchTextDao<DeviceProfileE
return deviceProfileRepository.findDeviceProfileInfoById(deviceProfileId); return deviceProfileRepository.findDeviceProfileInfoById(deviceProfileId);
} }
@Transactional
@Override
public DeviceProfile saveAndFlush(TenantId tenantId, DeviceProfile deviceProfile) {
DeviceProfile result = save(tenantId, deviceProfile);
deviceProfileRepository.flush();
return result;
}
@Override @Override
public PageData<DeviceProfile> findDeviceProfiles(TenantId tenantId, PageLink pageLink) { public PageData<DeviceProfile> findDeviceProfiles(TenantId tenantId, PageLink pageLink) {
return DaoUtil.toPageData( return DaoUtil.toPageData(

View File

@ -29,6 +29,7 @@ import org.thingsboard.server.dao.model.sql.RuleChainEntity;
import org.thingsboard.server.dao.rule.RuleChainDao; import org.thingsboard.server.dao.rule.RuleChainDao;
import org.thingsboard.server.dao.sql.JpaAbstractSearchTextDao; import org.thingsboard.server.dao.sql.JpaAbstractSearchTextDao;
import java.util.Collection;
import java.util.Objects; import java.util.Objects;
import java.util.UUID; import java.util.UUID;
@ -97,8 +98,14 @@ public class JpaRuleChainDao extends JpaAbstractSearchTextDao<RuleChainEntity, R
DaoUtil.toPageable(pageLink))); DaoUtil.toPageable(pageLink)));
} }
@Override
public Collection<RuleChain> findByTenantIdAndTypeAndName(TenantId tenantId, RuleChainType type, String name) {
return DaoUtil.convertDataList(ruleChainRepository.findByTenantIdAndTypeAndName(tenantId.getId(), type, name));
}
@Override @Override
public Long countByTenantId(TenantId tenantId) { public Long countByTenantId(TenantId tenantId) {
return ruleChainRepository.countByTenantId(tenantId.getId()); return ruleChainRepository.countByTenantId(tenantId.getId());
} }
} }

View File

@ -23,6 +23,7 @@ import org.springframework.data.repository.query.Param;
import org.thingsboard.server.common.data.rule.RuleChainType; import org.thingsboard.server.common.data.rule.RuleChainType;
import org.thingsboard.server.dao.model.sql.RuleChainEntity; import org.thingsboard.server.dao.model.sql.RuleChainEntity;
import java.util.List;
import java.util.UUID; import java.util.UUID;
public interface RuleChainRepository extends PagingAndSortingRepository<RuleChainEntity, UUID> { public interface RuleChainRepository extends PagingAndSortingRepository<RuleChainEntity, UUID> {
@ -55,10 +56,13 @@ public interface RuleChainRepository extends PagingAndSortingRepository<RuleChai
"AND re.relationType = 'Contains' AND re.fromId = :tenantId AND re.fromType = 'TENANT' " + "AND re.relationType = 'Contains' AND re.fromId = :tenantId AND re.fromType = 'TENANT' " +
"AND LOWER(rc.searchText) LIKE LOWER(CONCAT(:searchText, '%'))") "AND LOWER(rc.searchText) LIKE LOWER(CONCAT(:searchText, '%'))")
Page<RuleChainEntity> findAutoAssignByTenantId(@Param("tenantId") UUID tenantId, Page<RuleChainEntity> findAutoAssignByTenantId(@Param("tenantId") UUID tenantId,
@Param("searchText") String searchText, @Param("searchText") String searchText,
Pageable pageable); Pageable pageable);
RuleChainEntity findByTenantIdAndTypeAndRootIsTrue(UUID tenantId, RuleChainType ruleChainType); RuleChainEntity findByTenantIdAndTypeAndRootIsTrue(UUID tenantId, RuleChainType ruleChainType);
Long countByTenantId(UUID tenantId); Long countByTenantId(UUID tenantId);
List<RuleChainEntity> findByTenantIdAndTypeAndName(UUID tenantId, RuleChainType type, String name);
} }