From e30adf0447450a6e2796392fd5b06e06bdba6c0b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=99=88=E6=97=AD?= Date: Sun, 10 Aug 2025 10:30:49 +0800 Subject: [PATCH 1/4] Fix: Improve Edge session cleanup to prevent resource leaks In unstable network environments, Edge devices may frequently disconnect and reconnect. The previous session cleanup logic could fail to stop the Kafka consumer, creating a 'zombie consumer'. This commit introduces a multi-layered defense: 1. Proactively evicts stale members from the Kafka consumer group upon new connection to ensure immediate functionality. 2. Adds a background task to persistently try and clean up session objects that failed to destroy, preventing memory/thread leaks. --- .../service/edge/rpc/EdgeGrpcService.java | 47 ++++++++++++++----- 1 file changed, 34 insertions(+), 13 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java index eaef1f7c7d..e1a193bdfa 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java @@ -69,17 +69,8 @@ import org.thingsboard.server.service.telemetry.TelemetrySubscriptionService; import java.io.IOException; import java.io.InputStream; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.UUID; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.TimeUnit; +import java.util.*; +import java.util.concurrent.*; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import java.util.function.Consumer; @@ -102,6 +93,7 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i private final ConcurrentMap> sessionEdgeEventChecks = new ConcurrentHashMap<>(); private final ConcurrentMap> localSyncEdgeRequests = new ConcurrentHashMap<>(); private final ConcurrentMap edgeEventsMigrationProcessed = new ConcurrentHashMap<>(); + private final Queue zombieSessions = new ConcurrentLinkedQueue<>(); @Value("${edges.rpc.port}") private int rpcPort; @@ -166,6 +158,8 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i private ScheduledExecutorService executorService; + private ScheduledExecutorService zombieSessionsExecutorService; + @AfterStartUp(order = AfterStartUp.REGULAR_SERVICE) public void onStartUp() { log.info("Initializing Edge RPC service!"); @@ -197,7 +191,9 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i this.edgeEventProcessingExecutorService = ThingsBoardExecutors.newScheduledThreadPool(schedulerPoolSize, "edge-event-check-scheduler"); this.sendDownlinkExecutorService = ThingsBoardExecutors.newScheduledThreadPool(sendSchedulerPoolSize, "edge-send-scheduler"); this.executorService = ThingsBoardExecutors.newSingleThreadScheduledExecutor("edge-service"); + this.zombieSessionsExecutorService = ThingsBoardExecutors.newSingleThreadScheduledExecutor("zombie-sessions"); this.executorService.scheduleAtFixedRate(this::destroyKafkaSessionIfDisconnectedAndConsumerActive, 60, 60, TimeUnit.SECONDS); + this.zombieSessionsExecutorService.scheduleAtFixedRate(this::cleanupZombieSessions, 30, 60, TimeUnit.SECONDS); log.info("Edge RPC service initialized!"); } @@ -520,11 +516,11 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i edgeIdServiceIdCache.evict(edgeId); } - private void destroySession(EdgeGrpcSession session) { + private boolean destroySession(EdgeGrpcSession session) { try (session) { for (int i = 0; i < DESTROY_SESSION_MAX_ATTEMPTS; i++) { if (session.destroy()) { - break; + return true; } else { try { Thread.sleep(100); @@ -532,6 +528,7 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i } } } + return false; } private void save(TenantId tenantId, EdgeId edgeId, String key, long value) { @@ -663,4 +660,28 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i log.warn("Failed to cleanup kafka sessions", e); } } + + private void cleanupZombieSessions() { + int zombiesToProcess = zombieSessions.size(); + if (zombiesToProcess == 0) { + return; + } + log.info("Found {} zombie sessions in the queue. Starting cleanup cycle.", zombiesToProcess); + for (int i = 0; i < zombiesToProcess; i++) { + EdgeGrpcSession zombie = zombieSessions.poll(); + if (zombie == null) { + break; + } + log.warn("[{}] Attempting to clean up zombie session [{}] for edge [{}].", + zombie.getTenantId(), zombie.getSessionId(), zombie.getEdge().getId()); + if (!destroySession(zombie)) { + log.warn("[{}] Zombie session [{}] cleanup failed again. Re-queuing for next attempt.", + zombie.getTenantId(), zombie.getSessionId()); + zombieSessions.add(zombie); + } else { + log.info("[{}] Successfully cleaned up zombie session [{}].", + zombie.getTenantId(), zombie.getSessionId()); + } + } + } } From 4523efbcdd73993c3822f68e587ac33157f663e9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=99=88=E6=97=AD?= Date: Sun, 10 Aug 2025 11:42:47 +0800 Subject: [PATCH 2/4] Fix: Improve Edge session cleanup to prevent resource leaks In unstable network environments, Edge devices may frequently disconnect and reconnect. The previous session cleanup logic could fail to stop the Kafka consumer, creating a 'zombie consumer'. This commit introduces a multi-layered defense: 1. Proactively evicts stale members from the Kafka consumer group upon new connection to ensure immediate functionality. 2. Adds a background task to persistently try and clean up session objects that failed to destroy, preventing memory/thread leaks. --- .../thingsboard/server/service/edge/rpc/EdgeGrpcService.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java index e1a193bdfa..4217fd509b 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java @@ -219,6 +219,9 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i if (executorService != null) { executorService.shutdownNow(); } + if(zombieSessionsExecutorService != null){ + zombieSessionsExecutorService.shutdownNow(); + } } @Override From 9384a4b5309ad3fb2f7d088a088ba14f8293659b Mon Sep 17 00:00:00 2001 From: Volodymyr Babak Date: Thu, 25 Sep 2025 22:00:35 +0300 Subject: [PATCH 3/4] Refactoring and simplified --- .../service/edge/rpc/EdgeGrpcService.java | 87 +++++++------------ 1 file changed, 31 insertions(+), 56 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java index d0b997d7b9..f92e89823a 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java @@ -68,8 +68,17 @@ import org.thingsboard.server.service.telemetry.TelemetrySubscriptionService; import java.io.IOException; import java.io.InputStream; -import java.util.*; -import java.util.concurrent.*; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import java.util.function.Consumer; @@ -84,15 +93,13 @@ import static org.thingsboard.server.service.state.DefaultDeviceStateService.LAS @TbCoreComponent public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase implements EdgeRpcService { - private static final int DESTROY_SESSION_MAX_ATTEMPTS = 10; - private final ConcurrentMap sessions = new ConcurrentHashMap<>(); private final ConcurrentMap sessionNewEventsLocks = new ConcurrentHashMap<>(); private final Map sessionNewEvents = new HashMap<>(); private final ConcurrentMap> sessionEdgeEventChecks = new ConcurrentHashMap<>(); private final ConcurrentMap> localSyncEdgeRequests = new ConcurrentHashMap<>(); private final ConcurrentMap edgeEventsMigrationProcessed = new ConcurrentHashMap<>(); - private final Queue zombieSessions = new ConcurrentLinkedQueue<>(); + private final List zombieSessions = new ArrayList<>(); @Value("${edges.rpc.port}") private int rpcPort; @@ -154,8 +161,6 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i private ScheduledExecutorService executorService; - private ScheduledExecutorService zombieSessionsExecutorService; - @AfterStartUp(order = AfterStartUp.REGULAR_SERVICE) public void onStartUp() { log.info("Initializing Edge RPC service!"); @@ -187,9 +192,7 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i this.edgeEventProcessingExecutorService = ThingsBoardExecutors.newScheduledThreadPool(schedulerPoolSize, "edge-event-check-scheduler"); this.sendDownlinkExecutorService = ThingsBoardExecutors.newScheduledThreadPool(sendSchedulerPoolSize, "edge-send-scheduler"); this.executorService = ThingsBoardExecutors.newSingleThreadScheduledExecutor("edge-service"); - this.zombieSessionsExecutorService = ThingsBoardExecutors.newSingleThreadScheduledExecutor("zombie-sessions"); - this.executorService.scheduleAtFixedRate(this::destroyKafkaSessionIfDisconnectedAndConsumerActive, 60, 60, TimeUnit.SECONDS); - this.zombieSessionsExecutorService.scheduleAtFixedRate(this::cleanupZombieSessions, 30, 60, TimeUnit.SECONDS); + this.executorService.scheduleAtFixedRate(this::cleanupZombieSessions, 60, 60, TimeUnit.SECONDS); log.info("Edge RPC service initialized!"); } @@ -215,9 +218,6 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i if (executorService != null) { executorService.shutdownNow(); } - if(zombieSessionsExecutorService != null){ - zombieSessionsExecutorService.shutdownNow(); - } } @Override @@ -501,13 +501,7 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i sessionNewEvents.remove(edgeId); } finally { newEventLock.unlock(); - } - boolean destroySessionResult = destroySession(toRemove); - if(!destroySessionResult){ - log.error("[{}][{}] Session destroy failed for edge [{}] with session id [{}]. Adding to zombie queue for later cleanup.", - edge.getTenantId(), edgeId, edge.getName(), sessionId); - zombieSessions.add(toRemove); - } + }destroySession(toRemove); TenantId tenantId = toRemove.getEdge().getTenantId(); save(tenantId, edgeId, ACTIVITY_STATE, false); long lastDisconnectTs = System.currentTimeMillis(); @@ -520,19 +514,14 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i edgeIdServiceIdCache.evict(edgeId); } - private boolean destroySession(EdgeGrpcSession session) { + private void destroySession(EdgeGrpcSession session) { try (session) { - for (int i = 0; i < DESTROY_SESSION_MAX_ATTEMPTS; i++) { - if (session.destroy()) { - return true; - } else { - try { - Thread.sleep(100); - } catch (InterruptedException ignored) {} - } + if (!session.destroy()) { + log.warn("[{}][{}] Session destroy failed for edge [{}] with session id [{}]. Adding to zombie queue for later cleanup.", + session.getTenantId(), session.getEdge().getId(), session.getEdge().getName(), session.getSessionId()); + zombieSessions.add(session); } } - return false; } private void save(TenantId tenantId, EdgeId edgeId, String key, long value) { @@ -639,7 +628,7 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i } } - private void destroyKafkaSessionIfDisconnectedAndConsumerActive() { + private void cleanupZombieSessions() { try { List toRemove = new ArrayList<>(); for (EdgeGrpcSession session : sessions.values()) { @@ -660,33 +649,19 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i } } } + zombieSessions.removeIf(zombie -> { + if (zombie.destroy()) { + log.info("[{}][{}] Successfully cleaned up zombie session [{}] for edge [{}].", + zombie.getTenantId(), zombie.getEdge().getId(), zombie.getSessionId(), zombie.getEdge().getName()); + return true; + } else { + log.warn("[{}][{}] Failed to remove zombie session [{}] for edge [{}].", + zombie.getTenantId(), zombie.getEdge().getId(), zombie.getSessionId(), zombie.getEdge().getName()); + return false; + } + }); } catch (Exception e) { log.warn("Failed to cleanup kafka sessions", e); } } - - - private void cleanupZombieSessions() { - int zombiesToProcess = zombieSessions.size(); - if (zombiesToProcess == 0) { - return; - } - log.info("Found {} zombie sessions in the queue. Starting cleanup cycle.", zombiesToProcess); - for (int i = 0; i < zombiesToProcess; i++) { - EdgeGrpcSession zombie = zombieSessions.poll(); - if (zombie == null) { - break; - } - log.warn("[{}] Attempting to clean up zombie session [{}] for edge [{}].", - zombie.getTenantId(), zombie.getSessionId(), zombie.getEdge().getId()); - if (!destroySession(zombie)) { - log.warn("[{}] Zombie session [{}] cleanup failed again. Re-queuing for next attempt.", - zombie.getTenantId(), zombie.getSessionId()); - zombieSessions.add(zombie); - } else { - log.info("[{}] Successfully cleaned up zombie session [{}].", - zombie.getTenantId(), zombie.getSessionId()); - } - } - } } From 5edc8cb277672b6093c614f701cf52d3e4a4dd61 Mon Sep 17 00:00:00 2001 From: Volodymyr Babak Date: Thu, 25 Sep 2025 22:02:10 +0300 Subject: [PATCH 4/4] Line formatted --- .../thingsboard/server/service/edge/rpc/EdgeGrpcService.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java index f92e89823a..9fcb7425b2 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java @@ -501,7 +501,8 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i sessionNewEvents.remove(edgeId); } finally { newEventLock.unlock(); - }destroySession(toRemove); + } + destroySession(toRemove); TenantId tenantId = toRemove.getEdge().getTenantId(); save(tenantId, edgeId, ACTIVITY_STATE, false); long lastDisconnectTs = System.currentTimeMillis();