From a86a42813ffdebdba7561fdc9b3efc41e6e348a6 Mon Sep 17 00:00:00 2001 From: IrynaMatveieva Date: Mon, 10 Jun 2024 15:21:43 +0300 Subject: [PATCH 1/6] added tests for mqtt node --- .../rule/engine/mqtt/TbMqttNode.java | 5 +- .../engine/mqtt/azure/TbAzureIotHubNode.java | 2 +- .../rule/engine/mqtt/TbMqttNodeTest.java | 202 +++++++++++++++++- 3 files changed, 203 insertions(+), 6 deletions(-) diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/TbMqttNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/TbMqttNode.java index 5074024be5..ab6fcda8dc 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/TbMqttNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/TbMqttNode.java @@ -53,6 +53,7 @@ import java.util.concurrent.TimeoutException; type = ComponentType.EXTERNAL, name = "mqtt", configClazz = TbMqttNodeConfiguration.class, + version = 1, clusteringMode = ComponentClusteringMode.USER_PREFERENCE, nodeDescription = "Publish messages to the MQTT broker", nodeDetails = "Will publish message payload to the MQTT broker with QoS AT_LEAST_ONCE.", @@ -145,7 +146,7 @@ public class TbMqttNode extends TbAbstractExternalNode { return client; } - protected void prepareMqttClientConfig(MqttClientConfig config) throws SSLException { + protected void prepareMqttClientConfig(MqttClientConfig config) { ClientCredentials credentials = this.mqttNodeConfiguration.getCredentials(); if (credentials.getType() == CredentialsType.BASIC) { BasicCredentials basicCredentials = (BasicCredentials) credentials; @@ -154,7 +155,7 @@ public class TbMqttNode extends TbAbstractExternalNode { } } - private SslContext getSslContext() throws SSLException { + protected SslContext getSslContext() throws SSLException { return this.mqttNodeConfiguration.isSsl() ? this.mqttNodeConfiguration.getCredentials().initSslContext() : null; } diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/azure/TbAzureIotHubNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/azure/TbAzureIotHubNode.java index b78a441134..2cbc172570 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/azure/TbAzureIotHubNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/azure/TbAzureIotHubNode.java @@ -66,7 +66,7 @@ public class TbAzureIotHubNode extends TbMqttNode { } } - protected void prepareMqttClientConfig(MqttClientConfig config) throws SSLException { + protected void prepareMqttClientConfig(MqttClientConfig config) { config.setProtocolVersion(MqttVersion.MQTT_3_1_1); config.setUsername(AzureIotHubUtil.buildUsername(mqttNodeConfiguration.getHost(), config.getClientId())); ClientCredentials credentials = mqttNodeConfiguration.getCredentials(); diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/mqtt/TbMqttNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/mqtt/TbMqttNodeTest.java index b46ecdf6fe..045ff3637f 100644 --- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/mqtt/TbMqttNodeTest.java +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/mqtt/TbMqttNodeTest.java @@ -15,26 +15,222 @@ */ package org.thingsboard.rule.engine.mqtt; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.handler.codec.mqtt.MqttQoS; +import io.netty.handler.ssl.SslContext; +import io.netty.handler.ssl.SslContextBuilder; +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.GenericFutureListener; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; import org.mockito.Spy; import org.mockito.junit.jupiter.MockitoExtension; +import org.springframework.test.util.ReflectionTestUtils; +import org.thingsboard.common.util.JacksonUtil; +import org.thingsboard.mqtt.MqttClient; +import org.thingsboard.mqtt.MqttClientConfig; import org.thingsboard.rule.engine.AbstractRuleNodeUpgradeTest; +import org.thingsboard.rule.engine.api.TbContext; import org.thingsboard.rule.engine.api.TbNode; +import org.thingsboard.rule.engine.api.TbNodeConfiguration; +import org.thingsboard.rule.engine.api.TbNodeException; +import org.thingsboard.rule.engine.api.util.TbNodeUtils; +import org.thingsboard.rule.engine.credentials.AnonymousCredentials; +import org.thingsboard.rule.engine.credentials.BasicCredentials; +import org.thingsboard.rule.engine.credentials.ClientCredentials; +import org.thingsboard.server.common.data.id.DeviceId; +import org.thingsboard.server.common.data.id.RuleNodeId; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.msg.TbMsgType; +import org.thingsboard.server.common.data.rule.RuleNode; +import org.thingsboard.server.common.msg.TbMsg; +import org.thingsboard.server.common.msg.TbMsgMetaData; +import javax.net.ssl.SSLException; +import java.util.Map; +import java.util.UUID; import java.util.stream.Stream; -import static org.mockito.Mockito.mock; +import static com.amazonaws.util.StringUtils.UTF8; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.BDDMockito.given; +import static org.mockito.BDDMockito.mock; +import static org.mockito.BDDMockito.spy; +import static org.mockito.BDDMockito.then; +import static org.mockito.BDDMockito.willAnswer; +import static org.mockito.BDDMockito.willReturn; +import static org.mockito.BDDMockito.willThrow; @ExtendWith(MockitoExtension.class) class TbMqttNodeTest extends AbstractRuleNodeUpgradeTest { + + private final DeviceId DEVICE_ID = new DeviceId(UUID.fromString("09115d92-d333-432a-868c-ccd6e89c9287")); + @Spy - TbMqttNode node; + private TbMqttNode node; + private TbMqttNodeConfiguration config; + + @Mock + private TbContext ctxMock; + @Mock + private MqttClient clientMock; @BeforeEach public void setUp() throws Exception { - node = mock(TbMqttNode.class); + node = spy(new TbMqttNode()); + config = new TbMqttNodeConfiguration().defaultConfiguration(); + } + + @Test + public void verifyDefaultConfig() { + assertThat(config.getTopicPattern()).isEqualTo("my-topic"); + assertThat(config.getHost()).isNull(); + assertThat(config.getPort()).isEqualTo(1883); + assertThat(config.getConnectTimeoutSec()).isEqualTo(10); + assertThat(config.getClientId()).isNull(); + assertThat(config.isAppendClientIdSuffix()).isFalse(); + assertThat(config.isRetainedMessage()).isFalse(); + assertThat(config.isCleanSession()).isTrue(); + assertThat(config.isSsl()).isFalse(); + assertThat(config.isParseToPlainText()).isFalse(); + assertThat(config.getCredentials()).isInstanceOf(AnonymousCredentials.class); + } + + @Test + public void verifyGetOwnerIdMethod() { + String tenantIdStr = "6f67b6cc-21dd-46c5-809c-402b738a3f8b"; + String ruleNodeIdStr = "80a90b53-6888-4344-bf46-01ce8e96eee7"; + RuleNode ruleNode = new RuleNode(new RuleNodeId(UUID.fromString(ruleNodeIdStr))); + given(ctxMock.getTenantId()).willReturn(TenantId.fromUUID(UUID.fromString(tenantIdStr))); + given(ctxMock.getSelf()).willReturn(ruleNode); + + String actualOwnerIdStr = node.getOwnerId(ctxMock); + String expectedOwnerIdStr = "Tenant[" + tenantIdStr + "]RuleNode[" + ruleNodeIdStr + "]"; + assertThat(actualOwnerIdStr).isEqualTo(expectedOwnerIdStr); + } + + @Test + public void verifyPrepareMqttClientConfigMethodWithBasicCredentials() throws SSLException { + BasicCredentials credentials = new BasicCredentials(); + credentials.setUsername("test_username"); + credentials.setPassword("test_password"); + config.setCredentials(credentials); + ReflectionTestUtils.setField(node, "mqttNodeConfiguration", config); + MqttClientConfig mqttClientConfig = new MqttClientConfig(node.getSslContext()); + + node.prepareMqttClientConfig(mqttClientConfig); + + assertThat(mqttClientConfig) + .hasFieldOrPropertyWithValue("username", "test_username") + .hasFieldOrPropertyWithValue("password", "test_password"); + } + + @ParameterizedTest + @MethodSource + public void verifyGetSslContextMethod(boolean ssl, ClientCredentials credentials, SslContext expectedSslContext) throws SSLException { + config.setSsl(ssl); + config.setCredentials(credentials); + ReflectionTestUtils.setField(node, "mqttNodeConfiguration", config); + + SslContext actualSslContext = node.getSslContext(); + assertThat(actualSslContext) + .usingRecursiveComparison() + .ignoringFields("ctx", "ctxLock", "sessionContext.context.ctx", "sessionContext.context.ctxLock") + .isEqualTo(expectedSslContext); + } + + private static Stream verifyGetSslContextMethod() throws SSLException { + return Stream.of( + Arguments.of(true, new BasicCredentials(), SslContextBuilder.forClient().build()), + Arguments.of(false, new AnonymousCredentials(), null) + ); + } + + @Test + public void givenFailedToInitializeMqttClient_whenInit_thenThrowsException() throws Exception { + String errorMsg = "Failed to connect to MQTT broker!"; + willThrow(new RuntimeException(errorMsg)).given(node).initClient(any()); + + var configuration = new TbNodeConfiguration(JacksonUtil.valueToTree(config)); + assertThatThrownBy(() -> node.init(ctxMock, configuration)) + .isInstanceOf(TbNodeException.class) + .hasMessage(RuntimeException.class.getName() + ": " + errorMsg); + } + + @ParameterizedTest + @MethodSource + public void givenTopicPatternAndIsRetainedMsgIsTrue_whenOnMsg_thenTellSuccess(String topicPattern, TbMsgMetaData metaData, String data) throws Exception { + config.setRetainedMessage(true); + config.setTopicPattern(topicPattern); + + willReturn(clientMock).given(node).initClient(any()); + Future future = mock(Future.class); + given(future.isSuccess()).willReturn(true); + given(clientMock.publish(any(String.class), any(ByteBuf.class), any(MqttQoS.class), anyBoolean())).willReturn(future); + willAnswer(invocation-> { + GenericFutureListener> listener = invocation.getArgument(0); + listener.operationComplete(future); + return null; + }).given(future).addListener(any()); + + node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config))); + TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, metaData, data); + node.onMsg(ctxMock, msg); + + String expectedTopic = TbNodeUtils.processPattern(config.getTopicPattern(), msg); + then(clientMock).should().publish(expectedTopic, Unpooled.wrappedBuffer(msg.getData().getBytes(UTF8)), MqttQoS.AT_LEAST_ONCE, true); + then(ctxMock).should().tellSuccess(msg); + } + + private static Stream givenTopicPatternAndIsRetainedMsgIsTrue_whenOnMsg_thenTellSuccess() { + return Stream.of( + Arguments.of("new-topic", TbMsgMetaData.EMPTY, TbMsg.EMPTY_JSON_OBJECT), + Arguments.of("${md-topic-name}", new TbMsgMetaData(Map.of("md-topic-name", "md-new-topic")), TbMsg.EMPTY_JSON_OBJECT), + Arguments.of("$[msg-topic-name]", TbMsgMetaData.EMPTY, "{\"msg-topic-name\":\"msg-new-topic\"}") + ); + } + + @Test + public void givenParseToPlainTextIsTrueAndMsgPublishingFailed_whenOnMsg_thenTellFailure() throws Exception { + config.setParseToPlainText(true); + + willReturn(clientMock).given(node).initClient(any()); + Future future = mock(Future.class); + given(clientMock.publish(any(String.class), any(ByteBuf.class), any(MqttQoS.class), anyBoolean())).willReturn(future); + given(future.isSuccess()).willReturn(false); + String errorMsg = "Message publishing was failed!"; + Throwable exception = new RuntimeException(errorMsg); + given(future.cause()).willReturn(exception); + willAnswer(invocation-> { + GenericFutureListener> listener = invocation.getArgument(0); + listener.operationComplete(future); + return null; + }).given(future).addListener(any()); + + node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config))); + TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, TbMsgMetaData.EMPTY, "\"string\""); + node.onMsg(ctxMock, msg); + + String expectedData = JacksonUtil.toPlainText(msg.getData()); + then(clientMock).should().publish(config.getTopicPattern(), Unpooled.wrappedBuffer(expectedData.getBytes(UTF8)), MqttQoS.AT_LEAST_ONCE, false); + TbMsgMetaData metaData = new TbMsgMetaData(); + metaData.putValue("error", RuntimeException.class + ": " + errorMsg); + TbMsg expectedMsg = TbMsg.transformMsgMetadata(msg, metaData); + ArgumentCaptor actualMsgCaptor = ArgumentCaptor.forClass(TbMsg.class); + then(ctxMock).should().tellFailure(actualMsgCaptor.capture(), eq(exception)); + TbMsg actualMsg = actualMsgCaptor.getValue(); + assertThat(actualMsg).usingRecursiveComparison().ignoringFields("ctx").isEqualTo(expectedMsg); } private static Stream givenFromVersionAndConfig_whenUpgrade_thenVerifyHasChangesAndConfig() { From 4be5fe0cfaf745cf56afa7c1f63663b7313dad7f Mon Sep 17 00:00:00 2001 From: IrynaMatveieva Date: Tue, 11 Jun 2024 17:21:24 +0300 Subject: [PATCH 2/6] added tests for azure iot hub node --- .../rule/engine/mqtt/TbMqttNode.java | 2 +- .../engine/mqtt/azure/TbAzureIotHubNode.java | 6 +- .../rule/engine/mqtt/TbMqttNodeTest.java | 94 +++++++-------- .../mqtt/azure/TbAzureIotHubNodeTest.java | 113 ++++++++++++++++++ 4 files changed, 164 insertions(+), 51 deletions(-) create mode 100644 rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/mqtt/azure/TbAzureIotHubNodeTest.java diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/TbMqttNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/TbMqttNode.java index ab6fcda8dc..ca63c270de 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/TbMqttNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/TbMqttNode.java @@ -115,7 +115,7 @@ public class TbMqttNode extends TbAbstractExternalNode { return "Tenant[" + ctx.getTenantId().getId() + "]RuleNode[" + ctx.getSelf().getId().getId() + "]"; } - protected MqttClient initClient(TbContext ctx) throws Exception { + public MqttClient initClient(TbContext ctx) throws Exception { MqttClientConfig config = new MqttClientConfig(getSslContext()); config.setOwnerId(getOwnerId(ctx)); if (!StringUtils.isEmpty(this.mqttNodeConfiguration.getClientId())) { diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/azure/TbAzureIotHubNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/azure/TbAzureIotHubNode.java index 2cbc172570..3376c7d113 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/azure/TbAzureIotHubNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/azure/TbAzureIotHubNode.java @@ -32,8 +32,6 @@ import org.thingsboard.rule.engine.mqtt.TbMqttNodeConfiguration; import org.thingsboard.server.common.data.plugin.ComponentClusteringMode; import org.thingsboard.server.common.data.plugin.ComponentType; -import javax.net.ssl.SSLException; - @Slf4j @RuleNode( type = ComponentType.EXTERNAL, @@ -74,4 +72,8 @@ public class TbAzureIotHubNode extends TbMqttNode { config.setPassword(AzureIotHubUtil.buildSasToken(mqttNodeConfiguration.getHost(), ((AzureIotHubSasCredentials) credentials).getSasKey())); } } + + protected TbMqttNodeConfiguration getMqttNodeConfiguration() { + return this.mqttNodeConfiguration; + } } diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/mqtt/TbMqttNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/mqtt/TbMqttNodeTest.java index 045ff3637f..1c8a3f2500 100644 --- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/mqtt/TbMqttNodeTest.java +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/mqtt/TbMqttNodeTest.java @@ -30,7 +30,6 @@ import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; import org.mockito.ArgumentCaptor; import org.mockito.Mock; -import org.mockito.Spy; import org.mockito.junit.jupiter.MockitoExtension; import org.springframework.test.util.ReflectionTestUtils; import org.thingsboard.common.util.JacksonUtil; @@ -73,38 +72,37 @@ import static org.mockito.BDDMockito.willReturn; import static org.mockito.BDDMockito.willThrow; @ExtendWith(MockitoExtension.class) -class TbMqttNodeTest extends AbstractRuleNodeUpgradeTest { +public class TbMqttNodeTest extends AbstractRuleNodeUpgradeTest { private final DeviceId DEVICE_ID = new DeviceId(UUID.fromString("09115d92-d333-432a-868c-ccd6e89c9287")); - @Spy - private TbMqttNode node; - private TbMqttNodeConfiguration config; + protected TbMqttNode mqttNode; + protected TbMqttNodeConfiguration mqttNodeConfig; @Mock - private TbContext ctxMock; + protected TbContext ctxMock; @Mock - private MqttClient clientMock; + protected MqttClient mqttClientMock; @BeforeEach - public void setUp() throws Exception { - node = spy(new TbMqttNode()); - config = new TbMqttNodeConfiguration().defaultConfiguration(); + protected void setUp() { + mqttNode = spy(new TbMqttNode()); + mqttNodeConfig = new TbMqttNodeConfiguration().defaultConfiguration(); } @Test public void verifyDefaultConfig() { - assertThat(config.getTopicPattern()).isEqualTo("my-topic"); - assertThat(config.getHost()).isNull(); - assertThat(config.getPort()).isEqualTo(1883); - assertThat(config.getConnectTimeoutSec()).isEqualTo(10); - assertThat(config.getClientId()).isNull(); - assertThat(config.isAppendClientIdSuffix()).isFalse(); - assertThat(config.isRetainedMessage()).isFalse(); - assertThat(config.isCleanSession()).isTrue(); - assertThat(config.isSsl()).isFalse(); - assertThat(config.isParseToPlainText()).isFalse(); - assertThat(config.getCredentials()).isInstanceOf(AnonymousCredentials.class); + assertThat(mqttNodeConfig.getTopicPattern()).isEqualTo("my-topic"); + assertThat(mqttNodeConfig.getHost()).isNull(); + assertThat(mqttNodeConfig.getPort()).isEqualTo(1883); + assertThat(mqttNodeConfig.getConnectTimeoutSec()).isEqualTo(10); + assertThat(mqttNodeConfig.getClientId()).isNull(); + assertThat(mqttNodeConfig.isAppendClientIdSuffix()).isFalse(); + assertThat(mqttNodeConfig.isRetainedMessage()).isFalse(); + assertThat(mqttNodeConfig.isCleanSession()).isTrue(); + assertThat(mqttNodeConfig.isSsl()).isFalse(); + assertThat(mqttNodeConfig.isParseToPlainText()).isFalse(); + assertThat(mqttNodeConfig.getCredentials()).isInstanceOf(AnonymousCredentials.class); } @Test @@ -115,7 +113,7 @@ class TbMqttNodeTest extends AbstractRuleNodeUpgradeTest { given(ctxMock.getTenantId()).willReturn(TenantId.fromUUID(UUID.fromString(tenantIdStr))); given(ctxMock.getSelf()).willReturn(ruleNode); - String actualOwnerIdStr = node.getOwnerId(ctxMock); + String actualOwnerIdStr = mqttNode.getOwnerId(ctxMock); String expectedOwnerIdStr = "Tenant[" + tenantIdStr + "]RuleNode[" + ruleNodeIdStr + "]"; assertThat(actualOwnerIdStr).isEqualTo(expectedOwnerIdStr); } @@ -125,11 +123,11 @@ class TbMqttNodeTest extends AbstractRuleNodeUpgradeTest { BasicCredentials credentials = new BasicCredentials(); credentials.setUsername("test_username"); credentials.setPassword("test_password"); - config.setCredentials(credentials); - ReflectionTestUtils.setField(node, "mqttNodeConfiguration", config); - MqttClientConfig mqttClientConfig = new MqttClientConfig(node.getSslContext()); + mqttNodeConfig.setCredentials(credentials); + ReflectionTestUtils.setField(mqttNode, "mqttNodeConfiguration", mqttNodeConfig); + MqttClientConfig mqttClientConfig = new MqttClientConfig(mqttNode.getSslContext()); - node.prepareMqttClientConfig(mqttClientConfig); + mqttNode.prepareMqttClientConfig(mqttClientConfig); assertThat(mqttClientConfig) .hasFieldOrPropertyWithValue("username", "test_username") @@ -139,11 +137,11 @@ class TbMqttNodeTest extends AbstractRuleNodeUpgradeTest { @ParameterizedTest @MethodSource public void verifyGetSslContextMethod(boolean ssl, ClientCredentials credentials, SslContext expectedSslContext) throws SSLException { - config.setSsl(ssl); - config.setCredentials(credentials); - ReflectionTestUtils.setField(node, "mqttNodeConfiguration", config); + mqttNodeConfig.setSsl(ssl); + mqttNodeConfig.setCredentials(credentials); + ReflectionTestUtils.setField(mqttNode, "mqttNodeConfiguration", mqttNodeConfig); - SslContext actualSslContext = node.getSslContext(); + SslContext actualSslContext = mqttNode.getSslContext(); assertThat(actualSslContext) .usingRecursiveComparison() .ignoringFields("ctx", "ctxLock", "sessionContext.context.ctx", "sessionContext.context.ctxLock") @@ -160,10 +158,10 @@ class TbMqttNodeTest extends AbstractRuleNodeUpgradeTest { @Test public void givenFailedToInitializeMqttClient_whenInit_thenThrowsException() throws Exception { String errorMsg = "Failed to connect to MQTT broker!"; - willThrow(new RuntimeException(errorMsg)).given(node).initClient(any()); + willThrow(new RuntimeException(errorMsg)).given(mqttNode).initClient(any()); - var configuration = new TbNodeConfiguration(JacksonUtil.valueToTree(config)); - assertThatThrownBy(() -> node.init(ctxMock, configuration)) + var configuration = new TbNodeConfiguration(JacksonUtil.valueToTree(mqttNodeConfig)); + assertThatThrownBy(() -> mqttNode.init(ctxMock, configuration)) .isInstanceOf(TbNodeException.class) .hasMessage(RuntimeException.class.getName() + ": " + errorMsg); } @@ -171,25 +169,25 @@ class TbMqttNodeTest extends AbstractRuleNodeUpgradeTest { @ParameterizedTest @MethodSource public void givenTopicPatternAndIsRetainedMsgIsTrue_whenOnMsg_thenTellSuccess(String topicPattern, TbMsgMetaData metaData, String data) throws Exception { - config.setRetainedMessage(true); - config.setTopicPattern(topicPattern); + mqttNodeConfig.setRetainedMessage(true); + mqttNodeConfig.setTopicPattern(topicPattern); - willReturn(clientMock).given(node).initClient(any()); + willReturn(mqttClientMock).given(mqttNode).initClient(any()); Future future = mock(Future.class); given(future.isSuccess()).willReturn(true); - given(clientMock.publish(any(String.class), any(ByteBuf.class), any(MqttQoS.class), anyBoolean())).willReturn(future); + given(mqttClientMock.publish(any(String.class), any(ByteBuf.class), any(MqttQoS.class), anyBoolean())).willReturn(future); willAnswer(invocation-> { GenericFutureListener> listener = invocation.getArgument(0); listener.operationComplete(future); return null; }).given(future).addListener(any()); - node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config))); + mqttNode.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(mqttNodeConfig))); TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, metaData, data); - node.onMsg(ctxMock, msg); + mqttNode.onMsg(ctxMock, msg); - String expectedTopic = TbNodeUtils.processPattern(config.getTopicPattern(), msg); - then(clientMock).should().publish(expectedTopic, Unpooled.wrappedBuffer(msg.getData().getBytes(UTF8)), MqttQoS.AT_LEAST_ONCE, true); + String expectedTopic = TbNodeUtils.processPattern(mqttNodeConfig.getTopicPattern(), msg); + then(mqttClientMock).should().publish(expectedTopic, Unpooled.wrappedBuffer(msg.getData().getBytes(UTF8)), MqttQoS.AT_LEAST_ONCE, true); then(ctxMock).should().tellSuccess(msg); } @@ -203,11 +201,11 @@ class TbMqttNodeTest extends AbstractRuleNodeUpgradeTest { @Test public void givenParseToPlainTextIsTrueAndMsgPublishingFailed_whenOnMsg_thenTellFailure() throws Exception { - config.setParseToPlainText(true); + mqttNodeConfig.setParseToPlainText(true); - willReturn(clientMock).given(node).initClient(any()); + willReturn(mqttClientMock).given(mqttNode).initClient(any()); Future future = mock(Future.class); - given(clientMock.publish(any(String.class), any(ByteBuf.class), any(MqttQoS.class), anyBoolean())).willReturn(future); + given(mqttClientMock.publish(any(String.class), any(ByteBuf.class), any(MqttQoS.class), anyBoolean())).willReturn(future); given(future.isSuccess()).willReturn(false); String errorMsg = "Message publishing was failed!"; Throwable exception = new RuntimeException(errorMsg); @@ -218,12 +216,12 @@ class TbMqttNodeTest extends AbstractRuleNodeUpgradeTest { return null; }).given(future).addListener(any()); - node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config))); + mqttNode.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(mqttNodeConfig))); TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, TbMsgMetaData.EMPTY, "\"string\""); - node.onMsg(ctxMock, msg); + mqttNode.onMsg(ctxMock, msg); String expectedData = JacksonUtil.toPlainText(msg.getData()); - then(clientMock).should().publish(config.getTopicPattern(), Unpooled.wrappedBuffer(expectedData.getBytes(UTF8)), MqttQoS.AT_LEAST_ONCE, false); + then(mqttClientMock).should().publish(mqttNodeConfig.getTopicPattern(), Unpooled.wrappedBuffer(expectedData.getBytes(UTF8)), MqttQoS.AT_LEAST_ONCE, false); TbMsgMetaData metaData = new TbMsgMetaData(); metaData.putValue("error", RuntimeException.class + ": " + errorMsg); TbMsg expectedMsg = TbMsg.transformMsgMetadata(msg, metaData); @@ -251,6 +249,6 @@ class TbMqttNodeTest extends AbstractRuleNodeUpgradeTest { @Override protected TbNode getTestNode() { - return node; + return mqttNode; } } diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/mqtt/azure/TbAzureIotHubNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/mqtt/azure/TbAzureIotHubNodeTest.java new file mode 100644 index 0000000000..d62afe3001 --- /dev/null +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/mqtt/azure/TbAzureIotHubNodeTest.java @@ -0,0 +1,113 @@ +/** + * Copyright © 2016-2024 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.rule.engine.mqtt.azure; + +import io.netty.handler.codec.mqtt.MqttVersion; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.springframework.test.util.ReflectionTestUtils; +import org.thingsboard.common.util.AzureIotHubUtil; +import org.thingsboard.common.util.JacksonUtil; +import org.thingsboard.mqtt.MqttClientConfig; +import org.thingsboard.rule.engine.api.TbNodeConfiguration; +import org.thingsboard.rule.engine.api.TbNodeException; +import org.thingsboard.rule.engine.api.util.TbNodeUtils; +import org.thingsboard.rule.engine.credentials.CertPemCredentials; +import org.thingsboard.rule.engine.mqtt.TbMqttNodeConfiguration; +import org.thingsboard.rule.engine.mqtt.TbMqttNodeTest; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.BDDMockito.spy; +import static org.mockito.BDDMockito.willReturn; +import static org.mockito.BDDMockito.willThrow; + +public class TbAzureIotHubNodeTest extends TbMqttNodeTest { + + private TbAzureIotHubNode azureIotHubNode; + private TbAzureIotHubNodeConfiguration azureIotHubNodeConfig; + + @BeforeEach + public void setUp() { + super.setUp(); + azureIotHubNode = spy(new TbAzureIotHubNode()); + azureIotHubNodeConfig = new TbAzureIotHubNodeConfiguration().defaultConfiguration(); + } + + @Test + public void verifyDefaultConfig() { + assertThat(azureIotHubNodeConfig.getTopicPattern()).isEqualTo("devices//messages/events/"); + assertThat(azureIotHubNodeConfig.getHost()).isEqualTo(".azure-devices.net"); + assertThat(azureIotHubNodeConfig.getPort()).isEqualTo(8883); + assertThat(azureIotHubNodeConfig.getConnectTimeoutSec()).isEqualTo(10); + assertThat(azureIotHubNodeConfig.isCleanSession()).isTrue(); + assertThat(azureIotHubNodeConfig.isSsl()).isTrue(); + assertThat(azureIotHubNodeConfig.getCredentials()).isInstanceOf(AzureIotHubSasCredentials.class); + } + + @Test + public void verifyPrepareMqttClientConfigMethodWithAzureIotHubSasCredentials() throws TbNodeException { + AzureIotHubSasCredentials credentials = new AzureIotHubSasCredentials(); + credentials.setSasKey("testSasKey"); + credentials.setCaCert("test-ca-cert.pem"); + azureIotHubNodeConfig.setCredentials(credentials); + TbNodeConfiguration configuration = new TbNodeConfiguration(JacksonUtil.valueToTree(azureIotHubNodeConfig)); + mqttNodeConfig = TbNodeUtils.convert(configuration, TbMqttNodeConfiguration.class); + ReflectionTestUtils.setField(azureIotHubNode, "mqttNodeConfiguration", mqttNodeConfig); + MqttClientConfig mqttClientConfig = new MqttClientConfig(); + + azureIotHubNode.prepareMqttClientConfig(mqttClientConfig); + + assertThat(mqttClientConfig) + .hasFieldOrPropertyWithValue("protocolVersion", MqttVersion.MQTT_3_1_1) + .hasFieldOrPropertyWithValue("username", AzureIotHubUtil.buildUsername(mqttNodeConfig.getHost(), mqttClientConfig.getClientId())) + .hasFieldOrPropertyWithValue("password", AzureIotHubUtil.buildSasToken(mqttNodeConfig.getHost(), credentials.getSasKey())); + } + + @Test + public void givenPemCredentialsAndSuccessfulInitClient_whenInit_thenOk() throws Exception { + CertPemCredentials credentials = new CertPemCredentials(); + credentials.setCaCert("test-ca-cert.pem"); + credentials.setPassword("test-password"); + azureIotHubNodeConfig.setCredentials(credentials); + var configuration = new TbNodeConfiguration(JacksonUtil.valueToTree(azureIotHubNodeConfig)); + + willReturn(mqttClientMock).given(azureIotHubNode).initClient(any()); + + azureIotHubNode.init(ctxMock, configuration); + + assertThat(azureIotHubNode.getMqttNodeConfiguration()) + .hasFieldOrPropertyWithValue("port", 8883) + .hasFieldOrPropertyWithValue("cleanSession", true); + } + + @Test + public void givenAzureIotHubSasCredentialsAndFailedInitClient_whenInit_thenThrowsException() throws Exception { + AzureIotHubSasCredentials credentials = new AzureIotHubSasCredentials(); + credentials.setSasKey("testSasKey"); + credentials.setCaCert("test-ca-cert.pem"); + azureIotHubNodeConfig.setCredentials(credentials); + + String errorMsg = "Failed to connect to MQTT broker!"; + willThrow(new RuntimeException(errorMsg)).given(azureIotHubNode).initClient(any()); + + var configuration = new TbNodeConfiguration(JacksonUtil.valueToTree(azureIotHubNodeConfig)); + assertThatThrownBy(() -> azureIotHubNode.init(ctxMock, configuration)) + .isInstanceOf(TbNodeException.class) + .hasMessage(RuntimeException.class.getName() + ": " + errorMsg); + } +} From 60d638dfd7de74351b2335ef71f46f0e652b297d Mon Sep 17 00:00:00 2001 From: IrynaMatveieva Date: Thu, 20 Jun 2024 15:47:29 +0300 Subject: [PATCH 3/6] moved connecting mqtt client no the separate method --- .../rule/engine/mqtt/TbMqttNode.java | 8 +- .../rule/engine/mqtt/TbMqttNodeTest.java | 123 +++++++++++++++--- .../mqtt/azure/TbAzureIotHubNodeTest.java | 65 ++++++--- 3 files changed, 157 insertions(+), 39 deletions(-) diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/TbMqttNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/TbMqttNode.java index ca63c270de..d1297b3c2c 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/TbMqttNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/TbMqttNode.java @@ -115,7 +115,7 @@ public class TbMqttNode extends TbAbstractExternalNode { return "Tenant[" + ctx.getTenantId().getId() + "]RuleNode[" + ctx.getSelf().getId().getId() + "]"; } - public MqttClient initClient(TbContext ctx) throws Exception { + protected MqttClient initClient(TbContext ctx) throws Exception { MqttClientConfig config = new MqttClientConfig(getSslContext()); config.setOwnerId(getOwnerId(ctx)); if (!StringUtils.isEmpty(this.mqttNodeConfiguration.getClientId())) { @@ -127,7 +127,7 @@ public class TbMqttNode extends TbAbstractExternalNode { prepareMqttClientConfig(config); MqttClient client = MqttClient.create(config, null, ctx.getExternalCallExecutor()); client.setEventLoop(ctx.getSharedEventLoop()); - Promise connectFuture = client.connect(this.mqttNodeConfiguration.getHost(), this.mqttNodeConfiguration.getPort()); + Promise connectFuture = connectMqttClient(client); MqttConnectResult result; try { result = connectFuture.get(this.mqttNodeConfiguration.getConnectTimeoutSec(), TimeUnit.SECONDS); @@ -146,6 +146,10 @@ public class TbMqttNode extends TbAbstractExternalNode { return client; } + protected Promise connectMqttClient(MqttClient client) { + return client.connect(this.mqttNodeConfiguration.getHost(), this.mqttNodeConfiguration.getPort()); + } + protected void prepareMqttClientConfig(MqttClientConfig config) { ClientCredentials credentials = this.mqttNodeConfiguration.getCredentials(); if (credentials.getType() == CredentialsType.BASIC) { diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/mqtt/TbMqttNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/mqtt/TbMqttNodeTest.java index 1c8a3f2500..d34d2675d0 100644 --- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/mqtt/TbMqttNodeTest.java +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/mqtt/TbMqttNodeTest.java @@ -17,11 +17,14 @@ package org.thingsboard.rule.engine.mqtt; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; +import io.netty.channel.EventLoopGroup; +import io.netty.handler.codec.mqtt.MqttConnectReturnCode; import io.netty.handler.codec.mqtt.MqttQoS; import io.netty.handler.ssl.SslContext; import io.netty.handler.ssl.SslContextBuilder; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.GenericFutureListener; +import io.netty.util.concurrent.Promise; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -33,9 +36,12 @@ import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; import org.springframework.test.util.ReflectionTestUtils; import org.thingsboard.common.util.JacksonUtil; +import org.thingsboard.common.util.ListeningExecutor; import org.thingsboard.mqtt.MqttClient; import org.thingsboard.mqtt.MqttClientConfig; +import org.thingsboard.mqtt.MqttConnectResult; import org.thingsboard.rule.engine.AbstractRuleNodeUpgradeTest; +import org.thingsboard.rule.engine.TestDbCallbackExecutor; import org.thingsboard.rule.engine.api.TbContext; import org.thingsboard.rule.engine.api.TbNode; import org.thingsboard.rule.engine.api.TbNodeConfiguration; @@ -43,11 +49,13 @@ import org.thingsboard.rule.engine.api.TbNodeException; import org.thingsboard.rule.engine.api.util.TbNodeUtils; import org.thingsboard.rule.engine.credentials.AnonymousCredentials; import org.thingsboard.rule.engine.credentials.BasicCredentials; +import org.thingsboard.rule.engine.credentials.CertPemCredentials; import org.thingsboard.rule.engine.credentials.ClientCredentials; import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.RuleNodeId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.msg.TbMsgType; +import org.thingsboard.server.common.data.msg.TbNodeConnectionType; import org.thingsboard.server.common.data.rule.RuleNode; import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.common.msg.TbMsgMetaData; @@ -55,26 +63,34 @@ import org.thingsboard.server.common.msg.TbMsgMetaData; import javax.net.ssl.SSLException; import java.util.Map; import java.util.UUID; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.stream.Stream; import static com.amazonaws.util.StringUtils.UTF8; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatNoException; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyBoolean; +import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.BDDMockito.given; import static org.mockito.BDDMockito.mock; +import static org.mockito.BDDMockito.never; import static org.mockito.BDDMockito.spy; import static org.mockito.BDDMockito.then; import static org.mockito.BDDMockito.willAnswer; import static org.mockito.BDDMockito.willReturn; -import static org.mockito.BDDMockito.willThrow; @ExtendWith(MockitoExtension.class) public class TbMqttNodeTest extends AbstractRuleNodeUpgradeTest { + private final TenantId TENANT_ID = TenantId.fromUUID(UUID.fromString("d0c5d2a8-3a6e-4c95-8caf-47fbdc8ef98f")); private final DeviceId DEVICE_ID = new DeviceId(UUID.fromString("09115d92-d333-432a-868c-ccd6e89c9287")); + private final RuleNodeId RULE_NODE_ID = new RuleNodeId(UUID.fromString("11699e8f-c3f0-4366-9334-cbf75798314b")); + private final ListeningExecutor executor = new TestDbCallbackExecutor(); protected TbMqttNode mqttNode; protected TbMqttNodeConfiguration mqttNodeConfig; @@ -83,6 +99,12 @@ public class TbMqttNodeTest extends AbstractRuleNodeUpgradeTest { protected TbContext ctxMock; @Mock protected MqttClient mqttClientMock; + @Mock + protected EventLoopGroup eventLoopGroupMock; + @Mock + protected Promise promiseMock; + @Mock + protected MqttConnectResult resultMock; @BeforeEach protected void setUp() { @@ -129,9 +151,8 @@ public class TbMqttNodeTest extends AbstractRuleNodeUpgradeTest { mqttNode.prepareMqttClientConfig(mqttClientConfig); - assertThat(mqttClientConfig) - .hasFieldOrPropertyWithValue("username", "test_username") - .hasFieldOrPropertyWithValue("password", "test_password"); + assertThat(mqttClientConfig.getUsername()).isEqualTo("test_username"); + assertThat(mqttClientConfig.getPassword()).isEqualTo("test_password"); } @ParameterizedTest @@ -156,23 +177,62 @@ public class TbMqttNodeTest extends AbstractRuleNodeUpgradeTest { } @Test - public void givenFailedToInitializeMqttClient_whenInit_thenThrowsException() throws Exception { - String errorMsg = "Failed to connect to MQTT broker!"; - willThrow(new RuntimeException(errorMsg)).given(mqttNode).initClient(any()); + public void givenSuccessfulConnectResult_whenInit_thenOk() throws ExecutionException, InterruptedException, TimeoutException { + mqttNodeConfig.setClientId("bfrbTESTmfkr23"); + mqttNodeConfig.setAppendClientIdSuffix(true); + mqttNodeConfig.setCredentials(new CertPemCredentials()); - var configuration = new TbNodeConfiguration(JacksonUtil.valueToTree(mqttNodeConfig)); - assertThatThrownBy(() -> mqttNode.init(ctxMock, configuration)) + mockConnectClient(mqttNode); + given(promiseMock.get(anyLong(), any(TimeUnit.class))).willReturn(resultMock); + given(resultMock.isSuccess()).willReturn(true); + + assertThatNoException().isThrownBy(() -> mqttNode.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(mqttNodeConfig)))); + } + + @Test + public void givenFailedByTimeoutConnectResult_whenInit_thenThrowsException() throws ExecutionException, InterruptedException, TimeoutException { + mqttNodeConfig.setHost("localhost"); + mqttNodeConfig.setClientId("bfrbTESTmfkr23"); + mqttNodeConfig.setCredentials(new CertPemCredentials()); + + mockConnectClient(mqttNode); + given(promiseMock.get(anyLong(), any(TimeUnit.class))).willThrow(new TimeoutException("Failed to connect")); + + assertThatThrownBy(() -> mqttNode.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(mqttNodeConfig)))) .isInstanceOf(TbNodeException.class) - .hasMessage(RuntimeException.class.getName() + ": " + errorMsg); + .hasMessage("java.lang.RuntimeException: Failed to connect to MQTT broker at localhost:1883.") + .extracting(e -> ((TbNodeException) e).isUnrecoverable()) + .isEqualTo(false); + } + + @Test + public void givenFailedConnectResult_whenInit_thenThrowsException() throws ExecutionException, InterruptedException, TimeoutException { + mqttNodeConfig.setHost("localhost"); + mqttNodeConfig.setClientId("bfrbTESTmfkr23"); + mqttNodeConfig.setAppendClientIdSuffix(true); + mqttNodeConfig.setCredentials(new CertPemCredentials()); + + mockConnectClient(mqttNode); + given(promiseMock.get(anyLong(), any(TimeUnit.class))).willReturn(resultMock); + given(resultMock.isSuccess()).willReturn(false); + given(resultMock.getReturnCode()).willReturn(MqttConnectReturnCode.CONNECTION_REFUSED_NOT_AUTHORIZED); + + assertThatThrownBy(() -> mqttNode.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(mqttNodeConfig)))) + .isInstanceOf(TbNodeException.class) + .hasMessage("java.lang.RuntimeException: Failed to connect to MQTT broker at localhost:1883. Result code is: CONNECTION_REFUSED_NOT_AUTHORIZED") + .extracting(e -> ((TbNodeException) e).isUnrecoverable()) + .isEqualTo(false); } @ParameterizedTest @MethodSource - public void givenTopicPatternAndIsRetainedMsgIsTrue_whenOnMsg_thenTellSuccess(String topicPattern, TbMsgMetaData metaData, String data) throws Exception { + public void givenForceAckIsTrueAndTopicPatternAndIsRetainedMsgIsTrue_whenOnMsg_thenTellSuccess(String topicPattern, TbMsgMetaData metaData, String data) { mqttNodeConfig.setRetainedMessage(true); mqttNodeConfig.setTopicPattern(topicPattern); + ReflectionTestUtils.setField(mqttNode, "mqttNodeConfiguration", mqttNodeConfig); + ReflectionTestUtils.setField(mqttNode, "mqttClient", mqttClientMock); + ReflectionTestUtils.setField(mqttNode, "forceAck", true); - willReturn(mqttClientMock).given(mqttNode).initClient(any()); Future future = mock(Future.class); given(future.isSuccess()).willReturn(true); given(mqttClientMock.publish(any(String.class), any(ByteBuf.class), any(MqttQoS.class), anyBoolean())).willReturn(future); @@ -182,16 +242,18 @@ public class TbMqttNodeTest extends AbstractRuleNodeUpgradeTest { return null; }).given(future).addListener(any()); - mqttNode.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(mqttNodeConfig))); TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, metaData, data); mqttNode.onMsg(ctxMock, msg); + then(ctxMock).should().ack(msg); String expectedTopic = TbNodeUtils.processPattern(mqttNodeConfig.getTopicPattern(), msg); then(mqttClientMock).should().publish(expectedTopic, Unpooled.wrappedBuffer(msg.getData().getBytes(UTF8)), MqttQoS.AT_LEAST_ONCE, true); - then(ctxMock).should().tellSuccess(msg); + ArgumentCaptor actualMsg = ArgumentCaptor.forClass(TbMsg.class); + then(ctxMock).should().enqueueForTellNext(actualMsg.capture(), eq(TbNodeConnectionType.SUCCESS)); + assertThat(actualMsg.getValue()).usingRecursiveComparison().ignoringFields("ctx").isEqualTo(msg); } - private static Stream givenTopicPatternAndIsRetainedMsgIsTrue_whenOnMsg_thenTellSuccess() { + private static Stream givenForceAckIsTrueAndTopicPatternAndIsRetainedMsgIsTrue_whenOnMsg_thenTellSuccess() { return Stream.of( Arguments.of("new-topic", TbMsgMetaData.EMPTY, TbMsg.EMPTY_JSON_OBJECT), Arguments.of("${md-topic-name}", new TbMsgMetaData(Map.of("md-topic-name", "md-new-topic")), TbMsg.EMPTY_JSON_OBJECT), @@ -200,10 +262,12 @@ public class TbMqttNodeTest extends AbstractRuleNodeUpgradeTest { } @Test - public void givenParseToPlainTextIsTrueAndMsgPublishingFailed_whenOnMsg_thenTellFailure() throws Exception { + public void givenForceAckIsFalseParseToPlainTextIsTrueAndMsgPublishingFailed_whenOnMsg_thenTellFailure() { mqttNodeConfig.setParseToPlainText(true); + ReflectionTestUtils.setField(mqttNode, "mqttNodeConfiguration", mqttNodeConfig); + ReflectionTestUtils.setField(mqttNode, "mqttClient", mqttClientMock); + ReflectionTestUtils.setField(mqttNode, "forceAck", false); - willReturn(mqttClientMock).given(mqttNode).initClient(any()); Future future = mock(Future.class); given(mqttClientMock.publish(any(String.class), any(ByteBuf.class), any(MqttQoS.class), anyBoolean())).willReturn(future); given(future.isSuccess()).willReturn(false); @@ -216,10 +280,10 @@ public class TbMqttNodeTest extends AbstractRuleNodeUpgradeTest { return null; }).given(future).addListener(any()); - mqttNode.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(mqttNodeConfig))); TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, TbMsgMetaData.EMPTY, "\"string\""); mqttNode.onMsg(ctxMock, msg); + then(ctxMock).should(never()).ack(msg); String expectedData = JacksonUtil.toPlainText(msg.getData()); then(mqttClientMock).should().publish(mqttNodeConfig.getTopicPattern(), Unpooled.wrappedBuffer(expectedData.getBytes(UTF8)), MqttQoS.AT_LEAST_ONCE, false); TbMsgMetaData metaData = new TbMsgMetaData(); @@ -231,6 +295,20 @@ public class TbMqttNodeTest extends AbstractRuleNodeUpgradeTest { assertThat(actualMsg).usingRecursiveComparison().ignoringFields("ctx").isEqualTo(expectedMsg); } + @Test + public void givenMqttClientIsNotNull_whenDestroy_thenDisconnect() { + ReflectionTestUtils.setField(mqttNode, "mqttClient", mqttClientMock); + mqttNode.destroy(); + then(mqttClientMock).should().disconnect(); + } + + @Test + public void givenMqttClientIsNull_whenDestroy_thenShouldHaveNoInteractions() { + ReflectionTestUtils.setField(mqttNode, "mqttClient", null); + mqttNode.destroy(); + then(mqttClientMock).shouldHaveNoInteractions(); + } + private static Stream givenFromVersionAndConfig_whenUpgrade_thenVerifyHasChangesAndConfig() { return Stream.of( // default config for version 0 @@ -251,4 +329,13 @@ public class TbMqttNodeTest extends AbstractRuleNodeUpgradeTest { protected TbNode getTestNode() { return mqttNode; } + + protected void mockConnectClient(TbMqttNode node) { + given(ctxMock.getTenantId()).willReturn(TENANT_ID); + given(ctxMock.getSelf()).willReturn(new RuleNode(RULE_NODE_ID)); + given(ctxMock.getExternalCallExecutor()).willReturn(executor); + given(ctxMock.getSharedEventLoop()).willReturn(eventLoopGroupMock); + willReturn(promiseMock).given(node).connectMqttClient(any()); + } + } diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/mqtt/azure/TbAzureIotHubNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/mqtt/azure/TbAzureIotHubNodeTest.java index d62afe3001..7397fcdf9a 100644 --- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/mqtt/azure/TbAzureIotHubNodeTest.java +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/mqtt/azure/TbAzureIotHubNodeTest.java @@ -15,6 +15,7 @@ */ package org.thingsboard.rule.engine.mqtt.azure; +import io.netty.handler.codec.mqtt.MqttConnectReturnCode; import io.netty.handler.codec.mqtt.MqttVersion; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -29,12 +30,17 @@ import org.thingsboard.rule.engine.credentials.CertPemCredentials; import org.thingsboard.rule.engine.mqtt.TbMqttNodeConfiguration; import org.thingsboard.rule.engine.mqtt.TbMqttNodeTest; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatNoException; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.BDDMockito.given; import static org.mockito.BDDMockito.spy; -import static org.mockito.BDDMockito.willReturn; -import static org.mockito.BDDMockito.willThrow; public class TbAzureIotHubNodeTest extends TbMqttNodeTest { @@ -72,42 +78,63 @@ public class TbAzureIotHubNodeTest extends TbMqttNodeTest { azureIotHubNode.prepareMqttClientConfig(mqttClientConfig); - assertThat(mqttClientConfig) - .hasFieldOrPropertyWithValue("protocolVersion", MqttVersion.MQTT_3_1_1) - .hasFieldOrPropertyWithValue("username", AzureIotHubUtil.buildUsername(mqttNodeConfig.getHost(), mqttClientConfig.getClientId())) - .hasFieldOrPropertyWithValue("password", AzureIotHubUtil.buildSasToken(mqttNodeConfig.getHost(), credentials.getSasKey())); + assertThat(mqttClientConfig.getProtocolVersion()).isEqualTo(MqttVersion.MQTT_3_1_1); + assertThat(mqttClientConfig.getUsername()).isEqualTo(AzureIotHubUtil.buildUsername(mqttNodeConfig.getHost(), mqttClientConfig.getClientId())); + assertThat(mqttClientConfig.getPassword()).isEqualTo(AzureIotHubUtil.buildSasToken(mqttNodeConfig.getHost(), credentials.getSasKey())); } @Test - public void givenPemCredentialsAndSuccessfulInitClient_whenInit_thenOk() throws Exception { + public void givenPemCredentialsAndSuccessfulConnectResult_whenInit_thenOk() throws Exception { CertPemCredentials credentials = new CertPemCredentials(); credentials.setCaCert("test-ca-cert.pem"); credentials.setPassword("test-password"); azureIotHubNodeConfig.setCredentials(credentials); - var configuration = new TbNodeConfiguration(JacksonUtil.valueToTree(azureIotHubNodeConfig)); - willReturn(mqttClientMock).given(azureIotHubNode).initClient(any()); + mockConnectClient(azureIotHubNode); + given(promiseMock.get(anyLong(), any(TimeUnit.class))).willReturn(resultMock); + given(resultMock.isSuccess()).willReturn(true); - azureIotHubNode.init(ctxMock, configuration); + assertThatNoException().isThrownBy( + () -> azureIotHubNode.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(azureIotHubNodeConfig)))); - assertThat(azureIotHubNode.getMqttNodeConfiguration()) - .hasFieldOrPropertyWithValue("port", 8883) - .hasFieldOrPropertyWithValue("cleanSession", true); + TbMqttNodeConfiguration mqttNodeConfiguration = azureIotHubNode.getMqttNodeConfiguration(); + assertThat(mqttNodeConfiguration.getPort()).isEqualTo(8883); + assertThat(mqttNodeConfiguration.isCleanSession()).isTrue(); } @Test - public void givenAzureIotHubSasCredentialsAndFailedInitClient_whenInit_thenThrowsException() throws Exception { + public void givenAzureIotHubSasCredentialsAndFailedByTimeoutConnectResult_whenInit_thenThrowsException() throws ExecutionException, InterruptedException, TimeoutException { AzureIotHubSasCredentials credentials = new AzureIotHubSasCredentials(); credentials.setSasKey("testSasKey"); credentials.setCaCert("test-ca-cert.pem"); azureIotHubNodeConfig.setCredentials(credentials); - String errorMsg = "Failed to connect to MQTT broker!"; - willThrow(new RuntimeException(errorMsg)).given(azureIotHubNode).initClient(any()); + mockConnectClient(azureIotHubNode); + given(promiseMock.get(anyLong(), any(TimeUnit.class))).willThrow(new TimeoutException("Failed to connect")); - var configuration = new TbNodeConfiguration(JacksonUtil.valueToTree(azureIotHubNodeConfig)); - assertThatThrownBy(() -> azureIotHubNode.init(ctxMock, configuration)) + assertThatThrownBy(() -> azureIotHubNode.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(azureIotHubNodeConfig)))) .isInstanceOf(TbNodeException.class) - .hasMessage(RuntimeException.class.getName() + ": " + errorMsg); + .hasMessage("java.lang.RuntimeException: Failed to connect to MQTT broker at .azure-devices.net:8883.") + .extracting(e -> ((TbNodeException) e).isUnrecoverable()) + .isEqualTo(false); + } + + @Test + public void givenFailedConnectResult_whenInit_thenThrowsException() throws ExecutionException, InterruptedException, TimeoutException { + AzureIotHubSasCredentials credentials = new AzureIotHubSasCredentials(); + credentials.setSasKey("testSasKey"); + credentials.setCaCert("test-ca-cert.pem"); + azureIotHubNodeConfig.setCredentials(credentials); + + mockConnectClient(azureIotHubNode); + given(promiseMock.get(anyLong(), any(TimeUnit.class))).willReturn(resultMock); + given(resultMock.isSuccess()).willReturn(false); + given(resultMock.getReturnCode()).willReturn(MqttConnectReturnCode.CONNECTION_REFUSED_NOT_AUTHORIZED); + + assertThatThrownBy(() -> azureIotHubNode.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(azureIotHubNodeConfig)))) + .isInstanceOf(TbNodeException.class) + .hasMessage("java.lang.RuntimeException: Failed to connect to MQTT broker at .azure-devices.net:8883. Result code is: CONNECTION_REFUSED_NOT_AUTHORIZED") + .extracting(e -> ((TbNodeException) e).isUnrecoverable()) + .isEqualTo(false); } } From f56e72d1e1aece9d2d9a6e77db1ce407cec8eb5f Mon Sep 17 00:00:00 2001 From: IrynaMatveieva Date: Mon, 29 Jul 2024 17:55:12 +0300 Subject: [PATCH 4/6] reduced use of reflection --- .../rule/engine/mqtt/TbMqttNode.java | 8 +- .../rule/engine/mqtt/TbMqttNodeTest.java | 106 ++++++++++-------- .../mqtt/azure/TbAzureIotHubNodeTest.java | 99 ++++++++-------- 3 files changed, 107 insertions(+), 106 deletions(-) diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/TbMqttNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/TbMqttNode.java index d1297b3c2c..d26d6bb4e5 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/TbMqttNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/TbMqttNode.java @@ -125,9 +125,9 @@ public class TbMqttNode extends TbAbstractExternalNode { config.setCleanSession(this.mqttNodeConfiguration.isCleanSession()); prepareMqttClientConfig(config); - MqttClient client = MqttClient.create(config, null, ctx.getExternalCallExecutor()); + MqttClient client = getMqttClient(ctx, config); client.setEventLoop(ctx.getSharedEventLoop()); - Promise connectFuture = connectMqttClient(client); + Promise connectFuture = client.connect(this.mqttNodeConfiguration.getHost(), this.mqttNodeConfiguration.getPort()); MqttConnectResult result; try { result = connectFuture.get(this.mqttNodeConfiguration.getConnectTimeoutSec(), TimeUnit.SECONDS); @@ -146,8 +146,8 @@ public class TbMqttNode extends TbAbstractExternalNode { return client; } - protected Promise connectMqttClient(MqttClient client) { - return client.connect(this.mqttNodeConfiguration.getHost(), this.mqttNodeConfiguration.getPort()); + public MqttClient getMqttClient(TbContext ctx, MqttClientConfig config) { + return MqttClient.create(config, null, ctx.getExternalCallExecutor()); } protected void prepareMqttClientConfig(MqttClientConfig config) { diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/mqtt/TbMqttNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/mqtt/TbMqttNodeTest.java index d34d2675d0..e680633360 100644 --- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/mqtt/TbMqttNodeTest.java +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/mqtt/TbMqttNodeTest.java @@ -36,12 +36,10 @@ import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; import org.springframework.test.util.ReflectionTestUtils; import org.thingsboard.common.util.JacksonUtil; -import org.thingsboard.common.util.ListeningExecutor; import org.thingsboard.mqtt.MqttClient; import org.thingsboard.mqtt.MqttClientConfig; import org.thingsboard.mqtt.MqttConnectResult; import org.thingsboard.rule.engine.AbstractRuleNodeUpgradeTest; -import org.thingsboard.rule.engine.TestDbCallbackExecutor; import org.thingsboard.rule.engine.api.TbContext; import org.thingsboard.rule.engine.api.TbNode; import org.thingsboard.rule.engine.api.TbNodeConfiguration; @@ -50,7 +48,6 @@ import org.thingsboard.rule.engine.api.util.TbNodeUtils; import org.thingsboard.rule.engine.credentials.AnonymousCredentials; import org.thingsboard.rule.engine.credentials.BasicCredentials; import org.thingsboard.rule.engine.credentials.CertPemCredentials; -import org.thingsboard.rule.engine.credentials.ClientCredentials; import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.RuleNodeId; import org.thingsboard.server.common.data.id.TenantId; @@ -60,7 +57,7 @@ import org.thingsboard.server.common.data.rule.RuleNode; import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.common.msg.TbMsgMetaData; -import javax.net.ssl.SSLException; +import java.nio.charset.StandardCharsets; import java.util.Map; import java.util.UUID; import java.util.concurrent.ExecutionException; @@ -68,12 +65,12 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.stream.Stream; -import static com.amazonaws.util.StringUtils.UTF8; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatNoException; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyBoolean; +import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.BDDMockito.given; @@ -90,7 +87,6 @@ public class TbMqttNodeTest extends AbstractRuleNodeUpgradeTest { private final TenantId TENANT_ID = TenantId.fromUUID(UUID.fromString("d0c5d2a8-3a6e-4c95-8caf-47fbdc8ef98f")); private final DeviceId DEVICE_ID = new DeviceId(UUID.fromString("09115d92-d333-432a-868c-ccd6e89c9287")); private final RuleNodeId RULE_NODE_ID = new RuleNodeId(UUID.fromString("11699e8f-c3f0-4366-9334-cbf75798314b")); - private final ListeningExecutor executor = new TestDbCallbackExecutor(); protected TbMqttNode mqttNode; protected TbMqttNodeConfiguration mqttNodeConfig; @@ -129,62 +125,64 @@ public class TbMqttNodeTest extends AbstractRuleNodeUpgradeTest { @Test public void verifyGetOwnerIdMethod() { - String tenantIdStr = "6f67b6cc-21dd-46c5-809c-402b738a3f8b"; - String ruleNodeIdStr = "80a90b53-6888-4344-bf46-01ce8e96eee7"; - RuleNode ruleNode = new RuleNode(new RuleNodeId(UUID.fromString(ruleNodeIdStr))); - given(ctxMock.getTenantId()).willReturn(TenantId.fromUUID(UUID.fromString(tenantIdStr))); - given(ctxMock.getSelf()).willReturn(ruleNode); + given(ctxMock.getTenantId()).willReturn(TENANT_ID); + given(ctxMock.getSelf()).willReturn(new RuleNode(RULE_NODE_ID)); String actualOwnerIdStr = mqttNode.getOwnerId(ctxMock); - String expectedOwnerIdStr = "Tenant[" + tenantIdStr + "]RuleNode[" + ruleNodeIdStr + "]"; + String expectedOwnerIdStr = "Tenant[" + TENANT_ID.getId() + "]RuleNode[" + RULE_NODE_ID.getId() + "]"; assertThat(actualOwnerIdStr).isEqualTo(expectedOwnerIdStr); } @Test - public void verifyPrepareMqttClientConfigMethodWithBasicCredentials() throws SSLException { + public void verifyPrepareMqttClientConfigMethodWithBasicCredentials() throws Exception { BasicCredentials credentials = new BasicCredentials(); credentials.setUsername("test_username"); credentials.setPassword("test_password"); mqttNodeConfig.setCredentials(credentials); - ReflectionTestUtils.setField(mqttNode, "mqttNodeConfiguration", mqttNodeConfig); - MqttClientConfig mqttClientConfig = new MqttClientConfig(mqttNode.getSslContext()); + mockSuccessfulInit(); + mqttNode.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(mqttNodeConfig))); + + MqttClientConfig mqttClientConfig = new MqttClientConfig(mqttNode.getSslContext()); mqttNode.prepareMqttClientConfig(mqttClientConfig); assertThat(mqttClientConfig.getUsername()).isEqualTo("test_username"); assertThat(mqttClientConfig.getPassword()).isEqualTo("test_password"); } - @ParameterizedTest - @MethodSource - public void verifyGetSslContextMethod(boolean ssl, ClientCredentials credentials, SslContext expectedSslContext) throws SSLException { - mqttNodeConfig.setSsl(ssl); - mqttNodeConfig.setCredentials(credentials); - ReflectionTestUtils.setField(mqttNode, "mqttNodeConfiguration", mqttNodeConfig); + @Test + public void givenSslIsTrueAndCredentials_whenGetSslContext_thenVerifySslContext() throws Exception { + mqttNodeConfig.setSsl(true); + mqttNodeConfig.setCredentials(new BasicCredentials()); + + mockSuccessfulInit(); + mqttNode.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(mqttNodeConfig))); SslContext actualSslContext = mqttNode.getSslContext(); assertThat(actualSslContext) .usingRecursiveComparison() .ignoringFields("ctx", "ctxLock", "sessionContext.context.ctx", "sessionContext.context.ctxLock") - .isEqualTo(expectedSslContext); - } - - private static Stream verifyGetSslContextMethod() throws SSLException { - return Stream.of( - Arguments.of(true, new BasicCredentials(), SslContextBuilder.forClient().build()), - Arguments.of(false, new AnonymousCredentials(), null) - ); + .isEqualTo(SslContextBuilder.forClient().build()); } @Test - public void givenSuccessfulConnectResult_whenInit_thenOk() throws ExecutionException, InterruptedException, TimeoutException { + public void givenSslIsFalse_whenGetSslContext_thenVerifySslContextIsNull() throws Exception { + mqttNodeConfig.setSsl(false); + + mockSuccessfulInit(); + mqttNode.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(mqttNodeConfig))); + + SslContext actualSslContext = mqttNode.getSslContext(); + assertThat(actualSslContext).isNull(); + } + + @Test + public void givenSuccessfulConnectResult_whenInit_thenOk() throws Exception { mqttNodeConfig.setClientId("bfrbTESTmfkr23"); mqttNodeConfig.setAppendClientIdSuffix(true); mqttNodeConfig.setCredentials(new CertPemCredentials()); - mockConnectClient(mqttNode); - given(promiseMock.get(anyLong(), any(TimeUnit.class))).willReturn(resultMock); - given(resultMock.isSuccess()).willReturn(true); + mockSuccessfulInit(); assertThatNoException().isThrownBy(() -> mqttNode.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(mqttNodeConfig)))); } @@ -195,7 +193,7 @@ public class TbMqttNodeTest extends AbstractRuleNodeUpgradeTest { mqttNodeConfig.setClientId("bfrbTESTmfkr23"); mqttNodeConfig.setCredentials(new CertPemCredentials()); - mockConnectClient(mqttNode); + mockConnectClient(); given(promiseMock.get(anyLong(), any(TimeUnit.class))).willThrow(new TimeoutException("Failed to connect")); assertThatThrownBy(() -> mqttNode.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(mqttNodeConfig)))) @@ -206,13 +204,13 @@ public class TbMqttNodeTest extends AbstractRuleNodeUpgradeTest { } @Test - public void givenFailedConnectResult_whenInit_thenThrowsException() throws ExecutionException, InterruptedException, TimeoutException { + public void givenFailedConnectResult_whenInit_thenThrowsException() throws Exception { mqttNodeConfig.setHost("localhost"); mqttNodeConfig.setClientId("bfrbTESTmfkr23"); mqttNodeConfig.setAppendClientIdSuffix(true); mqttNodeConfig.setCredentials(new CertPemCredentials()); - mockConnectClient(mqttNode); + mockConnectClient(); given(promiseMock.get(anyLong(), any(TimeUnit.class))).willReturn(resultMock); given(resultMock.isSuccess()).willReturn(false); given(resultMock.getReturnCode()).willReturn(MqttConnectReturnCode.CONNECTION_REFUSED_NOT_AUTHORIZED); @@ -226,12 +224,15 @@ public class TbMqttNodeTest extends AbstractRuleNodeUpgradeTest { @ParameterizedTest @MethodSource - public void givenForceAckIsTrueAndTopicPatternAndIsRetainedMsgIsTrue_whenOnMsg_thenTellSuccess(String topicPattern, TbMsgMetaData metaData, String data) { + public void givenForceAckIsTrueAndTopicPatternAndIsRetainedMsgIsTrue_whenOnMsg_thenTellSuccess( + String topicPattern, TbMsgMetaData metaData, String data + ) throws Exception { mqttNodeConfig.setRetainedMessage(true); mqttNodeConfig.setTopicPattern(topicPattern); - ReflectionTestUtils.setField(mqttNode, "mqttNodeConfiguration", mqttNodeConfig); - ReflectionTestUtils.setField(mqttNode, "mqttClient", mqttClientMock); - ReflectionTestUtils.setField(mqttNode, "forceAck", true); + + given(ctxMock.isExternalNodeForceAck()).willReturn(true); + mockSuccessfulInit(); + mqttNode.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(mqttNodeConfig))); Future future = mock(Future.class); given(future.isSuccess()).willReturn(true); @@ -247,7 +248,7 @@ public class TbMqttNodeTest extends AbstractRuleNodeUpgradeTest { then(ctxMock).should().ack(msg); String expectedTopic = TbNodeUtils.processPattern(mqttNodeConfig.getTopicPattern(), msg); - then(mqttClientMock).should().publish(expectedTopic, Unpooled.wrappedBuffer(msg.getData().getBytes(UTF8)), MqttQoS.AT_LEAST_ONCE, true); + then(mqttClientMock).should().publish(expectedTopic, Unpooled.wrappedBuffer(msg.getData().getBytes(StandardCharsets.UTF_8)), MqttQoS.AT_LEAST_ONCE, true); ArgumentCaptor actualMsg = ArgumentCaptor.forClass(TbMsg.class); then(ctxMock).should().enqueueForTellNext(actualMsg.capture(), eq(TbNodeConnectionType.SUCCESS)); assertThat(actualMsg.getValue()).usingRecursiveComparison().ignoringFields("ctx").isEqualTo(msg); @@ -262,11 +263,12 @@ public class TbMqttNodeTest extends AbstractRuleNodeUpgradeTest { } @Test - public void givenForceAckIsFalseParseToPlainTextIsTrueAndMsgPublishingFailed_whenOnMsg_thenTellFailure() { + public void givenForceAckIsFalseParseToPlainTextIsTrueAndMsgPublishingFailed_whenOnMsg_thenTellFailure() throws Exception { mqttNodeConfig.setParseToPlainText(true); - ReflectionTestUtils.setField(mqttNode, "mqttNodeConfiguration", mqttNodeConfig); - ReflectionTestUtils.setField(mqttNode, "mqttClient", mqttClientMock); - ReflectionTestUtils.setField(mqttNode, "forceAck", false); + + given(ctxMock.isExternalNodeForceAck()).willReturn(false); + mockSuccessfulInit(); + mqttNode.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(mqttNodeConfig))); Future future = mock(Future.class); given(mqttClientMock.publish(any(String.class), any(ByteBuf.class), any(MqttQoS.class), anyBoolean())).willReturn(future); @@ -285,7 +287,7 @@ public class TbMqttNodeTest extends AbstractRuleNodeUpgradeTest { then(ctxMock).should(never()).ack(msg); String expectedData = JacksonUtil.toPlainText(msg.getData()); - then(mqttClientMock).should().publish(mqttNodeConfig.getTopicPattern(), Unpooled.wrappedBuffer(expectedData.getBytes(UTF8)), MqttQoS.AT_LEAST_ONCE, false); + then(mqttClientMock).should().publish(mqttNodeConfig.getTopicPattern(), Unpooled.wrappedBuffer(expectedData.getBytes(StandardCharsets.UTF_8)), MqttQoS.AT_LEAST_ONCE, false); TbMsgMetaData metaData = new TbMsgMetaData(); metaData.putValue("error", RuntimeException.class + ": " + errorMsg); TbMsg expectedMsg = TbMsg.transformMsgMetadata(msg, metaData); @@ -330,12 +332,18 @@ public class TbMqttNodeTest extends AbstractRuleNodeUpgradeTest { return mqttNode; } - protected void mockConnectClient(TbMqttNode node) { + private void mockConnectClient() { given(ctxMock.getTenantId()).willReturn(TENANT_ID); given(ctxMock.getSelf()).willReturn(new RuleNode(RULE_NODE_ID)); - given(ctxMock.getExternalCallExecutor()).willReturn(executor); given(ctxMock.getSharedEventLoop()).willReturn(eventLoopGroupMock); - willReturn(promiseMock).given(node).connectMqttClient(any()); + willReturn(mqttClientMock).given(mqttNode).getMqttClient(any(), any()); + given(mqttClientMock.connect(any(), anyInt())).willReturn(promiseMock); + } + + private void mockSuccessfulInit() throws Exception { + mockConnectClient(); + given(promiseMock.get(anyLong(), any(TimeUnit.class))).willReturn(resultMock); + given(resultMock.isSuccess()).willReturn(true); } } diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/mqtt/azure/TbAzureIotHubNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/mqtt/azure/TbAzureIotHubNodeTest.java index 7397fcdf9a..f6f8adeafb 100644 --- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/mqtt/azure/TbAzureIotHubNodeTest.java +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/mqtt/azure/TbAzureIotHubNodeTest.java @@ -15,41 +15,58 @@ */ package org.thingsboard.rule.engine.mqtt.azure; -import io.netty.handler.codec.mqtt.MqttConnectReturnCode; +import io.netty.channel.EventLoopGroup; import io.netty.handler.codec.mqtt.MqttVersion; +import io.netty.util.concurrent.Promise; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import org.springframework.test.util.ReflectionTestUtils; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; import org.thingsboard.common.util.AzureIotHubUtil; import org.thingsboard.common.util.JacksonUtil; +import org.thingsboard.mqtt.MqttClient; import org.thingsboard.mqtt.MqttClientConfig; +import org.thingsboard.mqtt.MqttConnectResult; +import org.thingsboard.rule.engine.api.TbContext; import org.thingsboard.rule.engine.api.TbNodeConfiguration; -import org.thingsboard.rule.engine.api.TbNodeException; -import org.thingsboard.rule.engine.api.util.TbNodeUtils; import org.thingsboard.rule.engine.credentials.CertPemCredentials; import org.thingsboard.rule.engine.mqtt.TbMqttNodeConfiguration; -import org.thingsboard.rule.engine.mqtt.TbMqttNodeTest; +import org.thingsboard.server.common.data.id.RuleNodeId; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.rule.RuleNode; -import java.util.concurrent.ExecutionException; +import java.util.UUID; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatNoException; -import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.BDDMockito.given; import static org.mockito.BDDMockito.spy; +import static org.mockito.BDDMockito.willReturn; -public class TbAzureIotHubNodeTest extends TbMqttNodeTest { +@ExtendWith(MockitoExtension.class) +public class TbAzureIotHubNodeTest { private TbAzureIotHubNode azureIotHubNode; private TbAzureIotHubNodeConfiguration azureIotHubNodeConfig; + @Mock + protected TbContext ctxMock; + @Mock + protected MqttClient mqttClientMock; + @Mock + protected EventLoopGroup eventLoopGroupMock; + @Mock + protected Promise promiseMock; + @Mock + protected MqttConnectResult resultMock; + @BeforeEach public void setUp() { - super.setUp(); azureIotHubNode = spy(new TbAzureIotHubNode()); azureIotHubNodeConfig = new TbAzureIotHubNodeConfiguration().defaultConfiguration(); } @@ -60,27 +77,31 @@ public class TbAzureIotHubNodeTest extends TbMqttNodeTest { assertThat(azureIotHubNodeConfig.getHost()).isEqualTo(".azure-devices.net"); assertThat(azureIotHubNodeConfig.getPort()).isEqualTo(8883); assertThat(azureIotHubNodeConfig.getConnectTimeoutSec()).isEqualTo(10); + assertThat(azureIotHubNodeConfig.getClientId()).isNull(); + assertThat(azureIotHubNodeConfig.isAppendClientIdSuffix()).isFalse(); + assertThat(azureIotHubNodeConfig.isRetainedMessage()).isFalse(); assertThat(azureIotHubNodeConfig.isCleanSession()).isTrue(); assertThat(azureIotHubNodeConfig.isSsl()).isTrue(); + assertThat(azureIotHubNodeConfig.isParseToPlainText()).isFalse(); assertThat(azureIotHubNodeConfig.getCredentials()).isInstanceOf(AzureIotHubSasCredentials.class); } @Test - public void verifyPrepareMqttClientConfigMethodWithAzureIotHubSasCredentials() throws TbNodeException { + public void verifyPrepareMqttClientConfigMethodWithAzureIotHubSasCredentials() throws Exception { AzureIotHubSasCredentials credentials = new AzureIotHubSasCredentials(); credentials.setSasKey("testSasKey"); credentials.setCaCert("test-ca-cert.pem"); azureIotHubNodeConfig.setCredentials(credentials); - TbNodeConfiguration configuration = new TbNodeConfiguration(JacksonUtil.valueToTree(azureIotHubNodeConfig)); - mqttNodeConfig = TbNodeUtils.convert(configuration, TbMqttNodeConfiguration.class); - ReflectionTestUtils.setField(azureIotHubNode, "mqttNodeConfiguration", mqttNodeConfig); - MqttClientConfig mqttClientConfig = new MqttClientConfig(); + mockSuccessfulInit(); + azureIotHubNode.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(azureIotHubNodeConfig))); + + MqttClientConfig mqttClientConfig = new MqttClientConfig(); azureIotHubNode.prepareMqttClientConfig(mqttClientConfig); assertThat(mqttClientConfig.getProtocolVersion()).isEqualTo(MqttVersion.MQTT_3_1_1); - assertThat(mqttClientConfig.getUsername()).isEqualTo(AzureIotHubUtil.buildUsername(mqttNodeConfig.getHost(), mqttClientConfig.getClientId())); - assertThat(mqttClientConfig.getPassword()).isEqualTo(AzureIotHubUtil.buildSasToken(mqttNodeConfig.getHost(), credentials.getSasKey())); + assertThat(mqttClientConfig.getUsername()).isEqualTo(AzureIotHubUtil.buildUsername(azureIotHubNodeConfig.getHost(), mqttClientConfig.getClientId())); + assertThat(mqttClientConfig.getPassword()).isEqualTo(AzureIotHubUtil.buildSasToken(azureIotHubNodeConfig.getHost(), credentials.getSasKey())); } @Test @@ -90,9 +111,7 @@ public class TbAzureIotHubNodeTest extends TbMqttNodeTest { credentials.setPassword("test-password"); azureIotHubNodeConfig.setCredentials(credentials); - mockConnectClient(azureIotHubNode); - given(promiseMock.get(anyLong(), any(TimeUnit.class))).willReturn(resultMock); - given(resultMock.isSuccess()).willReturn(true); + mockSuccessfulInit(); assertThatNoException().isThrownBy( () -> azureIotHubNode.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(azureIotHubNodeConfig)))); @@ -102,39 +121,13 @@ public class TbAzureIotHubNodeTest extends TbMqttNodeTest { assertThat(mqttNodeConfiguration.isCleanSession()).isTrue(); } - @Test - public void givenAzureIotHubSasCredentialsAndFailedByTimeoutConnectResult_whenInit_thenThrowsException() throws ExecutionException, InterruptedException, TimeoutException { - AzureIotHubSasCredentials credentials = new AzureIotHubSasCredentials(); - credentials.setSasKey("testSasKey"); - credentials.setCaCert("test-ca-cert.pem"); - azureIotHubNodeConfig.setCredentials(credentials); - - mockConnectClient(azureIotHubNode); - given(promiseMock.get(anyLong(), any(TimeUnit.class))).willThrow(new TimeoutException("Failed to connect")); - - assertThatThrownBy(() -> azureIotHubNode.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(azureIotHubNodeConfig)))) - .isInstanceOf(TbNodeException.class) - .hasMessage("java.lang.RuntimeException: Failed to connect to MQTT broker at .azure-devices.net:8883.") - .extracting(e -> ((TbNodeException) e).isUnrecoverable()) - .isEqualTo(false); - } - - @Test - public void givenFailedConnectResult_whenInit_thenThrowsException() throws ExecutionException, InterruptedException, TimeoutException { - AzureIotHubSasCredentials credentials = new AzureIotHubSasCredentials(); - credentials.setSasKey("testSasKey"); - credentials.setCaCert("test-ca-cert.pem"); - azureIotHubNodeConfig.setCredentials(credentials); - - mockConnectClient(azureIotHubNode); + private void mockSuccessfulInit() throws Exception { + given(ctxMock.getTenantId()).willReturn(TenantId.fromUUID(UUID.fromString("74aad2a5-3c07-43fb-9c9b-07fafb4e86ce"))); + given(ctxMock.getSelf()).willReturn(new RuleNode(new RuleNodeId(UUID.fromString("da5eb2ef-4ea7-4ac9-9359-0e727a0c30ce")))); + given(ctxMock.getSharedEventLoop()).willReturn(eventLoopGroupMock); + willReturn(mqttClientMock).given(azureIotHubNode).getMqttClient(any(), any()); + given(mqttClientMock.connect(any(), anyInt())).willReturn(promiseMock); given(promiseMock.get(anyLong(), any(TimeUnit.class))).willReturn(resultMock); - given(resultMock.isSuccess()).willReturn(false); - given(resultMock.getReturnCode()).willReturn(MqttConnectReturnCode.CONNECTION_REFUSED_NOT_AUTHORIZED); - - assertThatThrownBy(() -> azureIotHubNode.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(azureIotHubNodeConfig)))) - .isInstanceOf(TbNodeException.class) - .hasMessage("java.lang.RuntimeException: Failed to connect to MQTT broker at .azure-devices.net:8883. Result code is: CONNECTION_REFUSED_NOT_AUTHORIZED") - .extracting(e -> ((TbNodeException) e).isUnrecoverable()) - .isEqualTo(false); + given(resultMock.isSuccess()).willReturn(true); } } From ed319dd5e3bc6a053d05ebcfbdbed2604ef3afed Mon Sep 17 00:00:00 2001 From: IrynaMatveieva Date: Tue, 6 Aug 2024 18:24:40 +0300 Subject: [PATCH 5/6] changed access modifiers --- .../rule/engine/mqtt/TbMqttNode.java | 4 +-- .../engine/mqtt/azure/TbAzureIotHubNode.java | 9 ++++-- .../rule/engine/mqtt/TbMqttNodeTest.java | 16 ++++++---- .../mqtt/azure/TbAzureIotHubNodeTest.java | 31 ++----------------- 4 files changed, 21 insertions(+), 39 deletions(-) diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/TbMqttNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/TbMqttNode.java index d26d6bb4e5..c909a44a27 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/TbMqttNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/TbMqttNode.java @@ -146,7 +146,7 @@ public class TbMqttNode extends TbAbstractExternalNode { return client; } - public MqttClient getMqttClient(TbContext ctx, MqttClientConfig config) { + MqttClient getMqttClient(TbContext ctx, MqttClientConfig config) { return MqttClient.create(config, null, ctx.getExternalCallExecutor()); } @@ -159,7 +159,7 @@ public class TbMqttNode extends TbAbstractExternalNode { } } - protected SslContext getSslContext() throws SSLException { + private SslContext getSslContext() throws SSLException { return this.mqttNodeConfiguration.isSsl() ? this.mqttNodeConfiguration.getCredentials().initSslContext() : null; } diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/azure/TbAzureIotHubNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/azure/TbAzureIotHubNode.java index 3376c7d113..41d2ac4b8c 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/azure/TbAzureIotHubNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/azure/TbAzureIotHubNode.java @@ -18,6 +18,7 @@ package org.thingsboard.rule.engine.mqtt.azure; import io.netty.handler.codec.mqtt.MqttVersion; import lombok.extern.slf4j.Slf4j; import org.thingsboard.common.util.AzureIotHubUtil; +import org.thingsboard.mqtt.MqttClient; import org.thingsboard.mqtt.MqttClientConfig; import org.thingsboard.rule.engine.api.RuleNode; import org.thingsboard.rule.engine.api.TbContext; @@ -58,7 +59,7 @@ public class TbAzureIotHubNode extends TbMqttNode { pemCredentials.setCaCert(AzureIotHubUtil.getDefaultCaCert()); } } - this.mqttClient = initClient(ctx); + this.mqttClient = initAzureClient(ctx); } catch (Exception e) { throw new TbNodeException(e); } @@ -73,7 +74,11 @@ public class TbAzureIotHubNode extends TbMqttNode { } } - protected TbMqttNodeConfiguration getMqttNodeConfiguration() { + TbMqttNodeConfiguration getMqttNodeConfiguration() { return this.mqttNodeConfiguration; } + + MqttClient initAzureClient(TbContext ctx) throws Exception { + return initClient(ctx); + } } diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/mqtt/TbMqttNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/mqtt/TbMqttNodeTest.java index e680633360..da568eefbb 100644 --- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/mqtt/TbMqttNodeTest.java +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/mqtt/TbMqttNodeTest.java @@ -84,7 +84,7 @@ import static org.mockito.BDDMockito.willReturn; @ExtendWith(MockitoExtension.class) public class TbMqttNodeTest extends AbstractRuleNodeUpgradeTest { - private final TenantId TENANT_ID = TenantId.fromUUID(UUID.fromString("d0c5d2a8-3a6e-4c95-8caf-47fbdc8ef98f")); + private final TenantId TENANT_ID = TenantId.fromUUID(UUID.fromString("d0c5d2a8-3a6e-4c95-8caf-47fbdc8ef98f")); private final DeviceId DEVICE_ID = new DeviceId(UUID.fromString("09115d92-d333-432a-868c-ccd6e89c9287")); private final RuleNodeId RULE_NODE_ID = new RuleNodeId(UUID.fromString("11699e8f-c3f0-4366-9334-cbf75798314b")); @@ -143,7 +143,7 @@ public class TbMqttNodeTest extends AbstractRuleNodeUpgradeTest { mockSuccessfulInit(); mqttNode.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(mqttNodeConfig))); - MqttClientConfig mqttClientConfig = new MqttClientConfig(mqttNode.getSslContext()); + MqttClientConfig mqttClientConfig = new MqttClientConfig(); mqttNode.prepareMqttClientConfig(mqttClientConfig); assertThat(mqttClientConfig.getUsername()).isEqualTo("test_username"); @@ -158,7 +158,9 @@ public class TbMqttNodeTest extends AbstractRuleNodeUpgradeTest { mockSuccessfulInit(); mqttNode.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(mqttNodeConfig))); - SslContext actualSslContext = mqttNode.getSslContext(); + ArgumentCaptor mqttClientConfig = ArgumentCaptor.forClass(MqttClientConfig.class); + then(mqttNode).should().prepareMqttClientConfig(mqttClientConfig.capture()); + SslContext actualSslContext = mqttClientConfig.getValue().getSslContext(); assertThat(actualSslContext) .usingRecursiveComparison() .ignoringFields("ctx", "ctxLock", "sessionContext.context.ctx", "sessionContext.context.ctxLock") @@ -172,7 +174,9 @@ public class TbMqttNodeTest extends AbstractRuleNodeUpgradeTest { mockSuccessfulInit(); mqttNode.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(mqttNodeConfig))); - SslContext actualSslContext = mqttNode.getSslContext(); + ArgumentCaptor mqttClientConfig = ArgumentCaptor.forClass(MqttClientConfig.class); + then(mqttNode).should().prepareMqttClientConfig(mqttClientConfig.capture()); + SslContext actualSslContext = mqttClientConfig.getValue().getSslContext(); assertThat(actualSslContext).isNull(); } @@ -237,7 +241,7 @@ public class TbMqttNodeTest extends AbstractRuleNodeUpgradeTest { Future future = mock(Future.class); given(future.isSuccess()).willReturn(true); given(mqttClientMock.publish(any(String.class), any(ByteBuf.class), any(MqttQoS.class), anyBoolean())).willReturn(future); - willAnswer(invocation-> { + willAnswer(invocation -> { GenericFutureListener> listener = invocation.getArgument(0); listener.operationComplete(future); return null; @@ -276,7 +280,7 @@ public class TbMqttNodeTest extends AbstractRuleNodeUpgradeTest { String errorMsg = "Message publishing was failed!"; Throwable exception = new RuntimeException(errorMsg); given(future.cause()).willReturn(exception); - willAnswer(invocation-> { + willAnswer(invocation -> { GenericFutureListener> listener = invocation.getArgument(0); listener.operationComplete(future); return null; diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/mqtt/azure/TbAzureIotHubNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/mqtt/azure/TbAzureIotHubNodeTest.java index f6f8adeafb..cc0668e5c0 100644 --- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/mqtt/azure/TbAzureIotHubNodeTest.java +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/mqtt/azure/TbAzureIotHubNodeTest.java @@ -15,9 +15,7 @@ */ package org.thingsboard.rule.engine.mqtt.azure; -import io.netty.channel.EventLoopGroup; import io.netty.handler.codec.mqtt.MqttVersion; -import io.netty.util.concurrent.Promise; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -27,24 +25,14 @@ import org.thingsboard.common.util.AzureIotHubUtil; import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.mqtt.MqttClient; import org.thingsboard.mqtt.MqttClientConfig; -import org.thingsboard.mqtt.MqttConnectResult; import org.thingsboard.rule.engine.api.TbContext; import org.thingsboard.rule.engine.api.TbNodeConfiguration; import org.thingsboard.rule.engine.credentials.CertPemCredentials; import org.thingsboard.rule.engine.mqtt.TbMqttNodeConfiguration; -import org.thingsboard.server.common.data.id.RuleNodeId; -import org.thingsboard.server.common.data.id.TenantId; -import org.thingsboard.server.common.data.rule.RuleNode; - -import java.util.UUID; -import java.util.concurrent.TimeUnit; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatNoException; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyInt; -import static org.mockito.ArgumentMatchers.anyLong; -import static org.mockito.BDDMockito.given; import static org.mockito.BDDMockito.spy; import static org.mockito.BDDMockito.willReturn; @@ -58,12 +46,6 @@ public class TbAzureIotHubNodeTest { protected TbContext ctxMock; @Mock protected MqttClient mqttClientMock; - @Mock - protected EventLoopGroup eventLoopGroupMock; - @Mock - protected Promise promiseMock; - @Mock - protected MqttConnectResult resultMock; @BeforeEach public void setUp() { @@ -93,7 +75,7 @@ public class TbAzureIotHubNodeTest { credentials.setCaCert("test-ca-cert.pem"); azureIotHubNodeConfig.setCredentials(credentials); - mockSuccessfulInit(); + willReturn(mqttClientMock).given(azureIotHubNode).initAzureClient(any()); azureIotHubNode.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(azureIotHubNodeConfig))); MqttClientConfig mqttClientConfig = new MqttClientConfig(); @@ -111,7 +93,7 @@ public class TbAzureIotHubNodeTest { credentials.setPassword("test-password"); azureIotHubNodeConfig.setCredentials(credentials); - mockSuccessfulInit(); + willReturn(mqttClientMock).given(azureIotHubNode).initAzureClient(any()); assertThatNoException().isThrownBy( () -> azureIotHubNode.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(azureIotHubNodeConfig)))); @@ -121,13 +103,4 @@ public class TbAzureIotHubNodeTest { assertThat(mqttNodeConfiguration.isCleanSession()).isTrue(); } - private void mockSuccessfulInit() throws Exception { - given(ctxMock.getTenantId()).willReturn(TenantId.fromUUID(UUID.fromString("74aad2a5-3c07-43fb-9c9b-07fafb4e86ce"))); - given(ctxMock.getSelf()).willReturn(new RuleNode(new RuleNodeId(UUID.fromString("da5eb2ef-4ea7-4ac9-9359-0e727a0c30ce")))); - given(ctxMock.getSharedEventLoop()).willReturn(eventLoopGroupMock); - willReturn(mqttClientMock).given(azureIotHubNode).getMqttClient(any(), any()); - given(mqttClientMock.connect(any(), anyInt())).willReturn(promiseMock); - given(promiseMock.get(anyLong(), any(TimeUnit.class))).willReturn(resultMock); - given(resultMock.isSuccess()).willReturn(true); - } } From 80a53a840cf0702b7dc27e13082ad1fbb7a5f5b8 Mon Sep 17 00:00:00 2001 From: IrynaMatveieva Date: Wed, 7 Aug 2024 12:20:43 +0300 Subject: [PATCH 6/6] removed getMqttConfig method --- .../thingsboard/rule/engine/mqtt/azure/TbAzureIotHubNode.java | 4 ---- .../rule/engine/mqtt/azure/TbAzureIotHubNodeTest.java | 4 +++- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/azure/TbAzureIotHubNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/azure/TbAzureIotHubNode.java index 41d2ac4b8c..cbf431d63a 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/azure/TbAzureIotHubNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/azure/TbAzureIotHubNode.java @@ -74,10 +74,6 @@ public class TbAzureIotHubNode extends TbMqttNode { } } - TbMqttNodeConfiguration getMqttNodeConfiguration() { - return this.mqttNodeConfiguration; - } - MqttClient initAzureClient(TbContext ctx) throws Exception { return initClient(ctx); } diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/mqtt/azure/TbAzureIotHubNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/mqtt/azure/TbAzureIotHubNodeTest.java index cc0668e5c0..819260b35a 100644 --- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/mqtt/azure/TbAzureIotHubNodeTest.java +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/mqtt/azure/TbAzureIotHubNodeTest.java @@ -21,6 +21,7 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; +import org.springframework.test.util.ReflectionTestUtils; import org.thingsboard.common.util.AzureIotHubUtil; import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.mqtt.MqttClient; @@ -98,7 +99,8 @@ public class TbAzureIotHubNodeTest { assertThatNoException().isThrownBy( () -> azureIotHubNode.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(azureIotHubNodeConfig)))); - TbMqttNodeConfiguration mqttNodeConfiguration = azureIotHubNode.getMqttNodeConfiguration(); + var mqttNodeConfiguration = (TbMqttNodeConfiguration) ReflectionTestUtils.getField(azureIotHubNode, "mqttNodeConfiguration"); + assertThat(mqttNodeConfiguration).isNotNull(); assertThat(mqttNodeConfiguration.getPort()).isEqualTo(8883); assertThat(mqttNodeConfiguration.isCleanSession()).isTrue(); }