added tests for rabbitmq node

This commit is contained in:
IrynaMatveieva 2024-07-29 15:36:38 +03:00
parent 6a27de82bd
commit 2ce75c5d16
2 changed files with 237 additions and 11 deletions

View File

@ -64,16 +64,7 @@ public class TbRabbitMqNode extends TbAbstractExternalNode {
public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException { public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException {
super.init(ctx); super.init(ctx);
this.config = TbNodeUtils.convert(configuration, TbRabbitMqNodeConfiguration.class); this.config = TbNodeUtils.convert(configuration, TbRabbitMqNodeConfiguration.class);
ConnectionFactory factory = new ConnectionFactory(); ConnectionFactory factory = getConnectionFactory();
factory.setHost(this.config.getHost());
factory.setPort(this.config.getPort());
factory.setVirtualHost(this.config.getVirtualHost());
factory.setUsername(this.config.getUsername());
factory.setPassword(this.config.getPassword());
factory.setAutomaticRecoveryEnabled(this.config.isAutomaticRecoveryEnabled());
factory.setConnectionTimeout(this.config.getConnectionTimeout());
factory.setHandshakeTimeout(this.config.getHandshakeTimeout());
this.config.getClientProperties().forEach((k,v) -> factory.getClientProperties().put(k,v));
try { try {
this.connection = factory.newConnection(); this.connection = factory.newConnection();
this.channel = this.connection.createChannel(); this.channel = this.connection.createChannel();
@ -90,6 +81,20 @@ public class TbRabbitMqNode extends TbAbstractExternalNode {
t -> tellFailure(ctx, processException(tbMsg, t), t)); t -> tellFailure(ctx, processException(tbMsg, t), t));
} }
protected ConnectionFactory getConnectionFactory() {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(this.config.getHost());
factory.setPort(this.config.getPort());
factory.setVirtualHost(this.config.getVirtualHost());
factory.setUsername(this.config.getUsername());
factory.setPassword(this.config.getPassword());
factory.setAutomaticRecoveryEnabled(this.config.isAutomaticRecoveryEnabled());
factory.setConnectionTimeout(this.config.getConnectionTimeout());
factory.setHandshakeTimeout(this.config.getHandshakeTimeout());
this.config.getClientProperties().forEach((k,v) -> factory.getClientProperties().put(k,v));
return factory;
}
private ListenableFuture<TbMsg> publishMessageAsync(TbContext ctx, TbMsg msg) { private ListenableFuture<TbMsg> publishMessageAsync(TbContext ctx, TbMsg msg) {
return ctx.getExternalCallExecutor().executeAsync(() -> publishMessage(ctx, msg)); return ctx.getExternalCallExecutor().executeAsync(() -> publishMessage(ctx, msg));
} }
@ -132,7 +137,7 @@ public class TbRabbitMqNode extends TbAbstractExternalNode {
} }
} }
private static AMQP.BasicProperties convert(String name) throws TbNodeException { protected static AMQP.BasicProperties convert(String name) throws TbNodeException {
switch (name) { switch (name) {
case "BASIC": case "BASIC":
return MessageProperties.BASIC; return MessageProperties.BASIC;

View File

@ -0,0 +1,221 @@
/**
* Copyright © 2016-2024 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.rule.engine.rabbitmq;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.springframework.test.util.ReflectionTestUtils;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.common.util.ListeningExecutor;
import org.thingsboard.rule.engine.TestDbCallbackExecutor;
import org.thingsboard.rule.engine.api.TbContext;
import org.thingsboard.rule.engine.api.TbNodeConfiguration;
import org.thingsboard.rule.engine.api.TbNodeException;
import org.thingsboard.rule.engine.api.util.TbNodeUtils;
import org.thingsboard.server.common.data.StringUtils;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.msg.TbMsgType;
import org.thingsboard.server.common.data.msg.TbNodeConnectionType;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.TbMsgMetaData;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeoutException;
import java.util.stream.Stream;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.BDDMockito.given;
import static org.mockito.BDDMockito.then;
import static org.mockito.BDDMockito.willAnswer;
import static org.mockito.BDDMockito.willReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
@ExtendWith(MockitoExtension.class)
public class TbRabbitMqNodeTest {
private final DeviceId DEVICE_ID = new DeviceId(UUID.fromString("b3d6f9dd-15cc-4e61-acc0-13197a090406"));
private final ListeningExecutor executor = new TestDbCallbackExecutor();
private TbRabbitMqNode node;
private TbRabbitMqNodeConfiguration config;
@Mock
private TbContext ctxMock;
@Mock
private ConnectionFactory factoryMock;
@Mock
private Connection connectionMock;
@Mock
private Channel channelMock;
@BeforeEach
public void setUp() {
node = spy(new TbRabbitMqNode());
config = new TbRabbitMqNodeConfiguration().defaultConfiguration();
}
@Test
public void verifyDefaultConfig() {
assertThat(config.getExchangeNamePattern()).isEqualTo("");
assertThat(config.getRoutingKeyPattern()).isEqualTo("");
assertThat(config.getMessageProperties()).isNull();
assertThat(config.getHost()).isEqualTo(ConnectionFactory.DEFAULT_HOST);
assertThat(config.getPort()).isEqualTo(ConnectionFactory.DEFAULT_AMQP_PORT);
assertThat(config.getVirtualHost()).isEqualTo(ConnectionFactory.DEFAULT_VHOST);
assertThat(config.getUsername()).isEqualTo(ConnectionFactory.DEFAULT_USER);
assertThat(config.getPassword()).isEqualTo(ConnectionFactory.DEFAULT_PASS);
assertThat(config.isAutomaticRecoveryEnabled()).isFalse();
assertThat(config.getConnectionTimeout()).isEqualTo(ConnectionFactory.DEFAULT_CONNECTION_TIMEOUT);
assertThat(config.getHandshakeTimeout()).isEqualTo(ConnectionFactory.DEFAULT_HANDSHAKE_TIMEOUT);
assertThat(config.getClientProperties()).isEqualTo(Collections.emptyMap());
}
@ParameterizedTest
@MethodSource
public void givenForceAckIsTrueAndExchangeNameAndRoutingKeyPatternsAndBasicProperties_whenOnMsg_thenPublishMsgAndEnqueueForTellNext(
String exchangeNamePattern, String routingKeyPattern, String basicProperties, TbMsgMetaData metaData, String data
) throws Exception {
config.setExchangeNamePattern(exchangeNamePattern);
config.setRoutingKeyPattern(routingKeyPattern);
config.setMessageProperties(basicProperties);
given(ctxMock.isExternalNodeForceAck()).willReturn(true);
mockOnInit();
given(ctxMock.getExternalCallExecutor()).willReturn(executor);
node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config)));
TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, metaData, data);
node.onMsg(ctxMock, msg);
then(ctxMock).should().ack(msg);
String exchangeName = TbNodeUtils.processPattern(exchangeNamePattern, msg);
String routingKey = TbNodeUtils.processPattern(routingKeyPattern, msg);
AMQP.BasicProperties properties = StringUtils.isNotEmpty(basicProperties) ? TbRabbitMqNode.convert(basicProperties) : null;
then(channelMock).should().basicPublish(exchangeName, routingKey, properties, data.getBytes(StandardCharsets.UTF_8));
ArgumentCaptor<TbMsg> actualMsg = ArgumentCaptor.forClass(TbMsg.class);
then(ctxMock).should().enqueueForTellNext(actualMsg.capture(), eq(TbNodeConnectionType.SUCCESS));
assertThat(actualMsg.getValue()).usingRecursiveComparison().ignoringFields("ctx").isEqualTo(msg);
}
private static Stream<Arguments> givenForceAckIsTrueAndExchangeNameAndRoutingKeyPatternsAndBasicProperties_whenOnMsg_thenPublishMsgAndEnqueueForTellNext() {
return Stream.of(
Arguments.of("", "", null, TbMsgMetaData.EMPTY, TbMsg.EMPTY_JSON_OBJECT),
Arguments.of("topic_logs", "kern.critical", "", TbMsgMetaData.EMPTY, TbMsg.EMPTY_JSON_OBJECT),
Arguments.of("${mdExchangeName}", "${mdRoutingKey}", "BASIC",
new TbMsgMetaData(Map.of("mdExchangeName", "md_topic_logs","mdRoutingKey", "md.kern.critical")),
TbMsg.EMPTY_JSON_OBJECT),
Arguments.of("$[msgExchangeName]", "$[msgRoutingKey]", "MINIMAL_PERSISTENT_BASIC",
TbMsgMetaData.EMPTY, "{\"msgExchangeName\":\"msg_topic_logs\",\"msgRoutingKey\":\"msg.kern.critical\"}")
);
}
@Test
public void givenForceAckIsFalseAndErrorOccursDuringPublishing_whenOnMsg_thenTellFailure() throws Exception {
given(ctxMock.isExternalNodeForceAck()).willReturn(false);
mockOnInit();
ListeningExecutor listeningExecutor = mock(ListeningExecutor.class);
given(ctxMock.getExternalCallExecutor()).willReturn(listeningExecutor);
String errorMsg = "Something went wrong";
ListenableFuture<TbMsg> failedFuture = Futures.immediateFailedFuture(new RuntimeException(errorMsg));
willReturn(failedFuture).given(listeningExecutor).executeAsync(any(Callable.class));
node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config)));
TbMsgMetaData metaData = new TbMsgMetaData();
TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, metaData, TbMsg.EMPTY_JSON_OBJECT);
node.onMsg(ctxMock, msg);
then(ctxMock).should(never()).ack(any(TbMsg.class));
ArgumentCaptor<TbMsg> actualMsg = ArgumentCaptor.forClass(TbMsg.class);
ArgumentCaptor<Throwable> throwable = ArgumentCaptor.forClass(Throwable.class);
then(ctxMock).should().tellFailure(actualMsg.capture(), throwable.capture());
metaData.putValue("error", RuntimeException.class + ": " + errorMsg);
TbMsg expectedMsg = TbMsg.transformMsgMetadata(msg, metaData);
assertThat(actualMsg.getValue()).usingRecursiveComparison().ignoringFields("ctx").isEqualTo(expectedMsg);
assertThat(throwable.getValue()).isInstanceOf(RuntimeException.class).hasMessage(errorMsg);
}
@ParameterizedTest
@MethodSource
public void givenAMQPBasicPropertiesName_whenConvert_thenReturnAMQPBasicProperties(String name, AMQP.BasicProperties expectedBasicProperties) throws TbNodeException {
AMQP.BasicProperties actualBasicProperties = TbRabbitMqNode.convert(name);
assertThat(actualBasicProperties).isEqualTo(expectedBasicProperties);
}
private static Stream<Arguments> givenAMQPBasicPropertiesName_whenConvert_thenReturnAMQPBasicProperties() {
return Stream.of(
Arguments.of("BASIC", MessageProperties.BASIC),
Arguments.of("TEXT_PLAIN", MessageProperties.TEXT_PLAIN),
Arguments.of("MINIMAL_BASIC", MessageProperties.MINIMAL_BASIC),
Arguments.of("MINIMAL_PERSISTENT_BASIC", MessageProperties.MINIMAL_PERSISTENT_BASIC),
Arguments.of("PERSISTENT_BASIC", MessageProperties.PERSISTENT_BASIC),
Arguments.of("PERSISTENT_TEXT_PLAIN", MessageProperties.PERSISTENT_TEXT_PLAIN)
);
}
@ParameterizedTest
@ValueSource(strings = {"Basic", "TEXT_plain", "minimal basic", "", " "})
public void givenUndefinedProperties_whenConvert_thenThrowsException(String name) {
assertThatThrownBy(() -> TbRabbitMqNode.convert(name))
.isInstanceOf(TbNodeException.class)
.hasMessage("Message Properties: '" + name + "' is undefined!");
}
@Test
public void givenConnection_whenDestroy_thenShouldClose() throws IOException {
ReflectionTestUtils.setField(node, "connection", connectionMock);
node.destroy();
then(connectionMock).should().close();
}
@Test
public void givenConnectionIsNull_whenDestroy_thenVerifyNoInteractions() {
node.destroy();
then(connectionMock).shouldHaveNoInteractions();
}
private void mockOnInit() throws IOException, TimeoutException {
willAnswer(invocation -> factoryMock).given(node).getConnectionFactory();
given(factoryMock.newConnection()).willReturn(connectionMock);
given(connectionMock.createChannel()).willReturn(channelMock);
}
}