From a86a42813ffdebdba7561fdc9b3efc41e6e348a6 Mon Sep 17 00:00:00 2001 From: IrynaMatveieva Date: Mon, 10 Jun 2024 15:21:43 +0300 Subject: [PATCH] added tests for mqtt node --- .../rule/engine/mqtt/TbMqttNode.java | 5 +- .../engine/mqtt/azure/TbAzureIotHubNode.java | 2 +- .../rule/engine/mqtt/TbMqttNodeTest.java | 202 +++++++++++++++++- 3 files changed, 203 insertions(+), 6 deletions(-) diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/TbMqttNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/TbMqttNode.java index 5074024be5..ab6fcda8dc 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/TbMqttNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/TbMqttNode.java @@ -53,6 +53,7 @@ import java.util.concurrent.TimeoutException; type = ComponentType.EXTERNAL, name = "mqtt", configClazz = TbMqttNodeConfiguration.class, + version = 1, clusteringMode = ComponentClusteringMode.USER_PREFERENCE, nodeDescription = "Publish messages to the MQTT broker", nodeDetails = "Will publish message payload to the MQTT broker with QoS AT_LEAST_ONCE.", @@ -145,7 +146,7 @@ public class TbMqttNode extends TbAbstractExternalNode { return client; } - protected void prepareMqttClientConfig(MqttClientConfig config) throws SSLException { + protected void prepareMqttClientConfig(MqttClientConfig config) { ClientCredentials credentials = this.mqttNodeConfiguration.getCredentials(); if (credentials.getType() == CredentialsType.BASIC) { BasicCredentials basicCredentials = (BasicCredentials) credentials; @@ -154,7 +155,7 @@ public class TbMqttNode extends TbAbstractExternalNode { } } - private SslContext getSslContext() throws SSLException { + protected SslContext getSslContext() throws SSLException { return this.mqttNodeConfiguration.isSsl() ? this.mqttNodeConfiguration.getCredentials().initSslContext() : null; } diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/azure/TbAzureIotHubNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/azure/TbAzureIotHubNode.java index b78a441134..2cbc172570 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/azure/TbAzureIotHubNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/azure/TbAzureIotHubNode.java @@ -66,7 +66,7 @@ public class TbAzureIotHubNode extends TbMqttNode { } } - protected void prepareMqttClientConfig(MqttClientConfig config) throws SSLException { + protected void prepareMqttClientConfig(MqttClientConfig config) { config.setProtocolVersion(MqttVersion.MQTT_3_1_1); config.setUsername(AzureIotHubUtil.buildUsername(mqttNodeConfiguration.getHost(), config.getClientId())); ClientCredentials credentials = mqttNodeConfiguration.getCredentials(); diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/mqtt/TbMqttNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/mqtt/TbMqttNodeTest.java index b46ecdf6fe..045ff3637f 100644 --- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/mqtt/TbMqttNodeTest.java +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/mqtt/TbMqttNodeTest.java @@ -15,26 +15,222 @@ */ package org.thingsboard.rule.engine.mqtt; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.handler.codec.mqtt.MqttQoS; +import io.netty.handler.ssl.SslContext; +import io.netty.handler.ssl.SslContextBuilder; +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.GenericFutureListener; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; import org.mockito.Spy; import org.mockito.junit.jupiter.MockitoExtension; +import org.springframework.test.util.ReflectionTestUtils; +import org.thingsboard.common.util.JacksonUtil; +import org.thingsboard.mqtt.MqttClient; +import org.thingsboard.mqtt.MqttClientConfig; import org.thingsboard.rule.engine.AbstractRuleNodeUpgradeTest; +import org.thingsboard.rule.engine.api.TbContext; import org.thingsboard.rule.engine.api.TbNode; +import org.thingsboard.rule.engine.api.TbNodeConfiguration; +import org.thingsboard.rule.engine.api.TbNodeException; +import org.thingsboard.rule.engine.api.util.TbNodeUtils; +import org.thingsboard.rule.engine.credentials.AnonymousCredentials; +import org.thingsboard.rule.engine.credentials.BasicCredentials; +import org.thingsboard.rule.engine.credentials.ClientCredentials; +import org.thingsboard.server.common.data.id.DeviceId; +import org.thingsboard.server.common.data.id.RuleNodeId; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.msg.TbMsgType; +import org.thingsboard.server.common.data.rule.RuleNode; +import org.thingsboard.server.common.msg.TbMsg; +import org.thingsboard.server.common.msg.TbMsgMetaData; +import javax.net.ssl.SSLException; +import java.util.Map; +import java.util.UUID; import java.util.stream.Stream; -import static org.mockito.Mockito.mock; +import static com.amazonaws.util.StringUtils.UTF8; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.BDDMockito.given; +import static org.mockito.BDDMockito.mock; +import static org.mockito.BDDMockito.spy; +import static org.mockito.BDDMockito.then; +import static org.mockito.BDDMockito.willAnswer; +import static org.mockito.BDDMockito.willReturn; +import static org.mockito.BDDMockito.willThrow; @ExtendWith(MockitoExtension.class) class TbMqttNodeTest extends AbstractRuleNodeUpgradeTest { + + private final DeviceId DEVICE_ID = new DeviceId(UUID.fromString("09115d92-d333-432a-868c-ccd6e89c9287")); + @Spy - TbMqttNode node; + private TbMqttNode node; + private TbMqttNodeConfiguration config; + + @Mock + private TbContext ctxMock; + @Mock + private MqttClient clientMock; @BeforeEach public void setUp() throws Exception { - node = mock(TbMqttNode.class); + node = spy(new TbMqttNode()); + config = new TbMqttNodeConfiguration().defaultConfiguration(); + } + + @Test + public void verifyDefaultConfig() { + assertThat(config.getTopicPattern()).isEqualTo("my-topic"); + assertThat(config.getHost()).isNull(); + assertThat(config.getPort()).isEqualTo(1883); + assertThat(config.getConnectTimeoutSec()).isEqualTo(10); + assertThat(config.getClientId()).isNull(); + assertThat(config.isAppendClientIdSuffix()).isFalse(); + assertThat(config.isRetainedMessage()).isFalse(); + assertThat(config.isCleanSession()).isTrue(); + assertThat(config.isSsl()).isFalse(); + assertThat(config.isParseToPlainText()).isFalse(); + assertThat(config.getCredentials()).isInstanceOf(AnonymousCredentials.class); + } + + @Test + public void verifyGetOwnerIdMethod() { + String tenantIdStr = "6f67b6cc-21dd-46c5-809c-402b738a3f8b"; + String ruleNodeIdStr = "80a90b53-6888-4344-bf46-01ce8e96eee7"; + RuleNode ruleNode = new RuleNode(new RuleNodeId(UUID.fromString(ruleNodeIdStr))); + given(ctxMock.getTenantId()).willReturn(TenantId.fromUUID(UUID.fromString(tenantIdStr))); + given(ctxMock.getSelf()).willReturn(ruleNode); + + String actualOwnerIdStr = node.getOwnerId(ctxMock); + String expectedOwnerIdStr = "Tenant[" + tenantIdStr + "]RuleNode[" + ruleNodeIdStr + "]"; + assertThat(actualOwnerIdStr).isEqualTo(expectedOwnerIdStr); + } + + @Test + public void verifyPrepareMqttClientConfigMethodWithBasicCredentials() throws SSLException { + BasicCredentials credentials = new BasicCredentials(); + credentials.setUsername("test_username"); + credentials.setPassword("test_password"); + config.setCredentials(credentials); + ReflectionTestUtils.setField(node, "mqttNodeConfiguration", config); + MqttClientConfig mqttClientConfig = new MqttClientConfig(node.getSslContext()); + + node.prepareMqttClientConfig(mqttClientConfig); + + assertThat(mqttClientConfig) + .hasFieldOrPropertyWithValue("username", "test_username") + .hasFieldOrPropertyWithValue("password", "test_password"); + } + + @ParameterizedTest + @MethodSource + public void verifyGetSslContextMethod(boolean ssl, ClientCredentials credentials, SslContext expectedSslContext) throws SSLException { + config.setSsl(ssl); + config.setCredentials(credentials); + ReflectionTestUtils.setField(node, "mqttNodeConfiguration", config); + + SslContext actualSslContext = node.getSslContext(); + assertThat(actualSslContext) + .usingRecursiveComparison() + .ignoringFields("ctx", "ctxLock", "sessionContext.context.ctx", "sessionContext.context.ctxLock") + .isEqualTo(expectedSslContext); + } + + private static Stream verifyGetSslContextMethod() throws SSLException { + return Stream.of( + Arguments.of(true, new BasicCredentials(), SslContextBuilder.forClient().build()), + Arguments.of(false, new AnonymousCredentials(), null) + ); + } + + @Test + public void givenFailedToInitializeMqttClient_whenInit_thenThrowsException() throws Exception { + String errorMsg = "Failed to connect to MQTT broker!"; + willThrow(new RuntimeException(errorMsg)).given(node).initClient(any()); + + var configuration = new TbNodeConfiguration(JacksonUtil.valueToTree(config)); + assertThatThrownBy(() -> node.init(ctxMock, configuration)) + .isInstanceOf(TbNodeException.class) + .hasMessage(RuntimeException.class.getName() + ": " + errorMsg); + } + + @ParameterizedTest + @MethodSource + public void givenTopicPatternAndIsRetainedMsgIsTrue_whenOnMsg_thenTellSuccess(String topicPattern, TbMsgMetaData metaData, String data) throws Exception { + config.setRetainedMessage(true); + config.setTopicPattern(topicPattern); + + willReturn(clientMock).given(node).initClient(any()); + Future future = mock(Future.class); + given(future.isSuccess()).willReturn(true); + given(clientMock.publish(any(String.class), any(ByteBuf.class), any(MqttQoS.class), anyBoolean())).willReturn(future); + willAnswer(invocation-> { + GenericFutureListener> listener = invocation.getArgument(0); + listener.operationComplete(future); + return null; + }).given(future).addListener(any()); + + node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config))); + TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, metaData, data); + node.onMsg(ctxMock, msg); + + String expectedTopic = TbNodeUtils.processPattern(config.getTopicPattern(), msg); + then(clientMock).should().publish(expectedTopic, Unpooled.wrappedBuffer(msg.getData().getBytes(UTF8)), MqttQoS.AT_LEAST_ONCE, true); + then(ctxMock).should().tellSuccess(msg); + } + + private static Stream givenTopicPatternAndIsRetainedMsgIsTrue_whenOnMsg_thenTellSuccess() { + return Stream.of( + Arguments.of("new-topic", TbMsgMetaData.EMPTY, TbMsg.EMPTY_JSON_OBJECT), + Arguments.of("${md-topic-name}", new TbMsgMetaData(Map.of("md-topic-name", "md-new-topic")), TbMsg.EMPTY_JSON_OBJECT), + Arguments.of("$[msg-topic-name]", TbMsgMetaData.EMPTY, "{\"msg-topic-name\":\"msg-new-topic\"}") + ); + } + + @Test + public void givenParseToPlainTextIsTrueAndMsgPublishingFailed_whenOnMsg_thenTellFailure() throws Exception { + config.setParseToPlainText(true); + + willReturn(clientMock).given(node).initClient(any()); + Future future = mock(Future.class); + given(clientMock.publish(any(String.class), any(ByteBuf.class), any(MqttQoS.class), anyBoolean())).willReturn(future); + given(future.isSuccess()).willReturn(false); + String errorMsg = "Message publishing was failed!"; + Throwable exception = new RuntimeException(errorMsg); + given(future.cause()).willReturn(exception); + willAnswer(invocation-> { + GenericFutureListener> listener = invocation.getArgument(0); + listener.operationComplete(future); + return null; + }).given(future).addListener(any()); + + node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config))); + TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, TbMsgMetaData.EMPTY, "\"string\""); + node.onMsg(ctxMock, msg); + + String expectedData = JacksonUtil.toPlainText(msg.getData()); + then(clientMock).should().publish(config.getTopicPattern(), Unpooled.wrappedBuffer(expectedData.getBytes(UTF8)), MqttQoS.AT_LEAST_ONCE, false); + TbMsgMetaData metaData = new TbMsgMetaData(); + metaData.putValue("error", RuntimeException.class + ": " + errorMsg); + TbMsg expectedMsg = TbMsg.transformMsgMetadata(msg, metaData); + ArgumentCaptor actualMsgCaptor = ArgumentCaptor.forClass(TbMsg.class); + then(ctxMock).should().tellFailure(actualMsgCaptor.capture(), eq(exception)); + TbMsg actualMsg = actualMsgCaptor.getValue(); + assertThat(actualMsg).usingRecursiveComparison().ignoringFields("ctx").isEqualTo(expectedMsg); } private static Stream givenFromVersionAndConfig_whenUpgrade_thenVerifyHasChangesAndConfig() {