Added serviceId and sessionId to deviceRpcCallmsg

This commit is contained in:
Volodymyr Babak 2022-11-09 12:54:17 +02:00
parent 545790fc5b
commit d1312cb917
4 changed files with 53 additions and 25 deletions

View File

@ -125,23 +125,36 @@ public class DeviceMsgConstructor {
}
private DeviceRpcCallMsg.Builder constructDeviceRpcMsg(UUID deviceId, JsonNode body) {
int requestId = body.get("requestId").asInt();
boolean oneway = body.get("oneway").asBoolean();
UUID requestUUID = UUID.fromString(body.get("requestUUID").asText());
long expirationTime = body.get("expirationTime").asLong();
boolean persisted = body.get("persisted").asBoolean();
int retries = body.get("retries").asInt();
String additionalInfo = body.get("additionalInfo").asText();
return DeviceRpcCallMsg.newBuilder()
DeviceRpcCallMsg.Builder builder = DeviceRpcCallMsg.newBuilder()
.setDeviceIdMSB(deviceId.getMostSignificantBits())
.setDeviceIdLSB(deviceId.getLeastSignificantBits())
.setRequestUuidMSB(requestUUID.getMostSignificantBits())
.setRequestUuidLSB(requestUUID.getLeastSignificantBits())
.setExpirationTime(expirationTime)
.setRequestId(requestId)
.setOneway(oneway)
.setPersisted(persisted)
.setRetries(retries)
.setAdditionalInfo(additionalInfo);
.setRequestId(body.get("requestId").asInt());
if (body.get("oneway") != null) {
builder.setOneway(body.get("oneway").asBoolean());
}
if (body.get("requestUUID") != null) {
UUID requestUUID = UUID.fromString(body.get("requestUUID").asText());
builder.setRequestUuidMSB(requestUUID.getMostSignificantBits())
.setRequestUuidLSB(requestUUID.getLeastSignificantBits());
}
if (body.get("expirationTime") != null) {
builder.setExpirationTime(body.get("expirationTime").asLong());
}
if (body.get("persisted") != null) {
builder.setPersisted(body.get("persisted").asBoolean());
}
if (body.get("retries") != null) {
builder.setRetries(body.get("retries").asInt());
}
if (body.get("additionalInfo") != null) {
builder.setAdditionalInfo(JacksonUtil.toString(body.get("additionalInfo")));
}
if (body.get("serviceId") != null) {
builder.setServiceId(body.get("serviceId").asText());
}
if (body.get("sessionId") != null) {
builder.setSessionId(body.get("sessionId").asText());
}
return builder;
}
}

View File

