Check edge session before removal to avoid removing live session

This commit is contained in:
Volodymyr Babak 2023-06-08 15:52:40 +03:00
parent 2afc478918
commit 080a409839
2 changed files with 23 additions and 18 deletions

View File

@ -380,9 +380,11 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i
} }
} }
private void onEdgeDisconnect(EdgeId edgeId) { private void onEdgeDisconnect(EdgeId edgeId, UUID sessionId) {
log.info("[{}] edge disconnected!", edgeId); log.info("[{}][{}] edge disconnected!", edgeId, sessionId);
EdgeGrpcSession removed = sessions.remove(edgeId); EdgeGrpcSession toRemove = sessions.get(edgeId);
if (toRemove.getSessionId().equals(sessionId)) {
toRemove = sessions.remove(edgeId);
final Lock newEventLock = sessionNewEventsLocks.computeIfAbsent(edgeId, id -> new ReentrantLock()); final Lock newEventLock = sessionNewEventsLocks.computeIfAbsent(edgeId, id -> new ReentrantLock());
newEventLock.lock(); newEventLock.lock();
try { try {
@ -393,8 +395,11 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i
save(edgeId, DefaultDeviceStateService.ACTIVITY_STATE, false); save(edgeId, DefaultDeviceStateService.ACTIVITY_STATE, false);
long lastDisconnectTs = System.currentTimeMillis(); long lastDisconnectTs = System.currentTimeMillis();
save(edgeId, DefaultDeviceStateService.LAST_DISCONNECT_TIME, lastDisconnectTs); save(edgeId, DefaultDeviceStateService.LAST_DISCONNECT_TIME, lastDisconnectTs);
pushRuleEngineMessage(removed.getEdge().getTenantId(), edgeId, lastDisconnectTs, DISCONNECT_EVENT); pushRuleEngineMessage(toRemove.getEdge().getTenantId(), edgeId, lastDisconnectTs, DISCONNECT_EVENT);
cancelScheduleEdgeEventsCheck(edgeId); cancelScheduleEdgeEventsCheck(edgeId);
} else {
log.debug("[{}] edge session [{}] is not available anymore, nothing to remove. most probably this session is already outdated!", edgeId, sessionId);
}
} }
private void save(EdgeId edgeId, String key, long value) { private void save(EdgeId edgeId, String key, long value) {

View File

@ -92,7 +92,7 @@ public final class EdgeGrpcSession implements Closeable {
private final UUID sessionId; private final UUID sessionId;
private final BiConsumer<EdgeId, EdgeGrpcSession> sessionOpenListener; private final BiConsumer<EdgeId, EdgeGrpcSession> sessionOpenListener;
private final Consumer<EdgeId> sessionCloseListener; private final BiConsumer<EdgeId, UUID> sessionCloseListener;
private final EdgeSessionState sessionState = new EdgeSessionState(); private final EdgeSessionState sessionState = new EdgeSessionState();
@ -111,7 +111,7 @@ public final class EdgeGrpcSession implements Closeable {
private ScheduledExecutorService sendDownlinkExecutorService; private ScheduledExecutorService sendDownlinkExecutorService;
EdgeGrpcSession(EdgeContextComponent ctx, StreamObserver<ResponseMsg> outputStream, BiConsumer<EdgeId, EdgeGrpcSession> sessionOpenListener, EdgeGrpcSession(EdgeContextComponent ctx, StreamObserver<ResponseMsg> outputStream, BiConsumer<EdgeId, EdgeGrpcSession> sessionOpenListener,
Consumer<EdgeId> sessionCloseListener, ScheduledExecutorService sendDownlinkExecutorService, int maxInboundMessageSize) { BiConsumer<EdgeId, UUID> sessionCloseListener, ScheduledExecutorService sendDownlinkExecutorService, int maxInboundMessageSize) {
this.sessionId = UUID.randomUUID(); this.sessionId = UUID.randomUUID();
this.ctx = ctx; this.ctx = ctx;
this.outputStream = outputStream; this.outputStream = outputStream;
@ -180,7 +180,7 @@ public final class EdgeGrpcSession implements Closeable {
connected = false; connected = false;
if (edge != null) { if (edge != null) {
try { try {
sessionCloseListener.accept(edge.getId()); sessionCloseListener.accept(edge.getId(), sessionId);
} catch (Exception ignored) { } catch (Exception ignored) {
} }
} }
@ -288,7 +288,7 @@ public final class EdgeGrpcSession implements Closeable {
} catch (Exception e) { } catch (Exception e) {
log.error("[{}] Failed to send downlink message [{}]", this.sessionId, downlinkMsg, e); log.error("[{}] Failed to send downlink message [{}]", this.sessionId, downlinkMsg, e);
connected = false; connected = false;
sessionCloseListener.accept(edge.getId()); sessionCloseListener.accept(edge.getId(), sessionId);
} finally { } finally {
downlinkMsgLock.unlock(); downlinkMsgLock.unlock();
} }