added tests for mqtt node
This commit is contained in:
		
							parent
							
								
									694967e535
								
							
						
					
					
						commit
						a86a42813f
					
				@ -53,6 +53,7 @@ import java.util.concurrent.TimeoutException;
 | 
			
		||||
        type = ComponentType.EXTERNAL,
 | 
			
		||||
        name = "mqtt",
 | 
			
		||||
        configClazz = TbMqttNodeConfiguration.class,
 | 
			
		||||
        version = 1,
 | 
			
		||||
        clusteringMode = ComponentClusteringMode.USER_PREFERENCE,
 | 
			
		||||
        nodeDescription = "Publish messages to the MQTT broker",
 | 
			
		||||
        nodeDetails = "Will publish message payload to the MQTT broker with QoS <b>AT_LEAST_ONCE</b>.",
 | 
			
		||||
@ -145,7 +146,7 @@ public class TbMqttNode extends TbAbstractExternalNode {
 | 
			
		||||
        return client;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    protected void prepareMqttClientConfig(MqttClientConfig config) throws SSLException {
 | 
			
		||||
    protected void prepareMqttClientConfig(MqttClientConfig config) {
 | 
			
		||||
        ClientCredentials credentials = this.mqttNodeConfiguration.getCredentials();
 | 
			
		||||
        if (credentials.getType() == CredentialsType.BASIC) {
 | 
			
		||||
            BasicCredentials basicCredentials = (BasicCredentials) credentials;
 | 
			
		||||
@ -154,7 +155,7 @@ public class TbMqttNode extends TbAbstractExternalNode {
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private SslContext getSslContext() throws SSLException {
 | 
			
		||||
    protected SslContext getSslContext() throws SSLException {
 | 
			
		||||
        return this.mqttNodeConfiguration.isSsl() ? this.mqttNodeConfiguration.getCredentials().initSslContext() : null;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -66,7 +66,7 @@ public class TbAzureIotHubNode extends TbMqttNode {
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    protected void prepareMqttClientConfig(MqttClientConfig config) throws SSLException {
 | 
			
		||||
    protected void prepareMqttClientConfig(MqttClientConfig config) {
 | 
			
		||||
        config.setProtocolVersion(MqttVersion.MQTT_3_1_1);
 | 
			
		||||
        config.setUsername(AzureIotHubUtil.buildUsername(mqttNodeConfiguration.getHost(), config.getClientId()));
 | 
			
		||||
        ClientCredentials credentials = mqttNodeConfiguration.getCredentials();
 | 
			
		||||
 | 
			
		||||
@ -15,26 +15,222 @@
 | 
			
		||||
 */
 | 
			
		||||
package org.thingsboard.rule.engine.mqtt;
 | 
			
		||||
 | 
			
		||||
import io.netty.buffer.ByteBuf;
 | 
			
		||||
import io.netty.buffer.Unpooled;
 | 
			
		||||
import io.netty.handler.codec.mqtt.MqttQoS;
 | 
			
		||||
import io.netty.handler.ssl.SslContext;
 | 
			
		||||
import io.netty.handler.ssl.SslContextBuilder;
 | 
			
		||||
import io.netty.util.concurrent.Future;
 | 
			
		||||
import io.netty.util.concurrent.GenericFutureListener;
 | 
			
		||||
import org.junit.jupiter.api.BeforeEach;
 | 
			
		||||
import org.junit.jupiter.api.Test;
 | 
			
		||||
import org.junit.jupiter.api.extension.ExtendWith;
 | 
			
		||||
import org.junit.jupiter.params.ParameterizedTest;
 | 
			
		||||
import org.junit.jupiter.params.provider.Arguments;
 | 
			
		||||
import org.junit.jupiter.params.provider.MethodSource;
 | 
			
		||||
import org.mockito.ArgumentCaptor;
 | 
			
		||||
import org.mockito.Mock;
 | 
			
		||||
import org.mockito.Spy;
 | 
			
		||||
import org.mockito.junit.jupiter.MockitoExtension;
 | 
			
		||||
import org.springframework.test.util.ReflectionTestUtils;
 | 
			
		||||
import org.thingsboard.common.util.JacksonUtil;
 | 
			
		||||
import org.thingsboard.mqtt.MqttClient;
 | 
			
		||||
import org.thingsboard.mqtt.MqttClientConfig;
 | 
			
		||||
import org.thingsboard.rule.engine.AbstractRuleNodeUpgradeTest;
 | 
			
		||||
import org.thingsboard.rule.engine.api.TbContext;
 | 
			
		||||
import org.thingsboard.rule.engine.api.TbNode;
 | 
			
		||||
import org.thingsboard.rule.engine.api.TbNodeConfiguration;
 | 
			
		||||
import org.thingsboard.rule.engine.api.TbNodeException;
 | 
			
		||||
import org.thingsboard.rule.engine.api.util.TbNodeUtils;
 | 
			
		||||
import org.thingsboard.rule.engine.credentials.AnonymousCredentials;
 | 
			
		||||
import org.thingsboard.rule.engine.credentials.BasicCredentials;
 | 
			
		||||
import org.thingsboard.rule.engine.credentials.ClientCredentials;
 | 
			
		||||
import org.thingsboard.server.common.data.id.DeviceId;
 | 
			
		||||
import org.thingsboard.server.common.data.id.RuleNodeId;
 | 
			
		||||
import org.thingsboard.server.common.data.id.TenantId;
 | 
			
		||||
import org.thingsboard.server.common.data.msg.TbMsgType;
 | 
			
		||||
import org.thingsboard.server.common.data.rule.RuleNode;
 | 
			
		||||
import org.thingsboard.server.common.msg.TbMsg;
 | 
			
		||||
import org.thingsboard.server.common.msg.TbMsgMetaData;
 | 
			
		||||
 | 
			
		||||
import javax.net.ssl.SSLException;
 | 
			
		||||
import java.util.Map;
 | 
			
		||||
import java.util.UUID;
 | 
			
		||||
import java.util.stream.Stream;
 | 
			
		||||
 | 
			
		||||
import static org.mockito.Mockito.mock;
 | 
			
		||||
import static com.amazonaws.util.StringUtils.UTF8;
 | 
			
		||||
import static org.assertj.core.api.Assertions.assertThat;
 | 
			
		||||
import static org.assertj.core.api.Assertions.assertThatThrownBy;
 | 
			
		||||
import static org.mockito.ArgumentMatchers.any;
 | 
			
		||||
import static org.mockito.ArgumentMatchers.anyBoolean;
 | 
			
		||||
import static org.mockito.ArgumentMatchers.eq;
 | 
			
		||||
import static org.mockito.BDDMockito.given;
 | 
			
		||||
import static org.mockito.BDDMockito.mock;
 | 
			
		||||
import static org.mockito.BDDMockito.spy;
 | 
			
		||||
import static org.mockito.BDDMockito.then;
 | 
			
		||||
import static org.mockito.BDDMockito.willAnswer;
 | 
			
		||||
import static org.mockito.BDDMockito.willReturn;
 | 
			
		||||
import static org.mockito.BDDMockito.willThrow;
 | 
			
		||||
 | 
			
		||||
@ExtendWith(MockitoExtension.class)
 | 
			
		||||
class TbMqttNodeTest extends AbstractRuleNodeUpgradeTest {
 | 
			
		||||
 | 
			
		||||
    private final DeviceId DEVICE_ID = new DeviceId(UUID.fromString("09115d92-d333-432a-868c-ccd6e89c9287"));
 | 
			
		||||
 | 
			
		||||
    @Spy
 | 
			
		||||
    TbMqttNode node;
 | 
			
		||||
    private TbMqttNode node;
 | 
			
		||||
    private TbMqttNodeConfiguration config;
 | 
			
		||||
 | 
			
		||||
    @Mock
 | 
			
		||||
    private TbContext ctxMock;
 | 
			
		||||
    @Mock
 | 
			
		||||
    private MqttClient clientMock;
 | 
			
		||||
 | 
			
		||||
    @BeforeEach
 | 
			
		||||
    public void setUp() throws Exception {
 | 
			
		||||
        node = mock(TbMqttNode.class);
 | 
			
		||||
        node = spy(new TbMqttNode());
 | 
			
		||||
        config = new TbMqttNodeConfiguration().defaultConfiguration();
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Test
 | 
			
		||||
    public void verifyDefaultConfig() {
 | 
			
		||||
        assertThat(config.getTopicPattern()).isEqualTo("my-topic");
 | 
			
		||||
        assertThat(config.getHost()).isNull();
 | 
			
		||||
        assertThat(config.getPort()).isEqualTo(1883);
 | 
			
		||||
        assertThat(config.getConnectTimeoutSec()).isEqualTo(10);
 | 
			
		||||
        assertThat(config.getClientId()).isNull();
 | 
			
		||||
        assertThat(config.isAppendClientIdSuffix()).isFalse();
 | 
			
		||||
        assertThat(config.isRetainedMessage()).isFalse();
 | 
			
		||||
        assertThat(config.isCleanSession()).isTrue();
 | 
			
		||||
        assertThat(config.isSsl()).isFalse();
 | 
			
		||||
        assertThat(config.isParseToPlainText()).isFalse();
 | 
			
		||||
        assertThat(config.getCredentials()).isInstanceOf(AnonymousCredentials.class);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Test
 | 
			
		||||
    public void verifyGetOwnerIdMethod() {
 | 
			
		||||
        String tenantIdStr = "6f67b6cc-21dd-46c5-809c-402b738a3f8b";
 | 
			
		||||
        String ruleNodeIdStr = "80a90b53-6888-4344-bf46-01ce8e96eee7";
 | 
			
		||||
        RuleNode ruleNode = new RuleNode(new RuleNodeId(UUID.fromString(ruleNodeIdStr)));
 | 
			
		||||
        given(ctxMock.getTenantId()).willReturn(TenantId.fromUUID(UUID.fromString(tenantIdStr)));
 | 
			
		||||
        given(ctxMock.getSelf()).willReturn(ruleNode);
 | 
			
		||||
 | 
			
		||||
        String actualOwnerIdStr = node.getOwnerId(ctxMock);
 | 
			
		||||
        String expectedOwnerIdStr = "Tenant[" + tenantIdStr + "]RuleNode[" + ruleNodeIdStr + "]";
 | 
			
		||||
        assertThat(actualOwnerIdStr).isEqualTo(expectedOwnerIdStr);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Test
 | 
			
		||||
    public void verifyPrepareMqttClientConfigMethodWithBasicCredentials() throws SSLException {
 | 
			
		||||
        BasicCredentials credentials = new BasicCredentials();
 | 
			
		||||
        credentials.setUsername("test_username");
 | 
			
		||||
        credentials.setPassword("test_password");
 | 
			
		||||
        config.setCredentials(credentials);
 | 
			
		||||
        ReflectionTestUtils.setField(node, "mqttNodeConfiguration", config);
 | 
			
		||||
        MqttClientConfig mqttClientConfig = new MqttClientConfig(node.getSslContext());
 | 
			
		||||
 | 
			
		||||
        node.prepareMqttClientConfig(mqttClientConfig);
 | 
			
		||||
 | 
			
		||||
        assertThat(mqttClientConfig)
 | 
			
		||||
                .hasFieldOrPropertyWithValue("username", "test_username")
 | 
			
		||||
                .hasFieldOrPropertyWithValue("password", "test_password");
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @ParameterizedTest
 | 
			
		||||
    @MethodSource
 | 
			
		||||
    public void verifyGetSslContextMethod(boolean ssl, ClientCredentials credentials, SslContext expectedSslContext) throws SSLException {
 | 
			
		||||
        config.setSsl(ssl);
 | 
			
		||||
        config.setCredentials(credentials);
 | 
			
		||||
        ReflectionTestUtils.setField(node, "mqttNodeConfiguration", config);
 | 
			
		||||
 | 
			
		||||
        SslContext actualSslContext = node.getSslContext();
 | 
			
		||||
        assertThat(actualSslContext)
 | 
			
		||||
                .usingRecursiveComparison()
 | 
			
		||||
                .ignoringFields("ctx", "ctxLock", "sessionContext.context.ctx", "sessionContext.context.ctxLock")
 | 
			
		||||
                .isEqualTo(expectedSslContext);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private static Stream<Arguments> verifyGetSslContextMethod() throws SSLException {
 | 
			
		||||
        return Stream.of(
 | 
			
		||||
                Arguments.of(true, new BasicCredentials(), SslContextBuilder.forClient().build()),
 | 
			
		||||
                Arguments.of(false, new AnonymousCredentials(), null)
 | 
			
		||||
        );
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Test
 | 
			
		||||
    public void givenFailedToInitializeMqttClient_whenInit_thenThrowsException() throws Exception {
 | 
			
		||||
        String errorMsg = "Failed to connect to MQTT broker!";
 | 
			
		||||
        willThrow(new RuntimeException(errorMsg)).given(node).initClient(any());
 | 
			
		||||
 | 
			
		||||
        var configuration = new TbNodeConfiguration(JacksonUtil.valueToTree(config));
 | 
			
		||||
        assertThatThrownBy(() -> node.init(ctxMock, configuration))
 | 
			
		||||
                .isInstanceOf(TbNodeException.class)
 | 
			
		||||
                .hasMessage(RuntimeException.class.getName() + ": " + errorMsg);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @ParameterizedTest
 | 
			
		||||
    @MethodSource
 | 
			
		||||
    public void givenTopicPatternAndIsRetainedMsgIsTrue_whenOnMsg_thenTellSuccess(String topicPattern, TbMsgMetaData metaData, String data) throws Exception {
 | 
			
		||||
        config.setRetainedMessage(true);
 | 
			
		||||
        config.setTopicPattern(topicPattern);
 | 
			
		||||
 | 
			
		||||
        willReturn(clientMock).given(node).initClient(any());
 | 
			
		||||
        Future<Void> future = mock(Future.class);
 | 
			
		||||
        given(future.isSuccess()).willReturn(true);
 | 
			
		||||
        given(clientMock.publish(any(String.class), any(ByteBuf.class), any(MqttQoS.class), anyBoolean())).willReturn(future);
 | 
			
		||||
        willAnswer(invocation-> {
 | 
			
		||||
            GenericFutureListener<Future<Void>> listener = invocation.getArgument(0);
 | 
			
		||||
            listener.operationComplete(future);
 | 
			
		||||
            return null;
 | 
			
		||||
        }).given(future).addListener(any());
 | 
			
		||||
 | 
			
		||||
        node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config)));
 | 
			
		||||
        TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, metaData, data);
 | 
			
		||||
        node.onMsg(ctxMock, msg);
 | 
			
		||||
 | 
			
		||||
        String expectedTopic = TbNodeUtils.processPattern(config.getTopicPattern(), msg);
 | 
			
		||||
        then(clientMock).should().publish(expectedTopic, Unpooled.wrappedBuffer(msg.getData().getBytes(UTF8)), MqttQoS.AT_LEAST_ONCE, true);
 | 
			
		||||
        then(ctxMock).should().tellSuccess(msg);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private static Stream<Arguments> givenTopicPatternAndIsRetainedMsgIsTrue_whenOnMsg_thenTellSuccess() {
 | 
			
		||||
        return Stream.of(
 | 
			
		||||
                Arguments.of("new-topic", TbMsgMetaData.EMPTY, TbMsg.EMPTY_JSON_OBJECT),
 | 
			
		||||
                Arguments.of("${md-topic-name}", new TbMsgMetaData(Map.of("md-topic-name", "md-new-topic")), TbMsg.EMPTY_JSON_OBJECT),
 | 
			
		||||
                Arguments.of("$[msg-topic-name]", TbMsgMetaData.EMPTY, "{\"msg-topic-name\":\"msg-new-topic\"}")
 | 
			
		||||
        );
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Test
 | 
			
		||||
    public void givenParseToPlainTextIsTrueAndMsgPublishingFailed_whenOnMsg_thenTellFailure() throws Exception {
 | 
			
		||||
        config.setParseToPlainText(true);
 | 
			
		||||
 | 
			
		||||
        willReturn(clientMock).given(node).initClient(any());
 | 
			
		||||
        Future<Void> future = mock(Future.class);
 | 
			
		||||
        given(clientMock.publish(any(String.class), any(ByteBuf.class), any(MqttQoS.class), anyBoolean())).willReturn(future);
 | 
			
		||||
        given(future.isSuccess()).willReturn(false);
 | 
			
		||||
        String errorMsg = "Message publishing was failed!";
 | 
			
		||||
        Throwable exception = new RuntimeException(errorMsg);
 | 
			
		||||
        given(future.cause()).willReturn(exception);
 | 
			
		||||
        willAnswer(invocation-> {
 | 
			
		||||
            GenericFutureListener<Future<Void>> listener = invocation.getArgument(0);
 | 
			
		||||
            listener.operationComplete(future);
 | 
			
		||||
            return null;
 | 
			
		||||
        }).given(future).addListener(any());
 | 
			
		||||
 | 
			
		||||
        node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config)));
 | 
			
		||||
        TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, TbMsgMetaData.EMPTY, "\"string\"");
 | 
			
		||||
        node.onMsg(ctxMock, msg);
 | 
			
		||||
 | 
			
		||||
        String expectedData = JacksonUtil.toPlainText(msg.getData());
 | 
			
		||||
        then(clientMock).should().publish(config.getTopicPattern(), Unpooled.wrappedBuffer(expectedData.getBytes(UTF8)), MqttQoS.AT_LEAST_ONCE, false);
 | 
			
		||||
        TbMsgMetaData metaData = new TbMsgMetaData();
 | 
			
		||||
        metaData.putValue("error", RuntimeException.class + ": " + errorMsg);
 | 
			
		||||
        TbMsg expectedMsg = TbMsg.transformMsgMetadata(msg, metaData);
 | 
			
		||||
        ArgumentCaptor<TbMsg> actualMsgCaptor = ArgumentCaptor.forClass(TbMsg.class);
 | 
			
		||||
        then(ctxMock).should().tellFailure(actualMsgCaptor.capture(), eq(exception));
 | 
			
		||||
        TbMsg actualMsg = actualMsgCaptor.getValue();
 | 
			
		||||
        assertThat(actualMsg).usingRecursiveComparison().ignoringFields("ctx").isEqualTo(expectedMsg);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private static Stream<Arguments> givenFromVersionAndConfig_whenUpgrade_thenVerifyHasChangesAndConfig() {
 | 
			
		||||
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user