From 9384a4b5309ad3fb2f7d088a088ba14f8293659b Mon Sep 17 00:00:00 2001 From: Volodymyr Babak Date: Thu, 25 Sep 2025 22:00:35 +0300 Subject: [PATCH] Refactoring and simplified --- .../service/edge/rpc/EdgeGrpcService.java | 87 +++++++------------ 1 file changed, 31 insertions(+), 56 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java index d0b997d7b9..f92e89823a 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java @@ -68,8 +68,17 @@ import org.thingsboard.server.service.telemetry.TelemetrySubscriptionService; import java.io.IOException; import java.io.InputStream; -import java.util.*; -import java.util.concurrent.*; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import java.util.function.Consumer; @@ -84,15 +93,13 @@ import static org.thingsboard.server.service.state.DefaultDeviceStateService.LAS @TbCoreComponent public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase implements EdgeRpcService { - private static final int DESTROY_SESSION_MAX_ATTEMPTS = 10; - private final ConcurrentMap sessions = new ConcurrentHashMap<>(); private final ConcurrentMap sessionNewEventsLocks = new ConcurrentHashMap<>(); private final Map sessionNewEvents = new HashMap<>(); private final ConcurrentMap> sessionEdgeEventChecks = new ConcurrentHashMap<>(); private final ConcurrentMap> localSyncEdgeRequests = new ConcurrentHashMap<>(); private final ConcurrentMap edgeEventsMigrationProcessed = new ConcurrentHashMap<>(); - private final Queue zombieSessions = new ConcurrentLinkedQueue<>(); + private final List zombieSessions = new ArrayList<>(); @Value("${edges.rpc.port}") private int rpcPort; @@ -154,8 +161,6 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i private ScheduledExecutorService executorService; - private ScheduledExecutorService zombieSessionsExecutorService; - @AfterStartUp(order = AfterStartUp.REGULAR_SERVICE) public void onStartUp() { log.info("Initializing Edge RPC service!"); @@ -187,9 +192,7 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i this.edgeEventProcessingExecutorService = ThingsBoardExecutors.newScheduledThreadPool(schedulerPoolSize, "edge-event-check-scheduler"); this.sendDownlinkExecutorService = ThingsBoardExecutors.newScheduledThreadPool(sendSchedulerPoolSize, "edge-send-scheduler"); this.executorService = ThingsBoardExecutors.newSingleThreadScheduledExecutor("edge-service"); - this.zombieSessionsExecutorService = ThingsBoardExecutors.newSingleThreadScheduledExecutor("zombie-sessions"); - this.executorService.scheduleAtFixedRate(this::destroyKafkaSessionIfDisconnectedAndConsumerActive, 60, 60, TimeUnit.SECONDS); - this.zombieSessionsExecutorService.scheduleAtFixedRate(this::cleanupZombieSessions, 30, 60, TimeUnit.SECONDS); + this.executorService.scheduleAtFixedRate(this::cleanupZombieSessions, 60, 60, TimeUnit.SECONDS); log.info("Edge RPC service initialized!"); } @@ -215,9 +218,6 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i if (executorService != null) { executorService.shutdownNow(); } - if(zombieSessionsExecutorService != null){ - zombieSessionsExecutorService.shutdownNow(); - } } @Override @@ -501,13 +501,7 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i sessionNewEvents.remove(edgeId); } finally { newEventLock.unlock(); - } - boolean destroySessionResult = destroySession(toRemove); - if(!destroySessionResult){ - log.error("[{}][{}] Session destroy failed for edge [{}] with session id [{}]. Adding to zombie queue for later cleanup.", - edge.getTenantId(), edgeId, edge.getName(), sessionId); - zombieSessions.add(toRemove); - } + }destroySession(toRemove); TenantId tenantId = toRemove.getEdge().getTenantId(); save(tenantId, edgeId, ACTIVITY_STATE, false); long lastDisconnectTs = System.currentTimeMillis(); @@ -520,19 +514,14 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i edgeIdServiceIdCache.evict(edgeId); } - private boolean destroySession(EdgeGrpcSession session) { + private void destroySession(EdgeGrpcSession session) { try (session) { - for (int i = 0; i < DESTROY_SESSION_MAX_ATTEMPTS; i++) { - if (session.destroy()) { - return true; - } else { - try { - Thread.sleep(100); - } catch (InterruptedException ignored) {} - } + if (!session.destroy()) { + log.warn("[{}][{}] Session destroy failed for edge [{}] with session id [{}]. Adding to zombie queue for later cleanup.", + session.getTenantId(), session.getEdge().getId(), session.getEdge().getName(), session.getSessionId()); + zombieSessions.add(session); } } - return false; } private void save(TenantId tenantId, EdgeId edgeId, String key, long value) { @@ -639,7 +628,7 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i } } - private void destroyKafkaSessionIfDisconnectedAndConsumerActive() { + private void cleanupZombieSessions() { try { List toRemove = new ArrayList<>(); for (EdgeGrpcSession session : sessions.values()) { @@ -660,33 +649,19 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i } } } + zombieSessions.removeIf(zombie -> { + if (zombie.destroy()) { + log.info("[{}][{}] Successfully cleaned up zombie session [{}] for edge [{}].", + zombie.getTenantId(), zombie.getEdge().getId(), zombie.getSessionId(), zombie.getEdge().getName()); + return true; + } else { + log.warn("[{}][{}] Failed to remove zombie session [{}] for edge [{}].", + zombie.getTenantId(), zombie.getEdge().getId(), zombie.getSessionId(), zombie.getEdge().getName()); + return false; + } + }); } catch (Exception e) { log.warn("Failed to cleanup kafka sessions", e); } } - - - private void cleanupZombieSessions() { - int zombiesToProcess = zombieSessions.size(); - if (zombiesToProcess == 0) { - return; - } - log.info("Found {} zombie sessions in the queue. Starting cleanup cycle.", zombiesToProcess); - for (int i = 0; i < zombiesToProcess; i++) { - EdgeGrpcSession zombie = zombieSessions.poll(); - if (zombie == null) { - break; - } - log.warn("[{}] Attempting to clean up zombie session [{}] for edge [{}].", - zombie.getTenantId(), zombie.getSessionId(), zombie.getEdge().getId()); - if (!destroySession(zombie)) { - log.warn("[{}] Zombie session [{}] cleanup failed again. Re-queuing for next attempt.", - zombie.getTenantId(), zombie.getSessionId()); - zombieSessions.add(zombie); - } else { - log.info("[{}] Successfully cleaned up zombie session [{}].", - zombie.getTenantId(), zombie.getSessionId()); - } - } - } }