Edge - slow down processing of edge events for SYS_TENANT_ID
This commit is contained in:
		
							parent
							
								
									eea7c2f623
								
							
						
					
					
						commit
						d045ef23e2
					
				@ -167,6 +167,11 @@ public class EdgeEventSourcingListener {
 | 
				
			|||||||
    @TransactionalEventListener(fallbackExecution = true)
 | 
					    @TransactionalEventListener(fallbackExecution = true)
 | 
				
			||||||
    public void handleEvent(RelationActionEvent event) {
 | 
					    public void handleEvent(RelationActionEvent event) {
 | 
				
			||||||
        try {
 | 
					        try {
 | 
				
			||||||
 | 
					            TenantId tenantId = event.getTenantId();
 | 
				
			||||||
 | 
					            if (!tenantId.isSysTenantId() && !tenantService.tenantExists(tenantId)) {
 | 
				
			||||||
 | 
					                log.debug("[{}] Ignoring RelationActionEvent because tenant does not exist: {}", tenantId, event);
 | 
				
			||||||
 | 
					                return;
 | 
				
			||||||
 | 
					            }
 | 
				
			||||||
            EntityRelation relation = event.getRelation();
 | 
					            EntityRelation relation = event.getRelation();
 | 
				
			||||||
            if (relation == null) {
 | 
					            if (relation == null) {
 | 
				
			||||||
                log.trace("[{}] skipping RelationActionEvent event in case relation is null: {}", event.getTenantId(), event);
 | 
					                log.trace("[{}] skipping RelationActionEvent event in case relation is null: {}", event.getTenantId(), event);
 | 
				
			||||||
 | 
				
			|||||||
@ -528,7 +528,7 @@ public abstract class EdgeGrpcSession implements Closeable {
 | 
				
			|||||||
                sessionState.getPendingMsgsMap().remove(msg.getDownlinkMsgId());
 | 
					                sessionState.getPendingMsgsMap().remove(msg.getDownlinkMsgId());
 | 
				
			||||||
                log.debug("[{}][{}][{}] Msg has been processed successfully! Msg Id: [{}], Msg: {}", tenantId, edge.getId(), sessionId, msg.getDownlinkMsgId(), msg);
 | 
					                log.debug("[{}][{}][{}] Msg has been processed successfully! Msg Id: [{}], Msg: {}", tenantId, edge.getId(), sessionId, msg.getDownlinkMsgId(), msg);
 | 
				
			||||||
            } else {
 | 
					            } else {
 | 
				
			||||||
                log.error("[{}][{}][{}] Msg processing failed! Msg Id: [{}], Error msg: {}", tenantId, edge.getId(), sessionId, msg.getDownlinkMsgId(), msg.getErrorMsg());
 | 
					                log.debug("[{}][{}][{}] Msg processing failed! Msg Id: [{}], Error msg: {}", tenantId, edge.getId(), sessionId, msg.getDownlinkMsgId(), msg.getErrorMsg());
 | 
				
			||||||
                DownlinkMsg downlinkMsg = sessionState.getPendingMsgsMap().get(msg.getDownlinkMsgId());
 | 
					                DownlinkMsg downlinkMsg = sessionState.getPendingMsgsMap().get(msg.getDownlinkMsgId());
 | 
				
			||||||
                // if NOT timeseries or attributes failures - ack failed downlink
 | 
					                // if NOT timeseries or attributes failures - ack failed downlink
 | 
				
			||||||
                if (downlinkMsg.getEntityDataCount() == 0) {
 | 
					                if (downlinkMsg.getEntityDataCount() == 0) {
 | 
				
			||||||
 | 
				
			|||||||
@ -65,6 +65,7 @@ import java.util.ArrayList;
 | 
				
			|||||||
import java.util.List;
 | 
					import java.util.List;
 | 
				
			||||||
import java.util.Optional;
 | 
					import java.util.Optional;
 | 
				
			||||||
import java.util.UUID;
 | 
					import java.util.UUID;
 | 
				
			||||||
 | 
					import java.util.concurrent.TimeUnit;
 | 
				
			||||||
import java.util.concurrent.locks.Lock;
 | 
					import java.util.concurrent.locks.Lock;
 | 
				
			||||||
import java.util.concurrent.locks.ReentrantLock;
 | 
					import java.util.concurrent.locks.ReentrantLock;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@ -142,17 +143,24 @@ public abstract class BaseEdgeProcessor implements EdgeProcessor {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
    protected ListenableFuture<Void> processActionForAllEdges(TenantId tenantId, EdgeEventType type,
 | 
					    protected ListenableFuture<Void> processActionForAllEdges(TenantId tenantId, EdgeEventType type,
 | 
				
			||||||
                                                              EdgeEventActionType actionType, EntityId entityId,
 | 
					                                                              EdgeEventActionType actionType, EntityId entityId,
 | 
				
			||||||
                                                              JsonNode body, EdgeId sourceEdgeId) {
 | 
					                                                              EdgeId sourceEdgeId) {
 | 
				
			||||||
        List<ListenableFuture<Void>> futures = new ArrayList<>();
 | 
					 | 
				
			||||||
        if (TenantId.SYS_TENANT_ID.equals(tenantId)) {
 | 
					        if (TenantId.SYS_TENANT_ID.equals(tenantId)) {
 | 
				
			||||||
            PageDataIterable<TenantId> tenantIds = new PageDataIterable<>(link -> edgeCtx.getTenantService().findTenantsIds(link), 1024);
 | 
					            PageDataIterable<TenantId> tenantIds = new PageDataIterable<>(link -> edgeCtx.getTenantService().findTenantsIds(link), 500);
 | 
				
			||||||
            for (TenantId tenantId1 : tenantIds) {
 | 
					            for (TenantId tenantId1 : tenantIds) {
 | 
				
			||||||
                futures.addAll(processActionForAllEdgesByTenantId(tenantId1, type, actionType, entityId, body, sourceEdgeId));
 | 
					                try {
 | 
				
			||||||
 | 
					                    List<ListenableFuture<Void>> sysTenantFutures = processActionForAllEdgesByTenantId(tenantId1, type, actionType, entityId, null, sourceEdgeId);
 | 
				
			||||||
 | 
					                    for (ListenableFuture<Void> future : sysTenantFutures) {
 | 
				
			||||||
 | 
					                        future.get(10, TimeUnit.SECONDS);
 | 
				
			||||||
 | 
					                    }
 | 
				
			||||||
 | 
					                } catch (Exception e) {
 | 
				
			||||||
 | 
					                    log.error("Failed to process action for all edges by SYS_TENANT_ID. Failed tenantId = [{}]", tenantId1, e);
 | 
				
			||||||
 | 
					                }
 | 
				
			||||||
            }
 | 
					            }
 | 
				
			||||||
 | 
					            return Futures.immediateFuture(null);
 | 
				
			||||||
        } else {
 | 
					        } else {
 | 
				
			||||||
            futures = processActionForAllEdgesByTenantId(tenantId, type, actionType, entityId, null, sourceEdgeId);
 | 
					            List<ListenableFuture<Void>> tenantFutures = processActionForAllEdgesByTenantId(tenantId, type, actionType, entityId, null, sourceEdgeId);
 | 
				
			||||||
 | 
					            return Futures.transform(Futures.allAsList(tenantFutures), voids -> null, dbCallbackExecutorService);
 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
        return Futures.transform(Futures.allAsList(futures), voids -> null, dbCallbackExecutorService);
 | 
					 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    private List<ListenableFuture<Void>> processActionForAllEdgesByTenantId(TenantId tenantId,
 | 
					    private List<ListenableFuture<Void>> processActionForAllEdgesByTenantId(TenantId tenantId,
 | 
				
			||||||
@ -284,7 +292,7 @@ public abstract class BaseEdgeProcessor implements EdgeProcessor {
 | 
				
			|||||||
    private ListenableFuture<Void> processEntityNotificationForAllEdges(TenantId tenantId, EdgeEventType type, EdgeEventActionType actionType, EntityId entityId, EdgeId sourceEdgeId) {
 | 
					    private ListenableFuture<Void> processEntityNotificationForAllEdges(TenantId tenantId, EdgeEventType type, EdgeEventActionType actionType, EntityId entityId, EdgeId sourceEdgeId) {
 | 
				
			||||||
        return switch (actionType) {
 | 
					        return switch (actionType) {
 | 
				
			||||||
            case ADDED, UPDATED, DELETED, CREDENTIALS_UPDATED -> // used by USER entity
 | 
					            case ADDED, UPDATED, DELETED, CREDENTIALS_UPDATED -> // used by USER entity
 | 
				
			||||||
                    processActionForAllEdges(tenantId, type, actionType, entityId, null, sourceEdgeId);
 | 
					                    processActionForAllEdges(tenantId, type, actionType, entityId, sourceEdgeId);
 | 
				
			||||||
            default -> Futures.immediateFuture(null);
 | 
					            default -> Futures.immediateFuture(null);
 | 
				
			||||||
        };
 | 
					        };
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
				
			|||||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user