diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/TbMqttNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/TbMqttNode.java index c9c1f9f3f0..2cd26e667d 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/TbMqttNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/TbMqttNode.java @@ -55,7 +55,7 @@ import java.util.concurrent.TimeoutException; type = ComponentType.EXTERNAL, name = "mqtt", configClazz = TbMqttNodeConfiguration.class, - version = 1, + version = 2, clusteringMode = ComponentClusteringMode.USER_PREFERENCE, nodeDescription = "Publish messages to the MQTT broker", nodeDetails = "Will publish message payload to the MQTT broker with QoS AT_LEAST_ONCE.", diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/azure/TbAzureIotHubNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/azure/TbAzureIotHubNode.java index b103cf77fc..2ea56ce799 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/azure/TbAzureIotHubNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/azure/TbAzureIotHubNode.java @@ -15,6 +15,8 @@ */ package org.thingsboard.rule.engine.mqtt.azure; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ObjectNode; import io.netty.handler.codec.mqtt.MqttVersion; import lombok.extern.slf4j.Slf4j; import org.thingsboard.common.util.AzureIotHubUtil; @@ -32,18 +34,21 @@ import org.thingsboard.rule.engine.mqtt.TbMqttNode; import org.thingsboard.rule.engine.mqtt.TbMqttNodeConfiguration; import org.thingsboard.server.common.data.plugin.ComponentClusteringMode; import org.thingsboard.server.common.data.plugin.ComponentType; +import org.thingsboard.server.common.data.util.TbPair; @Slf4j @RuleNode( type = ComponentType.EXTERNAL, name = "azure iot hub", configClazz = TbAzureIotHubNodeConfiguration.class, + version = 1, clusteringMode = ComponentClusteringMode.SINGLETON, nodeDescription = "Publish messages to the Azure IoT Hub", nodeDetails = "Will publish message payload to the Azure IoT Hub with QoS AT_LEAST_ONCE.", configDirective = "tbExternalNodeAzureIotHubConfig" ) public class TbAzureIotHubNode extends TbMqttNode { + @Override public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException { super.init(ctx); @@ -65,7 +70,6 @@ public class TbAzureIotHubNode extends TbMqttNode { } protected void prepareMqttClientConfig(MqttClientConfig config) { - config.setProtocolVersion(MqttVersion.MQTT_3_1_1); config.setUsername(AzureIotHubUtil.buildUsername(mqttNodeConfiguration.getHost(), config.getClientId())); ClientCredentials credentials = mqttNodeConfiguration.getCredentials(); if (CredentialsType.SAS == credentials.getType()) { @@ -76,4 +80,22 @@ public class TbAzureIotHubNode extends TbMqttNode { MqttClient initAzureClient(TbContext ctx) throws Exception { return initClient(ctx); } + + @Override + public TbPair upgrade(int fromVersion, JsonNode oldConfiguration) throws TbNodeException { + boolean hasChanges = false; + switch (fromVersion) { + case 0: + String protocolVersion = "protocolVersion"; + if (!oldConfiguration.has(protocolVersion)) { + hasChanges = true; + ((ObjectNode) oldConfiguration).put(protocolVersion, MqttVersion.MQTT_3_1_1.name()); + } + break; + default: + break; + } + return new TbPair<>(hasChanges, oldConfiguration); + } + } diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/azure/TbAzureIotHubNodeConfiguration.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/azure/TbAzureIotHubNodeConfiguration.java index 2c48c193f0..64d839dc52 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/azure/TbAzureIotHubNodeConfiguration.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/azure/TbAzureIotHubNodeConfiguration.java @@ -15,6 +15,7 @@ */ package org.thingsboard.rule.engine.mqtt.azure; +import io.netty.handler.codec.mqtt.MqttVersion; import lombok.Data; import org.thingsboard.rule.engine.mqtt.TbMqttNodeConfiguration; @@ -30,6 +31,7 @@ public class TbAzureIotHubNodeConfiguration extends TbMqttNodeConfiguration { configuration.setConnectTimeoutSec(10); configuration.setCleanSession(true); configuration.setSsl(true); + configuration.setProtocolVersion(MqttVersion.MQTT_3_1_1); configuration.setCredentials(new AzureIotHubSasCredentials()); return configuration; } diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/AbstractRuleNodeUpgradeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/AbstractRuleNodeUpgradeTest.java index 6554c7da01..60cb339bcc 100644 --- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/AbstractRuleNodeUpgradeTest.java +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/AbstractRuleNodeUpgradeTest.java @@ -49,4 +49,5 @@ public abstract class AbstractRuleNodeUpgradeTest { ObjectNode upgradedConfig = (ObjectNode) upgradeResult.getSecond(); assertThat(upgradedConfig).isEqualTo(expectedConfig); } + } diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/mqtt/azure/TbAzureIotHubNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/mqtt/azure/TbAzureIotHubNodeTest.java index 441d75b2d4..433d5d4673 100644 --- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/mqtt/azure/TbAzureIotHubNodeTest.java +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/mqtt/azure/TbAzureIotHubNodeTest.java @@ -19,6 +19,7 @@ import io.netty.handler.codec.mqtt.MqttVersion; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.params.provider.Arguments; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; import org.springframework.test.util.ReflectionTestUtils; @@ -26,11 +27,15 @@ import org.thingsboard.common.util.AzureIotHubUtil; import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.mqtt.MqttClient; import org.thingsboard.mqtt.MqttClientConfig; +import org.thingsboard.rule.engine.AbstractRuleNodeUpgradeTest; import org.thingsboard.rule.engine.api.TbContext; +import org.thingsboard.rule.engine.api.TbNode; import org.thingsboard.rule.engine.api.TbNodeConfiguration; import org.thingsboard.rule.engine.credentials.CertPemCredentials; import org.thingsboard.rule.engine.mqtt.TbMqttNodeConfiguration; +import java.util.stream.Stream; + import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatNoException; import static org.mockito.ArgumentMatchers.any; @@ -38,7 +43,7 @@ import static org.mockito.BDDMockito.spy; import static org.mockito.BDDMockito.willReturn; @ExtendWith(MockitoExtension.class) -public class TbAzureIotHubNodeTest { +public class TbAzureIotHubNodeTest extends AbstractRuleNodeUpgradeTest { private TbAzureIotHubNode azureIotHubNode; private TbAzureIotHubNodeConfiguration azureIotHubNodeConfig; @@ -66,6 +71,7 @@ public class TbAzureIotHubNodeTest { assertThat(azureIotHubNodeConfig.isCleanSession()).isTrue(); assertThat(azureIotHubNodeConfig.isSsl()).isTrue(); assertThat(azureIotHubNodeConfig.isParseToPlainText()).isFalse(); + assertThat(azureIotHubNodeConfig.getProtocolVersion()).isEqualTo(MqttVersion.MQTT_3_1_1); assertThat(azureIotHubNodeConfig.getCredentials()).isInstanceOf(AzureIotHubSasCredentials.class); } @@ -82,7 +88,6 @@ public class TbAzureIotHubNodeTest { MqttClientConfig mqttClientConfig = new MqttClientConfig(); azureIotHubNode.prepareMqttClientConfig(mqttClientConfig); - assertThat(mqttClientConfig.getProtocolVersion()).isEqualTo(MqttVersion.MQTT_3_1_1); assertThat(mqttClientConfig.getUsername()).isEqualTo(AzureIotHubUtil.buildUsername(azureIotHubNodeConfig.getHost(), mqttClientConfig.getClientId())); assertThat(mqttClientConfig.getPassword()).isEqualTo(AzureIotHubUtil.buildSasToken(azureIotHubNodeConfig.getHost(), credentials.getSasKey())); } @@ -105,4 +110,24 @@ public class TbAzureIotHubNodeTest { assertThat(mqttNodeConfiguration.isCleanSession()).isTrue(); } + private static Stream givenFromVersionAndConfig_whenUpgrade_thenVerifyHasChangesAndConfig() { + return Stream.of( + // default config for version 0 + Arguments.of(0, + "{\"topicPattern\":\"devices//messages/events/\",\"port\":1883,\"connectTimeoutSec\":10,\"cleanSession\":true, \"ssl\":false, \"retainedMessage\":false,\"credentials\":{\"type\":\"sas\",\"sasKey\":\"sasKey\",\"caCert\":null,\"caCertFileName\":null}}}", + true, + "{\"topicPattern\":\"devices//messages/events/\",\"port\":1883,\"connectTimeoutSec\":10,\"cleanSession\":true, \"ssl\":false, \"retainedMessage\":false,\"credentials\":{\"type\":\"sas\",\"sasKey\":\"sasKey\",\"caCert\":null,\"caCertFileName\":null}, \"protocolVersion\":\"MQTT_3_1_1\"}\"}"), + // default config for version 1 with upgrade from version 0 + Arguments.of(1, + "{\"topicPattern\":\"devices//messages/events/\",\"port\":1883,\"connectTimeoutSec\":10,\"cleanSession\":true, \"ssl\":false, \"retainedMessage\":false,\"credentials\":{\"type\":\"sas\",\"sasKey\":\"sasKey\",\"caCert\":null,\"caCertFileName\":null}, \"protocolVersion\":\"MQTT_3_1_1\"}\"}", + false, + "{\"topicPattern\":\"devices//messages/events/\",\"port\":1883,\"connectTimeoutSec\":10,\"cleanSession\":true, \"ssl\":false, \"retainedMessage\":false,\"credentials\":{\"type\":\"sas\",\"sasKey\":\"sasKey\",\"caCert\":null,\"caCertFileName\":null}, \"protocolVersion\":\"MQTT_3_1_1\"}\"}") + ); + } + + @Override + protected TbNode getTestNode() { + return azureIotHubNode; + } + }