diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java
index 7340696788..9fcb7425b2 100644
--- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java
+++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java
@@ -93,14 +93,14 @@ import static org.thingsboard.server.service.state.DefaultDeviceStateService.LAS
 @TbCoreComponent
 public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase implements EdgeRpcService {
 
-    private static final int DESTROY_SESSION_MAX_ATTEMPTS = 10;
-
     private final ConcurrentMap sessions = new ConcurrentHashMap<>();
     private final ConcurrentMap sessionNewEventsLocks = new ConcurrentHashMap<>();
     private final Map sessionNewEvents = new HashMap<>();
     private final ConcurrentMap> sessionEdgeEventChecks = new ConcurrentHashMap<>();
     private final ConcurrentMap> localSyncEdgeRequests = new ConcurrentHashMap<>();
     private final ConcurrentMap edgeEventsMigrationProcessed = new ConcurrentHashMap<>();
+    // Sessions whose destroy() failed; appended by destroySession() and drained by the scheduled cleanupZombieSessions(), so it must be thread-safe.
+    private final java.util.Queue<EdgeGrpcSession> zombieSessions = new java.util.concurrent.ConcurrentLinkedQueue<>();
 
     @Value("${edges.rpc.port}")
     private int rpcPort;
@@ -193,7 +193,7 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i
         this.edgeEventProcessingExecutorService = ThingsBoardExecutors.newScheduledThreadPool(schedulerPoolSize, "edge-event-check-scheduler");
         this.sendDownlinkExecutorService = ThingsBoardExecutors.newScheduledThreadPool(sendSchedulerPoolSize, "edge-send-scheduler");
         this.executorService = ThingsBoardExecutors.newSingleThreadScheduledExecutor("edge-service");
-        this.executorService.scheduleAtFixedRate(this::destroyKafkaSessionIfDisconnectedAndConsumerActive, 60, 60, TimeUnit.SECONDS);
+        this.executorService.scheduleAtFixedRate(this::cleanupZombieSessions, 60, 60, TimeUnit.SECONDS);
         log.info("Edge RPC service initialized!");
     }
 
@@ -518,14 +518,10 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i
 
     private void destroySession(EdgeGrpcSession session) {
         try (session) {
-            for (int i = 0; i < DESTROY_SESSION_MAX_ATTEMPTS; i++) {
-                if (session.destroy()) {
-                    break;
-                } else {
-                    try {
-                        Thread.sleep(100);
-                    } catch (InterruptedException ignored) {}
-                }
+            if (!session.destroy()) {
+                log.warn("[{}][{}] Session destroy failed for edge [{}] with session id [{}]. Adding to zombie queue for later cleanup.",
+                        session.getTenantId(), session.getEdge().getId(), session.getEdge().getName(), session.getSessionId());
+                zombieSessions.add(session);
             }
         }
     }
@@ -634,7 +630,7 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i
         }
     }
 
-    private void destroyKafkaSessionIfDisconnectedAndConsumerActive() {
+    private void cleanupZombieSessions() {
         try {
             List toRemove = new ArrayList<>();
             for (EdgeGrpcSession session : sessions.values()) {
@@ -655,9 +651,19 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i
                     }
                 }
             }
+            zombieSessions.removeIf(zombie -> {
+                if (zombie.destroy()) {
+                    log.info("[{}][{}] Successfully cleaned up zombie session [{}] for edge [{}].",
+                            zombie.getTenantId(), zombie.getEdge().getId(), zombie.getSessionId(), zombie.getEdge().getName());
+                    return true;
+                } else {
+                    log.warn("[{}][{}] Failed to remove zombie session [{}] for edge [{}].",
+                            zombie.getTenantId(), zombie.getEdge().getId(), zombie.getSessionId(), zombie.getEdge().getName());
+                    return false;
+                }
+            });
         } catch (Exception e) {
             log.warn("Failed to cleanup kafka sessions", e);
         }
     }
-
 }