diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/aws/sns/TbSnsNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/aws/sns/TbSnsNode.java index 9ac2770b9d..6516a5e4b3 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/aws/sns/TbSnsNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/aws/sns/TbSnsNode.java @@ -35,8 +35,6 @@ import org.thingsboard.server.common.data.plugin.ComponentType; import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.common.msg.TbMsgMetaData; -import java.util.concurrent.ExecutionException; - import static org.thingsboard.common.util.DonAsynchron.withCallback; @Slf4j @@ -81,34 +79,34 @@ public class TbSnsNode extends TbAbstractExternalNode { } @Override - public void onMsg(TbContext ctx, TbMsg msg) throws ExecutionException, InterruptedException, TbNodeException { + public void onMsg(TbContext ctx, TbMsg msg) { var tbMsg = ackIfNeeded(ctx, msg); withCallback(publishMessageAsync(ctx, tbMsg), m -> tellSuccess(ctx, m), - t -> tellFailure(ctx, processException(ctx, tbMsg, t), t)); + t -> tellFailure(ctx, processException(tbMsg, t), t)); } private ListenableFuture publishMessageAsync(TbContext ctx, TbMsg msg) { - return ctx.getExternalCallExecutor().executeAsync(() -> publishMessage(ctx, msg)); + return ctx.getExternalCallExecutor().executeAsync(() -> publishMessage(msg)); } - private TbMsg publishMessage(TbContext ctx, TbMsg msg) { + private TbMsg publishMessage(TbMsg msg) { String topicArn = TbNodeUtils.processPattern(this.config.getTopicArnPattern(), msg); PublishRequest publishRequest = new PublishRequest() .withTopicArn(topicArn) .withMessage(msg.getData()); PublishResult result = this.snsClient.publish(publishRequest); - return processPublishResult(ctx, msg, result); + return processPublishResult(msg, result); } - private TbMsg processPublishResult(TbContext ctx, TbMsg origMsg, PublishResult result) { + private TbMsg processPublishResult(TbMsg origMsg, PublishResult result) { TbMsgMetaData metaData = origMsg.getMetaData().copy(); metaData.putValue(MESSAGE_ID, result.getMessageId()); metaData.putValue(REQUEST_ID, result.getSdkResponseMetadata().getRequestId()); return TbMsg.transformMsgMetadata(origMsg, metaData); } - private TbMsg processException(TbContext ctx, TbMsg origMsg, Throwable t) { + private TbMsg processException(TbMsg origMsg, Throwable t) { TbMsgMetaData metaData = origMsg.getMetaData().copy(); metaData.putValue(ERROR, t.getClass() + ": " + t.getMessage()); return TbMsg.transformMsgMetadata(origMsg, metaData); diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/aws/sns/TbSnsNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/aws/sns/TbSnsNodeTest.java new file mode 100644 index 0000000000..09787d3da8 --- /dev/null +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/aws/sns/TbSnsNodeTest.java @@ -0,0 +1,176 @@ +/** + * Copyright © 2016-2024 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.rule.engine.aws.sns; + +import com.amazonaws.ResponseMetadata; +import com.amazonaws.services.sns.AmazonSNS; +import com.amazonaws.services.sns.model.PublishRequest; +import com.amazonaws.services.sns.model.PublishResult; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.springframework.test.util.ReflectionTestUtils; +import org.thingsboard.common.util.ListeningExecutor; +import org.thingsboard.rule.engine.TestDbCallbackExecutor; +import org.thingsboard.rule.engine.api.TbContext; +import org.thingsboard.rule.engine.api.util.TbNodeUtils; +import org.thingsboard.server.common.data.id.DeviceId; +import org.thingsboard.server.common.data.msg.TbMsgType; +import org.thingsboard.server.common.data.msg.TbNodeConnectionType; +import org.thingsboard.server.common.msg.TbMsg; +import org.thingsboard.server.common.msg.TbMsgMetaData; + +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.Callable; +import java.util.stream.Stream; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.BDDMockito.given; +import static org.mockito.BDDMockito.then; +import static org.mockito.BDDMockito.mock; +import static org.mockito.BDDMockito.never; +import static org.mockito.BDDMockito.verifyNoMoreInteractions; + +@ExtendWith(MockitoExtension.class) +class TbSnsNodeTest { + + private final DeviceId DEVICE_ID = new DeviceId(UUID.fromString("fccfdf2e-6a88-4a94-81dd-5cbb557019cf")); + private final ListeningExecutor executor = new TestDbCallbackExecutor(); + + private TbSnsNode node; + private TbSnsNodeConfiguration config; + + @Mock + private TbContext ctxMock; + @Mock + private AmazonSNS snsClientMock; + @Mock + private PublishResult publishResultMock; + @Mock + private ResponseMetadata responseMetadataMock; + + @BeforeEach + void setUp() { + node = new TbSnsNode(); + config = new TbSnsNodeConfiguration().defaultConfiguration(); + ReflectionTestUtils.setField(node, "snsClient", snsClientMock); + ReflectionTestUtils.setField(node, "config", config); + } + + @Test + void verifyDefaultConfig() { + assertThat(config.getTopicArnPattern()).isEqualTo("arn:aws:sns:us-east-1:123456789012:MyNewTopic"); + assertThat(config.getAccessKeyId()).isNull(); + assertThat(config.getSecretAccessKey()).isNull(); + assertThat(config.getRegion()).isEqualTo("us-east-1"); + } + + @ParameterizedTest + @MethodSource + void givenForceAckIsTrueAndTopicNamePattern_whenOnMsg_thenEnqueueForTellNext(String topicName, TbMsgMetaData metaData, String data) { + ReflectionTestUtils.setField(node, "forceAck", true); + config.setAccessKeyId("accessKeyId"); + config.setSecretAccessKey("secretAccessKey"); + config.setTopicArnPattern(topicName); + String messageId = "msgId-1d186a16-80c7-44b3-a245-a1fc835f20c7"; + String requestId = "reqId-bef0799b-dde9-4aa0-855b-86bbafaeaf31"; + + given(ctxMock.getExternalCallExecutor()).willReturn(executor); + given(snsClientMock.publish(any(PublishRequest.class))).willReturn(publishResultMock); + given(publishResultMock.getMessageId()).willReturn(messageId); + given(publishResultMock.getSdkResponseMetadata()).willReturn(responseMetadataMock); + given(responseMetadataMock.getRequestId()).willReturn(requestId); + + TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, metaData, data); + node.onMsg(ctxMock, msg); + + then(ctxMock).should().ack(msg); + PublishRequest publishRequest = new PublishRequest() + .withTopicArn(TbNodeUtils.processPattern(topicName, msg)) + .withMessage(data); + then(snsClientMock).should().publish(publishRequest); + ArgumentCaptor actualMsgCaptor = ArgumentCaptor.forClass(TbMsg.class); + then(ctxMock).should().enqueueForTellNext(actualMsgCaptor.capture(), eq(TbNodeConnectionType.SUCCESS)); + TbMsg actualMsg = actualMsgCaptor.getValue(); + assertThat(actualMsg) + .usingRecursiveComparison() + .ignoringFields("metaData", "ctx") + .isEqualTo(msg); + assertThat(actualMsg.getMetaData().getData()) + .hasFieldOrPropertyWithValue("messageId", messageId) + .hasFieldOrPropertyWithValue("requestId", requestId); + verifyNoMoreInteractions(ctxMock, snsClientMock, publishResultMock, responseMetadataMock); + } + + private static Stream givenForceAckIsTrueAndTopicNamePattern_whenOnMsg_thenEnqueueForTellNext() { + return Stream.of( + Arguments.of("arn:aws:sns:us-east-1:123456789012:NewTopic", TbMsgMetaData.EMPTY, TbMsg.EMPTY_JSON_OBJECT), + Arguments.of("arn:aws:sns:us-east-1:123456789012:$[msgTopicName]", TbMsgMetaData.EMPTY, "{\"msgTopicName\":\"msg-topic-name\"}"), + Arguments.of("arn:aws:sns:us-east-1:123456789012:${mdTopicName}", new TbMsgMetaData(Map.of("mdTopicName", "md-topic-name")), TbMsg.EMPTY_JSON_OBJECT) + ); + } + + @Test + void givenForceAckIsFalseAndErrorOccursDuringProcessingRequest_whenOnMsg_thenTellFailure() { + ReflectionTestUtils.setField(node, "forceAck", false); + ListeningExecutor listeningExecutor = mock(ListeningExecutor.class); + given(ctxMock.getExternalCallExecutor()).willReturn(listeningExecutor); + String errorMsg = "Something went wrong"; + ListenableFuture failedFuture = Futures.immediateFailedFuture(new RuntimeException(errorMsg)); + given(listeningExecutor.executeAsync(any(Callable.class))).willReturn(failedFuture); + + TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, TbMsgMetaData.EMPTY, TbMsg.EMPTY_JSON_OBJECT); + node.onMsg(ctxMock, msg); + + then(ctxMock).should(never()).enqueueForTellNext(any(), any(String.class)); + ArgumentCaptor actualMsgCaptor = ArgumentCaptor.forClass(TbMsg.class); + ArgumentCaptor throwableCaptor = ArgumentCaptor.forClass(Throwable.class); + then(ctxMock).should().tellFailure(actualMsgCaptor.capture(), throwableCaptor.capture()); + TbMsg actualMsg = actualMsgCaptor.getValue(); + assertThat(actualMsg) + .usingRecursiveComparison() + .ignoringFields("metaData", "ctx") + .isEqualTo(msg); + assertThat(actualMsg.getMetaData().getData()) + .hasFieldOrPropertyWithValue("error", RuntimeException.class + ": " + errorMsg); + assertThat(throwableCaptor.getValue()).isInstanceOf(RuntimeException.class).hasMessage(errorMsg); + verifyNoMoreInteractions(ctxMock, snsClientMock); + } + + @Test + void givenSnsClientIsNotNull_whenDestroy_thenShutdown() { + node.destroy(); + then(snsClientMock).should().shutdown(); + } + + @Test + void givenSnsClientIsNull_whenDestroy_thenVerifyNoInteractions() { + ReflectionTestUtils.setField(node, "snsClient", null); + node.destroy(); + then(snsClientMock).shouldHaveNoInteractions(); + } +} diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/aws/sqs/TbSqsNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/aws/sqs/TbSqsNodeTest.java new file mode 100644 index 0000000000..48f42edb97 --- /dev/null +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/aws/sqs/TbSqsNodeTest.java @@ -0,0 +1,263 @@ +/** + * Copyright © 2016-2024 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.rule.engine.aws.sqs; + +import com.amazonaws.ResponseMetadata; +import com.amazonaws.services.sqs.AmazonSQS; +import com.amazonaws.services.sqs.model.MessageAttributeValue; +import com.amazonaws.services.sqs.model.SendMessageRequest; +import com.amazonaws.services.sqs.model.SendMessageResult; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.springframework.test.util.ReflectionTestUtils; +import org.thingsboard.common.util.ListeningExecutor; +import org.thingsboard.rule.engine.TestDbCallbackExecutor; +import org.thingsboard.rule.engine.api.TbContext; +import org.thingsboard.rule.engine.api.util.TbNodeUtils; +import org.thingsboard.rule.engine.aws.sqs.TbSqsNodeConfiguration.QueueType; +import org.thingsboard.server.common.data.id.DeviceId; +import org.thingsboard.server.common.data.msg.TbMsgType; +import org.thingsboard.server.common.data.msg.TbNodeConnectionType; +import org.thingsboard.server.common.msg.TbMsg; +import org.thingsboard.server.common.msg.TbMsgMetaData; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.Callable; +import java.util.stream.Stream; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.BDDMockito.given; +import static org.mockito.BDDMockito.then; +import static org.mockito.BDDMockito.mock; +import static org.mockito.BDDMockito.never; +import static org.mockito.BDDMockito.verifyNoMoreInteractions; + +@ExtendWith(MockitoExtension.class) +class TbSqsNodeTest { + + private final DeviceId DEVICE_ID = new DeviceId(UUID.fromString("764de824-929f-4114-95ea-0ea0401ffa3d")); + private final ListeningExecutor executor = new TestDbCallbackExecutor(); + + private final String messageId = "msgId-1d186a16-80c7-44b3-a245-a1fc835f20c7"; + private final String requestId = "reqId-bef0799b-dde9-4aa0-855b-86bbafaeaf31"; + + private TbSqsNode node; + private TbSqsNodeConfiguration config; + + @Mock + private TbContext ctxMock; + @Mock + private AmazonSQS sqsClientMock; + @Mock + private SendMessageResult sendMessageResultMock; + @Mock + private ResponseMetadata responseMetadataMock; + + @BeforeEach + void setUp() { + node = new TbSqsNode(); + config = new TbSqsNodeConfiguration().defaultConfiguration(); + ReflectionTestUtils.setField(node, "sqsClient", sqsClientMock); + ReflectionTestUtils.setField(node, "config", config); + } + + @Test + void verifyDefaultConfig() { + assertThat(config.getQueueType()).isEqualTo(QueueType.STANDARD); + assertThat(config.getQueueUrlPattern()).isEqualTo("https://sqs.us-east-1.amazonaws.com/123456789012/my-queue-name"); + assertThat(config.getDelaySeconds()).isEqualTo(0); + assertThat(config.getMessageAttributes()).isEqualTo(Collections.emptyMap()); + assertThat(config.getAccessKeyId()).isNull(); + assertThat(config.getSecretAccessKey()).isNull(); + assertThat(config.getRegion()).isEqualTo("us-east-1"); + } + + @ParameterizedTest + @MethodSource + void givenQueueUrlPatternsAndQueueTypeIsFifo_whenOnMsg_thenVerifyRequest(String queueUrl, TbMsgMetaData metaData, String data) { + config.setQueueType(QueueType.FIFO); + config.setQueueUrlPattern(queueUrl); + + mockSendingMsgRequest(); + + TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, metaData, data); + node.onMsg(ctxMock, msg); + + SendMessageRequest sendMsgRequest = new SendMessageRequest() + .withQueueUrl(TbNodeUtils.processPattern(queueUrl, msg)) + .withMessageBody(data) + .withMessageDeduplicationId(msg.getId().toString()) + .withMessageGroupId(DEVICE_ID.toString()); + then(sqsClientMock).should().sendMessage(sendMsgRequest); + } + + private static Stream givenQueueUrlPatternsAndQueueTypeIsFifo_whenOnMsg_thenVerifyRequest() { + return Stream.of( + Arguments.of( + "https://sqs.us-east-1.amazonaws.com/123456789012/new-queue-name", + TbMsgMetaData.EMPTY, + TbMsg.EMPTY_JSON_OBJECT), + Arguments.of( + "https://sqs.us-east-1.amazonaws.com/123456789012/$[msgQueueName]", + TbMsgMetaData.EMPTY, + "{\"msgQueueName\":\"msg-queue-name\"}"), + Arguments.of( + "https://sqs.us-east-1.amazonaws.com/123456789012/${mdQueueName}", + new TbMsgMetaData(Map.of("mdQueueName", "md-queue-name")), + TbMsg.EMPTY_JSON_OBJECT) + ); + } + + @ParameterizedTest + @MethodSource + void givenMsgAttributesPatternsAndQueueTypeIsStandard_whenOnMsg_thenVerifyRequest(TbMsgMetaData metaData, String data, + Map attributes) { + config.setMessageAttributes(attributes); + + mockSendingMsgRequest(); + + TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, metaData, data); + node.onMsg(ctxMock, msg); + + Map messageAttributes = new HashMap<>(); + this.config.getMessageAttributes().forEach((k, v) -> { + String name = TbNodeUtils.processPattern(k, msg); + String val = TbNodeUtils.processPattern(v, msg); + messageAttributes.put(name, new MessageAttributeValue().withDataType("String").withStringValue(val)); + }); + SendMessageRequest sendMsgRequest = new SendMessageRequest() + .withQueueUrl(config.getQueueUrlPattern()) + .withMessageBody(data) + .withMessageAttributes(messageAttributes) + .withDelaySeconds(config.getDelaySeconds()); + then(sqsClientMock).should().sendMessage(sendMsgRequest); + } + + private static Stream givenMsgAttributesPatternsAndQueueTypeIsStandard_whenOnMsg_thenVerifyRequest() { + return Stream.of( + Arguments.of(TbMsgMetaData.EMPTY, + TbMsg.EMPTY_JSON_OBJECT, + Map.of("attributeName", "attributeValue")), + Arguments.of(TbMsgMetaData.EMPTY, + "{\"msgAttrNamePattern\":\"msgAttrName\",\"msgAttrValuePattern\":\"msgAttrValue\"}", + Map.of("$[msgAttrNamePattern]", "$[msgAttrValuePattern]")), + Arguments.of(new TbMsgMetaData(Map.of("mdAttrNamePattern", "mdAttrName", "mdAttrValuePattern", "mdAttrValue")), + TbMsg.EMPTY_JSON_OBJECT, + Map.of("${mdAttrNamePattern}", "${mdAttrValuePattern}")) + ); + } + + @Test + void givenForceAckIsTrueAndMsgResultContainsBodyAndAttributesAndNumber_whenOnMsg_thenEnqueueForTellNext() { + ReflectionTestUtils.setField(node, "forceAck", true); + String messageBodyMd5 = "msgBodyMd5-55fb8ba2-2b71-4673-a82a-969756764761"; + String messageAttributesMd5 = "msgAttrMd5-e3ba3eef-52ae-436a-bec1-0c2c2252d1f1"; + String sequenceNumber = "seqNum-bb5ddce0-cf4e-4295-b015-524bdb6a332f"; + + mockSendingMsgRequest(); + given(sendMessageResultMock.getMD5OfMessageBody()).willReturn(messageBodyMd5); + given(sendMessageResultMock.getMD5OfMessageAttributes()).willReturn(messageAttributesMd5); + given(sendMessageResultMock.getSequenceNumber()).willReturn(sequenceNumber); + + TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, TbMsgMetaData.EMPTY, TbMsg.EMPTY_JSON_OBJECT); + node.onMsg(ctxMock, msg); + + then(ctxMock).should().ack(msg); + SendMessageRequest sendMsgRequest = new SendMessageRequest() + .withQueueUrl(TbNodeUtils.processPattern(config.getQueueUrlPattern(), msg)) + .withMessageBody(msg.getData()) + .withDelaySeconds(config.getDelaySeconds()); + then(sqsClientMock).should().sendMessage(sendMsgRequest); + ArgumentCaptor actualMsgCaptor = ArgumentCaptor.forClass(TbMsg.class); + then(ctxMock).should().enqueueForTellNext(actualMsgCaptor.capture(), eq(TbNodeConnectionType.SUCCESS)); + TbMsg actualMsg = actualMsgCaptor.getValue(); + assertThat(actualMsg) + .usingRecursiveComparison() + .ignoringFields("metaData", "ctx") + .isEqualTo(msg); + assertThat(actualMsg.getMetaData().getData()) + .hasFieldOrPropertyWithValue("messageId", messageId) + .hasFieldOrPropertyWithValue("requestId", requestId) + .hasFieldOrPropertyWithValue("messageBodyMd5", messageBodyMd5) + .hasFieldOrPropertyWithValue("messageAttributesMd5", messageAttributesMd5) + .hasFieldOrPropertyWithValue("sequenceNumber", sequenceNumber); + verifyNoMoreInteractions(ctxMock, sqsClientMock, sendMessageResultMock, responseMetadataMock); + } + + @Test + void givenForceAckIsFalseAndErrorOccursDuringProcessingRequest_whenOnMsg_thenTellFailure() { + ReflectionTestUtils.setField(node, "forceAck", false); + ListeningExecutor listeningExecutor = mock(ListeningExecutor.class); + given(ctxMock.getExternalCallExecutor()).willReturn(listeningExecutor); + String errorMsg = "Something went wrong"; + + ListenableFuture failedFuture = Futures.immediateFailedFuture(new RuntimeException(errorMsg)); + given(listeningExecutor.executeAsync(any(Callable.class))).willReturn(failedFuture); + + TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, TbMsgMetaData.EMPTY, TbMsg.EMPTY_JSON_OBJECT); + node.onMsg(ctxMock, msg); + + then(ctxMock).should(never()).enqueueForTellNext(any(), any(String.class)); + ArgumentCaptor actualMsgCaptor = ArgumentCaptor.forClass(TbMsg.class); + ArgumentCaptor throwableCaptor = ArgumentCaptor.forClass(Throwable.class); + then(ctxMock).should().tellFailure(actualMsgCaptor.capture(), throwableCaptor.capture()); + TbMsg actualMsg = actualMsgCaptor.getValue(); + assertThat(actualMsg) + .usingRecursiveComparison() + .ignoringFields("metaData", "ctx") + .isEqualTo(msg); + assertThat(actualMsg.getMetaData().getData()) + .hasFieldOrPropertyWithValue("error", RuntimeException.class + ": " + errorMsg); + assertThat(throwableCaptor.getValue()).isInstanceOf(RuntimeException.class).hasMessage(errorMsg); + verifyNoMoreInteractions(ctxMock, sqsClientMock); + } + + @Test + void givenSqsClientIsNotNull_whenDestroy_thenShutdown() { + node.destroy(); + then(sqsClientMock).should().shutdown(); + } + + @Test + void givenSqsClientIsNull_whenDestroy_thenVerifyNoInteractions() { + ReflectionTestUtils.setField(node, "sqsClient", null); + node.destroy(); + then(sqsClientMock).shouldHaveNoInteractions(); + } + + private void mockSendingMsgRequest() { + given(ctxMock.getExternalCallExecutor()).willReturn(executor); + given(sqsClientMock.sendMessage(any(SendMessageRequest.class))).willReturn(sendMessageResultMock); + given(sendMessageResultMock.getMessageId()).willReturn(messageId); + given(sendMessageResultMock.getSdkResponseMetadata()).willReturn(responseMetadataMock); + given(responseMetadataMock.getRequestId()).willReturn(requestId); + } + +}