Merge pull request #13626 from volodymyr-babak/edge-improve-close-session

Improved Kafka Edge Session destroy logic - added retry attempts to avoid unclosed consumers
This commit is contained in:
Viacheslav Klimov 2025-06-23 16:30:54 +03:00 committed by GitHub
commit 8d45f09869
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 29 additions and 10 deletions

View File

@ -94,6 +94,8 @@ import static org.thingsboard.server.service.state.DefaultDeviceStateService.LAS
@TbCoreComponent
public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase implements EdgeRpcService {
private static final int DESTROY_SESSION_MAX_ATTEMPTS = 10;
private final ConcurrentMap<EdgeId, EdgeGrpcSession> sessions = new ConcurrentHashMap<>();
private final ConcurrentMap<EdgeId, Lock> sessionNewEventsLocks = new ConcurrentHashMap<>();
private final Map<EdgeId, Boolean> sessionNewEvents = new HashMap<>();
@ -283,9 +285,8 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i
EdgeGrpcSession session = sessions.get(edgeId);
if (session != null && session.isConnected()) {
log.info("[{}] Closing and removing session for edge [{}]", tenantId, edgeId);
session.destroy();
destroySession(session);
session.cleanUp();
session.close();
sessions.remove(edgeId);
final Lock newEventLock = sessionNewEventsLocks.computeIfAbsent(edgeId, id -> new ReentrantLock());
newEventLock.lock();
@ -521,7 +522,15 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i
private void destroySession(EdgeGrpcSession session) {
try (session) {
session.destroy();
for (int i = 0; i < DESTROY_SESSION_MAX_ATTEMPTS; i++) {
if (session.destroy()) {
break;
} else {
try {
Thread.sleep(100);
} catch (InterruptedException ignored) {}
}
}
}
}
@ -643,9 +652,11 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i
}
for (EdgeId edgeId : toRemove) {
log.info("[{}] Destroying session for edge because edge is not connected", edgeId);
EdgeGrpcSession removed = sessions.remove(edgeId);
EdgeGrpcSession removed = sessions.get(edgeId);
if (removed instanceof KafkaEdgeGrpcSession kafkaSession) {
kafkaSession.destroy();
if (kafkaSession.destroy()) {
sessions.remove(edgeId);
}
}
}
} catch (Exception e) {

View File

@ -918,7 +918,9 @@ public abstract class EdgeGrpcSession implements Closeable {
return Futures.allAsList(result);
}
protected void destroy() {}
protected boolean destroy() {
return true;
}
protected void cleanUp() {}

View File

@ -135,19 +135,25 @@ public class KafkaEdgeGrpcSession extends EdgeGrpcSession {
}
@Override
public void destroy() {
public boolean destroy() {
try {
if (consumer != null) {
consumer.stop();
}
} finally {
consumer = null;
} catch (Exception e) {
log.warn("[{}][{}] Failed to stop edge event consumer", tenantId, edge.getId(), e);
return false;
}
consumer = null;
try {
if (consumerExecutor != null) {
consumerExecutor.shutdown();
}
} catch (Exception ignored) {}
} catch (Exception e) {
log.warn("[{}][{}] Failed to shutdown consumer executor", tenantId, edge.getId(), e);
return false;
}
return true;
}
@Override