Improved Edge session cleanup
This commit is contained in:
		
							parent
							
								
									07280bcc36
								
							
						
					
					
						commit
						e477dd17b6
					
				@ -93,14 +93,13 @@ import static org.thingsboard.server.service.state.DefaultDeviceStateService.LAS
 | 
				
			|||||||
@TbCoreComponent
 | 
					@TbCoreComponent
 | 
				
			||||||
public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase implements EdgeRpcService {
 | 
					public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase implements EdgeRpcService {
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    private static final int DESTROY_SESSION_MAX_ATTEMPTS = 10;
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    private final ConcurrentMap<EdgeId, EdgeGrpcSession> sessions = new ConcurrentHashMap<>();
 | 
					    private final ConcurrentMap<EdgeId, EdgeGrpcSession> sessions = new ConcurrentHashMap<>();
 | 
				
			||||||
    private final ConcurrentMap<EdgeId, Lock> sessionNewEventsLocks = new ConcurrentHashMap<>();
 | 
					    private final ConcurrentMap<EdgeId, Lock> sessionNewEventsLocks = new ConcurrentHashMap<>();
 | 
				
			||||||
    private final Map<EdgeId, Boolean> sessionNewEvents = new HashMap<>();
 | 
					    private final Map<EdgeId, Boolean> sessionNewEvents = new HashMap<>();
 | 
				
			||||||
    private final ConcurrentMap<EdgeId, ScheduledFuture<?>> sessionEdgeEventChecks = new ConcurrentHashMap<>();
 | 
					    private final ConcurrentMap<EdgeId, ScheduledFuture<?>> sessionEdgeEventChecks = new ConcurrentHashMap<>();
 | 
				
			||||||
    private final ConcurrentMap<UUID, Consumer<FromEdgeSyncResponse>> localSyncEdgeRequests = new ConcurrentHashMap<>();
 | 
					    private final ConcurrentMap<UUID, Consumer<FromEdgeSyncResponse>> localSyncEdgeRequests = new ConcurrentHashMap<>();
 | 
				
			||||||
    private final ConcurrentMap<EdgeId, Boolean> edgeEventsMigrationProcessed = new ConcurrentHashMap<>();
 | 
					    private final ConcurrentMap<EdgeId, Boolean> edgeEventsMigrationProcessed = new ConcurrentHashMap<>();
 | 
				
			||||||
 | 
					    private final List<EdgeGrpcSession> zombieSessions = new ArrayList<>();
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    @Value("${edges.rpc.port}")
 | 
					    @Value("${edges.rpc.port}")
 | 
				
			||||||
    private int rpcPort;
 | 
					    private int rpcPort;
 | 
				
			||||||
@ -193,7 +192,7 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i
 | 
				
			|||||||
        this.edgeEventProcessingExecutorService = ThingsBoardExecutors.newScheduledThreadPool(schedulerPoolSize, "edge-event-check-scheduler");
 | 
					        this.edgeEventProcessingExecutorService = ThingsBoardExecutors.newScheduledThreadPool(schedulerPoolSize, "edge-event-check-scheduler");
 | 
				
			||||||
        this.sendDownlinkExecutorService = ThingsBoardExecutors.newScheduledThreadPool(sendSchedulerPoolSize, "edge-send-scheduler");
 | 
					        this.sendDownlinkExecutorService = ThingsBoardExecutors.newScheduledThreadPool(sendSchedulerPoolSize, "edge-send-scheduler");
 | 
				
			||||||
        this.executorService = ThingsBoardExecutors.newSingleThreadScheduledExecutor("edge-service");
 | 
					        this.executorService = ThingsBoardExecutors.newSingleThreadScheduledExecutor("edge-service");
 | 
				
			||||||
        this.executorService.scheduleAtFixedRate(this::destroyKafkaSessionIfDisconnectedAndConsumerActive, 60, 60, TimeUnit.SECONDS);
 | 
					        this.executorService.scheduleAtFixedRate(this::cleanupZombieSessions, 60, 60, TimeUnit.SECONDS);
 | 
				
			||||||
        log.info("Edge RPC service initialized!");
 | 
					        log.info("Edge RPC service initialized!");
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@ -518,14 +517,10 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
    private void destroySession(EdgeGrpcSession session) {
 | 
					    private void destroySession(EdgeGrpcSession session) {
 | 
				
			||||||
        try (session) {
 | 
					        try (session) {
 | 
				
			||||||
            for (int i = 0; i < DESTROY_SESSION_MAX_ATTEMPTS; i++) {
 | 
					            if (!session.destroy()) {
 | 
				
			||||||
                if (session.destroy()) {
 | 
					                log.warn("[{}][{}] Session destroy failed for edge [{}] with session id [{}]. Adding to zombie queue for later cleanup.",
 | 
				
			||||||
                    break;
 | 
					                        session.getTenantId(), session.getEdge().getId(), session.getEdge().getName(), session.getSessionId());
 | 
				
			||||||
                } else {
 | 
					                zombieSessions.add(session);
 | 
				
			||||||
                    try {
 | 
					 | 
				
			||||||
                        Thread.sleep(100);
 | 
					 | 
				
			||||||
                    } catch (InterruptedException ignored) {}
 | 
					 | 
				
			||||||
                }
 | 
					 | 
				
			||||||
            }
 | 
					            }
 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
@ -634,7 +629,7 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i
 | 
				
			|||||||
        }
 | 
					        }
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    private void destroyKafkaSessionIfDisconnectedAndConsumerActive() {
 | 
					    private void cleanupZombieSessions() {
 | 
				
			||||||
        try {
 | 
					        try {
 | 
				
			||||||
            List<EdgeId> toRemove = new ArrayList<>();
 | 
					            List<EdgeId> toRemove = new ArrayList<>();
 | 
				
			||||||
            for (EdgeGrpcSession session : sessions.values()) {
 | 
					            for (EdgeGrpcSession session : sessions.values()) {
 | 
				
			||||||
@ -655,6 +650,17 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i
 | 
				
			|||||||
                    }
 | 
					                    }
 | 
				
			||||||
                }
 | 
					                }
 | 
				
			||||||
            }
 | 
					            }
 | 
				
			||||||
 | 
					            zombieSessions.removeIf(zombie -> {
 | 
				
			||||||
 | 
					                if (zombie.destroy()) {
 | 
				
			||||||
 | 
					                    log.info("[{}][{}] Successfully cleaned up zombie session [{}] for edge [{}].",
 | 
				
			||||||
 | 
					                            zombie.getTenantId(), zombie.getEdge().getId(), zombie.getSessionId(), zombie.getEdge().getName());
 | 
				
			||||||
 | 
					                    return true;
 | 
				
			||||||
 | 
					                } else {
 | 
				
			||||||
 | 
					                    log.warn("[{}][{}] Failed to remove zombie session [{}] for edge [{}].",
 | 
				
			||||||
 | 
					                            zombie.getTenantId(), zombie.getEdge().getId(), zombie.getSessionId(), zombie.getEdge().getName());
 | 
				
			||||||
 | 
					                    return false;
 | 
				
			||||||
 | 
					                }
 | 
				
			||||||
 | 
					            });
 | 
				
			||||||
        } catch (Exception e) {
 | 
					        } catch (Exception e) {
 | 
				
			||||||
            log.warn("Failed to cleanup kafka sessions", e);
 | 
					            log.warn("Failed to cleanup kafka sessions", e);
 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
 | 
				
			|||||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user