CalculatedField functionality support for Edge

- changed logic for sync CalculatedField
This commit is contained in:
yevhenii 2025-06-24 11:55:03 +03:00
parent b747b6b1bf
commit 33d80df54c
12 changed files with 160 additions and 37 deletions

View File

@ -49,6 +49,7 @@ import org.thingsboard.server.gen.edge.v1.AlarmUpdateMsg;
import org.thingsboard.server.gen.edge.v1.AssetProfileUpdateMsg;
import org.thingsboard.server.gen.edge.v1.AssetUpdateMsg;
import org.thingsboard.server.gen.edge.v1.AttributesRequestMsg;
import org.thingsboard.server.gen.edge.v1.CalculatedFieldRequestMsg;
import org.thingsboard.server.gen.edge.v1.CalculatedFieldUpdateMsg;
import org.thingsboard.server.gen.edge.v1.ConnectRequestMsg;
import org.thingsboard.server.gen.edge.v1.ConnectResponseCode;
@ -882,6 +883,11 @@ public abstract class EdgeGrpcSession implements Closeable {
result.add(ctx.getEdgeRequestsService().processRelationRequestMsg(edge.getTenantId(), edge, relationRequestMsg));
}
}
if (uplinkMsg.getCalculatedFieldRequestMsgCount() > 0) {
for (CalculatedFieldRequestMsg calculatedFieldRequestMsg : uplinkMsg.getCalculatedFieldRequestMsgList()) {
result.add(ctx.getEdgeRequestsService().processCalculatedFieldRequestMsg(edge.getTenantId(), edge, calculatedFieldRequestMsg));
}
}
if (uplinkMsg.getUserCredentialsRequestMsgCount() > 0) {
for (UserCredentialsRequestMsg userCredentialsRequestMsg : uplinkMsg.getUserCredentialsRequestMsgList()) {
result.add(ctx.getEdgeRequestsService().processUserCredentialsRequestMsg(edge.getTenantId(), edge, userCredentialsRequestMsg));

View File

@ -25,7 +25,6 @@ import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.server.common.data.AttributeScope;
import org.thingsboard.server.common.data.EdgeUtils;
import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.cf.CalculatedField;
import org.thingsboard.server.common.data.edge.Edge;
import org.thingsboard.server.common.data.edge.EdgeEvent;
import org.thingsboard.server.common.data.edge.EdgeEventActionType;
@ -54,13 +53,11 @@ import org.thingsboard.server.common.msg.TbMsgDataType;
import org.thingsboard.server.common.msg.TbMsgMetaData;
import org.thingsboard.server.dao.edge.EdgeSynchronizationManager;
import org.thingsboard.server.dao.entity.EntityDaoRegistry;
import org.thingsboard.server.gen.edge.v1.CalculatedFieldUpdateMsg;
import org.thingsboard.server.gen.edge.v1.UpdateMsgType;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.queue.TbQueueCallback;
import org.thingsboard.server.queue.TbQueueMsgMetadata;
import org.thingsboard.server.service.edge.EdgeContextComponent;
import org.thingsboard.server.service.edge.EdgeMsgConstructorUtils;
import org.thingsboard.server.service.executors.DbCallbackExecutorService;
import org.thingsboard.server.service.state.DefaultDeviceStateService;
@ -384,12 +381,4 @@ public abstract class BaseEdgeProcessor implements EdgeProcessor {
});
}
protected List<CalculatedFieldUpdateMsg> getCalculatedFieldUpdateMsgs(TenantId tenantId, EntityId entityId) {
List<CalculatedField> calculatedFields = edgeCtx.getCalculatedFieldService().findCalculatedFieldsByEntityId(tenantId, entityId);
return calculatedFields.stream()
.map(calculatedField -> EdgeMsgConstructorUtils.constructCalculatedFieldUpdatedMsg(UpdateMsgType.ENTITY_UPDATED_RPC_MESSAGE, calculatedField))
.toList();
}
}

View File

