destroyKafkaSessionIfDisconnectedAndConsumerActive runs for all sessions - edge can be connected to different node and scheduled will not be invoked

This commit is contained in:
Volodymyr Babak 2025-06-02 11:22:39 +03:00
parent 32e9efec83
commit 9fa71bff54

View File

@ -69,7 +69,9 @@ import org.thingsboard.server.service.telemetry.TelemetrySubscriptionService;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
@ -193,6 +195,7 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i
this.edgeEventProcessingExecutorService = ThingsBoardExecutors.newScheduledThreadPool(schedulerPoolSize, "edge-event-check-scheduler");
this.sendDownlinkExecutorService = ThingsBoardExecutors.newScheduledThreadPool(sendSchedulerPoolSize, "edge-send-scheduler");
this.executorService = ThingsBoardExecutors.newSingleThreadScheduledExecutor("edge-service");
this.executorService.scheduleAtFixedRate(this::destroyKafkaSessionIfDisconnectedAndConsumerActive, 60, 60, TimeUnit.SECONDS);
log.info("Edge RPC service initialized!");
}
@ -405,8 +408,6 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i
EdgeId edgeId = session.getEdge().getId();
TenantId tenantId = session.getEdge().getTenantId();
destroyKafkaSessionIfDisconnectedAndConsumerActive(tenantId, edgeId, session);
cancelScheduleEdgeEventsCheck(edgeId);
if (sessions.containsKey(edgeId)) {
@ -474,22 +475,6 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i
}
}
private void destroyKafkaSessionIfDisconnectedAndConsumerActive(TenantId tenantId, EdgeId edgeId, EdgeGrpcSession session) {
try {
if (session instanceof KafkaEdgeGrpcSession kafkaSession) {
if (!kafkaSession.isConnected()
&& kafkaSession.getConsumer() != null
&& kafkaSession.getConsumer().getConsumer() != null
&& !kafkaSession.getConsumer().getConsumer().isStopped()) {
sessions.remove(edgeId);
kafkaSession.destroy();
}
}
} catch (Exception e) {
log.warn("[{}] Failed to destroy kafka session for edge [{}]", tenantId, edgeId, e);
}
}
private void cancelScheduleEdgeEventsCheck(EdgeId edgeId) {
log.trace("[{}] cancelling edge event check for edge", edgeId);
if (sessionEdgeEventChecks.containsKey(edgeId)) {
@ -631,4 +616,27 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i
}
}
private void destroyKafkaSessionIfDisconnectedAndConsumerActive() {
try {
List<EdgeId> toRemove = new ArrayList<>();
for (EdgeGrpcSession session : sessions.values()) {
if (session instanceof KafkaEdgeGrpcSession kafkaSession &&
!kafkaSession.isConnected() &&
kafkaSession.getConsumer() != null &&
kafkaSession.getConsumer().getConsumer() != null &&
!kafkaSession.getConsumer().getConsumer().isStopped()) {
toRemove.add(kafkaSession.getEdge().getId());
}
}
for (EdgeId edgeId : toRemove) {
log.info("[{}] Destroying session for edge because edge is not connected", edgeId);
EdgeGrpcSession removed = sessions.remove(edgeId);
if (removed instanceof KafkaEdgeGrpcSession kafkaSession) {
kafkaSession.destroy();
}
}
} catch (Exception e) {
log.warn("Failed to cleanup kafka sessions", e);
}
}
}