CalculatedField functionality support for Edge

- Modified downlink push logic
This commit is contained in:
yevhenii 2025-06-05 18:06:51 +03:00
parent 93948bf64b
commit 2761866e58
9 changed files with 85 additions and 68 deletions

View File

@ -33,6 +33,7 @@ import org.thingsboard.server.common.data.alarm.AlarmApiCallResult;
import org.thingsboard.server.common.data.alarm.AlarmComment;
import org.thingsboard.server.common.data.alarm.EntityAlarm;
import org.thingsboard.server.common.data.audit.ActionType;
import org.thingsboard.server.common.data.cf.CalculatedField;
import org.thingsboard.server.common.data.domain.Domain;
import org.thingsboard.server.common.data.edge.Edge;
import org.thingsboard.server.common.data.edge.EdgeEventActionType;
@ -262,6 +263,8 @@ public class EdgeEventSourcingListener {
private String getBodyMsgForEntityEvent(Object entity) {
if (entity instanceof AlarmComment) {
return JacksonUtil.toString(entity);
} else if (entity instanceof CalculatedField calculatedField) {
return JacksonUtil.toString(calculatedField.getEntityId());
}
return null;
}

View File

@ -23,7 +23,6 @@ import org.thingsboard.server.service.edge.EdgeContextComponent;
import org.thingsboard.server.service.edge.rpc.fetch.AdminSettingsEdgeEventFetcher;
import org.thingsboard.server.service.edge.rpc.fetch.AssetProfilesEdgeEventFetcher;
import org.thingsboard.server.service.edge.rpc.fetch.AssetsEdgeEventFetcher;
import org.thingsboard.server.service.edge.rpc.fetch.CalculatedFieldsEdgeEventFetcher;
import org.thingsboard.server.service.edge.rpc.fetch.CustomerEdgeEventFetcher;
import org.thingsboard.server.service.edge.rpc.fetch.CustomerUsersEdgeEventFetcher;
import org.thingsboard.server.service.edge.rpc.fetch.DashboardsEdgeEventFetcher;
@ -81,7 +80,6 @@ public class EdgeSyncCursor {
fetchers.add(new DevicesEdgeEventFetcher(ctx.getDeviceService()));
fetchers.add(new AssetsEdgeEventFetcher(ctx.getAssetService()));
fetchers.add(new EntityViewsEdgeEventFetcher(ctx.getEntityViewService()));
fetchers.add(new CalculatedFieldsEdgeEventFetcher(ctx.getCalculatedFieldService()));
if (fullSync) {
fetchers.add(new NotificationTemplateEdgeEventFetcher(ctx.getNotificationTemplateService()));
fetchers.add(new NotificationTargetEdgeEventFetcher(ctx.getNotificationTargetService()));

View File

@ -1,48 +0,0 @@
/**
* Copyright © 2016-2025 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.service.edge.rpc.fetch;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.server.common.data.EdgeUtils;
import org.thingsboard.server.common.data.cf.CalculatedField;
import org.thingsboard.server.common.data.edge.Edge;
import org.thingsboard.server.common.data.edge.EdgeEvent;
import org.thingsboard.server.common.data.edge.EdgeEventActionType;
import org.thingsboard.server.common.data.edge.EdgeEventType;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.page.PageData;
import org.thingsboard.server.common.data.page.PageLink;
import org.thingsboard.server.dao.cf.CalculatedFieldService;
@AllArgsConstructor
@Slf4j
public class CalculatedFieldsEdgeEventFetcher extends BasePageableEdgeEventFetcher<CalculatedField> {
private final CalculatedFieldService calculatedFieldService;
@Override
PageData<CalculatedField> fetchEntities(TenantId tenantId, Edge edge, PageLink pageLink) {
return calculatedFieldService.findCalculatedFieldsByTenantId(tenantId, pageLink);
}
@Override
EdgeEvent constructEdgeEvent(TenantId tenantId, Edge edge, CalculatedField calculatedField) {
return EdgeUtils.constructEdgeEvent(tenantId, edge.getId(), EdgeEventType.CALCULATED_FIELD,
EdgeEventActionType.ADDED, calculatedField.getId(), null);
}
}

View File

@ -25,6 +25,7 @@ import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.server.common.data.AttributeScope;
import org.thingsboard.server.common.data.EdgeUtils;
import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.cf.CalculatedField;
import org.thingsboard.server.common.data.edge.Edge;
import org.thingsboard.server.common.data.edge.EdgeEvent;
import org.thingsboard.server.common.data.edge.EdgeEventActionType;
@ -53,11 +54,13 @@ import org.thingsboard.server.common.msg.TbMsgDataType;
import org.thingsboard.server.common.msg.TbMsgMetaData;
import org.thingsboard.server.dao.edge.EdgeSynchronizationManager;
import org.thingsboard.server.dao.entity.EntityDaoRegistry;
import org.thingsboard.server.gen.edge.v1.CalculatedFieldUpdateMsg;
import org.thingsboard.server.gen.edge.v1.UpdateMsgType;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.queue.TbQueueCallback;
import org.thingsboard.server.queue.TbQueueMsgMetadata;
import org.thingsboard.server.service.edge.EdgeContextComponent;
import org.thingsboard.server.service.edge.EdgeMsgConstructorUtils;
import org.thingsboard.server.service.executors.DbCallbackExecutorService;
import org.thingsboard.server.service.state.DefaultDeviceStateService;
@ -381,4 +384,12 @@ public abstract class BaseEdgeProcessor implements EdgeProcessor {
});
}
protected List<CalculatedFieldUpdateMsg> getCalculatedFieldUpdateMsgs(TenantId tenantId, EntityId entityId) {
List<CalculatedField> calculatedFields = edgeCtx.getCalculatedFieldService().findCalculatedFieldsByEntityId(tenantId, entityId);
return calculatedFields.stream()
.map(calculatedField -> EdgeMsgConstructorUtils.constructCalculatedFieldUpdatedMsg(UpdateMsgType.ENTITY_UPDATED_RPC_MESSAGE, calculatedField))
.toList();
}
}

View File

@ -119,6 +119,9 @@ public class AssetEdgeProcessor extends BaseAssetProcessor implements AssetProce
DownlinkMsg.Builder builder = DownlinkMsg.newBuilder()
.setDownlinkMsgId(EdgeUtils.nextPositiveInt())
.addAssetUpdateMsg(assetUpdateMsg);
getCalculatedFieldUpdateMsgs(edgeEvent.getTenantId(), assetId).forEach(builder::addCalculatedFieldUpdateMsg);
if (UpdateMsgType.ENTITY_CREATED_RPC_MESSAGE.equals(msgType)) {
AssetProfile assetProfile = edgeCtx.getAssetProfileService().findAssetProfileById(edgeEvent.getTenantId(), asset.getAssetProfileId());
builder.addAssetProfileUpdateMsg(EdgeMsgConstructorUtils.constructAssetProfileUpdatedMsg(msgType, assetProfile));

View File

@ -16,18 +16,30 @@
package org.thingsboard.server.service.edge.rpc.processor.calculated;
import com.datastax.oss.driver.api.core.uuid.Uuids;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.util.Pair;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.server.common.data.StringUtils;
import org.thingsboard.server.common.data.cf.CalculatedField;
import org.thingsboard.server.common.data.edge.EdgeEventActionType;
import org.thingsboard.server.common.data.edge.EdgeEventType;
import org.thingsboard.server.common.data.id.CalculatedFieldId;
import org.thingsboard.server.common.data.id.EdgeId;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.page.PageDataIterableByTenantIdEntityId;
import org.thingsboard.server.dao.service.DataValidator;
import org.thingsboard.server.gen.edge.v1.CalculatedFieldUpdateMsg;
import org.thingsboard.server.service.edge.rpc.processor.BaseEdgeProcessor;
import java.util.ArrayList;
import java.util.List;
import static org.thingsboard.server.dao.edge.BaseRelatedEdgesService.RELATED_EDGES_CACHE_ITEMS;
@Slf4j
public abstract class BaseCalculatedFieldProcessor extends BaseEdgeProcessor {
@ -76,4 +88,23 @@ public abstract class BaseCalculatedFieldProcessor extends BaseEdgeProcessor {
return Pair.of(isCreated, isNameUpdated);
}
protected ListenableFuture<Void> pushEventToAllRelatedEdges(TenantId tenantId, EntityId entityId, EdgeEventType type, EdgeEventActionType actionType, EdgeId sourceEdgeId) {
List<ListenableFuture<Void>> futures = new ArrayList<>();
PageDataIterableByTenantIdEntityId<EdgeId> edgeIds =
new PageDataIterableByTenantIdEntityId<>(edgeCtx.getEdgeService()::findRelatedEdgeIdsByEntityId, tenantId, entityId, RELATED_EDGES_CACHE_ITEMS);
for (EdgeId relatedEdgeId : edgeIds) {
if (!relatedEdgeId.equals(sourceEdgeId)) {
futures.add(saveEdgeEvent(tenantId, relatedEdgeId, type, actionType, entityId, null));
}
}
return Futures.transform(Futures.allAsList(futures), voids -> null, dbCallbackExecutorService);
}
protected ListenableFuture<Void> pushEventToAllEdges(TenantId tenantId, EdgeEventType type, EdgeEventActionType actionType, EntityId entityId, EdgeId sourceEdgeId) {
return switch (actionType) {
case ADDED, UPDATED, DELETED -> processActionForAllEdges(tenantId, type, actionType, entityId, null, sourceEdgeId);
default -> Futures.immediateFuture(null);
};
}
}

View File

@ -15,6 +15,7 @@
*/
package org.thingsboard.server.service.edge.rpc.processor.calculated;
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import lombok.extern.slf4j.Slf4j;
@ -22,26 +23,28 @@ import org.springframework.data.util.Pair;
import org.springframework.stereotype.Component;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.server.common.data.EdgeUtils;
import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.cf.CalculatedField;
import org.thingsboard.server.common.data.edge.Edge;
import org.thingsboard.server.common.data.edge.EdgeEvent;
import org.thingsboard.server.common.data.edge.EdgeEventActionType;
import org.thingsboard.server.common.data.edge.EdgeEventType;
import org.thingsboard.server.common.data.id.CalculatedFieldId;
import org.thingsboard.server.common.data.id.EdgeId;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.EntityIdFactory;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.msg.TbMsgType;
import org.thingsboard.server.common.data.relation.EntityRelation;
import org.thingsboard.server.common.data.relation.RelationTypeGroup;
import org.thingsboard.server.common.msg.TbMsgMetaData;
import org.thingsboard.server.dao.exception.DataValidationException;
import org.thingsboard.server.gen.edge.v1.CalculatedFieldUpdateMsg;
import org.thingsboard.server.gen.edge.v1.DownlinkMsg;
import org.thingsboard.server.gen.edge.v1.EdgeVersion;
import org.thingsboard.server.gen.edge.v1.UpdateMsgType;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.queue.util.TbCoreComponent;
import org.thingsboard.server.service.edge.EdgeMsgConstructorUtils;
import java.util.List;
import java.util.UUID;
@Slf4j
@ -88,7 +91,7 @@ public class CalculatedFieldEdgeProcessor extends BaseCalculatedFieldProcessor i
switch (edgeEvent.getAction()) {
case ADDED, UPDATED -> {
CalculatedField calculatedField = edgeCtx.getCalculatedFieldService().findById(edgeEvent.getTenantId(), calculatedFieldId);
if (calculatedField != null && isEntityAssignedToEdge(edgeEvent, calculatedField)) {
if (calculatedField != null) {
UpdateMsgType msgType = getUpdateMsgType(edgeEvent.getAction());
CalculatedFieldUpdateMsg calculatedFieldUpdateMsg = EdgeMsgConstructorUtils.constructCalculatedFieldUpdatedMsg(msgType, calculatedField);
return DownlinkMsg.newBuilder()
@ -108,24 +111,37 @@ public class CalculatedFieldEdgeProcessor extends BaseCalculatedFieldProcessor i
return null;
}
private boolean isEntityAssignedToEdge(EdgeEvent edgeEvent, CalculatedField calculatedField) {
switch (calculatedField.getEntityId().getEntityType()) {
case ASSET, DEVICE -> {
List<EntityRelation> relations =
edgeCtx.getRelationService().findByTo(edgeEvent.getTenantId(), calculatedField.getEntityId(), RelationTypeGroup.EDGE);
return !relations.isEmpty();
}
default -> {
return true;
}
}
}
@Override
public EdgeEventType getEdgeEventType() {
return EdgeEventType.CALCULATED_FIELD;
}
@Override
public ListenableFuture<Void> processEntityNotification(TenantId tenantId, TransportProtos.EdgeNotificationMsgProto edgeNotificationMsg) {
EdgeEventType type = EdgeEventType.valueOf(edgeNotificationMsg.getType());
EdgeEventActionType actionType = EdgeEventActionType.valueOf(edgeNotificationMsg.getAction());
EntityId entityId = EntityIdFactory.getByEdgeEventTypeAndUuid(type, new UUID(edgeNotificationMsg.getEntityIdMSB(), edgeNotificationMsg.getEntityIdLSB()));
EdgeId originatorEdgeId = safeGetEdgeId(edgeNotificationMsg.getOriginatorEdgeIdMSB(), edgeNotificationMsg.getOriginatorEdgeIdLSB());
switch (actionType) {
case UPDATED:
case ADDED:
EntityId bodyEntityId = JacksonUtil.fromString(edgeNotificationMsg.getBody(), EntityId.class);
if (bodyEntityId != null &&
(EntityType.DEVICE.equals(bodyEntityId.getEntityType()) || EntityType.ASSET.equals(bodyEntityId.getEntityType()))) {
JsonNode body = JacksonUtil.toJsonNode(edgeNotificationMsg.getBody());
EdgeId edgeId = safeGetEdgeId(edgeNotificationMsg.getEdgeIdMSB(), edgeNotificationMsg.getEdgeIdLSB());
return edgeId != null ?
saveEdgeEvent(tenantId, edgeId, type, actionType, entityId, body) :
pushEventToAllRelatedEdges(tenantId, entityId, type, actionType, originatorEdgeId);
} else {
return pushEventToAllEdges(tenantId, type, actionType, entityId, originatorEdgeId);
}
default:
return super.processEntityNotification(tenantId, edgeNotificationMsg);
}
}
private void processCalculatedField(TenantId tenantId, CalculatedFieldId calculatedFieldId, CalculatedFieldUpdateMsg calculatedFieldUpdateMsg, Edge edge) {
Pair<Boolean, Boolean> resultPair = super.saveOrUpdateCalculatedField(tenantId, calculatedFieldId, calculatedFieldUpdateMsg);

View File

@ -243,6 +243,9 @@ public class DeviceEdgeProcessor extends BaseDeviceProcessor implements DevicePr
DeviceCredentialsUpdateMsg deviceCredentialsUpdateMsg = EdgeMsgConstructorUtils.constructDeviceCredentialsUpdatedMsg(deviceCredentials);
builder.addDeviceCredentialsUpdateMsg(deviceCredentialsUpdateMsg).build();
}
getCalculatedFieldUpdateMsgs(edgeEvent.getTenantId(), deviceId).forEach(builder::addCalculatedFieldUpdateMsg);
if (UpdateMsgType.ENTITY_CREATED_RPC_MESSAGE.equals(msgType)) {
DeviceProfile deviceProfile = edgeCtx.getDeviceProfileService().findDeviceProfileById(edgeEvent.getTenantId(), device.getDeviceProfileId());
builder.addDeviceProfileUpdateMsg(EdgeMsgConstructorUtils.constructDeviceProfileUpdatedMsg(msgType, deviceProfile));

View File

@ -47,7 +47,7 @@ public enum EdgeEventType {
TB_RESOURCE(true, EntityType.TB_RESOURCE),
OAUTH2_CLIENT(true, EntityType.OAUTH2_CLIENT),
DOMAIN(true, EntityType.DOMAIN),
CALCULATED_FIELD(true, EntityType.CALCULATED_FIELD);
CALCULATED_FIELD(false, EntityType.CALCULATED_FIELD);
private final boolean allEdgesRelated;