static transport immutable proto SESSION_EVENT_MSG_OPEN, SESSION_EVENT_MSG_CLOSED, SESSION_CLOSE_NOTIFICATION_PROTO, SUBSCRIBE_TO_ATTRIBUTE_UPDATES_ASYNC_MSG, SUBSCRIBE_TO_RPC_ASYNC_MSG

This commit is contained in:
Sergey Matvienko 2022-01-11 12:23:48 +02:00
parent a11146b445
commit b0a79e9fc3
4 changed files with 34 additions and 26 deletions

View File

@ -19,7 +19,6 @@ import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Lazy; import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.thingsboard.server.common.transport.TransportService; import org.thingsboard.server.common.transport.TransportService;
import org.thingsboard.server.common.transport.service.DefaultTransportService;
import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.queue.util.TbLwM2mTransportComponent; import org.thingsboard.server.queue.util.TbLwM2mTransportComponent;
import org.thingsboard.server.transport.lwm2m.server.LwM2mSessionMsgListener; import org.thingsboard.server.transport.lwm2m.server.LwM2mSessionMsgListener;
@ -27,6 +26,11 @@ import org.thingsboard.server.transport.lwm2m.server.attributes.LwM2MAttributesS
import org.thingsboard.server.transport.lwm2m.server.rpc.LwM2MRpcRequestHandler; import org.thingsboard.server.transport.lwm2m.server.rpc.LwM2MRpcRequestHandler;
import org.thingsboard.server.transport.lwm2m.server.uplink.LwM2mUplinkMsgHandler; import org.thingsboard.server.transport.lwm2m.server.uplink.LwM2mUplinkMsgHandler;
import static org.thingsboard.server.common.transport.service.DefaultTransportService.SESSION_EVENT_MSG_CLOSED;
import static org.thingsboard.server.common.transport.service.DefaultTransportService.SESSION_EVENT_MSG_OPEN;
import static org.thingsboard.server.common.transport.service.DefaultTransportService.SUBSCRIBE_TO_ATTRIBUTE_UPDATES_ASYNC_MSG;
import static org.thingsboard.server.common.transport.service.DefaultTransportService.SUBSCRIBE_TO_RPC_ASYNC_MSG;
@Slf4j @Slf4j
@Service @Service
@TbLwM2mTransportComponent @TbLwM2mTransportComponent
@ -52,16 +56,16 @@ public class DefaultLwM2MSessionManager implements LwM2MSessionManager {
transportService.registerAsyncSession(sessionInfo, new LwM2mSessionMsgListener(uplinkHandler, attributesService, rpcHandler, sessionInfo, transportService)); transportService.registerAsyncSession(sessionInfo, new LwM2mSessionMsgListener(uplinkHandler, attributesService, rpcHandler, sessionInfo, transportService));
TransportProtos.TransportToDeviceActorMsg msg = TransportProtos.TransportToDeviceActorMsg.newBuilder() TransportProtos.TransportToDeviceActorMsg msg = TransportProtos.TransportToDeviceActorMsg.newBuilder()
.setSessionInfo(sessionInfo) .setSessionInfo(sessionInfo)
.setSessionEvent(DefaultTransportService.getSessionEventMsg(TransportProtos.SessionEvent.OPEN)) .setSessionEvent(SESSION_EVENT_MSG_OPEN)
.setSubscribeToAttributes(TransportProtos.SubscribeToAttributeUpdatesMsg.newBuilder().setSessionType(TransportProtos.SessionType.ASYNC).build()) .setSubscribeToAttributes(SUBSCRIBE_TO_ATTRIBUTE_UPDATES_ASYNC_MSG)
.setSubscribeToRPC(TransportProtos.SubscribeToRPCMsg.newBuilder().setSessionType(TransportProtos.SessionType.ASYNC).build()) .setSubscribeToRPC(SUBSCRIBE_TO_RPC_ASYNC_MSG)
.build(); .build();
transportService.process(msg, null); transportService.process(msg, null);
} }
@Override @Override
public void deregister(TransportProtos.SessionInfoProto sessionInfo) { public void deregister(TransportProtos.SessionInfoProto sessionInfo) {
transportService.process(sessionInfo, DefaultTransportService.getSessionEventMsg(TransportProtos.SessionEvent.CLOSED), null); transportService.process(sessionInfo, SESSION_EVENT_MSG_CLOSED, null);
transportService.deregisterSession(sessionInfo); transportService.deregisterSession(sessionInfo);
} }
} }

