diff --git a/application/src/main/java/org/thingsboard/server/service/edge/EdgeEventSourcingListener.java b/application/src/main/java/org/thingsboard/server/service/edge/EdgeEventSourcingListener.java index 7bafd53644..70c63a96cb 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/EdgeEventSourcingListener.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/EdgeEventSourcingListener.java @@ -33,6 +33,7 @@ import org.thingsboard.server.common.data.alarm.AlarmApiCallResult; import org.thingsboard.server.common.data.alarm.AlarmComment; import org.thingsboard.server.common.data.alarm.EntityAlarm; import org.thingsboard.server.common.data.audit.ActionType; +import org.thingsboard.server.common.data.cf.CalculatedField; import org.thingsboard.server.common.data.domain.Domain; import org.thingsboard.server.common.data.edge.Edge; import org.thingsboard.server.common.data.edge.EdgeEventActionType; @@ -262,6 +263,8 @@ public class EdgeEventSourcingListener { private String getBodyMsgForEntityEvent(Object entity) { if (entity instanceof AlarmComment) { return JacksonUtil.toString(entity); + } else if (entity instanceof CalculatedField calculatedField) { + return JacksonUtil.toString(calculatedField.getEntityId()); } return null; } diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeSyncCursor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeSyncCursor.java index 389f0202fa..adab9b812f 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeSyncCursor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeSyncCursor.java @@ -23,7 +23,6 @@ import org.thingsboard.server.service.edge.EdgeContextComponent; import org.thingsboard.server.service.edge.rpc.fetch.AdminSettingsEdgeEventFetcher; import org.thingsboard.server.service.edge.rpc.fetch.AssetProfilesEdgeEventFetcher; import org.thingsboard.server.service.edge.rpc.fetch.AssetsEdgeEventFetcher; -import org.thingsboard.server.service.edge.rpc.fetch.CalculatedFieldsEdgeEventFetcher; import org.thingsboard.server.service.edge.rpc.fetch.CustomerEdgeEventFetcher; import org.thingsboard.server.service.edge.rpc.fetch.CustomerUsersEdgeEventFetcher; import org.thingsboard.server.service.edge.rpc.fetch.DashboardsEdgeEventFetcher; @@ -81,7 +80,6 @@ public class EdgeSyncCursor { fetchers.add(new DevicesEdgeEventFetcher(ctx.getDeviceService())); fetchers.add(new AssetsEdgeEventFetcher(ctx.getAssetService())); fetchers.add(new EntityViewsEdgeEventFetcher(ctx.getEntityViewService())); - fetchers.add(new CalculatedFieldsEdgeEventFetcher(ctx.getCalculatedFieldService())); if (fullSync) { fetchers.add(new NotificationTemplateEdgeEventFetcher(ctx.getNotificationTemplateService())); fetchers.add(new NotificationTargetEdgeEventFetcher(ctx.getNotificationTargetService())); diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/fetch/CalculatedFieldsEdgeEventFetcher.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/fetch/CalculatedFieldsEdgeEventFetcher.java deleted file mode 100644 index 4f3e91354e..0000000000 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/fetch/CalculatedFieldsEdgeEventFetcher.java +++ /dev/null @@ -1,48 +0,0 @@ -/** - * Copyright © 2016-2025 The Thingsboard Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.thingsboard.server.service.edge.rpc.fetch; - -import lombok.AllArgsConstructor; -import lombok.extern.slf4j.Slf4j; -import org.thingsboard.server.common.data.EdgeUtils; -import org.thingsboard.server.common.data.cf.CalculatedField; -import org.thingsboard.server.common.data.edge.Edge; -import org.thingsboard.server.common.data.edge.EdgeEvent; -import org.thingsboard.server.common.data.edge.EdgeEventActionType; -import org.thingsboard.server.common.data.edge.EdgeEventType; -import org.thingsboard.server.common.data.id.TenantId; -import org.thingsboard.server.common.data.page.PageData; -import org.thingsboard.server.common.data.page.PageLink; -import org.thingsboard.server.dao.cf.CalculatedFieldService; - -@AllArgsConstructor -@Slf4j -public class CalculatedFieldsEdgeEventFetcher extends BasePageableEdgeEventFetcher { - - private final CalculatedFieldService calculatedFieldService; - - @Override - PageData fetchEntities(TenantId tenantId, Edge edge, PageLink pageLink) { - return calculatedFieldService.findCalculatedFieldsByTenantId(tenantId, pageLink); - } - - @Override - EdgeEvent constructEdgeEvent(TenantId tenantId, Edge edge, CalculatedField calculatedField) { - return EdgeUtils.constructEdgeEvent(tenantId, edge.getId(), EdgeEventType.CALCULATED_FIELD, - EdgeEventActionType.ADDED, calculatedField.getId(), null); - } - -} diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessor.java index 6fcb02e4bc..077ff21d26 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessor.java @@ -25,6 +25,7 @@ import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.server.common.data.AttributeScope; import org.thingsboard.server.common.data.EdgeUtils; import org.thingsboard.server.common.data.EntityType; +import org.thingsboard.server.common.data.cf.CalculatedField; import org.thingsboard.server.common.data.edge.Edge; import org.thingsboard.server.common.data.edge.EdgeEvent; import org.thingsboard.server.common.data.edge.EdgeEventActionType; @@ -53,11 +54,13 @@ import org.thingsboard.server.common.msg.TbMsgDataType; import org.thingsboard.server.common.msg.TbMsgMetaData; import org.thingsboard.server.dao.edge.EdgeSynchronizationManager; import org.thingsboard.server.dao.entity.EntityDaoRegistry; +import org.thingsboard.server.gen.edge.v1.CalculatedFieldUpdateMsg; import org.thingsboard.server.gen.edge.v1.UpdateMsgType; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.queue.TbQueueCallback; import org.thingsboard.server.queue.TbQueueMsgMetadata; import org.thingsboard.server.service.edge.EdgeContextComponent; +import org.thingsboard.server.service.edge.EdgeMsgConstructorUtils; import org.thingsboard.server.service.executors.DbCallbackExecutorService; import org.thingsboard.server.service.state.DefaultDeviceStateService; @@ -381,4 +384,12 @@ public abstract class BaseEdgeProcessor implements EdgeProcessor { }); } + protected List getCalculatedFieldUpdateMsgs(TenantId tenantId, EntityId entityId) { + List calculatedFields = edgeCtx.getCalculatedFieldService().findCalculatedFieldsByEntityId(tenantId, entityId); + + return calculatedFields.stream() + .map(calculatedField -> EdgeMsgConstructorUtils.constructCalculatedFieldUpdatedMsg(UpdateMsgType.ENTITY_UPDATED_RPC_MESSAGE, calculatedField)) + .toList(); + } + } diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/asset/AssetEdgeProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/asset/AssetEdgeProcessor.java index 47f4c11362..538a90f7d7 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/asset/AssetEdgeProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/asset/AssetEdgeProcessor.java @@ -119,6 +119,9 @@ public class AssetEdgeProcessor extends BaseAssetProcessor implements AssetProce DownlinkMsg.Builder builder = DownlinkMsg.newBuilder() .setDownlinkMsgId(EdgeUtils.nextPositiveInt()) .addAssetUpdateMsg(assetUpdateMsg); + + getCalculatedFieldUpdateMsgs(edgeEvent.getTenantId(), assetId).forEach(builder::addCalculatedFieldUpdateMsg); + if (UpdateMsgType.ENTITY_CREATED_RPC_MESSAGE.equals(msgType)) { AssetProfile assetProfile = edgeCtx.getAssetProfileService().findAssetProfileById(edgeEvent.getTenantId(), asset.getAssetProfileId()); builder.addAssetProfileUpdateMsg(EdgeMsgConstructorUtils.constructAssetProfileUpdatedMsg(msgType, assetProfile)); diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/calculated/BaseCalculatedFieldProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/calculated/BaseCalculatedFieldProcessor.java index ddb1b23d53..63cc1e024d 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/calculated/BaseCalculatedFieldProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/calculated/BaseCalculatedFieldProcessor.java @@ -16,18 +16,30 @@ package org.thingsboard.server.service.edge.rpc.processor.calculated; import com.datastax.oss.driver.api.core.uuid.Uuids; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.util.Pair; import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.server.common.data.StringUtils; import org.thingsboard.server.common.data.cf.CalculatedField; +import org.thingsboard.server.common.data.edge.EdgeEventActionType; +import org.thingsboard.server.common.data.edge.EdgeEventType; import org.thingsboard.server.common.data.id.CalculatedFieldId; +import org.thingsboard.server.common.data.id.EdgeId; +import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.page.PageDataIterableByTenantIdEntityId; import org.thingsboard.server.dao.service.DataValidator; import org.thingsboard.server.gen.edge.v1.CalculatedFieldUpdateMsg; import org.thingsboard.server.service.edge.rpc.processor.BaseEdgeProcessor; +import java.util.ArrayList; +import java.util.List; + +import static org.thingsboard.server.dao.edge.BaseRelatedEdgesService.RELATED_EDGES_CACHE_ITEMS; + @Slf4j public abstract class BaseCalculatedFieldProcessor extends BaseEdgeProcessor { @@ -76,4 +88,23 @@ public abstract class BaseCalculatedFieldProcessor extends BaseEdgeProcessor { return Pair.of(isCreated, isNameUpdated); } + protected ListenableFuture pushEventToAllRelatedEdges(TenantId tenantId, EntityId entityId, EdgeEventType type, EdgeEventActionType actionType, EdgeId sourceEdgeId) { + List> futures = new ArrayList<>(); + PageDataIterableByTenantIdEntityId edgeIds = + new PageDataIterableByTenantIdEntityId<>(edgeCtx.getEdgeService()::findRelatedEdgeIdsByEntityId, tenantId, entityId, RELATED_EDGES_CACHE_ITEMS); + for (EdgeId relatedEdgeId : edgeIds) { + if (!relatedEdgeId.equals(sourceEdgeId)) { + futures.add(saveEdgeEvent(tenantId, relatedEdgeId, type, actionType, entityId, null)); + } + } + return Futures.transform(Futures.allAsList(futures), voids -> null, dbCallbackExecutorService); + } + + protected ListenableFuture pushEventToAllEdges(TenantId tenantId, EdgeEventType type, EdgeEventActionType actionType, EntityId entityId, EdgeId sourceEdgeId) { + return switch (actionType) { + case ADDED, UPDATED, DELETED -> processActionForAllEdges(tenantId, type, actionType, entityId, null, sourceEdgeId); + default -> Futures.immediateFuture(null); + }; + } + } diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/calculated/CalculatedFieldEdgeProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/calculated/CalculatedFieldEdgeProcessor.java index f936b0b1d4..cf28003707 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/calculated/CalculatedFieldEdgeProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/calculated/CalculatedFieldEdgeProcessor.java @@ -15,6 +15,7 @@ */ package org.thingsboard.server.service.edge.rpc.processor.calculated; +import com.fasterxml.jackson.databind.JsonNode; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import lombok.extern.slf4j.Slf4j; @@ -22,26 +23,28 @@ import org.springframework.data.util.Pair; import org.springframework.stereotype.Component; import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.server.common.data.EdgeUtils; +import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.cf.CalculatedField; import org.thingsboard.server.common.data.edge.Edge; import org.thingsboard.server.common.data.edge.EdgeEvent; import org.thingsboard.server.common.data.edge.EdgeEventActionType; import org.thingsboard.server.common.data.edge.EdgeEventType; import org.thingsboard.server.common.data.id.CalculatedFieldId; +import org.thingsboard.server.common.data.id.EdgeId; +import org.thingsboard.server.common.data.id.EntityId; +import org.thingsboard.server.common.data.id.EntityIdFactory; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.msg.TbMsgType; -import org.thingsboard.server.common.data.relation.EntityRelation; -import org.thingsboard.server.common.data.relation.RelationTypeGroup; import org.thingsboard.server.common.msg.TbMsgMetaData; import org.thingsboard.server.dao.exception.DataValidationException; import org.thingsboard.server.gen.edge.v1.CalculatedFieldUpdateMsg; import org.thingsboard.server.gen.edge.v1.DownlinkMsg; import org.thingsboard.server.gen.edge.v1.EdgeVersion; import org.thingsboard.server.gen.edge.v1.UpdateMsgType; +import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.queue.util.TbCoreComponent; import org.thingsboard.server.service.edge.EdgeMsgConstructorUtils; -import java.util.List; import java.util.UUID; @Slf4j @@ -88,7 +91,7 @@ public class CalculatedFieldEdgeProcessor extends BaseCalculatedFieldProcessor i switch (edgeEvent.getAction()) { case ADDED, UPDATED -> { CalculatedField calculatedField = edgeCtx.getCalculatedFieldService().findById(edgeEvent.getTenantId(), calculatedFieldId); - if (calculatedField != null && isEntityAssignedToEdge(edgeEvent, calculatedField)) { + if (calculatedField != null) { UpdateMsgType msgType = getUpdateMsgType(edgeEvent.getAction()); CalculatedFieldUpdateMsg calculatedFieldUpdateMsg = EdgeMsgConstructorUtils.constructCalculatedFieldUpdatedMsg(msgType, calculatedField); return DownlinkMsg.newBuilder() @@ -108,24 +111,37 @@ public class CalculatedFieldEdgeProcessor extends BaseCalculatedFieldProcessor i return null; } - private boolean isEntityAssignedToEdge(EdgeEvent edgeEvent, CalculatedField calculatedField) { - switch (calculatedField.getEntityId().getEntityType()) { - case ASSET, DEVICE -> { - List relations = - edgeCtx.getRelationService().findByTo(edgeEvent.getTenantId(), calculatedField.getEntityId(), RelationTypeGroup.EDGE); - return !relations.isEmpty(); - } - default -> { - return true; - } - } - } - @Override public EdgeEventType getEdgeEventType() { return EdgeEventType.CALCULATED_FIELD; } + @Override + public ListenableFuture processEntityNotification(TenantId tenantId, TransportProtos.EdgeNotificationMsgProto edgeNotificationMsg) { + EdgeEventType type = EdgeEventType.valueOf(edgeNotificationMsg.getType()); + EdgeEventActionType actionType = EdgeEventActionType.valueOf(edgeNotificationMsg.getAction()); + EntityId entityId = EntityIdFactory.getByEdgeEventTypeAndUuid(type, new UUID(edgeNotificationMsg.getEntityIdMSB(), edgeNotificationMsg.getEntityIdLSB())); + EdgeId originatorEdgeId = safeGetEdgeId(edgeNotificationMsg.getOriginatorEdgeIdMSB(), edgeNotificationMsg.getOriginatorEdgeIdLSB()); + + switch (actionType) { + case UPDATED: + case ADDED: + EntityId bodyEntityId = JacksonUtil.fromString(edgeNotificationMsg.getBody(), EntityId.class); + if (bodyEntityId != null && + (EntityType.DEVICE.equals(bodyEntityId.getEntityType()) || EntityType.ASSET.equals(bodyEntityId.getEntityType()))) { + JsonNode body = JacksonUtil.toJsonNode(edgeNotificationMsg.getBody()); + EdgeId edgeId = safeGetEdgeId(edgeNotificationMsg.getEdgeIdMSB(), edgeNotificationMsg.getEdgeIdLSB()); + + return edgeId != null ? + saveEdgeEvent(tenantId, edgeId, type, actionType, entityId, body) : + pushEventToAllRelatedEdges(tenantId, entityId, type, actionType, originatorEdgeId); + } else { + return pushEventToAllEdges(tenantId, type, actionType, entityId, originatorEdgeId); + } + default: + return super.processEntityNotification(tenantId, edgeNotificationMsg); + } + } private void processCalculatedField(TenantId tenantId, CalculatedFieldId calculatedFieldId, CalculatedFieldUpdateMsg calculatedFieldUpdateMsg, Edge edge) { Pair resultPair = super.saveOrUpdateCalculatedField(tenantId, calculatedFieldId, calculatedFieldUpdateMsg); diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/device/DeviceEdgeProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/device/DeviceEdgeProcessor.java index ab01f83cd8..be2a04626c 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/device/DeviceEdgeProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/device/DeviceEdgeProcessor.java @@ -243,6 +243,9 @@ public class DeviceEdgeProcessor extends BaseDeviceProcessor implements DevicePr DeviceCredentialsUpdateMsg deviceCredentialsUpdateMsg = EdgeMsgConstructorUtils.constructDeviceCredentialsUpdatedMsg(deviceCredentials); builder.addDeviceCredentialsUpdateMsg(deviceCredentialsUpdateMsg).build(); } + + getCalculatedFieldUpdateMsgs(edgeEvent.getTenantId(), deviceId).forEach(builder::addCalculatedFieldUpdateMsg); + if (UpdateMsgType.ENTITY_CREATED_RPC_MESSAGE.equals(msgType)) { DeviceProfile deviceProfile = edgeCtx.getDeviceProfileService().findDeviceProfileById(edgeEvent.getTenantId(), device.getDeviceProfileId()); builder.addDeviceProfileUpdateMsg(EdgeMsgConstructorUtils.constructDeviceProfileUpdatedMsg(msgType, deviceProfile)); diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/edge/EdgeEventType.java b/common/data/src/main/java/org/thingsboard/server/common/data/edge/EdgeEventType.java index 9c97935329..0d5c3f34ad 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/edge/EdgeEventType.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/edge/EdgeEventType.java @@ -47,7 +47,7 @@ public enum EdgeEventType { TB_RESOURCE(true, EntityType.TB_RESOURCE), OAUTH2_CLIENT(true, EntityType.OAUTH2_CLIENT), DOMAIN(true, EntityType.DOMAIN), - CALCULATED_FIELD(true, EntityType.CALCULATED_FIELD); + CALCULATED_FIELD(false, EntityType.CALCULATED_FIELD); private final boolean allEdgesRelated;