diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java index d7f64e58ee..54df37c3f6 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java @@ -313,6 +313,14 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement sendResponseForAdaptorErrorOrCloseContext(ctx, topicName, msgId); } break; + case SUBSCRIBE: + MqttSubscribeMessage subscribeMessage = (MqttSubscribeMessage) msg; + processSubscribe(ctx, subscribeMessage); + break; + case UNSUBSCRIBE: + MqttUnsubscribeMessage unsubscribeMessage = (MqttUnsubscribeMessage) msg; + processUnsubscribe(ctx, unsubscribeMessage); + break; case PINGREQ: ctx.writeAndFlush(new MqttMessage(new MqttFixedHeader(PINGRESP, false, AT_MOST_ONCE, false, 0))); break; @@ -750,7 +758,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement } private void processSubscribe(ChannelHandlerContext ctx, MqttSubscribeMessage mqttMsg) { - if (!checkConnected(ctx, mqttMsg)) { + if (!checkConnected(ctx, mqttMsg) && !deviceSessionCtx.isProvisionOnly()) { ctx.writeAndFlush(createSubAckMessage(mqttMsg.variableHeader().messageId(), Collections.singletonList(MqttReasonCodes.SubAck.NOT_AUTHORIZED.byteValue() & 0xFF))); return; } @@ -760,6 +768,16 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement for (MqttTopicSubscription subscription : mqttMsg.payload().topicSubscriptions()) { String topic = subscription.topicName(); MqttQoS reqQoS = subscription.qualityOfService(); + if (deviceSessionCtx.isProvisionOnly()) { + if (MqttTopics.DEVICE_PROVISION_RESPONSE_TOPIC.equals(topic)) { + registerSubQoS(topic, grantedQoSList, reqQoS); + } else { + log.debug("[{}] Failed to subscribe to [{}][{}]", sessionId, topic, reqQoS); + grantedQoSList.add(ReturnCodeResolver.getSubscriptionReturnCode(deviceSessionCtx.getMqttVersion(), MqttReasonCodes.SubAck.TOPIC_FILTER_INVALID)); + } + activityReported = true; + continue; + } if (deviceSessionCtx.isDeviceSubscriptionAttributesTopic(topic)) { processAttributesSubscribe(grantedQoSList, topic, reqQoS, TopicType.V1); activityReported = true; @@ -822,7 +840,6 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement case MqttTopics.GATEWAY_ATTRIBUTES_TOPIC: case MqttTopics.GATEWAY_RPC_TOPIC: case MqttTopics.GATEWAY_ATTRIBUTES_RESPONSE_TOPIC: - case MqttTopics.DEVICE_PROVISION_RESPONSE_TOPIC: case MqttTopics.DEVICE_FIRMWARE_RESPONSES_TOPIC: case MqttTopics.DEVICE_FIRMWARE_ERROR_TOPIC: case MqttTopics.DEVICE_SOFTWARE_RESPONSES_TOPIC: @@ -873,7 +890,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement } private void processUnsubscribe(ChannelHandlerContext ctx, MqttUnsubscribeMessage mqttMsg) { - if (!checkConnected(ctx, mqttMsg)) { + if (!checkConnected(ctx, mqttMsg) && !deviceSessionCtx.isProvisionOnly()) { ctx.writeAndFlush(createUnSubAckMessage(mqttMsg.variableHeader().messageId(), Collections.singletonList((short) MqttReasonCodes.UnsubAck.NOT_AUTHORIZED.byteValue()))); return; @@ -887,6 +904,14 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement mqttQoSMap.remove(matcher); try { short resultValue = MqttReasonCodes.UnsubAck.SUCCESS.byteValue(); + if (deviceSessionCtx.isProvisionOnly()) { + if (!matcher.matches(MqttTopics.DEVICE_PROVISION_RESPONSE_TOPIC)) { + resultValue = MqttReasonCodes.UnsubAck.TOPIC_FILTER_INVALID.byteValue(); + } + unSubResults.add(resultValue); + activityReported = true; + continue; + } switch (topicName) { case MqttTopics.DEVICE_ATTRIBUTES_TOPIC: case MqttTopics.DEVICE_ATTRIBUTES_SHORT_TOPIC: @@ -917,7 +942,6 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement case MqttTopics.GATEWAY_ATTRIBUTES_TOPIC: case MqttTopics.GATEWAY_RPC_TOPIC: case MqttTopics.GATEWAY_ATTRIBUTES_RESPONSE_TOPIC: - case MqttTopics.DEVICE_PROVISION_RESPONSE_TOPIC: case MqttTopics.DEVICE_FIRMWARE_RESPONSES_TOPIC: case MqttTopics.DEVICE_FIRMWARE_ERROR_TOPIC: case MqttTopics.DEVICE_SOFTWARE_RESPONSES_TOPIC: diff --git a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttClientTest.java b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttClientTest.java index c298d4fea3..8c26be9b45 100644 --- a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttClientTest.java +++ b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttClientTest.java @@ -28,6 +28,7 @@ import io.netty.handler.codec.mqtt.MqttMessage; import io.netty.handler.codec.mqtt.MqttQoS; import io.netty.handler.codec.mqtt.MqttReasonCodeAndPropertiesVariableHeader; import io.netty.handler.codec.mqtt.MqttReasonCodes; +import io.netty.handler.codec.mqtt.MqttSubAckMessage; import io.netty.handler.codec.mqtt.MqttVersion; import lombok.Data; import lombok.extern.slf4j.Slf4j; @@ -491,6 +492,48 @@ public class MqttClientTest extends AbstractContainerTest { updateDeviceProfileWithProvisioningStrategy(deviceProfile, DeviceProfileProvisionType.DISABLED); } + @Test + public void provisionRequestForCheckSubAckReceived() throws Exception { + + DeviceProfile deviceProfile = testRestClient.getDeviceProfileById(device.getDeviceProfileId()); + deviceProfile = updateDeviceProfileWithProvisioningStrategy(deviceProfile, DeviceProfileProvisionType.ALLOW_CREATE_NEW_DEVICES); + + MqttMessageListener listener = new MqttMessageListener(); + MqttClient mqttClient = getMqttClient("provision", listener, MqttVersion.MQTT_5); + final MqttReasonCodes.SubAck[] subAckResult = new MqttReasonCodes.SubAck[1]; + mqttClient.setCallback(new MqttClientCallback() { + @Override + public void connectionLost(Throwable cause) { + } + + @Override + public void onSuccessfulReconnect() { + } + + @Override + public void onSubAck(MqttSubAckMessage subAckMessage) { + subAckResult[0] = subAckMessage.payload().typedReasonCodes().get(0); + } + } + ); + + mqttClient.on("/provision/response", listener, MqttQoS.AT_LEAST_ONCE).get(3 * timeoutMultiplier, TimeUnit.SECONDS); + TimeUnit.SECONDS.sleep(2 * timeoutMultiplier); + assertThat(subAckResult[0]).isNotNull(); + assertThat(MqttReasonCodes.SubAck.GRANTED_QOS_1.equals(subAckResult[0])); + + subAckResult[0] = null; + mqttClient.on("v1/devices/me/attributes", listener, MqttQoS.AT_LEAST_ONCE).get(3 * timeoutMultiplier, TimeUnit.SECONDS); + TimeUnit.SECONDS.sleep(2 * timeoutMultiplier); + assertThat(subAckResult[0]).isNotNull(); + assertThat(MqttReasonCodes.SubAck.TOPIC_FILTER_INVALID.equals(subAckResult[0])); + + testRestClient.deleteDeviceIfExists(device.getId()); + updateDeviceProfileWithProvisioningStrategy(deviceProfile, DeviceProfileProvisionType.DISABLED); + } + + + @Test public void provisionRequestForDeviceWithDisabledProvisioningStrategy() throws Exception {