@ -376,7 +376,8 @@ public class DeviceEdgeProcessor extends BaseEdgeProcessor {
String requestId = Integer.toString(deviceRpcCallMsg.getRequestId());
metaData.putValue("requestId", requestId);
metaData.putValue("requestUUID", requestUUID.toString());
// ?? metaData.putValue("originServiceId", deviceRpcRequestMsg.get);
metaData.putValue("serviceId", deviceRpcCallMsg.getServiceId());
metaData.putValue("sessionId", deviceRpcCallMsg.getSessionId());
metaData.putValue("expirationTime", Long.toString(deviceRpcCallMsg.getExpirationTime()));
metaData.putValue("oneway", Boolean.toString(deviceRpcCallMsg.getOneway()));
metaData.putValue(DataConstants.PERSISTENT, Boolean.toString(deviceRpcCallMsg.getPersisted()));
@ -403,16 +404,18 @@ public class DeviceEdgeProcessor extends BaseEdgeProcessor {
tbClusterService.pushMsgToRuleEngine(tenantId, deviceId, tbMsg, new TbQueueCallback() {
@Override
public void onSuccess(TbQueueMsgMetadata metadata) {
log.debug("Successfully send ENTITY_CREATED EVENT to rule engine [{}]", device);
log.debug("Successfully send TO_SERVER_RPC_REQUEST to rule engine [{}], deviceRpcCallMsg {}",
device, deviceRpcCallMsg);
}
@Override
public void onFailure(Throwable t) {
log.debug("Failed to send ENTITY_CREATED EVENT to rule engine [{}]", device, t);
log.debug("Failed to send TO_SERVER_RPC_REQUEST to rule engine [{}], deviceRpcCallMsg {}",
device, deviceRpcCallMsg, t);
}
});
} catch (JsonProcessingException | IllegalArgumentException e) {
log.warn("[{}] Failed to push device action to rule engine: {}", deviceId, DataConstants.ENTITY_CREATED, e);
log.warn("[{}] Failed to push TO_SERVER_RPC_REQUEST to rule engine. deviceRpcCallMsg {}", deviceId, deviceRpcCallMsg, e);
}
return Futures.immediateFuture(null);

View File

@ -433,6 +433,8 @@ message DeviceRpcCallMsg {
bool persisted = 10;
int32 retries = 11;
string additionalInfo = 12;
string serviceId = 13;
string sessionId = 14;
}
message RpcRequestMsg {

View File

@ -15,6 +15,8 @@
*/
package org.thingsboard.rule.engine.rpc;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
@ -78,7 +80,11 @@ public class TbSendRPCReplyNode implements TbNode {
ctx.tellFailure(msg, new RuntimeException("Request body is empty!"));
} else {
if (StringUtils.isNotBlank(msg.getMetaData().getValue(DataConstants.EDGE_ID))) {
saveRpcResponseToEdgeQueue(ctx, msg);
try {
saveRpcResponseToEdgeQueue(ctx, msg, serviceIdStr, sessionIdStr, requestIdStr);
} catch (Exception e) {
ctx.tellFailure(msg, e);
}
} else {
ctx.getRpcService().sendRpcReplyToDevice(serviceIdStr, UUID.fromString(sessionIdStr), Integer.parseInt(requestIdStr), msg.getData());
ctx.tellSuccess(msg);
@ -86,7 +92,7 @@ public class TbSendRPCReplyNode implements TbNode {
}
}
private void saveRpcResponseToEdgeQueue(TbContext ctx, TbMsg msg) {
private void saveRpcResponseToEdgeQueue(TbContext ctx, TbMsg msg, String serviceIdStr, String sessionIdStr, String requestIdStr) throws JsonProcessingException {
// EdgeEvent edgeEvent = new EdgeEvent();
// edgeEvent.setTenantId(tenantId);
// edgeEvent.setAction(eventAction);
@ -95,8 +101,11 @@ public class TbSendRPCReplyNode implements TbNode {
// edgeEvent.setBody(entityBody);
// edgeEvent.setEdgeId(edgeId);
//
// ObjectNode body = mapper.createObjectNode();
// body.put("requestId", requestId);
ObjectNode body = JacksonUtil.OBJECT_MAPPER.createObjectNode();
body.put("serviceId", serviceIdStr);
body.put("sessionId", sessionIdStr);
body.put("requestId", requestIdStr);
body.put("response", JacksonUtil.OBJECT_MAPPER.writeValueAsString(msg.getData()));
// body.put("requestUUID", msg.getId().toString());
// body.put("oneway", msg.isOneway());
// body.put("expirationTime", msg.getExpirationTime());
@ -111,7 +120,7 @@ public class TbSendRPCReplyNode implements TbNode {
// TODO: add body
EdgeEvent edgeEvent =
EdgeUtils.constructEdgeEvent(ctx.getTenantId(), edgeId, EdgeEventType.DEVICE,
EdgeEventActionType.RPC_CALL_RESPONSE, deviceId, JacksonUtil.OBJECT_MAPPER.valueToTree("{}"));
EdgeEventActionType.RPC_CALL_RESPONSE, deviceId, JacksonUtil.OBJECT_MAPPER.valueToTree(body));
ListenableFuture<Void> future = ctx.getEdgeEventService().saveAsync(edgeEvent);
Futures.addCallback(future, new FutureCallback<Void>() {
@ -123,6 +132,7 @@ public class TbSendRPCReplyNode implements TbNode {
@Override
public void onFailure(Throwable t) {
ctx.tellFailure(msg, t);
}
}, ctx.getDbCallbackExecutor());
}