diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java index 9b70c9bd72..d45d872aef 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java @@ -69,7 +69,9 @@ import org.thingsboard.server.service.telemetry.TelemetrySubscriptionService; import java.io.IOException; import java.io.InputStream; +import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Optional; import java.util.UUID; @@ -193,6 +195,7 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i this.edgeEventProcessingExecutorService = ThingsBoardExecutors.newScheduledThreadPool(schedulerPoolSize, "edge-event-check-scheduler"); this.sendDownlinkExecutorService = ThingsBoardExecutors.newScheduledThreadPool(sendSchedulerPoolSize, "edge-send-scheduler"); this.executorService = ThingsBoardExecutors.newSingleThreadScheduledExecutor("edge-service"); + this.executorService.scheduleAtFixedRate(this::destroyKafkaSessionIfDisconnectedAndConsumerActive, 60, 60, TimeUnit.SECONDS); log.info("Edge RPC service initialized!"); } @@ -405,8 +408,6 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i EdgeId edgeId = session.getEdge().getId(); TenantId tenantId = session.getEdge().getTenantId(); - destroyKafkaSessionIfDisconnectedAndConsumerActive(tenantId, edgeId, session); - cancelScheduleEdgeEventsCheck(edgeId); if (sessions.containsKey(edgeId)) { @@ -474,22 +475,6 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i } } - private void destroyKafkaSessionIfDisconnectedAndConsumerActive(TenantId tenantId, EdgeId edgeId, EdgeGrpcSession session) { - try { - if (session instanceof KafkaEdgeGrpcSession kafkaSession) { - if (!kafkaSession.isConnected() - && kafkaSession.getConsumer() != null - && kafkaSession.getConsumer().getConsumer() != null - && !kafkaSession.getConsumer().getConsumer().isStopped()) { - sessions.remove(edgeId); - kafkaSession.destroy(); - } - } - } catch (Exception e) { - log.warn("[{}] Failed to destroy kafka session for edge [{}]", tenantId, edgeId, e); - } - } - private void cancelScheduleEdgeEventsCheck(EdgeId edgeId) { log.trace("[{}] cancelling edge event check for edge", edgeId); if (sessionEdgeEventChecks.containsKey(edgeId)) { @@ -631,4 +616,27 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i } } + private void destroyKafkaSessionIfDisconnectedAndConsumerActive() { + try { + List toRemove = new ArrayList<>(); + for (EdgeGrpcSession session : sessions.values()) { + if (session instanceof KafkaEdgeGrpcSession kafkaSession && + !kafkaSession.isConnected() && + kafkaSession.getConsumer() != null && + kafkaSession.getConsumer().getConsumer() != null && + !kafkaSession.getConsumer().getConsumer().isStopped()) { + toRemove.add(kafkaSession.getEdge().getId()); + } + } + for (EdgeId edgeId : toRemove) { + log.info("[{}] Destroying session for edge because edge is not connected", edgeId); + EdgeGrpcSession removed = sessions.remove(edgeId); + if (removed instanceof KafkaEdgeGrpcSession kafkaSession) { + kafkaSession.destroy(); + } + } + } catch (Exception e) { + log.warn("Failed to cleanup kafka sessions", e); + } + } }