From 936ad3c2b252e3f94edcc7dc371862ba27fb6de1 Mon Sep 17 00:00:00 2001 From: imbeacon Date: Mon, 25 Nov 2024 12:19:47 +0200 Subject: [PATCH] Refactored due to comments --- .../transport/mqtt/MqttTransportHandler.java | 34 +++++++------------ 1 file changed, 13 insertions(+), 21 deletions(-) diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java index 6d5c0f5d50..e2bc8cd768 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java @@ -314,26 +314,8 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement } break; case SUBSCRIBE: - MqttSubscribeMessage mqttSubscribeMsg = (MqttSubscribeMessage) msg; - log.trace("[{}] Processing subscription [{}]!", sessionId, mqttSubscribeMsg.variableHeader().messageId()); - List grantedQoSList = new ArrayList<>(); - for (MqttTopicSubscription subscription : mqttSubscribeMsg.payload().topicSubscriptions()) { - String topic = subscription.topicFilter(); - MqttQoS reqQoS = subscription.qualityOfService(); - try { - if (MqttTopics.DEVICE_PROVISION_RESPONSE_TOPIC.equals(topic)) { - registerSubQoS(topic, grantedQoSList, reqQoS); - } else { - log.warn("[{}] Failed to subscribe to [{}][{}]", sessionId, topic, reqQoS); - grantedQoSList.add(ReturnCodeResolver.getSubscriptionReturnCode(deviceSessionCtx.getMqttVersion(), MqttReasonCodes.SubAck.TOPIC_FILTER_INVALID)); - break; - } - } catch (Exception e) { - log.warn("[{}] Failed to subscribe to [{}][{}]", sessionId, topic, reqQoS, e); - grantedQoSList.add(ReturnCodeResolver.getSubscriptionReturnCode(deviceSessionCtx.getMqttVersion(), MqttReasonCodes.SubAck.IMPLEMENTATION_SPECIFIC_ERROR)); - } - } - ctx.writeAndFlush(createSubAckMessage(mqttSubscribeMsg.variableHeader().messageId(), grantedQoSList)); + MqttSubscribeMessage subscribeMessage = (MqttSubscribeMessage) msg; + processSubscribe(ctx, subscribeMessage); break; case PINGREQ: ctx.writeAndFlush(new MqttMessage(new MqttFixedHeader(PINGRESP, false, AT_MOST_ONCE, false, 0))); @@ -772,7 +754,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement } private void processSubscribe(ChannelHandlerContext ctx, MqttSubscribeMessage mqttMsg) { - if (!checkConnected(ctx, mqttMsg)) { + if (!checkConnected(ctx, mqttMsg) && !deviceSessionCtx.isProvisionOnly()) { ctx.writeAndFlush(createSubAckMessage(mqttMsg.variableHeader().messageId(), Collections.singletonList(MqttReasonCodes.SubAck.NOT_AUTHORIZED.byteValue() & 0xFF))); return; } @@ -782,6 +764,16 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement for (MqttTopicSubscription subscription : mqttMsg.payload().topicSubscriptions()) { String topic = subscription.topicName(); MqttQoS reqQoS = subscription.qualityOfService(); + if (deviceSessionCtx.isProvisionOnly()) { + if (MqttTopics.DEVICE_PROVISION_RESPONSE_TOPIC.equals(topic)) { + registerSubQoS(topic, grantedQoSList, reqQoS); + } else { + log.debug("[{}] Failed to subscribe to [{}][{}]", sessionId, topic, reqQoS); + grantedQoSList.add(ReturnCodeResolver.getSubscriptionReturnCode(deviceSessionCtx.getMqttVersion(), MqttReasonCodes.SubAck.TOPIC_FILTER_INVALID)); + } + activityReported = true; + continue; + } if (deviceSessionCtx.isDeviceSubscriptionAttributesTopic(topic)) { processAttributesSubscribe(grantedQoSList, topic, reqQoS, TopicType.V1); activityReported = true;