Fixed non returning SubAck for provisioning clients

This commit is contained in:
imbeacon 2024-11-19 09:43:14 +02:00
parent 192f903e6c
commit 608e646ca8
2 changed files with 65 additions and 1 deletions

View File

@ -313,6 +313,28 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
sendResponseForAdaptorErrorOrCloseContext(ctx, topicName, msgId);
}
break;
case SUBSCRIBE:
MqttSubscribeMessage mqttSubscribeMsg = (MqttSubscribeMessage) msg;
log.trace("[{}] Processing subscription [{}]!", sessionId, mqttSubscribeMsg.variableHeader().messageId());
List<Integer> grantedQoSList = new ArrayList<>();
for (MqttTopicSubscription subscription : mqttSubscribeMsg.payload().topicSubscriptions()) {
String topic = subscription.topicFilter();
MqttQoS reqQoS = subscription.qualityOfService();
try {
if (MqttTopics.DEVICE_PROVISION_RESPONSE_TOPIC.equals(topic)) {
registerSubQoS(topic, grantedQoSList, reqQoS);
} else {
log.warn("[{}] Failed to subscribe to [{}][{}]", sessionId, topic, reqQoS);
grantedQoSList.add(ReturnCodeResolver.getSubscriptionReturnCode(deviceSessionCtx.getMqttVersion(), MqttReasonCodes.SubAck.TOPIC_FILTER_INVALID));
break;
}
} catch (Exception e) {
log.warn("[{}] Failed to subscribe to [{}][{}]", sessionId, topic, reqQoS, e);
grantedQoSList.add(ReturnCodeResolver.getSubscriptionReturnCode(deviceSessionCtx.getMqttVersion(), MqttReasonCodes.SubAck.IMPLEMENTATION_SPECIFIC_ERROR));
}
}
ctx.writeAndFlush(createSubAckMessage(mqttSubscribeMsg.variableHeader().messageId(), grantedQoSList));
break;
case PINGREQ:
ctx.writeAndFlush(new MqttMessage(new MqttFixedHeader(PINGRESP, false, AT_MOST_ONCE, false, 0)));
break;
@ -822,7 +844,6 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
case MqttTopics.GATEWAY_ATTRIBUTES_TOPIC:
case MqttTopics.GATEWAY_RPC_TOPIC:
case MqttTopics.GATEWAY_ATTRIBUTES_RESPONSE_TOPIC:
case MqttTopics.DEVICE_PROVISION_RESPONSE_TOPIC:
case MqttTopics.DEVICE_FIRMWARE_RESPONSES_TOPIC:
case MqttTopics.DEVICE_FIRMWARE_ERROR_TOPIC:
case MqttTopics.DEVICE_SOFTWARE_RESPONSES_TOPIC:

View File

@ -28,6 +28,7 @@ import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.netty.handler.codec.mqtt.MqttReasonCodeAndPropertiesVariableHeader;
import io.netty.handler.codec.mqtt.MqttReasonCodes;
import io.netty.handler.codec.mqtt.MqttSubAckMessage;
import io.netty.handler.codec.mqtt.MqttVersion;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
@ -491,6 +492,48 @@ public class MqttClientTest extends AbstractContainerTest {
updateDeviceProfileWithProvisioningStrategy(deviceProfile, DeviceProfileProvisionType.DISABLED);
}
@Test
public void provisionRequestForCheckSubAckReceived() throws Exception {
DeviceProfile deviceProfile = testRestClient.getDeviceProfileById(device.getDeviceProfileId());
deviceProfile = updateDeviceProfileWithProvisioningStrategy(deviceProfile, DeviceProfileProvisionType.ALLOW_CREATE_NEW_DEVICES);
MqttMessageListener listener = new MqttMessageListener();
MqttClient mqttClient = getMqttClient("provision", listener, MqttVersion.MQTT_5);
final MqttReasonCodes.SubAck[] subAckResult = new MqttReasonCodes.SubAck[1];
mqttClient.setCallback(new MqttClientCallback() {
@Override
public void connectionLost(Throwable cause) {
}
@Override
public void onSuccessfulReconnect() {
}
@Override
public void onSubAck(MqttSubAckMessage subAckMessage) {
subAckResult[0] = subAckMessage.payload().typedReasonCodes().get(0);
}
}
);
mqttClient.on("/provision/response", listener, MqttQoS.AT_LEAST_ONCE).get(3 * timeoutMultiplier, TimeUnit.SECONDS);
TimeUnit.SECONDS.sleep(2 * timeoutMultiplier);
assertThat(subAckResult[0]).isNotNull();
assertThat(MqttReasonCodes.SubAck.GRANTED_QOS_1.equals(subAckResult[0]));
subAckResult[0] = null;
mqttClient.on("v1/devices/me/attributes", listener, MqttQoS.AT_LEAST_ONCE).get(3 * timeoutMultiplier, TimeUnit.SECONDS);
TimeUnit.SECONDS.sleep(2 * timeoutMultiplier);
assertThat(subAckResult[0]).isNotNull();
assertThat(MqttReasonCodes.SubAck.TOPIC_FILTER_INVALID.equals(subAckResult[0]));
testRestClient.deleteDeviceIfExists(device.getId());
updateDeviceProfileWithProvisioningStrategy(deviceProfile, DeviceProfileProvisionType.DISABLED);
}
@Test
public void provisionRequestForDeviceWithDisabledProvisioningStrategy() throws Exception {