diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java index 5671ffb2ab..eaef1f7c7d 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java @@ -94,6 +94,8 @@ import static org.thingsboard.server.service.state.DefaultDeviceStateService.LAS @TbCoreComponent public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase implements EdgeRpcService { + private static final int DESTROY_SESSION_MAX_ATTEMPTS = 10; + private final ConcurrentMap sessions = new ConcurrentHashMap<>(); private final ConcurrentMap sessionNewEventsLocks = new ConcurrentHashMap<>(); private final Map sessionNewEvents = new HashMap<>(); @@ -283,9 +285,8 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i EdgeGrpcSession session = sessions.get(edgeId); if (session != null && session.isConnected()) { log.info("[{}] Closing and removing session for edge [{}]", tenantId, edgeId); - session.destroy(); + destroySession(session); session.cleanUp(); - session.close(); sessions.remove(edgeId); final Lock newEventLock = sessionNewEventsLocks.computeIfAbsent(edgeId, id -> new ReentrantLock()); newEventLock.lock(); @@ -521,7 +522,15 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i private void destroySession(EdgeGrpcSession session) { try (session) { - session.destroy(); + for (int i = 0; i < DESTROY_SESSION_MAX_ATTEMPTS; i++) { + if (session.destroy()) { + break; + } else { + try { + Thread.sleep(100); + } catch (InterruptedException ignored) {} + } + } } } @@ -643,9 +652,11 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i } for (EdgeId edgeId : toRemove) { log.info("[{}] Destroying session for edge because edge is not connected", edgeId); - EdgeGrpcSession removed = sessions.remove(edgeId); + EdgeGrpcSession removed = sessions.get(edgeId); if (removed instanceof KafkaEdgeGrpcSession kafkaSession) { - kafkaSession.destroy(); + if (kafkaSession.destroy()) { + sessions.remove(edgeId); + } } } } catch (Exception e) { diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java index 94e0c155fe..c658bcf403 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java @@ -918,7 +918,9 @@ public abstract class EdgeGrpcSession implements Closeable { return Futures.allAsList(result); } - protected void destroy() {} + protected boolean destroy() { + return true; + } protected void cleanUp() {} diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/KafkaEdgeGrpcSession.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/KafkaEdgeGrpcSession.java index daffe9db11..ab0b42abb4 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/KafkaEdgeGrpcSession.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/KafkaEdgeGrpcSession.java @@ -135,19 +135,25 @@ public class KafkaEdgeGrpcSession extends EdgeGrpcSession { } @Override - public void destroy() { + public boolean destroy() { try { if (consumer != null) { consumer.stop(); } - } finally { - consumer = null; + } catch (Exception e) { + log.warn("[{}][{}] Failed to stop edge event consumer", tenantId, edge.getId(), e); + return false; } + consumer = null; try { if (consumerExecutor != null) { consumerExecutor.shutdown(); } - } catch (Exception ignored) {} + } catch (Exception e) { + log.warn("[{}][{}] Failed to shutdown consumer executor", tenantId, edge.getId(), e); + return false; + } + return true; } @Override