@ -120,8 +120,6 @@ public class AssetEdgeProcessor extends BaseAssetProcessor implements AssetProce
.setDownlinkMsgId(EdgeUtils.nextPositiveInt())
.addAssetUpdateMsg(assetUpdateMsg);
getCalculatedFieldUpdateMsgs(edgeEvent.getTenantId(), assetId).forEach(builder::addCalculatedFieldUpdateMsg);
if (UpdateMsgType.ENTITY_CREATED_RPC_MESSAGE.equals(msgType)) {
AssetProfile assetProfile = edgeCtx.getAssetProfileService().findAssetProfileById(edgeEvent.getTenantId(), asset.getAssetProfileId());
builder.addAssetProfileUpdateMsg(EdgeMsgConstructorUtils.constructAssetProfileUpdatedMsg(msgType, assetProfile));

View File

@ -80,7 +80,7 @@ public abstract class BaseCalculatedFieldProcessor extends BaseEdgeProcessor {
calculatedField.setId(calculatedFieldId);
}
edgeCtx.getCalculatedFieldService().save(calculatedField, false);
edgeCtx.getCalculatedFieldService().save(calculatedField);
} catch (Exception e) {
log.error("[{}] Failed to process calculatedField update msg [{}]", tenantId, calculatedFieldUpdateMsg, e);
throw e;

View File

@ -244,8 +244,6 @@ public class DeviceEdgeProcessor extends BaseDeviceProcessor implements DevicePr
builder.addDeviceCredentialsUpdateMsg(deviceCredentialsUpdateMsg).build();
}
getCalculatedFieldUpdateMsgs(edgeEvent.getTenantId(), deviceId).forEach(builder::addCalculatedFieldUpdateMsg);
if (UpdateMsgType.ENTITY_CREATED_RPC_MESSAGE.equals(msgType)) {
DeviceProfile deviceProfile = edgeCtx.getDeviceProfileService().findDeviceProfileById(edgeEvent.getTenantId(), device.getDeviceProfileId());
builder.addDeviceProfileUpdateMsg(EdgeMsgConstructorUtils.constructDeviceProfileUpdatedMsg(msgType, deviceProfile));

View File

@ -23,6 +23,7 @@ import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import lombok.extern.slf4j.Slf4j;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.jetbrains.annotations.NotNull;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Service;
@ -31,6 +32,7 @@ import org.thingsboard.server.common.data.AttributeScope;
import org.thingsboard.server.common.data.EdgeUtils;
import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.EntityView;
import org.thingsboard.server.common.data.ExportableEntity;
import org.thingsboard.server.common.data.edge.Edge;
import org.thingsboard.server.common.data.edge.EdgeEvent;
import org.thingsboard.server.common.data.edge.EdgeEventActionType;
@ -46,6 +48,8 @@ import org.thingsboard.server.common.data.id.WidgetsBundleId;
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
import org.thingsboard.server.common.data.kv.DataType;
import org.thingsboard.server.common.data.kv.TsKvEntry;
import org.thingsboard.server.common.data.page.PageData;
import org.thingsboard.server.common.data.page.PageLink;
import org.thingsboard.server.common.data.relation.EntityRelation;
import org.thingsboard.server.common.data.relation.EntityRelationsQuery;
import org.thingsboard.server.common.data.relation.EntitySearchDirection;
@ -53,13 +57,19 @@ import org.thingsboard.server.common.data.relation.RelationTypeGroup;
import org.thingsboard.server.common.data.relation.RelationsSearchParameters;
import org.thingsboard.server.common.data.widget.WidgetType;
import org.thingsboard.server.common.data.widget.WidgetsBundle;
import org.thingsboard.server.dao.asset.AssetProfileService;
import org.thingsboard.server.dao.asset.AssetService;
import org.thingsboard.server.dao.attributes.AttributesService;
import org.thingsboard.server.dao.cf.CalculatedFieldService;
import org.thingsboard.server.dao.device.DeviceProfileService;
import org.thingsboard.server.dao.device.DeviceService;
import org.thingsboard.server.dao.edge.EdgeEventService;
import org.thingsboard.server.dao.relation.RelationService;
import org.thingsboard.server.dao.timeseries.TimeseriesService;
import org.thingsboard.server.dao.widget.WidgetTypeService;
import org.thingsboard.server.dao.widget.WidgetsBundleService;
import org.thingsboard.server.gen.edge.v1.AttributesRequestMsg;
import org.thingsboard.server.gen.edge.v1.CalculatedFieldRequestMsg;
import org.thingsboard.server.gen.edge.v1.DeviceCredentialsRequestMsg;
import org.thingsboard.server.gen.edge.v1.EntityViewsRequestMsg;
import org.thingsboard.server.gen.edge.v1.RelationRequestMsg;
@ -72,10 +82,16 @@ import org.thingsboard.server.service.executors.DbCallbackExecutorService;
import org.thingsboard.server.service.state.DefaultDeviceStateService;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import java.util.function.Function;
import java.util.stream.Collectors;
@Service
@TbCoreComponent
@ -104,6 +120,17 @@ public class DefaultEdgeRequestsService implements EdgeRequestsService {
@Autowired
private WidgetTypeService widgetTypeService;
@Autowired
private CalculatedFieldService calculatedFieldService;
@Autowired
private DeviceService deviceService;
@Autowired
private DeviceProfileService deviceProfileService;
@Autowired
private AssetService assetService;
@Autowired
private AssetProfileService assetProfileService;
@Autowired
private DbCallbackExecutorService dbCallbackExecutorService;
@ -293,6 +320,116 @@ public class DefaultEdgeRequestsService implements EdgeRequestsService {
return futureToSet;
}
@Override
public ListenableFuture<Void> processCalculatedFieldRequestMsg(TenantId tenantId, Edge edge, CalculatedFieldRequestMsg calculatedFieldRequestMsg) {
log.trace("[{}] processCalculatedFieldRequestMsg [{}][{}]", tenantId, edge.getName(), calculatedFieldRequestMsg);
EntityId entityId = EntityIdFactory.getByTypeAndUuid(
EntityType.valueOf(calculatedFieldRequestMsg.getEntityType()),
new UUID(calculatedFieldRequestMsg.getEntityIdMSB(), calculatedFieldRequestMsg.getEntityIdLSB()));
if (entityId.getEntityType() == EntityType.EDGE) {
log.trace("[{}] processAllCalculatedField [{}][{}]", tenantId, edge.getName(), calculatedFieldRequestMsg);
return syncAllCalculatedFieldForEdge(tenantId, edge, calculatedFieldRequestMsg);
} else {
log.trace("[{}] processCalculatedField [{}][{}] for entity [{}][{}]", tenantId, edge.getName(), calculatedFieldRequestMsg, entityId.getEntityType(), entityId.getId());
return saveCalculatedFieldsToEdge(tenantId, edge.getId(), entityId);
}
}
@NotNull
private ListenableFuture<Void> syncAllCalculatedFieldForEdge(TenantId tenantId, Edge edge, CalculatedFieldRequestMsg calculatedFieldRequestMsg) {
EdgeId edgeId = edge.getId();
ListenableFuture<List<EntityId>> deviceIdsFuture =
findAllEntityIdsAsync(pageLink -> deviceService.findDevicesByTenantIdAndEdgeId(tenantId, edgeId, pageLink));
ListenableFuture<List<EntityId>> assetIdsFuture =
findAllEntityIdsAsync(pageLink -> assetService.findAssetsByTenantIdAndEdgeId(tenantId, edgeId, pageLink));
ListenableFuture<List<EntityId>> deviceProfileIdsFuture =
findAllEntityIdsAsync(pageLink -> deviceProfileService.findDeviceProfiles(tenantId, pageLink));
ListenableFuture<List<EntityId>> assetProfileIdsFuture =
findAllEntityIdsAsync(pageLink -> assetProfileService.findAssetProfiles(tenantId, pageLink));
ListenableFuture<List<EntityId>> allEntityIdFuture =
mergeEntityIdFutures(deviceIdsFuture, assetIdsFuture, deviceProfileIdsFuture, assetProfileIdsFuture);
return Futures.transformAsync(allEntityIdFuture, allIds -> {
log.trace("[{}][{}] Going to sync calculatedFields for [{}] entities", tenantId, edge.getName(), allIds.size());
List<ListenableFuture<Void>> saveFutures = allIds.stream()
.map(id -> saveCalculatedFieldsToEdge(tenantId, edge.getId(), id))
.collect(Collectors.toList());
return Futures.transform(
Futures.allAsList(saveFutures),
result -> null,
dbCallbackExecutorService
);
}, dbCallbackExecutorService);
}
private <T extends ExportableEntity<? extends EntityId>> ListenableFuture<List<EntityId>> findAllEntityIdsAsync(Function<PageLink, PageData<T>> fetcher) {
return dbCallbackExecutorService.submit(() -> {
List<EntityId> result = new ArrayList<>();
PageLink pageLink = new PageLink(100);
PageData<T> pageData;
while (true) {
pageData = fetcher.apply(pageLink);
Optional.ofNullable(pageData)
.map(PageData::getData)
.ifPresent(dataList -> dataList.stream()
.filter(Objects::nonNull)
.map(ExportableEntity::getId)
.forEach(result::add));
if (pageData == null || !pageData.hasNext()) {
break;
}
pageLink = pageLink.nextPageLink();
}
return result;
});
}
@SafeVarargs
private ListenableFuture<List<EntityId>> mergeEntityIdFutures(ListenableFuture<List<EntityId>>... futures) {
return Futures.transform(
Futures.allAsList(Arrays.asList(futures)),
listsOfIds -> listsOfIds.stream()
.filter(Objects::nonNull)
.flatMap(Collection::stream)
.collect(Collectors.toList()),
dbCallbackExecutorService
);
}
private ListenableFuture<Void> saveCalculatedFieldsToEdge(TenantId tenantId, EdgeId edgeId, EntityId entityId) {
return Futures.transformAsync(
dbCallbackExecutorService.submit(() -> calculatedFieldService.findCalculatedFieldsByEntityId(tenantId, entityId)),
calculatedFields -> {
log.trace("[{}][{}][{}][{}] calculatedField(s) are going to be pushed to edge.", tenantId, edgeId, entityId, calculatedFields.size());
List<ListenableFuture<?>> futures = calculatedFields.stream().map(calculatedField -> {
try {
return saveEdgeEvent(tenantId, edgeId, EdgeEventType.CALCULATED_FIELD,
EdgeEventActionType.ADDED, calculatedField.getId(), JacksonUtil.valueToTree(calculatedField));
} catch (Exception e) {
String errMsg = String.format("[%s][%s] Exception during loading calculatedField [%s] to edge on sync!", tenantId, edgeId, calculatedField);
log.error(errMsg, e);
return Futures.immediateFailedFuture(e);
}
}).collect(Collectors.toList());
return Futures.transform(
Futures.allAsList(futures),
voids -> null,
dbCallbackExecutorService
);
},
dbCallbackExecutorService
);
}
private ListenableFuture<List<EntityRelation>> findRelationByQuery(TenantId tenantId, Edge edge, EntityId entityId, EntitySearchDirection direction) {
EntityRelationsQuery query = new EntityRelationsQuery();
query.setParameters(new RelationsSearchParameters(entityId, direction, 1, false));

View File

@ -19,6 +19,7 @@ import com.google.common.util.concurrent.ListenableFuture;
import org.thingsboard.server.common.data.edge.Edge;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.gen.edge.v1.AttributesRequestMsg;
import org.thingsboard.server.gen.edge.v1.CalculatedFieldRequestMsg;
import org.thingsboard.server.gen.edge.v1.DeviceCredentialsRequestMsg;
import org.thingsboard.server.gen.edge.v1.EntityViewsRequestMsg;
import org.thingsboard.server.gen.edge.v1.RelationRequestMsg;
@ -35,6 +36,8 @@ public interface EdgeRequestsService {
ListenableFuture<Void> processRelationRequestMsg(TenantId tenantId, Edge edge, RelationRequestMsg relationRequestMsg);
ListenableFuture<Void> processCalculatedFieldRequestMsg(TenantId tenantId, Edge edge, CalculatedFieldRequestMsg calculatedFieldRequestMsg);
@Deprecated(since = "3.9.1", forRemoval = true)
ListenableFuture<Void> processDeviceCredentialsRequestMsg(TenantId tenantId, Edge edge, DeviceCredentialsRequestMsg deviceCredentialsRequestMsg);
@ -46,4 +49,5 @@ public interface EdgeRequestsService {
@Deprecated(since = "3.9.1", forRemoval = true)
ListenableFuture<Void> processEntityViewsRequestMsg(TenantId tenantId, Edge edge, EntityViewsRequestMsg entityViewsRequestMsg);
}

View File

@ -31,8 +31,6 @@ public interface CalculatedFieldService extends EntityDaoService {
CalculatedField save(CalculatedField calculatedField);
CalculatedField save(CalculatedField calculatedField, boolean doValidate);
CalculatedField findById(TenantId tenantId, CalculatedFieldId calculatedFieldId);
CalculatedField findByEntityIdAndName(EntityId entityId, String name);

View File

@ -334,6 +334,12 @@ message RelationRequestMsg {
string entityType = 3;
}
message CalculatedFieldRequestMsg {
int64 entityIdMSB = 1;
int64 entityIdLSB = 2;
string entityType = 3;
}
// DEPRECATED. FOR REMOVAL
message UserCredentialsRequestMsg {
option deprecated = true;
@ -433,6 +439,7 @@ message UplinkMsg {
repeated RuleChainUpdateMsg ruleChainUpdateMsg = 23;
repeated RuleChainMetadataUpdateMsg ruleChainMetadataUpdateMsg = 24;
repeated CalculatedFieldUpdateMsg calculatedFieldUpdateMsg = 25;
repeated CalculatedFieldRequestMsg calculatedFieldRequestMsg = 26;
}
message UplinkResponseMsg {

View File

@ -62,17 +62,6 @@ public class BaseCalculatedFieldService extends AbstractEntityService implements
return doSave(calculatedField, oldCalculatedField);
}
@Override
public CalculatedField save(CalculatedField calculatedField, boolean doValidate) {
CalculatedField oldCalculatedField = null;
if (doValidate) {
oldCalculatedField = calculatedFieldDataValidator.validate(calculatedField, CalculatedField::getTenantId);
}
return doSave(calculatedField, oldCalculatedField);
}
private CalculatedField doSave(CalculatedField calculatedField, CalculatedField oldCalculatedField) {
try {
TenantId tenantId = calculatedField.getTenantId();
@ -104,7 +93,7 @@ public class BaseCalculatedFieldService extends AbstractEntityService implements
log.trace("Executing findByEntityIdAndName [{}], calculatedFieldName[{}]", entityId, name);
validateId(entityId.getId(), id -> INCORRECT_ENTITY_ID + id);
return calculatedFieldDao.findByEntityIdAndName(entityId, name).orElse(null);
return calculatedFieldDao.findByEntityIdAndName(entityId, name);
}
@Override

View File

@ -24,7 +24,6 @@ import org.thingsboard.server.common.data.page.PageLink;
import org.thingsboard.server.dao.Dao;
import java.util.List;
import java.util.Optional;
public interface CalculatedFieldDao extends Dao<CalculatedField> {
@ -36,7 +35,7 @@ public interface CalculatedFieldDao extends Dao<CalculatedField> {
List<CalculatedField> findAll();
Optional<CalculatedField> findByEntityIdAndName(EntityId entityId, String name);
CalculatedField findByEntityIdAndName(EntityId entityId, String name);
PageData<CalculatedField> findAll(PageLink pageLink);

View File

@ -34,7 +34,6 @@ import org.thingsboard.server.dao.sql.JpaAbstractDao;
import org.thingsboard.server.dao.util.SqlDao;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
@Slf4j
@ -67,9 +66,8 @@ public class JpaCalculatedFieldDao extends JpaAbstractDao<CalculatedFieldEntity,
}
@Override
public Optional<CalculatedField> findByEntityIdAndName(EntityId entityId, String name) {
CalculatedField calculatedField = DaoUtil.getData(calculatedFieldRepository.findByEntityIdAndName(entityId.getId(), name));
return Optional.ofNullable(calculatedField);
public CalculatedField findByEntityIdAndName(EntityId entityId, String name) {
return DaoUtil.getData(calculatedFieldRepository.findByEntityIdAndName(entityId.getId(), name));
}
@Override