diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/constructor/DeviceMsgConstructor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/constructor/DeviceMsgConstructor.java index 49438792fa..ebea5d482f 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/constructor/DeviceMsgConstructor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/constructor/DeviceMsgConstructor.java @@ -125,23 +125,36 @@ public class DeviceMsgConstructor { } private DeviceRpcCallMsg.Builder constructDeviceRpcMsg(UUID deviceId, JsonNode body) { - int requestId = body.get("requestId").asInt(); - boolean oneway = body.get("oneway").asBoolean(); - UUID requestUUID = UUID.fromString(body.get("requestUUID").asText()); - long expirationTime = body.get("expirationTime").asLong(); - boolean persisted = body.get("persisted").asBoolean(); - int retries = body.get("retries").asInt(); - String additionalInfo = body.get("additionalInfo").asText(); - return DeviceRpcCallMsg.newBuilder() + DeviceRpcCallMsg.Builder builder = DeviceRpcCallMsg.newBuilder() .setDeviceIdMSB(deviceId.getMostSignificantBits()) .setDeviceIdLSB(deviceId.getLeastSignificantBits()) - .setRequestUuidMSB(requestUUID.getMostSignificantBits()) - .setRequestUuidLSB(requestUUID.getLeastSignificantBits()) - .setExpirationTime(expirationTime) - .setRequestId(requestId) - .setOneway(oneway) - .setPersisted(persisted) - .setRetries(retries) - .setAdditionalInfo(additionalInfo); + .setRequestId(body.get("requestId").asInt()); + if (body.get("oneway") != null) { + builder.setOneway(body.get("oneway").asBoolean()); + } + if (body.get("requestUUID") != null) { + UUID requestUUID = UUID.fromString(body.get("requestUUID").asText()); + builder.setRequestUuidMSB(requestUUID.getMostSignificantBits()) + .setRequestUuidLSB(requestUUID.getLeastSignificantBits()); + } + if (body.get("expirationTime") != null) { + builder.setExpirationTime(body.get("expirationTime").asLong()); + } + if (body.get("persisted") != null) { + builder.setPersisted(body.get("persisted").asBoolean()); + } + if (body.get("retries") != null) { + builder.setRetries(body.get("retries").asInt()); + } + if (body.get("additionalInfo") != null) { + builder.setAdditionalInfo(JacksonUtil.toString(body.get("additionalInfo"))); + } + if (body.get("serviceId") != null) { + builder.setServiceId(body.get("serviceId").asText()); + } + if (body.get("sessionId") != null) { + builder.setSessionId(body.get("sessionId").asText()); + } + return builder; } } diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/DeviceEdgeProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/DeviceEdgeProcessor.java index b340b252ef..959f5c5fe8 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/DeviceEdgeProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/DeviceEdgeProcessor.java @@ -376,7 +376,8 @@ public class DeviceEdgeProcessor extends BaseEdgeProcessor { String requestId = Integer.toString(deviceRpcCallMsg.getRequestId()); metaData.putValue("requestId", requestId); metaData.putValue("requestUUID", requestUUID.toString()); - // ?? metaData.putValue("originServiceId", deviceRpcRequestMsg.get); + metaData.putValue("serviceId", deviceRpcCallMsg.getServiceId()); + metaData.putValue("sessionId", deviceRpcCallMsg.getSessionId()); metaData.putValue("expirationTime", Long.toString(deviceRpcCallMsg.getExpirationTime())); metaData.putValue("oneway", Boolean.toString(deviceRpcCallMsg.getOneway())); metaData.putValue(DataConstants.PERSISTENT, Boolean.toString(deviceRpcCallMsg.getPersisted())); @@ -403,16 +404,18 @@ public class DeviceEdgeProcessor extends BaseEdgeProcessor { tbClusterService.pushMsgToRuleEngine(tenantId, deviceId, tbMsg, new TbQueueCallback() { @Override public void onSuccess(TbQueueMsgMetadata metadata) { - log.debug("Successfully send ENTITY_CREATED EVENT to rule engine [{}]", device); + log.debug("Successfully send TO_SERVER_RPC_REQUEST to rule engine [{}], deviceRpcCallMsg {}", + device, deviceRpcCallMsg); } @Override public void onFailure(Throwable t) { - log.debug("Failed to send ENTITY_CREATED EVENT to rule engine [{}]", device, t); + log.debug("Failed to send TO_SERVER_RPC_REQUEST to rule engine [{}], deviceRpcCallMsg {}", + device, deviceRpcCallMsg, t); } }); } catch (JsonProcessingException | IllegalArgumentException e) { - log.warn("[{}] Failed to push device action to rule engine: {}", deviceId, DataConstants.ENTITY_CREATED, e); + log.warn("[{}] Failed to push TO_SERVER_RPC_REQUEST to rule engine. deviceRpcCallMsg {}", deviceId, deviceRpcCallMsg, e); } return Futures.immediateFuture(null); diff --git a/common/edge-api/src/main/proto/edge.proto b/common/edge-api/src/main/proto/edge.proto index a5785d6870..d70179cfe0 100644 --- a/common/edge-api/src/main/proto/edge.proto +++ b/common/edge-api/src/main/proto/edge.proto @@ -433,6 +433,8 @@ message DeviceRpcCallMsg { bool persisted = 10; int32 retries = 11; string additionalInfo = 12; + string serviceId = 13; + string sessionId = 14; } message RpcRequestMsg { diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rpc/TbSendRPCReplyNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rpc/TbSendRPCReplyNode.java index 22d13f196b..89bc55bb54 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rpc/TbSendRPCReplyNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rpc/TbSendRPCReplyNode.java @@ -15,6 +15,8 @@ */ package org.thingsboard.rule.engine.rpc; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; @@ -78,7 +80,11 @@ public class TbSendRPCReplyNode implements TbNode { ctx.tellFailure(msg, new RuntimeException("Request body is empty!")); } else { if (StringUtils.isNotBlank(msg.getMetaData().getValue(DataConstants.EDGE_ID))) { - saveRpcResponseToEdgeQueue(ctx, msg); + try { + saveRpcResponseToEdgeQueue(ctx, msg, serviceIdStr, sessionIdStr, requestIdStr); + } catch (Exception e) { + ctx.tellFailure(msg, e); + } } else { ctx.getRpcService().sendRpcReplyToDevice(serviceIdStr, UUID.fromString(sessionIdStr), Integer.parseInt(requestIdStr), msg.getData()); ctx.tellSuccess(msg); @@ -86,7 +92,7 @@ public class TbSendRPCReplyNode implements TbNode { } } - private void saveRpcResponseToEdgeQueue(TbContext ctx, TbMsg msg) { + private void saveRpcResponseToEdgeQueue(TbContext ctx, TbMsg msg, String serviceIdStr, String sessionIdStr, String requestIdStr) throws JsonProcessingException { // EdgeEvent edgeEvent = new EdgeEvent(); // edgeEvent.setTenantId(tenantId); // edgeEvent.setAction(eventAction); @@ -95,8 +101,11 @@ public class TbSendRPCReplyNode implements TbNode { // edgeEvent.setBody(entityBody); // edgeEvent.setEdgeId(edgeId); // -// ObjectNode body = mapper.createObjectNode(); -// body.put("requestId", requestId); + ObjectNode body = JacksonUtil.OBJECT_MAPPER.createObjectNode(); + body.put("serviceId", serviceIdStr); + body.put("sessionId", sessionIdStr); + body.put("requestId", requestIdStr); + body.put("response", JacksonUtil.OBJECT_MAPPER.writeValueAsString(msg.getData())); // body.put("requestUUID", msg.getId().toString()); // body.put("oneway", msg.isOneway()); // body.put("expirationTime", msg.getExpirationTime()); @@ -111,7 +120,7 @@ public class TbSendRPCReplyNode implements TbNode { // TODO: add body EdgeEvent edgeEvent = EdgeUtils.constructEdgeEvent(ctx.getTenantId(), edgeId, EdgeEventType.DEVICE, - EdgeEventActionType.RPC_CALL_RESPONSE, deviceId, JacksonUtil.OBJECT_MAPPER.valueToTree("{}")); + EdgeEventActionType.RPC_CALL_RESPONSE, deviceId, JacksonUtil.OBJECT_MAPPER.valueToTree(body)); ListenableFuture future = ctx.getEdgeEventService().saveAsync(edgeEvent); Futures.addCallback(future, new FutureCallback() { @@ -123,6 +132,7 @@ public class TbSendRPCReplyNode implements TbNode { @Override public void onFailure(Throwable t) { + ctx.tellFailure(msg, t); } }, ctx.getDbCallbackExecutor()); }