Refactored due to comments

This commit is contained in:
imbeacon 2024-11-25 12:19:47 +02:00
parent ea0aada55d
commit 936ad3c2b2

View File

@ -314,26 +314,8 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
}
break;
case SUBSCRIBE:
MqttSubscribeMessage mqttSubscribeMsg = (MqttSubscribeMessage) msg;
log.trace("[{}] Processing subscription [{}]!", sessionId, mqttSubscribeMsg.variableHeader().messageId());
List<Integer> grantedQoSList = new ArrayList<>();
for (MqttTopicSubscription subscription : mqttSubscribeMsg.payload().topicSubscriptions()) {
String topic = subscription.topicFilter();
MqttQoS reqQoS = subscription.qualityOfService();
try {
if (MqttTopics.DEVICE_PROVISION_RESPONSE_TOPIC.equals(topic)) {
registerSubQoS(topic, grantedQoSList, reqQoS);
} else {
log.warn("[{}] Failed to subscribe to [{}][{}]", sessionId, topic, reqQoS);
grantedQoSList.add(ReturnCodeResolver.getSubscriptionReturnCode(deviceSessionCtx.getMqttVersion(), MqttReasonCodes.SubAck.TOPIC_FILTER_INVALID));
break;
}
} catch (Exception e) {
log.warn("[{}] Failed to subscribe to [{}][{}]", sessionId, topic, reqQoS, e);
grantedQoSList.add(ReturnCodeResolver.getSubscriptionReturnCode(deviceSessionCtx.getMqttVersion(), MqttReasonCodes.SubAck.IMPLEMENTATION_SPECIFIC_ERROR));
}
}
ctx.writeAndFlush(createSubAckMessage(mqttSubscribeMsg.variableHeader().messageId(), grantedQoSList));
MqttSubscribeMessage subscribeMessage = (MqttSubscribeMessage) msg;
processSubscribe(ctx, subscribeMessage);
break;
case PINGREQ:
ctx.writeAndFlush(new MqttMessage(new MqttFixedHeader(PINGRESP, false, AT_MOST_ONCE, false, 0)));
@ -772,7 +754,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
}
private void processSubscribe(ChannelHandlerContext ctx, MqttSubscribeMessage mqttMsg) {
if (!checkConnected(ctx, mqttMsg)) {
if (!checkConnected(ctx, mqttMsg) && !deviceSessionCtx.isProvisionOnly()) {
ctx.writeAndFlush(createSubAckMessage(mqttMsg.variableHeader().messageId(), Collections.singletonList(MqttReasonCodes.SubAck.NOT_AUTHORIZED.byteValue() & 0xFF)));
return;
}
@ -782,6 +764,16 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
for (MqttTopicSubscription subscription : mqttMsg.payload().topicSubscriptions()) {
String topic = subscription.topicName();
MqttQoS reqQoS = subscription.qualityOfService();
if (deviceSessionCtx.isProvisionOnly()) {
if (MqttTopics.DEVICE_PROVISION_RESPONSE_TOPIC.equals(topic)) {
registerSubQoS(topic, grantedQoSList, reqQoS);
} else {
log.debug("[{}] Failed to subscribe to [{}][{}]", sessionId, topic, reqQoS);
grantedQoSList.add(ReturnCodeResolver.getSubscriptionReturnCode(deviceSessionCtx.getMqttVersion(), MqttReasonCodes.SubAck.TOPIC_FILTER_INVALID));
}
activityReported = true;
continue;
}
if (deviceSessionCtx.isDeviceSubscriptionAttributesTopic(topic)) {
processAttributesSubscribe(grantedQoSList, topic, reqQoS, TopicType.V1);
activityReported = true;