diff --git a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java index 794528a2b8..71d3d43e00 100644 --- a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java @@ -183,7 +183,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso void processRpcRequest(TbActorCtx context, ToDeviceRpcRequestActorMsg msg) { ToDeviceRpcRequest request = msg.getMsg(); UUID rpcId = request.getId(); - log.debug("[{}][{}] Received rpc request to process ...", deviceId, rpcId); + log.debug("[{}][{}] Received RPC request to process ...", deviceId, rpcId); ToDeviceRpcRequestMsg rpcRequest = creteToDeviceRpcRequestMsg(request); long timeout = request.getExpirationTime() - System.currentTimeMillis(); @@ -202,18 +202,18 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso boolean sent = false; int requestId = rpcRequest.getRequestId(); if (systemContext.isEdgesEnabled() && edgeId != null) { - log.debug("[{}][{}] device is related to edge: [{}]. Saving rpc request: [{}][{}] to edge queue", tenantId, deviceId, edgeId.getId(), rpcId, requestId); + log.debug("[{}][{}] device is related to edge: [{}]. Saving RPC request: [{}][{}] to edge queue", tenantId, deviceId, edgeId.getId(), rpcId, requestId); try { saveRpcRequestToEdgeQueue(request, rpcRequest.getRequestId()).get(); sent = true; } catch (InterruptedException | ExecutionException e) { - log.error("[{}][{}][{}] Failed to save rpc request to edge queue {}", tenantId, deviceId, edgeId.getId(), request, e); + log.error("[{}][{}][{}] Failed to save RPC request to edge queue {}", tenantId, deviceId, edgeId.getId(), request, e); } } else if (isSendNewRpcAvailable()) { sent = rpcSubscriptions.size() > 0; Set syncSessionSet = new HashSet<>(); rpcSubscriptions.forEach((sessionId, sessionInfo) -> { - log.debug("[{}][{}][{}][{}] send rpc request to transport ...", deviceId, sessionId, rpcId, requestId); + log.debug("[{}][{}][{}][{}] send RPC request to transport ...", deviceId, sessionId, rpcId, requestId); sendToTransport(rpcRequest, sessionId, sessionInfo.getNodeId()); if (SessionType.SYNC == sessionInfo.getType()) { syncSessionSet.add(sessionId); @@ -230,15 +230,15 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso } if (!persisted && request.isOneway() && sent) { - log.debug("[{}] Rpc command response sent [{}][{}]!", deviceId, rpcId, requestId); + log.debug("[{}] RPC command response sent [{}][{}]!", deviceId, rpcId, requestId); systemContext.getTbCoreDeviceRpcService().processRpcResponseFromDeviceActor(new FromDeviceRpcResponse(msg.getMsg().getId(), null, null)); } else { registerPendingRpcRequest(context, msg, sent, rpcRequest, timeout); } if (sent) { - log.debug("[{}][{}][{}] Rpc request is sent!", deviceId, rpcId, requestId); + log.debug("[{}][{}][{}] RPC request is sent!", deviceId, rpcId, requestId); } else { - log.debug("[{}][{}][{}] Rpc request is NOT sent!", deviceId, rpcId, requestId); + log.debug("[{}][{}][{}] RPC request is NOT sent!", deviceId, rpcId, requestId); } } @@ -277,19 +277,19 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso } void processRpcResponsesFromEdge(TbActorCtx context, FromDeviceRpcResponseActorMsg responseMsg) { - log.debug("[{}] Processing rpc command response from edge session", deviceId); + log.debug("[{}] Processing RPC command response from edge session", deviceId); ToDeviceRpcRequestMetadata requestMd = toDeviceRpcPendingMap.remove(responseMsg.getRequestId()); boolean success = requestMd != null; if (success) { systemContext.getTbCoreDeviceRpcService().processRpcResponseFromDeviceActor(responseMsg.getMsg()); } else { - log.debug("[{}] Rpc command response [{}] is stale!", deviceId, responseMsg.getRequestId()); + log.debug("[{}] RPC command response [{}] is stale!", deviceId, responseMsg.getRequestId()); } } void processRemoveRpc(TbActorCtx context, RemoveRpcActorMsg msg) { UUID requestId = msg.getRequestId(); - log.debug("[{}][{}] Received remove rpc request ...", deviceId, requestId); + log.debug("[{}][{}] Received remove RPC request ...", deviceId, requestId); Map.Entry entry = null; for (Map.Entry e : toDeviceRpcPendingMap.entrySet()) { if (e.getValue().getMsg().getMsg().getId().equals(msg.getRequestId())) { @@ -306,7 +306,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso Optional> firstRpc = getFirstRpc(); if (firstRpc.isPresent() && key.equals(firstRpc.get().getKey())) { toDeviceRpcPendingMap.remove(key); - log.debug("[{}][{}][{}] Removed pending rpc! Going to send next pending request ...", deviceId, requestId, key); + log.debug("[{}][{}][{}] Removed pending RPC! Going to send next pending request ...", deviceId, requestId, key); sendNextPendingRequest(context); } else { toDeviceRpcPendingMap.remove(key); @@ -316,7 +316,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso } private void registerPendingRpcRequest(TbActorCtx context, ToDeviceRpcRequestActorMsg msg, boolean sent, ToDeviceRpcRequestMsg rpcRequest, long timeout) { - log.debug("[{}][{}][{}] Registering pending rpc request...", deviceId, getRpcIdFromRequest(rpcRequest), rpcRequest.getRequestId()); + log.debug("[{}][{}][{}] Registering pending RPC request...", deviceId, getRpcIdFromRequest(rpcRequest), rpcRequest.getRequestId()); toDeviceRpcPendingMap.put(rpcRequest.getRequestId(), new ToDeviceRpcRequestMetadata(msg, sent)); DeviceActorServerSideRpcTimeoutMsg timeoutMsg = new DeviceActorServerSideRpcTimeoutMsg(rpcRequest.getRequestId(), timeout); scheduleMsgWithDelay(context, timeoutMsg, timeoutMsg.getTimeout()); @@ -327,14 +327,14 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso ToDeviceRpcRequestMetadata requestMd = toDeviceRpcPendingMap.remove(requestId); if (requestMd != null) { UUID rpcId = requestMd.getMsg().getMsg().getId(); - log.debug("[{}][{}][{}] Rpc request timeout detected!", deviceId, rpcId, requestId); + log.debug("[{}][{}][{}] RPC request timeout detected!", deviceId, rpcId, requestId); if (requestMd.getMsg().getMsg().isPersisted()) { systemContext.getTbRpcService().save(tenantId, new RpcId(rpcId), RpcStatus.EXPIRED, null); } systemContext.getTbCoreDeviceRpcService().processRpcResponseFromDeviceActor(new FromDeviceRpcResponse(rpcId, null, requestMd.isSent() ? RpcError.TIMEOUT : RpcError.NO_ACTIVE_CONNECTION)); if (!requestMd.isDelivered()) { - log.debug("[{}][{}][{}] Pending rpc timeout detected! Going to send next pending request ...", deviceId, rpcId, requestId); + log.debug("[{}][{}][{}] Pending RPC timeout detected! Going to send next pending request ...", deviceId, rpcId, requestId); sendNextPendingRequest(context); } } @@ -343,13 +343,13 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso private void sendPendingRequests(TbActorCtx context, UUID sessionId, String nodeId) { SessionType sessionType = getSessionType(sessionId); if (!toDeviceRpcPendingMap.isEmpty()) { - log.debug("[{}][{}] Pushing {} pending rpc messages to new async session!", deviceId, sessionId, toDeviceRpcPendingMap.size()); + log.debug("[{}][{}] Pushing {} pending RPC messages to new async session!", deviceId, sessionId, toDeviceRpcPendingMap.size()); if (sessionType == SessionType.SYNC) { - log.debug("[{}] Cleanup sync rpc session [{}]", deviceId, sessionId); + log.debug("[{}] Cleanup sync RPC session [{}]", deviceId, sessionId); rpcSubscriptions.remove(sessionId); } } else { - log.debug("[{}] No pending rpc messages for new async session [{}]", deviceId, sessionId); + log.debug("[{}] No pending RPC messages for new async session [{}]", deviceId, sessionId); } Set sentOneWayIds = new HashSet<>(); @@ -393,7 +393,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso .setOneway(request.isOneway()) .setPersisted(request.isPersisted()) .build(); - log.debug("[{}][{}][{}][{}] Send pending rpc request to transport ...", deviceId, sessionId, getRpcIdFromRequest(rpcRequest), requestId); + log.debug("[{}][{}][{}][{}] Send pending RPC request to transport ...", deviceId, sessionId, getRpcIdFromRequest(rpcRequest), requestId); sendToTransport(rpcRequest, sessionId, nodeId); }; } @@ -586,10 +586,11 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso private void processRpcResponses(TbActorCtx context, SessionInfoProto sessionInfo, ToDeviceRpcResponseMsg responseMsg) { UUID sessionId = getSessionId(sessionInfo); - log.debug("[{}][{}] Processing rpc command response: {}", deviceId, sessionId, responseMsg); + log.debug("[{}][{}] Processing RPC command response: {}", deviceId, sessionId, responseMsg); ToDeviceRpcRequestMetadata requestMd = toDeviceRpcPendingMap.remove(responseMsg.getRequestId()); boolean success = requestMd != null; if (success) { + ToDeviceRpcRequest toDeviceRequestMsg = requestMd.getMsg().getMsg(); boolean delivered = requestMd.isDelivered(); boolean hasError = StringUtils.isNotEmpty(responseMsg.getError()); try { @@ -599,12 +600,12 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso } else if (delivered) { payload = responseMsg.getPayload(); } else { - payload = "Received response for undelivered rpc: " + responseMsg.getPayload(); + payload = "Received response for undelivered RPC: " + responseMsg.getPayload(); } systemContext.getTbCoreDeviceRpcService().processRpcResponseFromDeviceActor( - new FromDeviceRpcResponse(requestMd.getMsg().getMsg().getId(), + new FromDeviceRpcResponse(toDeviceRequestMsg.getId(), payload, null)); - if (requestMd.getMsg().getMsg().isPersisted()) { + if (toDeviceRequestMsg.isPersisted()) { RpcStatus status = hasError || !delivered ? RpcStatus.FAILED : RpcStatus.SUCCESSFUL; JsonNode response; try { @@ -612,17 +613,17 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso } catch (IllegalArgumentException e) { response = JacksonUtil.newObjectNode().put("error", payload); } - systemContext.getTbRpcService().save(tenantId, new RpcId(requestMd.getMsg().getMsg().getId()), status, response); + systemContext.getTbRpcService().save(tenantId, new RpcId(toDeviceRequestMsg.getId()), status, response); } } finally { if (!delivered) { String errorResponse = hasError ? "error" : ""; - log.debug("[{}][{}][{}] Received {} response for undelivered rpc! Going to send next pending request ...", deviceId, sessionId, responseMsg.getRequestId(), errorResponse); + log.debug("[{}][{}][{}] Received {} response for undelivered RPC! Going to send next pending request ...", deviceId, sessionId, responseMsg.getRequestId(), errorResponse); sendNextPendingRequest(context); } } } else { - log.debug("[{}][{}][{}] Rpc command response is stale!", deviceId, sessionId, responseMsg.getRequestId()); + log.debug("[{}][{}][{}] RPC command response is stale!", deviceId, sessionId, responseMsg.getRequestId()); } } @@ -631,7 +632,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso RpcStatus status = RpcStatus.valueOf(responseMsg.getStatus()); UUID sessionId = getSessionId(sessionInfo); int requestId = responseMsg.getRequestId(); - log.debug("[{}][{}][{}][{}] Processing rpc command response status: [{}]", deviceId, sessionId, rpcId, requestId, status); + log.debug("[{}][{}][{}][{}] Processing RPC command response status: [{}]", deviceId, sessionId, rpcId, requestId, status); ToDeviceRpcRequestMetadata md = toDeviceRpcPendingMap.get(requestId); if (md != null) { @@ -661,11 +662,11 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso systemContext.getTbRpcService().save(tenantId, new RpcId(rpcId), status, response); } if (status != RpcStatus.SENT) { - log.debug("[{}][{}][{}][{}] Rpc was {}! Going to send next pending request ...", deviceId, sessionId, rpcId, requestId, status.name().toLowerCase()); + log.debug("[{}][{}][{}][{}] RPC was {}! Going to send next pending request ...", deviceId, sessionId, rpcId, requestId, status.name().toLowerCase()); sendNextPendingRequest(context); } } else { - log.warn("[{}][{}][{}][{}] Rpc has already been removed from pending map.", deviceId, sessionId, rpcId, requestId); + log.warn("[{}][{}][{}][{}] RPC has already been removed from pending map.", deviceId, sessionId, rpcId, requestId); } } @@ -693,7 +694,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso private void processSubscriptionCommands(TbActorCtx context, SessionInfoProto sessionInfo, SubscribeToRPCMsg subscribeCmd) { UUID sessionId = getSessionId(sessionInfo); if (subscribeCmd.getUnsubscribe()) { - log.debug("[{}] Canceling rpc subscription for session: [{}]", deviceId, sessionId); + log.debug("[{}] Canceling RPC subscription for session: [{}]", deviceId, sessionId); rpcSubscriptions.remove(sessionId); } else { SessionInfoMetaData sessionMD = sessions.get(sessionId); @@ -702,7 +703,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso } sessionMD.setSubscribedToRPC(true); rpcSubscriptions.put(sessionId, sessionMD.getSessionInfo()); - log.debug("[{}] Registered rpc subscription for session: [{}] Going to check for pending requests ...", deviceId, sessionId); + log.debug("[{}] Registered RPC subscription for session: [{}] Going to check for pending requests ...", deviceId, sessionId); sendPendingRequests(context, sessionId, sessionInfo.getNodeId()); dumpSessions(); } @@ -945,14 +946,14 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso } log.debug("[{}] Restored session: {}", deviceId, sessionMD); } - log.debug("[{}] Restored sessions: {}, rpc subscriptions: {}, attribute subscriptions: {}", deviceId, sessions.size(), rpcSubscriptions.size(), attributeSubscriptions.size()); + log.debug("[{}] Restored sessions: {}, RPC subscriptions: {}, attribute subscriptions: {}", deviceId, sessions.size(), rpcSubscriptions.size(), attributeSubscriptions.size()); } private void dumpSessions() { if (systemContext.isLocalCacheType()) { return; } - log.debug("[{}] Dumping sessions: {}, rpc subscriptions: {}, attribute subscriptions: {} to cache", deviceId, sessions.size(), rpcSubscriptions.size(), attributeSubscriptions.size()); + log.debug("[{}] Dumping sessions: {}, RPC subscriptions: {}, attribute subscriptions: {} to cache", deviceId, sessions.size(), rpcSubscriptions.size(), attributeSubscriptions.size()); List sessionsList = new ArrayList<>(sessions.size()); sessions.forEach((uuid, sessionMD) -> { if (sessionMD.getSessionInfo().getType() == SessionType.SYNC) { diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java index efa5aee965..621504564f 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java @@ -207,7 +207,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement private void closeCtx(ChannelHandlerContext ctx) { if (!rpcAwaitingAck.isEmpty()) { - log.debug("[{}] Cleanup rpc awaiting ack map due to session close!", sessionId); + log.debug("[{}] Cleanup RPC awaiting ack map due to session close!", sessionId); rpcAwaitingAck.clear(); } ctx.close(); @@ -1229,7 +1229,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement @Override public void onToDeviceRpcRequest(UUID sessionId, TransportProtos.ToDeviceRpcRequestMsg rpcRequest) { - log.trace("[{}] Received RPC command to device", sessionId); + log.trace("[{}][{}] Received RPC command to device: {}", deviceSessionCtx.getDeviceId(), sessionId, rpcRequest); try { if (sparkplugSessionHandler != null) { handleToSparkplugDeviceRpcRequest(rpcRequest); @@ -1240,7 +1240,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement .ifPresent(payload -> sendToDeviceRpcRequest(payload, rpcRequest, deviceSessionCtx.getSessionInfo())); } } catch (Exception e) { - log.trace("[{}] Failed to convert device RPC command to MQTT msg", sessionId, e); + log.trace("[{}][{}] Failed to convert device RPC command to MQTT msg", deviceSessionCtx.getDeviceId(), sessionId, e); this.sendErrorRpcResponse(deviceSessionCtx.getSessionInfo(), rpcRequest.getRequestId(), ThingsboardErrorCode.INVALID_ARGUMENTS, "Failed to convert device RPC command to MQTT msg: " + rpcRequest.getMethodName() + rpcRequest.getParams()); @@ -1284,19 +1284,20 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement } var cf = publish(payload, deviceSessionCtx); cf.addListener(result -> { - if (result.cause() == null) { - if (!isAckExpected(payload)) { - transportService.process(sessionInfo, rpcRequest, RpcStatus.DELIVERED, TransportServiceCallback.EMPTY); - } else if (rpcRequest.getPersisted()) { - transportService.process(sessionInfo, rpcRequest, RpcStatus.SENT, TransportServiceCallback.EMPTY); - } - if (sparkplugSessionHandler != null) { - this.sendSuccessRpcResponse(sessionInfo, rpcRequest.getRequestId(), ResponseCode.CONTENT, "Success: " + rpcRequest.getMethodName()); - } - } else { - log.trace("[{}] Failed send To Device Rpc Request [{}]", sessionId, rpcRequest.getMethodName()); + Throwable throwable = result.cause(); + if (throwable != null) { + log.trace("[{}][{}][{}] Failed send RPC request to device due to: ", deviceSessionCtx.getDeviceId(), sessionId, rpcRequest.getRequestId(), throwable); this.sendErrorRpcResponse(sessionInfo, rpcRequest.getRequestId(), ThingsboardErrorCode.INVALID_ARGUMENTS, " Failed send To Device Rpc Request: " + rpcRequest.getMethodName()); + return; + } + if (!isAckExpected(payload)) { + transportService.process(sessionInfo, rpcRequest, RpcStatus.DELIVERED, TransportServiceCallback.EMPTY); + } else if (rpcRequest.getPersisted()) { + transportService.process(sessionInfo, rpcRequest, RpcStatus.SENT, TransportServiceCallback.EMPTY); + } + if (sparkplugSessionHandler != null) { + this.sendSuccessRpcResponse(sessionInfo, rpcRequest.getRequestId(), ResponseCode.CONTENT, "Success: " + rpcRequest.getMethodName()); } }); }