diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java index 5171a6037f..92a7c27867 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java @@ -380,21 +380,26 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i } } - private void onEdgeDisconnect(EdgeId edgeId) { - log.info("[{}] edge disconnected!", edgeId); - EdgeGrpcSession removed = sessions.remove(edgeId); - final Lock newEventLock = sessionNewEventsLocks.computeIfAbsent(edgeId, id -> new ReentrantLock()); - newEventLock.lock(); - try { - sessionNewEvents.remove(edgeId); - } finally { - newEventLock.unlock(); + private void onEdgeDisconnect(EdgeId edgeId, UUID sessionId) { + log.info("[{}][{}] edge disconnected!", edgeId, sessionId); + EdgeGrpcSession toRemove = sessions.get(edgeId); + if (toRemove.getSessionId().equals(sessionId)) { + toRemove = sessions.remove(edgeId); + final Lock newEventLock = sessionNewEventsLocks.computeIfAbsent(edgeId, id -> new ReentrantLock()); + newEventLock.lock(); + try { + sessionNewEvents.remove(edgeId); + } finally { + newEventLock.unlock(); + } + save(edgeId, DefaultDeviceStateService.ACTIVITY_STATE, false); + long lastDisconnectTs = System.currentTimeMillis(); + save(edgeId, DefaultDeviceStateService.LAST_DISCONNECT_TIME, lastDisconnectTs); + pushRuleEngineMessage(toRemove.getEdge().getTenantId(), edgeId, lastDisconnectTs, DISCONNECT_EVENT); + cancelScheduleEdgeEventsCheck(edgeId); + } else { + log.debug("[{}] edge session [{}] is not available anymore, nothing to remove. most probably this session is already outdated!", edgeId, sessionId); } - save(edgeId, DefaultDeviceStateService.ACTIVITY_STATE, false); - long lastDisconnectTs = System.currentTimeMillis(); - save(edgeId, DefaultDeviceStateService.LAST_DISCONNECT_TIME, lastDisconnectTs); - pushRuleEngineMessage(removed.getEdge().getTenantId(), edgeId, lastDisconnectTs, DISCONNECT_EVENT); - cancelScheduleEdgeEventsCheck(edgeId); } private void save(EdgeId edgeId, String key, long value) { diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java index 33aed92682..1dd0f31c20 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java @@ -92,7 +92,7 @@ public final class EdgeGrpcSession implements Closeable { private final UUID sessionId; private final BiConsumer sessionOpenListener; - private final Consumer sessionCloseListener; + private final BiConsumer sessionCloseListener; private final EdgeSessionState sessionState = new EdgeSessionState(); @@ -111,7 +111,7 @@ public final class EdgeGrpcSession implements Closeable { private ScheduledExecutorService sendDownlinkExecutorService; EdgeGrpcSession(EdgeContextComponent ctx, StreamObserver outputStream, BiConsumer sessionOpenListener, - Consumer sessionCloseListener, ScheduledExecutorService sendDownlinkExecutorService, int maxInboundMessageSize) { + BiConsumer sessionCloseListener, ScheduledExecutorService sendDownlinkExecutorService, int maxInboundMessageSize) { this.sessionId = UUID.randomUUID(); this.ctx = ctx; this.outputStream = outputStream; @@ -180,7 +180,7 @@ public final class EdgeGrpcSession implements Closeable { connected = false; if (edge != null) { try { - sessionCloseListener.accept(edge.getId()); + sessionCloseListener.accept(edge.getId(), sessionId); } catch (Exception ignored) { } } @@ -288,7 +288,7 @@ public final class EdgeGrpcSession implements Closeable { } catch (Exception e) { log.error("[{}] Failed to send downlink message [{}]", this.sessionId, downlinkMsg, e); connected = false; - sessionCloseListener.accept(edge.getId()); + sessionCloseListener.accept(edge.getId(), sessionId); } finally { downlinkMsgLock.unlock(); }