Fix: Improve Edge session cleanup to prevent resource leaks. In unstable network environments, Edge devices may frequently disconnect and reconnect. The previous session cleanup logic could fail to stop the Kafka consumer, leaving a 'zombie consumer' behind. This commit introduces a multi-layered defense: 1. Proactively evict stale members from the Kafka consumer group when a new connection arrives, so that the new connection is functional immediately. 2. Add a background task that keeps retrying the cleanup of session objects that failed to destroy, preventing memory/thread leaks.

This commit is contained in:
陈旭 2025-08-10 10:30:49 +08:00
parent 609a68c991
commit e30adf0447

View File

@ -69,17 +69,8 @@ import org.thingsboard.server.service.telemetry.TelemetrySubscriptionService;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.util.ArrayList; import java.util.*;
import java.util.HashMap; import java.util.concurrent.*;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer; import java.util.function.Consumer;
@ -102,6 +93,7 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i
private final ConcurrentMap<EdgeId, ScheduledFuture<?>> sessionEdgeEventChecks = new ConcurrentHashMap<>(); private final ConcurrentMap<EdgeId, ScheduledFuture<?>> sessionEdgeEventChecks = new ConcurrentHashMap<>();
private final ConcurrentMap<UUID, Consumer<FromEdgeSyncResponse>> localSyncEdgeRequests = new ConcurrentHashMap<>(); private final ConcurrentMap<UUID, Consumer<FromEdgeSyncResponse>> localSyncEdgeRequests = new ConcurrentHashMap<>();
private final ConcurrentMap<EdgeId, Boolean> edgeEventsMigrationProcessed = new ConcurrentHashMap<>(); private final ConcurrentMap<EdgeId, Boolean> edgeEventsMigrationProcessed = new ConcurrentHashMap<>();
private final Queue<EdgeGrpcSession> zombieSessions = new ConcurrentLinkedQueue<>();
@Value("${edges.rpc.port}") @Value("${edges.rpc.port}")
private int rpcPort; private int rpcPort;
@ -166,6 +158,8 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i
private ScheduledExecutorService executorService; private ScheduledExecutorService executorService;
private ScheduledExecutorService zombieSessionsExecutorService;
@AfterStartUp(order = AfterStartUp.REGULAR_SERVICE) @AfterStartUp(order = AfterStartUp.REGULAR_SERVICE)
public void onStartUp() { public void onStartUp() {
log.info("Initializing Edge RPC service!"); log.info("Initializing Edge RPC service!");
@ -197,7 +191,9 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i
this.edgeEventProcessingExecutorService = ThingsBoardExecutors.newScheduledThreadPool(schedulerPoolSize, "edge-event-check-scheduler"); this.edgeEventProcessingExecutorService = ThingsBoardExecutors.newScheduledThreadPool(schedulerPoolSize, "edge-event-check-scheduler");
this.sendDownlinkExecutorService = ThingsBoardExecutors.newScheduledThreadPool(sendSchedulerPoolSize, "edge-send-scheduler"); this.sendDownlinkExecutorService = ThingsBoardExecutors.newScheduledThreadPool(sendSchedulerPoolSize, "edge-send-scheduler");
this.executorService = ThingsBoardExecutors.newSingleThreadScheduledExecutor("edge-service"); this.executorService = ThingsBoardExecutors.newSingleThreadScheduledExecutor("edge-service");
this.zombieSessionsExecutorService = ThingsBoardExecutors.newSingleThreadScheduledExecutor("zombie-sessions");
this.executorService.scheduleAtFixedRate(this::destroyKafkaSessionIfDisconnectedAndConsumerActive, 60, 60, TimeUnit.SECONDS); this.executorService.scheduleAtFixedRate(this::destroyKafkaSessionIfDisconnectedAndConsumerActive, 60, 60, TimeUnit.SECONDS);
this.zombieSessionsExecutorService.scheduleAtFixedRate(this::cleanupZombieSessions, 30, 60, TimeUnit.SECONDS);
log.info("Edge RPC service initialized!"); log.info("Edge RPC service initialized!");
} }
@ -520,11 +516,11 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i
edgeIdServiceIdCache.evict(edgeId); edgeIdServiceIdCache.evict(edgeId);
} }
private void destroySession(EdgeGrpcSession session) { private boolean destroySession(EdgeGrpcSession session) {
try (session) { try (session) {
for (int i = 0; i < DESTROY_SESSION_MAX_ATTEMPTS; i++) { for (int i = 0; i < DESTROY_SESSION_MAX_ATTEMPTS; i++) {
if (session.destroy()) { if (session.destroy()) {
break; return true;
} else { } else {
try { try {
Thread.sleep(100); Thread.sleep(100);
@ -532,6 +528,7 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i
} }
} }
} }
return false;
} }
private void save(TenantId tenantId, EdgeId edgeId, String key, long value) { private void save(TenantId tenantId, EdgeId edgeId, String key, long value) {
@ -663,4 +660,28 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i
log.warn("Failed to cleanup kafka sessions", e); log.warn("Failed to cleanup kafka sessions", e);
} }
} }
/**
 * Retries destruction of every session currently waiting in {@code zombieSessions}.
 *
 * <p>This method is run periodically via {@code scheduleAtFixedRate}. A periodic task
 * that throws is never executed again by the scheduler, so each session is processed
 * inside its own try/catch: an unexpected failure is logged, the session is put back
 * into the queue, and the remaining sessions are still processed.
 *
 * <p>Only the number of sessions present at the start of the cycle is processed, so a
 * session that is re-queued after a failed attempt is not retried again in the same cycle.
 */
private void cleanupZombieSessions() {
    // Snapshot of the queue length; bounds the loop so re-queued sessions wait for the next cycle.
    int zombiesToProcess = zombieSessions.size();
    if (zombiesToProcess == 0) {
        return;
    }
    log.info("Found {} zombie sessions in the queue. Starting cleanup cycle.", zombiesToProcess);
    for (int i = 0; i < zombiesToProcess; i++) {
        EdgeGrpcSession zombie = zombieSessions.poll();
        if (zombie == null) {
            // Queue drained earlier than the snapshot suggested.
            break;
        }
        try {
            log.warn("[{}] Attempting to clean up zombie session [{}] for edge [{}].",
                    zombie.getTenantId(), zombie.getSessionId(), zombie.getEdge().getId());
            if (destroySession(zombie)) {
                log.info("[{}] Successfully cleaned up zombie session [{}].",
                        zombie.getTenantId(), zombie.getSessionId());
            } else {
                log.warn("[{}] Zombie session [{}] cleanup failed again. Re-queuing for next attempt.",
                        zombie.getTenantId(), zombie.getSessionId());
                zombieSessions.add(zombie);
            }
        } catch (Exception e) {
            // Without this catch the polled session would be lost and the scheduler would
            // cancel all subsequent runs of this task.
            log.warn("[{}] Unexpected error while cleaning up zombie session [{}]. Re-queuing for next attempt.",
                    zombie.getTenantId(), zombie.getSessionId(), e);
            zombieSessions.add(zombie);
        }
    }
}
} }