Add config for AzureNode

This commit is contained in:
Andrii Landiak 2025-05-12 15:01:06 +03:00
parent 73044bb1f2
commit f8932da09a
5 changed files with 54 additions and 4 deletions

View File

@ -55,7 +55,7 @@ import java.util.concurrent.TimeoutException;
type = ComponentType.EXTERNAL, type = ComponentType.EXTERNAL,
name = "mqtt", name = "mqtt",
configClazz = TbMqttNodeConfiguration.class, configClazz = TbMqttNodeConfiguration.class,
version = 1, version = 2,
clusteringMode = ComponentClusteringMode.USER_PREFERENCE, clusteringMode = ComponentClusteringMode.USER_PREFERENCE,
nodeDescription = "Publish messages to the MQTT broker", nodeDescription = "Publish messages to the MQTT broker",
nodeDetails = "Will publish message payload to the MQTT broker with QoS <b>AT_LEAST_ONCE</b>.", nodeDetails = "Will publish message payload to the MQTT broker with QoS <b>AT_LEAST_ONCE</b>.",

View File

@ -15,6 +15,8 @@
*/ */
package org.thingsboard.rule.engine.mqtt.azure; package org.thingsboard.rule.engine.mqtt.azure;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.netty.handler.codec.mqtt.MqttVersion; import io.netty.handler.codec.mqtt.MqttVersion;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.thingsboard.common.util.AzureIotHubUtil; import org.thingsboard.common.util.AzureIotHubUtil;
@ -32,18 +34,21 @@ import org.thingsboard.rule.engine.mqtt.TbMqttNode;
import org.thingsboard.rule.engine.mqtt.TbMqttNodeConfiguration; import org.thingsboard.rule.engine.mqtt.TbMqttNodeConfiguration;
import org.thingsboard.server.common.data.plugin.ComponentClusteringMode; import org.thingsboard.server.common.data.plugin.ComponentClusteringMode;
import org.thingsboard.server.common.data.plugin.ComponentType; import org.thingsboard.server.common.data.plugin.ComponentType;
import org.thingsboard.server.common.data.util.TbPair;
@Slf4j @Slf4j
@RuleNode( @RuleNode(
type = ComponentType.EXTERNAL, type = ComponentType.EXTERNAL,
name = "azure iot hub", name = "azure iot hub",
configClazz = TbAzureIotHubNodeConfiguration.class, configClazz = TbAzureIotHubNodeConfiguration.class,
version = 1,
clusteringMode = ComponentClusteringMode.SINGLETON, clusteringMode = ComponentClusteringMode.SINGLETON,
nodeDescription = "Publish messages to the Azure IoT Hub", nodeDescription = "Publish messages to the Azure IoT Hub",
nodeDetails = "Will publish message payload to the Azure IoT Hub with QoS <b>AT_LEAST_ONCE</b>.", nodeDetails = "Will publish message payload to the Azure IoT Hub with QoS <b>AT_LEAST_ONCE</b>.",
configDirective = "tbExternalNodeAzureIotHubConfig" configDirective = "tbExternalNodeAzureIotHubConfig"
) )
public class TbAzureIotHubNode extends TbMqttNode { public class TbAzureIotHubNode extends TbMqttNode {
@Override @Override
public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException { public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException {
super.init(ctx); super.init(ctx);
@ -65,7 +70,6 @@ public class TbAzureIotHubNode extends TbMqttNode {
} }
protected void prepareMqttClientConfig(MqttClientConfig config) { protected void prepareMqttClientConfig(MqttClientConfig config) {
config.setProtocolVersion(MqttVersion.MQTT_3_1_1);
config.setUsername(AzureIotHubUtil.buildUsername(mqttNodeConfiguration.getHost(), config.getClientId())); config.setUsername(AzureIotHubUtil.buildUsername(mqttNodeConfiguration.getHost(), config.getClientId()));
ClientCredentials credentials = mqttNodeConfiguration.getCredentials(); ClientCredentials credentials = mqttNodeConfiguration.getCredentials();
if (CredentialsType.SAS == credentials.getType()) { if (CredentialsType.SAS == credentials.getType()) {
@ -76,4 +80,22 @@ public class TbAzureIotHubNode extends TbMqttNode {
MqttClient initAzureClient(TbContext ctx) throws Exception { MqttClient initAzureClient(TbContext ctx) throws Exception {
return initClient(ctx); return initClient(ctx);
} }
@Override
public TbPair<Boolean, JsonNode> upgrade(int fromVersion, JsonNode oldConfiguration) throws TbNodeException {
boolean hasChanges = false;
switch (fromVersion) {
case 0:
String protocolVersion = "protocolVersion";
if (!oldConfiguration.has(protocolVersion)) {
hasChanges = true;
((ObjectNode) oldConfiguration).put(protocolVersion, MqttVersion.MQTT_3_1_1.name());
}
break;
default:
break;
}
return new TbPair<>(hasChanges, oldConfiguration);
}
} }

View File

@ -15,6 +15,7 @@
*/ */
package org.thingsboard.rule.engine.mqtt.azure; package org.thingsboard.rule.engine.mqtt.azure;
import io.netty.handler.codec.mqtt.MqttVersion;
import lombok.Data; import lombok.Data;
import org.thingsboard.rule.engine.mqtt.TbMqttNodeConfiguration; import org.thingsboard.rule.engine.mqtt.TbMqttNodeConfiguration;
@ -30,6 +31,7 @@ public class TbAzureIotHubNodeConfiguration extends TbMqttNodeConfiguration {
configuration.setConnectTimeoutSec(10); configuration.setConnectTimeoutSec(10);
configuration.setCleanSession(true); configuration.setCleanSession(true);
configuration.setSsl(true); configuration.setSsl(true);
configuration.setProtocolVersion(MqttVersion.MQTT_3_1_1);
configuration.setCredentials(new AzureIotHubSasCredentials()); configuration.setCredentials(new AzureIotHubSasCredentials());
return configuration; return configuration;
} }

View File

@ -49,4 +49,5 @@ public abstract class AbstractRuleNodeUpgradeTest {
ObjectNode upgradedConfig = (ObjectNode) upgradeResult.getSecond(); ObjectNode upgradedConfig = (ObjectNode) upgradeResult.getSecond();
assertThat(upgradedConfig).isEqualTo(expectedConfig); assertThat(upgradedConfig).isEqualTo(expectedConfig);
} }
} }

View File

@ -19,6 +19,7 @@ import io.netty.handler.codec.mqtt.MqttVersion;
import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.provider.Arguments;
import org.mockito.Mock; import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension; import org.mockito.junit.jupiter.MockitoExtension;
import org.springframework.test.util.ReflectionTestUtils; import org.springframework.test.util.ReflectionTestUtils;
@ -26,11 +27,15 @@ import org.thingsboard.common.util.AzureIotHubUtil;
import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.mqtt.MqttClient; import org.thingsboard.mqtt.MqttClient;
import org.thingsboard.mqtt.MqttClientConfig; import org.thingsboard.mqtt.MqttClientConfig;
import org.thingsboard.rule.engine.AbstractRuleNodeUpgradeTest;
import org.thingsboard.rule.engine.api.TbContext; import org.thingsboard.rule.engine.api.TbContext;
import org.thingsboard.rule.engine.api.TbNode;
import org.thingsboard.rule.engine.api.TbNodeConfiguration; import org.thingsboard.rule.engine.api.TbNodeConfiguration;
import org.thingsboard.rule.engine.credentials.CertPemCredentials; import org.thingsboard.rule.engine.credentials.CertPemCredentials;
import org.thingsboard.rule.engine.mqtt.TbMqttNodeConfiguration; import org.thingsboard.rule.engine.mqtt.TbMqttNodeConfiguration;
import java.util.stream.Stream;
import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatNoException; import static org.assertj.core.api.Assertions.assertThatNoException;
import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.any;
@ -38,7 +43,7 @@ import static org.mockito.BDDMockito.spy;
import static org.mockito.BDDMockito.willReturn; import static org.mockito.BDDMockito.willReturn;
@ExtendWith(MockitoExtension.class) @ExtendWith(MockitoExtension.class)
public class TbAzureIotHubNodeTest { public class TbAzureIotHubNodeTest extends AbstractRuleNodeUpgradeTest {
private TbAzureIotHubNode azureIotHubNode; private TbAzureIotHubNode azureIotHubNode;
private TbAzureIotHubNodeConfiguration azureIotHubNodeConfig; private TbAzureIotHubNodeConfiguration azureIotHubNodeConfig;
@ -66,6 +71,7 @@ public class TbAzureIotHubNodeTest {
assertThat(azureIotHubNodeConfig.isCleanSession()).isTrue(); assertThat(azureIotHubNodeConfig.isCleanSession()).isTrue();
assertThat(azureIotHubNodeConfig.isSsl()).isTrue(); assertThat(azureIotHubNodeConfig.isSsl()).isTrue();
assertThat(azureIotHubNodeConfig.isParseToPlainText()).isFalse(); assertThat(azureIotHubNodeConfig.isParseToPlainText()).isFalse();
assertThat(azureIotHubNodeConfig.getProtocolVersion()).isEqualTo(MqttVersion.MQTT_3_1_1);
assertThat(azureIotHubNodeConfig.getCredentials()).isInstanceOf(AzureIotHubSasCredentials.class); assertThat(azureIotHubNodeConfig.getCredentials()).isInstanceOf(AzureIotHubSasCredentials.class);
} }
@ -82,7 +88,6 @@ public class TbAzureIotHubNodeTest {
MqttClientConfig mqttClientConfig = new MqttClientConfig(); MqttClientConfig mqttClientConfig = new MqttClientConfig();
azureIotHubNode.prepareMqttClientConfig(mqttClientConfig); azureIotHubNode.prepareMqttClientConfig(mqttClientConfig);
assertThat(mqttClientConfig.getProtocolVersion()).isEqualTo(MqttVersion.MQTT_3_1_1);
assertThat(mqttClientConfig.getUsername()).isEqualTo(AzureIotHubUtil.buildUsername(azureIotHubNodeConfig.getHost(), mqttClientConfig.getClientId())); assertThat(mqttClientConfig.getUsername()).isEqualTo(AzureIotHubUtil.buildUsername(azureIotHubNodeConfig.getHost(), mqttClientConfig.getClientId()));
assertThat(mqttClientConfig.getPassword()).isEqualTo(AzureIotHubUtil.buildSasToken(azureIotHubNodeConfig.getHost(), credentials.getSasKey())); assertThat(mqttClientConfig.getPassword()).isEqualTo(AzureIotHubUtil.buildSasToken(azureIotHubNodeConfig.getHost(), credentials.getSasKey()));
} }
@ -105,4 +110,24 @@ public class TbAzureIotHubNodeTest {
assertThat(mqttNodeConfiguration.isCleanSession()).isTrue(); assertThat(mqttNodeConfiguration.isCleanSession()).isTrue();
} }
private static Stream<Arguments> givenFromVersionAndConfig_whenUpgrade_thenVerifyHasChangesAndConfig() {
return Stream.of(
// default config for version 0
Arguments.of(0,
"{\"topicPattern\":\"devices/<device_id>/messages/events/\",\"port\":1883,\"connectTimeoutSec\":10,\"cleanSession\":true, \"ssl\":false, \"retainedMessage\":false,\"credentials\":{\"type\":\"sas\",\"sasKey\":\"sasKey\",\"caCert\":null,\"caCertFileName\":null}}}",
true,
"{\"topicPattern\":\"devices/<device_id>/messages/events/\",\"port\":1883,\"connectTimeoutSec\":10,\"cleanSession\":true, \"ssl\":false, \"retainedMessage\":false,\"credentials\":{\"type\":\"sas\",\"sasKey\":\"sasKey\",\"caCert\":null,\"caCertFileName\":null}, \"protocolVersion\":\"MQTT_3_1_1\"}\"}"),
// default config for version 1 with upgrade from version 0
Arguments.of(1,
"{\"topicPattern\":\"devices/<device_id>/messages/events/\",\"port\":1883,\"connectTimeoutSec\":10,\"cleanSession\":true, \"ssl\":false, \"retainedMessage\":false,\"credentials\":{\"type\":\"sas\",\"sasKey\":\"sasKey\",\"caCert\":null,\"caCertFileName\":null}, \"protocolVersion\":\"MQTT_3_1_1\"}\"}",
false,
"{\"topicPattern\":\"devices/<device_id>/messages/events/\",\"port\":1883,\"connectTimeoutSec\":10,\"cleanSession\":true, \"ssl\":false, \"retainedMessage\":false,\"credentials\":{\"type\":\"sas\",\"sasKey\":\"sasKey\",\"caCert\":null,\"caCertFileName\":null}, \"protocolVersion\":\"MQTT_3_1_1\"}\"}")
);
}
@Override
protected TbNode getTestNode() {
return azureIotHubNode;
}
} }