CalculatedField functionality support for Edge

- refactoring
This commit is contained in:
yevhenii 2025-06-24 13:37:34 +03:00
parent 33d80df54c
commit a67f2eb516
5 changed files with 21 additions and 85 deletions

View File

@ -139,8 +139,8 @@ public abstract class BaseEdgeProcessor implements EdgeProcessor {
UPDATED_COMMENT, DELETED -> true;
default -> switch (type) {
case ALARM, ALARM_COMMENT, RULE_CHAIN, RULE_CHAIN_METADATA, USER, CUSTOMER, TENANT, TENANT_PROFILE,
WIDGETS_BUNDLE, WIDGET_TYPE, ADMIN_SETTINGS, OTA_PACKAGE, QUEUE, RELATION, NOTIFICATION_TEMPLATE, NOTIFICATION_TARGET,
NOTIFICATION_RULE -> true;
WIDGETS_BUNDLE, WIDGET_TYPE, ADMIN_SETTINGS, OTA_PACKAGE, QUEUE, RELATION, CALCULATED_FIELD, NOTIFICATION_TEMPLATE,
NOTIFICATION_TARGET, NOTIFICATION_RULE -> true;
default -> false;
};
};

View File

@ -80,7 +80,7 @@ public abstract class BaseCalculatedFieldProcessor extends BaseEdgeProcessor {
calculatedField.setId(calculatedFieldId);
}
edgeCtx.getCalculatedFieldService().save(calculatedField);
edgeCtx.getCalculatedFieldService().save(calculatedField, false);
} catch (Exception e) {
log.error("[{}] Failed to process calculatedField update msg [{}]", tenantId, calculatedFieldUpdateMsg, e);
throw e;

View File

@ -23,7 +23,6 @@ import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import lombok.extern.slf4j.Slf4j;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.jetbrains.annotations.NotNull;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Service;
@ -32,7 +31,6 @@ import org.thingsboard.server.common.data.AttributeScope;
import org.thingsboard.server.common.data.EdgeUtils;
import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.EntityView;
import org.thingsboard.server.common.data.ExportableEntity;
import org.thingsboard.server.common.data.edge.Edge;
import org.thingsboard.server.common.data.edge.EdgeEvent;
import org.thingsboard.server.common.data.edge.EdgeEventActionType;
@ -48,8 +46,6 @@ import org.thingsboard.server.common.data.id.WidgetsBundleId;
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
import org.thingsboard.server.common.data.kv.DataType;
import org.thingsboard.server.common.data.kv.TsKvEntry;
import org.thingsboard.server.common.data.page.PageData;
import org.thingsboard.server.common.data.page.PageLink;
import org.thingsboard.server.common.data.relation.EntityRelation;
import org.thingsboard.server.common.data.relation.EntityRelationsQuery;
import org.thingsboard.server.common.data.relation.EntitySearchDirection;
@ -82,15 +78,10 @@ import org.thingsboard.server.service.executors.DbCallbackExecutorService;
import org.thingsboard.server.service.state.DefaultDeviceStateService;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import java.util.function.Function;
import java.util.stream.Collectors;
@Service
@ -328,80 +319,9 @@ public class DefaultEdgeRequestsService implements EdgeRequestsService {
EntityType.valueOf(calculatedFieldRequestMsg.getEntityType()),
new UUID(calculatedFieldRequestMsg.getEntityIdMSB(), calculatedFieldRequestMsg.getEntityIdLSB()));
if (entityId.getEntityType() == EntityType.EDGE) {
log.trace("[{}] processAllCalculatedField [{}][{}]", tenantId, edge.getName(), calculatedFieldRequestMsg);
return syncAllCalculatedFieldForEdge(tenantId, edge, calculatedFieldRequestMsg);
} else {
log.trace("[{}] processCalculatedField [{}][{}] for entity [{}][{}]", tenantId, edge.getName(), calculatedFieldRequestMsg, entityId.getEntityType(), entityId.getId());
return saveCalculatedFieldsToEdge(tenantId, edge.getId(), entityId);
}
}
@NotNull
private ListenableFuture<Void> syncAllCalculatedFieldForEdge(TenantId tenantId, Edge edge, CalculatedFieldRequestMsg calculatedFieldRequestMsg) {
EdgeId edgeId = edge.getId();
ListenableFuture<List<EntityId>> deviceIdsFuture =
findAllEntityIdsAsync(pageLink -> deviceService.findDevicesByTenantIdAndEdgeId(tenantId, edgeId, pageLink));
ListenableFuture<List<EntityId>> assetIdsFuture =
findAllEntityIdsAsync(pageLink -> assetService.findAssetsByTenantIdAndEdgeId(tenantId, edgeId, pageLink));
ListenableFuture<List<EntityId>> deviceProfileIdsFuture =
findAllEntityIdsAsync(pageLink -> deviceProfileService.findDeviceProfiles(tenantId, pageLink));
ListenableFuture<List<EntityId>> assetProfileIdsFuture =
findAllEntityIdsAsync(pageLink -> assetProfileService.findAssetProfiles(tenantId, pageLink));
ListenableFuture<List<EntityId>> allEntityIdFuture =
mergeEntityIdFutures(deviceIdsFuture, assetIdsFuture, deviceProfileIdsFuture, assetProfileIdsFuture);
return Futures.transformAsync(allEntityIdFuture, allIds -> {
log.trace("[{}][{}] Going to sync calculatedFields for [{}] entities", tenantId, edge.getName(), allIds.size());
List<ListenableFuture<Void>> saveFutures = allIds.stream()
.map(id -> saveCalculatedFieldsToEdge(tenantId, edge.getId(), id))
.collect(Collectors.toList());
return Futures.transform(
Futures.allAsList(saveFutures),
result -> null,
dbCallbackExecutorService
);
}, dbCallbackExecutorService);
}
private <T extends ExportableEntity<? extends EntityId>> ListenableFuture<List<EntityId>> findAllEntityIdsAsync(Function<PageLink, PageData<T>> fetcher) {
return dbCallbackExecutorService.submit(() -> {
List<EntityId> result = new ArrayList<>();
PageLink pageLink = new PageLink(100);
PageData<T> pageData;
while (true) {
pageData = fetcher.apply(pageLink);
Optional.ofNullable(pageData)
.map(PageData::getData)
.ifPresent(dataList -> dataList.stream()
.filter(Objects::nonNull)
.map(ExportableEntity::getId)
.forEach(result::add));
if (pageData == null || !pageData.hasNext()) {
break;
}
pageLink = pageLink.nextPageLink();
}
return result;
});
}
@SafeVarargs
private ListenableFuture<List<EntityId>> mergeEntityIdFutures(ListenableFuture<List<EntityId>>... futures) {
return Futures.transform(
Futures.allAsList(Arrays.asList(futures)),
listsOfIds -> listsOfIds.stream()
.filter(Objects::nonNull)
.flatMap(Collection::stream)
.collect(Collectors.toList()),
dbCallbackExecutorService
);
}
private ListenableFuture<Void> saveCalculatedFieldsToEdge(TenantId tenantId, EdgeId edgeId, EntityId entityId) {
return Futures.transformAsync(

View File

@ -31,6 +31,8 @@ public interface CalculatedFieldService extends EntityDaoService {
CalculatedField save(CalculatedField calculatedField);
CalculatedField save(CalculatedField calculatedField, boolean doValidate);
CalculatedField findById(TenantId tenantId, CalculatedFieldId calculatedFieldId);
CalculatedField findByEntityIdAndName(EntityId entityId, String name);

View File

@ -62,6 +62,20 @@ public class BaseCalculatedFieldService extends AbstractEntityService implements
return doSave(calculatedField, oldCalculatedField);
}
@Override
public CalculatedField save(CalculatedField calculatedField, boolean doValidate) {
CalculatedField oldCalculatedField = null;
if (doValidate) {
oldCalculatedField = calculatedFieldDataValidator.validate(calculatedField, CalculatedField::getTenantId);
} else if (calculatedField.getId() != null) {
oldCalculatedField = findById(calculatedField.getTenantId(), calculatedField.getId());
}
return doSave(calculatedField, oldCalculatedField);
}
private CalculatedField doSave(CalculatedField calculatedField, CalculatedField oldCalculatedField) {
try {
TenantId tenantId = calculatedField.getTenantId();