Merge pull request #10549 from irynamatveieva/improvements/send-rpc-request-reply-nodes

Send rpc request and send rpc reply nodes: added tests
This commit is contained in:
Viacheslav Klimov 2024-06-12 16:34:21 +03:00 committed by GitHub
commit a17973b70e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 495 additions and 13 deletions

View File

@ -19,8 +19,12 @@ import com.google.common.util.concurrent.SettableFuture;
import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.EnumSource;
import org.junit.jupiter.params.provider.MethodSource;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock; import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension; import org.mockito.junit.jupiter.MockitoExtension;
import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.common.util.ListeningExecutor; import org.thingsboard.common.util.ListeningExecutor;
@ -29,7 +33,10 @@ import org.thingsboard.rule.engine.api.TbContext;
import org.thingsboard.rule.engine.api.TbNodeConfiguration; import org.thingsboard.rule.engine.api.TbNodeConfiguration;
import org.thingsboard.rule.engine.api.TbNodeException; import org.thingsboard.rule.engine.api.TbNodeException;
import org.thingsboard.server.common.data.DataConstants; import org.thingsboard.server.common.data.DataConstants;
import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.EntityIdFactory;
import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.msg.TbMsgType; import org.thingsboard.server.common.data.msg.TbMsgType;
import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.common.msg.TbMsg;
@ -37,24 +44,30 @@ import org.thingsboard.server.common.msg.TbMsgDataType;
import org.thingsboard.server.common.msg.TbMsgMetaData; import org.thingsboard.server.common.msg.TbMsgMetaData;
import org.thingsboard.server.dao.edge.EdgeEventService; import org.thingsboard.server.dao.edge.EdgeEventService;
import java.util.Map;
import java.util.UUID; import java.util.UUID;
import java.util.stream.Stream;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.never; import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@ExtendWith(MockitoExtension.class) @ExtendWith(MockitoExtension.class)
public class TbSendRPCReplyNodeTest { public class TbSendRPCReplyNodeTest {
private static final String DUMMY_SERVICE_ID = "testServiceId"; private static final String DUMMY_SERVICE_ID = "testServiceId";
private static final int DUMMY_REQUEST_ID = 0; private static final int DUMMY_REQUEST_ID = 0;
private static final UUID DUMMY_SESSION_ID = UUID.randomUUID(); private static final UUID DUMMY_SESSION_ID = UUID.fromString("4f1d94aa-f6ee-4078-8499-b8e68443f8ad");
private static final String DUMMY_DATA = "{\"key\":\"value\"}"; private final String DUMMY_DATA = "{\"key\":\"value\"}";
TbSendRPCReplyNode node; private TbSendRPCReplyNode node;
private TbSendRpcReplyNodeConfiguration config;
private final TenantId tenantId = TenantId.fromUUID(UUID.randomUUID()); private final TenantId tenantId = TenantId.fromUUID(UUID.fromString("4e2e2336-3376-4238-ba0a-c669b412ca66"));
private final DeviceId deviceId = new DeviceId(UUID.randomUUID()); private final DeviceId deviceId = new DeviceId(UUID.fromString("af64d1b9-8635-47e1-8738-6389df7fe57e"));
@Mock @Mock
private TbContext ctx; private TbContext ctx;
@ -71,14 +84,13 @@ public class TbSendRPCReplyNodeTest {
@BeforeEach @BeforeEach
public void setUp() throws TbNodeException { public void setUp() throws TbNodeException {
node = new TbSendRPCReplyNode(); node = new TbSendRPCReplyNode();
TbSendRpcReplyNodeConfiguration config = new TbSendRpcReplyNodeConfiguration().defaultConfiguration(); config = new TbSendRpcReplyNodeConfiguration().defaultConfiguration();
node.init(ctx, new TbNodeConfiguration(JacksonUtil.valueToTree(config))); node.init(ctx, new TbNodeConfiguration(JacksonUtil.valueToTree(config)));
} }
@Test @Test
public void sendReplyToTransport() { public void sendReplyToTransport() {
Mockito.when(ctx.getRpcService()).thenReturn(rpcService); when(ctx.getRpcService()).thenReturn(rpcService);
TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, deviceId, getDefaultMetadata(), TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, deviceId, getDefaultMetadata(),
TbMsgDataType.JSON, DUMMY_DATA, null, null); TbMsgDataType.JSON, DUMMY_DATA, null, null);
@ -91,10 +103,10 @@ public class TbSendRPCReplyNodeTest {
@Test @Test
public void sendReplyToEdgeQueue() { public void sendReplyToEdgeQueue() {
Mockito.when(ctx.getTenantId()).thenReturn(tenantId); when(ctx.getTenantId()).thenReturn(tenantId);
Mockito.when(ctx.getEdgeEventService()).thenReturn(edgeEventService); when(ctx.getEdgeEventService()).thenReturn(edgeEventService);
Mockito.when(edgeEventService.saveAsync(any())).thenReturn(SettableFuture.create()); when(edgeEventService.saveAsync(any())).thenReturn(SettableFuture.create());
Mockito.when(ctx.getDbCallbackExecutor()).thenReturn(listeningExecutor); when(ctx.getDbCallbackExecutor()).thenReturn(listeningExecutor);
TbMsgMetaData defaultMetadata = getDefaultMetadata(); TbMsgMetaData defaultMetadata = getDefaultMetadata();
defaultMetadata.putValue(DataConstants.EDGE_ID, UUID.randomUUID().toString()); defaultMetadata.putValue(DataConstants.EDGE_ID, UUID.randomUUID().toString());
@ -108,6 +120,55 @@ public class TbSendRPCReplyNodeTest {
verify(rpcService, never()).sendRpcReplyToDevice(DUMMY_SERVICE_ID, DUMMY_SESSION_ID, DUMMY_REQUEST_ID, DUMMY_DATA); verify(rpcService, never()).sendRpcReplyToDevice(DUMMY_SERVICE_ID, DUMMY_SESSION_ID, DUMMY_REQUEST_ID, DUMMY_DATA);
} }
@ParameterizedTest
@EnumSource(EntityType.class)
public void testOriginatorEntityTypes(EntityType entityType) {
EntityId entityId = EntityIdFactory.getByTypeAndUuid(entityType, "0f386739-210f-4e23-8739-23f84a172adc");
TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, entityId, TbMsgMetaData.EMPTY, TbMsg.EMPTY_JSON_OBJECT);
node.onMsg(ctx, msg);
ArgumentCaptor<Throwable> throwableCaptor = ArgumentCaptor.forClass(Throwable.class);
verify(ctx).tellFailure(eq(msg), throwableCaptor.capture());
assertThat(throwableCaptor.getValue()).isInstanceOf(RuntimeException.class)
.hasMessage(EntityType.DEVICE != entityType ? "Message originator is not a device entity!"
: "Request id is not present in the metadata!");
}
@ParameterizedTest
@MethodSource
public void testForAvailabilityOfMetadataAndDataValues(TbMsgMetaData metaData, String errorMsg) {
TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, deviceId, metaData, TbMsg.EMPTY_STRING);
node.onMsg(ctx, msg);
ArgumentCaptor<Throwable> throwableCaptor = ArgumentCaptor.forClass(Throwable.class);
verify(ctx).tellFailure(eq(msg), throwableCaptor.capture());
assertThat(throwableCaptor.getValue()).isInstanceOf(RuntimeException.class).hasMessage(errorMsg);
}
@Test
public void verifyDefaultConfig() {
assertThat(config.getServiceIdMetaDataAttribute()).isEqualTo("serviceId");
assertThat(config.getSessionIdMetaDataAttribute()).isEqualTo("sessionId");
assertThat(config.getRequestIdMetaDataAttribute()).isEqualTo("requestId");
}
private static Stream<Arguments> testForAvailabilityOfMetadataAndDataValues() {
return Stream.of(
Arguments.of(TbMsgMetaData.EMPTY, "Request id is not present in the metadata!"),
Arguments.of(new TbMsgMetaData(Map.of(
"requestId", Integer.toString(DUMMY_REQUEST_ID))), "Service id is not present in the metadata!"),
Arguments.of(new TbMsgMetaData(Map.of(
"requestId", Integer.toString(DUMMY_REQUEST_ID),
"serviceId", DUMMY_SERVICE_ID)), "Session id is not present in the metadata!"),
Arguments.of(new TbMsgMetaData(Map.of(
"requestId", Integer.toString(DUMMY_REQUEST_ID),
"serviceId", DUMMY_SERVICE_ID, "sessionId",
DUMMY_SESSION_ID.toString())), "Request body is empty!")
);
}
private TbMsgMetaData getDefaultMetadata() { private TbMsgMetaData getDefaultMetadata() {
TbSendRpcReplyNodeConfiguration config = new TbSendRpcReplyNodeConfiguration().defaultConfiguration(); TbSendRpcReplyNodeConfiguration config = new TbSendRpcReplyNodeConfiguration().defaultConfiguration();
TbMsgMetaData metadata = new TbMsgMetaData(); TbMsgMetaData metadata = new TbMsgMetaData();

View File

@ -0,0 +1,421 @@
/**
* Copyright © 2016-2024 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.rule.engine.rpc;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.EnumSource;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.NullAndEmptySource;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.springframework.test.util.ReflectionTestUtils;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.rule.engine.api.RuleEngineDeviceRpcRequest;
import org.thingsboard.rule.engine.api.RuleEngineDeviceRpcResponse;
import org.thingsboard.rule.engine.api.RuleEngineRpcService;
import org.thingsboard.rule.engine.api.TbContext;
import org.thingsboard.rule.engine.api.TbNodeConfiguration;
import org.thingsboard.rule.engine.api.TbNodeException;
import org.thingsboard.server.common.data.DataConstants;
import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.EntityIdFactory;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.msg.TbMsgType;
import org.thingsboard.server.common.data.msg.TbNodeConnectionType;
import org.thingsboard.server.common.data.rpc.RpcError;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.TbMsgMetaData;
import java.util.Optional;
import java.util.Random;
import java.util.UUID;
import java.util.function.Consumer;
import java.util.stream.Stream;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.BDDMockito.given;
import static org.mockito.BDDMockito.then;
import static org.mockito.BDDMockito.willAnswer;
import static org.mockito.Mockito.mock;
@ExtendWith(MockitoExtension.class)
public class TbSendRPCRequestNodeTest {
private final TenantId TENANT_ID = TenantId.fromUUID(UUID.fromString("d3a47f8b-d863-4c1f-b6f0-2c946b43f21c"));
private final DeviceId DEVICE_ID = new DeviceId(UUID.fromString("b052ae59-b9b4-47e8-ac71-39e7124bbd66"));
private final String MSG_DATA = """
{
"method": "setGpio",
"params": {
"pin": "23",
"value": 1
},
"additionalInfo": "information"
}
""";
private TbSendRPCRequestNode node;
private TbSendRpcRequestNodeConfiguration config;
@Mock
private TbContext ctxMock;
@Mock
private RuleEngineRpcService rpcServiceMock;
@BeforeEach
public void setUp() throws TbNodeException {
node = new TbSendRPCRequestNode();
config = new TbSendRpcRequestNodeConfiguration().defaultConfiguration();
var configuration = new TbNodeConfiguration(JacksonUtil.valueToTree(config));
node.init(ctxMock, configuration);
}
@Test
public void verifyDefaultConfig() {
assertThat(config.getTimeoutInSeconds()).isEqualTo(60);
}
@ParameterizedTest
@MethodSource
public void givenOneway_whenOnMsg_thenVerifyRequest(String mdKeyValue, boolean expectedResult) {
given(ctxMock.getRpcService()).willReturn(rpcServiceMock);
given(ctxMock.getTenantId()).willReturn(TENANT_ID);
TbMsgMetaData msgMetadata = new TbMsgMetaData();
msgMetadata.putValue("oneway", mdKeyValue);
TbMsg msg = TbMsg.newMsg(TbMsgType.RPC_CALL_FROM_SERVER_TO_DEVICE, DEVICE_ID, msgMetadata, MSG_DATA);
node.onMsg(ctxMock, msg);
var ruleEngineDeviceRpcRequestCaptor = captureRequest();
assertThat(ruleEngineDeviceRpcRequestCaptor.getValue().isOneway()).isEqualTo(expectedResult);
}
private static Stream<Arguments> givenOneway_whenOnMsg_thenVerifyRequest() {
return Stream.of(
Arguments.of("true", true),
Arguments.of("false", false),
Arguments.of(null, false),
Arguments.of("", false)
);
}
@Test
public void givenMsgBody_whenOnMsg_thenVerifyRequest() {
given(ctxMock.getRpcService()).willReturn(rpcServiceMock);
given(ctxMock.getTenantId()).willReturn(TENANT_ID);
TbMsg msg = TbMsg.newMsg(TbMsgType.RPC_CALL_FROM_SERVER_TO_DEVICE, DEVICE_ID, TbMsgMetaData.EMPTY, MSG_DATA);
node.onMsg(ctxMock, msg);
ArgumentCaptor<RuleEngineDeviceRpcRequest> requestCaptor = ArgumentCaptor.forClass(RuleEngineDeviceRpcRequest.class);
then(rpcServiceMock).should().sendRpcRequestToDevice(requestCaptor.capture(), any(Consumer.class));
assertThat(requestCaptor.getValue())
.hasFieldOrPropertyWithValue("method", "setGpio")
.hasFieldOrPropertyWithValue("body", "{\"pin\":\"23\",\"value\":1}")
.hasFieldOrPropertyWithValue("deviceId", DEVICE_ID)
.hasFieldOrPropertyWithValue("tenantId", TENANT_ID)
.hasFieldOrPropertyWithValue("additionalInfo", "information");
}
@Test
public void givenRequestIdIsNotSet_whenOnMsg_thenVerifyRequest() {
Random randomMock = mock(Random.class);
given(randomMock.nextInt()).willReturn(123);
ReflectionTestUtils.setField(node, "random", randomMock);
given(ctxMock.getRpcService()).willReturn(rpcServiceMock);
given(ctxMock.getTenantId()).willReturn(TENANT_ID);
TbMsg msg = TbMsg.newMsg(TbMsgType.TO_SERVER_RPC_REQUEST, DEVICE_ID, TbMsgMetaData.EMPTY, MSG_DATA);
node.onMsg(ctxMock, msg);
ArgumentCaptor<RuleEngineDeviceRpcRequest> requestCaptor = captureRequest();
assertThat(requestCaptor.getValue().getRequestId()).isEqualTo(123);
}
@Test
public void givenRequestId_whenOnMsg_thenVerifyRequest() {
given(ctxMock.getRpcService()).willReturn(rpcServiceMock);
given(ctxMock.getTenantId()).willReturn(TENANT_ID);
String data = """
{
"method": "setGpio",
"params": {
"pin": "23",
"value": 1
},
"requestId": 12345
}
""";
TbMsg msg = TbMsg.newMsg(TbMsgType.TO_SERVER_RPC_REQUEST, DEVICE_ID, TbMsgMetaData.EMPTY, data);
node.onMsg(ctxMock, msg);
ArgumentCaptor<RuleEngineDeviceRpcRequest> requestCaptor = captureRequest();
assertThat(requestCaptor.getValue().getRequestId()).isEqualTo(12345);
}
@Test
public void givenRequestUUID_whenOnMsg_thenVerifyRequest() {
given(ctxMock.getRpcService()).willReturn(rpcServiceMock);
given(ctxMock.getTenantId()).willReturn(TENANT_ID);
String requestUUID = "b795a241-5a30-48fb-92d5-46b864d47130";
TbMsgMetaData metadata = new TbMsgMetaData();
metadata.putValue("requestUUID", requestUUID);
TbMsg msg = TbMsg.newMsg(TbMsgType.RPC_CALL_FROM_SERVER_TO_DEVICE, DEVICE_ID, metadata, MSG_DATA);
node.onMsg(ctxMock, msg);
ArgumentCaptor<RuleEngineDeviceRpcRequest> requestCaptor = captureRequest();
assertThat(requestCaptor.getValue().getRequestUUID()).isEqualTo(UUID.fromString(requestUUID));
}
@ParameterizedTest
@NullAndEmptySource
public void givenInvalidRequestUUID_whenOnMsg_thenVerifyRequest(String requestUUID) {
given(ctxMock.getRpcService()).willReturn(rpcServiceMock);
given(ctxMock.getTenantId()).willReturn(TENANT_ID);
TbMsgMetaData metadata = new TbMsgMetaData();
metadata.putValue("requestUUID", requestUUID);
TbMsg msg = TbMsg.newMsg(TbMsgType.RPC_CALL_FROM_SERVER_TO_DEVICE, DEVICE_ID, metadata, MSG_DATA);
node.onMsg(ctxMock, msg);
ArgumentCaptor<RuleEngineDeviceRpcRequest> requestCaptor = captureRequest();
assertThat(requestCaptor.getValue().getRequestUUID()).isNotNull();
}
@Test
public void givenOriginServiceId_whenOnMsg_thenVerifyRequest() {
given(ctxMock.getRpcService()).willReturn(rpcServiceMock);
given(ctxMock.getTenantId()).willReturn(TENANT_ID);
String originServiceId = "service-id-123";
TbMsgMetaData metadata = new TbMsgMetaData();
metadata.putValue("originServiceId", originServiceId);
TbMsg msg = TbMsg.newMsg(TbMsgType.RPC_CALL_FROM_SERVER_TO_DEVICE, DEVICE_ID, metadata, MSG_DATA);
node.onMsg(ctxMock, msg);
ArgumentCaptor<RuleEngineDeviceRpcRequest> requestCaptor = captureRequest();
assertThat(requestCaptor.getValue().getOriginServiceId()).isEqualTo(originServiceId);
}
@ParameterizedTest
@NullAndEmptySource
public void givenInvalidOriginServiceId_whenOnMsg_thenVerifyRequest(String originServiceId) {
given(ctxMock.getRpcService()).willReturn(rpcServiceMock);
given(ctxMock.getTenantId()).willReturn(TENANT_ID);
TbMsgMetaData metadata = new TbMsgMetaData();
metadata.putValue("originServiceId", originServiceId);
TbMsg msg = TbMsg.newMsg(TbMsgType.RPC_CALL_FROM_SERVER_TO_DEVICE, DEVICE_ID, metadata, MSG_DATA);
node.onMsg(ctxMock, msg);
ArgumentCaptor<RuleEngineDeviceRpcRequest> requestCaptor = captureRequest();
assertThat(requestCaptor.getValue().getOriginServiceId()).isNull();
}
@Test
public void givenExpirationTime_whenOnMsg_thenVerifyRequest() {
given(ctxMock.getRpcService()).willReturn(rpcServiceMock);
given(ctxMock.getTenantId()).willReturn(TENANT_ID);
String expirationTime = "2000000000000";
TbMsgMetaData metadata = new TbMsgMetaData();
metadata.putValue(DataConstants.EXPIRATION_TIME, expirationTime);
TbMsg msg = TbMsg.newMsg(TbMsgType.RPC_CALL_FROM_SERVER_TO_DEVICE, DEVICE_ID, metadata, MSG_DATA);
node.onMsg(ctxMock, msg);
ArgumentCaptor<RuleEngineDeviceRpcRequest> requestCaptor = captureRequest();
assertThat(requestCaptor.getValue().getExpirationTime()).isEqualTo(Long.parseLong(expirationTime));
}
@ParameterizedTest
@NullAndEmptySource
public void givenInvalidExpirationTime_whenOnMsg_thenVerifyRequest(String expirationTime) {
given(ctxMock.getRpcService()).willReturn(rpcServiceMock);
given(ctxMock.getTenantId()).willReturn(TENANT_ID);
TbMsgMetaData metadata = new TbMsgMetaData();
metadata.putValue(DataConstants.EXPIRATION_TIME, expirationTime);
TbMsg msg = TbMsg.newMsg(TbMsgType.RPC_CALL_FROM_SERVER_TO_DEVICE, DEVICE_ID, metadata, MSG_DATA);
node.onMsg(ctxMock, msg);
ArgumentCaptor<RuleEngineDeviceRpcRequest> requestCaptor = captureRequest();
assertThat(requestCaptor.getValue().getExpirationTime()).isGreaterThan(System.currentTimeMillis());
}
@Test
public void givenRetries_whenOnMsg_thenVerifyRequest() {
given(ctxMock.getRpcService()).willReturn(rpcServiceMock);
given(ctxMock.getTenantId()).willReturn(TENANT_ID);
Integer retries = 3;
TbMsgMetaData metadata = new TbMsgMetaData();
metadata.putValue(DataConstants.RETRIES, String.valueOf(retries));
TbMsg msg = TbMsg.newMsg(TbMsgType.RPC_CALL_FROM_SERVER_TO_DEVICE, DEVICE_ID, metadata, MSG_DATA);
node.onMsg(ctxMock, msg);
ArgumentCaptor<RuleEngineDeviceRpcRequest> requestCaptor = captureRequest();
assertThat(requestCaptor.getValue().getRetries()).isEqualTo(retries);
}
@ParameterizedTest
@NullAndEmptySource
public void givenInvalidRetriesValue_whenOnMsg_thenVerifyRequest(String retries) {
given(ctxMock.getRpcService()).willReturn(rpcServiceMock);
given(ctxMock.getTenantId()).willReturn(TENANT_ID);
TbMsgMetaData metadata = new TbMsgMetaData();
metadata.putValue(DataConstants.RETRIES, retries);
TbMsg msg = TbMsg.newMsg(TbMsgType.RPC_CALL_FROM_SERVER_TO_DEVICE, DEVICE_ID, metadata, MSG_DATA);
node.onMsg(ctxMock, msg);
ArgumentCaptor<RuleEngineDeviceRpcRequest> requestCaptor = captureRequest();
assertThat(requestCaptor.getValue().getRetries()).isNull();
}
@ParameterizedTest
@EnumSource(TbMsgType.class)
public void givenTbMsgType_whenOnMsg_thenVerifyRequest(TbMsgType msgType) {
given(ctxMock.getRpcService()).willReturn(rpcServiceMock);
given(ctxMock.getTenantId()).willReturn(TENANT_ID);
TbMsg msg = TbMsg.newMsg(msgType, DEVICE_ID, TbMsgMetaData.EMPTY, MSG_DATA);
node.onMsg(ctxMock, msg);
ArgumentCaptor<RuleEngineDeviceRpcRequest> requestCaptor = captureRequest();
if (msgType == TbMsgType.RPC_CALL_FROM_SERVER_TO_DEVICE) {
assertThat(requestCaptor.getValue().isRestApiCall()).isTrue();
return;
}
assertThat(requestCaptor.getValue().isRestApiCall()).isFalse();
}
@ParameterizedTest
@MethodSource
public void givenPersistent_whenOnMsg_thenVerifyRequest(String isPersisted, boolean expectedPersistence) {
given(ctxMock.getRpcService()).willReturn(rpcServiceMock);
given(ctxMock.getTenantId()).willReturn(TENANT_ID);
TbMsgMetaData metadata = new TbMsgMetaData();
metadata.putValue(DataConstants.PERSISTENT, isPersisted);
TbMsg msg = TbMsg.newMsg(TbMsgType.RPC_CALL_FROM_SERVER_TO_DEVICE, DEVICE_ID, metadata, MSG_DATA);
node.onMsg(ctxMock, msg);
ArgumentCaptor<RuleEngineDeviceRpcRequest> requestCaptor = captureRequest();
assertThat(requestCaptor.getValue().isPersisted()).isEqualTo(expectedPersistence);
}
private static Stream<Arguments> givenPersistent_whenOnMsg_thenVerifyRequest() {
return Stream.of(
Arguments.of("true", true),
Arguments.of("false", false),
Arguments.of(null, false),
Arguments.of("", false)
);
}
private ArgumentCaptor<RuleEngineDeviceRpcRequest> captureRequest() {
ArgumentCaptor<RuleEngineDeviceRpcRequest> requestCaptor = ArgumentCaptor.forClass(RuleEngineDeviceRpcRequest.class);
then(rpcServiceMock).should().sendRpcRequestToDevice(requestCaptor.capture(), any(Consumer.class));
return requestCaptor;
}
@Test
public void givenRpcResponseWithoutError_whenOnMsg_thenSendsRpcRequest() {
TbMsg outMsg = TbMsg.newMsg(TbMsgType.RPC_CALL_FROM_SERVER_TO_DEVICE, DEVICE_ID, TbMsgMetaData.EMPTY, TbMsg.EMPTY_JSON_OBJECT);
given(ctxMock.getRpcService()).willReturn(rpcServiceMock);
given(ctxMock.getTenantId()).willReturn(TENANT_ID);
// TODO: replace deprecated method newMsg()
given(ctxMock.newMsg(any(), any(String.class), any(), any(), any(), any())).willReturn(outMsg);
willAnswer(invocation -> {
Consumer<RuleEngineDeviceRpcResponse> consumer = invocation.getArgument(1);
RuleEngineDeviceRpcResponse rpcResponseMock = mock(RuleEngineDeviceRpcResponse.class);
given(rpcResponseMock.getError()).willReturn(Optional.empty());
given(rpcResponseMock.getResponse()).willReturn(Optional.of(TbMsg.EMPTY_JSON_OBJECT));
consumer.accept(rpcResponseMock);
return null;
}).given(rpcServiceMock).sendRpcRequestToDevice(any(RuleEngineDeviceRpcRequest.class), any(Consumer.class));
TbMsg msg = TbMsg.newMsg(TbMsgType.RPC_CALL_FROM_SERVER_TO_DEVICE, DEVICE_ID, TbMsgMetaData.EMPTY, MSG_DATA);
node.onMsg(ctxMock, msg);
then(ctxMock).should().enqueueForTellNext(outMsg, TbNodeConnectionType.SUCCESS);
then(ctxMock).should().ack(msg);
}
@Test
public void givenRpcResponseWithError_whenOnMsg_thenTellFailure() {
TbMsg outMsg = TbMsg.newMsg(TbMsgType.RPC_CALL_FROM_SERVER_TO_DEVICE, DEVICE_ID, TbMsgMetaData.EMPTY, TbMsg.EMPTY_JSON_OBJECT);
given(ctxMock.getRpcService()).willReturn(rpcServiceMock);
given(ctxMock.getTenantId()).willReturn(TENANT_ID);
// TODO: replace deprecated method newMsg()
given(ctxMock.newMsg(any(), any(String.class), any(), any(), any(), any())).willReturn(outMsg);
willAnswer(invocation -> {
Consumer<RuleEngineDeviceRpcResponse> consumer = invocation.getArgument(1);
RuleEngineDeviceRpcResponse rpcResponseMock = mock(RuleEngineDeviceRpcResponse.class);
given(rpcResponseMock.getError()).willReturn(Optional.of(RpcError.NO_ACTIVE_CONNECTION));
consumer.accept(rpcResponseMock);
return null;
}).given(rpcServiceMock).sendRpcRequestToDevice(any(RuleEngineDeviceRpcRequest.class), any(Consumer.class));
TbMsg msg = TbMsg.newMsg(TbMsgType.RPC_CALL_FROM_SERVER_TO_DEVICE, DEVICE_ID, TbMsgMetaData.EMPTY, MSG_DATA);
node.onMsg(ctxMock, msg);
then(ctxMock).should().enqueueForTellFailure(outMsg, RpcError.NO_ACTIVE_CONNECTION.name());
then(ctxMock).should().ack(msg);
}
@ParameterizedTest
@EnumSource(EntityType.class)
public void givenOriginatorIsNotDevice_whenOnMsg_thenThrowsException(EntityType entityType) {
EntityId entityId = EntityIdFactory.getByTypeAndUuid(entityType, "ac21a1bb-eabf-4463-8313-24bea1f498d9");
TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, entityId, TbMsgMetaData.EMPTY, TbMsg.EMPTY_JSON_OBJECT);
node.onMsg(ctxMock, msg);
ArgumentCaptor<Throwable> throwableCaptor = ArgumentCaptor.forClass(Throwable.class);
then(ctxMock).should().tellFailure(eq(msg), throwableCaptor.capture());
assertThat(throwableCaptor.getValue()).isInstanceOf(RuntimeException.class)
.hasMessage(EntityType.DEVICE != entityType ? "Message originator is not a device entity!"
: "Method is not present in the message!");
}
@ParameterizedTest
@ValueSource(strings = {"method", "params"})
public void givenMethodOrParamsAreNotPresent_whenOnMsg_thenThrowsException(String key) {
TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, TbMsgMetaData.EMPTY, "{\"" + key + "\": \"value\"}");
node.onMsg(ctxMock, msg);
ArgumentCaptor<Throwable> throwableCaptor = ArgumentCaptor.forClass(Throwable.class);
then(ctxMock).should().tellFailure(eq(msg), throwableCaptor.capture());
assertThat(throwableCaptor.getValue()).isInstanceOf(RuntimeException.class)
.hasMessage(key.equals("method") ? "Params are not present in the message!" : "Method is not present in the message!");
}
}