diff --git a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java index 369d371286..d8f0c38975 100644 --- a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java @@ -97,6 +97,7 @@ import javax.annotation.Nullable; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.ConcurrentModificationException; import java.util.HashMap; import java.util.HashSet; import java.util.LinkedHashMap; @@ -116,6 +117,7 @@ import java.util.stream.Collectors; @Slf4j class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { + static final String SESSION_TIMEOUT_MESSAGE = "session timeout!"; final TenantId tenantId; final DeviceId deviceId; final LinkedHashMapRemoveEldest sessions; @@ -961,19 +963,42 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { } void checkSessionsTimeout() { - log.debug("[{}] checkSessionsTimeout started. Size before check {}", deviceId, sessions.size()); - long expTime = System.currentTimeMillis() - systemContext.getSessionInactivityTimeout(); - Map sessionsToRemove = sessions.entrySet().stream().filter(kv -> kv.getValue().getLastActivityTime() < expTime).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); - sessionsToRemove.forEach((sessionId, sessionMD) -> { - sessions.remove(sessionId); - rpcSubscriptions.remove(sessionId); - attributeSubscriptions.remove(sessionId); - notifyTransportAboutClosedSession(sessionId, sessionMD, "session timeout!"); - }); - if (!sessionsToRemove.isEmpty()) { - dumpSessions(); + final long expTime = System.currentTimeMillis() - systemContext.getSessionInactivityTimeout(); + List expiredIds = null; + + try { + for (Map.Entry kv : sessions.entrySet()) { //entry set are cached for stable sessions + if (kv.getValue().getLastActivityTime() < expTime) { + final UUID id = kv.getKey(); + if (expiredIds == null) { + expiredIds = new ArrayList<>(1); //most of the expired sessions is a single event + } + expiredIds.add(id); + + } + } + } catch (ConcurrentModificationException ignored) { + //Sessions are not thread safe and possible exceptions + //It is an extremely rare event + //Complete session check will perform on the next check } - log.debug("[{}] checkSessionsTimeout finished. Size after check {}", deviceId, sessions.size()); + + if (expiredIds != null) { + int removed = 0; + for (UUID id : expiredIds) { + final SessionInfoMetaData session = sessions.remove(id); + rpcSubscriptions.remove(id); + attributeSubscriptions.remove(id); + if (session != null) { + removed++; + notifyTransportAboutClosedSession(id, session, SESSION_TIMEOUT_MESSAGE); + } + } + if (removed != 0) { + dumpSessions(); + } + } + } }