diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java index eaef1f7c7d..e1a193bdfa 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java @@ -69,17 +69,8 @@ import org.thingsboard.server.service.telemetry.TelemetrySubscriptionService; import java.io.IOException; import java.io.InputStream; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.UUID; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.TimeUnit; +import java.util.*; +import java.util.concurrent.*; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import java.util.function.Consumer; @@ -102,6 +93,7 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i private final ConcurrentMap> sessionEdgeEventChecks = new ConcurrentHashMap<>(); private final ConcurrentMap> localSyncEdgeRequests = new ConcurrentHashMap<>(); private final ConcurrentMap edgeEventsMigrationProcessed = new ConcurrentHashMap<>(); + private final Queue zombieSessions = new ConcurrentLinkedQueue<>(); @Value("${edges.rpc.port}") private int rpcPort; @@ -166,6 +158,8 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i private ScheduledExecutorService executorService; + private ScheduledExecutorService zombieSessionsExecutorService; + @AfterStartUp(order = AfterStartUp.REGULAR_SERVICE) public void onStartUp() { log.info("Initializing Edge RPC service!"); @@ -197,7 +191,9 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i this.edgeEventProcessingExecutorService = ThingsBoardExecutors.newScheduledThreadPool(schedulerPoolSize, "edge-event-check-scheduler"); this.sendDownlinkExecutorService = ThingsBoardExecutors.newScheduledThreadPool(sendSchedulerPoolSize, "edge-send-scheduler"); this.executorService = ThingsBoardExecutors.newSingleThreadScheduledExecutor("edge-service"); + this.zombieSessionsExecutorService = ThingsBoardExecutors.newSingleThreadScheduledExecutor("zombie-sessions"); this.executorService.scheduleAtFixedRate(this::destroyKafkaSessionIfDisconnectedAndConsumerActive, 60, 60, TimeUnit.SECONDS); + this.zombieSessionsExecutorService.scheduleAtFixedRate(this::cleanupZombieSessions, 30, 60, TimeUnit.SECONDS); log.info("Edge RPC service initialized!"); } @@ -520,11 +516,11 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i edgeIdServiceIdCache.evict(edgeId); } - private void destroySession(EdgeGrpcSession session) { + private boolean destroySession(EdgeGrpcSession session) { try (session) { for (int i = 0; i < DESTROY_SESSION_MAX_ATTEMPTS; i++) { if (session.destroy()) { - break; + return true; } else { try { Thread.sleep(100); @@ -532,6 +528,7 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i } } } + return false; } private void save(TenantId tenantId, EdgeId edgeId, String key, long value) { @@ -663,4 +660,28 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i log.warn("Failed to cleanup kafka sessions", e); } } + + private void cleanupZombieSessions() { + int zombiesToProcess = zombieSessions.size(); + if (zombiesToProcess == 0) { + return; + } + log.info("Found {} zombie sessions in the queue. Starting cleanup cycle.", zombiesToProcess); + for (int i = 0; i < zombiesToProcess; i++) { + EdgeGrpcSession zombie = zombieSessions.poll(); + if (zombie == null) { + break; + } + log.warn("[{}] Attempting to clean up zombie session [{}] for edge [{}].", + zombie.getTenantId(), zombie.getSessionId(), zombie.getEdge().getId()); + if (!destroySession(zombie)) { + log.warn("[{}] Zombie session [{}] cleanup failed again. Re-queuing for next attempt.", + zombie.getTenantId(), zombie.getSessionId()); + zombieSessions.add(zombie); + } else { + log.info("[{}] Successfully cleaned up zombie session [{}].", + zombie.getTenantId(), zombie.getSessionId()); + } + } + } }