Merge pull request #10921 from irynamatveieva/improvements/external-aws-nodes

AWS nodes: added tests
This commit is contained in:
Viacheslav Klimov 2024-06-24 16:46:32 +03:00 committed by GitHub
commit 633411f9e1
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 446 additions and 9 deletions

View File

@ -35,8 +35,6 @@ import org.thingsboard.server.common.data.plugin.ComponentType;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.TbMsgMetaData;
import java.util.concurrent.ExecutionException;
import static org.thingsboard.common.util.DonAsynchron.withCallback;
@Slf4j
@ -81,34 +79,34 @@ public class TbSnsNode extends TbAbstractExternalNode {
}
@Override
public void onMsg(TbContext ctx, TbMsg msg) throws ExecutionException, InterruptedException, TbNodeException {
public void onMsg(TbContext ctx, TbMsg msg) {
var tbMsg = ackIfNeeded(ctx, msg);
withCallback(publishMessageAsync(ctx, tbMsg),
m -> tellSuccess(ctx, m),
t -> tellFailure(ctx, processException(ctx, tbMsg, t), t));
t -> tellFailure(ctx, processException(tbMsg, t), t));
}
private ListenableFuture<TbMsg> publishMessageAsync(TbContext ctx, TbMsg msg) {
return ctx.getExternalCallExecutor().executeAsync(() -> publishMessage(ctx, msg));
return ctx.getExternalCallExecutor().executeAsync(() -> publishMessage(msg));
}
private TbMsg publishMessage(TbContext ctx, TbMsg msg) {
private TbMsg publishMessage(TbMsg msg) {
String topicArn = TbNodeUtils.processPattern(this.config.getTopicArnPattern(), msg);
PublishRequest publishRequest = new PublishRequest()
.withTopicArn(topicArn)
.withMessage(msg.getData());
PublishResult result = this.snsClient.publish(publishRequest);
return processPublishResult(ctx, msg, result);
return processPublishResult(msg, result);
}
private TbMsg processPublishResult(TbContext ctx, TbMsg origMsg, PublishResult result) {
private TbMsg processPublishResult(TbMsg origMsg, PublishResult result) {
TbMsgMetaData metaData = origMsg.getMetaData().copy();
metaData.putValue(MESSAGE_ID, result.getMessageId());
metaData.putValue(REQUEST_ID, result.getSdkResponseMetadata().getRequestId());
return TbMsg.transformMsgMetadata(origMsg, metaData);
}
private TbMsg processException(TbContext ctx, TbMsg origMsg, Throwable t) {
private TbMsg processException(TbMsg origMsg, Throwable t) {
TbMsgMetaData metaData = origMsg.getMetaData().copy();
metaData.putValue(ERROR, t.getClass() + ": " + t.getMessage());
return TbMsg.transformMsgMetadata(origMsg, metaData);

View File

@ -0,0 +1,176 @@
/**
* Copyright © 2016-2024 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.rule.engine.aws.sns;
import com.amazonaws.ResponseMetadata;
import com.amazonaws.services.sns.AmazonSNS;
import com.amazonaws.services.sns.model.PublishRequest;
import com.amazonaws.services.sns.model.PublishResult;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.springframework.test.util.ReflectionTestUtils;
import org.thingsboard.common.util.ListeningExecutor;
import org.thingsboard.rule.engine.TestDbCallbackExecutor;
import org.thingsboard.rule.engine.api.TbContext;
import org.thingsboard.rule.engine.api.util.TbNodeUtils;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.msg.TbMsgType;
import org.thingsboard.server.common.data.msg.TbNodeConnectionType;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.TbMsgMetaData;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.stream.Stream;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.BDDMockito.given;
import static org.mockito.BDDMockito.then;
import static org.mockito.BDDMockito.mock;
import static org.mockito.BDDMockito.never;
import static org.mockito.BDDMockito.verifyNoMoreInteractions;
@ExtendWith(MockitoExtension.class)
class TbSnsNodeTest {
private final DeviceId DEVICE_ID = new DeviceId(UUID.fromString("fccfdf2e-6a88-4a94-81dd-5cbb557019cf"));
private final ListeningExecutor executor = new TestDbCallbackExecutor();
private TbSnsNode node;
private TbSnsNodeConfiguration config;
@Mock
private TbContext ctxMock;
@Mock
private AmazonSNS snsClientMock;
@Mock
private PublishResult publishResultMock;
@Mock
private ResponseMetadata responseMetadataMock;
@BeforeEach
void setUp() {
node = new TbSnsNode();
config = new TbSnsNodeConfiguration().defaultConfiguration();
ReflectionTestUtils.setField(node, "snsClient", snsClientMock);
ReflectionTestUtils.setField(node, "config", config);
}
@Test
void verifyDefaultConfig() {
assertThat(config.getTopicArnPattern()).isEqualTo("arn:aws:sns:us-east-1:123456789012:MyNewTopic");
assertThat(config.getAccessKeyId()).isNull();
assertThat(config.getSecretAccessKey()).isNull();
assertThat(config.getRegion()).isEqualTo("us-east-1");
}
@ParameterizedTest
@MethodSource
void givenForceAckIsTrueAndTopicNamePattern_whenOnMsg_thenEnqueueForTellNext(String topicName, TbMsgMetaData metaData, String data) {
ReflectionTestUtils.setField(node, "forceAck", true);
config.setAccessKeyId("accessKeyId");
config.setSecretAccessKey("secretAccessKey");
config.setTopicArnPattern(topicName);
String messageId = "msgId-1d186a16-80c7-44b3-a245-a1fc835f20c7";
String requestId = "reqId-bef0799b-dde9-4aa0-855b-86bbafaeaf31";
given(ctxMock.getExternalCallExecutor()).willReturn(executor);
given(snsClientMock.publish(any(PublishRequest.class))).willReturn(publishResultMock);
given(publishResultMock.getMessageId()).willReturn(messageId);
given(publishResultMock.getSdkResponseMetadata()).willReturn(responseMetadataMock);
given(responseMetadataMock.getRequestId()).willReturn(requestId);
TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, metaData, data);
node.onMsg(ctxMock, msg);
then(ctxMock).should().ack(msg);
PublishRequest publishRequest = new PublishRequest()
.withTopicArn(TbNodeUtils.processPattern(topicName, msg))
.withMessage(data);
then(snsClientMock).should().publish(publishRequest);
ArgumentCaptor<TbMsg> actualMsgCaptor = ArgumentCaptor.forClass(TbMsg.class);
then(ctxMock).should().enqueueForTellNext(actualMsgCaptor.capture(), eq(TbNodeConnectionType.SUCCESS));
TbMsg actualMsg = actualMsgCaptor.getValue();
assertThat(actualMsg)
.usingRecursiveComparison()
.ignoringFields("metaData", "ctx")
.isEqualTo(msg);
assertThat(actualMsg.getMetaData().getData())
.hasFieldOrPropertyWithValue("messageId", messageId)
.hasFieldOrPropertyWithValue("requestId", requestId);
verifyNoMoreInteractions(ctxMock, snsClientMock, publishResultMock, responseMetadataMock);
}
private static Stream<Arguments> givenForceAckIsTrueAndTopicNamePattern_whenOnMsg_thenEnqueueForTellNext() {
return Stream.of(
Arguments.of("arn:aws:sns:us-east-1:123456789012:NewTopic", TbMsgMetaData.EMPTY, TbMsg.EMPTY_JSON_OBJECT),
Arguments.of("arn:aws:sns:us-east-1:123456789012:$[msgTopicName]", TbMsgMetaData.EMPTY, "{\"msgTopicName\":\"msg-topic-name\"}"),
Arguments.of("arn:aws:sns:us-east-1:123456789012:${mdTopicName}", new TbMsgMetaData(Map.of("mdTopicName", "md-topic-name")), TbMsg.EMPTY_JSON_OBJECT)
);
}
@Test
void givenForceAckIsFalseAndErrorOccursDuringProcessingRequest_whenOnMsg_thenTellFailure() {
ReflectionTestUtils.setField(node, "forceAck", false);
ListeningExecutor listeningExecutor = mock(ListeningExecutor.class);
given(ctxMock.getExternalCallExecutor()).willReturn(listeningExecutor);
String errorMsg = "Something went wrong";
ListenableFuture<TbMsg> failedFuture = Futures.immediateFailedFuture(new RuntimeException(errorMsg));
given(listeningExecutor.executeAsync(any(Callable.class))).willReturn(failedFuture);
TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, TbMsgMetaData.EMPTY, TbMsg.EMPTY_JSON_OBJECT);
node.onMsg(ctxMock, msg);
then(ctxMock).should(never()).enqueueForTellNext(any(), any(String.class));
ArgumentCaptor<TbMsg> actualMsgCaptor = ArgumentCaptor.forClass(TbMsg.class);
ArgumentCaptor<Throwable> throwableCaptor = ArgumentCaptor.forClass(Throwable.class);
then(ctxMock).should().tellFailure(actualMsgCaptor.capture(), throwableCaptor.capture());
TbMsg actualMsg = actualMsgCaptor.getValue();
assertThat(actualMsg)
.usingRecursiveComparison()
.ignoringFields("metaData", "ctx")
.isEqualTo(msg);
assertThat(actualMsg.getMetaData().getData())
.hasFieldOrPropertyWithValue("error", RuntimeException.class + ": " + errorMsg);
assertThat(throwableCaptor.getValue()).isInstanceOf(RuntimeException.class).hasMessage(errorMsg);
verifyNoMoreInteractions(ctxMock, snsClientMock);
}
@Test
void givenSnsClientIsNotNull_whenDestroy_thenShutdown() {
node.destroy();
then(snsClientMock).should().shutdown();
}
@Test
void givenSnsClientIsNull_whenDestroy_thenVerifyNoInteractions() {
ReflectionTestUtils.setField(node, "snsClient", null);
node.destroy();
then(snsClientMock).shouldHaveNoInteractions();
}
}

View File

@ -0,0 +1,263 @@
/**
* Copyright © 2016-2024 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.rule.engine.aws.sqs;
import com.amazonaws.ResponseMetadata;
import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.model.MessageAttributeValue;
import com.amazonaws.services.sqs.model.SendMessageRequest;
import com.amazonaws.services.sqs.model.SendMessageResult;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.springframework.test.util.ReflectionTestUtils;
import org.thingsboard.common.util.ListeningExecutor;
import org.thingsboard.rule.engine.TestDbCallbackExecutor;
import org.thingsboard.rule.engine.api.TbContext;
import org.thingsboard.rule.engine.api.util.TbNodeUtils;
import org.thingsboard.rule.engine.aws.sqs.TbSqsNodeConfiguration.QueueType;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.msg.TbMsgType;
import org.thingsboard.server.common.data.msg.TbNodeConnectionType;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.TbMsgMetaData;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.stream.Stream;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.BDDMockito.given;
import static org.mockito.BDDMockito.then;
import static org.mockito.BDDMockito.mock;
import static org.mockito.BDDMockito.never;
import static org.mockito.BDDMockito.verifyNoMoreInteractions;
@ExtendWith(MockitoExtension.class)
class TbSqsNodeTest {
private final DeviceId DEVICE_ID = new DeviceId(UUID.fromString("764de824-929f-4114-95ea-0ea0401ffa3d"));
private final ListeningExecutor executor = new TestDbCallbackExecutor();
private final String messageId = "msgId-1d186a16-80c7-44b3-a245-a1fc835f20c7";
private final String requestId = "reqId-bef0799b-dde9-4aa0-855b-86bbafaeaf31";
private TbSqsNode node;
private TbSqsNodeConfiguration config;
@Mock
private TbContext ctxMock;
@Mock
private AmazonSQS sqsClientMock;
@Mock
private SendMessageResult sendMessageResultMock;
@Mock
private ResponseMetadata responseMetadataMock;
@BeforeEach
void setUp() {
node = new TbSqsNode();
config = new TbSqsNodeConfiguration().defaultConfiguration();
ReflectionTestUtils.setField(node, "sqsClient", sqsClientMock);
ReflectionTestUtils.setField(node, "config", config);
}
@Test
void verifyDefaultConfig() {
assertThat(config.getQueueType()).isEqualTo(QueueType.STANDARD);
assertThat(config.getQueueUrlPattern()).isEqualTo("https://sqs.us-east-1.amazonaws.com/123456789012/my-queue-name");
assertThat(config.getDelaySeconds()).isEqualTo(0);
assertThat(config.getMessageAttributes()).isEqualTo(Collections.emptyMap());
assertThat(config.getAccessKeyId()).isNull();
assertThat(config.getSecretAccessKey()).isNull();
assertThat(config.getRegion()).isEqualTo("us-east-1");
}
@ParameterizedTest
@MethodSource
void givenQueueUrlPatternsAndQueueTypeIsFifo_whenOnMsg_thenVerifyRequest(String queueUrl, TbMsgMetaData metaData, String data) {
config.setQueueType(QueueType.FIFO);
config.setQueueUrlPattern(queueUrl);
mockSendingMsgRequest();
TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, metaData, data);
node.onMsg(ctxMock, msg);
SendMessageRequest sendMsgRequest = new SendMessageRequest()
.withQueueUrl(TbNodeUtils.processPattern(queueUrl, msg))
.withMessageBody(data)
.withMessageDeduplicationId(msg.getId().toString())
.withMessageGroupId(DEVICE_ID.toString());
then(sqsClientMock).should().sendMessage(sendMsgRequest);
}
private static Stream<Arguments> givenQueueUrlPatternsAndQueueTypeIsFifo_whenOnMsg_thenVerifyRequest() {
return Stream.of(
Arguments.of(
"https://sqs.us-east-1.amazonaws.com/123456789012/new-queue-name",
TbMsgMetaData.EMPTY,
TbMsg.EMPTY_JSON_OBJECT),
Arguments.of(
"https://sqs.us-east-1.amazonaws.com/123456789012/$[msgQueueName]",
TbMsgMetaData.EMPTY,
"{\"msgQueueName\":\"msg-queue-name\"}"),
Arguments.of(
"https://sqs.us-east-1.amazonaws.com/123456789012/${mdQueueName}",
new TbMsgMetaData(Map.of("mdQueueName", "md-queue-name")),
TbMsg.EMPTY_JSON_OBJECT)
);
}
@ParameterizedTest
@MethodSource
void givenMsgAttributesPatternsAndQueueTypeIsStandard_whenOnMsg_thenVerifyRequest(TbMsgMetaData metaData, String data,
Map<String, String> attributes) {
config.setMessageAttributes(attributes);
mockSendingMsgRequest();
TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, metaData, data);
node.onMsg(ctxMock, msg);
Map<String, MessageAttributeValue> messageAttributes = new HashMap<>();
this.config.getMessageAttributes().forEach((k, v) -> {
String name = TbNodeUtils.processPattern(k, msg);
String val = TbNodeUtils.processPattern(v, msg);
messageAttributes.put(name, new MessageAttributeValue().withDataType("String").withStringValue(val));
});
SendMessageRequest sendMsgRequest = new SendMessageRequest()
.withQueueUrl(config.getQueueUrlPattern())
.withMessageBody(data)
.withMessageAttributes(messageAttributes)
.withDelaySeconds(config.getDelaySeconds());
then(sqsClientMock).should().sendMessage(sendMsgRequest);
}
private static Stream<Arguments> givenMsgAttributesPatternsAndQueueTypeIsStandard_whenOnMsg_thenVerifyRequest() {
return Stream.of(
Arguments.of(TbMsgMetaData.EMPTY,
TbMsg.EMPTY_JSON_OBJECT,
Map.of("attributeName", "attributeValue")),
Arguments.of(TbMsgMetaData.EMPTY,
"{\"msgAttrNamePattern\":\"msgAttrName\",\"msgAttrValuePattern\":\"msgAttrValue\"}",
Map.of("$[msgAttrNamePattern]", "$[msgAttrValuePattern]")),
Arguments.of(new TbMsgMetaData(Map.of("mdAttrNamePattern", "mdAttrName", "mdAttrValuePattern", "mdAttrValue")),
TbMsg.EMPTY_JSON_OBJECT,
Map.of("${mdAttrNamePattern}", "${mdAttrValuePattern}"))
);
}
@Test
void givenForceAckIsTrueAndMsgResultContainsBodyAndAttributesAndNumber_whenOnMsg_thenEnqueueForTellNext() {
ReflectionTestUtils.setField(node, "forceAck", true);
String messageBodyMd5 = "msgBodyMd5-55fb8ba2-2b71-4673-a82a-969756764761";
String messageAttributesMd5 = "msgAttrMd5-e3ba3eef-52ae-436a-bec1-0c2c2252d1f1";
String sequenceNumber = "seqNum-bb5ddce0-cf4e-4295-b015-524bdb6a332f";
mockSendingMsgRequest();
given(sendMessageResultMock.getMD5OfMessageBody()).willReturn(messageBodyMd5);
given(sendMessageResultMock.getMD5OfMessageAttributes()).willReturn(messageAttributesMd5);
given(sendMessageResultMock.getSequenceNumber()).willReturn(sequenceNumber);
TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, TbMsgMetaData.EMPTY, TbMsg.EMPTY_JSON_OBJECT);
node.onMsg(ctxMock, msg);
then(ctxMock).should().ack(msg);
SendMessageRequest sendMsgRequest = new SendMessageRequest()
.withQueueUrl(TbNodeUtils.processPattern(config.getQueueUrlPattern(), msg))
.withMessageBody(msg.getData())
.withDelaySeconds(config.getDelaySeconds());
then(sqsClientMock).should().sendMessage(sendMsgRequest);
ArgumentCaptor<TbMsg> actualMsgCaptor = ArgumentCaptor.forClass(TbMsg.class);
then(ctxMock).should().enqueueForTellNext(actualMsgCaptor.capture(), eq(TbNodeConnectionType.SUCCESS));
TbMsg actualMsg = actualMsgCaptor.getValue();
assertThat(actualMsg)
.usingRecursiveComparison()
.ignoringFields("metaData", "ctx")
.isEqualTo(msg);
assertThat(actualMsg.getMetaData().getData())
.hasFieldOrPropertyWithValue("messageId", messageId)
.hasFieldOrPropertyWithValue("requestId", requestId)
.hasFieldOrPropertyWithValue("messageBodyMd5", messageBodyMd5)
.hasFieldOrPropertyWithValue("messageAttributesMd5", messageAttributesMd5)
.hasFieldOrPropertyWithValue("sequenceNumber", sequenceNumber);
verifyNoMoreInteractions(ctxMock, sqsClientMock, sendMessageResultMock, responseMetadataMock);
}
@Test
void givenForceAckIsFalseAndErrorOccursDuringProcessingRequest_whenOnMsg_thenTellFailure() {
ReflectionTestUtils.setField(node, "forceAck", false);
ListeningExecutor listeningExecutor = mock(ListeningExecutor.class);
given(ctxMock.getExternalCallExecutor()).willReturn(listeningExecutor);
String errorMsg = "Something went wrong";
ListenableFuture<TbMsg> failedFuture = Futures.immediateFailedFuture(new RuntimeException(errorMsg));
given(listeningExecutor.executeAsync(any(Callable.class))).willReturn(failedFuture);
TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, TbMsgMetaData.EMPTY, TbMsg.EMPTY_JSON_OBJECT);
node.onMsg(ctxMock, msg);
then(ctxMock).should(never()).enqueueForTellNext(any(), any(String.class));
ArgumentCaptor<TbMsg> actualMsgCaptor = ArgumentCaptor.forClass(TbMsg.class);
ArgumentCaptor<Throwable> throwableCaptor = ArgumentCaptor.forClass(Throwable.class);
then(ctxMock).should().tellFailure(actualMsgCaptor.capture(), throwableCaptor.capture());
TbMsg actualMsg = actualMsgCaptor.getValue();
assertThat(actualMsg)
.usingRecursiveComparison()
.ignoringFields("metaData", "ctx")
.isEqualTo(msg);
assertThat(actualMsg.getMetaData().getData())
.hasFieldOrPropertyWithValue("error", RuntimeException.class + ": " + errorMsg);
assertThat(throwableCaptor.getValue()).isInstanceOf(RuntimeException.class).hasMessage(errorMsg);
verifyNoMoreInteractions(ctxMock, sqsClientMock);
}
@Test
void givenSqsClientIsNotNull_whenDestroy_thenShutdown() {
node.destroy();
then(sqsClientMock).should().shutdown();
}
@Test
void givenSqsClientIsNull_whenDestroy_thenVerifyNoInteractions() {
ReflectionTestUtils.setField(node, "sqsClient", null);
node.destroy();
then(sqsClientMock).shouldHaveNoInteractions();
}
private void mockSendingMsgRequest() {
given(ctxMock.getExternalCallExecutor()).willReturn(executor);
given(sqsClientMock.sendMessage(any(SendMessageRequest.class))).willReturn(sendMessageResultMock);
given(sendMessageResultMock.getMessageId()).willReturn(messageId);
given(sendMessageResultMock.getSdkResponseMetadata()).willReturn(responseMetadataMock);
given(responseMetadataMock.getRequestId()).willReturn(requestId);
}
}