diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java index d7f64e58ee..6d5c0f5d50 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java @@ -313,6 +313,28 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement sendResponseForAdaptorErrorOrCloseContext(ctx, topicName, msgId); } break; + case SUBSCRIBE: + MqttSubscribeMessage mqttSubscribeMsg = (MqttSubscribeMessage) msg; + log.trace("[{}] Processing subscription [{}]!", sessionId, mqttSubscribeMsg.variableHeader().messageId()); + List grantedQoSList = new ArrayList<>(); + for (MqttTopicSubscription subscription : mqttSubscribeMsg.payload().topicSubscriptions()) { + String topic = subscription.topicFilter(); + MqttQoS reqQoS = subscription.qualityOfService(); + try { + if (MqttTopics.DEVICE_PROVISION_RESPONSE_TOPIC.equals(topic)) { + registerSubQoS(topic, grantedQoSList, reqQoS); + } else { + log.warn("[{}] Failed to subscribe to [{}][{}]", sessionId, topic, reqQoS); + grantedQoSList.add(ReturnCodeResolver.getSubscriptionReturnCode(deviceSessionCtx.getMqttVersion(), MqttReasonCodes.SubAck.TOPIC_FILTER_INVALID)); + break; + } + } catch (Exception e) { + log.warn("[{}] Failed to subscribe to [{}][{}]", sessionId, topic, reqQoS, e); + grantedQoSList.add(ReturnCodeResolver.getSubscriptionReturnCode(deviceSessionCtx.getMqttVersion(), MqttReasonCodes.SubAck.IMPLEMENTATION_SPECIFIC_ERROR)); + } + } + ctx.writeAndFlush(createSubAckMessage(mqttSubscribeMsg.variableHeader().messageId(), grantedQoSList)); + break; case PINGREQ: ctx.writeAndFlush(new MqttMessage(new MqttFixedHeader(PINGRESP, false, AT_MOST_ONCE, false, 0))); break; @@ -822,7 +844,6 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement case MqttTopics.GATEWAY_ATTRIBUTES_TOPIC: case MqttTopics.GATEWAY_RPC_TOPIC: case MqttTopics.GATEWAY_ATTRIBUTES_RESPONSE_TOPIC: - case MqttTopics.DEVICE_PROVISION_RESPONSE_TOPIC: case MqttTopics.DEVICE_FIRMWARE_RESPONSES_TOPIC: case MqttTopics.DEVICE_FIRMWARE_ERROR_TOPIC: case MqttTopics.DEVICE_SOFTWARE_RESPONSES_TOPIC: diff --git a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttClientTest.java b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttClientTest.java index c298d4fea3..8c26be9b45 100644 --- a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttClientTest.java +++ b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttClientTest.java @@ -28,6 +28,7 @@ import io.netty.handler.codec.mqtt.MqttMessage; import io.netty.handler.codec.mqtt.MqttQoS; import io.netty.handler.codec.mqtt.MqttReasonCodeAndPropertiesVariableHeader; import io.netty.handler.codec.mqtt.MqttReasonCodes; +import io.netty.handler.codec.mqtt.MqttSubAckMessage; import io.netty.handler.codec.mqtt.MqttVersion; import lombok.Data; import lombok.extern.slf4j.Slf4j; @@ -491,6 +492,48 @@ public class MqttClientTest extends AbstractContainerTest { updateDeviceProfileWithProvisioningStrategy(deviceProfile, DeviceProfileProvisionType.DISABLED); } + @Test + public void provisionRequestForCheckSubAckReceived() throws Exception { + + DeviceProfile deviceProfile = testRestClient.getDeviceProfileById(device.getDeviceProfileId()); + deviceProfile = updateDeviceProfileWithProvisioningStrategy(deviceProfile, DeviceProfileProvisionType.ALLOW_CREATE_NEW_DEVICES); + + MqttMessageListener listener = new MqttMessageListener(); + MqttClient mqttClient = getMqttClient("provision", listener, MqttVersion.MQTT_5); + final MqttReasonCodes.SubAck[] subAckResult = new MqttReasonCodes.SubAck[1]; + mqttClient.setCallback(new MqttClientCallback() { + @Override + public void connectionLost(Throwable cause) { + } + + @Override + public void onSuccessfulReconnect() { + } + + @Override + public void onSubAck(MqttSubAckMessage subAckMessage) { + subAckResult[0] = subAckMessage.payload().typedReasonCodes().get(0); + } + } + ); + + mqttClient.on("/provision/response", listener, MqttQoS.AT_LEAST_ONCE).get(3 * timeoutMultiplier, TimeUnit.SECONDS); + TimeUnit.SECONDS.sleep(2 * timeoutMultiplier); + assertThat(subAckResult[0]).isNotNull(); + assertThat(MqttReasonCodes.SubAck.GRANTED_QOS_1.equals(subAckResult[0])); + + subAckResult[0] = null; + mqttClient.on("v1/devices/me/attributes", listener, MqttQoS.AT_LEAST_ONCE).get(3 * timeoutMultiplier, TimeUnit.SECONDS); + TimeUnit.SECONDS.sleep(2 * timeoutMultiplier); + assertThat(subAckResult[0]).isNotNull(); + assertThat(MqttReasonCodes.SubAck.TOPIC_FILTER_INVALID.equals(subAckResult[0])); + + testRestClient.deleteDeviceIfExists(device.getId()); + updateDeviceProfileWithProvisioningStrategy(deviceProfile, DeviceProfileProvisionType.DISABLED); + } + + + @Test public void provisionRequestForDeviceWithDisabledProvisioningStrategy() throws Exception {