Max count of sessions per device
This commit is contained in:
		
							parent
							
								
									4007196d32
								
							
						
					
					
						commit
						ea0585f3d9
					
				@ -203,6 +203,10 @@ public class ActorSystemContext {
 | 
			
		||||
    @Getter
 | 
			
		||||
    private long queuePartitionId;
 | 
			
		||||
 | 
			
		||||
    @Value("${actors.session.max_concurrent_sessions_per_device:1}")
 | 
			
		||||
    @Getter
 | 
			
		||||
    private long maxConcurrentSessionsPerDevice;
 | 
			
		||||
 | 
			
		||||
    @Value("${actors.session.sync.timeout}")
 | 
			
		||||
    @Getter
 | 
			
		||||
    private long syncSessionTimeout;
 | 
			
		||||
 | 
			
		||||
@ -81,6 +81,7 @@ import java.util.Arrays;
 | 
			
		||||
import java.util.Collections;
 | 
			
		||||
import java.util.HashMap;
 | 
			
		||||
import java.util.HashSet;
 | 
			
		||||
import java.util.LinkedHashMap;
 | 
			
		||||
import java.util.List;
 | 
			
		||||
import java.util.Map;
 | 
			
		||||
import java.util.Optional;
 | 
			
		||||
@ -117,7 +118,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
 | 
			
		||||
        super(systemContext, logger);
 | 
			
		||||
        this.tenantId = tenantId;
 | 
			
		||||
        this.deviceId = deviceId;
 | 
			
		||||
        this.sessions = new HashMap<>();
 | 
			
		||||
        this.sessions = new LinkedHashMap<>();
 | 
			
		||||
        this.attributeSubscriptions = new HashMap<>();
 | 
			
		||||
        this.rpcSubscriptions = new HashMap<>();
 | 
			
		||||
        this.toDeviceRpcPendingMap = new HashMap<>();
 | 
			
		||||
@ -501,6 +502,12 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
 | 
			
		||||
        FromDeviceMsg inMsg = msg.getPayload();
 | 
			
		||||
        if (inMsg instanceof SessionOpenMsg) {
 | 
			
		||||
            logger.debug("[{}] Processing new session [{}]", deviceId, sessionId);
 | 
			
		||||
            if (sessions.size() >= systemContext.getMaxConcurrentSessionsPerDevice()) {
 | 
			
		||||
                SessionId sessionIdToRemove = sessions.keySet().stream().findFirst().orElse(null);
 | 
			
		||||
                if (sessionIdToRemove != null) {
 | 
			
		||||
                    closeSession(sessionIdToRemove, sessions.remove(sessionIdToRemove));
 | 
			
		||||
                }
 | 
			
		||||
            }
 | 
			
		||||
            sessions.put(sessionId, new SessionInfo(SessionType.ASYNC, msg.getServerAddress()));
 | 
			
		||||
            if (sessions.size() == 1) {
 | 
			
		||||
                reportSessionOpen();
 | 
			
		||||
@ -528,13 +535,15 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    void processCredentialsUpdate() {
 | 
			
		||||
        sessions.forEach((k, v) -> {
 | 
			
		||||
            sendMsgToSessionActor(new BasicActorSystemToDeviceSessionActorMsg(new SessionCloseNotification(), k), v.getServer());
 | 
			
		||||
        });
 | 
			
		||||
        sessions.forEach(this::closeSession);
 | 
			
		||||
        attributeSubscriptions.clear();
 | 
			
		||||
        rpcSubscriptions.clear();
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private void closeSession(SessionId sessionId, SessionInfo sessionInfo) {
 | 
			
		||||
        sendMsgToSessionActor(new BasicActorSystemToDeviceSessionActorMsg(new SessionCloseNotification(), sessionId), sessionInfo.getServer());
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    void processNameOrTypeUpdate(DeviceNameOrTypeUpdateMsg msg) {
 | 
			
		||||
        this.deviceName = msg.getDeviceName();
 | 
			
		||||
        this.deviceType = msg.getDeviceType();
 | 
			
		||||
 | 
			
		||||
@ -56,12 +56,14 @@ class ASyncMsgProcessor extends AbstractSessionActorMsgProcessor {
 | 
			
		||||
    @Override
 | 
			
		||||
    protected void processToDeviceActorMsg(ActorContext ctx, TransportToDeviceSessionActorMsg msg) {
 | 
			
		||||
        updateSessionCtx(msg, SessionType.ASYNC);
 | 
			
		||||
        if (firstMsg) {
 | 
			
		||||
            toDeviceMsg(new SessionOpenMsg()).ifPresent(m -> forwardToAppActor(ctx, m));
 | 
			
		||||
            firstMsg = false;
 | 
			
		||||
        }
 | 
			
		||||
        DeviceToDeviceActorMsg pendingMsg = toDeviceMsg(msg);
 | 
			
		||||
        FromDeviceMsg fromDeviceMsg = pendingMsg.getPayload();
 | 
			
		||||
        if (firstMsg) {
 | 
			
		||||
            if (fromDeviceMsg.getMsgType() != SessionMsgType.SESSION_OPEN) {
 | 
			
		||||
                toDeviceMsg(new SessionOpenMsg()).ifPresent(m -> forwardToAppActor(ctx, m));
 | 
			
		||||
            }
 | 
			
		||||
            firstMsg = false;
 | 
			
		||||
        }
 | 
			
		||||
        switch (fromDeviceMsg.getMsgType()) {
 | 
			
		||||
            case POST_TELEMETRY_REQUEST:
 | 
			
		||||
            case POST_ATTRIBUTES_REQUEST:
 | 
			
		||||
 | 
			
		||||
@ -97,7 +97,7 @@ public class DefaultDeviceRpcService implements DeviceRpcService {
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public void processRpcResponseFromDevice(FromDeviceRpcResponse response) {
 | 
			
		||||
        log.error("[{}] response the request: [{}]", this.hashCode(), response.getId());
 | 
			
		||||
        log.error("[{}] response to request: [{}]", this.hashCode(), response.getId());
 | 
			
		||||
        if (routingService.getCurrentServer().equals(response.getServerAddress())) {
 | 
			
		||||
            UUID requestId = response.getId();
 | 
			
		||||
            Consumer<FromDeviceRpcResponse> consumer = localRpcRequests.remove(requestId);
 | 
			
		||||
 | 
			
		||||
@ -227,6 +227,7 @@ actors:
 | 
			
		||||
  tenant:
 | 
			
		||||
    create_components_on_init: true
 | 
			
		||||
  session:
 | 
			
		||||
    max_concurrent_sessions_per_device: "${ACTORS_MAX_CONCURRENT_SESSION_PER_DEVICE:1}"
 | 
			
		||||
    sync:
 | 
			
		||||
      # Default timeout for processing request using synchronous session (HTTP, CoAP) in milliseconds
 | 
			
		||||
      timeout: "${ACTORS_SESSION_SYNC_TIMEOUT:10000}"
 | 
			
		||||
 | 
			
		||||
@ -15,6 +15,9 @@
 | 
			
		||||
 */
 | 
			
		||||
package org.thingsboard.server.common.msg.core;
 | 
			
		||||
 | 
			
		||||
import lombok.Data;
 | 
			
		||||
import org.thingsboard.server.common.data.id.SessionId;
 | 
			
		||||
import org.thingsboard.server.common.msg.aware.SessionAwareMsg;
 | 
			
		||||
import org.thingsboard.server.common.msg.session.FromDeviceMsg;
 | 
			
		||||
import org.thingsboard.server.common.msg.session.SessionMsgType;
 | 
			
		||||
import org.thingsboard.server.common.msg.session.SessionMsgType;
 | 
			
		||||
@ -22,7 +25,9 @@ import org.thingsboard.server.common.msg.session.SessionMsgType;
 | 
			
		||||
/**
 | 
			
		||||
 * @author Andrew Shvayka
 | 
			
		||||
 */
 | 
			
		||||
@Data
 | 
			
		||||
public class SessionOpenMsg implements FromDeviceMsg {
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public SessionMsgType getMsgType() {
 | 
			
		||||
        return SessionMsgType.SESSION_OPEN;
 | 
			
		||||
 | 
			
		||||
@ -29,7 +29,9 @@ import org.springframework.util.StringUtils;
 | 
			
		||||
import org.thingsboard.server.common.data.Device;
 | 
			
		||||
import org.thingsboard.server.common.data.security.DeviceTokenCredentials;
 | 
			
		||||
import org.thingsboard.server.common.data.security.DeviceX509Credentials;
 | 
			
		||||
import org.thingsboard.server.common.msg.core.SessionOpenMsg;
 | 
			
		||||
import org.thingsboard.server.common.msg.session.AdaptorToSessionActorMsg;
 | 
			
		||||
import org.thingsboard.server.common.msg.session.BasicAdaptorToSessionActorMsg;
 | 
			
		||||
import org.thingsboard.server.common.msg.session.BasicTransportToDeviceSessionActorMsg;
 | 
			
		||||
import org.thingsboard.server.common.msg.session.ctrl.SessionCloseMsg;
 | 
			
		||||
import org.thingsboard.server.common.transport.SessionMsgProcessor;
 | 
			
		||||
@ -95,6 +97,8 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
 | 
			
		||||
        log.trace("[{}] Processing msg: {}", sessionId, msg);
 | 
			
		||||
        if (msg instanceof MqttMessage) {
 | 
			
		||||
            processMqttMsg(ctx, (MqttMessage) msg);
 | 
			
		||||
        } else {
 | 
			
		||||
            ctx.close();
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
@ -303,6 +307,8 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
 | 
			
		||||
        } else {
 | 
			
		||||
            ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_ACCEPTED));
 | 
			
		||||
            connected = true;
 | 
			
		||||
            processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(),
 | 
			
		||||
                    new BasicAdaptorToSessionActorMsg(deviceSessionCtx, new SessionOpenMsg())));
 | 
			
		||||
            checkGatewaySession();
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
@ -314,6 +320,8 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
 | 
			
		||||
            if (deviceSessionCtx.login(new DeviceX509Credentials(sha3Hash))) {
 | 
			
		||||
                ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_ACCEPTED));
 | 
			
		||||
                connected = true;
 | 
			
		||||
                processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(),
 | 
			
		||||
                        new BasicAdaptorToSessionActorMsg(deviceSessionCtx, new SessionOpenMsg())));
 | 
			
		||||
                checkGatewaySession();
 | 
			
		||||
            } else {
 | 
			
		||||
                ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_REFUSED_NOT_AUTHORIZED));
 | 
			
		||||
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user