Improved Kafka Edge Session destroy logic - added retry attempts to avoid unclosed consumers

This commit is contained in:
Volodymyr Babak 2025-06-23 16:10:03 +03:00
parent 24d695727a
commit 6014eed852
3 changed files with 29 additions and 10 deletions

View File

@ -94,6 +94,8 @@ import static org.thingsboard.server.service.state.DefaultDeviceStateService.LAS
@TbCoreComponent @TbCoreComponent
public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase implements EdgeRpcService { public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase implements EdgeRpcService {
private static final int DESTROY_SESSION_MAX_ATTEMPTS = 10;
private final ConcurrentMap<EdgeId, EdgeGrpcSession> sessions = new ConcurrentHashMap<>(); private final ConcurrentMap<EdgeId, EdgeGrpcSession> sessions = new ConcurrentHashMap<>();
private final ConcurrentMap<EdgeId, Lock> sessionNewEventsLocks = new ConcurrentHashMap<>(); private final ConcurrentMap<EdgeId, Lock> sessionNewEventsLocks = new ConcurrentHashMap<>();
private final Map<EdgeId, Boolean> sessionNewEvents = new HashMap<>(); private final Map<EdgeId, Boolean> sessionNewEvents = new HashMap<>();
@ -283,9 +285,8 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i
EdgeGrpcSession session = sessions.get(edgeId); EdgeGrpcSession session = sessions.get(edgeId);
if (session != null && session.isConnected()) { if (session != null && session.isConnected()) {
log.info("[{}] Closing and removing session for edge [{}]", tenantId, edgeId); log.info("[{}] Closing and removing session for edge [{}]", tenantId, edgeId);
session.destroy(); destroySession(session);
session.cleanUp(); session.cleanUp();
session.close();
sessions.remove(edgeId); sessions.remove(edgeId);
final Lock newEventLock = sessionNewEventsLocks.computeIfAbsent(edgeId, id -> new ReentrantLock()); final Lock newEventLock = sessionNewEventsLocks.computeIfAbsent(edgeId, id -> new ReentrantLock());
newEventLock.lock(); newEventLock.lock();
@ -521,7 +522,15 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i
private void destroySession(EdgeGrpcSession session) { private void destroySession(EdgeGrpcSession session) {
try (session) { try (session) {
session.destroy(); for (int i = 0; i < DESTROY_SESSION_MAX_ATTEMPTS; i++) {
if (session.destroy()) {
break;
} else {
try {
Thread.sleep(100);
} catch (InterruptedException ignored) {}
}
}
} }
} }
@ -643,9 +652,11 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i
} }
for (EdgeId edgeId : toRemove) { for (EdgeId edgeId : toRemove) {
log.info("[{}] Destroying session for edge because edge is not connected", edgeId); log.info("[{}] Destroying session for edge because edge is not connected", edgeId);
EdgeGrpcSession removed = sessions.remove(edgeId); EdgeGrpcSession removed = sessions.get(edgeId);
if (removed instanceof KafkaEdgeGrpcSession kafkaSession) { if (removed instanceof KafkaEdgeGrpcSession kafkaSession) {
kafkaSession.destroy(); if (kafkaSession.destroy()) {
sessions.remove(edgeId);
}
} }
} }
} catch (Exception e) { } catch (Exception e) {

View File

@ -918,7 +918,9 @@ public abstract class EdgeGrpcSession implements Closeable {
return Futures.allAsList(result); return Futures.allAsList(result);
} }
protected void destroy() {} protected boolean destroy() {
return true;
}
protected void cleanUp() {} protected void cleanUp() {}

View File

@ -135,19 +135,25 @@ public class KafkaEdgeGrpcSession extends EdgeGrpcSession {
} }
@Override @Override
public void destroy() { public boolean destroy() {
try { try {
if (consumer != null) { if (consumer != null) {
consumer.stop(); consumer.stop();
} }
} finally { } catch (Exception e) {
consumer = null; log.warn("[{}][{}] Failed to stop edge event consumer", tenantId, edge.getId(), e);
return false;
} }
consumer = null;
try { try {
if (consumerExecutor != null) { if (consumerExecutor != null) {
consumerExecutor.shutdown(); consumerExecutor.shutdown();
} }
} catch (Exception ignored) {} } catch (Exception e) {
log.warn("[{}][{}] Failed to shutdown consumer executor", tenantId, edge.getId(), e);
return false;
}
return true;
} }
@Override @Override