diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/TbMqttNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/TbMqttNode.java index d26d6bb4e5..c909a44a27 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/TbMqttNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/TbMqttNode.java @@ -146,7 +146,7 @@ public class TbMqttNode extends TbAbstractExternalNode { return client; } - public MqttClient getMqttClient(TbContext ctx, MqttClientConfig config) { + MqttClient getMqttClient(TbContext ctx, MqttClientConfig config) { return MqttClient.create(config, null, ctx.getExternalCallExecutor()); } @@ -159,7 +159,7 @@ public class TbMqttNode extends TbAbstractExternalNode { } } - protected SslContext getSslContext() throws SSLException { + private SslContext getSslContext() throws SSLException { return this.mqttNodeConfiguration.isSsl() ? this.mqttNodeConfiguration.getCredentials().initSslContext() : null; } diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/azure/TbAzureIotHubNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/azure/TbAzureIotHubNode.java index 3376c7d113..41d2ac4b8c 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/azure/TbAzureIotHubNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/azure/TbAzureIotHubNode.java @@ -18,6 +18,7 @@ package org.thingsboard.rule.engine.mqtt.azure; import io.netty.handler.codec.mqtt.MqttVersion; import lombok.extern.slf4j.Slf4j; import org.thingsboard.common.util.AzureIotHubUtil; +import org.thingsboard.mqtt.MqttClient; import org.thingsboard.mqtt.MqttClientConfig; import org.thingsboard.rule.engine.api.RuleNode; import org.thingsboard.rule.engine.api.TbContext; @@ -58,7 +59,7 @@ public class TbAzureIotHubNode extends TbMqttNode { pemCredentials.setCaCert(AzureIotHubUtil.getDefaultCaCert()); } } - this.mqttClient = initClient(ctx); + this.mqttClient = initAzureClient(ctx); } catch (Exception e) { throw new TbNodeException(e); } @@ -73,7 +74,11 @@ public class TbAzureIotHubNode extends TbMqttNode { } } - protected TbMqttNodeConfiguration getMqttNodeConfiguration() { + TbMqttNodeConfiguration getMqttNodeConfiguration() { return this.mqttNodeConfiguration; } + + MqttClient initAzureClient(TbContext ctx) throws Exception { + return initClient(ctx); + } } diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/mqtt/TbMqttNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/mqtt/TbMqttNodeTest.java index e680633360..da568eefbb 100644 --- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/mqtt/TbMqttNodeTest.java +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/mqtt/TbMqttNodeTest.java @@ -84,7 +84,7 @@ import static org.mockito.BDDMockito.willReturn; @ExtendWith(MockitoExtension.class) public class TbMqttNodeTest extends AbstractRuleNodeUpgradeTest { - private final TenantId TENANT_ID = TenantId.fromUUID(UUID.fromString("d0c5d2a8-3a6e-4c95-8caf-47fbdc8ef98f")); + private final TenantId TENANT_ID = TenantId.fromUUID(UUID.fromString("d0c5d2a8-3a6e-4c95-8caf-47fbdc8ef98f")); private final DeviceId DEVICE_ID = new DeviceId(UUID.fromString("09115d92-d333-432a-868c-ccd6e89c9287")); private final RuleNodeId RULE_NODE_ID = new RuleNodeId(UUID.fromString("11699e8f-c3f0-4366-9334-cbf75798314b")); @@ -143,7 +143,7 @@ public class TbMqttNodeTest extends AbstractRuleNodeUpgradeTest { mockSuccessfulInit(); mqttNode.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(mqttNodeConfig))); - MqttClientConfig mqttClientConfig = new MqttClientConfig(mqttNode.getSslContext()); + MqttClientConfig mqttClientConfig = new MqttClientConfig(); mqttNode.prepareMqttClientConfig(mqttClientConfig); assertThat(mqttClientConfig.getUsername()).isEqualTo("test_username"); @@ -158,7 +158,9 @@ public class TbMqttNodeTest extends AbstractRuleNodeUpgradeTest { mockSuccessfulInit(); mqttNode.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(mqttNodeConfig))); - SslContext actualSslContext = mqttNode.getSslContext(); + ArgumentCaptor mqttClientConfig = ArgumentCaptor.forClass(MqttClientConfig.class); + then(mqttNode).should().prepareMqttClientConfig(mqttClientConfig.capture()); + SslContext actualSslContext = mqttClientConfig.getValue().getSslContext(); assertThat(actualSslContext) .usingRecursiveComparison() .ignoringFields("ctx", "ctxLock", "sessionContext.context.ctx", "sessionContext.context.ctxLock") @@ -172,7 +174,9 @@ public class TbMqttNodeTest extends AbstractRuleNodeUpgradeTest { mockSuccessfulInit(); mqttNode.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(mqttNodeConfig))); - SslContext actualSslContext = mqttNode.getSslContext(); + ArgumentCaptor mqttClientConfig = ArgumentCaptor.forClass(MqttClientConfig.class); + then(mqttNode).should().prepareMqttClientConfig(mqttClientConfig.capture()); + SslContext actualSslContext = mqttClientConfig.getValue().getSslContext(); assertThat(actualSslContext).isNull(); } @@ -237,7 +241,7 @@ public class TbMqttNodeTest extends AbstractRuleNodeUpgradeTest { Future future = mock(Future.class); given(future.isSuccess()).willReturn(true); given(mqttClientMock.publish(any(String.class), any(ByteBuf.class), any(MqttQoS.class), anyBoolean())).willReturn(future); - willAnswer(invocation-> { + willAnswer(invocation -> { GenericFutureListener> listener = invocation.getArgument(0); listener.operationComplete(future); return null; @@ -276,7 +280,7 @@ public class TbMqttNodeTest extends AbstractRuleNodeUpgradeTest { String errorMsg = "Message publishing was failed!"; Throwable exception = new RuntimeException(errorMsg); given(future.cause()).willReturn(exception); - willAnswer(invocation-> { + willAnswer(invocation -> { GenericFutureListener> listener = invocation.getArgument(0); listener.operationComplete(future); return null; diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/mqtt/azure/TbAzureIotHubNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/mqtt/azure/TbAzureIotHubNodeTest.java index f6f8adeafb..cc0668e5c0 100644 --- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/mqtt/azure/TbAzureIotHubNodeTest.java +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/mqtt/azure/TbAzureIotHubNodeTest.java @@ -15,9 +15,7 @@ */ package org.thingsboard.rule.engine.mqtt.azure; -import io.netty.channel.EventLoopGroup; import io.netty.handler.codec.mqtt.MqttVersion; -import io.netty.util.concurrent.Promise; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -27,24 +25,14 @@ import org.thingsboard.common.util.AzureIotHubUtil; import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.mqtt.MqttClient; import org.thingsboard.mqtt.MqttClientConfig; -import org.thingsboard.mqtt.MqttConnectResult; import org.thingsboard.rule.engine.api.TbContext; import org.thingsboard.rule.engine.api.TbNodeConfiguration; import org.thingsboard.rule.engine.credentials.CertPemCredentials; import org.thingsboard.rule.engine.mqtt.TbMqttNodeConfiguration; -import org.thingsboard.server.common.data.id.RuleNodeId; -import org.thingsboard.server.common.data.id.TenantId; -import org.thingsboard.server.common.data.rule.RuleNode; - -import java.util.UUID; -import java.util.concurrent.TimeUnit; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatNoException; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyInt; -import static org.mockito.ArgumentMatchers.anyLong; -import static org.mockito.BDDMockito.given; import static org.mockito.BDDMockito.spy; import static org.mockito.BDDMockito.willReturn; @@ -58,12 +46,6 @@ public class TbAzureIotHubNodeTest { protected TbContext ctxMock; @Mock protected MqttClient mqttClientMock; - @Mock - protected EventLoopGroup eventLoopGroupMock; - @Mock - protected Promise promiseMock; - @Mock - protected MqttConnectResult resultMock; @BeforeEach public void setUp() { @@ -93,7 +75,7 @@ public class TbAzureIotHubNodeTest { credentials.setCaCert("test-ca-cert.pem"); azureIotHubNodeConfig.setCredentials(credentials); - mockSuccessfulInit(); + willReturn(mqttClientMock).given(azureIotHubNode).initAzureClient(any()); azureIotHubNode.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(azureIotHubNodeConfig))); MqttClientConfig mqttClientConfig = new MqttClientConfig(); @@ -111,7 +93,7 @@ public class TbAzureIotHubNodeTest { credentials.setPassword("test-password"); azureIotHubNodeConfig.setCredentials(credentials); - mockSuccessfulInit(); + willReturn(mqttClientMock).given(azureIotHubNode).initAzureClient(any()); assertThatNoException().isThrownBy( () -> azureIotHubNode.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(azureIotHubNodeConfig)))); @@ -121,13 +103,4 @@ public class TbAzureIotHubNodeTest { assertThat(mqttNodeConfiguration.isCleanSession()).isTrue(); } - private void mockSuccessfulInit() throws Exception { - given(ctxMock.getTenantId()).willReturn(TenantId.fromUUID(UUID.fromString("74aad2a5-3c07-43fb-9c9b-07fafb4e86ce"))); - given(ctxMock.getSelf()).willReturn(new RuleNode(new RuleNodeId(UUID.fromString("da5eb2ef-4ea7-4ac9-9359-0e727a0c30ce")))); - given(ctxMock.getSharedEventLoop()).willReturn(eventLoopGroupMock); - willReturn(mqttClientMock).given(azureIotHubNode).getMqttClient(any(), any()); - given(mqttClientMock.connect(any(), anyInt())).willReturn(promiseMock); - given(promiseMock.get(anyLong(), any(TimeUnit.class))).willReturn(resultMock); - given(resultMock.isSuccess()).willReturn(true); - } }