Merge pull request #13843 from bcblr1993/fix-edge-zombie-consumer-cleanup

Improved Edge session cleanup to prevent resource leaks and message backlogs caused by unstable network conditions and Kafka busy timeouts
This commit is contained in:
Viacheslav Klimov 2025-09-26 10:50:14 +03:00 committed by GitHub
commit db9c4ed6d2
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -93,14 +93,13 @@ import static org.thingsboard.server.service.state.DefaultDeviceStateService.LAS
@TbCoreComponent
public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase implements EdgeRpcService {
private static final int DESTROY_SESSION_MAX_ATTEMPTS = 10;
private final ConcurrentMap<EdgeId, EdgeGrpcSession> sessions = new ConcurrentHashMap<>();
private final ConcurrentMap<EdgeId, Lock> sessionNewEventsLocks = new ConcurrentHashMap<>();
private final Map<EdgeId, Boolean> sessionNewEvents = new HashMap<>();
private final ConcurrentMap<EdgeId, ScheduledFuture<?>> sessionEdgeEventChecks = new ConcurrentHashMap<>();
private final ConcurrentMap<UUID, Consumer<FromEdgeSyncResponse>> localSyncEdgeRequests = new ConcurrentHashMap<>();
private final ConcurrentMap<EdgeId, Boolean> edgeEventsMigrationProcessed = new ConcurrentHashMap<>();
// Sessions whose destroy() call failed: destroySession adds them here, and the scheduled
// cleanupZombieSessions task retries destroy() and removes the ones that succeed.
// A copy-on-write list keeps add() and removeIf() safe when they run on different threads;
// a plain ArrayList is not safe for that.
// NOTE(review): destroySession's callers are not visible here — presumably they are not
// confined to the "edge-service" executor thread that runs the cleanup; confirm.
private final List<EdgeGrpcSession> zombieSessions = new java.util.concurrent.CopyOnWriteArrayList<>();
@Value("${edges.rpc.port}")
private int rpcPort;
@ -193,7 +192,7 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i
this.edgeEventProcessingExecutorService = ThingsBoardExecutors.newScheduledThreadPool(schedulerPoolSize, "edge-event-check-scheduler");
this.sendDownlinkExecutorService = ThingsBoardExecutors.newScheduledThreadPool(sendSchedulerPoolSize, "edge-send-scheduler");
this.executorService = ThingsBoardExecutors.newSingleThreadScheduledExecutor("edge-service");
this.executorService.scheduleAtFixedRate(this::destroyKafkaSessionIfDisconnectedAndConsumerActive, 60, 60, TimeUnit.SECONDS);
this.executorService.scheduleAtFixedRate(this::cleanupZombieSessions, 60, 60, TimeUnit.SECONDS);
log.info("Edge RPC service initialized!");
}
@ -518,14 +517,10 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i
private void destroySession(EdgeGrpcSession session) {
try (session) {
for (int i = 0; i < DESTROY_SESSION_MAX_ATTEMPTS; i++) {
if (session.destroy()) {
break;
} else {
try {
Thread.sleep(100);
} catch (InterruptedException ignored) {}
}
if (!session.destroy()) {
log.warn("[{}][{}] Session destroy failed for edge [{}] with session id [{}]. Adding to zombie queue for later cleanup.",
session.getTenantId(), session.getEdge().getId(), session.getEdge().getName(), session.getSessionId());
zombieSessions.add(session);
}
}
}
@ -634,7 +629,7 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i
}
}
private void destroyKafkaSessionIfDisconnectedAndConsumerActive() {
private void cleanupZombieSessions() {
try {
List<EdgeId> toRemove = new ArrayList<>();
for (EdgeGrpcSession session : sessions.values()) {
@ -655,9 +650,19 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i
}
}
}
zombieSessions.removeIf(zombie -> {
if (zombie.destroy()) {
log.info("[{}][{}] Successfully cleaned up zombie session [{}] for edge [{}].",
zombie.getTenantId(), zombie.getEdge().getId(), zombie.getSessionId(), zombie.getEdge().getName());
return true;
} else {
log.warn("[{}][{}] Failed to remove zombie session [{}] for edge [{}].",
zombie.getTenantId(), zombie.getEdge().getId(), zombie.getSessionId(), zombie.getEdge().getName());
return false;
}
});
} catch (Exception e) {
log.warn("Failed to cleanup kafka sessions", e);
}
}
}