Merge pull request #8727 from volodymyr-babak/edge-connection-fix
[Hotfix] Check edge session before removal to avoid removing live session
This commit is contained in:
commit
db5c3e5c16
@ -380,21 +380,26 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void onEdgeDisconnect(EdgeId edgeId) {
|
private void onEdgeDisconnect(EdgeId edgeId, UUID sessionId) {
|
||||||
log.info("[{}] edge disconnected!", edgeId);
|
log.info("[{}][{}] edge disconnected!", edgeId, sessionId);
|
||||||
EdgeGrpcSession removed = sessions.remove(edgeId);
|
EdgeGrpcSession toRemove = sessions.get(edgeId);
|
||||||
final Lock newEventLock = sessionNewEventsLocks.computeIfAbsent(edgeId, id -> new ReentrantLock());
|
if (toRemove.getSessionId().equals(sessionId)) {
|
||||||
newEventLock.lock();
|
toRemove = sessions.remove(edgeId);
|
||||||
try {
|
final Lock newEventLock = sessionNewEventsLocks.computeIfAbsent(edgeId, id -> new ReentrantLock());
|
||||||
sessionNewEvents.remove(edgeId);
|
newEventLock.lock();
|
||||||
} finally {
|
try {
|
||||||
newEventLock.unlock();
|
sessionNewEvents.remove(edgeId);
|
||||||
|
} finally {
|
||||||
|
newEventLock.unlock();
|
||||||
|
}
|
||||||
|
save(edgeId, DefaultDeviceStateService.ACTIVITY_STATE, false);
|
||||||
|
long lastDisconnectTs = System.currentTimeMillis();
|
||||||
|
save(edgeId, DefaultDeviceStateService.LAST_DISCONNECT_TIME, lastDisconnectTs);
|
||||||
|
pushRuleEngineMessage(toRemove.getEdge().getTenantId(), edgeId, lastDisconnectTs, DISCONNECT_EVENT);
|
||||||
|
cancelScheduleEdgeEventsCheck(edgeId);
|
||||||
|
} else {
|
||||||
|
log.debug("[{}] edge session [{}] is not available anymore, nothing to remove. most probably this session is already outdated!", edgeId, sessionId);
|
||||||
}
|
}
|
||||||
save(edgeId, DefaultDeviceStateService.ACTIVITY_STATE, false);
|
|
||||||
long lastDisconnectTs = System.currentTimeMillis();
|
|
||||||
save(edgeId, DefaultDeviceStateService.LAST_DISCONNECT_TIME, lastDisconnectTs);
|
|
||||||
pushRuleEngineMessage(removed.getEdge().getTenantId(), edgeId, lastDisconnectTs, DISCONNECT_EVENT);
|
|
||||||
cancelScheduleEdgeEventsCheck(edgeId);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private void save(EdgeId edgeId, String key, long value) {
|
private void save(EdgeId edgeId, String key, long value) {
|
||||||
|
|||||||
@ -92,7 +92,7 @@ public final class EdgeGrpcSession implements Closeable {
|
|||||||
|
|
||||||
private final UUID sessionId;
|
private final UUID sessionId;
|
||||||
private final BiConsumer<EdgeId, EdgeGrpcSession> sessionOpenListener;
|
private final BiConsumer<EdgeId, EdgeGrpcSession> sessionOpenListener;
|
||||||
private final Consumer<EdgeId> sessionCloseListener;
|
private final BiConsumer<EdgeId, UUID> sessionCloseListener;
|
||||||
|
|
||||||
private final EdgeSessionState sessionState = new EdgeSessionState();
|
private final EdgeSessionState sessionState = new EdgeSessionState();
|
||||||
|
|
||||||
@ -111,7 +111,7 @@ public final class EdgeGrpcSession implements Closeable {
|
|||||||
private ScheduledExecutorService sendDownlinkExecutorService;
|
private ScheduledExecutorService sendDownlinkExecutorService;
|
||||||
|
|
||||||
EdgeGrpcSession(EdgeContextComponent ctx, StreamObserver<ResponseMsg> outputStream, BiConsumer<EdgeId, EdgeGrpcSession> sessionOpenListener,
|
EdgeGrpcSession(EdgeContextComponent ctx, StreamObserver<ResponseMsg> outputStream, BiConsumer<EdgeId, EdgeGrpcSession> sessionOpenListener,
|
||||||
Consumer<EdgeId> sessionCloseListener, ScheduledExecutorService sendDownlinkExecutorService, int maxInboundMessageSize) {
|
BiConsumer<EdgeId, UUID> sessionCloseListener, ScheduledExecutorService sendDownlinkExecutorService, int maxInboundMessageSize) {
|
||||||
this.sessionId = UUID.randomUUID();
|
this.sessionId = UUID.randomUUID();
|
||||||
this.ctx = ctx;
|
this.ctx = ctx;
|
||||||
this.outputStream = outputStream;
|
this.outputStream = outputStream;
|
||||||
@ -180,7 +180,7 @@ public final class EdgeGrpcSession implements Closeable {
|
|||||||
connected = false;
|
connected = false;
|
||||||
if (edge != null) {
|
if (edge != null) {
|
||||||
try {
|
try {
|
||||||
sessionCloseListener.accept(edge.getId());
|
sessionCloseListener.accept(edge.getId(), sessionId);
|
||||||
} catch (Exception ignored) {
|
} catch (Exception ignored) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -288,7 +288,7 @@ public final class EdgeGrpcSession implements Closeable {
|
|||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
log.error("[{}] Failed to send downlink message [{}]", this.sessionId, downlinkMsg, e);
|
log.error("[{}] Failed to send downlink message [{}]", this.sessionId, downlinkMsg, e);
|
||||||
connected = false;
|
connected = false;
|
||||||
sessionCloseListener.accept(edge.getId());
|
sessionCloseListener.accept(edge.getId(), sessionId);
|
||||||
} finally {
|
} finally {
|
||||||
downlinkMsgLock.unlock();
|
downlinkMsgLock.unlock();
|
||||||
}
|
}
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user