Merge pull request #13324 from deaflynx/mqtt-protocol-version
Add MQTT version selection for rule nodes
This commit is contained in:
commit
6db26f4663
@ -19,6 +19,7 @@ import com.fasterxml.jackson.databind.JsonNode;
|
|||||||
import com.fasterxml.jackson.databind.node.ObjectNode;
|
import com.fasterxml.jackson.databind.node.ObjectNode;
|
||||||
import io.netty.buffer.Unpooled;
|
import io.netty.buffer.Unpooled;
|
||||||
import io.netty.handler.codec.mqtt.MqttQoS;
|
import io.netty.handler.codec.mqtt.MqttQoS;
|
||||||
|
import io.netty.handler.codec.mqtt.MqttVersion;
|
||||||
import io.netty.handler.ssl.SslContext;
|
import io.netty.handler.ssl.SslContext;
|
||||||
import io.netty.util.concurrent.Promise;
|
import io.netty.util.concurrent.Promise;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
@ -54,7 +55,7 @@ import java.util.concurrent.TimeoutException;
|
|||||||
type = ComponentType.EXTERNAL,
|
type = ComponentType.EXTERNAL,
|
||||||
name = "mqtt",
|
name = "mqtt",
|
||||||
configClazz = TbMqttNodeConfiguration.class,
|
configClazz = TbMqttNodeConfiguration.class,
|
||||||
version = 1,
|
version = 2,
|
||||||
clusteringMode = ComponentClusteringMode.USER_PREFERENCE,
|
clusteringMode = ComponentClusteringMode.USER_PREFERENCE,
|
||||||
nodeDescription = "Publish messages to the MQTT broker",
|
nodeDescription = "Publish messages to the MQTT broker",
|
||||||
nodeDetails = "Will publish message payload to the MQTT broker with QoS <b>AT_LEAST_ONCE</b>.",
|
nodeDetails = "Will publish message payload to the MQTT broker with QoS <b>AT_LEAST_ONCE</b>.",
|
||||||
@ -126,6 +127,7 @@ public class TbMqttNode extends TbAbstractExternalNode {
|
|||||||
config.setClientId(getClientId(ctx));
|
config.setClientId(getClientId(ctx));
|
||||||
}
|
}
|
||||||
config.setCleanSession(this.mqttNodeConfiguration.isCleanSession());
|
config.setCleanSession(this.mqttNodeConfiguration.isCleanSession());
|
||||||
|
config.setProtocolVersion(this.mqttNodeConfiguration.getProtocolVersion());
|
||||||
|
|
||||||
MqttClientSettings mqttClientSettings = ctx.getMqttClientSettings();
|
MqttClientSettings mqttClientSettings = ctx.getMqttClientSettings();
|
||||||
config.setRetransmissionConfig(new MqttClientConfig.RetransmissionConfig(
|
config.setRetransmissionConfig(new MqttClientConfig.RetransmissionConfig(
|
||||||
@ -201,10 +203,17 @@ public class TbMqttNode extends TbAbstractExternalNode {
|
|||||||
hasChanges = true;
|
hasChanges = true;
|
||||||
((ObjectNode) oldConfiguration).put(parseToPlainText, false);
|
((ObjectNode) oldConfiguration).put(parseToPlainText, false);
|
||||||
}
|
}
|
||||||
|
case 1:
|
||||||
|
String protocolVersion = "protocolVersion";
|
||||||
|
if (!oldConfiguration.has(protocolVersion)) {
|
||||||
|
hasChanges = true;
|
||||||
|
((ObjectNode) oldConfiguration).put(protocolVersion, MqttVersion.MQTT_3_1.name());
|
||||||
|
}
|
||||||
break;
|
break;
|
||||||
default:
|
default:
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
return new TbPair<>(hasChanges, oldConfiguration);
|
return new TbPair<>(hasChanges, oldConfiguration);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@ -15,6 +15,7 @@
|
|||||||
*/
|
*/
|
||||||
package org.thingsboard.rule.engine.mqtt;
|
package org.thingsboard.rule.engine.mqtt;
|
||||||
|
|
||||||
|
import io.netty.handler.codec.mqtt.MqttVersion;
|
||||||
import lombok.Data;
|
import lombok.Data;
|
||||||
import org.thingsboard.rule.engine.api.NodeConfiguration;
|
import org.thingsboard.rule.engine.api.NodeConfiguration;
|
||||||
import org.thingsboard.rule.engine.credentials.AnonymousCredentials;
|
import org.thingsboard.rule.engine.credentials.AnonymousCredentials;
|
||||||
@ -30,10 +31,10 @@ public class TbMqttNodeConfiguration implements NodeConfiguration<TbMqttNodeConf
|
|||||||
private String clientId;
|
private String clientId;
|
||||||
private boolean appendClientIdSuffix;
|
private boolean appendClientIdSuffix;
|
||||||
private boolean retainedMessage;
|
private boolean retainedMessage;
|
||||||
|
|
||||||
private boolean cleanSession;
|
private boolean cleanSession;
|
||||||
private boolean ssl;
|
private boolean ssl;
|
||||||
private boolean parseToPlainText;
|
private boolean parseToPlainText;
|
||||||
|
private MqttVersion protocolVersion;
|
||||||
private ClientCredentials credentials;
|
private ClientCredentials credentials;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -46,6 +47,7 @@ public class TbMqttNodeConfiguration implements NodeConfiguration<TbMqttNodeConf
|
|||||||
configuration.setSsl(false);
|
configuration.setSsl(false);
|
||||||
configuration.setRetainedMessage(false);
|
configuration.setRetainedMessage(false);
|
||||||
configuration.setParseToPlainText(false);
|
configuration.setParseToPlainText(false);
|
||||||
|
configuration.setProtocolVersion(MqttVersion.MQTT_3_1_1);
|
||||||
configuration.setCredentials(new AnonymousCredentials());
|
configuration.setCredentials(new AnonymousCredentials());
|
||||||
return configuration;
|
return configuration;
|
||||||
}
|
}
|
||||||
|
|||||||
@ -15,6 +15,8 @@
|
|||||||
*/
|
*/
|
||||||
package org.thingsboard.rule.engine.mqtt.azure;
|
package org.thingsboard.rule.engine.mqtt.azure;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.databind.JsonNode;
|
||||||
|
import com.fasterxml.jackson.databind.node.ObjectNode;
|
||||||
import io.netty.handler.codec.mqtt.MqttVersion;
|
import io.netty.handler.codec.mqtt.MqttVersion;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.thingsboard.common.util.AzureIotHubUtil;
|
import org.thingsboard.common.util.AzureIotHubUtil;
|
||||||
@ -32,18 +34,21 @@ import org.thingsboard.rule.engine.mqtt.TbMqttNode;
|
|||||||
import org.thingsboard.rule.engine.mqtt.TbMqttNodeConfiguration;
|
import org.thingsboard.rule.engine.mqtt.TbMqttNodeConfiguration;
|
||||||
import org.thingsboard.server.common.data.plugin.ComponentClusteringMode;
|
import org.thingsboard.server.common.data.plugin.ComponentClusteringMode;
|
||||||
import org.thingsboard.server.common.data.plugin.ComponentType;
|
import org.thingsboard.server.common.data.plugin.ComponentType;
|
||||||
|
import org.thingsboard.server.common.data.util.TbPair;
|
||||||
|
|
||||||
@Slf4j
|
@Slf4j
|
||||||
@RuleNode(
|
@RuleNode(
|
||||||
type = ComponentType.EXTERNAL,
|
type = ComponentType.EXTERNAL,
|
||||||
name = "azure iot hub",
|
name = "azure iot hub",
|
||||||
configClazz = TbAzureIotHubNodeConfiguration.class,
|
configClazz = TbAzureIotHubNodeConfiguration.class,
|
||||||
|
version = 1,
|
||||||
clusteringMode = ComponentClusteringMode.SINGLETON,
|
clusteringMode = ComponentClusteringMode.SINGLETON,
|
||||||
nodeDescription = "Publish messages to the Azure IoT Hub",
|
nodeDescription = "Publish messages to the Azure IoT Hub",
|
||||||
nodeDetails = "Will publish message payload to the Azure IoT Hub with QoS <b>AT_LEAST_ONCE</b>.",
|
nodeDetails = "Will publish message payload to the Azure IoT Hub with QoS <b>AT_LEAST_ONCE</b>.",
|
||||||
configDirective = "tbExternalNodeAzureIotHubConfig"
|
configDirective = "tbExternalNodeAzureIotHubConfig"
|
||||||
)
|
)
|
||||||
public class TbAzureIotHubNode extends TbMqttNode {
|
public class TbAzureIotHubNode extends TbMqttNode {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException {
|
public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException {
|
||||||
super.init(ctx);
|
super.init(ctx);
|
||||||
@ -65,7 +70,6 @@ public class TbAzureIotHubNode extends TbMqttNode {
|
|||||||
}
|
}
|
||||||
|
|
||||||
protected void prepareMqttClientConfig(MqttClientConfig config) {
|
protected void prepareMqttClientConfig(MqttClientConfig config) {
|
||||||
config.setProtocolVersion(MqttVersion.MQTT_3_1_1);
|
|
||||||
config.setUsername(AzureIotHubUtil.buildUsername(mqttNodeConfiguration.getHost(), config.getClientId()));
|
config.setUsername(AzureIotHubUtil.buildUsername(mqttNodeConfiguration.getHost(), config.getClientId()));
|
||||||
ClientCredentials credentials = mqttNodeConfiguration.getCredentials();
|
ClientCredentials credentials = mqttNodeConfiguration.getCredentials();
|
||||||
if (CredentialsType.SAS == credentials.getType()) {
|
if (CredentialsType.SAS == credentials.getType()) {
|
||||||
@ -76,4 +80,22 @@ public class TbAzureIotHubNode extends TbMqttNode {
|
|||||||
MqttClient initAzureClient(TbContext ctx) throws Exception {
|
MqttClient initAzureClient(TbContext ctx) throws Exception {
|
||||||
return initClient(ctx);
|
return initClient(ctx);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public TbPair<Boolean, JsonNode> upgrade(int fromVersion, JsonNode oldConfiguration) throws TbNodeException {
|
||||||
|
boolean hasChanges = false;
|
||||||
|
switch (fromVersion) {
|
||||||
|
case 0:
|
||||||
|
String protocolVersion = "protocolVersion";
|
||||||
|
if (!oldConfiguration.has(protocolVersion)) {
|
||||||
|
hasChanges = true;
|
||||||
|
((ObjectNode) oldConfiguration).put(protocolVersion, MqttVersion.MQTT_3_1_1.name());
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
return new TbPair<>(hasChanges, oldConfiguration);
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@ -15,6 +15,7 @@
|
|||||||
*/
|
*/
|
||||||
package org.thingsboard.rule.engine.mqtt.azure;
|
package org.thingsboard.rule.engine.mqtt.azure;
|
||||||
|
|
||||||
|
import io.netty.handler.codec.mqtt.MqttVersion;
|
||||||
import lombok.Data;
|
import lombok.Data;
|
||||||
import org.thingsboard.rule.engine.mqtt.TbMqttNodeConfiguration;
|
import org.thingsboard.rule.engine.mqtt.TbMqttNodeConfiguration;
|
||||||
|
|
||||||
@ -30,6 +31,7 @@ public class TbAzureIotHubNodeConfiguration extends TbMqttNodeConfiguration {
|
|||||||
configuration.setConnectTimeoutSec(10);
|
configuration.setConnectTimeoutSec(10);
|
||||||
configuration.setCleanSession(true);
|
configuration.setCleanSession(true);
|
||||||
configuration.setSsl(true);
|
configuration.setSsl(true);
|
||||||
|
configuration.setProtocolVersion(MqttVersion.MQTT_3_1_1);
|
||||||
configuration.setCredentials(new AzureIotHubSasCredentials());
|
configuration.setCredentials(new AzureIotHubSasCredentials());
|
||||||
return configuration;
|
return configuration;
|
||||||
}
|
}
|
||||||
|
|||||||
@ -49,4 +49,5 @@ public abstract class AbstractRuleNodeUpgradeTest {
|
|||||||
ObjectNode upgradedConfig = (ObjectNode) upgradeResult.getSecond();
|
ObjectNode upgradedConfig = (ObjectNode) upgradeResult.getSecond();
|
||||||
assertThat(upgradedConfig).isEqualTo(expectedConfig);
|
assertThat(upgradedConfig).isEqualTo(expectedConfig);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@ -20,6 +20,7 @@ import io.netty.buffer.Unpooled;
|
|||||||
import io.netty.channel.EventLoopGroup;
|
import io.netty.channel.EventLoopGroup;
|
||||||
import io.netty.handler.codec.mqtt.MqttConnectReturnCode;
|
import io.netty.handler.codec.mqtt.MqttConnectReturnCode;
|
||||||
import io.netty.handler.codec.mqtt.MqttQoS;
|
import io.netty.handler.codec.mqtt.MqttQoS;
|
||||||
|
import io.netty.handler.codec.mqtt.MqttVersion;
|
||||||
import io.netty.handler.ssl.SslContext;
|
import io.netty.handler.ssl.SslContext;
|
||||||
import io.netty.handler.ssl.SslContextBuilder;
|
import io.netty.handler.ssl.SslContextBuilder;
|
||||||
import io.netty.util.concurrent.Future;
|
import io.netty.util.concurrent.Future;
|
||||||
@ -138,6 +139,7 @@ public class TbMqttNodeTest extends AbstractRuleNodeUpgradeTest {
|
|||||||
assertThat(mqttNodeConfig.isCleanSession()).isTrue();
|
assertThat(mqttNodeConfig.isCleanSession()).isTrue();
|
||||||
assertThat(mqttNodeConfig.isSsl()).isFalse();
|
assertThat(mqttNodeConfig.isSsl()).isFalse();
|
||||||
assertThat(mqttNodeConfig.isParseToPlainText()).isFalse();
|
assertThat(mqttNodeConfig.isParseToPlainText()).isFalse();
|
||||||
|
assertThat(mqttNodeConfig.getProtocolVersion()).isEqualTo(MqttVersion.MQTT_3_1_1);
|
||||||
assertThat(mqttNodeConfig.getCredentials()).isInstanceOf(AnonymousCredentials.class);
|
assertThat(mqttNodeConfig.getCredentials()).isInstanceOf(AnonymousCredentials.class);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -382,20 +384,42 @@ public class TbMqttNodeTest extends AbstractRuleNodeUpgradeTest {
|
|||||||
then(mqttClientMock).shouldHaveNoInteractions();
|
then(mqttClientMock).shouldHaveNoInteractions();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ParameterizedTest
|
||||||
|
@MethodSource
|
||||||
|
public void verifyProtocolVersionMapping(MqttVersion expectedVersion) throws Exception {
|
||||||
|
mqttNodeConfig.setProtocolVersion(expectedVersion);
|
||||||
|
|
||||||
|
given(ctxMock.isExternalNodeForceAck()).willReturn(false);
|
||||||
|
mockSuccessfulInit();
|
||||||
|
mqttNode.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(mqttNodeConfig)));
|
||||||
|
|
||||||
|
ArgumentCaptor<MqttClientConfig> configCaptor = ArgumentCaptor.forClass(MqttClientConfig.class);
|
||||||
|
then(mqttNode).should().prepareMqttClientConfig(configCaptor.capture());
|
||||||
|
assertThat(expectedVersion).isEqualTo(configCaptor.getValue().getProtocolVersion());
|
||||||
|
}
|
||||||
|
|
||||||
|
private static Stream<Arguments> verifyProtocolVersionMapping() {
|
||||||
|
return Stream.of(MqttVersion.values()).map(Arguments::of);
|
||||||
|
}
|
||||||
|
|
||||||
private static Stream<Arguments> givenFromVersionAndConfig_whenUpgrade_thenVerifyHasChangesAndConfig() {
|
private static Stream<Arguments> givenFromVersionAndConfig_whenUpgrade_thenVerifyHasChangesAndConfig() {
|
||||||
return Stream.of(
|
return Stream.of(
|
||||||
// default config for version 0
|
// default config for version 0
|
||||||
Arguments.of(0,
|
Arguments.of(0,
|
||||||
"{\"topicPattern\":\"my-topic\",\"port\":1883,\"connectTimeoutSec\":10,\"cleanSession\":true, \"ssl\":false, \"retainedMessage\":false,\"credentials\":{\"type\":\"anonymous\"}}",
|
"{\"topicPattern\":\"my-topic\",\"port\":1883,\"connectTimeoutSec\":10,\"cleanSession\":true, \"ssl\":false, \"retainedMessage\":false,\"credentials\":{\"type\":\"anonymous\"}}",
|
||||||
true,
|
true,
|
||||||
"{\"topicPattern\":\"my-topic\",\"port\":1883,\"connectTimeoutSec\":10,\"cleanSession\":true, \"ssl\":false, \"retainedMessage\":false,\"credentials\":{\"type\":\"anonymous\"},\"parseToPlainText\":false}"),
|
"{\"topicPattern\":\"my-topic\",\"port\":1883,\"connectTimeoutSec\":10,\"cleanSession\":true, \"ssl\":false, \"retainedMessage\":false,\"credentials\":{\"type\":\"anonymous\"},\"parseToPlainText\":false, \"protocolVersion\":\"MQTT_3_1\"}"),
|
||||||
// default config for version 1 with upgrade from version 0
|
// default config for version 1 with upgrade from version 0
|
||||||
Arguments.of(1,
|
Arguments.of(1,
|
||||||
"{\"topicPattern\":\"my-topic\",\"port\":1883,\"connectTimeoutSec\":10,\"cleanSession\":true, \"ssl\":false, \"retainedMessage\":false,\"credentials\":{\"type\":\"anonymous\"},\"parseToPlainText\":false}",
|
"{\"topicPattern\":\"my-topic\",\"port\":1883,\"connectTimeoutSec\":10,\"cleanSession\":true, \"ssl\":false, \"retainedMessage\":false,\"credentials\":{\"type\":\"anonymous\"},\"parseToPlainText\":false}",
|
||||||
|
true,
|
||||||
|
"{\"topicPattern\":\"my-topic\",\"port\":1883,\"connectTimeoutSec\":10,\"cleanSession\":true, \"ssl\":false, \"retainedMessage\":false,\"credentials\":{\"type\":\"anonymous\"},\"parseToPlainText\":false, \"protocolVersion\":\"MQTT_3_1\"}"),
|
||||||
|
// default config for version 2 with upgrade from version 1
|
||||||
|
Arguments.of(2,
|
||||||
|
"{\"topicPattern\":\"my-topic\",\"port\":1883,\"connectTimeoutSec\":10,\"cleanSession\":true, \"ssl\":false, \"retainedMessage\":false,\"credentials\":{\"type\":\"anonymous\"},\"parseToPlainText\":false, \"protocolVersion\":\"MQTT_3_1\"}",
|
||||||
false,
|
false,
|
||||||
"{\"topicPattern\":\"my-topic\",\"port\":1883,\"connectTimeoutSec\":10,\"cleanSession\":true, \"ssl\":false, \"retainedMessage\":false,\"credentials\":{\"type\":\"anonymous\"},\"parseToPlainText\":false}")
|
"{\"topicPattern\":\"my-topic\",\"port\":1883,\"connectTimeoutSec\":10,\"cleanSession\":true, \"ssl\":false, \"retainedMessage\":false,\"credentials\":{\"type\":\"anonymous\"},\"parseToPlainText\":false, \"protocolVersion\":\"MQTT_3_1\"}")
|
||||||
);
|
);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|||||||
@ -19,6 +19,7 @@ import io.netty.handler.codec.mqtt.MqttVersion;
|
|||||||
import org.junit.jupiter.api.BeforeEach;
|
import org.junit.jupiter.api.BeforeEach;
|
||||||
import org.junit.jupiter.api.Test;
|
import org.junit.jupiter.api.Test;
|
||||||
import org.junit.jupiter.api.extension.ExtendWith;
|
import org.junit.jupiter.api.extension.ExtendWith;
|
||||||
|
import org.junit.jupiter.params.provider.Arguments;
|
||||||
import org.mockito.Mock;
|
import org.mockito.Mock;
|
||||||
import org.mockito.junit.jupiter.MockitoExtension;
|
import org.mockito.junit.jupiter.MockitoExtension;
|
||||||
import org.springframework.test.util.ReflectionTestUtils;
|
import org.springframework.test.util.ReflectionTestUtils;
|
||||||
@ -26,11 +27,15 @@ import org.thingsboard.common.util.AzureIotHubUtil;
|
|||||||
import org.thingsboard.common.util.JacksonUtil;
|
import org.thingsboard.common.util.JacksonUtil;
|
||||||
import org.thingsboard.mqtt.MqttClient;
|
import org.thingsboard.mqtt.MqttClient;
|
||||||
import org.thingsboard.mqtt.MqttClientConfig;
|
import org.thingsboard.mqtt.MqttClientConfig;
|
||||||
|
import org.thingsboard.rule.engine.AbstractRuleNodeUpgradeTest;
|
||||||
import org.thingsboard.rule.engine.api.TbContext;
|
import org.thingsboard.rule.engine.api.TbContext;
|
||||||
|
import org.thingsboard.rule.engine.api.TbNode;
|
||||||
import org.thingsboard.rule.engine.api.TbNodeConfiguration;
|
import org.thingsboard.rule.engine.api.TbNodeConfiguration;
|
||||||
import org.thingsboard.rule.engine.credentials.CertPemCredentials;
|
import org.thingsboard.rule.engine.credentials.CertPemCredentials;
|
||||||
import org.thingsboard.rule.engine.mqtt.TbMqttNodeConfiguration;
|
import org.thingsboard.rule.engine.mqtt.TbMqttNodeConfiguration;
|
||||||
|
|
||||||
|
import java.util.stream.Stream;
|
||||||
|
|
||||||
import static org.assertj.core.api.Assertions.assertThat;
|
import static org.assertj.core.api.Assertions.assertThat;
|
||||||
import static org.assertj.core.api.Assertions.assertThatNoException;
|
import static org.assertj.core.api.Assertions.assertThatNoException;
|
||||||
import static org.mockito.ArgumentMatchers.any;
|
import static org.mockito.ArgumentMatchers.any;
|
||||||
@ -38,7 +43,7 @@ import static org.mockito.BDDMockito.spy;
|
|||||||
import static org.mockito.BDDMockito.willReturn;
|
import static org.mockito.BDDMockito.willReturn;
|
||||||
|
|
||||||
@ExtendWith(MockitoExtension.class)
|
@ExtendWith(MockitoExtension.class)
|
||||||
public class TbAzureIotHubNodeTest {
|
public class TbAzureIotHubNodeTest extends AbstractRuleNodeUpgradeTest {
|
||||||
|
|
||||||
private TbAzureIotHubNode azureIotHubNode;
|
private TbAzureIotHubNode azureIotHubNode;
|
||||||
private TbAzureIotHubNodeConfiguration azureIotHubNodeConfig;
|
private TbAzureIotHubNodeConfiguration azureIotHubNodeConfig;
|
||||||
@ -66,6 +71,7 @@ public class TbAzureIotHubNodeTest {
|
|||||||
assertThat(azureIotHubNodeConfig.isCleanSession()).isTrue();
|
assertThat(azureIotHubNodeConfig.isCleanSession()).isTrue();
|
||||||
assertThat(azureIotHubNodeConfig.isSsl()).isTrue();
|
assertThat(azureIotHubNodeConfig.isSsl()).isTrue();
|
||||||
assertThat(azureIotHubNodeConfig.isParseToPlainText()).isFalse();
|
assertThat(azureIotHubNodeConfig.isParseToPlainText()).isFalse();
|
||||||
|
assertThat(azureIotHubNodeConfig.getProtocolVersion()).isEqualTo(MqttVersion.MQTT_3_1_1);
|
||||||
assertThat(azureIotHubNodeConfig.getCredentials()).isInstanceOf(AzureIotHubSasCredentials.class);
|
assertThat(azureIotHubNodeConfig.getCredentials()).isInstanceOf(AzureIotHubSasCredentials.class);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -82,7 +88,6 @@ public class TbAzureIotHubNodeTest {
|
|||||||
MqttClientConfig mqttClientConfig = new MqttClientConfig();
|
MqttClientConfig mqttClientConfig = new MqttClientConfig();
|
||||||
azureIotHubNode.prepareMqttClientConfig(mqttClientConfig);
|
azureIotHubNode.prepareMqttClientConfig(mqttClientConfig);
|
||||||
|
|
||||||
assertThat(mqttClientConfig.getProtocolVersion()).isEqualTo(MqttVersion.MQTT_3_1_1);
|
|
||||||
assertThat(mqttClientConfig.getUsername()).isEqualTo(AzureIotHubUtil.buildUsername(azureIotHubNodeConfig.getHost(), mqttClientConfig.getClientId()));
|
assertThat(mqttClientConfig.getUsername()).isEqualTo(AzureIotHubUtil.buildUsername(azureIotHubNodeConfig.getHost(), mqttClientConfig.getClientId()));
|
||||||
assertThat(mqttClientConfig.getPassword()).isEqualTo(AzureIotHubUtil.buildSasToken(azureIotHubNodeConfig.getHost(), credentials.getSasKey()));
|
assertThat(mqttClientConfig.getPassword()).isEqualTo(AzureIotHubUtil.buildSasToken(azureIotHubNodeConfig.getHost(), credentials.getSasKey()));
|
||||||
}
|
}
|
||||||
@ -105,4 +110,24 @@ public class TbAzureIotHubNodeTest {
|
|||||||
assertThat(mqttNodeConfiguration.isCleanSession()).isTrue();
|
assertThat(mqttNodeConfiguration.isCleanSession()).isTrue();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static Stream<Arguments> givenFromVersionAndConfig_whenUpgrade_thenVerifyHasChangesAndConfig() {
|
||||||
|
return Stream.of(
|
||||||
|
// default config for version 0
|
||||||
|
Arguments.of(0,
|
||||||
|
"{\"topicPattern\":\"devices/<device_id>/messages/events/\",\"port\":1883,\"connectTimeoutSec\":10,\"cleanSession\":true, \"ssl\":false, \"retainedMessage\":false,\"credentials\":{\"type\":\"sas\",\"sasKey\":\"sasKey\",\"caCert\":null,\"caCertFileName\":null}}}",
|
||||||
|
true,
|
||||||
|
"{\"topicPattern\":\"devices/<device_id>/messages/events/\",\"port\":1883,\"connectTimeoutSec\":10,\"cleanSession\":true, \"ssl\":false, \"retainedMessage\":false,\"credentials\":{\"type\":\"sas\",\"sasKey\":\"sasKey\",\"caCert\":null,\"caCertFileName\":null}, \"protocolVersion\":\"MQTT_3_1_1\"}\"}"),
|
||||||
|
// default config for version 1 with upgrade from version 0
|
||||||
|
Arguments.of(1,
|
||||||
|
"{\"topicPattern\":\"devices/<device_id>/messages/events/\",\"port\":1883,\"connectTimeoutSec\":10,\"cleanSession\":true, \"ssl\":false, \"retainedMessage\":false,\"credentials\":{\"type\":\"sas\",\"sasKey\":\"sasKey\",\"caCert\":null,\"caCertFileName\":null}, \"protocolVersion\":\"MQTT_3_1_1\"}\"}",
|
||||||
|
false,
|
||||||
|
"{\"topicPattern\":\"devices/<device_id>/messages/events/\",\"port\":1883,\"connectTimeoutSec\":10,\"cleanSession\":true, \"ssl\":false, \"retainedMessage\":false,\"credentials\":{\"type\":\"sas\",\"sasKey\":\"sasKey\",\"caCert\":null,\"caCertFileName\":null}, \"protocolVersion\":\"MQTT_3_1_1\"}\"}")
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected TbNode getTestNode() {
|
||||||
|
return azureIotHubNode;
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@ -38,6 +38,7 @@
|
|||||||
{{ 'rule-node-config.device-id-required' | translate }}
|
{{ 'rule-node-config.device-id-required' | translate }}
|
||||||
</mat-error>
|
</mat-error>
|
||||||
</mat-form-field>
|
</mat-form-field>
|
||||||
|
<tb-mqtt-version-select formControlName="protocolVersion" subscriptSizing="fixed"></tb-mqtt-version-select>
|
||||||
<mat-accordion>
|
<mat-accordion>
|
||||||
<mat-expansion-panel class="tb-mqtt-credentials-panel-group">
|
<mat-expansion-panel class="tb-mqtt-credentials-panel-group">
|
||||||
<mat-expansion-panel-header>
|
<mat-expansion-panel-header>
|
||||||
|
|||||||
@ -53,6 +53,7 @@ export class AzureIotHubConfigComponent extends RuleNodeConfigurationComponent {
|
|||||||
clientId: [configuration ? configuration.clientId : null, [Validators.required]],
|
clientId: [configuration ? configuration.clientId : null, [Validators.required]],
|
||||||
cleanSession: [configuration ? configuration.cleanSession : false, []],
|
cleanSession: [configuration ? configuration.cleanSession : false, []],
|
||||||
ssl: [configuration ? configuration.ssl : false, []],
|
ssl: [configuration ? configuration.ssl : false, []],
|
||||||
|
protocolVersion: [configuration ? configuration.protocolVersion : null, []],
|
||||||
credentials: this.fb.group(
|
credentials: this.fb.group(
|
||||||
{
|
{
|
||||||
type: [configuration && configuration.credentials ? configuration.credentials.type : null, [Validators.required]],
|
type: [configuration && configuration.credentials ? configuration.credentials.type : null, [Validators.required]],
|
||||||
|
|||||||
@ -72,6 +72,7 @@
|
|||||||
{{ 'rule-node-config.parse-to-plain-text' | translate }}
|
{{ 'rule-node-config.parse-to-plain-text' | translate }}
|
||||||
</mat-checkbox>
|
</mat-checkbox>
|
||||||
<div class="tb-hint">{{ "rule-node-config.parse-to-plain-text-hint" | translate }}</div>
|
<div class="tb-hint">{{ "rule-node-config.parse-to-plain-text-hint" | translate }}</div>
|
||||||
|
<tb-mqtt-version-select formControlName="protocolVersion"></tb-mqtt-version-select>
|
||||||
<mat-checkbox formControlName="cleanSession">
|
<mat-checkbox formControlName="cleanSession">
|
||||||
{{ 'rule-node-config.clean-session' | translate }}
|
{{ 'rule-node-config.clean-session' | translate }}
|
||||||
</mat-checkbox>
|
</mat-checkbox>
|
||||||
|
|||||||
@ -52,6 +52,7 @@ export class MqttConfigComponent extends RuleNodeConfigurationComponent {
|
|||||||
cleanSession: [configuration ? configuration.cleanSession : false, []],
|
cleanSession: [configuration ? configuration.cleanSession : false, []],
|
||||||
retainedMessage: [configuration ? configuration.retainedMessage : false, []],
|
retainedMessage: [configuration ? configuration.retainedMessage : false, []],
|
||||||
ssl: [configuration ? configuration.ssl : false, []],
|
ssl: [configuration ? configuration.ssl : false, []],
|
||||||
|
protocolVersion: [configuration ? configuration.protocolVersion : null, []],
|
||||||
credentials: [configuration ? configuration.credentials : null, []]
|
credentials: [configuration ? configuration.credentials : null, []]
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|||||||
@ -0,0 +1,30 @@
|
|||||||
|
<!--
|
||||||
|
|
||||||
|
Copyright © 2016-2025 The Thingsboard Authors
|
||||||
|
|
||||||
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
you may not use this file except in compliance with the License.
|
||||||
|
You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
|
||||||
|
-->
|
||||||
|
<mat-form-field class="flex flex-1" [subscriptSizing]="subscriptSizing" [appearance]="appearance">
|
||||||
|
<mat-label translate>device-profile.mqtt-protocol-version</mat-label>
|
||||||
|
<mat-select [required]="required"
|
||||||
|
[disabled]="disabled"
|
||||||
|
[(ngModel)]="modelValue"
|
||||||
|
(ngModelChange)="mqttVersionChanged()">
|
||||||
|
@for (version of mqttVersions; track version) {
|
||||||
|
<mat-option [value]="version">
|
||||||
|
{{ mqttVersionTranslation.get(version) }}
|
||||||
|
</mat-option>
|
||||||
|
}
|
||||||
|
</mat-select>
|
||||||
|
</mat-form-field>
|
||||||
@ -0,0 +1,79 @@
|
|||||||
|
///
|
||||||
|
/// Copyright © 2016-2025 The Thingsboard Authors
|
||||||
|
///
|
||||||
|
/// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
/// you may not use this file except in compliance with the License.
|
||||||
|
/// You may obtain a copy of the License at
|
||||||
|
///
|
||||||
|
/// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
///
|
||||||
|
/// Unless required by applicable law or agreed to in writing, software
|
||||||
|
/// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
/// See the License for the specific language governing permissions and
|
||||||
|
/// limitations under the License.
|
||||||
|
///
|
||||||
|
|
||||||
|
import { Component, forwardRef, Input } from '@angular/core';
|
||||||
|
import { ControlValueAccessor, NG_VALUE_ACCESSOR } from '@angular/forms';
|
||||||
|
import { coerceBoolean } from '@shared/decorators/coercion';
|
||||||
|
import { SubscriptSizing, MatFormFieldAppearance } from '@angular/material/form-field';
|
||||||
|
import { MqttVersionTranslation, MqttVersion } from '@shared/models/mqtt.models';
|
||||||
|
|
||||||
|
@Component({
|
||||||
|
selector: 'tb-mqtt-version-select',
|
||||||
|
templateUrl: './mqtt-version-select.component.html',
|
||||||
|
styleUrls: [],
|
||||||
|
providers: [{
|
||||||
|
provide: NG_VALUE_ACCESSOR,
|
||||||
|
useExisting: forwardRef(() => MqttVersionSelectComponent),
|
||||||
|
multi: true
|
||||||
|
}]
|
||||||
|
})
|
||||||
|
export class MqttVersionSelectComponent implements ControlValueAccessor {
|
||||||
|
|
||||||
|
@Input()
|
||||||
|
disabled: boolean;
|
||||||
|
|
||||||
|
@Input()
|
||||||
|
subscriptSizing: SubscriptSizing = 'dynamic';
|
||||||
|
|
||||||
|
@Input()
|
||||||
|
appearance: MatFormFieldAppearance = 'fill';
|
||||||
|
|
||||||
|
mqttVersions = Object.values(MqttVersion);
|
||||||
|
mqttVersionTranslation = MqttVersionTranslation;
|
||||||
|
modelValue: MqttVersion;
|
||||||
|
|
||||||
|
@Input()
|
||||||
|
@coerceBoolean()
|
||||||
|
required = false;
|
||||||
|
|
||||||
|
private propagateChange = (v: any) => { };
|
||||||
|
|
||||||
|
constructor() {
|
||||||
|
}
|
||||||
|
|
||||||
|
registerOnChange(fn: any): void {
|
||||||
|
this.propagateChange = fn;
|
||||||
|
}
|
||||||
|
|
||||||
|
registerOnTouched(fn: any): void {
|
||||||
|
}
|
||||||
|
|
||||||
|
setDisabledState(isDisabled: boolean): void {
|
||||||
|
this.disabled = isDisabled;
|
||||||
|
}
|
||||||
|
|
||||||
|
writeValue(value: MqttVersion | null): void {
|
||||||
|
this.modelValue = value;
|
||||||
|
}
|
||||||
|
|
||||||
|
mqttVersionChanged() {
|
||||||
|
this.updateView();
|
||||||
|
}
|
||||||
|
|
||||||
|
private updateView() {
|
||||||
|
this.propagateChange(this.modelValue);
|
||||||
|
}
|
||||||
|
}
|
||||||
29
ui-ngx/src/app/shared/models/mqtt.models.ts
Normal file
29
ui-ngx/src/app/shared/models/mqtt.models.ts
Normal file
@ -0,0 +1,29 @@
|
|||||||
|
///
|
||||||
|
/// Copyright © 2016-2025 The Thingsboard Authors
|
||||||
|
///
|
||||||
|
/// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
/// you may not use this file except in compliance with the License.
|
||||||
|
/// You may obtain a copy of the License at
|
||||||
|
///
|
||||||
|
/// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
///
|
||||||
|
/// Unless required by applicable law or agreed to in writing, software
|
||||||
|
/// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
/// See the License for the specific language governing permissions and
|
||||||
|
/// limitations under the License.
|
||||||
|
///
|
||||||
|
|
||||||
|
export enum MqttVersion {
|
||||||
|
MQTT_3_1 = 'MQTT_3_1',
|
||||||
|
MQTT_3_1_1 = 'MQTT_3_1_1',
|
||||||
|
MQTT_5 = 'MQTT_5'
|
||||||
|
}
|
||||||
|
|
||||||
|
export const DEFAULT_MQTT_VERSION = MqttVersion.MQTT_3_1_1;
|
||||||
|
|
||||||
|
export const MqttVersionTranslation = new Map<MqttVersion, string>([
|
||||||
|
[MqttVersion.MQTT_3_1, 'MQTT 3.1'],
|
||||||
|
[MqttVersion.MQTT_3_1_1, 'MQTT 3.1.1'],
|
||||||
|
[MqttVersion.MQTT_5, 'MQTT 5.0']
|
||||||
|
]);
|
||||||
@ -227,6 +227,7 @@ import { JsFuncModulesComponent } from '@shared/components/js-func-modules.compo
|
|||||||
import { JsFuncModuleRowComponent } from '@shared/components/js-func-module-row.component';
|
import { JsFuncModuleRowComponent } from '@shared/components/js-func-module-row.component';
|
||||||
import { EntityKeyAutocompleteComponent } from '@shared/components/entity/entity-key-autocomplete.component';
|
import { EntityKeyAutocompleteComponent } from '@shared/components/entity/entity-key-autocomplete.component';
|
||||||
import { DurationLeftPipe } from '@shared/pipe/duration-left.pipe';
|
import { DurationLeftPipe } from '@shared/pipe/duration-left.pipe';
|
||||||
|
import { MqttVersionSelectComponent } from '@shared/components/mqtt-version-select.component';
|
||||||
|
|
||||||
export function MarkedOptionsFactory(markedOptionsService: MarkedOptionsService) {
|
export function MarkedOptionsFactory(markedOptionsService: MarkedOptionsService) {
|
||||||
return markedOptionsService;
|
return markedOptionsService;
|
||||||
@ -439,6 +440,7 @@ export function MarkedOptionsFactory(markedOptionsService: MarkedOptionsService)
|
|||||||
HexInputComponent,
|
HexInputComponent,
|
||||||
ScadaSymbolInputComponent,
|
ScadaSymbolInputComponent,
|
||||||
EntityKeyAutocompleteComponent,
|
EntityKeyAutocompleteComponent,
|
||||||
|
MqttVersionSelectComponent,
|
||||||
],
|
],
|
||||||
imports: [
|
imports: [
|
||||||
CommonModule,
|
CommonModule,
|
||||||
@ -702,6 +704,7 @@ export function MarkedOptionsFactory(markedOptionsService: MarkedOptionsService)
|
|||||||
WidgetButtonComponent,
|
WidgetButtonComponent,
|
||||||
ScadaSymbolInputComponent,
|
ScadaSymbolInputComponent,
|
||||||
EntityKeyAutocompleteComponent,
|
EntityKeyAutocompleteComponent,
|
||||||
|
MqttVersionSelectComponent,
|
||||||
]
|
]
|
||||||
})
|
})
|
||||||
export class SharedModule { }
|
export class SharedModule { }
|
||||||
|
|||||||
@ -1919,6 +1919,7 @@
|
|||||||
"mqtt-use-json-format-for-default-downlink-topics-hint": "When enabled, the platform will use Json payload format to push attributes and RPC via the following topics: <b>v1/devices/me/attributes/response/$request_id</b>, <b>v1/devices/me/attributes</b>, <b>v1/devices/me/rpc/request/$request_id</b>, <b>v1/devices/me/rpc/response/$request_id</b>. This setting does not impact attribute and rpc subscriptions sent using new (v2) topics: <b>v2/a/res/$request_id</b>, <b>v2/a</b>, <b>v2/r/req/$request_id</b>, <b>v2/r/res/$request_id</b>. Where <b>$request_id</b> is an integer request identifier.",
|
"mqtt-use-json-format-for-default-downlink-topics-hint": "When enabled, the platform will use Json payload format to push attributes and RPC via the following topics: <b>v1/devices/me/attributes/response/$request_id</b>, <b>v1/devices/me/attributes</b>, <b>v1/devices/me/rpc/request/$request_id</b>, <b>v1/devices/me/rpc/response/$request_id</b>. This setting does not impact attribute and rpc subscriptions sent using new (v2) topics: <b>v2/a/res/$request_id</b>, <b>v2/a</b>, <b>v2/r/req/$request_id</b>, <b>v2/r/res/$request_id</b>. Where <b>$request_id</b> is an integer request identifier.",
|
||||||
"mqtt-send-ack-on-validation-exception": "Send PUBACK on PUBLISH message validation failure",
|
"mqtt-send-ack-on-validation-exception": "Send PUBACK on PUBLISH message validation failure",
|
||||||
"mqtt-send-ack-on-validation-exception-hint": "By default, the platform will close the MQTT session on message validation failure. When enabled, the platform will send publish acknowledgment instead of closing the session.",
|
"mqtt-send-ack-on-validation-exception-hint": "By default, the platform will close the MQTT session on message validation failure. When enabled, the platform will send publish acknowledgment instead of closing the session.",
|
||||||
|
"mqtt-protocol-version": "Protocol version",
|
||||||
"snmp-add-mapping": "Add SNMP mapping",
|
"snmp-add-mapping": "Add SNMP mapping",
|
||||||
"snmp-mapping-not-configured": "No mapping for OID to time series/telemetry configured",
|
"snmp-mapping-not-configured": "No mapping for OID to time series/telemetry configured",
|
||||||
"snmp-timseries-or-attribute-name": "Time series/attribute name for mapping",
|
"snmp-timseries-or-attribute-name": "Time series/attribute name for mapping",
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user