diff --git a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java index 615600a57f..7567130a7d 100644 --- a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java @@ -136,6 +136,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso private String deviceType; private TbMsgMetaData defaultMetaData; private EdgeId edgeId; + private ScheduledFuture awaitRpcResponseFuture; DeviceActorMessageProcessor(ActorSystemContext systemContext, TenantId tenantId, DeviceId deviceId) { super(systemContext); @@ -311,9 +312,9 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso } Integer requestId = entry.getKey(); if (entry.getValue().isDelivered()) { - var md = toDeviceRpcPendingMap.remove(requestId); + toDeviceRpcPendingMap.remove(requestId); if (rpcSubmitStrategy.equals(RpcSubmitStrategy.SEQUENTIAL_ON_RESPONSE_FROM_DEVICE)) { - clearAwaitRpcResponseScheduler(md); + clearAwaitRpcResponseScheduler(); sendNextPendingRequest(rpcId, requestId, "Removed pending RPC!"); } return; @@ -355,7 +356,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso return; } if (rpcSubmitStrategy.equals(RpcSubmitStrategy.SEQUENTIAL_ON_RESPONSE_FROM_DEVICE)) { - clearAwaitRpcResponseScheduler(requestMd); + clearAwaitRpcResponseScheduler(); sendNextPendingRequest(rpcId, requestId, "Pending RPC timeout detected!"); } } @@ -390,9 +391,9 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso .findFirst().filter(entry -> { var md = entry.getValue(); if (md.isDelivered()) { - if (md.getAwaitRpcResponseFuture() == null || md.getAwaitRpcResponseFuture().isCancelled()) { + if (awaitRpcResponseFuture == null || awaitRpcResponseFuture.isCancelled()) { var toDeviceRpcRequest = md.getMsg().getMsg(); - scheduleAwaitRpcResponseFuture(toDeviceRpcRequest.getId(), entry.getKey()); + awaitRpcResponseFuture = scheduleAwaitRpcResponseFuture(toDeviceRpcRequest.getId(), entry.getKey()); } return false; } @@ -649,7 +650,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso } } finally { if (rpcSubmitStrategy.equals(RpcSubmitStrategy.SEQUENTIAL_ON_RESPONSE_FROM_DEVICE)) { - clearAwaitRpcResponseScheduler(requestMd); + clearAwaitRpcResponseScheduler(); String errorResponse = hasError ? "error " : ""; String rpcState = delivered ? "" : "undelivered "; sendNextPendingRequest(rpcId, requestId, String.format("Received %sresponse for %sRPC!", errorResponse, rpcState)); @@ -685,7 +686,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso } else { md.setDelivered(true); if (rpcSubmitStrategy.equals(RpcSubmitStrategy.SEQUENTIAL_ON_RESPONSE_FROM_DEVICE)) { - md.setAwaitRpcResponseFuture(scheduleAwaitRpcResponseFuture(rpcId, requestId)); + awaitRpcResponseFuture = scheduleAwaitRpcResponseFuture(rpcId, requestId); } } } else if (status.equals(RpcStatus.TIMEOUT)) { @@ -740,7 +741,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso if (subscribeCmd.getUnsubscribe()) { log.debug("[{}] Canceling RPC subscription for session: [{}]", deviceId, sessionId); rpcSubscriptions.remove(sessionId); - clearAwaitRpcResponseSchedulers(); + clearAwaitRpcResponseScheduler(); return; } SessionInfoMetaData sessionMD = sessions.get(sessionId); @@ -775,7 +776,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso sessions.remove(sessionId); attributeSubscriptions.remove(sessionId); rpcSubscriptions.remove(sessionId); - clearAwaitRpcResponseSchedulers(); + clearAwaitRpcResponseScheduler(); if (sessions.isEmpty()) { reportSessionClose(); } @@ -798,20 +799,12 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso }, systemContext.getRpcResponseTimeout(), TimeUnit.MILLISECONDS); } - private void clearAwaitRpcResponseSchedulers() { - if (rpcSubmitStrategy.equals(RpcSubmitStrategy.SEQUENTIAL_ON_RESPONSE_FROM_DEVICE)) { - toDeviceRpcPendingMap.forEach((integer, md) -> clearAwaitRpcResponseScheduler(md)); + private void clearAwaitRpcResponseScheduler() { + if (rpcSubmitStrategy.equals(RpcSubmitStrategy.SEQUENTIAL_ON_RESPONSE_FROM_DEVICE) && awaitRpcResponseFuture != null) { + awaitRpcResponseFuture.cancel(true); } } - private void clearAwaitRpcResponseScheduler(ToDeviceRpcRequestMetadata md) { - var awaitRpcResponseFuture = md.getAwaitRpcResponseFuture(); - if (awaitRpcResponseFuture == null) { - return; - } - awaitRpcResponseFuture.cancel(true); - } - private void handleSessionActivity(SessionInfoProto sessionInfoProto, SubscriptionInfoProto subscriptionInfo) { UUID sessionId = getSessionId(sessionInfoProto); Objects.requireNonNull(sessionId); diff --git a/application/src/main/java/org/thingsboard/server/actors/device/ToDeviceRpcRequestMetadata.java b/application/src/main/java/org/thingsboard/server/actors/device/ToDeviceRpcRequestMetadata.java index b4f6795af7..f876408d24 100644 --- a/application/src/main/java/org/thingsboard/server/actors/device/ToDeviceRpcRequestMetadata.java +++ b/application/src/main/java/org/thingsboard/server/actors/device/ToDeviceRpcRequestMetadata.java @@ -18,8 +18,6 @@ package org.thingsboard.server.actors.device; import lombok.Data; import org.thingsboard.server.service.rpc.ToDeviceRpcRequestActorMsg; -import java.util.concurrent.ScheduledFuture; - /** * @author Andrew Shvayka */ @@ -29,5 +27,4 @@ public class ToDeviceRpcRequestMetadata { private final boolean sent; private int retries; private boolean delivered; - private ScheduledFuture awaitRpcResponseFuture; }