From bff16ddafd650f5f1117fd0a73131ee5e07bc133 Mon Sep 17 00:00:00 2001 From: YevhenBondarenko Date: Tue, 22 Dec 2020 18:02:38 +0200 Subject: [PATCH 1/2] Mqtt flag sessionPresent depends on flag isCleanSession --- .../transport/mqtt/MqttTransportHandler.java | 50 +++++++++---------- 1 file changed, 25 insertions(+), 25 deletions(-) diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java index abb8f629f1..7f45577536 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java @@ -357,7 +357,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement log.info("[{}] Processing connect msg for client: {}!", sessionId, msg.payload().clientIdentifier()); X509Certificate cert; if (sslHandler != null && (cert = getX509Certificate()) != null) { - processX509CertConnect(ctx, cert); + processX509CertConnect(ctx, cert, msg); } else { processAuthTokenConnect(ctx, msg); } @@ -367,27 +367,27 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement String userName = msg.payload().userName(); log.info("[{}] Processing connect msg for client with user name: {}!", sessionId, userName); if (StringUtils.isEmpty(userName)) { - ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD)); + ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD, msg)); ctx.close(); } else { transportService.process(ValidateDeviceTokenRequestMsg.newBuilder().setToken(userName).build(), new TransportServiceCallback() { @Override - public void onSuccess(ValidateDeviceCredentialsResponseMsg msg) { - onValidateDeviceResponse(msg, ctx); + public void onSuccess(ValidateDeviceCredentialsResponseMsg responseMsg) { + onValidateDeviceResponse(responseMsg, ctx, msg); } @Override public void onError(Throwable e) { log.trace("[{}] Failed to process credentials: {}", address, userName, e); - ctx.writeAndFlush(createMqttConnAckMsg(MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE)); + ctx.writeAndFlush(createMqttConnAckMsg(MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE, msg)); ctx.close(); } }); } } - private void processX509CertConnect(ChannelHandlerContext ctx, X509Certificate cert) { + private void processX509CertConnect(ChannelHandlerContext ctx, X509Certificate cert, MqttConnectMessage msg) { try { if(!context.isSkipValidityCheckForClientCert()){ cert.checkValidity(); @@ -397,19 +397,19 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement transportService.process(ValidateDeviceX509CertRequestMsg.newBuilder().setHash(sha3Hash).build(), new TransportServiceCallback() { @Override - public void onSuccess(ValidateDeviceCredentialsResponseMsg msg) { - onValidateDeviceResponse(msg, ctx); + public void onSuccess(ValidateDeviceCredentialsResponseMsg responseMsg) { + onValidateDeviceResponse(responseMsg, ctx, msg); } @Override public void onError(Throwable e) { log.trace("[{}] Failed to process credentials: {}", address, sha3Hash, e); - ctx.writeAndFlush(createMqttConnAckMsg(MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE)); + ctx.writeAndFlush(createMqttConnAckMsg(MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE, msg)); ctx.close(); } }); } catch (Exception e) { - ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_REFUSED_NOT_AUTHORIZED)); + ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_REFUSED_NOT_AUTHORIZED, msg)); ctx.close(); } } @@ -433,11 +433,11 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement doDisconnect(); } - private MqttConnAckMessage createMqttConnAckMsg(MqttConnectReturnCode returnCode) { + private MqttConnAckMessage createMqttConnAckMsg(MqttConnectReturnCode returnCode, MqttConnectMessage msg) { MqttFixedHeader mqttFixedHeader = new MqttFixedHeader(CONNACK, false, AT_MOST_ONCE, false, 0); MqttConnAckVariableHeader mqttConnAckVariableHeader = - new MqttConnAckVariableHeader(returnCode, true); + new MqttConnAckVariableHeader(returnCode, !msg.variableHeader().isCleanSession()); return new MqttConnAckMessage(mqttFixedHeader, mqttConnAckVariableHeader); } @@ -513,36 +513,36 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement } } - private void onValidateDeviceResponse(ValidateDeviceCredentialsResponseMsg msg, ChannelHandlerContext ctx) { - if (!msg.hasDeviceInfo()) { - ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_REFUSED_NOT_AUTHORIZED)); + private void onValidateDeviceResponse(ValidateDeviceCredentialsResponseMsg responseMsg, ChannelHandlerContext ctx, MqttConnectMessage msg) { + if (!responseMsg.hasDeviceInfo()) { + ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_REFUSED_NOT_AUTHORIZED, msg)); ctx.close(); } else { - deviceSessionCtx.setDeviceInfo(msg.getDeviceInfo()); + deviceSessionCtx.setDeviceInfo(responseMsg.getDeviceInfo()); sessionInfo = SessionInfoProto.newBuilder() .setNodeId(context.getNodeId()) .setSessionIdMSB(sessionId.getMostSignificantBits()) .setSessionIdLSB(sessionId.getLeastSignificantBits()) - .setDeviceIdMSB(msg.getDeviceInfo().getDeviceIdMSB()) - .setDeviceIdLSB(msg.getDeviceInfo().getDeviceIdLSB()) - .setTenantIdMSB(msg.getDeviceInfo().getTenantIdMSB()) - .setTenantIdLSB(msg.getDeviceInfo().getTenantIdLSB()) - .setDeviceName(msg.getDeviceInfo().getDeviceName()) - .setDeviceType(msg.getDeviceInfo().getDeviceType()) + .setDeviceIdMSB(responseMsg.getDeviceInfo().getDeviceIdMSB()) + .setDeviceIdLSB(responseMsg.getDeviceInfo().getDeviceIdLSB()) + .setTenantIdMSB(responseMsg.getDeviceInfo().getTenantIdMSB()) + .setTenantIdLSB(responseMsg.getDeviceInfo().getTenantIdLSB()) + .setDeviceName(responseMsg.getDeviceInfo().getDeviceName()) + .setDeviceType(responseMsg.getDeviceInfo().getDeviceType()) .build(); transportService.process(sessionInfo, DefaultTransportService.getSessionEventMsg(SessionEvent.OPEN), new TransportServiceCallback() { @Override - public void onSuccess(Void msg) { + public void onSuccess(Void response) { transportService.registerAsyncSession(sessionInfo, MqttTransportHandler.this); checkGatewaySession(); - ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_ACCEPTED)); + ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_ACCEPTED, msg)); log.info("[{}] Client connected!", sessionId); } @Override public void onError(Throwable e) { log.warn("[{}] Failed to submit session event", sessionId, e); - ctx.writeAndFlush(createMqttConnAckMsg(MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE)); + ctx.writeAndFlush(createMqttConnAckMsg(MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE, msg)); ctx.close(); } }); From 16d82e77fc5000a19d30797ef195ac30b2cf406a Mon Sep 17 00:00:00 2001 From: YevhenBondarenko Date: Wed, 23 Dec 2020 16:54:14 +0200 Subject: [PATCH 2/2] refactored --- .../transport/mqtt/MqttTransportHandler.java | 46 +++++++++---------- 1 file changed, 23 insertions(+), 23 deletions(-) diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java index 7f45577536..bfd32eb751 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java @@ -363,31 +363,31 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement } } - private void processAuthTokenConnect(ChannelHandlerContext ctx, MqttConnectMessage msg) { - String userName = msg.payload().userName(); + private void processAuthTokenConnect(ChannelHandlerContext ctx, MqttConnectMessage connectMessage) { + String userName = connectMessage.payload().userName(); log.info("[{}] Processing connect msg for client with user name: {}!", sessionId, userName); if (StringUtils.isEmpty(userName)) { - ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD, msg)); + ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD, connectMessage)); ctx.close(); } else { transportService.process(ValidateDeviceTokenRequestMsg.newBuilder().setToken(userName).build(), new TransportServiceCallback() { @Override - public void onSuccess(ValidateDeviceCredentialsResponseMsg responseMsg) { - onValidateDeviceResponse(responseMsg, ctx, msg); + public void onSuccess(ValidateDeviceCredentialsResponseMsg msg) { + onValidateDeviceResponse(msg, ctx, connectMessage); } @Override public void onError(Throwable e) { log.trace("[{}] Failed to process credentials: {}", address, userName, e); - ctx.writeAndFlush(createMqttConnAckMsg(MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE, msg)); + ctx.writeAndFlush(createMqttConnAckMsg(MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE, connectMessage)); ctx.close(); } }); } } - private void processX509CertConnect(ChannelHandlerContext ctx, X509Certificate cert, MqttConnectMessage msg) { + private void processX509CertConnect(ChannelHandlerContext ctx, X509Certificate cert, MqttConnectMessage connectMessage) { try { if(!context.isSkipValidityCheckForClientCert()){ cert.checkValidity(); @@ -397,19 +397,19 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement transportService.process(ValidateDeviceX509CertRequestMsg.newBuilder().setHash(sha3Hash).build(), new TransportServiceCallback() { @Override - public void onSuccess(ValidateDeviceCredentialsResponseMsg responseMsg) { - onValidateDeviceResponse(responseMsg, ctx, msg); + public void onSuccess(ValidateDeviceCredentialsResponseMsg msg) { + onValidateDeviceResponse(msg, ctx, connectMessage); } @Override public void onError(Throwable e) { log.trace("[{}] Failed to process credentials: {}", address, sha3Hash, e); - ctx.writeAndFlush(createMqttConnAckMsg(MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE, msg)); + ctx.writeAndFlush(createMqttConnAckMsg(MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE, connectMessage)); ctx.close(); } }); } catch (Exception e) { - ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_REFUSED_NOT_AUTHORIZED, msg)); + ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_REFUSED_NOT_AUTHORIZED, connectMessage)); ctx.close(); } } @@ -513,36 +513,36 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement } } - private void onValidateDeviceResponse(ValidateDeviceCredentialsResponseMsg responseMsg, ChannelHandlerContext ctx, MqttConnectMessage msg) { - if (!responseMsg.hasDeviceInfo()) { - ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_REFUSED_NOT_AUTHORIZED, msg)); + private void onValidateDeviceResponse(ValidateDeviceCredentialsResponseMsg msg, ChannelHandlerContext ctx, MqttConnectMessage connectMessage) { + if (!msg.hasDeviceInfo()) { + ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_REFUSED_NOT_AUTHORIZED, connectMessage)); ctx.close(); } else { - deviceSessionCtx.setDeviceInfo(responseMsg.getDeviceInfo()); + deviceSessionCtx.setDeviceInfo(msg.getDeviceInfo()); sessionInfo = SessionInfoProto.newBuilder() .setNodeId(context.getNodeId()) .setSessionIdMSB(sessionId.getMostSignificantBits()) .setSessionIdLSB(sessionId.getLeastSignificantBits()) - .setDeviceIdMSB(responseMsg.getDeviceInfo().getDeviceIdMSB()) - .setDeviceIdLSB(responseMsg.getDeviceInfo().getDeviceIdLSB()) - .setTenantIdMSB(responseMsg.getDeviceInfo().getTenantIdMSB()) - .setTenantIdLSB(responseMsg.getDeviceInfo().getTenantIdLSB()) - .setDeviceName(responseMsg.getDeviceInfo().getDeviceName()) - .setDeviceType(responseMsg.getDeviceInfo().getDeviceType()) + .setDeviceIdMSB(msg.getDeviceInfo().getDeviceIdMSB()) + .setDeviceIdLSB(msg.getDeviceInfo().getDeviceIdLSB()) + .setTenantIdMSB(msg.getDeviceInfo().getTenantIdMSB()) + .setTenantIdLSB(msg.getDeviceInfo().getTenantIdLSB()) + .setDeviceName(msg.getDeviceInfo().getDeviceName()) + .setDeviceType(msg.getDeviceInfo().getDeviceType()) .build(); transportService.process(sessionInfo, DefaultTransportService.getSessionEventMsg(SessionEvent.OPEN), new TransportServiceCallback() { @Override public void onSuccess(Void response) { transportService.registerAsyncSession(sessionInfo, MqttTransportHandler.this); checkGatewaySession(); - ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_ACCEPTED, msg)); + ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_ACCEPTED, connectMessage)); log.info("[{}] Client connected!", sessionId); } @Override public void onError(Throwable e) { log.warn("[{}] Failed to submit session event", sessionId, e); - ctx.writeAndFlush(createMqttConnAckMsg(MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE, msg)); + ctx.writeAndFlush(createMqttConnAckMsg(MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE, connectMessage)); ctx.close(); } });