diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/TbMqttNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/TbMqttNode.java
index 5074024be5..ab6fcda8dc 100644
--- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/TbMqttNode.java
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/TbMqttNode.java
@@ -53,6 +53,7 @@ import java.util.concurrent.TimeoutException;
type = ComponentType.EXTERNAL,
name = "mqtt",
configClazz = TbMqttNodeConfiguration.class,
+ version = 1,
clusteringMode = ComponentClusteringMode.USER_PREFERENCE,
nodeDescription = "Publish messages to the MQTT broker",
nodeDetails = "Will publish message payload to the MQTT broker with QoS AT_LEAST_ONCE.",
@@ -145,7 +146,7 @@ public class TbMqttNode extends TbAbstractExternalNode {
return client;
}
- protected void prepareMqttClientConfig(MqttClientConfig config) throws SSLException {
+ protected void prepareMqttClientConfig(MqttClientConfig config) {
ClientCredentials credentials = this.mqttNodeConfiguration.getCredentials();
if (credentials.getType() == CredentialsType.BASIC) {
BasicCredentials basicCredentials = (BasicCredentials) credentials;
@@ -154,7 +155,7 @@ public class TbMqttNode extends TbAbstractExternalNode {
}
}
- private SslContext getSslContext() throws SSLException {
+ protected SslContext getSslContext() throws SSLException {
return this.mqttNodeConfiguration.isSsl() ? this.mqttNodeConfiguration.getCredentials().initSslContext() : null;
}
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/azure/TbAzureIotHubNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/azure/TbAzureIotHubNode.java
index b78a441134..2cbc172570 100644
--- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/azure/TbAzureIotHubNode.java
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/azure/TbAzureIotHubNode.java
@@ -66,7 +66,7 @@ public class TbAzureIotHubNode extends TbMqttNode {
}
}
- protected void prepareMqttClientConfig(MqttClientConfig config) throws SSLException {
+ protected void prepareMqttClientConfig(MqttClientConfig config) {
config.setProtocolVersion(MqttVersion.MQTT_3_1_1);
config.setUsername(AzureIotHubUtil.buildUsername(mqttNodeConfiguration.getHost(), config.getClientId()));
ClientCredentials credentials = mqttNodeConfiguration.getCredentials();
diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/mqtt/TbMqttNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/mqtt/TbMqttNodeTest.java
index b46ecdf6fe..045ff3637f 100644
--- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/mqtt/TbMqttNodeTest.java
+++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/mqtt/TbMqttNodeTest.java
@@ -15,26 +15,222 @@
*/
package org.thingsboard.rule.engine.mqtt;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.handler.codec.mqtt.MqttQoS;
+import io.netty.handler.ssl.SslContext;
+import io.netty.handler.ssl.SslContextBuilder;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.GenericFutureListener;
import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
import org.mockito.Spy;
import org.mockito.junit.jupiter.MockitoExtension;
+import org.springframework.test.util.ReflectionTestUtils;
+import org.thingsboard.common.util.JacksonUtil;
+import org.thingsboard.mqtt.MqttClient;
+import org.thingsboard.mqtt.MqttClientConfig;
import org.thingsboard.rule.engine.AbstractRuleNodeUpgradeTest;
+import org.thingsboard.rule.engine.api.TbContext;
import org.thingsboard.rule.engine.api.TbNode;
+import org.thingsboard.rule.engine.api.TbNodeConfiguration;
+import org.thingsboard.rule.engine.api.TbNodeException;
+import org.thingsboard.rule.engine.api.util.TbNodeUtils;
+import org.thingsboard.rule.engine.credentials.AnonymousCredentials;
+import org.thingsboard.rule.engine.credentials.BasicCredentials;
+import org.thingsboard.rule.engine.credentials.ClientCredentials;
+import org.thingsboard.server.common.data.id.DeviceId;
+import org.thingsboard.server.common.data.id.RuleNodeId;
+import org.thingsboard.server.common.data.id.TenantId;
+import org.thingsboard.server.common.data.msg.TbMsgType;
+import org.thingsboard.server.common.data.rule.RuleNode;
+import org.thingsboard.server.common.msg.TbMsg;
+import org.thingsboard.server.common.msg.TbMsgMetaData;
+import javax.net.ssl.SSLException;
+import java.util.Map;
+import java.util.UUID;
import java.util.stream.Stream;
-import static org.mockito.Mockito.mock;
+import static com.amazonaws.util.StringUtils.UTF8;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.BDDMockito.given;
+import static org.mockito.BDDMockito.mock;
+import static org.mockito.BDDMockito.spy;
+import static org.mockito.BDDMockito.then;
+import static org.mockito.BDDMockito.willAnswer;
+import static org.mockito.BDDMockito.willReturn;
+import static org.mockito.BDDMockito.willThrow;
@ExtendWith(MockitoExtension.class)
class TbMqttNodeTest extends AbstractRuleNodeUpgradeTest {
+
+ private final DeviceId DEVICE_ID = new DeviceId(UUID.fromString("09115d92-d333-432a-868c-ccd6e89c9287"));
+
@Spy
- TbMqttNode node;
+ private TbMqttNode node;
+ private TbMqttNodeConfiguration config;
+
+ @Mock
+ private TbContext ctxMock;
+ @Mock
+ private MqttClient clientMock;
@BeforeEach
public void setUp() throws Exception {
- node = mock(TbMqttNode.class);
+ node = spy(new TbMqttNode());
+ config = new TbMqttNodeConfiguration().defaultConfiguration();
+ }
+
+ @Test
+ public void verifyDefaultConfig() {
+ assertThat(config.getTopicPattern()).isEqualTo("my-topic");
+ assertThat(config.getHost()).isNull();
+ assertThat(config.getPort()).isEqualTo(1883);
+ assertThat(config.getConnectTimeoutSec()).isEqualTo(10);
+ assertThat(config.getClientId()).isNull();
+ assertThat(config.isAppendClientIdSuffix()).isFalse();
+ assertThat(config.isRetainedMessage()).isFalse();
+ assertThat(config.isCleanSession()).isTrue();
+ assertThat(config.isSsl()).isFalse();
+ assertThat(config.isParseToPlainText()).isFalse();
+ assertThat(config.getCredentials()).isInstanceOf(AnonymousCredentials.class);
+ }
+
+ @Test
+ public void verifyGetOwnerIdMethod() {
+ String tenantIdStr = "6f67b6cc-21dd-46c5-809c-402b738a3f8b";
+ String ruleNodeIdStr = "80a90b53-6888-4344-bf46-01ce8e96eee7";
+ RuleNode ruleNode = new RuleNode(new RuleNodeId(UUID.fromString(ruleNodeIdStr)));
+ given(ctxMock.getTenantId()).willReturn(TenantId.fromUUID(UUID.fromString(tenantIdStr)));
+ given(ctxMock.getSelf()).willReturn(ruleNode);
+
+ String actualOwnerIdStr = node.getOwnerId(ctxMock);
+ String expectedOwnerIdStr = "Tenant[" + tenantIdStr + "]RuleNode[" + ruleNodeIdStr + "]";
+ assertThat(actualOwnerIdStr).isEqualTo(expectedOwnerIdStr);
+ }
+
+ @Test
+ public void verifyPrepareMqttClientConfigMethodWithBasicCredentials() throws SSLException {
+ BasicCredentials credentials = new BasicCredentials();
+ credentials.setUsername("test_username");
+ credentials.setPassword("test_password");
+ config.setCredentials(credentials);
+ ReflectionTestUtils.setField(node, "mqttNodeConfiguration", config);
+ MqttClientConfig mqttClientConfig = new MqttClientConfig(node.getSslContext());
+
+ node.prepareMqttClientConfig(mqttClientConfig);
+
+ assertThat(mqttClientConfig)
+ .hasFieldOrPropertyWithValue("username", "test_username")
+ .hasFieldOrPropertyWithValue("password", "test_password");
+ }
+
+ @ParameterizedTest
+ @MethodSource
+ public void verifyGetSslContextMethod(boolean ssl, ClientCredentials credentials, SslContext expectedSslContext) throws SSLException {
+ config.setSsl(ssl);
+ config.setCredentials(credentials);
+ ReflectionTestUtils.setField(node, "mqttNodeConfiguration", config);
+
+ SslContext actualSslContext = node.getSslContext();
+ assertThat(actualSslContext)
+ .usingRecursiveComparison()
+ .ignoringFields("ctx", "ctxLock", "sessionContext.context.ctx", "sessionContext.context.ctxLock")
+ .isEqualTo(expectedSslContext);
+ }
+
+ private static Stream verifyGetSslContextMethod() throws SSLException {
+ return Stream.of(
+ Arguments.of(true, new BasicCredentials(), SslContextBuilder.forClient().build()),
+ Arguments.of(false, new AnonymousCredentials(), null)
+ );
+ }
+
+ @Test
+ public void givenFailedToInitializeMqttClient_whenInit_thenThrowsException() throws Exception {
+ String errorMsg = "Failed to connect to MQTT broker!";
+ willThrow(new RuntimeException(errorMsg)).given(node).initClient(any());
+
+ var configuration = new TbNodeConfiguration(JacksonUtil.valueToTree(config));
+ assertThatThrownBy(() -> node.init(ctxMock, configuration))
+ .isInstanceOf(TbNodeException.class)
+ .hasMessage(RuntimeException.class.getName() + ": " + errorMsg);
+ }
+
+ @ParameterizedTest
+ @MethodSource
+ public void givenTopicPatternAndIsRetainedMsgIsTrue_whenOnMsg_thenTellSuccess(String topicPattern, TbMsgMetaData metaData, String data) throws Exception {
+ config.setRetainedMessage(true);
+ config.setTopicPattern(topicPattern);
+
+ willReturn(clientMock).given(node).initClient(any());
+ Future future = mock(Future.class);
+ given(future.isSuccess()).willReturn(true);
+ given(clientMock.publish(any(String.class), any(ByteBuf.class), any(MqttQoS.class), anyBoolean())).willReturn(future);
+ willAnswer(invocation-> {
+ GenericFutureListener> listener = invocation.getArgument(0);
+ listener.operationComplete(future);
+ return null;
+ }).given(future).addListener(any());
+
+ node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config)));
+ TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, metaData, data);
+ node.onMsg(ctxMock, msg);
+
+ String expectedTopic = TbNodeUtils.processPattern(config.getTopicPattern(), msg);
+ then(clientMock).should().publish(expectedTopic, Unpooled.wrappedBuffer(msg.getData().getBytes(UTF8)), MqttQoS.AT_LEAST_ONCE, true);
+ then(ctxMock).should().tellSuccess(msg);
+ }
+
+ private static Stream givenTopicPatternAndIsRetainedMsgIsTrue_whenOnMsg_thenTellSuccess() {
+ return Stream.of(
+ Arguments.of("new-topic", TbMsgMetaData.EMPTY, TbMsg.EMPTY_JSON_OBJECT),
+ Arguments.of("${md-topic-name}", new TbMsgMetaData(Map.of("md-topic-name", "md-new-topic")), TbMsg.EMPTY_JSON_OBJECT),
+ Arguments.of("$[msg-topic-name]", TbMsgMetaData.EMPTY, "{\"msg-topic-name\":\"msg-new-topic\"}")
+ );
+ }
+
+ @Test
+ public void givenParseToPlainTextIsTrueAndMsgPublishingFailed_whenOnMsg_thenTellFailure() throws Exception {
+ config.setParseToPlainText(true);
+
+ willReturn(clientMock).given(node).initClient(any());
+ Future future = mock(Future.class);
+ given(clientMock.publish(any(String.class), any(ByteBuf.class), any(MqttQoS.class), anyBoolean())).willReturn(future);
+ given(future.isSuccess()).willReturn(false);
+ String errorMsg = "Message publishing was failed!";
+ Throwable exception = new RuntimeException(errorMsg);
+ given(future.cause()).willReturn(exception);
+ willAnswer(invocation-> {
+ GenericFutureListener> listener = invocation.getArgument(0);
+ listener.operationComplete(future);
+ return null;
+ }).given(future).addListener(any());
+
+ node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config)));
+ TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, TbMsgMetaData.EMPTY, "\"string\"");
+ node.onMsg(ctxMock, msg);
+
+ String expectedData = JacksonUtil.toPlainText(msg.getData());
+ then(clientMock).should().publish(config.getTopicPattern(), Unpooled.wrappedBuffer(expectedData.getBytes(UTF8)), MqttQoS.AT_LEAST_ONCE, false);
+ TbMsgMetaData metaData = new TbMsgMetaData();
+ metaData.putValue("error", RuntimeException.class + ": " + errorMsg);
+ TbMsg expectedMsg = TbMsg.transformMsgMetadata(msg, metaData);
+ ArgumentCaptor actualMsgCaptor = ArgumentCaptor.forClass(TbMsg.class);
+ then(ctxMock).should().tellFailure(actualMsgCaptor.capture(), eq(exception));
+ TbMsg actualMsg = actualMsgCaptor.getValue();
+ assertThat(actualMsg).usingRecursiveComparison().ignoringFields("ctx").isEqualTo(expectedMsg);
}
private static Stream givenFromVersionAndConfig_whenUpgrade_thenVerifyHasChangesAndConfig() {