diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java index 44ab906b74..94715c92dc 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java @@ -575,7 +575,7 @@ public final class EdgeGrpcSession implements Closeable { } if (uplinkMsg.getRelationUpdateMsgCount() > 0) { for (RelationUpdateMsg relationUpdateMsg : uplinkMsg.getRelationUpdateMsgList()) { - result.add(ctx.getRelationProcessor().processRelationFromEdge(edge.getTenantId(), relationUpdateMsg)); + result.add(ctx.getRelationProcessor().processRelationMsg(edge.getTenantId(), relationUpdateMsg)); } } if (uplinkMsg.getRuleChainMetadataRequestMsgCount() > 0) { diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/relation/BaseRelationProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/relation/BaseRelationProcessor.java new file mode 100644 index 0000000000..2c9a15f70d --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/relation/BaseRelationProcessor.java @@ -0,0 +1,108 @@ +/** + * Copyright © 2016-2022 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.edge.rpc.processor.relation; + +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; +import org.thingsboard.common.util.JacksonUtil; +import org.thingsboard.server.common.data.EntityType; +import org.thingsboard.server.common.data.id.AssetId; +import org.thingsboard.server.common.data.id.CustomerId; +import org.thingsboard.server.common.data.id.DashboardId; +import org.thingsboard.server.common.data.id.DeviceId; +import org.thingsboard.server.common.data.id.EdgeId; +import org.thingsboard.server.common.data.id.EntityId; +import org.thingsboard.server.common.data.id.EntityIdFactory; +import org.thingsboard.server.common.data.id.EntityViewId; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.id.UserId; +import org.thingsboard.server.common.data.relation.EntityRelation; +import org.thingsboard.server.common.data.relation.RelationTypeGroup; +import org.thingsboard.server.gen.edge.v1.RelationUpdateMsg; +import org.thingsboard.server.queue.util.TbCoreComponent; +import org.thingsboard.server.service.edge.rpc.processor.BaseEdgeProcessor; + +import java.util.UUID; + +@Component +@Slf4j +@TbCoreComponent +public class BaseRelationProcessor extends BaseEdgeProcessor { + + public ListenableFuture processRelationMsg(TenantId tenantId, RelationUpdateMsg relationUpdateMsg) { + log.trace("[{}] processRelationFromEdge [{}]", tenantId, relationUpdateMsg); + try { + EntityRelation entityRelation = new EntityRelation(); + + UUID fromUUID = new UUID(relationUpdateMsg.getFromIdMSB(), relationUpdateMsg.getFromIdLSB()); + EntityId fromId = EntityIdFactory.getByTypeAndUuid(EntityType.valueOf(relationUpdateMsg.getFromEntityType()), fromUUID); + entityRelation.setFrom(fromId); + + UUID toUUID = new UUID(relationUpdateMsg.getToIdMSB(), relationUpdateMsg.getToIdLSB()); + EntityId toId = EntityIdFactory.getByTypeAndUuid(EntityType.valueOf(relationUpdateMsg.getToEntityType()), toUUID); + entityRelation.setTo(toId); + + entityRelation.setType(relationUpdateMsg.getType()); + entityRelation.setTypeGroup(relationUpdateMsg.hasTypeGroup() + ? RelationTypeGroup.valueOf(relationUpdateMsg.getTypeGroup()) : RelationTypeGroup.COMMON); + entityRelation.setAdditionalInfo(JacksonUtil.toJsonNode(relationUpdateMsg.getAdditionalInfo())); + switch (relationUpdateMsg.getMsgType()) { + case ENTITY_CREATED_RPC_MESSAGE: + case ENTITY_UPDATED_RPC_MESSAGE: + if (isEntityExists(tenantId, entityRelation.getTo()) + && isEntityExists(tenantId, entityRelation.getFrom())) { + return Futures.transform(relationService.saveRelationAsync(tenantId, entityRelation), + (result) -> null, dbCallbackExecutorService); + } else { + log.warn("Skipping relating update msg because from/to entity doesn't exists on edge, {}", relationUpdateMsg); + return Futures.immediateFuture(null); + } + case ENTITY_DELETED_RPC_MESSAGE: + return Futures.transform(relationService.deleteRelationAsync(tenantId, entityRelation), + (result) -> null, dbCallbackExecutorService); + case UNRECOGNIZED: + default: + return handleUnsupportedMsgType(relationUpdateMsg.getMsgType()); + } + } catch (Exception e) { + log.error("[{}] Failed to process relation update msg [{}]", tenantId, relationUpdateMsg, e); + return Futures.immediateFailedFuture(e); + } + } + + private boolean isEntityExists(TenantId tenantId, EntityId entityId) { + switch (entityId.getEntityType()) { + case DEVICE: + return deviceService.findDeviceById(tenantId, new DeviceId(entityId.getId())) != null; + case ASSET: + return assetService.findAssetById(tenantId, new AssetId(entityId.getId())) != null; + case ENTITY_VIEW: + return entityViewService.findEntityViewById(tenantId, new EntityViewId(entityId.getId())) != null; + case CUSTOMER: + return customerService.findCustomerById(tenantId, new CustomerId(entityId.getId())) != null; + case USER: + return userService.findUserById(tenantId, new UserId(entityId.getId())) != null; + case DASHBOARD: + return dashboardService.findDashboardById(tenantId, new DashboardId(entityId.getId())) != null; + case EDGE: + return edgeService.findEdgeById(tenantId, new EdgeId(entityId.getId())) != null; + default: + return false; + } + } +} diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/relation/RelationEdgeProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/relation/RelationEdgeProcessor.java index 2835222aeb..07e25e4c23 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/relation/RelationEdgeProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/relation/RelationEdgeProcessor.java @@ -26,98 +26,24 @@ import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.edge.EdgeEvent; import org.thingsboard.server.common.data.edge.EdgeEventActionType; import org.thingsboard.server.common.data.edge.EdgeEventType; -import org.thingsboard.server.common.data.exception.ThingsboardErrorCode; -import org.thingsboard.server.common.data.exception.ThingsboardException; -import org.thingsboard.server.common.data.id.AssetId; -import org.thingsboard.server.common.data.id.CustomerId; -import org.thingsboard.server.common.data.id.DashboardId; -import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.EdgeId; -import org.thingsboard.server.common.data.id.EntityId; -import org.thingsboard.server.common.data.id.EntityIdFactory; -import org.thingsboard.server.common.data.id.EntityViewId; import org.thingsboard.server.common.data.id.TenantId; -import org.thingsboard.server.common.data.id.UserId; import org.thingsboard.server.common.data.relation.EntityRelation; -import org.thingsboard.server.common.data.relation.RelationTypeGroup; import org.thingsboard.server.gen.edge.v1.DownlinkMsg; import org.thingsboard.server.gen.edge.v1.RelationUpdateMsg; import org.thingsboard.server.gen.edge.v1.UpdateMsgType; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.queue.util.TbCoreComponent; -import org.thingsboard.server.service.edge.rpc.processor.BaseEdgeProcessor; import java.util.ArrayList; import java.util.HashSet; import java.util.List; import java.util.Set; -import java.util.UUID; @Component @Slf4j @TbCoreComponent -public class RelationEdgeProcessor extends BaseEdgeProcessor { - - public ListenableFuture processRelationFromEdge(TenantId tenantId, RelationUpdateMsg relationUpdateMsg) { - log.trace("[{}] processRelationFromEdge [{}]", tenantId, relationUpdateMsg); - try { - EntityRelation entityRelation = new EntityRelation(); - - UUID fromUUID = new UUID(relationUpdateMsg.getFromIdMSB(), relationUpdateMsg.getFromIdLSB()); - EntityId fromId = EntityIdFactory.getByTypeAndUuid(EntityType.valueOf(relationUpdateMsg.getFromEntityType()), fromUUID); - entityRelation.setFrom(fromId); - - UUID toUUID = new UUID(relationUpdateMsg.getToIdMSB(), relationUpdateMsg.getToIdLSB()); - EntityId toId = EntityIdFactory.getByTypeAndUuid(EntityType.valueOf(relationUpdateMsg.getToEntityType()), toUUID); - entityRelation.setTo(toId); - - entityRelation.setType(relationUpdateMsg.getType()); - if (relationUpdateMsg.hasTypeGroup()) { - entityRelation.setTypeGroup(RelationTypeGroup.valueOf(relationUpdateMsg.getTypeGroup())); - } - entityRelation.setAdditionalInfo(JacksonUtil.OBJECT_MAPPER.readTree(relationUpdateMsg.getAdditionalInfo())); - switch (relationUpdateMsg.getMsgType()) { - case ENTITY_CREATED_RPC_MESSAGE: - case ENTITY_UPDATED_RPC_MESSAGE: - if (isEntityExists(tenantId, entityRelation.getTo()) - && isEntityExists(tenantId, entityRelation.getFrom())) { - return Futures.transform(relationService.saveRelationAsync(tenantId, entityRelation), - (result) -> null, dbCallbackExecutorService); - } else { - log.warn("Skipping relating update msg because from/to entity doesn't exists on edge, {}", relationUpdateMsg); - return Futures.immediateFuture(null); - } - case ENTITY_DELETED_RPC_MESSAGE: - return Futures.transform(relationService.deleteRelationAsync(tenantId, entityRelation), - (result) -> null, dbCallbackExecutorService); - case UNRECOGNIZED: - default: - return handleUnsupportedMsgType(relationUpdateMsg.getMsgType()); - } - } catch (Exception e) { - log.error("Failed to process relation update msg [{}]", relationUpdateMsg, e); - return Futures.immediateFailedFuture(new RuntimeException("Failed to process relation update msg", e)); - } - } - - private boolean isEntityExists(TenantId tenantId, EntityId entityId) throws ThingsboardException { - switch (entityId.getEntityType()) { - case DEVICE: - return deviceService.findDeviceById(tenantId, new DeviceId(entityId.getId())) != null; - case ASSET: - return assetService.findAssetById(tenantId, new AssetId(entityId.getId())) != null; - case ENTITY_VIEW: - return entityViewService.findEntityViewById(tenantId, new EntityViewId(entityId.getId())) != null; - case CUSTOMER: - return customerService.findCustomerById(tenantId, new CustomerId(entityId.getId())) != null; - case USER: - return userService.findUserById(tenantId, new UserId(entityId.getId())) != null; - case DASHBOARD: - return dashboardService.findDashboardById(tenantId, new DashboardId(entityId.getId())) != null; - default: - throw new ThingsboardException("Unsupported entity type " + entityId.getEntityType(), ThingsboardErrorCode.INVALID_ARGUMENTS); - } - } +public class RelationEdgeProcessor extends BaseRelationProcessor { public DownlinkMsg convertRelationEventToDownlink(EdgeEvent edgeEvent) { EntityRelation entityRelation = JacksonUtil.OBJECT_MAPPER.convertValue(edgeEvent.getBody(), EntityRelation.class);