From e237be0fb2d4fda4b75c9738b87e20ed2c98e3c4 Mon Sep 17 00:00:00 2001 From: Andrii Landiak Date: Fri, 9 May 2025 10:55:31 +0300 Subject: [PATCH] TbMqttNode: add protocolVersion config --- .../rule/engine/mqtt/TbMqttNode.java | 10 +++++++ .../engine/mqtt/TbMqttNodeConfiguration.java | 4 ++- .../rule/engine/mqtt/TbMqttNodeTest.java | 28 +++++++++++++++++-- 3 files changed, 39 insertions(+), 3 deletions(-) diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/TbMqttNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/TbMqttNode.java index 28a9e1ff4b..6de6e1ae5d 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/TbMqttNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/TbMqttNode.java @@ -19,6 +19,7 @@ import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.ObjectNode; import io.netty.buffer.Unpooled; import io.netty.handler.codec.mqtt.MqttQoS; +import io.netty.handler.codec.mqtt.MqttVersion; import io.netty.handler.ssl.SslContext; import io.netty.util.concurrent.Promise; import lombok.extern.slf4j.Slf4j; @@ -126,6 +127,7 @@ public class TbMqttNode extends TbAbstractExternalNode { config.setClientId(getClientId(ctx)); } config.setCleanSession(this.mqttNodeConfiguration.isCleanSession()); + config.setProtocolVersion(this.mqttNodeConfiguration.getProtocolVersion()); MqttClientSettings mqttClientSettings = ctx.getMqttClientSettings(); config.setRetransmissionConfig(new MqttClientConfig.RetransmissionConfig( @@ -202,9 +204,17 @@ public class TbMqttNode extends TbAbstractExternalNode { ((ObjectNode) oldConfiguration).put(parseToPlainText, false); } break; + case 1: + String protocolVersion = "protocolVersion"; + if (!oldConfiguration.has(protocolVersion)) { + hasChanges = true; + ((ObjectNode) oldConfiguration).put(protocolVersion, MqttVersion.MQTT_3_1_1.name()); + } + break; default: break; } return new TbPair<>(hasChanges, oldConfiguration); } + } diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/TbMqttNodeConfiguration.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/TbMqttNodeConfiguration.java index faa465193a..8bbe920277 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/TbMqttNodeConfiguration.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/TbMqttNodeConfiguration.java @@ -15,6 +15,7 @@ */ package org.thingsboard.rule.engine.mqtt; +import io.netty.handler.codec.mqtt.MqttVersion; import lombok.Data; import org.thingsboard.rule.engine.api.NodeConfiguration; import org.thingsboard.rule.engine.credentials.AnonymousCredentials; @@ -30,10 +31,10 @@ public class TbMqttNodeConfiguration implements NodeConfiguration configCaptor = ArgumentCaptor.forClass(MqttClientConfig.class); + then(mqttNode).should().prepareMqttClientConfig(configCaptor.capture()); + assertThat(expectedVersion).isEqualTo(configCaptor.getValue().getProtocolVersion()); + } + + private static Stream verifyProtocolVersionMapping() { + return Stream.of(MqttVersion.values()).map(Arguments::of); + } + private static Stream givenFromVersionAndConfig_whenUpgrade_thenVerifyHasChangesAndConfig() { return Stream.of( // default config for version 0 @@ -391,10 +411,14 @@ public class TbMqttNodeTest extends AbstractRuleNodeUpgradeTest { // default config for version 1 with upgrade from version 0 Arguments.of(1, "{\"topicPattern\":\"my-topic\",\"port\":1883,\"connectTimeoutSec\":10,\"cleanSession\":true, \"ssl\":false, \"retainedMessage\":false,\"credentials\":{\"type\":\"anonymous\"},\"parseToPlainText\":false}", + true, + "{\"topicPattern\":\"my-topic\",\"port\":1883,\"connectTimeoutSec\":10,\"cleanSession\":true, \"ssl\":false, \"retainedMessage\":false,\"credentials\":{\"type\":\"anonymous\"},\"parseToPlainText\":false, \"protocolVersion\":\"MQTT_3_1_1\"}"), + // default config for version 2 with upgrade from version 1 + Arguments.of(2, + "{\"topicPattern\":\"my-topic\",\"port\":1883,\"connectTimeoutSec\":10,\"cleanSession\":true, \"ssl\":false, \"retainedMessage\":false,\"credentials\":{\"type\":\"anonymous\"},\"parseToPlainText\":false, \"protocolVersion\":\"MQTT_3_1_1\"}", false, - "{\"topicPattern\":\"my-topic\",\"port\":1883,\"connectTimeoutSec\":10,\"cleanSession\":true, \"ssl\":false, \"retainedMessage\":false,\"credentials\":{\"type\":\"anonymous\"},\"parseToPlainText\":false}") + "{\"topicPattern\":\"my-topic\",\"port\":1883,\"connectTimeoutSec\":10,\"cleanSession\":true, \"ssl\":false, \"retainedMessage\":false,\"credentials\":{\"type\":\"anonymous\"},\"parseToPlainText\":false, \"protocolVersion\":\"MQTT_3_1_1\"}") ); - } @Override