Merge pull request #12069 from imbeacon/fix/provision-topic-suback

Fixed non returning SubAck for provisioning clients
This commit is contained in:
Viacheslav Klimov 2024-11-28 14:38:35 +02:00 committed by GitHub
commit 7ae43fe8ff
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 71 additions and 4 deletions

View File

@ -313,6 +313,14 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
sendResponseForAdaptorErrorOrCloseContext(ctx, topicName, msgId); sendResponseForAdaptorErrorOrCloseContext(ctx, topicName, msgId);
} }
break; break;
case SUBSCRIBE:
MqttSubscribeMessage subscribeMessage = (MqttSubscribeMessage) msg;
processSubscribe(ctx, subscribeMessage);
break;
case UNSUBSCRIBE:
MqttUnsubscribeMessage unsubscribeMessage = (MqttUnsubscribeMessage) msg;
processUnsubscribe(ctx, unsubscribeMessage);
break;
case PINGREQ: case PINGREQ:
ctx.writeAndFlush(new MqttMessage(new MqttFixedHeader(PINGRESP, false, AT_MOST_ONCE, false, 0))); ctx.writeAndFlush(new MqttMessage(new MqttFixedHeader(PINGRESP, false, AT_MOST_ONCE, false, 0)));
break; break;
@ -750,7 +758,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
} }
private void processSubscribe(ChannelHandlerContext ctx, MqttSubscribeMessage mqttMsg) { private void processSubscribe(ChannelHandlerContext ctx, MqttSubscribeMessage mqttMsg) {
if (!checkConnected(ctx, mqttMsg)) { if (!checkConnected(ctx, mqttMsg) && !deviceSessionCtx.isProvisionOnly()) {
ctx.writeAndFlush(createSubAckMessage(mqttMsg.variableHeader().messageId(), Collections.singletonList(MqttReasonCodes.SubAck.NOT_AUTHORIZED.byteValue() & 0xFF))); ctx.writeAndFlush(createSubAckMessage(mqttMsg.variableHeader().messageId(), Collections.singletonList(MqttReasonCodes.SubAck.NOT_AUTHORIZED.byteValue() & 0xFF)));
return; return;
} }
@ -760,6 +768,16 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
for (MqttTopicSubscription subscription : mqttMsg.payload().topicSubscriptions()) { for (MqttTopicSubscription subscription : mqttMsg.payload().topicSubscriptions()) {
String topic = subscription.topicName(); String topic = subscription.topicName();
MqttQoS reqQoS = subscription.qualityOfService(); MqttQoS reqQoS = subscription.qualityOfService();
if (deviceSessionCtx.isProvisionOnly()) {
if (MqttTopics.DEVICE_PROVISION_RESPONSE_TOPIC.equals(topic)) {
registerSubQoS(topic, grantedQoSList, reqQoS);
} else {
log.debug("[{}] Failed to subscribe to [{}][{}]", sessionId, topic, reqQoS);
grantedQoSList.add(ReturnCodeResolver.getSubscriptionReturnCode(deviceSessionCtx.getMqttVersion(), MqttReasonCodes.SubAck.TOPIC_FILTER_INVALID));
}
activityReported = true;
continue;
}
if (deviceSessionCtx.isDeviceSubscriptionAttributesTopic(topic)) { if (deviceSessionCtx.isDeviceSubscriptionAttributesTopic(topic)) {
processAttributesSubscribe(grantedQoSList, topic, reqQoS, TopicType.V1); processAttributesSubscribe(grantedQoSList, topic, reqQoS, TopicType.V1);
activityReported = true; activityReported = true;
@ -822,7 +840,6 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
case MqttTopics.GATEWAY_ATTRIBUTES_TOPIC: case MqttTopics.GATEWAY_ATTRIBUTES_TOPIC:
case MqttTopics.GATEWAY_RPC_TOPIC: case MqttTopics.GATEWAY_RPC_TOPIC:
case MqttTopics.GATEWAY_ATTRIBUTES_RESPONSE_TOPIC: case MqttTopics.GATEWAY_ATTRIBUTES_RESPONSE_TOPIC:
case MqttTopics.DEVICE_PROVISION_RESPONSE_TOPIC:
case MqttTopics.DEVICE_FIRMWARE_RESPONSES_TOPIC: case MqttTopics.DEVICE_FIRMWARE_RESPONSES_TOPIC:
case MqttTopics.DEVICE_FIRMWARE_ERROR_TOPIC: case MqttTopics.DEVICE_FIRMWARE_ERROR_TOPIC:
case MqttTopics.DEVICE_SOFTWARE_RESPONSES_TOPIC: case MqttTopics.DEVICE_SOFTWARE_RESPONSES_TOPIC:
@ -873,7 +890,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
} }
private void processUnsubscribe(ChannelHandlerContext ctx, MqttUnsubscribeMessage mqttMsg) { private void processUnsubscribe(ChannelHandlerContext ctx, MqttUnsubscribeMessage mqttMsg) {
if (!checkConnected(ctx, mqttMsg)) { if (!checkConnected(ctx, mqttMsg) && !deviceSessionCtx.isProvisionOnly()) {
ctx.writeAndFlush(createUnSubAckMessage(mqttMsg.variableHeader().messageId(), ctx.writeAndFlush(createUnSubAckMessage(mqttMsg.variableHeader().messageId(),
Collections.singletonList((short) MqttReasonCodes.UnsubAck.NOT_AUTHORIZED.byteValue()))); Collections.singletonList((short) MqttReasonCodes.UnsubAck.NOT_AUTHORIZED.byteValue())));
return; return;
@ -887,6 +904,14 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
mqttQoSMap.remove(matcher); mqttQoSMap.remove(matcher);
try { try {
short resultValue = MqttReasonCodes.UnsubAck.SUCCESS.byteValue(); short resultValue = MqttReasonCodes.UnsubAck.SUCCESS.byteValue();
if (deviceSessionCtx.isProvisionOnly()) {
if (!matcher.matches(MqttTopics.DEVICE_PROVISION_RESPONSE_TOPIC)) {
resultValue = MqttReasonCodes.UnsubAck.TOPIC_FILTER_INVALID.byteValue();
}
unSubResults.add(resultValue);
activityReported = true;
continue;
}
switch (topicName) { switch (topicName) {
case MqttTopics.DEVICE_ATTRIBUTES_TOPIC: case MqttTopics.DEVICE_ATTRIBUTES_TOPIC:
case MqttTopics.DEVICE_ATTRIBUTES_SHORT_TOPIC: case MqttTopics.DEVICE_ATTRIBUTES_SHORT_TOPIC:
@ -917,7 +942,6 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
case MqttTopics.GATEWAY_ATTRIBUTES_TOPIC: case MqttTopics.GATEWAY_ATTRIBUTES_TOPIC:
case MqttTopics.GATEWAY_RPC_TOPIC: case MqttTopics.GATEWAY_RPC_TOPIC:
case MqttTopics.GATEWAY_ATTRIBUTES_RESPONSE_TOPIC: case MqttTopics.GATEWAY_ATTRIBUTES_RESPONSE_TOPIC:
case MqttTopics.DEVICE_PROVISION_RESPONSE_TOPIC:
case MqttTopics.DEVICE_FIRMWARE_RESPONSES_TOPIC: case MqttTopics.DEVICE_FIRMWARE_RESPONSES_TOPIC:
case MqttTopics.DEVICE_FIRMWARE_ERROR_TOPIC: case MqttTopics.DEVICE_FIRMWARE_ERROR_TOPIC:
case MqttTopics.DEVICE_SOFTWARE_RESPONSES_TOPIC: case MqttTopics.DEVICE_SOFTWARE_RESPONSES_TOPIC:

View File

@ -28,6 +28,7 @@ import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttQoS; import io.netty.handler.codec.mqtt.MqttQoS;
import io.netty.handler.codec.mqtt.MqttReasonCodeAndPropertiesVariableHeader; import io.netty.handler.codec.mqtt.MqttReasonCodeAndPropertiesVariableHeader;
import io.netty.handler.codec.mqtt.MqttReasonCodes; import io.netty.handler.codec.mqtt.MqttReasonCodes;
import io.netty.handler.codec.mqtt.MqttSubAckMessage;
import io.netty.handler.codec.mqtt.MqttVersion; import io.netty.handler.codec.mqtt.MqttVersion;
import lombok.Data; import lombok.Data;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
@ -491,6 +492,48 @@ public class MqttClientTest extends AbstractContainerTest {
updateDeviceProfileWithProvisioningStrategy(deviceProfile, DeviceProfileProvisionType.DISABLED); updateDeviceProfileWithProvisioningStrategy(deviceProfile, DeviceProfileProvisionType.DISABLED);
} }
@Test
public void provisionRequestForCheckSubAckReceived() throws Exception {
DeviceProfile deviceProfile = testRestClient.getDeviceProfileById(device.getDeviceProfileId());
deviceProfile = updateDeviceProfileWithProvisioningStrategy(deviceProfile, DeviceProfileProvisionType.ALLOW_CREATE_NEW_DEVICES);
MqttMessageListener listener = new MqttMessageListener();
MqttClient mqttClient = getMqttClient("provision", listener, MqttVersion.MQTT_5);
final MqttReasonCodes.SubAck[] subAckResult = new MqttReasonCodes.SubAck[1];
mqttClient.setCallback(new MqttClientCallback() {
@Override
public void connectionLost(Throwable cause) {
}
@Override
public void onSuccessfulReconnect() {
}
@Override
public void onSubAck(MqttSubAckMessage subAckMessage) {
subAckResult[0] = subAckMessage.payload().typedReasonCodes().get(0);
}
}
);
mqttClient.on("/provision/response", listener, MqttQoS.AT_LEAST_ONCE).get(3 * timeoutMultiplier, TimeUnit.SECONDS);
TimeUnit.SECONDS.sleep(2 * timeoutMultiplier);
assertThat(subAckResult[0]).isNotNull();
assertThat(MqttReasonCodes.SubAck.GRANTED_QOS_1.equals(subAckResult[0]));
subAckResult[0] = null;
mqttClient.on("v1/devices/me/attributes", listener, MqttQoS.AT_LEAST_ONCE).get(3 * timeoutMultiplier, TimeUnit.SECONDS);
TimeUnit.SECONDS.sleep(2 * timeoutMultiplier);
assertThat(subAckResult[0]).isNotNull();
assertThat(MqttReasonCodes.SubAck.TOPIC_FILTER_INVALID.equals(subAckResult[0]));
testRestClient.deleteDeviceIfExists(device.getId());
updateDeviceProfileWithProvisioningStrategy(deviceProfile, DeviceProfileProvisionType.DISABLED);
}
@Test @Test
public void provisionRequestForDeviceWithDisabledProvisioningStrategy() throws Exception { public void provisionRequestForDeviceWithDisabledProvisioningStrategy() throws Exception {