From 608e646ca8e30b4f1ad45e40046a277a2ac622b3 Mon Sep 17 00:00:00 2001 From: imbeacon Date: Tue, 19 Nov 2024 09:43:14 +0200 Subject: [PATCH 1/4] Fixed non returning SubAck for provisioning clients --- .../transport/mqtt/MqttTransportHandler.java | 23 +++++++++- .../msa/connectivity/MqttClientTest.java | 43 +++++++++++++++++++ 2 files changed, 65 insertions(+), 1 deletion(-) diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java index d7f64e58ee..6d5c0f5d50 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java @@ -313,6 +313,28 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement sendResponseForAdaptorErrorOrCloseContext(ctx, topicName, msgId); } break; + case SUBSCRIBE: + MqttSubscribeMessage mqttSubscribeMsg = (MqttSubscribeMessage) msg; + log.trace("[{}] Processing subscription [{}]!", sessionId, mqttSubscribeMsg.variableHeader().messageId()); + List grantedQoSList = new ArrayList<>(); + for (MqttTopicSubscription subscription : mqttSubscribeMsg.payload().topicSubscriptions()) { + String topic = subscription.topicFilter(); + MqttQoS reqQoS = subscription.qualityOfService(); + try { + if (MqttTopics.DEVICE_PROVISION_RESPONSE_TOPIC.equals(topic)) { + registerSubQoS(topic, grantedQoSList, reqQoS); + } else { + log.warn("[{}] Failed to subscribe to [{}][{}]", sessionId, topic, reqQoS); + grantedQoSList.add(ReturnCodeResolver.getSubscriptionReturnCode(deviceSessionCtx.getMqttVersion(), MqttReasonCodes.SubAck.TOPIC_FILTER_INVALID)); + break; + } + } catch (Exception e) { + log.warn("[{}] Failed to subscribe to [{}][{}]", sessionId, topic, reqQoS, e); + grantedQoSList.add(ReturnCodeResolver.getSubscriptionReturnCode(deviceSessionCtx.getMqttVersion(), MqttReasonCodes.SubAck.IMPLEMENTATION_SPECIFIC_ERROR)); + } + } + ctx.writeAndFlush(createSubAckMessage(mqttSubscribeMsg.variableHeader().messageId(), grantedQoSList)); + break; case PINGREQ: ctx.writeAndFlush(new MqttMessage(new MqttFixedHeader(PINGRESP, false, AT_MOST_ONCE, false, 0))); break; @@ -822,7 +844,6 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement case MqttTopics.GATEWAY_ATTRIBUTES_TOPIC: case MqttTopics.GATEWAY_RPC_TOPIC: case MqttTopics.GATEWAY_ATTRIBUTES_RESPONSE_TOPIC: - case MqttTopics.DEVICE_PROVISION_RESPONSE_TOPIC: case MqttTopics.DEVICE_FIRMWARE_RESPONSES_TOPIC: case MqttTopics.DEVICE_FIRMWARE_ERROR_TOPIC: case MqttTopics.DEVICE_SOFTWARE_RESPONSES_TOPIC: diff --git a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttClientTest.java b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttClientTest.java index c298d4fea3..8c26be9b45 100644 --- a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttClientTest.java +++ b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttClientTest.java @@ -28,6 +28,7 @@ import io.netty.handler.codec.mqtt.MqttMessage; import io.netty.handler.codec.mqtt.MqttQoS; import io.netty.handler.codec.mqtt.MqttReasonCodeAndPropertiesVariableHeader; import io.netty.handler.codec.mqtt.MqttReasonCodes; +import io.netty.handler.codec.mqtt.MqttSubAckMessage; import io.netty.handler.codec.mqtt.MqttVersion; import lombok.Data; import lombok.extern.slf4j.Slf4j; @@ -491,6 +492,48 @@ public class MqttClientTest extends AbstractContainerTest { updateDeviceProfileWithProvisioningStrategy(deviceProfile, DeviceProfileProvisionType.DISABLED); } + @Test + public void provisionRequestForCheckSubAckReceived() throws Exception { + + DeviceProfile deviceProfile = testRestClient.getDeviceProfileById(device.getDeviceProfileId()); + deviceProfile = updateDeviceProfileWithProvisioningStrategy(deviceProfile, DeviceProfileProvisionType.ALLOW_CREATE_NEW_DEVICES); + + MqttMessageListener listener = new MqttMessageListener(); + MqttClient mqttClient = getMqttClient("provision", listener, MqttVersion.MQTT_5); + final MqttReasonCodes.SubAck[] subAckResult = new MqttReasonCodes.SubAck[1]; + mqttClient.setCallback(new MqttClientCallback() { + @Override + public void connectionLost(Throwable cause) { + } + + @Override + public void onSuccessfulReconnect() { + } + + @Override + public void onSubAck(MqttSubAckMessage subAckMessage) { + subAckResult[0] = subAckMessage.payload().typedReasonCodes().get(0); + } + } + ); + + mqttClient.on("/provision/response", listener, MqttQoS.AT_LEAST_ONCE).get(3 * timeoutMultiplier, TimeUnit.SECONDS); + TimeUnit.SECONDS.sleep(2 * timeoutMultiplier); + assertThat(subAckResult[0]).isNotNull(); + assertThat(MqttReasonCodes.SubAck.GRANTED_QOS_1.equals(subAckResult[0])); + + subAckResult[0] = null; + mqttClient.on("v1/devices/me/attributes", listener, MqttQoS.AT_LEAST_ONCE).get(3 * timeoutMultiplier, TimeUnit.SECONDS); + TimeUnit.SECONDS.sleep(2 * timeoutMultiplier); + assertThat(subAckResult[0]).isNotNull(); + assertThat(MqttReasonCodes.SubAck.TOPIC_FILTER_INVALID.equals(subAckResult[0])); + + testRestClient.deleteDeviceIfExists(device.getId()); + updateDeviceProfileWithProvisioningStrategy(deviceProfile, DeviceProfileProvisionType.DISABLED); + } + + + @Test public void provisionRequestForDeviceWithDisabledProvisioningStrategy() throws Exception { From 936ad3c2b252e3f94edcc7dc371862ba27fb6de1 Mon Sep 17 00:00:00 2001 From: imbeacon Date: Mon, 25 Nov 2024 12:19:47 +0200 Subject: [PATCH 2/4] Refactored due to comments --- .../transport/mqtt/MqttTransportHandler.java | 34 +++++++------------ 1 file changed, 13 insertions(+), 21 deletions(-) diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java index 6d5c0f5d50..e2bc8cd768 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java @@ -314,26 +314,8 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement } break; case SUBSCRIBE: - MqttSubscribeMessage mqttSubscribeMsg = (MqttSubscribeMessage) msg; - log.trace("[{}] Processing subscription [{}]!", sessionId, mqttSubscribeMsg.variableHeader().messageId()); - List grantedQoSList = new ArrayList<>(); - for (MqttTopicSubscription subscription : mqttSubscribeMsg.payload().topicSubscriptions()) { - String topic = subscription.topicFilter(); - MqttQoS reqQoS = subscription.qualityOfService(); - try { - if (MqttTopics.DEVICE_PROVISION_RESPONSE_TOPIC.equals(topic)) { - registerSubQoS(topic, grantedQoSList, reqQoS); - } else { - log.warn("[{}] Failed to subscribe to [{}][{}]", sessionId, topic, reqQoS); - grantedQoSList.add(ReturnCodeResolver.getSubscriptionReturnCode(deviceSessionCtx.getMqttVersion(), MqttReasonCodes.SubAck.TOPIC_FILTER_INVALID)); - break; - } - } catch (Exception e) { - log.warn("[{}] Failed to subscribe to [{}][{}]", sessionId, topic, reqQoS, e); - grantedQoSList.add(ReturnCodeResolver.getSubscriptionReturnCode(deviceSessionCtx.getMqttVersion(), MqttReasonCodes.SubAck.IMPLEMENTATION_SPECIFIC_ERROR)); - } - } - ctx.writeAndFlush(createSubAckMessage(mqttSubscribeMsg.variableHeader().messageId(), grantedQoSList)); + MqttSubscribeMessage subscribeMessage = (MqttSubscribeMessage) msg; + processSubscribe(ctx, subscribeMessage); break; case PINGREQ: ctx.writeAndFlush(new MqttMessage(new MqttFixedHeader(PINGRESP, false, AT_MOST_ONCE, false, 0))); @@ -772,7 +754,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement } private void processSubscribe(ChannelHandlerContext ctx, MqttSubscribeMessage mqttMsg) { - if (!checkConnected(ctx, mqttMsg)) { + if (!checkConnected(ctx, mqttMsg) && !deviceSessionCtx.isProvisionOnly()) { ctx.writeAndFlush(createSubAckMessage(mqttMsg.variableHeader().messageId(), Collections.singletonList(MqttReasonCodes.SubAck.NOT_AUTHORIZED.byteValue() & 0xFF))); return; } @@ -782,6 +764,16 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement for (MqttTopicSubscription subscription : mqttMsg.payload().topicSubscriptions()) { String topic = subscription.topicName(); MqttQoS reqQoS = subscription.qualityOfService(); + if (deviceSessionCtx.isProvisionOnly()) { + if (MqttTopics.DEVICE_PROVISION_RESPONSE_TOPIC.equals(topic)) { + registerSubQoS(topic, grantedQoSList, reqQoS); + } else { + log.debug("[{}] Failed to subscribe to [{}][{}]", sessionId, topic, reqQoS); + grantedQoSList.add(ReturnCodeResolver.getSubscriptionReturnCode(deviceSessionCtx.getMqttVersion(), MqttReasonCodes.SubAck.TOPIC_FILTER_INVALID)); + } + activityReported = true; + continue; + } if (deviceSessionCtx.isDeviceSubscriptionAttributesTopic(topic)) { processAttributesSubscribe(grantedQoSList, topic, reqQoS, TopicType.V1); activityReported = true; From 44930bf9a1e777e088370cc38903249257a54f23 Mon Sep 17 00:00:00 2001 From: imbeacon Date: Mon, 25 Nov 2024 13:39:13 +0200 Subject: [PATCH 3/4] Added processing for unsubscribe for provisioning only clients --- .../server/transport/mqtt/MqttTransportHandler.java | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java index e2bc8cd768..ed721954a7 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java @@ -886,7 +886,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement } private void processUnsubscribe(ChannelHandlerContext ctx, MqttUnsubscribeMessage mqttMsg) { - if (!checkConnected(ctx, mqttMsg)) { + if (!checkConnected(ctx, mqttMsg) && !deviceSessionCtx.isProvisionOnly()) { ctx.writeAndFlush(createUnSubAckMessage(mqttMsg.variableHeader().messageId(), Collections.singletonList((short) MqttReasonCodes.UnsubAck.NOT_AUTHORIZED.byteValue()))); return; @@ -900,6 +900,14 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement mqttQoSMap.remove(matcher); try { short resultValue = MqttReasonCodes.UnsubAck.SUCCESS.byteValue(); + if (deviceSessionCtx.isProvisionOnly()) { + if (!matcher.matches(MqttTopics.DEVICE_PROVISION_RESPONSE_TOPIC)) { + resultValue = MqttReasonCodes.UnsubAck.TOPIC_FILTER_INVALID.byteValue(); + } + unSubResults.add(resultValue); + activityReported = true; + continue; + } switch (topicName) { case MqttTopics.DEVICE_ATTRIBUTES_TOPIC: case MqttTopics.DEVICE_ATTRIBUTES_SHORT_TOPIC: @@ -930,7 +938,6 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement case MqttTopics.GATEWAY_ATTRIBUTES_TOPIC: case MqttTopics.GATEWAY_RPC_TOPIC: case MqttTopics.GATEWAY_ATTRIBUTES_RESPONSE_TOPIC: - case MqttTopics.DEVICE_PROVISION_RESPONSE_TOPIC: case MqttTopics.DEVICE_FIRMWARE_RESPONSES_TOPIC: case MqttTopics.DEVICE_FIRMWARE_ERROR_TOPIC: case MqttTopics.DEVICE_SOFTWARE_RESPONSES_TOPIC: From 52b2c5f9ebe5d3eeb9292dd736c8348f73ec32b4 Mon Sep 17 00:00:00 2001 From: imbeacon Date: Mon, 25 Nov 2024 13:42:57 +0200 Subject: [PATCH 4/4] Added forgotten case --- .../server/transport/mqtt/MqttTransportHandler.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java index ed721954a7..54df37c3f6 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java @@ -317,6 +317,10 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement MqttSubscribeMessage subscribeMessage = (MqttSubscribeMessage) msg; processSubscribe(ctx, subscribeMessage); break; + case UNSUBSCRIBE: + MqttUnsubscribeMessage unsubscribeMessage = (MqttUnsubscribeMessage) msg; + processUnsubscribe(ctx, unsubscribeMessage); + break; case PINGREQ: ctx.writeAndFlush(new MqttMessage(new MqttFixedHeader(PINGRESP, false, AT_MOST_ONCE, false, 0))); break;