Merge pull request #13201 from volodymyr-babak/edge-slow-down-sys-tenant-notification-processing
Edge - slow down processing of edge events for SYS_TENANT_ID
This commit is contained in:
		
						commit
						fcc3e1245f
					
				@ -167,6 +167,11 @@ public class EdgeEventSourcingListener {
 | 
				
			|||||||
    @TransactionalEventListener(fallbackExecution = true)
 | 
					    @TransactionalEventListener(fallbackExecution = true)
 | 
				
			||||||
    public void handleEvent(RelationActionEvent event) {
 | 
					    public void handleEvent(RelationActionEvent event) {
 | 
				
			||||||
        try {
 | 
					        try {
 | 
				
			||||||
 | 
					            TenantId tenantId = event.getTenantId();
 | 
				
			||||||
 | 
					            if (ActionType.RELATION_DELETED.equals(event.getActionType()) && !tenantService.tenantExists(tenantId)) {
 | 
				
			||||||
 | 
					                log.debug("[{}] Ignoring RelationActionEvent because tenant does not exist: {}", tenantId, event);
 | 
				
			||||||
 | 
					                return;
 | 
				
			||||||
 | 
					            }
 | 
				
			||||||
            EntityRelation relation = event.getRelation();
 | 
					            EntityRelation relation = event.getRelation();
 | 
				
			||||||
            if (relation == null) {
 | 
					            if (relation == null) {
 | 
				
			||||||
                log.trace("[{}] skipping RelationActionEvent event in case relation is null: {}", event.getTenantId(), event);
 | 
					                log.trace("[{}] skipping RelationActionEvent event in case relation is null: {}", event.getTenantId(), event);
 | 
				
			||||||
 | 
				
			|||||||
@ -528,7 +528,7 @@ public abstract class EdgeGrpcSession implements Closeable {
 | 
				
			|||||||
                sessionState.getPendingMsgsMap().remove(msg.getDownlinkMsgId());
 | 
					                sessionState.getPendingMsgsMap().remove(msg.getDownlinkMsgId());
 | 
				
			||||||
                log.debug("[{}][{}][{}] Msg has been processed successfully! Msg Id: [{}], Msg: {}", tenantId, edge.getId(), sessionId, msg.getDownlinkMsgId(), msg);
 | 
					                log.debug("[{}][{}][{}] Msg has been processed successfully! Msg Id: [{}], Msg: {}", tenantId, edge.getId(), sessionId, msg.getDownlinkMsgId(), msg);
 | 
				
			||||||
            } else {
 | 
					            } else {
 | 
				
			||||||
                log.error("[{}][{}][{}] Msg processing failed! Msg Id: [{}], Error msg: {}", tenantId, edge.getId(), sessionId, msg.getDownlinkMsgId(), msg.getErrorMsg());
 | 
					                log.debug("[{}][{}][{}] Msg processing failed! Msg Id: [{}], Error msg: {}", tenantId, edge.getId(), sessionId, msg.getDownlinkMsgId(), msg.getErrorMsg());
 | 
				
			||||||
                DownlinkMsg downlinkMsg = sessionState.getPendingMsgsMap().get(msg.getDownlinkMsgId());
 | 
					                DownlinkMsg downlinkMsg = sessionState.getPendingMsgsMap().get(msg.getDownlinkMsgId());
 | 
				
			||||||
                // if NOT timeseries or attributes failures - ack failed downlink
 | 
					                // if NOT timeseries or attributes failures - ack failed downlink
 | 
				
			||||||
                if (downlinkMsg.getEntityDataCount() == 0) {
 | 
					                if (downlinkMsg.getEntityDataCount() == 0) {
 | 
				
			||||||
 | 
				
			|||||||
@ -95,28 +95,42 @@ public abstract class BaseEdgeProcessor implements EdgeProcessor {
 | 
				
			|||||||
                                                   EdgeEventActionType action,
 | 
					                                                   EdgeEventActionType action,
 | 
				
			||||||
                                                   EntityId entityId,
 | 
					                                                   EntityId entityId,
 | 
				
			||||||
                                                   JsonNode body) {
 | 
					                                                   JsonNode body) {
 | 
				
			||||||
        ListenableFuture<Optional<AttributeKvEntry>> future =
 | 
					        return saveEdgeEvent(tenantId, edgeId, type, action, entityId, body, true);
 | 
				
			||||||
                edgeCtx.getAttributesService().find(tenantId, edgeId, AttributeScope.SERVER_SCOPE, DefaultDeviceStateService.ACTIVITY_STATE);
 | 
					    }
 | 
				
			||||||
        return Futures.transformAsync(future, activeOpt -> {
 | 
					
 | 
				
			||||||
            if (activeOpt.isEmpty()) {
 | 
					    protected ListenableFuture<Void> saveEdgeEvent(TenantId tenantId,
 | 
				
			||||||
                log.trace("Edge is not activated. Skipping event. tenantId [{}], edgeId [{}], type[{}], " +
 | 
					                                                   EdgeId edgeId,
 | 
				
			||||||
                                "action [{}], entityId [{}], body [{}]",
 | 
					                                                   EdgeEventType type,
 | 
				
			||||||
                        tenantId, edgeId, type, action, entityId, body);
 | 
					                                                   EdgeEventActionType action,
 | 
				
			||||||
                return Futures.immediateFuture(null);
 | 
					                                                   EntityId entityId,
 | 
				
			||||||
            }
 | 
					                                                   JsonNode body,
 | 
				
			||||||
            if (activeOpt.get().getBooleanValue().isPresent() && activeOpt.get().getBooleanValue().get()) {
 | 
					                                                   boolean doValidate) {
 | 
				
			||||||
                return doSaveEdgeEvent(tenantId, edgeId, type, action, entityId, body);
 | 
					        if (doValidate) {
 | 
				
			||||||
            } else {
 | 
					            ListenableFuture<Optional<AttributeKvEntry>> future =
 | 
				
			||||||
                if (doSaveIfEdgeIsOffline(type, action)) {
 | 
					                    edgeCtx.getAttributesService().find(tenantId, edgeId, AttributeScope.SERVER_SCOPE, DefaultDeviceStateService.ACTIVITY_STATE);
 | 
				
			||||||
                    return doSaveEdgeEvent(tenantId, edgeId, type, action, entityId, body);
 | 
					            return Futures.transformAsync(future, activeOpt -> {
 | 
				
			||||||
                } else {
 | 
					                if (activeOpt.isEmpty()) {
 | 
				
			||||||
                    log.trace("Edge is not active at the moment. Skipping event. tenantId [{}], edgeId [{}], type[{}], " +
 | 
					                    log.trace("Edge is not activated. Skipping event. tenantId [{}], edgeId [{}], type[{}], " +
 | 
				
			||||||
                                    "action [{}], entityId [{}], body [{}]",
 | 
					                                    "action [{}], entityId [{}], body [{}]",
 | 
				
			||||||
                            tenantId, edgeId, type, action, entityId, body);
 | 
					                            tenantId, edgeId, type, action, entityId, body);
 | 
				
			||||||
                    return Futures.immediateFuture(null);
 | 
					                    return Futures.immediateFuture(null);
 | 
				
			||||||
                }
 | 
					                }
 | 
				
			||||||
            }
 | 
					                if (activeOpt.get().getBooleanValue().isPresent() && activeOpt.get().getBooleanValue().get()) {
 | 
				
			||||||
        }, dbCallbackExecutorService);
 | 
					                    return doSaveEdgeEvent(tenantId, edgeId, type, action, entityId, body);
 | 
				
			||||||
 | 
					                } else {
 | 
				
			||||||
 | 
					                    if (doSaveIfEdgeIsOffline(type, action)) {
 | 
				
			||||||
 | 
					                        return doSaveEdgeEvent(tenantId, edgeId, type, action, entityId, body);
 | 
				
			||||||
 | 
					                    } else {
 | 
				
			||||||
 | 
					                        log.trace("Edge is not active at the moment. Skipping event. tenantId [{}], edgeId [{}], type[{}], " +
 | 
				
			||||||
 | 
					                                        "action [{}], entityId [{}], body [{}]",
 | 
				
			||||||
 | 
					                                tenantId, edgeId, type, action, entityId, body);
 | 
				
			||||||
 | 
					                        return Futures.immediateFuture(null);
 | 
				
			||||||
 | 
					                    }
 | 
				
			||||||
 | 
					                }
 | 
				
			||||||
 | 
					            }, dbCallbackExecutorService);
 | 
				
			||||||
 | 
					        } else {
 | 
				
			||||||
 | 
					            return doSaveEdgeEvent(tenantId, edgeId, type, action, entityId, body);
 | 
				
			||||||
 | 
					        }
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    private boolean doSaveIfEdgeIsOffline(EdgeEventType type, EdgeEventActionType action) {
 | 
					    private boolean doSaveIfEdgeIsOffline(EdgeEventType type, EdgeEventActionType action) {
 | 
				
			||||||
@ -145,10 +159,11 @@ public abstract class BaseEdgeProcessor implements EdgeProcessor {
 | 
				
			|||||||
                                                              JsonNode body, EdgeId sourceEdgeId) {
 | 
					                                                              JsonNode body, EdgeId sourceEdgeId) {
 | 
				
			||||||
        List<ListenableFuture<Void>> futures = new ArrayList<>();
 | 
					        List<ListenableFuture<Void>> futures = new ArrayList<>();
 | 
				
			||||||
        if (TenantId.SYS_TENANT_ID.equals(tenantId)) {
 | 
					        if (TenantId.SYS_TENANT_ID.equals(tenantId)) {
 | 
				
			||||||
            PageDataIterable<TenantId> tenantIds = new PageDataIterable<>(link -> edgeCtx.getTenantService().findTenantsIds(link), 1024);
 | 
					            PageDataIterable<Edge> edges = new PageDataIterable<>(link -> edgeCtx.getEdgeService().findActiveEdges(link), 1024);
 | 
				
			||||||
            for (TenantId tenantId1 : tenantIds) {
 | 
					            for (Edge edge : edges) {
 | 
				
			||||||
                futures.addAll(processActionForAllEdgesByTenantId(tenantId1, type, actionType, entityId, body, sourceEdgeId));
 | 
					                futures.add(saveEdgeEvent(edge.getTenantId(), edge.getId(), type, actionType, entityId, body, false));
 | 
				
			||||||
            }
 | 
					            }
 | 
				
			||||||
 | 
					            return Futures.immediateFuture(null);
 | 
				
			||||||
        } else {
 | 
					        } else {
 | 
				
			||||||
            futures = processActionForAllEdgesByTenantId(tenantId, type, actionType, entityId, null, sourceEdgeId);
 | 
					            futures = processActionForAllEdgesByTenantId(tenantId, type, actionType, entityId, null, sourceEdgeId);
 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
 | 
				
			|||||||
@ -47,6 +47,8 @@ public interface EdgeService extends EntityDaoService {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
    Optional<Edge> findEdgeByRoutingKey(TenantId tenantId, String routingKey);
 | 
					    Optional<Edge> findEdgeByRoutingKey(TenantId tenantId, String routingKey);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    PageData<Edge> findActiveEdges(PageLink pageLink);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    Edge saveEdge(Edge edge);
 | 
					    Edge saveEdge(Edge edge);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    Edge assignEdgeToCustomer(TenantId tenantId, EdgeId edgeId, CustomerId customerId);
 | 
					    Edge assignEdgeToCustomer(TenantId tenantId, EdgeId edgeId, CustomerId customerId);
 | 
				
			||||||
 | 
				
			|||||||
@ -41,6 +41,8 @@ public interface EdgeDao extends Dao<Edge>, TenantEntityDao<Edge> {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
    EdgeInfo findEdgeInfoById(TenantId tenantId, UUID edgeId);
 | 
					    EdgeInfo findEdgeInfoById(TenantId tenantId, UUID edgeId);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    PageData<Edge> findActiveEdges(PageLink pageLink);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    PageData<EdgeId> findEdgeIdsByTenantId(UUID tenantId, PageLink pageLink);
 | 
					    PageData<EdgeId> findEdgeIdsByTenantId(UUID tenantId, PageLink pageLink);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    PageData<Edge> findEdgesByTenantId(UUID tenantId, PageLink pageLink);
 | 
					    PageData<Edge> findEdgesByTenantId(UUID tenantId, PageLink pageLink);
 | 
				
			||||||
 | 
				
			|||||||
@ -192,6 +192,13 @@ public class EdgeServiceImpl extends AbstractCachedEntityService<EdgeCacheKey, E
 | 
				
			|||||||
        return edgeDao.findByRoutingKey(tenantId.getId(), routingKey);
 | 
					        return edgeDao.findByRoutingKey(tenantId.getId(), routingKey);
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    @Override
 | 
				
			||||||
 | 
					    public PageData<Edge> findActiveEdges(PageLink pageLink) {
 | 
				
			||||||
 | 
					        log.trace("Executing findActiveEdges [{}]", pageLink);
 | 
				
			||||||
 | 
					        Validator.validatePageLink(pageLink);
 | 
				
			||||||
 | 
					        return edgeDao.findActiveEdges(pageLink);
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    @Override
 | 
					    @Override
 | 
				
			||||||
    public Edge saveEdge(Edge edge) {
 | 
					    public Edge saveEdge(Edge edge) {
 | 
				
			||||||
        log.trace("Executing saveEdge [{}]", edge);
 | 
					        log.trace("Executing saveEdge [{}]", edge);
 | 
				
			||||||
 | 
				
			|||||||
@ -44,6 +44,18 @@ public interface EdgeRepository extends JpaRepository<EdgeEntity, UUID> {
 | 
				
			|||||||
            "WHERE d.id = :edgeId")
 | 
					            "WHERE d.id = :edgeId")
 | 
				
			||||||
    EdgeInfoEntity findEdgeInfoById(@Param("edgeId") UUID edgeId);
 | 
					    EdgeInfoEntity findEdgeInfoById(@Param("edgeId") UUID edgeId);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    @Query(value = "SELECT ee.id, ee.created_time, ee.additional_info, ee.customer_id, " +
 | 
				
			||||||
 | 
					            "ee.root_rule_chain_id, ee.type, ee.name, ee.label, ee.routing_key, " +
 | 
				
			||||||
 | 
					            "ee.secret, ee.tenant_id, ee.version " +
 | 
				
			||||||
 | 
					            "FROM edge ee " +
 | 
				
			||||||
 | 
					            "JOIN attribute_kv ON ee.id = attribute_kv.entity_id " +
 | 
				
			||||||
 | 
					            "JOIN key_dictionary ON attribute_kv.attribute_key = key_dictionary.key_id " +
 | 
				
			||||||
 | 
					            "WHERE attribute_kv.bool_v = true AND key_dictionary.key = 'active' " +
 | 
				
			||||||
 | 
					            "AND (:textSearch IS NULL OR ee.name ILIKE CONCAT('%', :textSearch, '%')) " +
 | 
				
			||||||
 | 
					            "ORDER BY ee.id", nativeQuery = true)
 | 
				
			||||||
 | 
					    Page<EdgeEntity> findActiveEdges(@Param("textSearch") String textSearch,
 | 
				
			||||||
 | 
					                                 Pageable pageable);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    @Query("SELECT d.id FROM EdgeEntity d WHERE d.tenantId = :tenantId " +
 | 
					    @Query("SELECT d.id FROM EdgeEntity d WHERE d.tenantId = :tenantId " +
 | 
				
			||||||
            "AND (:textSearch IS NULL OR ilike(d.name, CONCAT('%', :textSearch, '%')) = true)")
 | 
					            "AND (:textSearch IS NULL OR ilike(d.name, CONCAT('%', :textSearch, '%')) = true)")
 | 
				
			||||||
    Page<UUID> findIdsByTenantId(@Param("tenantId") UUID tenantId,
 | 
					    Page<UUID> findIdsByTenantId(@Param("tenantId") UUID tenantId,
 | 
				
			||||||
 | 
				
			|||||||
@ -66,6 +66,14 @@ public class JpaEdgeDao extends JpaAbstractDao<EdgeEntity, Edge> implements Edge
 | 
				
			|||||||
        return DaoUtil.getData(edgeRepository.findEdgeInfoById(edgeId));
 | 
					        return DaoUtil.getData(edgeRepository.findEdgeInfoById(edgeId));
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    @Override
 | 
				
			||||||
 | 
					    public PageData<Edge> findActiveEdges(PageLink pageLink) {
 | 
				
			||||||
 | 
					        return DaoUtil.toPageData(
 | 
				
			||||||
 | 
					                edgeRepository.findActiveEdges(
 | 
				
			||||||
 | 
					                        pageLink.getTextSearch(),
 | 
				
			||||||
 | 
					                        DaoUtil.toPageable(pageLink)));
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    @Override
 | 
					    @Override
 | 
				
			||||||
    public PageData<EdgeId> findEdgeIdsByTenantId(UUID tenantId, PageLink pageLink) {
 | 
					    public PageData<EdgeId> findEdgeIdsByTenantId(UUID tenantId, PageLink pageLink) {
 | 
				
			||||||
        return DaoUtil.pageToPageData(
 | 
					        return DaoUtil.pageToPageData(
 | 
				
			||||||
 | 
				
			|||||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user