Introduced BaseRelationProcessor
This commit is contained in:
		
							parent
							
								
									85579fa93f
								
							
						
					
					
						commit
						965b951699
					
				@ -575,7 +575,7 @@ public final class EdgeGrpcSession implements Closeable {
 | 
			
		||||
            }
 | 
			
		||||
            if (uplinkMsg.getRelationUpdateMsgCount() > 0) {
 | 
			
		||||
                for (RelationUpdateMsg relationUpdateMsg : uplinkMsg.getRelationUpdateMsgList()) {
 | 
			
		||||
                    result.add(ctx.getRelationProcessor().processRelationFromEdge(edge.getTenantId(), relationUpdateMsg));
 | 
			
		||||
                    result.add(ctx.getRelationProcessor().processRelationMsg(edge.getTenantId(), relationUpdateMsg));
 | 
			
		||||
                }
 | 
			
		||||
            }
 | 
			
		||||
            if (uplinkMsg.getRuleChainMetadataRequestMsgCount() > 0) {
 | 
			
		||||
 | 
			
		||||
@ -0,0 +1,108 @@
 | 
			
		||||
/**
 | 
			
		||||
 * Copyright © 2016-2022 The Thingsboard Authors
 | 
			
		||||
 *
 | 
			
		||||
 * Licensed under the Apache License, Version 2.0 (the "License");
 | 
			
		||||
 * you may not use this file except in compliance with the License.
 | 
			
		||||
 * You may obtain a copy of the License at
 | 
			
		||||
 *
 | 
			
		||||
 *     http://www.apache.org/licenses/LICENSE-2.0
 | 
			
		||||
 *
 | 
			
		||||
 * Unless required by applicable law or agreed to in writing, software
 | 
			
		||||
 * distributed under the License is distributed on an "AS IS" BASIS,
 | 
			
		||||
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
			
		||||
 * See the License for the specific language governing permissions and
 | 
			
		||||
 * limitations under the License.
 | 
			
		||||
 */
 | 
			
		||||
package org.thingsboard.server.service.edge.rpc.processor.relation;
 | 
			
		||||
 | 
			
		||||
import com.google.common.util.concurrent.Futures;
 | 
			
		||||
import com.google.common.util.concurrent.ListenableFuture;
 | 
			
		||||
import lombok.extern.slf4j.Slf4j;
 | 
			
		||||
import org.springframework.stereotype.Component;
 | 
			
		||||
import org.thingsboard.common.util.JacksonUtil;
 | 
			
		||||
import org.thingsboard.server.common.data.EntityType;
 | 
			
		||||
import org.thingsboard.server.common.data.id.AssetId;
 | 
			
		||||
import org.thingsboard.server.common.data.id.CustomerId;
 | 
			
		||||
import org.thingsboard.server.common.data.id.DashboardId;
 | 
			
		||||
import org.thingsboard.server.common.data.id.DeviceId;
 | 
			
		||||
import org.thingsboard.server.common.data.id.EdgeId;
 | 
			
		||||
import org.thingsboard.server.common.data.id.EntityId;
 | 
			
		||||
import org.thingsboard.server.common.data.id.EntityIdFactory;
 | 
			
		||||
import org.thingsboard.server.common.data.id.EntityViewId;
 | 
			
		||||
import org.thingsboard.server.common.data.id.TenantId;
 | 
			
		||||
import org.thingsboard.server.common.data.id.UserId;
 | 
			
		||||
import org.thingsboard.server.common.data.relation.EntityRelation;
 | 
			
		||||
import org.thingsboard.server.common.data.relation.RelationTypeGroup;
 | 
			
		||||
import org.thingsboard.server.gen.edge.v1.RelationUpdateMsg;
 | 
			
		||||
import org.thingsboard.server.queue.util.TbCoreComponent;
 | 
			
		||||
import org.thingsboard.server.service.edge.rpc.processor.BaseEdgeProcessor;
 | 
			
		||||
 | 
			
		||||
import java.util.UUID;
 | 
			
		||||
 | 
			
		||||
@Component
 | 
			
		||||
@Slf4j
 | 
			
		||||
@TbCoreComponent
 | 
			
		||||
public class BaseRelationProcessor extends BaseEdgeProcessor {
 | 
			
		||||
 | 
			
		||||
    public ListenableFuture<Void> processRelationMsg(TenantId tenantId, RelationUpdateMsg relationUpdateMsg) {
 | 
			
		||||
        log.trace("[{}] processRelationFromEdge [{}]", tenantId, relationUpdateMsg);
 | 
			
		||||
        try {
 | 
			
		||||
            EntityRelation entityRelation = new EntityRelation();
 | 
			
		||||
 | 
			
		||||
            UUID fromUUID = new UUID(relationUpdateMsg.getFromIdMSB(), relationUpdateMsg.getFromIdLSB());
 | 
			
		||||
            EntityId fromId = EntityIdFactory.getByTypeAndUuid(EntityType.valueOf(relationUpdateMsg.getFromEntityType()), fromUUID);
 | 
			
		||||
            entityRelation.setFrom(fromId);
 | 
			
		||||
 | 
			
		||||
            UUID toUUID = new UUID(relationUpdateMsg.getToIdMSB(), relationUpdateMsg.getToIdLSB());
 | 
			
		||||
            EntityId toId = EntityIdFactory.getByTypeAndUuid(EntityType.valueOf(relationUpdateMsg.getToEntityType()), toUUID);
 | 
			
		||||
            entityRelation.setTo(toId);
 | 
			
		||||
 | 
			
		||||
            entityRelation.setType(relationUpdateMsg.getType());
 | 
			
		||||
            entityRelation.setTypeGroup(relationUpdateMsg.hasTypeGroup()
 | 
			
		||||
                    ? RelationTypeGroup.valueOf(relationUpdateMsg.getTypeGroup()) : RelationTypeGroup.COMMON);
 | 
			
		||||
            entityRelation.setAdditionalInfo(JacksonUtil.toJsonNode(relationUpdateMsg.getAdditionalInfo()));
 | 
			
		||||
            switch (relationUpdateMsg.getMsgType()) {
 | 
			
		||||
                case ENTITY_CREATED_RPC_MESSAGE:
 | 
			
		||||
                case ENTITY_UPDATED_RPC_MESSAGE:
 | 
			
		||||
                    if (isEntityExists(tenantId, entityRelation.getTo())
 | 
			
		||||
                            && isEntityExists(tenantId, entityRelation.getFrom())) {
 | 
			
		||||
                        return Futures.transform(relationService.saveRelationAsync(tenantId, entityRelation),
 | 
			
		||||
                                (result) -> null, dbCallbackExecutorService);
 | 
			
		||||
                    } else {
 | 
			
		||||
                        log.warn("Skipping relating update msg because from/to entity doesn't exists on edge, {}", relationUpdateMsg);
 | 
			
		||||
                        return Futures.immediateFuture(null);
 | 
			
		||||
                    }
 | 
			
		||||
                case ENTITY_DELETED_RPC_MESSAGE:
 | 
			
		||||
                    return Futures.transform(relationService.deleteRelationAsync(tenantId, entityRelation),
 | 
			
		||||
                            (result) -> null, dbCallbackExecutorService);
 | 
			
		||||
                case UNRECOGNIZED:
 | 
			
		||||
                default:
 | 
			
		||||
                    return handleUnsupportedMsgType(relationUpdateMsg.getMsgType());
 | 
			
		||||
            }
 | 
			
		||||
        } catch (Exception e) {
 | 
			
		||||
            log.error("[{}] Failed to process relation update msg [{}]", tenantId, relationUpdateMsg, e);
 | 
			
		||||
            return Futures.immediateFailedFuture(e);
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private boolean isEntityExists(TenantId tenantId, EntityId entityId) {
 | 
			
		||||
        switch (entityId.getEntityType()) {
 | 
			
		||||
            case DEVICE:
 | 
			
		||||
                return deviceService.findDeviceById(tenantId, new DeviceId(entityId.getId())) != null;
 | 
			
		||||
            case ASSET:
 | 
			
		||||
                return assetService.findAssetById(tenantId, new AssetId(entityId.getId())) != null;
 | 
			
		||||
            case ENTITY_VIEW:
 | 
			
		||||
                return entityViewService.findEntityViewById(tenantId, new EntityViewId(entityId.getId())) != null;
 | 
			
		||||
            case CUSTOMER:
 | 
			
		||||
                return customerService.findCustomerById(tenantId, new CustomerId(entityId.getId())) != null;
 | 
			
		||||
            case USER:
 | 
			
		||||
                return userService.findUserById(tenantId, new UserId(entityId.getId())) != null;
 | 
			
		||||
            case DASHBOARD:
 | 
			
		||||
                return dashboardService.findDashboardById(tenantId, new DashboardId(entityId.getId())) != null;
 | 
			
		||||
            case EDGE:
 | 
			
		||||
                return edgeService.findEdgeById(tenantId, new EdgeId(entityId.getId())) != null;
 | 
			
		||||
            default:
 | 
			
		||||
                return false;
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
@ -26,98 +26,24 @@ import org.thingsboard.server.common.data.EntityType;
 | 
			
		||||
import org.thingsboard.server.common.data.edge.EdgeEvent;
 | 
			
		||||
import org.thingsboard.server.common.data.edge.EdgeEventActionType;
 | 
			
		||||
import org.thingsboard.server.common.data.edge.EdgeEventType;
 | 
			
		||||
import org.thingsboard.server.common.data.exception.ThingsboardErrorCode;
 | 
			
		||||
import org.thingsboard.server.common.data.exception.ThingsboardException;
 | 
			
		||||
import org.thingsboard.server.common.data.id.AssetId;
 | 
			
		||||
import org.thingsboard.server.common.data.id.CustomerId;
 | 
			
		||||
import org.thingsboard.server.common.data.id.DashboardId;
 | 
			
		||||
import org.thingsboard.server.common.data.id.DeviceId;
 | 
			
		||||
import org.thingsboard.server.common.data.id.EdgeId;
 | 
			
		||||
import org.thingsboard.server.common.data.id.EntityId;
 | 
			
		||||
import org.thingsboard.server.common.data.id.EntityIdFactory;
 | 
			
		||||
import org.thingsboard.server.common.data.id.EntityViewId;
 | 
			
		||||
import org.thingsboard.server.common.data.id.TenantId;
 | 
			
		||||
import org.thingsboard.server.common.data.id.UserId;
 | 
			
		||||
import org.thingsboard.server.common.data.relation.EntityRelation;
 | 
			
		||||
import org.thingsboard.server.common.data.relation.RelationTypeGroup;
 | 
			
		||||
import org.thingsboard.server.gen.edge.v1.DownlinkMsg;
 | 
			
		||||
import org.thingsboard.server.gen.edge.v1.RelationUpdateMsg;
 | 
			
		||||
import org.thingsboard.server.gen.edge.v1.UpdateMsgType;
 | 
			
		||||
import org.thingsboard.server.gen.transport.TransportProtos;
 | 
			
		||||
import org.thingsboard.server.queue.util.TbCoreComponent;
 | 
			
		||||
import org.thingsboard.server.service.edge.rpc.processor.BaseEdgeProcessor;
 | 
			
		||||
 | 
			
		||||
import java.util.ArrayList;
 | 
			
		||||
import java.util.HashSet;
 | 
			
		||||
import java.util.List;
 | 
			
		||||
import java.util.Set;
 | 
			
		||||
import java.util.UUID;
 | 
			
		||||
 | 
			
		||||
@Component
 | 
			
		||||
@Slf4j
 | 
			
		||||
@TbCoreComponent
 | 
			
		||||
public class RelationEdgeProcessor extends BaseEdgeProcessor {
 | 
			
		||||
 | 
			
		||||
    public ListenableFuture<Void> processRelationFromEdge(TenantId tenantId, RelationUpdateMsg relationUpdateMsg) {
 | 
			
		||||
        log.trace("[{}] processRelationFromEdge [{}]", tenantId, relationUpdateMsg);
 | 
			
		||||
        try {
 | 
			
		||||
            EntityRelation entityRelation = new EntityRelation();
 | 
			
		||||
 | 
			
		||||
            UUID fromUUID = new UUID(relationUpdateMsg.getFromIdMSB(), relationUpdateMsg.getFromIdLSB());
 | 
			
		||||
            EntityId fromId = EntityIdFactory.getByTypeAndUuid(EntityType.valueOf(relationUpdateMsg.getFromEntityType()), fromUUID);
 | 
			
		||||
            entityRelation.setFrom(fromId);
 | 
			
		||||
 | 
			
		||||
            UUID toUUID = new UUID(relationUpdateMsg.getToIdMSB(), relationUpdateMsg.getToIdLSB());
 | 
			
		||||
            EntityId toId = EntityIdFactory.getByTypeAndUuid(EntityType.valueOf(relationUpdateMsg.getToEntityType()), toUUID);
 | 
			
		||||
            entityRelation.setTo(toId);
 | 
			
		||||
 | 
			
		||||
            entityRelation.setType(relationUpdateMsg.getType());
 | 
			
		||||
            if (relationUpdateMsg.hasTypeGroup()) {
 | 
			
		||||
                entityRelation.setTypeGroup(RelationTypeGroup.valueOf(relationUpdateMsg.getTypeGroup()));
 | 
			
		||||
            }
 | 
			
		||||
            entityRelation.setAdditionalInfo(JacksonUtil.OBJECT_MAPPER.readTree(relationUpdateMsg.getAdditionalInfo()));
 | 
			
		||||
            switch (relationUpdateMsg.getMsgType()) {
 | 
			
		||||
                case ENTITY_CREATED_RPC_MESSAGE:
 | 
			
		||||
                case ENTITY_UPDATED_RPC_MESSAGE:
 | 
			
		||||
                    if (isEntityExists(tenantId, entityRelation.getTo())
 | 
			
		||||
                            && isEntityExists(tenantId, entityRelation.getFrom())) {
 | 
			
		||||
                        return Futures.transform(relationService.saveRelationAsync(tenantId, entityRelation),
 | 
			
		||||
                                (result) -> null, dbCallbackExecutorService);
 | 
			
		||||
                    } else {
 | 
			
		||||
                        log.warn("Skipping relating update msg because from/to entity doesn't exists on edge, {}", relationUpdateMsg);
 | 
			
		||||
                        return Futures.immediateFuture(null);
 | 
			
		||||
                    }
 | 
			
		||||
                case ENTITY_DELETED_RPC_MESSAGE:
 | 
			
		||||
                    return Futures.transform(relationService.deleteRelationAsync(tenantId, entityRelation),
 | 
			
		||||
                            (result) -> null, dbCallbackExecutorService);
 | 
			
		||||
                case UNRECOGNIZED:
 | 
			
		||||
                default:
 | 
			
		||||
                    return handleUnsupportedMsgType(relationUpdateMsg.getMsgType());
 | 
			
		||||
            }
 | 
			
		||||
        } catch (Exception e) {
 | 
			
		||||
            log.error("Failed to process relation update msg [{}]", relationUpdateMsg, e);
 | 
			
		||||
            return Futures.immediateFailedFuture(new RuntimeException("Failed to process relation update msg", e));
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private boolean isEntityExists(TenantId tenantId, EntityId entityId) throws ThingsboardException {
 | 
			
		||||
        switch (entityId.getEntityType()) {
 | 
			
		||||
            case DEVICE:
 | 
			
		||||
                return deviceService.findDeviceById(tenantId, new DeviceId(entityId.getId())) != null;
 | 
			
		||||
            case ASSET:
 | 
			
		||||
                return assetService.findAssetById(tenantId, new AssetId(entityId.getId())) != null;
 | 
			
		||||
            case ENTITY_VIEW:
 | 
			
		||||
                return entityViewService.findEntityViewById(tenantId, new EntityViewId(entityId.getId())) != null;
 | 
			
		||||
            case CUSTOMER:
 | 
			
		||||
                return customerService.findCustomerById(tenantId, new CustomerId(entityId.getId())) != null;
 | 
			
		||||
            case USER:
 | 
			
		||||
                return userService.findUserById(tenantId, new UserId(entityId.getId())) != null;
 | 
			
		||||
            case DASHBOARD:
 | 
			
		||||
                return dashboardService.findDashboardById(tenantId, new DashboardId(entityId.getId())) != null;
 | 
			
		||||
            default:
 | 
			
		||||
                throw new ThingsboardException("Unsupported entity type " + entityId.getEntityType(), ThingsboardErrorCode.INVALID_ARGUMENTS);
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
public class RelationEdgeProcessor extends BaseRelationProcessor {
 | 
			
		||||
 | 
			
		||||
    public DownlinkMsg convertRelationEventToDownlink(EdgeEvent edgeEvent) {
 | 
			
		||||
        EntityRelation entityRelation = JacksonUtil.OBJECT_MAPPER.convertValue(edgeEvent.getBody(), EntityRelation.class);
 | 
			
		||||
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user