Refactoring and simplified

This commit is contained in:
Volodymyr Babak 2025-09-25 22:00:35 +03:00
parent f2c557b8d6
commit 9384a4b530

View File

@ -68,8 +68,17 @@ import org.thingsboard.server.service.telemetry.TelemetrySubscriptionService;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.util.*; import java.util.ArrayList;
import java.util.concurrent.*; import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer; import java.util.function.Consumer;
@ -84,15 +93,13 @@ import static org.thingsboard.server.service.state.DefaultDeviceStateService.LAS
@TbCoreComponent @TbCoreComponent
public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase implements EdgeRpcService { public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase implements EdgeRpcService {
private static final int DESTROY_SESSION_MAX_ATTEMPTS = 10;
private final ConcurrentMap<EdgeId, EdgeGrpcSession> sessions = new ConcurrentHashMap<>(); private final ConcurrentMap<EdgeId, EdgeGrpcSession> sessions = new ConcurrentHashMap<>();
private final ConcurrentMap<EdgeId, Lock> sessionNewEventsLocks = new ConcurrentHashMap<>(); private final ConcurrentMap<EdgeId, Lock> sessionNewEventsLocks = new ConcurrentHashMap<>();
private final Map<EdgeId, Boolean> sessionNewEvents = new HashMap<>(); private final Map<EdgeId, Boolean> sessionNewEvents = new HashMap<>();
private final ConcurrentMap<EdgeId, ScheduledFuture<?>> sessionEdgeEventChecks = new ConcurrentHashMap<>(); private final ConcurrentMap<EdgeId, ScheduledFuture<?>> sessionEdgeEventChecks = new ConcurrentHashMap<>();
private final ConcurrentMap<UUID, Consumer<FromEdgeSyncResponse>> localSyncEdgeRequests = new ConcurrentHashMap<>(); private final ConcurrentMap<UUID, Consumer<FromEdgeSyncResponse>> localSyncEdgeRequests = new ConcurrentHashMap<>();
private final ConcurrentMap<EdgeId, Boolean> edgeEventsMigrationProcessed = new ConcurrentHashMap<>(); private final ConcurrentMap<EdgeId, Boolean> edgeEventsMigrationProcessed = new ConcurrentHashMap<>();
private final Queue<EdgeGrpcSession> zombieSessions = new ConcurrentLinkedQueue<>(); private final List<EdgeGrpcSession> zombieSessions = new ArrayList<>();
@Value("${edges.rpc.port}") @Value("${edges.rpc.port}")
private int rpcPort; private int rpcPort;
@ -154,8 +161,6 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i
private ScheduledExecutorService executorService; private ScheduledExecutorService executorService;
private ScheduledExecutorService zombieSessionsExecutorService;
@AfterStartUp(order = AfterStartUp.REGULAR_SERVICE) @AfterStartUp(order = AfterStartUp.REGULAR_SERVICE)
public void onStartUp() { public void onStartUp() {
log.info("Initializing Edge RPC service!"); log.info("Initializing Edge RPC service!");
@ -187,9 +192,7 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i
this.edgeEventProcessingExecutorService = ThingsBoardExecutors.newScheduledThreadPool(schedulerPoolSize, "edge-event-check-scheduler"); this.edgeEventProcessingExecutorService = ThingsBoardExecutors.newScheduledThreadPool(schedulerPoolSize, "edge-event-check-scheduler");
this.sendDownlinkExecutorService = ThingsBoardExecutors.newScheduledThreadPool(sendSchedulerPoolSize, "edge-send-scheduler"); this.sendDownlinkExecutorService = ThingsBoardExecutors.newScheduledThreadPool(sendSchedulerPoolSize, "edge-send-scheduler");
this.executorService = ThingsBoardExecutors.newSingleThreadScheduledExecutor("edge-service"); this.executorService = ThingsBoardExecutors.newSingleThreadScheduledExecutor("edge-service");
this.zombieSessionsExecutorService = ThingsBoardExecutors.newSingleThreadScheduledExecutor("zombie-sessions"); this.executorService.scheduleAtFixedRate(this::cleanupZombieSessions, 60, 60, TimeUnit.SECONDS);
this.executorService.scheduleAtFixedRate(this::destroyKafkaSessionIfDisconnectedAndConsumerActive, 60, 60, TimeUnit.SECONDS);
this.zombieSessionsExecutorService.scheduleAtFixedRate(this::cleanupZombieSessions, 30, 60, TimeUnit.SECONDS);
log.info("Edge RPC service initialized!"); log.info("Edge RPC service initialized!");
} }
@ -215,9 +218,6 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i
if (executorService != null) { if (executorService != null) {
executorService.shutdownNow(); executorService.shutdownNow();
} }
if(zombieSessionsExecutorService != null){
zombieSessionsExecutorService.shutdownNow();
}
} }
@Override @Override
@ -501,13 +501,7 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i
sessionNewEvents.remove(edgeId); sessionNewEvents.remove(edgeId);
} finally { } finally {
newEventLock.unlock(); newEventLock.unlock();
} }destroySession(toRemove);
boolean destroySessionResult = destroySession(toRemove);
if(!destroySessionResult){
log.error("[{}][{}] Session destroy failed for edge [{}] with session id [{}]. Adding to zombie queue for later cleanup.",
edge.getTenantId(), edgeId, edge.getName(), sessionId);
zombieSessions.add(toRemove);
}
TenantId tenantId = toRemove.getEdge().getTenantId(); TenantId tenantId = toRemove.getEdge().getTenantId();
save(tenantId, edgeId, ACTIVITY_STATE, false); save(tenantId, edgeId, ACTIVITY_STATE, false);
long lastDisconnectTs = System.currentTimeMillis(); long lastDisconnectTs = System.currentTimeMillis();
@ -520,19 +514,14 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i
edgeIdServiceIdCache.evict(edgeId); edgeIdServiceIdCache.evict(edgeId);
} }
private boolean destroySession(EdgeGrpcSession session) { private void destroySession(EdgeGrpcSession session) {
try (session) { try (session) {
for (int i = 0; i < DESTROY_SESSION_MAX_ATTEMPTS; i++) { if (!session.destroy()) {
if (session.destroy()) { log.warn("[{}][{}] Session destroy failed for edge [{}] with session id [{}]. Adding to zombie queue for later cleanup.",
return true; session.getTenantId(), session.getEdge().getId(), session.getEdge().getName(), session.getSessionId());
} else { zombieSessions.add(session);
try {
Thread.sleep(100);
} catch (InterruptedException ignored) {}
}
} }
} }
return false;
} }
private void save(TenantId tenantId, EdgeId edgeId, String key, long value) { private void save(TenantId tenantId, EdgeId edgeId, String key, long value) {
@ -639,7 +628,7 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i
} }
} }
private void destroyKafkaSessionIfDisconnectedAndConsumerActive() { private void cleanupZombieSessions() {
try { try {
List<EdgeId> toRemove = new ArrayList<>(); List<EdgeId> toRemove = new ArrayList<>();
for (EdgeGrpcSession session : sessions.values()) { for (EdgeGrpcSession session : sessions.values()) {
@ -660,33 +649,19 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i
} }
} }
} }
zombieSessions.removeIf(zombie -> {
if (zombie.destroy()) {
log.info("[{}][{}] Successfully cleaned up zombie session [{}] for edge [{}].",
zombie.getTenantId(), zombie.getEdge().getId(), zombie.getSessionId(), zombie.getEdge().getName());
return true;
} else {
log.warn("[{}][{}] Failed to remove zombie session [{}] for edge [{}].",
zombie.getTenantId(), zombie.getEdge().getId(), zombie.getSessionId(), zombie.getEdge().getName());
return false;
}
});
} catch (Exception e) { } catch (Exception e) {
log.warn("Failed to cleanup kafka sessions", e); log.warn("Failed to cleanup kafka sessions", e);
} }
} }
private void cleanupZombieSessions() {
int zombiesToProcess = zombieSessions.size();
if (zombiesToProcess == 0) {
return;
}
log.info("Found {} zombie sessions in the queue. Starting cleanup cycle.", zombiesToProcess);
for (int i = 0; i < zombiesToProcess; i++) {
EdgeGrpcSession zombie = zombieSessions.poll();
if (zombie == null) {
break;
}
log.warn("[{}] Attempting to clean up zombie session [{}] for edge [{}].",
zombie.getTenantId(), zombie.getSessionId(), zombie.getEdge().getId());
if (!destroySession(zombie)) {
log.warn("[{}] Zombie session [{}] cleanup failed again. Re-queuing for next attempt.",
zombie.getTenantId(), zombie.getSessionId());
zombieSessions.add(zombie);
} else {
log.info("[{}] Successfully cleaned up zombie session [{}].",
zombie.getTenantId(), zombie.getSessionId());
}
}
}
} }