View File

@ -65,7 +65,6 @@ import org.thingsboard.server.common.transport.service.SessionMetaData;
import org.thingsboard.server.common.transport.util.SslUtil; import org.thingsboard.server.common.transport.util.SslUtil;
import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.gen.transport.TransportProtos.ProvisionDeviceResponseMsg; import org.thingsboard.server.gen.transport.TransportProtos.ProvisionDeviceResponseMsg;
import org.thingsboard.server.gen.transport.TransportProtos.SessionEvent;
import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceX509CertRequestMsg; import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceX509CertRequestMsg;
import org.thingsboard.server.queue.scheduler.SchedulerComponent; import org.thingsboard.server.queue.scheduler.SchedulerComponent;
import org.thingsboard.server.transport.mqtt.adaptors.MqttTransportAdaptor; import org.thingsboard.server.transport.mqtt.adaptors.MqttTransportAdaptor;
@ -102,6 +101,8 @@ import static io.netty.handler.codec.mqtt.MqttMessageType.UNSUBACK;
import static io.netty.handler.codec.mqtt.MqttQoS.AT_LEAST_ONCE; import static io.netty.handler.codec.mqtt.MqttQoS.AT_LEAST_ONCE;
import static io.netty.handler.codec.mqtt.MqttQoS.AT_MOST_ONCE; import static io.netty.handler.codec.mqtt.MqttQoS.AT_MOST_ONCE;
import static io.netty.handler.codec.mqtt.MqttQoS.FAILURE; import static io.netty.handler.codec.mqtt.MqttQoS.FAILURE;
import static org.thingsboard.server.common.transport.service.DefaultTransportService.SESSION_EVENT_MSG_CLOSED;
import static org.thingsboard.server.common.transport.service.DefaultTransportService.SESSION_EVENT_MSG_OPEN;
/** /**
* @author Andrew Shvayka * @author Andrew Shvayka
@ -929,7 +930,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
public void doDisconnect() { public void doDisconnect() {
if (deviceSessionCtx.isConnected()) { if (deviceSessionCtx.isConnected()) {
log.debug("[{}] Client disconnected!", sessionId); log.debug("[{}] Client disconnected!", sessionId);
transportService.process(deviceSessionCtx.getSessionInfo(), DefaultTransportService.getSessionEventMsg(SessionEvent.CLOSED), null); transportService.process(deviceSessionCtx.getSessionInfo(), SESSION_EVENT_MSG_CLOSED, null);
transportService.deregisterSession(deviceSessionCtx.getSessionInfo()); transportService.deregisterSession(deviceSessionCtx.getSessionInfo());
if (gatewaySessionHandler != null) { if (gatewaySessionHandler != null) {
gatewaySessionHandler.onGatewayDisconnect(); gatewaySessionHandler.onGatewayDisconnect();
@ -948,7 +949,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
deviceSessionCtx.setDeviceInfo(msg.getDeviceInfo()); deviceSessionCtx.setDeviceInfo(msg.getDeviceInfo());
deviceSessionCtx.setDeviceProfile(msg.getDeviceProfile()); deviceSessionCtx.setDeviceProfile(msg.getDeviceProfile());
deviceSessionCtx.setSessionInfo(SessionInfoCreator.create(msg, context, sessionId)); deviceSessionCtx.setSessionInfo(SessionInfoCreator.create(msg, context, sessionId));
transportService.process(deviceSessionCtx.getSessionInfo(), DefaultTransportService.getSessionEventMsg(SessionEvent.OPEN), new TransportServiceCallback<Void>() { transportService.process(deviceSessionCtx.getSessionInfo(), SESSION_EVENT_MSG_OPEN, new TransportServiceCallback<Void>() {
@Override @Override
public void onSuccess(Void msg) { public void onSuccess(Void msg) {
SessionMetaData sessionMetaData = transportService.registerAsyncSession(deviceSessionCtx.getSessionInfo(), MqttTransportHandler.this); SessionMetaData sessionMetaData = transportService.registerAsyncSession(deviceSessionCtx.getSessionInfo(), MqttTransportHandler.this);

View File

@ -44,7 +44,6 @@ import org.thingsboard.server.common.transport.adaptor.JsonConverter;
import org.thingsboard.server.common.transport.adaptor.ProtoConverter; import org.thingsboard.server.common.transport.adaptor.ProtoConverter;
import org.thingsboard.server.common.transport.auth.GetOrCreateDeviceFromGatewayResponse; import org.thingsboard.server.common.transport.auth.GetOrCreateDeviceFromGatewayResponse;
import org.thingsboard.server.common.transport.auth.TransportDeviceInfo; import org.thingsboard.server.common.transport.auth.TransportDeviceInfo;
import org.thingsboard.server.common.transport.service.DefaultTransportService;
import org.thingsboard.server.gen.transport.TransportApiProtos; import org.thingsboard.server.gen.transport.TransportApiProtos;
import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.gen.transport.TransportProtos.GetOrCreateDeviceFromGatewayRequestMsg; import org.thingsboard.server.gen.transport.TransportProtos.GetOrCreateDeviceFromGatewayRequestMsg;
@ -68,6 +67,10 @@ import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantLock;
import static org.springframework.util.ConcurrentReferenceHashMap.ReferenceType; import static org.springframework.util.ConcurrentReferenceHashMap.ReferenceType;
import static org.thingsboard.server.common.transport.service.DefaultTransportService.SESSION_EVENT_MSG_CLOSED;
import static org.thingsboard.server.common.transport.service.DefaultTransportService.SESSION_EVENT_MSG_OPEN;
import static org.thingsboard.server.common.transport.service.DefaultTransportService.SUBSCRIBE_TO_ATTRIBUTE_UPDATES_ASYNC_MSG;
import static org.thingsboard.server.common.transport.service.DefaultTransportService.SUBSCRIBE_TO_RPC_ASYNC_MSG;
/** /**
* Created by ashvayka on 19.01.17. * Created by ashvayka on 19.01.17.
@ -267,11 +270,9 @@ public class GatewaySessionHandler {
transportService.registerAsyncSession(deviceSessionInfo, deviceSessionCtx); transportService.registerAsyncSession(deviceSessionInfo, deviceSessionCtx);
transportService.process(TransportProtos.TransportToDeviceActorMsg.newBuilder() transportService.process(TransportProtos.TransportToDeviceActorMsg.newBuilder()
.setSessionInfo(deviceSessionInfo) .setSessionInfo(deviceSessionInfo)
.setSessionEvent(DefaultTransportService.getSessionEventMsg(TransportProtos.SessionEvent.OPEN)) .setSessionEvent(SESSION_EVENT_MSG_OPEN)
.setSubscribeToAttributes(TransportProtos.SubscribeToAttributeUpdatesMsg.newBuilder() .setSubscribeToAttributes(SUBSCRIBE_TO_ATTRIBUTE_UPDATES_ASYNC_MSG)
.setSessionType(TransportProtos.SessionType.ASYNC).build()) .setSubscribeToRPC(SUBSCRIBE_TO_RPC_ASYNC_MSG)
.setSubscribeToRPC(TransportProtos.SubscribeToRPCMsg.newBuilder()
.setSessionType(TransportProtos.SessionType.ASYNC).build())
.build(), null); .build(), null);
} }
futureToSet.set(devices.get(deviceName)); futureToSet.set(devices.get(deviceName));
@ -720,7 +721,7 @@ public class GatewaySessionHandler {
private void deregisterSession(String deviceName, GatewayDeviceSessionCtx deviceSessionCtx) { private void deregisterSession(String deviceName, GatewayDeviceSessionCtx deviceSessionCtx) {
transportService.deregisterSession(deviceSessionCtx.getSessionInfo()); transportService.deregisterSession(deviceSessionCtx.getSessionInfo());
transportService.process(deviceSessionCtx.getSessionInfo(), DefaultTransportService.getSessionEventMsg(TransportProtos.SessionEvent.CLOSED), null); transportService.process(deviceSessionCtx.getSessionInfo(), SESSION_EVENT_MSG_CLOSED, null);
log.debug("[{}] Removed device [{}] from the gateway session", sessionId, deviceName); log.debug("[{}] Removed device [{}] from the gateway session", sessionId, deviceName);
} }

View File

@ -21,7 +21,6 @@ import com.google.common.util.concurrent.MoreExecutors;
import com.google.gson.Gson; import com.google.gson.Gson;
import com.google.gson.JsonObject; import com.google.gson.JsonObject;
import com.google.protobuf.ByteString; import com.google.protobuf.ByteString;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.ApplicationEventPublisher; import org.springframework.context.ApplicationEventPublisher;
@ -124,6 +123,15 @@ import java.util.stream.Collectors;
public class DefaultTransportService implements TransportService { public class DefaultTransportService implements TransportService {
public static final String OVERWRITE_ACTIVITY_TIME = "overwriteActivityTime"; public static final String OVERWRITE_ACTIVITY_TIME = "overwriteActivityTime";
public static final String SESSION_EXPIRED_MESSAGE = "Session has expired due to last activity time!";
public static final TransportProtos.SessionEventMsg SESSION_EVENT_MSG_OPEN = getSessionEventMsg(TransportProtos.SessionEvent.OPEN);
public static final TransportProtos.SessionEventMsg SESSION_EVENT_MSG_CLOSED = getSessionEventMsg(TransportProtos.SessionEvent.CLOSED);
public static final TransportProtos.SessionCloseNotificationProto SESSION_CLOSE_NOTIFICATION_PROTO = TransportProtos.SessionCloseNotificationProto.newBuilder()
.setMessage(SESSION_EXPIRED_MESSAGE).build();
public static final TransportProtos.SubscribeToAttributeUpdatesMsg SUBSCRIBE_TO_ATTRIBUTE_UPDATES_ASYNC_MSG = TransportProtos.SubscribeToAttributeUpdatesMsg.newBuilder()
.setSessionType(TransportProtos.SessionType.ASYNC).build();
public static final TransportProtos.SubscribeToRPCMsg SUBSCRIBE_TO_RPC_ASYNC_MSG = TransportProtos.SubscribeToRPCMsg.newBuilder()
.setSessionType(TransportProtos.SessionType.ASYNC).build();
private final AtomicInteger atomicTs = new AtomicInteger(0); private final AtomicInteger atomicTs = new AtomicInteger(0);
@ -267,9 +275,7 @@ public class DefaultTransportService implements TransportService {
@Override @Override
public SessionMetaData registerAsyncSession(TransportProtos.SessionInfoProto sessionInfo, SessionMsgListener listener) { public SessionMetaData registerAsyncSession(TransportProtos.SessionInfoProto sessionInfo, SessionMsgListener listener) {
SessionMetaData newValue = new SessionMetaData(sessionInfo, TransportProtos.SessionType.ASYNC, listener); return sessions.computeIfAbsent(toSessionId(sessionInfo), (x) -> new SessionMetaData(sessionInfo, TransportProtos.SessionType.ASYNC, listener));
SessionMetaData oldValue = sessions.putIfAbsent(toSessionId(sessionInfo), newValue);
return oldValue != null ? oldValue : newValue;
} }
@Override @Override
@ -719,12 +725,8 @@ public class DefaultTransportService implements TransportService {
} }
sessions.remove(uuid); sessions.remove(uuid);
sessionsToRemove.add(uuid); sessionsToRemove.add(uuid);
process(sessionInfo, getSessionEventMsg(TransportProtos.SessionEvent.CLOSED), null); process(sessionInfo, SESSION_EVENT_MSG_CLOSED, null);
TransportProtos.SessionCloseNotificationProto sessionCloseNotificationProto = TransportProtos.SessionCloseNotificationProto sessionMD.getListener().onRemoteSessionCloseCommand(uuid, SESSION_CLOSE_NOTIFICATION_PROTO);
.newBuilder()
.setMessage("Session has expired due to last activity time!")
.build();
sessionMD.getListener().onRemoteSessionCloseCommand(uuid, sessionCloseNotificationProto);
} }
} else { } else {
if (lastActivityTime > sessionAD.getLastReportedActivityTime()) { if (lastActivityTime > sessionAD.getLastReportedActivityTime()) {
@ -1018,7 +1020,7 @@ public class DefaultTransportService implements TransportService {
return new DeviceId(new UUID(sessionInfo.getDeviceIdMSB(), sessionInfo.getDeviceIdLSB())); return new DeviceId(new UUID(sessionInfo.getDeviceIdMSB(), sessionInfo.getDeviceIdLSB()));
} }
public static TransportProtos.SessionEventMsg getSessionEventMsg(TransportProtos.SessionEvent event) { private static TransportProtos.SessionEventMsg getSessionEventMsg(TransportProtos.SessionEvent event) {
return TransportProtos.SessionEventMsg.newBuilder() return TransportProtos.SessionEventMsg.newBuilder()
.setSessionType(TransportProtos.SessionType.ASYNC) .setSessionType(TransportProtos.SessionType.ASYNC)
.setEvent(event).build(); .setEvent(event).build();