removed ScheduledFuture from ToDeviceRpcRequestMetadata and updated logic in DeviceActorMessageProcessor
This commit is contained in:
parent
d46bd36c87
commit
ad912e1d28
@ -136,6 +136,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
|
||||
private String deviceType;
|
||||
private TbMsgMetaData defaultMetaData;
|
||||
private EdgeId edgeId;
|
||||
private ScheduledFuture<?> awaitRpcResponseFuture;
|
||||
|
||||
DeviceActorMessageProcessor(ActorSystemContext systemContext, TenantId tenantId, DeviceId deviceId) {
|
||||
super(systemContext);
|
||||
@ -311,9 +312,9 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
|
||||
}
|
||||
Integer requestId = entry.getKey();
|
||||
if (entry.getValue().isDelivered()) {
|
||||
var md = toDeviceRpcPendingMap.remove(requestId);
|
||||
toDeviceRpcPendingMap.remove(requestId);
|
||||
if (rpcSubmitStrategy.equals(RpcSubmitStrategy.SEQUENTIAL_ON_RESPONSE_FROM_DEVICE)) {
|
||||
clearAwaitRpcResponseScheduler(md);
|
||||
clearAwaitRpcResponseScheduler();
|
||||
sendNextPendingRequest(rpcId, requestId, "Removed pending RPC!");
|
||||
}
|
||||
return;
|
||||
@ -355,7 +356,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
|
||||
return;
|
||||
}
|
||||
if (rpcSubmitStrategy.equals(RpcSubmitStrategy.SEQUENTIAL_ON_RESPONSE_FROM_DEVICE)) {
|
||||
clearAwaitRpcResponseScheduler(requestMd);
|
||||
clearAwaitRpcResponseScheduler();
|
||||
sendNextPendingRequest(rpcId, requestId, "Pending RPC timeout detected!");
|
||||
}
|
||||
}
|
||||
@ -390,9 +391,9 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
|
||||
.findFirst().filter(entry -> {
|
||||
var md = entry.getValue();
|
||||
if (md.isDelivered()) {
|
||||
if (md.getAwaitRpcResponseFuture() == null || md.getAwaitRpcResponseFuture().isCancelled()) {
|
||||
if (awaitRpcResponseFuture == null || awaitRpcResponseFuture.isCancelled()) {
|
||||
var toDeviceRpcRequest = md.getMsg().getMsg();
|
||||
scheduleAwaitRpcResponseFuture(toDeviceRpcRequest.getId(), entry.getKey());
|
||||
awaitRpcResponseFuture = scheduleAwaitRpcResponseFuture(toDeviceRpcRequest.getId(), entry.getKey());
|
||||
}
|
||||
return false;
|
||||
}
|
||||
@ -649,7 +650,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
|
||||
}
|
||||
} finally {
|
||||
if (rpcSubmitStrategy.equals(RpcSubmitStrategy.SEQUENTIAL_ON_RESPONSE_FROM_DEVICE)) {
|
||||
clearAwaitRpcResponseScheduler(requestMd);
|
||||
clearAwaitRpcResponseScheduler();
|
||||
String errorResponse = hasError ? "error " : "";
|
||||
String rpcState = delivered ? "" : "undelivered ";
|
||||
sendNextPendingRequest(rpcId, requestId, String.format("Received %sresponse for %sRPC!", errorResponse, rpcState));
|
||||
@ -685,7 +686,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
|
||||
} else {
|
||||
md.setDelivered(true);
|
||||
if (rpcSubmitStrategy.equals(RpcSubmitStrategy.SEQUENTIAL_ON_RESPONSE_FROM_DEVICE)) {
|
||||
md.setAwaitRpcResponseFuture(scheduleAwaitRpcResponseFuture(rpcId, requestId));
|
||||
awaitRpcResponseFuture = scheduleAwaitRpcResponseFuture(rpcId, requestId);
|
||||
}
|
||||
}
|
||||
} else if (status.equals(RpcStatus.TIMEOUT)) {
|
||||
@ -740,7 +741,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
|
||||
if (subscribeCmd.getUnsubscribe()) {
|
||||
log.debug("[{}] Canceling RPC subscription for session: [{}]", deviceId, sessionId);
|
||||
rpcSubscriptions.remove(sessionId);
|
||||
clearAwaitRpcResponseSchedulers();
|
||||
clearAwaitRpcResponseScheduler();
|
||||
return;
|
||||
}
|
||||
SessionInfoMetaData sessionMD = sessions.get(sessionId);
|
||||
@ -775,7 +776,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
|
||||
sessions.remove(sessionId);
|
||||
attributeSubscriptions.remove(sessionId);
|
||||
rpcSubscriptions.remove(sessionId);
|
||||
clearAwaitRpcResponseSchedulers();
|
||||
clearAwaitRpcResponseScheduler();
|
||||
if (sessions.isEmpty()) {
|
||||
reportSessionClose();
|
||||
}
|
||||
@ -798,19 +799,11 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
|
||||
}, systemContext.getRpcResponseTimeout(), TimeUnit.MILLISECONDS);
|
||||
}
|
||||
|
||||
private void clearAwaitRpcResponseSchedulers() {
|
||||
if (rpcSubmitStrategy.equals(RpcSubmitStrategy.SEQUENTIAL_ON_RESPONSE_FROM_DEVICE)) {
|
||||
toDeviceRpcPendingMap.forEach((integer, md) -> clearAwaitRpcResponseScheduler(md));
|
||||
}
|
||||
}
|
||||
|
||||
private void clearAwaitRpcResponseScheduler(ToDeviceRpcRequestMetadata md) {
|
||||
var awaitRpcResponseFuture = md.getAwaitRpcResponseFuture();
|
||||
if (awaitRpcResponseFuture == null) {
|
||||
return;
|
||||
}
|
||||
private void clearAwaitRpcResponseScheduler() {
|
||||
if (rpcSubmitStrategy.equals(RpcSubmitStrategy.SEQUENTIAL_ON_RESPONSE_FROM_DEVICE) && awaitRpcResponseFuture != null) {
|
||||
awaitRpcResponseFuture.cancel(true);
|
||||
}
|
||||
}
|
||||
|
||||
private void handleSessionActivity(SessionInfoProto sessionInfoProto, SubscriptionInfoProto subscriptionInfo) {
|
||||
UUID sessionId = getSessionId(sessionInfoProto);
|
||||
|
||||
@ -18,8 +18,6 @@ package org.thingsboard.server.actors.device;
|
||||
import lombok.Data;
|
||||
import org.thingsboard.server.service.rpc.ToDeviceRpcRequestActorMsg;
|
||||
|
||||
import java.util.concurrent.ScheduledFuture;
|
||||
|
||||
/**
|
||||
* @author Andrew Shvayka
|
||||
*/
|
||||
@ -29,5 +27,4 @@ public class ToDeviceRpcRequestMetadata {
|
||||
private final boolean sent;
|
||||
private int retries;
|
||||
private boolean delivered;
|
||||
private ScheduledFuture<?> awaitRpcResponseFuture;
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user