From a44bc1ba93589e5df89a900adc8e63c9f026f4d4 Mon Sep 17 00:00:00 2001 From: YevhenBondarenko Date: Tue, 22 Jun 2021 16:21:56 +0300 Subject: [PATCH] added SendPendingRPC transport msg --- .../device/DeviceActorMessageProcessor.java | 21 ++++++++++++------- common/queue/src/main/proto/queue.proto | 14 ++++++++----- 2 files changed, 22 insertions(+), 13 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java index a1458fd5ac..2d5f107d61 100644 --- a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java @@ -322,29 +322,34 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { void process(TbActorCtx context, TransportToDeviceActorMsgWrapper wrapper) { TransportToDeviceActorMsg msg = wrapper.getMsg(); TbCallback callback = wrapper.getCallback(); + var sessionInfo = msg.getSessionInfo(); + if (msg.hasSessionEvent()) { - processSessionStateMsgs(msg.getSessionInfo(), msg.getSessionEvent()); + processSessionStateMsgs(sessionInfo, msg.getSessionEvent()); } if (msg.hasSubscribeToAttributes()) { - processSubscriptionCommands(context, msg.getSessionInfo(), msg.getSubscribeToAttributes()); + processSubscriptionCommands(context, sessionInfo, msg.getSubscribeToAttributes()); } if (msg.hasSubscribeToRPC()) { - processSubscriptionCommands(context, msg.getSessionInfo(), msg.getSubscribeToRPC()); + processSubscriptionCommands(context, sessionInfo, msg.getSubscribeToRPC()); + } + if (msg.hasSendPendingRPC()) { + sendPendingRequests(context, getSessionId(sessionInfo), sessionInfo); } if (msg.hasGetAttributes()) { - handleGetAttributesRequest(context, msg.getSessionInfo(), msg.getGetAttributes()); + handleGetAttributesRequest(context, sessionInfo, msg.getGetAttributes()); } if (msg.hasToDeviceRPCCallResponse()) { - processRpcResponses(context, msg.getSessionInfo(), msg.getToDeviceRPCCallResponse()); + processRpcResponses(context, sessionInfo, msg.getToDeviceRPCCallResponse()); } if (msg.hasSubscriptionInfo()) { - handleSessionActivity(context, msg.getSessionInfo(), msg.getSubscriptionInfo()); + handleSessionActivity(context, sessionInfo, msg.getSubscriptionInfo()); } if (msg.hasClaimDevice()) { - handleClaimDeviceMsg(context, msg.getSessionInfo(), msg.getClaimDevice()); + handleClaimDeviceMsg(context, sessionInfo, msg.getClaimDevice()); } if (msg.hasPersistedRpcResponseMsg()) { - processPersistedRpcResponses(context, msg.getSessionInfo(), msg.getPersistedRpcResponseMsg()); + processPersistedRpcResponses(context, sessionInfo, msg.getPersistedRpcResponseMsg()); } callback.onSuccess(); } diff --git a/common/queue/src/main/proto/queue.proto b/common/queue/src/main/proto/queue.proto index a7edd42142..d47e6838d3 100644 --- a/common/queue/src/main/proto/queue.proto +++ b/common/queue/src/main/proto/queue.proto @@ -318,6 +318,9 @@ message SubscribeToRPCMsg { SessionType sessionType = 2; } +message SendPendingRPCMsg { +} + message ToDeviceRpcRequestMsg { int32 requestId = 1; string methodName = 2; @@ -440,11 +443,12 @@ message TransportToDeviceActorMsg { GetAttributeRequestMsg getAttributes = 3; SubscribeToAttributeUpdatesMsg subscribeToAttributes = 4; SubscribeToRPCMsg subscribeToRPC = 5; - ToDeviceRpcResponseMsg toDeviceRPCCallResponse = 6; - SubscriptionInfoProto subscriptionInfo = 7; - ClaimDeviceMsg claimDevice = 8; - ProvisionDeviceRequestMsg provisionDevice = 9; - ToDevicePersistedRpcResponseMsg persistedRpcResponseMsg = 10; + SendPendingRPCMsg sendPendingRPC = 6; + ToDeviceRpcResponseMsg toDeviceRPCCallResponse = 7; + SubscriptionInfoProto subscriptionInfo = 8; + ClaimDeviceMsg claimDevice = 9; + ProvisionDeviceRequestMsg provisionDevice = 10; + ToDevicePersistedRpcResponseMsg persistedRpcResponseMsg = 11; } message TransportToRuleEngineMsg {