From 4c6cf6ec2a050b614ce78198311e70c4884b47c0 Mon Sep 17 00:00:00 2001 From: YevhenBondarenko Date: Wed, 11 Aug 2021 17:46:54 +0300 Subject: [PATCH] added removing persistent rpc from pending map --- .../device/DeviceActorMessageProcessor.java | 19 ++++++++++++++++--- 1 file changed, 16 insertions(+), 3 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java index 6224a3635d..9f48e3f577 100644 --- a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java @@ -26,7 +26,6 @@ import lombok.extern.slf4j.Slf4j; import org.apache.commons.collections.CollectionUtils; import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.common.util.LinkedHashMapRemoveEldest; -import org.thingsboard.server.common.data.rpc.RpcError; import org.thingsboard.rule.engine.api.msg.DeviceAttributesEventNotificationMsg; import org.thingsboard.rule.engine.api.msg.DeviceCredentialsUpdateNotificationMsg; import org.thingsboard.rule.engine.api.msg.DeviceEdgeUpdateMsg; @@ -52,6 +51,7 @@ import org.thingsboard.server.common.data.page.PageLink; import org.thingsboard.server.common.data.relation.EntityRelation; import org.thingsboard.server.common.data.relation.RelationTypeGroup; import org.thingsboard.server.common.data.rpc.Rpc; +import org.thingsboard.server.common.data.rpc.RpcError; import org.thingsboard.server.common.data.rpc.RpcStatus; import org.thingsboard.server.common.data.rpc.ToDeviceRpcRequestBody; import org.thingsboard.server.common.data.security.DeviceCredentials; @@ -59,6 +59,7 @@ import org.thingsboard.server.common.data.security.DeviceCredentialsType; import org.thingsboard.server.common.msg.TbActorMsg; import org.thingsboard.server.common.msg.TbMsgMetaData; import org.thingsboard.server.common.msg.queue.TbCallback; +import org.thingsboard.server.common.msg.rpc.FromDeviceRpcResponse; import org.thingsboard.server.common.msg.rpc.ToDeviceRpcRequest; import org.thingsboard.server.common.msg.timeout.DeviceActorServerSideRpcTimeoutMsg; import org.thingsboard.server.gen.transport.TransportProtos; @@ -86,7 +87,6 @@ import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToTransportUpdateCredentialsProto; import org.thingsboard.server.gen.transport.TransportProtos.TransportToDeviceActorMsg; import org.thingsboard.server.gen.transport.TransportProtos.TsKvProto; -import org.thingsboard.server.common.msg.rpc.FromDeviceRpcResponse; import org.thingsboard.server.service.rpc.FromDeviceRpcResponseActorMsg; import org.thingsboard.server.service.rpc.RemoveRpcActorMsg; import org.thingsboard.server.service.rpc.ToDeviceRpcRequestActorMsg; @@ -551,7 +551,20 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { private void processPersistedRpcResponses(TbActorCtx context, SessionInfoProto sessionInfo, ToDevicePersistedRpcResponseMsg responseMsg) { UUID rpcId = new UUID(responseMsg.getRequestIdMSB(), responseMsg.getRequestIdLSB()); - systemContext.getTbRpcService().save(tenantId, new RpcId(rpcId), RpcStatus.valueOf(responseMsg.getStatus()), null); + RpcStatus status = RpcStatus.valueOf(responseMsg.getStatus()); + + ToDeviceRpcRequestMetadata md; + if (RpcStatus.DELIVERED.equals(status)) { + md = toDeviceRpcPendingMap.get(responseMsg.getRequestId()); + } else { + md = toDeviceRpcPendingMap.remove(responseMsg.getRequestId()); + } + + if (md != null) { + systemContext.getTbRpcService().save(tenantId, new RpcId(rpcId), status, null); + } else { + log.debug("[{}] Rpc has already removed from pending map.", rpcId); + } } private void processSubscriptionCommands(TbActorCtx context, SessionInfoProto sessionInfo, SubscribeToAttributeUpdatesMsg subscribeCmd) {