diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessor.java index 6fcb02e4bc..e49f22ed7e 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessor.java @@ -139,8 +139,8 @@ public abstract class BaseEdgeProcessor implements EdgeProcessor { UPDATED_COMMENT, DELETED -> true; default -> switch (type) { case ALARM, ALARM_COMMENT, RULE_CHAIN, RULE_CHAIN_METADATA, USER, CUSTOMER, TENANT, TENANT_PROFILE, - WIDGETS_BUNDLE, WIDGET_TYPE, ADMIN_SETTINGS, OTA_PACKAGE, QUEUE, RELATION, NOTIFICATION_TEMPLATE, NOTIFICATION_TARGET, - NOTIFICATION_RULE -> true; + WIDGETS_BUNDLE, WIDGET_TYPE, ADMIN_SETTINGS, OTA_PACKAGE, QUEUE, RELATION, CALCULATED_FIELD, NOTIFICATION_TEMPLATE, + NOTIFICATION_TARGET, NOTIFICATION_RULE -> true; default -> false; }; }; diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/calculated/BaseCalculatedFieldProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/calculated/BaseCalculatedFieldProcessor.java index 503e5d74dc..a7781fe55d 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/calculated/BaseCalculatedFieldProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/calculated/BaseCalculatedFieldProcessor.java @@ -80,7 +80,7 @@ public abstract class BaseCalculatedFieldProcessor extends BaseEdgeProcessor { calculatedField.setId(calculatedFieldId); } - edgeCtx.getCalculatedFieldService().save(calculatedField); + edgeCtx.getCalculatedFieldService().save(calculatedField, false); } catch (Exception e) { log.error("[{}] Failed to process calculatedField update msg [{}]", tenantId, calculatedFieldUpdateMsg, e); throw e; diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/sync/DefaultEdgeRequestsService.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/sync/DefaultEdgeRequestsService.java index 4487a0acf8..414dcdab76 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/sync/DefaultEdgeRequestsService.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/sync/DefaultEdgeRequestsService.java @@ -23,7 +23,6 @@ import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; import lombok.extern.slf4j.Slf4j; import org.checkerframework.checker.nullness.qual.Nullable; -import org.jetbrains.annotations.NotNull; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Lazy; import org.springframework.stereotype.Service; @@ -32,7 +31,6 @@ import org.thingsboard.server.common.data.AttributeScope; import org.thingsboard.server.common.data.EdgeUtils; import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.EntityView; -import org.thingsboard.server.common.data.ExportableEntity; import org.thingsboard.server.common.data.edge.Edge; import org.thingsboard.server.common.data.edge.EdgeEvent; import org.thingsboard.server.common.data.edge.EdgeEventActionType; @@ -48,8 +46,6 @@ import org.thingsboard.server.common.data.id.WidgetsBundleId; import org.thingsboard.server.common.data.kv.AttributeKvEntry; import org.thingsboard.server.common.data.kv.DataType; import org.thingsboard.server.common.data.kv.TsKvEntry; -import org.thingsboard.server.common.data.page.PageData; -import org.thingsboard.server.common.data.page.PageLink; import org.thingsboard.server.common.data.relation.EntityRelation; import org.thingsboard.server.common.data.relation.EntityRelationsQuery; import org.thingsboard.server.common.data.relation.EntitySearchDirection; @@ -82,15 +78,10 @@ import org.thingsboard.server.service.executors.DbCallbackExecutorService; import org.thingsboard.server.service.state.DefaultDeviceStateService; import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Objects; -import java.util.Optional; import java.util.UUID; -import java.util.function.Function; import java.util.stream.Collectors; @Service @@ -328,79 +319,8 @@ public class DefaultEdgeRequestsService implements EdgeRequestsService { EntityType.valueOf(calculatedFieldRequestMsg.getEntityType()), new UUID(calculatedFieldRequestMsg.getEntityIdMSB(), calculatedFieldRequestMsg.getEntityIdLSB())); - if (entityId.getEntityType() == EntityType.EDGE) { - log.trace("[{}] processAllCalculatedField [{}][{}]", tenantId, edge.getName(), calculatedFieldRequestMsg); - return syncAllCalculatedFieldForEdge(tenantId, edge, calculatedFieldRequestMsg); - } else { - log.trace("[{}] processCalculatedField [{}][{}] for entity [{}][{}]", tenantId, edge.getName(), calculatedFieldRequestMsg, entityId.getEntityType(), entityId.getId()); - return saveCalculatedFieldsToEdge(tenantId, edge.getId(), entityId); - } - } - - @NotNull - private ListenableFuture syncAllCalculatedFieldForEdge(TenantId tenantId, Edge edge, CalculatedFieldRequestMsg calculatedFieldRequestMsg) { - EdgeId edgeId = edge.getId(); - ListenableFuture> deviceIdsFuture = - findAllEntityIdsAsync(pageLink -> deviceService.findDevicesByTenantIdAndEdgeId(tenantId, edgeId, pageLink)); - ListenableFuture> assetIdsFuture = - findAllEntityIdsAsync(pageLink -> assetService.findAssetsByTenantIdAndEdgeId(tenantId, edgeId, pageLink)); - ListenableFuture> deviceProfileIdsFuture = - findAllEntityIdsAsync(pageLink -> deviceProfileService.findDeviceProfiles(tenantId, pageLink)); - ListenableFuture> assetProfileIdsFuture = - findAllEntityIdsAsync(pageLink -> assetProfileService.findAssetProfiles(tenantId, pageLink)); - - ListenableFuture> allEntityIdFuture = - mergeEntityIdFutures(deviceIdsFuture, assetIdsFuture, deviceProfileIdsFuture, assetProfileIdsFuture); - - return Futures.transformAsync(allEntityIdFuture, allIds -> { - log.trace("[{}][{}] Going to sync calculatedFields for [{}] entities", tenantId, edge.getName(), allIds.size()); - List> saveFutures = allIds.stream() - .map(id -> saveCalculatedFieldsToEdge(tenantId, edge.getId(), id)) - .collect(Collectors.toList()); - - return Futures.transform( - Futures.allAsList(saveFutures), - result -> null, - dbCallbackExecutorService - ); - }, dbCallbackExecutorService); - } - - private > ListenableFuture> findAllEntityIdsAsync(Function> fetcher) { - return dbCallbackExecutorService.submit(() -> { - List result = new ArrayList<>(); - PageLink pageLink = new PageLink(100); - PageData pageData; - while (true) { - pageData = fetcher.apply(pageLink); - - Optional.ofNullable(pageData) - .map(PageData::getData) - .ifPresent(dataList -> dataList.stream() - .filter(Objects::nonNull) - .map(ExportableEntity::getId) - .forEach(result::add)); - - if (pageData == null || !pageData.hasNext()) { - break; - } - - pageLink = pageLink.nextPageLink(); - } - return result; - }); - } - - @SafeVarargs - private ListenableFuture> mergeEntityIdFutures(ListenableFuture>... futures) { - return Futures.transform( - Futures.allAsList(Arrays.asList(futures)), - listsOfIds -> listsOfIds.stream() - .filter(Objects::nonNull) - .flatMap(Collection::stream) - .collect(Collectors.toList()), - dbCallbackExecutorService - ); + log.trace("[{}] processCalculatedField [{}][{}] for entity [{}][{}]", tenantId, edge.getName(), calculatedFieldRequestMsg, entityId.getEntityType(), entityId.getId()); + return saveCalculatedFieldsToEdge(tenantId, edge.getId(), entityId); } private ListenableFuture saveCalculatedFieldsToEdge(TenantId tenantId, EdgeId edgeId, EntityId entityId) { diff --git a/common/dao-api/src/main/java/org/thingsboard/server/dao/cf/CalculatedFieldService.java b/common/dao-api/src/main/java/org/thingsboard/server/dao/cf/CalculatedFieldService.java index 5ac0e98d41..85cd8d24fd 100644 --- a/common/dao-api/src/main/java/org/thingsboard/server/dao/cf/CalculatedFieldService.java +++ b/common/dao-api/src/main/java/org/thingsboard/server/dao/cf/CalculatedFieldService.java @@ -31,6 +31,8 @@ public interface CalculatedFieldService extends EntityDaoService { CalculatedField save(CalculatedField calculatedField); + CalculatedField save(CalculatedField calculatedField, boolean doValidate); + CalculatedField findById(TenantId tenantId, CalculatedFieldId calculatedFieldId); CalculatedField findByEntityIdAndName(EntityId entityId, String name); diff --git a/dao/src/main/java/org/thingsboard/server/dao/cf/BaseCalculatedFieldService.java b/dao/src/main/java/org/thingsboard/server/dao/cf/BaseCalculatedFieldService.java index 0add13b5f8..dc6abbde8b 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/cf/BaseCalculatedFieldService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/cf/BaseCalculatedFieldService.java @@ -62,6 +62,20 @@ public class BaseCalculatedFieldService extends AbstractEntityService implements return doSave(calculatedField, oldCalculatedField); } + @Override + public CalculatedField save(CalculatedField calculatedField, boolean doValidate) { + CalculatedField oldCalculatedField = null; + + if (doValidate) { + oldCalculatedField = calculatedFieldDataValidator.validate(calculatedField, CalculatedField::getTenantId); + } else if (calculatedField.getId() != null) { + oldCalculatedField = findById(calculatedField.getTenantId(), calculatedField.getId()); + } + + return doSave(calculatedField, oldCalculatedField); + } + + private CalculatedField doSave(CalculatedField calculatedField, CalculatedField oldCalculatedField) { try { TenantId tenantId = calculatedField.getTenantId();