dump sessions and session close notification logic refactoring
This commit is contained in:
		
							parent
							
								
									aae9ac9eda
								
							
						
					
					
						commit
						a071ef31e9
					
				@ -709,10 +709,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
 | 
			
		||||
                    if (closeTransportSessionOnRpcDeliveryTimeout) {
 | 
			
		||||
                        md.setRetries(0);
 | 
			
		||||
                        status = RpcStatus.QUEUED;
 | 
			
		||||
                        sessions.forEach(this::notifyTransportAboutClosedSessionRpcDeliveryTimeout);
 | 
			
		||||
                        attributeSubscriptions.clear();
 | 
			
		||||
                        rpcSubscriptions.clear();
 | 
			
		||||
                        dumpSessions();
 | 
			
		||||
                        notifyTransportAboutSessionsCloseAndDumpSessions(TransportSessionCloseReason.RPC_DELIVERY_TIMEOUT);
 | 
			
		||||
                    } else {
 | 
			
		||||
                        toDeviceRpcPendingMap.remove(requestId);
 | 
			
		||||
                        status = RpcStatus.FAILED;
 | 
			
		||||
@ -853,33 +850,30 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
 | 
			
		||||
 | 
			
		||||
    void processCredentialsUpdate(TbActorMsg msg) {
 | 
			
		||||
        if (((DeviceCredentialsUpdateNotificationMsg) msg).getDeviceCredentials().getCredentialsType() == DeviceCredentialsType.LWM2M_CREDENTIALS) {
 | 
			
		||||
            sessions.forEach((k, v) -> {
 | 
			
		||||
                notifyTransportAboutDeviceCredentialsUpdate(k, v, ((DeviceCredentialsUpdateNotificationMsg) msg).getDeviceCredentials());
 | 
			
		||||
            });
 | 
			
		||||
            sessions.forEach((k, v) ->
 | 
			
		||||
                    notifyTransportAboutDeviceCredentialsUpdate(k, v, ((DeviceCredentialsUpdateNotificationMsg) msg).getDeviceCredentials()));
 | 
			
		||||
        } else {
 | 
			
		||||
            sessions.forEach((sessionId, sessionMd) -> notifyTransportAboutClosedSession(sessionId, sessionMd, "device credentials updated!", SessionCloseReason.CREDENTIALS_UPDATED));
 | 
			
		||||
            attributeSubscriptions.clear();
 | 
			
		||||
            rpcSubscriptions.clear();
 | 
			
		||||
            dumpSessions();
 | 
			
		||||
 | 
			
		||||
            notifyTransportAboutSessionsCloseAndDumpSessions(TransportSessionCloseReason.CREDENTIALS_UPDATED);
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private void notifyTransportAboutClosedSessionRpcDeliveryTimeout(UUID sessionId, SessionInfoMetaData sessionMd) {
 | 
			
		||||
        log.debug("Close session due to RPC delivery failure. sessionId: [{}] sessionMd: [{}]", sessionId, sessionMd);
 | 
			
		||||
        notifyTransportAboutClosedSession(sessionId, sessionMd, "RPC delivery failed!", SessionCloseReason.RPC_DELIVERY_TIMEOUT);
 | 
			
		||||
    private void notifyTransportAboutSessionsCloseAndDumpSessions(TransportSessionCloseReason transportSessionCloseReason) {
 | 
			
		||||
        sessions.forEach((sessionId, sessionMd) -> notifyTransportAboutClosedSession(sessionId, sessionMd, transportSessionCloseReason));
 | 
			
		||||
        attributeSubscriptions.clear();
 | 
			
		||||
        rpcSubscriptions.clear();
 | 
			
		||||
        dumpSessions();
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private void notifyTransportAboutClosedSessionMaxSessionsLimit(UUID sessionId, SessionInfoMetaData sessionMd) {
 | 
			
		||||
        log.debug("Remove eldest session (max concurrent sessions limit reached per device) sessionId: [{}] sessionMd: [{}]", sessionId, sessionMd);
 | 
			
		||||
        notifyTransportAboutClosedSession(sessionId, sessionMd, "max concurrent sessions limit reached per device!", SessionCloseReason.MAX_CONCURRENT_SESSIONS_LIMIT_REACHED);
 | 
			
		||||
        notifyTransportAboutClosedSession(sessionId, sessionMd, TransportSessionCloseReason.MAX_CONCURRENT_SESSIONS_LIMIT_REACHED);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private void notifyTransportAboutClosedSession(UUID sessionId, SessionInfoMetaData sessionMd, String message, SessionCloseReason reason) {
 | 
			
		||||
    private void notifyTransportAboutClosedSession(UUID sessionId, SessionInfoMetaData sessionMd, TransportSessionCloseReason transportSessionCloseReason) {
 | 
			
		||||
        log.debug("{} sessionId: [{}] sessionMd: [{}]", transportSessionCloseReason.getLogMessage(), sessionId, sessionMd);
 | 
			
		||||
        SessionCloseNotificationProto sessionCloseNotificationProto = SessionCloseNotificationProto
 | 
			
		||||
                .newBuilder()
 | 
			
		||||
                .setMessage(message)
 | 
			
		||||
                .setReason(reason)
 | 
			
		||||
                .setMessage(transportSessionCloseReason.getNotificationMessage())
 | 
			
		||||
                .setReason(SessionCloseReason.forNumber(transportSessionCloseReason.getProtoNumber()))
 | 
			
		||||
                .build();
 | 
			
		||||
        ToTransportMsg msg = ToTransportMsg.newBuilder()
 | 
			
		||||
                .setSessionIdMSB(sessionId.getMostSignificantBits())
 | 
			
		||||
@ -1063,7 +1057,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
 | 
			
		||||
                attributeSubscriptions.remove(id);
 | 
			
		||||
                if (session != null) {
 | 
			
		||||
                    removed++;
 | 
			
		||||
                    notifyTransportAboutClosedSession(id, session, SESSION_TIMEOUT_MESSAGE, SessionCloseReason.SESSION_TIMEOUT);
 | 
			
		||||
                    notifyTransportAboutClosedSession(id, session, TransportSessionCloseReason.SESSION_TIMEOUT);
 | 
			
		||||
                }
 | 
			
		||||
            }
 | 
			
		||||
            if (removed != 0) {
 | 
			
		||||
 | 
			
		||||
@ -0,0 +1,39 @@
 | 
			
		||||
/**
 | 
			
		||||
 * Copyright © 2016-2024 The Thingsboard Authors
 | 
			
		||||
 *
 | 
			
		||||
 * Licensed under the Apache License, Version 2.0 (the "License");
 | 
			
		||||
 * you may not use this file except in compliance with the License.
 | 
			
		||||
 * You may obtain a copy of the License at
 | 
			
		||||
 *
 | 
			
		||||
 *     http://www.apache.org/licenses/LICENSE-2.0
 | 
			
		||||
 *
 | 
			
		||||
 * Unless required by applicable law or agreed to in writing, software
 | 
			
		||||
 * distributed under the License is distributed on an "AS IS" BASIS,
 | 
			
		||||
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
			
		||||
 * See the License for the specific language governing permissions and
 | 
			
		||||
 * limitations under the License.
 | 
			
		||||
 */
 | 
			
		||||
package org.thingsboard.server.actors.device;
 | 
			
		||||
 | 
			
		||||
import lombok.Getter;
 | 
			
		||||
 | 
			
		||||
@Getter
 | 
			
		||||
public enum TransportSessionCloseReason {
 | 
			
		||||
 | 
			
		||||
    UNKNOWN_REASON(0, "Unknown Reason.", "Session closed with unknown reason."),
 | 
			
		||||
    CREDENTIALS_UPDATED(1, "device credentials updated!", "Close session due to device credentials update."),
 | 
			
		||||
    MAX_CONCURRENT_SESSIONS_LIMIT_REACHED(2, "max concurrent sessions limit reached per device!", "Remove eldest session (max concurrent sessions limit reached per device)."),
 | 
			
		||||
    SESSION_TIMEOUT(3, "session timeout!", "Close session due to session timeout."),
 | 
			
		||||
    RPC_DELIVERY_TIMEOUT(4, "RPC delivery failed!", "Close session due to RPC delivery failure.");
 | 
			
		||||
 | 
			
		||||
    private final int protoNumber;
 | 
			
		||||
    private final String notificationMessage;
 | 
			
		||||
    private final String logMessage;
 | 
			
		||||
 | 
			
		||||
    TransportSessionCloseReason(int protoNumber, String notificationMessage, String logMessage) {
 | 
			
		||||
        this.protoNumber = protoNumber;
 | 
			
		||||
        this.notificationMessage = notificationMessage;
 | 
			
		||||
        this.logMessage = logMessage;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user