Added TbSendRPCReplyNodeTest

This commit is contained in:
Volodymyr Babak 2022-11-10 11:36:46 +02:00
parent 15b26f4317
commit d8a6093442
3 changed files with 134 additions and 2 deletions

View File

@ -511,6 +511,8 @@ abstract public class BaseDeviceEdgeTest extends AbstractEdgeTest {
body.put("expirationTime", System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(10));
body.put("method", "test_method");
body.put("params", "{\"param1\":\"value1\"}");
body.put("persisted", true);
body.put("retries", 2);
EdgeEvent edgeEvent = constructEdgeEvent(tenantId, edge.getId(), EdgeEventActionType.RPC_CALL,
device.getId().getId(), EdgeEventType.DEVICE, body);
@ -523,6 +525,8 @@ abstract public class BaseDeviceEdgeTest extends AbstractEdgeTest {
Assert.assertTrue(latestMessage instanceof DeviceRpcCallMsg);
DeviceRpcCallMsg latestDeviceRpcCallMsg = (DeviceRpcCallMsg) latestMessage;
Assert.assertEquals("test_method", latestDeviceRpcCallMsg.getRequestMsg().getMethod());
Assert.assertTrue(latestDeviceRpcCallMsg.getPersisted());
Assert.assertEquals(2, latestDeviceRpcCallMsg.getRetries());
}
private void sendAttributesRequestAndVerify(Device device, String scope, String attributesDataStr, String expectedKey,

View File

@ -87,13 +87,22 @@ public class TbSendRPCReplyNode implements TbNode {
}
private void saveRpcResponseToEdgeQueue(TbContext ctx, TbMsg msg, String serviceIdStr, String sessionIdStr, String requestIdStr) {
EdgeId edgeId;
DeviceId deviceId;
try {
edgeId = new EdgeId(UUID.fromString(msg.getMetaData().getValue(DataConstants.EDGE_ID)));
deviceId = new DeviceId(UUID.fromString(msg.getMetaData().getValue(DataConstants.DEVICE_ID)));
} catch (Exception e) {
String errMsg = String.format("[%s] Failed to parse edgeId or deviceId from metadata %s!", ctx.getTenantId(), msg.getMetaData());
ctx.tellFailure(msg, new RuntimeException(errMsg));
return;
}
ObjectNode body = JacksonUtil.OBJECT_MAPPER.createObjectNode();
body.put("serviceId", serviceIdStr);
body.put("sessionId", sessionIdStr);
body.put("requestId", requestIdStr);
body.put("response", msg.getData());
EdgeId edgeId = new EdgeId(UUID.fromString(msg.getMetaData().getValue(DataConstants.EDGE_ID)));
DeviceId deviceId = new DeviceId(UUID.fromString(msg.getMetaData().getValue(DataConstants.DEVICE_ID)));
EdgeEvent edgeEvent = EdgeUtils.constructEdgeEvent(ctx.getTenantId(), edgeId, EdgeEventType.DEVICE,
EdgeEventActionType.RPC_CALL, deviceId, JacksonUtil.OBJECT_MAPPER.valueToTree(body));
ListenableFuture<Void> future = ctx.getEdgeEventService().saveAsync(edgeEvent);

View File

@ -0,0 +1,119 @@
/**
* Copyright © 2016-2022 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.rule.engine.rpc;
import com.google.common.util.concurrent.SettableFuture;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.common.util.ListeningExecutor;
import org.thingsboard.rule.engine.api.RuleEngineRpcService;
import org.thingsboard.rule.engine.api.TbContext;
import org.thingsboard.rule.engine.api.TbNodeConfiguration;
import org.thingsboard.rule.engine.api.TbNodeException;
import org.thingsboard.server.common.data.DataConstants;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.TbMsgDataType;
import org.thingsboard.server.common.msg.TbMsgMetaData;
import org.thingsboard.server.common.msg.session.SessionMsgType;
import org.thingsboard.server.dao.edge.EdgeEventService;
import java.util.UUID;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
@RunWith(MockitoJUnitRunner.class)
public class TbSendRPCReplyNodeTest {
private static final String DUMMY_SERVICE_ID = "testServiceId";
private static final int DUMMY_REQUEST_ID = 0;
private static final UUID DUMMY_SESSION_ID = UUID.randomUUID();
private static final String DUMMY_DATA = "{\"key\":\"value\"}";
TbSendRPCReplyNode node;
private final TenantId tenantId = TenantId.fromUUID(UUID.randomUUID());
private final DeviceId deviceId = new DeviceId(UUID.randomUUID());
@Mock
private TbContext ctx;
@Mock
private RuleEngineRpcService rpcService;
@Mock
private EdgeEventService edgeEventService;
@Mock
private ListeningExecutor listeningExecutor;
@Before
public void setUp() throws TbNodeException {
node = new TbSendRPCReplyNode();
TbSendRpcReplyNodeConfiguration config = new TbSendRpcReplyNodeConfiguration().defaultConfiguration();
node.init(ctx, new TbNodeConfiguration(JacksonUtil.valueToTree(config)));
}
@Test
public void sendReplyToTransport() {
Mockito.when(ctx.getRpcService()).thenReturn(rpcService);
TbMsg msg = TbMsg.newMsg(SessionMsgType.POST_TELEMETRY_REQUEST.name(), deviceId, getDefaultMetadata(),
TbMsgDataType.JSON, DUMMY_DATA, null, null);
node.onMsg(ctx, msg);
verify(rpcService).sendRpcReplyToDevice(DUMMY_SERVICE_ID, DUMMY_SESSION_ID, DUMMY_REQUEST_ID, DUMMY_DATA);
verify(edgeEventService, never()).saveAsync(any());
}
@Test
public void sendReplyToEdgeQueue() {
Mockito.when(ctx.getTenantId()).thenReturn(tenantId);
Mockito.when(ctx.getEdgeEventService()).thenReturn(edgeEventService);
Mockito.when(edgeEventService.saveAsync(any())).thenReturn(SettableFuture.create());
Mockito.when(ctx.getDbCallbackExecutor()).thenReturn(listeningExecutor);
TbMsgMetaData defaultMetadata = getDefaultMetadata();
defaultMetadata.putValue(DataConstants.EDGE_ID, UUID.randomUUID().toString());
defaultMetadata.putValue(DataConstants.DEVICE_ID, UUID.randomUUID().toString());
TbMsg msg = TbMsg.newMsg(SessionMsgType.POST_TELEMETRY_REQUEST.name(), deviceId, defaultMetadata,
TbMsgDataType.JSON, DUMMY_DATA, null, null);
node.onMsg(ctx, msg);
verify(edgeEventService).saveAsync(any());
verify(rpcService, never()).sendRpcReplyToDevice(DUMMY_SERVICE_ID, DUMMY_SESSION_ID, DUMMY_REQUEST_ID, DUMMY_DATA);
}
private TbMsgMetaData getDefaultMetadata() {
TbSendRpcReplyNodeConfiguration config = new TbSendRpcReplyNodeConfiguration().defaultConfiguration();
TbMsgMetaData metadata = new TbMsgMetaData();
metadata.putValue(config.getServiceIdMetaDataAttribute(), DUMMY_SERVICE_ID);
metadata.putValue(config.getSessionIdMetaDataAttribute(), DUMMY_SESSION_ID.toString());
metadata.putValue(config.getRequestIdMetaDataAttribute(), Integer.toString(DUMMY_REQUEST_ID));
return metadata;
}
}