diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/TbMqttNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/TbMqttNode.java index 2baf26206f..c909a44a27 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/TbMqttNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/TbMqttNode.java @@ -125,7 +125,7 @@ public class TbMqttNode extends TbAbstractExternalNode { config.setCleanSession(this.mqttNodeConfiguration.isCleanSession()); prepareMqttClientConfig(config); - MqttClient client = MqttClient.create(config, null, ctx.getExternalCallExecutor()); + MqttClient client = getMqttClient(ctx, config); client.setEventLoop(ctx.getSharedEventLoop()); Promise connectFuture = client.connect(this.mqttNodeConfiguration.getHost(), this.mqttNodeConfiguration.getPort()); MqttConnectResult result; @@ -146,7 +146,11 @@ public class TbMqttNode extends TbAbstractExternalNode { return client; } - protected void prepareMqttClientConfig(MqttClientConfig config) throws SSLException { + MqttClient getMqttClient(TbContext ctx, MqttClientConfig config) { + return MqttClient.create(config, null, ctx.getExternalCallExecutor()); + } + + protected void prepareMqttClientConfig(MqttClientConfig config) { ClientCredentials credentials = this.mqttNodeConfiguration.getCredentials(); if (credentials.getType() == CredentialsType.BASIC) { BasicCredentials basicCredentials = (BasicCredentials) credentials; diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/azure/TbAzureIotHubNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/azure/TbAzureIotHubNode.java index b78a441134..cbf431d63a 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/azure/TbAzureIotHubNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/azure/TbAzureIotHubNode.java @@ -18,6 +18,7 @@ package org.thingsboard.rule.engine.mqtt.azure; import io.netty.handler.codec.mqtt.MqttVersion; import lombok.extern.slf4j.Slf4j; import org.thingsboard.common.util.AzureIotHubUtil; +import org.thingsboard.mqtt.MqttClient; import org.thingsboard.mqtt.MqttClientConfig; import org.thingsboard.rule.engine.api.RuleNode; import org.thingsboard.rule.engine.api.TbContext; @@ -32,8 +33,6 @@ import org.thingsboard.rule.engine.mqtt.TbMqttNodeConfiguration; import org.thingsboard.server.common.data.plugin.ComponentClusteringMode; import org.thingsboard.server.common.data.plugin.ComponentType; -import javax.net.ssl.SSLException; - @Slf4j @RuleNode( type = ComponentType.EXTERNAL, @@ -60,13 +59,13 @@ public class TbAzureIotHubNode extends TbMqttNode { pemCredentials.setCaCert(AzureIotHubUtil.getDefaultCaCert()); } } - this.mqttClient = initClient(ctx); + this.mqttClient = initAzureClient(ctx); } catch (Exception e) { throw new TbNodeException(e); } } - protected void prepareMqttClientConfig(MqttClientConfig config) throws SSLException { + protected void prepareMqttClientConfig(MqttClientConfig config) { config.setProtocolVersion(MqttVersion.MQTT_3_1_1); config.setUsername(AzureIotHubUtil.buildUsername(mqttNodeConfiguration.getHost(), config.getClientId())); ClientCredentials credentials = mqttNodeConfiguration.getCredentials(); @@ -74,4 +73,8 @@ public class TbAzureIotHubNode extends TbMqttNode { config.setPassword(AzureIotHubUtil.buildSasToken(mqttNodeConfiguration.getHost(), ((AzureIotHubSasCredentials) credentials).getSasKey())); } } + + MqttClient initAzureClient(TbContext ctx) throws Exception { + return initClient(ctx); + } } diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/mqtt/TbMqttNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/mqtt/TbMqttNodeTest.java index b46ecdf6fe..da568eefbb 100644 --- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/mqtt/TbMqttNodeTest.java +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/mqtt/TbMqttNodeTest.java @@ -15,26 +15,304 @@ */ package org.thingsboard.rule.engine.mqtt; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.EventLoopGroup; +import io.netty.handler.codec.mqtt.MqttConnectReturnCode; +import io.netty.handler.codec.mqtt.MqttQoS; +import io.netty.handler.ssl.SslContext; +import io.netty.handler.ssl.SslContextBuilder; +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.GenericFutureListener; +import io.netty.util.concurrent.Promise; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; -import org.mockito.Spy; +import org.junit.jupiter.params.provider.MethodSource; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; +import org.springframework.test.util.ReflectionTestUtils; +import org.thingsboard.common.util.JacksonUtil; +import org.thingsboard.mqtt.MqttClient; +import org.thingsboard.mqtt.MqttClientConfig; +import org.thingsboard.mqtt.MqttConnectResult; import org.thingsboard.rule.engine.AbstractRuleNodeUpgradeTest; +import org.thingsboard.rule.engine.api.TbContext; import org.thingsboard.rule.engine.api.TbNode; +import org.thingsboard.rule.engine.api.TbNodeConfiguration; +import org.thingsboard.rule.engine.api.TbNodeException; +import org.thingsboard.rule.engine.api.util.TbNodeUtils; +import org.thingsboard.rule.engine.credentials.AnonymousCredentials; +import org.thingsboard.rule.engine.credentials.BasicCredentials; +import org.thingsboard.rule.engine.credentials.CertPemCredentials; +import org.thingsboard.server.common.data.id.DeviceId; +import org.thingsboard.server.common.data.id.RuleNodeId; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.msg.TbMsgType; +import org.thingsboard.server.common.data.msg.TbNodeConnectionType; +import org.thingsboard.server.common.data.rule.RuleNode; +import org.thingsboard.server.common.msg.TbMsg; +import org.thingsboard.server.common.msg.TbMsgMetaData; +import java.nio.charset.StandardCharsets; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.stream.Stream; -import static org.mockito.Mockito.mock; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatNoException; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.BDDMockito.given; +import static org.mockito.BDDMockito.mock; +import static org.mockito.BDDMockito.never; +import static org.mockito.BDDMockito.spy; +import static org.mockito.BDDMockito.then; +import static org.mockito.BDDMockito.willAnswer; +import static org.mockito.BDDMockito.willReturn; @ExtendWith(MockitoExtension.class) -class TbMqttNodeTest extends AbstractRuleNodeUpgradeTest { - @Spy - TbMqttNode node; +public class TbMqttNodeTest extends AbstractRuleNodeUpgradeTest { + + private final TenantId TENANT_ID = TenantId.fromUUID(UUID.fromString("d0c5d2a8-3a6e-4c95-8caf-47fbdc8ef98f")); + private final DeviceId DEVICE_ID = new DeviceId(UUID.fromString("09115d92-d333-432a-868c-ccd6e89c9287")); + private final RuleNodeId RULE_NODE_ID = new RuleNodeId(UUID.fromString("11699e8f-c3f0-4366-9334-cbf75798314b")); + + protected TbMqttNode mqttNode; + protected TbMqttNodeConfiguration mqttNodeConfig; + + @Mock + protected TbContext ctxMock; + @Mock + protected MqttClient mqttClientMock; + @Mock + protected EventLoopGroup eventLoopGroupMock; + @Mock + protected Promise promiseMock; + @Mock + protected MqttConnectResult resultMock; @BeforeEach - public void setUp() throws Exception { - node = mock(TbMqttNode.class); + protected void setUp() { + mqttNode = spy(new TbMqttNode()); + mqttNodeConfig = new TbMqttNodeConfiguration().defaultConfiguration(); + } + + @Test + public void verifyDefaultConfig() { + assertThat(mqttNodeConfig.getTopicPattern()).isEqualTo("my-topic"); + assertThat(mqttNodeConfig.getHost()).isNull(); + assertThat(mqttNodeConfig.getPort()).isEqualTo(1883); + assertThat(mqttNodeConfig.getConnectTimeoutSec()).isEqualTo(10); + assertThat(mqttNodeConfig.getClientId()).isNull(); + assertThat(mqttNodeConfig.isAppendClientIdSuffix()).isFalse(); + assertThat(mqttNodeConfig.isRetainedMessage()).isFalse(); + assertThat(mqttNodeConfig.isCleanSession()).isTrue(); + assertThat(mqttNodeConfig.isSsl()).isFalse(); + assertThat(mqttNodeConfig.isParseToPlainText()).isFalse(); + assertThat(mqttNodeConfig.getCredentials()).isInstanceOf(AnonymousCredentials.class); + } + + @Test + public void verifyGetOwnerIdMethod() { + given(ctxMock.getTenantId()).willReturn(TENANT_ID); + given(ctxMock.getSelf()).willReturn(new RuleNode(RULE_NODE_ID)); + + String actualOwnerIdStr = mqttNode.getOwnerId(ctxMock); + String expectedOwnerIdStr = "Tenant[" + TENANT_ID.getId() + "]RuleNode[" + RULE_NODE_ID.getId() + "]"; + assertThat(actualOwnerIdStr).isEqualTo(expectedOwnerIdStr); + } + + @Test + public void verifyPrepareMqttClientConfigMethodWithBasicCredentials() throws Exception { + BasicCredentials credentials = new BasicCredentials(); + credentials.setUsername("test_username"); + credentials.setPassword("test_password"); + mqttNodeConfig.setCredentials(credentials); + + mockSuccessfulInit(); + mqttNode.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(mqttNodeConfig))); + + MqttClientConfig mqttClientConfig = new MqttClientConfig(); + mqttNode.prepareMqttClientConfig(mqttClientConfig); + + assertThat(mqttClientConfig.getUsername()).isEqualTo("test_username"); + assertThat(mqttClientConfig.getPassword()).isEqualTo("test_password"); + } + + @Test + public void givenSslIsTrueAndCredentials_whenGetSslContext_thenVerifySslContext() throws Exception { + mqttNodeConfig.setSsl(true); + mqttNodeConfig.setCredentials(new BasicCredentials()); + + mockSuccessfulInit(); + mqttNode.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(mqttNodeConfig))); + + ArgumentCaptor mqttClientConfig = ArgumentCaptor.forClass(MqttClientConfig.class); + then(mqttNode).should().prepareMqttClientConfig(mqttClientConfig.capture()); + SslContext actualSslContext = mqttClientConfig.getValue().getSslContext(); + assertThat(actualSslContext) + .usingRecursiveComparison() + .ignoringFields("ctx", "ctxLock", "sessionContext.context.ctx", "sessionContext.context.ctxLock") + .isEqualTo(SslContextBuilder.forClient().build()); + } + + @Test + public void givenSslIsFalse_whenGetSslContext_thenVerifySslContextIsNull() throws Exception { + mqttNodeConfig.setSsl(false); + + mockSuccessfulInit(); + mqttNode.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(mqttNodeConfig))); + + ArgumentCaptor mqttClientConfig = ArgumentCaptor.forClass(MqttClientConfig.class); + then(mqttNode).should().prepareMqttClientConfig(mqttClientConfig.capture()); + SslContext actualSslContext = mqttClientConfig.getValue().getSslContext(); + assertThat(actualSslContext).isNull(); + } + + @Test + public void givenSuccessfulConnectResult_whenInit_thenOk() throws Exception { + mqttNodeConfig.setClientId("bfrbTESTmfkr23"); + mqttNodeConfig.setAppendClientIdSuffix(true); + mqttNodeConfig.setCredentials(new CertPemCredentials()); + + mockSuccessfulInit(); + + assertThatNoException().isThrownBy(() -> mqttNode.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(mqttNodeConfig)))); + } + + @Test + public void givenFailedByTimeoutConnectResult_whenInit_thenThrowsException() throws ExecutionException, InterruptedException, TimeoutException { + mqttNodeConfig.setHost("localhost"); + mqttNodeConfig.setClientId("bfrbTESTmfkr23"); + mqttNodeConfig.setCredentials(new CertPemCredentials()); + + mockConnectClient(); + given(promiseMock.get(anyLong(), any(TimeUnit.class))).willThrow(new TimeoutException("Failed to connect")); + + assertThatThrownBy(() -> mqttNode.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(mqttNodeConfig)))) + .isInstanceOf(TbNodeException.class) + .hasMessage("java.lang.RuntimeException: Failed to connect to MQTT broker at localhost:1883.") + .extracting(e -> ((TbNodeException) e).isUnrecoverable()) + .isEqualTo(false); + } + + @Test + public void givenFailedConnectResult_whenInit_thenThrowsException() throws Exception { + mqttNodeConfig.setHost("localhost"); + mqttNodeConfig.setClientId("bfrbTESTmfkr23"); + mqttNodeConfig.setAppendClientIdSuffix(true); + mqttNodeConfig.setCredentials(new CertPemCredentials()); + + mockConnectClient(); + given(promiseMock.get(anyLong(), any(TimeUnit.class))).willReturn(resultMock); + given(resultMock.isSuccess()).willReturn(false); + given(resultMock.getReturnCode()).willReturn(MqttConnectReturnCode.CONNECTION_REFUSED_NOT_AUTHORIZED); + + assertThatThrownBy(() -> mqttNode.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(mqttNodeConfig)))) + .isInstanceOf(TbNodeException.class) + .hasMessage("java.lang.RuntimeException: Failed to connect to MQTT broker at localhost:1883. Result code is: CONNECTION_REFUSED_NOT_AUTHORIZED") + .extracting(e -> ((TbNodeException) e).isUnrecoverable()) + .isEqualTo(false); + } + + @ParameterizedTest + @MethodSource + public void givenForceAckIsTrueAndTopicPatternAndIsRetainedMsgIsTrue_whenOnMsg_thenTellSuccess( + String topicPattern, TbMsgMetaData metaData, String data + ) throws Exception { + mqttNodeConfig.setRetainedMessage(true); + mqttNodeConfig.setTopicPattern(topicPattern); + + given(ctxMock.isExternalNodeForceAck()).willReturn(true); + mockSuccessfulInit(); + mqttNode.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(mqttNodeConfig))); + + Future future = mock(Future.class); + given(future.isSuccess()).willReturn(true); + given(mqttClientMock.publish(any(String.class), any(ByteBuf.class), any(MqttQoS.class), anyBoolean())).willReturn(future); + willAnswer(invocation -> { + GenericFutureListener> listener = invocation.getArgument(0); + listener.operationComplete(future); + return null; + }).given(future).addListener(any()); + + TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, metaData, data); + mqttNode.onMsg(ctxMock, msg); + + then(ctxMock).should().ack(msg); + String expectedTopic = TbNodeUtils.processPattern(mqttNodeConfig.getTopicPattern(), msg); + then(mqttClientMock).should().publish(expectedTopic, Unpooled.wrappedBuffer(msg.getData().getBytes(StandardCharsets.UTF_8)), MqttQoS.AT_LEAST_ONCE, true); + ArgumentCaptor actualMsg = ArgumentCaptor.forClass(TbMsg.class); + then(ctxMock).should().enqueueForTellNext(actualMsg.capture(), eq(TbNodeConnectionType.SUCCESS)); + assertThat(actualMsg.getValue()).usingRecursiveComparison().ignoringFields("ctx").isEqualTo(msg); + } + + private static Stream givenForceAckIsTrueAndTopicPatternAndIsRetainedMsgIsTrue_whenOnMsg_thenTellSuccess() { + return Stream.of( + Arguments.of("new-topic", TbMsgMetaData.EMPTY, TbMsg.EMPTY_JSON_OBJECT), + Arguments.of("${md-topic-name}", new TbMsgMetaData(Map.of("md-topic-name", "md-new-topic")), TbMsg.EMPTY_JSON_OBJECT), + Arguments.of("$[msg-topic-name]", TbMsgMetaData.EMPTY, "{\"msg-topic-name\":\"msg-new-topic\"}") + ); + } + + @Test + public void givenForceAckIsFalseParseToPlainTextIsTrueAndMsgPublishingFailed_whenOnMsg_thenTellFailure() throws Exception { + mqttNodeConfig.setParseToPlainText(true); + + given(ctxMock.isExternalNodeForceAck()).willReturn(false); + mockSuccessfulInit(); + mqttNode.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(mqttNodeConfig))); + + Future future = mock(Future.class); + given(mqttClientMock.publish(any(String.class), any(ByteBuf.class), any(MqttQoS.class), anyBoolean())).willReturn(future); + given(future.isSuccess()).willReturn(false); + String errorMsg = "Message publishing was failed!"; + Throwable exception = new RuntimeException(errorMsg); + given(future.cause()).willReturn(exception); + willAnswer(invocation -> { + GenericFutureListener> listener = invocation.getArgument(0); + listener.operationComplete(future); + return null; + }).given(future).addListener(any()); + + TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, TbMsgMetaData.EMPTY, "\"string\""); + mqttNode.onMsg(ctxMock, msg); + + then(ctxMock).should(never()).ack(msg); + String expectedData = JacksonUtil.toPlainText(msg.getData()); + then(mqttClientMock).should().publish(mqttNodeConfig.getTopicPattern(), Unpooled.wrappedBuffer(expectedData.getBytes(StandardCharsets.UTF_8)), MqttQoS.AT_LEAST_ONCE, false); + TbMsgMetaData metaData = new TbMsgMetaData(); + metaData.putValue("error", RuntimeException.class + ": " + errorMsg); + TbMsg expectedMsg = TbMsg.transformMsgMetadata(msg, metaData); + ArgumentCaptor actualMsgCaptor = ArgumentCaptor.forClass(TbMsg.class); + then(ctxMock).should().tellFailure(actualMsgCaptor.capture(), eq(exception)); + TbMsg actualMsg = actualMsgCaptor.getValue(); + assertThat(actualMsg).usingRecursiveComparison().ignoringFields("ctx").isEqualTo(expectedMsg); + } + + @Test + public void givenMqttClientIsNotNull_whenDestroy_thenDisconnect() { + ReflectionTestUtils.setField(mqttNode, "mqttClient", mqttClientMock); + mqttNode.destroy(); + then(mqttClientMock).should().disconnect(); + } + + @Test + public void givenMqttClientIsNull_whenDestroy_thenShouldHaveNoInteractions() { + ReflectionTestUtils.setField(mqttNode, "mqttClient", null); + mqttNode.destroy(); + then(mqttClientMock).shouldHaveNoInteractions(); } private static Stream givenFromVersionAndConfig_whenUpgrade_thenVerifyHasChangesAndConfig() { @@ -55,6 +333,21 @@ class TbMqttNodeTest extends AbstractRuleNodeUpgradeTest { @Override protected TbNode getTestNode() { - return node; + return mqttNode; } + + private void mockConnectClient() { + given(ctxMock.getTenantId()).willReturn(TENANT_ID); + given(ctxMock.getSelf()).willReturn(new RuleNode(RULE_NODE_ID)); + given(ctxMock.getSharedEventLoop()).willReturn(eventLoopGroupMock); + willReturn(mqttClientMock).given(mqttNode).getMqttClient(any(), any()); + given(mqttClientMock.connect(any(), anyInt())).willReturn(promiseMock); + } + + private void mockSuccessfulInit() throws Exception { + mockConnectClient(); + given(promiseMock.get(anyLong(), any(TimeUnit.class))).willReturn(resultMock); + given(resultMock.isSuccess()).willReturn(true); + } + } diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/mqtt/azure/TbAzureIotHubNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/mqtt/azure/TbAzureIotHubNodeTest.java new file mode 100644 index 0000000000..819260b35a --- /dev/null +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/mqtt/azure/TbAzureIotHubNodeTest.java @@ -0,0 +1,108 @@ +/** + * Copyright © 2016-2024 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.rule.engine.mqtt.azure; + +import io.netty.handler.codec.mqtt.MqttVersion; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.springframework.test.util.ReflectionTestUtils; +import org.thingsboard.common.util.AzureIotHubUtil; +import org.thingsboard.common.util.JacksonUtil; +import org.thingsboard.mqtt.MqttClient; +import org.thingsboard.mqtt.MqttClientConfig; +import org.thingsboard.rule.engine.api.TbContext; +import org.thingsboard.rule.engine.api.TbNodeConfiguration; +import org.thingsboard.rule.engine.credentials.CertPemCredentials; +import org.thingsboard.rule.engine.mqtt.TbMqttNodeConfiguration; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatNoException; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.BDDMockito.spy; +import static org.mockito.BDDMockito.willReturn; + +@ExtendWith(MockitoExtension.class) +public class TbAzureIotHubNodeTest { + + private TbAzureIotHubNode azureIotHubNode; + private TbAzureIotHubNodeConfiguration azureIotHubNodeConfig; + + @Mock + protected TbContext ctxMock; + @Mock + protected MqttClient mqttClientMock; + + @BeforeEach + public void setUp() { + azureIotHubNode = spy(new TbAzureIotHubNode()); + azureIotHubNodeConfig = new TbAzureIotHubNodeConfiguration().defaultConfiguration(); + } + + @Test + public void verifyDefaultConfig() { + assertThat(azureIotHubNodeConfig.getTopicPattern()).isEqualTo("devices//messages/events/"); + assertThat(azureIotHubNodeConfig.getHost()).isEqualTo(".azure-devices.net"); + assertThat(azureIotHubNodeConfig.getPort()).isEqualTo(8883); + assertThat(azureIotHubNodeConfig.getConnectTimeoutSec()).isEqualTo(10); + assertThat(azureIotHubNodeConfig.getClientId()).isNull(); + assertThat(azureIotHubNodeConfig.isAppendClientIdSuffix()).isFalse(); + assertThat(azureIotHubNodeConfig.isRetainedMessage()).isFalse(); + assertThat(azureIotHubNodeConfig.isCleanSession()).isTrue(); + assertThat(azureIotHubNodeConfig.isSsl()).isTrue(); + assertThat(azureIotHubNodeConfig.isParseToPlainText()).isFalse(); + assertThat(azureIotHubNodeConfig.getCredentials()).isInstanceOf(AzureIotHubSasCredentials.class); + } + + @Test + public void verifyPrepareMqttClientConfigMethodWithAzureIotHubSasCredentials() throws Exception { + AzureIotHubSasCredentials credentials = new AzureIotHubSasCredentials(); + credentials.setSasKey("testSasKey"); + credentials.setCaCert("test-ca-cert.pem"); + azureIotHubNodeConfig.setCredentials(credentials); + + willReturn(mqttClientMock).given(azureIotHubNode).initAzureClient(any()); + azureIotHubNode.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(azureIotHubNodeConfig))); + + MqttClientConfig mqttClientConfig = new MqttClientConfig(); + azureIotHubNode.prepareMqttClientConfig(mqttClientConfig); + + assertThat(mqttClientConfig.getProtocolVersion()).isEqualTo(MqttVersion.MQTT_3_1_1); + assertThat(mqttClientConfig.getUsername()).isEqualTo(AzureIotHubUtil.buildUsername(azureIotHubNodeConfig.getHost(), mqttClientConfig.getClientId())); + assertThat(mqttClientConfig.getPassword()).isEqualTo(AzureIotHubUtil.buildSasToken(azureIotHubNodeConfig.getHost(), credentials.getSasKey())); + } + + @Test + public void givenPemCredentialsAndSuccessfulConnectResult_whenInit_thenOk() throws Exception { + CertPemCredentials credentials = new CertPemCredentials(); + credentials.setCaCert("test-ca-cert.pem"); + credentials.setPassword("test-password"); + azureIotHubNodeConfig.setCredentials(credentials); + + willReturn(mqttClientMock).given(azureIotHubNode).initAzureClient(any()); + + assertThatNoException().isThrownBy( + () -> azureIotHubNode.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(azureIotHubNodeConfig)))); + + var mqttNodeConfiguration = (TbMqttNodeConfiguration) ReflectionTestUtils.getField(azureIotHubNode, "mqttNodeConfiguration"); + assertThat(mqttNodeConfiguration).isNotNull(); + assertThat(mqttNodeConfiguration.getPort()).isEqualTo(8883); + assertThat(mqttNodeConfiguration.isCleanSession()).isTrue(); + } + +}