device actor checkSessionsTimeout refactored to improve performance and reduce memory pressure

This commit is contained in:
Sergey Matvienko 2022-01-10 12:58:32 +02:00
parent 3df6155adf
commit d3987d1c67

View File

@ -97,6 +97,7 @@ import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
@ -116,6 +117,7 @@ import java.util.stream.Collectors;
@Slf4j
class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
static final String SESSION_TIMEOUT_MESSAGE = "session timeout!";
final TenantId tenantId;
final DeviceId deviceId;
final LinkedHashMapRemoveEldest<UUID, SessionInfoMetaData> sessions;
@ -961,19 +963,42 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
}
void checkSessionsTimeout() {
log.debug("[{}] checkSessionsTimeout started. Size before check {}", deviceId, sessions.size());
long expTime = System.currentTimeMillis() - systemContext.getSessionInactivityTimeout();
Map<UUID, SessionInfoMetaData> sessionsToRemove = sessions.entrySet().stream().filter(kv -> kv.getValue().getLastActivityTime() < expTime).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
sessionsToRemove.forEach((sessionId, sessionMD) -> {
sessions.remove(sessionId);
rpcSubscriptions.remove(sessionId);
attributeSubscriptions.remove(sessionId);
notifyTransportAboutClosedSession(sessionId, sessionMD, "session timeout!");
});
if (!sessionsToRemove.isEmpty()) {
dumpSessions();
final long expTime = System.currentTimeMillis() - systemContext.getSessionInactivityTimeout();
List<UUID> expiredIds = null;
try {
for (Map.Entry<UUID, SessionInfoMetaData> kv : sessions.entrySet()) { //entry set are cached for stable sessions
if (kv.getValue().getLastActivityTime() < expTime) {
final UUID id = kv.getKey();
if (expiredIds == null) {
expiredIds = new ArrayList<>(1); //most of the expired sessions is a single event
}
expiredIds.add(id);
}
}
} catch (ConcurrentModificationException ignored) {
//Sessions are not thread safe and possible exceptions
//It is an extremely rare event
//Complete session check will perform on the next check
}
log.debug("[{}] checkSessionsTimeout finished. Size after check {}", deviceId, sessions.size());
if (expiredIds != null) {
int removed = 0;
for (UUID id : expiredIds) {
final SessionInfoMetaData session = sessions.remove(id);
rpcSubscriptions.remove(id);
attributeSubscriptions.remove(id);
if (session != null) {
removed++;
notifyTransportAboutClosedSession(id, session, SESSION_TIMEOUT_MESSAGE);
}
}
if (removed != 0) {
dumpSessions();
}
}
}
}