TbMqttNode: add protocolVersion config

This commit is contained in:
Andrii Landiak 2025-05-09 10:55:31 +03:00
parent a445364ce3
commit e237be0fb2
3 changed files with 39 additions and 3 deletions

View File

@ -19,6 +19,7 @@ import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.netty.buffer.Unpooled;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.netty.handler.codec.mqtt.MqttVersion;
import io.netty.handler.ssl.SslContext;
import io.netty.util.concurrent.Promise;
import lombok.extern.slf4j.Slf4j;
@ -126,6 +127,7 @@ public class TbMqttNode extends TbAbstractExternalNode {
config.setClientId(getClientId(ctx));
}
config.setCleanSession(this.mqttNodeConfiguration.isCleanSession());
config.setProtocolVersion(this.mqttNodeConfiguration.getProtocolVersion());
MqttClientSettings mqttClientSettings = ctx.getMqttClientSettings();
config.setRetransmissionConfig(new MqttClientConfig.RetransmissionConfig(
@ -202,9 +204,17 @@ public class TbMqttNode extends TbAbstractExternalNode {
((ObjectNode) oldConfiguration).put(parseToPlainText, false);
}
break;
case 1:
String protocolVersion = "protocolVersion";
if (!oldConfiguration.has(protocolVersion)) {
hasChanges = true;
((ObjectNode) oldConfiguration).put(protocolVersion, MqttVersion.MQTT_3_1_1.name());
}
break;
default:
break;
}
return new TbPair<>(hasChanges, oldConfiguration);
}
}

View File

@ -15,6 +15,7 @@
*/
package org.thingsboard.rule.engine.mqtt;
import io.netty.handler.codec.mqtt.MqttVersion;
import lombok.Data;
import org.thingsboard.rule.engine.api.NodeConfiguration;
import org.thingsboard.rule.engine.credentials.AnonymousCredentials;
@ -30,10 +31,10 @@ public class TbMqttNodeConfiguration implements NodeConfiguration<TbMqttNodeConf
private String clientId;
private boolean appendClientIdSuffix;
private boolean retainedMessage;
private boolean cleanSession;
private boolean ssl;
private boolean parseToPlainText;
private MqttVersion protocolVersion;
private ClientCredentials credentials;
@Override
@ -46,6 +47,7 @@ public class TbMqttNodeConfiguration implements NodeConfiguration<TbMqttNodeConf
configuration.setSsl(false);
configuration.setRetainedMessage(false);
configuration.setParseToPlainText(false);
configuration.setProtocolVersion(MqttVersion.MQTT_3_1_1);
configuration.setCredentials(new AnonymousCredentials());
return configuration;
}

View File

@ -20,6 +20,7 @@ import io.netty.buffer.Unpooled;
import io.netty.channel.EventLoopGroup;
import io.netty.handler.codec.mqtt.MqttConnectReturnCode;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.netty.handler.codec.mqtt.MqttVersion;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.util.concurrent.Future;
@ -138,6 +139,7 @@ public class TbMqttNodeTest extends AbstractRuleNodeUpgradeTest {
assertThat(mqttNodeConfig.isCleanSession()).isTrue();
assertThat(mqttNodeConfig.isSsl()).isFalse();
assertThat(mqttNodeConfig.isParseToPlainText()).isFalse();
assertThat(mqttNodeConfig.getProtocolVersion()).isEqualTo(MqttVersion.MQTT_3_1_1);
assertThat(mqttNodeConfig.getCredentials()).isInstanceOf(AnonymousCredentials.class);
}
@ -381,6 +383,24 @@ public class TbMqttNodeTest extends AbstractRuleNodeUpgradeTest {
then(mqttClientMock).shouldHaveNoInteractions();
}
@ParameterizedTest
@MethodSource
public void verifyProtocolVersionMapping(MqttVersion expectedVersion) throws Exception {
mqttNodeConfig.setProtocolVersion(expectedVersion);
given(ctxMock.isExternalNodeForceAck()).willReturn(false);
mockSuccessfulInit();
mqttNode.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(mqttNodeConfig)));
ArgumentCaptor<MqttClientConfig> configCaptor = ArgumentCaptor.forClass(MqttClientConfig.class);
then(mqttNode).should().prepareMqttClientConfig(configCaptor.capture());
assertThat(expectedVersion).isEqualTo(configCaptor.getValue().getProtocolVersion());
}
private static Stream<Arguments> verifyProtocolVersionMapping() {
return Stream.of(MqttVersion.values()).map(Arguments::of);
}
private static Stream<Arguments> givenFromVersionAndConfig_whenUpgrade_thenVerifyHasChangesAndConfig() {
return Stream.of(
// default config for version 0
@ -391,10 +411,14 @@ public class TbMqttNodeTest extends AbstractRuleNodeUpgradeTest {
// default config for version 1 with upgrade from version 0
Arguments.of(1,
"{\"topicPattern\":\"my-topic\",\"port\":1883,\"connectTimeoutSec\":10,\"cleanSession\":true, \"ssl\":false, \"retainedMessage\":false,\"credentials\":{\"type\":\"anonymous\"},\"parseToPlainText\":false}",
true,
"{\"topicPattern\":\"my-topic\",\"port\":1883,\"connectTimeoutSec\":10,\"cleanSession\":true, \"ssl\":false, \"retainedMessage\":false,\"credentials\":{\"type\":\"anonymous\"},\"parseToPlainText\":false, \"protocolVersion\":\"MQTT_3_1_1\"}"),
// default config for version 2 with upgrade from version 1
Arguments.of(2,
"{\"topicPattern\":\"my-topic\",\"port\":1883,\"connectTimeoutSec\":10,\"cleanSession\":true, \"ssl\":false, \"retainedMessage\":false,\"credentials\":{\"type\":\"anonymous\"},\"parseToPlainText\":false, \"protocolVersion\":\"MQTT_3_1_1\"}",
false,
"{\"topicPattern\":\"my-topic\",\"port\":1883,\"connectTimeoutSec\":10,\"cleanSession\":true, \"ssl\":false, \"retainedMessage\":false,\"credentials\":{\"type\":\"anonymous\"},\"parseToPlainText\":false}")
"{\"topicPattern\":\"my-topic\",\"port\":1883,\"connectTimeoutSec\":10,\"cleanSession\":true, \"ssl\":false, \"retainedMessage\":false,\"credentials\":{\"type\":\"anonymous\"},\"parseToPlainText\":false, \"protocolVersion\":\"MQTT_3_1_1\"}")
);
}
@Override