From ea0585f3d9534b2a0072b154d0d3feffacae5f3d Mon Sep 17 00:00:00 2001 From: Andrew Shvayka Date: Sun, 27 May 2018 22:22:53 +0300 Subject: [PATCH] Max count of sessions per device --- .../server/actors/ActorSystemContext.java | 4 ++++ .../device/DeviceActorMessageProcessor.java | 17 +++++++++++++---- .../actors/session/ASyncMsgProcessor.java | 10 ++++++---- .../service/rpc/DefaultDeviceRpcService.java | 2 +- application/src/main/resources/thingsboard.yml | 1 + .../server/common/msg/core/SessionOpenMsg.java | 5 +++++ .../transport/mqtt/MqttTransportHandler.java | 8 ++++++++ 7 files changed, 38 insertions(+), 9 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java index 94581b91a7..d33734e14c 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java +++ b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java @@ -203,6 +203,10 @@ public class ActorSystemContext { @Getter private long queuePartitionId; + @Value("${actors.session.max_concurrent_sessions_per_device:1}") + @Getter + private long maxConcurrentSessionsPerDevice; + @Value("${actors.session.sync.timeout}") @Getter private long syncSessionTimeout; diff --git a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java index d4164979a9..85b32857f1 100644 --- a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java @@ -81,6 +81,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Optional; @@ -117,7 +118,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso super(systemContext, logger); this.tenantId = tenantId; this.deviceId = deviceId; - this.sessions = new HashMap<>(); + this.sessions = new LinkedHashMap<>(); this.attributeSubscriptions = new HashMap<>(); this.rpcSubscriptions = new HashMap<>(); this.toDeviceRpcPendingMap = new HashMap<>(); @@ -501,6 +502,12 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso FromDeviceMsg inMsg = msg.getPayload(); if (inMsg instanceof SessionOpenMsg) { logger.debug("[{}] Processing new session [{}]", deviceId, sessionId); + if (sessions.size() >= systemContext.getMaxConcurrentSessionsPerDevice()) { + SessionId sessionIdToRemove = sessions.keySet().stream().findFirst().orElse(null); + if (sessionIdToRemove != null) { + closeSession(sessionIdToRemove, sessions.remove(sessionIdToRemove)); + } + } sessions.put(sessionId, new SessionInfo(SessionType.ASYNC, msg.getServerAddress())); if (sessions.size() == 1) { reportSessionOpen(); @@ -528,13 +535,15 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso } void processCredentialsUpdate() { - sessions.forEach((k, v) -> { - sendMsgToSessionActor(new BasicActorSystemToDeviceSessionActorMsg(new SessionCloseNotification(), k), v.getServer()); - }); + sessions.forEach(this::closeSession); attributeSubscriptions.clear(); rpcSubscriptions.clear(); } + private void closeSession(SessionId sessionId, SessionInfo sessionInfo) { + sendMsgToSessionActor(new BasicActorSystemToDeviceSessionActorMsg(new SessionCloseNotification(), sessionId), sessionInfo.getServer()); + } + void processNameOrTypeUpdate(DeviceNameOrTypeUpdateMsg msg) { this.deviceName = msg.getDeviceName(); this.deviceType = msg.getDeviceType(); diff --git a/application/src/main/java/org/thingsboard/server/actors/session/ASyncMsgProcessor.java b/application/src/main/java/org/thingsboard/server/actors/session/ASyncMsgProcessor.java index 70534ae8a8..a8f14fedf5 100644 --- a/application/src/main/java/org/thingsboard/server/actors/session/ASyncMsgProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/session/ASyncMsgProcessor.java @@ -56,12 +56,14 @@ class ASyncMsgProcessor extends AbstractSessionActorMsgProcessor { @Override protected void processToDeviceActorMsg(ActorContext ctx, TransportToDeviceSessionActorMsg msg) { updateSessionCtx(msg, SessionType.ASYNC); - if (firstMsg) { - toDeviceMsg(new SessionOpenMsg()).ifPresent(m -> forwardToAppActor(ctx, m)); - firstMsg = false; - } DeviceToDeviceActorMsg pendingMsg = toDeviceMsg(msg); FromDeviceMsg fromDeviceMsg = pendingMsg.getPayload(); + if (firstMsg) { + if (fromDeviceMsg.getMsgType() != SessionMsgType.SESSION_OPEN) { + toDeviceMsg(new SessionOpenMsg()).ifPresent(m -> forwardToAppActor(ctx, m)); + } + firstMsg = false; + } switch (fromDeviceMsg.getMsgType()) { case POST_TELEMETRY_REQUEST: case POST_ATTRIBUTES_REQUEST: diff --git a/application/src/main/java/org/thingsboard/server/service/rpc/DefaultDeviceRpcService.java b/application/src/main/java/org/thingsboard/server/service/rpc/DefaultDeviceRpcService.java index 6630d7cede..678495bf46 100644 --- a/application/src/main/java/org/thingsboard/server/service/rpc/DefaultDeviceRpcService.java +++ b/application/src/main/java/org/thingsboard/server/service/rpc/DefaultDeviceRpcService.java @@ -97,7 +97,7 @@ public class DefaultDeviceRpcService implements DeviceRpcService { @Override public void processRpcResponseFromDevice(FromDeviceRpcResponse response) { - log.error("[{}] response the request: [{}]", this.hashCode(), response.getId()); + log.error("[{}] response to request: [{}]", this.hashCode(), response.getId()); if (routingService.getCurrentServer().equals(response.getServerAddress())) { UUID requestId = response.getId(); Consumer consumer = localRpcRequests.remove(requestId); diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index 188291a4e2..5653193d27 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -227,6 +227,7 @@ actors: tenant: create_components_on_init: true session: + max_concurrent_sessions_per_device: "${ACTORS_MAX_CONCURRENT_SESSION_PER_DEVICE:1}" sync: # Default timeout for processing request using synchronous session (HTTP, CoAP) in milliseconds timeout: "${ACTORS_SESSION_SYNC_TIMEOUT:10000}" diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/core/SessionOpenMsg.java b/common/message/src/main/java/org/thingsboard/server/common/msg/core/SessionOpenMsg.java index c121a71584..28e319e525 100644 --- a/common/message/src/main/java/org/thingsboard/server/common/msg/core/SessionOpenMsg.java +++ b/common/message/src/main/java/org/thingsboard/server/common/msg/core/SessionOpenMsg.java @@ -15,6 +15,9 @@ */ package org.thingsboard.server.common.msg.core; +import lombok.Data; +import org.thingsboard.server.common.data.id.SessionId; +import org.thingsboard.server.common.msg.aware.SessionAwareMsg; import org.thingsboard.server.common.msg.session.FromDeviceMsg; import org.thingsboard.server.common.msg.session.SessionMsgType; import org.thingsboard.server.common.msg.session.SessionMsgType; @@ -22,7 +25,9 @@ import org.thingsboard.server.common.msg.session.SessionMsgType; /** * @author Andrew Shvayka */ +@Data public class SessionOpenMsg implements FromDeviceMsg { + @Override public SessionMsgType getMsgType() { return SessionMsgType.SESSION_OPEN; diff --git a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java index 0b3881723d..185b7a82c3 100644 --- a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java +++ b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java @@ -29,7 +29,9 @@ import org.springframework.util.StringUtils; import org.thingsboard.server.common.data.Device; import org.thingsboard.server.common.data.security.DeviceTokenCredentials; import org.thingsboard.server.common.data.security.DeviceX509Credentials; +import org.thingsboard.server.common.msg.core.SessionOpenMsg; import org.thingsboard.server.common.msg.session.AdaptorToSessionActorMsg; +import org.thingsboard.server.common.msg.session.BasicAdaptorToSessionActorMsg; import org.thingsboard.server.common.msg.session.BasicTransportToDeviceSessionActorMsg; import org.thingsboard.server.common.msg.session.ctrl.SessionCloseMsg; import org.thingsboard.server.common.transport.SessionMsgProcessor; @@ -95,6 +97,8 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement log.trace("[{}] Processing msg: {}", sessionId, msg); if (msg instanceof MqttMessage) { processMqttMsg(ctx, (MqttMessage) msg); + } else { + ctx.close(); } } @@ -303,6 +307,8 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement } else { ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_ACCEPTED)); connected = true; + processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(), + new BasicAdaptorToSessionActorMsg(deviceSessionCtx, new SessionOpenMsg()))); checkGatewaySession(); } } @@ -314,6 +320,8 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement if (deviceSessionCtx.login(new DeviceX509Credentials(sha3Hash))) { ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_ACCEPTED)); connected = true; + processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(), + new BasicAdaptorToSessionActorMsg(deviceSessionCtx, new SessionOpenMsg()))); checkGatewaySession(); } else { ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_REFUSED_NOT_AUTHORIZED));