From d49bee4b31e6caf902e94d4082621456e96def29 Mon Sep 17 00:00:00 2001 From: YevhenBondarenko Date: Wed, 18 Aug 2021 19:27:48 +0300 Subject: [PATCH] send next rpc after removing --- .../server/actors/ActorSystemContext.java | 6 +-- .../device/DeviceActorMessageProcessor.java | 43 +++++++++++-------- .../controller/AbstractRpcController.java | 2 + .../rpc/DefaultTbCoreDeviceRpcService.java | 5 +++ .../rpc/DefaultTbRuleEngineRpcService.java | 2 +- .../server/common/data/DataConstants.java | 1 + .../common/msg/rpc/ToDeviceRpcRequest.java | 1 + .../api/RuleEngineDeviceRpcRequest.java | 2 +- .../rule/engine/rpc/TbSendRPCRequestNode.java | 4 ++ 9 files changed, 44 insertions(+), 22 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java index c2b5b863fe..88c1e02507 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java +++ b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java @@ -400,13 +400,13 @@ public class ActorSystemContext { @Getter private String debugPerTenantLimitsConfiguration; - @Value("${actors.rpc.sequence.enabled:true}") + @Value("${actors.rpc.sequence.enabled:false}") @Getter private boolean rpcSequenceEnabled; - @Value("${actors.rpc.persistent.retries:5}") + @Value("${actors.rpc.max_retries:5}") @Getter - private int maxPersistentRpcRetries; + private int maxRpcRetries; @Getter @Setter diff --git a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java index 22e4aae78b..f5841acb7f 100644 --- a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java @@ -103,6 +103,7 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Optional; import java.util.Set; import java.util.UUID; import java.util.function.Consumer; @@ -232,15 +233,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { } private boolean isSendNewRpcAvailable() { - if (rpcSequenceEnabled) { - for (ToDeviceRpcRequestMetadata rpc : toDeviceRpcPendingMap.values()) { - if (!rpc.isDelivered()) { - return false; - } - } - } - - return true; + return !rpcSequenceEnabled || toDeviceRpcPendingMap.values().stream().filter(md -> !md.isDelivered()).findAny().isEmpty(); } private Rpc createRpc(ToDeviceRpcRequest request, RpcStatus status) { @@ -282,16 +275,26 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { void processRemoveRpc(TbActorCtx context, RemoveRpcActorMsg msg) { log.debug("[{}] Processing remove rpc command", msg.getRequestId()); - Integer requestId = null; - for (Map.Entry entry : toDeviceRpcPendingMap.entrySet()) { - if (entry.getValue().getMsg().getMsg().getId().equals(msg.getRequestId())) { - requestId = entry.getKey(); + Map.Entry entry = null; + for (Map.Entry e : toDeviceRpcPendingMap.entrySet()) { + if (e.getValue().getMsg().getMsg().getId().equals(msg.getRequestId())) { + entry = e; break; } } - if (requestId != null) { - toDeviceRpcPendingMap.remove(requestId); + if (entry != null) { + if (entry.getValue().isDelivered()) { + toDeviceRpcPendingMap.remove(entry.getKey()); + } else { + Optional> firstRpc = getFirstRpc(); + if (firstRpc.isPresent() && entry.getKey().equals(firstRpc.get().getKey())) { + toDeviceRpcPendingMap.remove(entry.getKey()); + sendNextPendingRequest(context); + } else { + toDeviceRpcPendingMap.remove(entry.getKey()); + } + } } } @@ -330,7 +333,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { Set sentOneWayIds = new HashSet<>(); if (rpcSequenceEnabled) { - toDeviceRpcPendingMap.entrySet().stream().filter(e -> !e.getValue().isDelivered()).findFirst().ifPresent(processPendingRpc(context, sessionId, nodeId, sentOneWayIds)); + getFirstRpc().ifPresent(processPendingRpc(context, sessionId, nodeId, sentOneWayIds)); } else if (sessionType == SessionType.ASYNC) { toDeviceRpcPendingMap.entrySet().forEach(processPendingRpc(context, sessionId, nodeId, sentOneWayIds)); } else { @@ -340,6 +343,10 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { sentOneWayIds.stream().filter(id -> !toDeviceRpcPendingMap.get(id).getMsg().getMsg().isPersisted()).forEach(toDeviceRpcPendingMap::remove); } + private Optional> getFirstRpc() { + return toDeviceRpcPendingMap.entrySet().stream().filter(e -> !e.getValue().isDelivered()).findFirst(); + } + private void sendNextPendingRequest(TbActorCtx context) { if (rpcSequenceEnabled) { rpcSubscriptions.forEach((id, s) -> sendPendingRequests(context, id, s.getNodeId())); @@ -599,7 +606,9 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { md.setDelivered(true); } } else if (status.equals(RpcStatus.TIMEOUT)) { - if (systemContext.getMaxPersistentRpcRetries() <= md.getRetries()) { + Integer maxRpcRetries = md.getMsg().getMsg().getRetries(); + maxRpcRetries = maxRpcRetries == null ? systemContext.getMaxRpcRetries() : Math.min(maxRpcRetries, systemContext.getMaxRpcRetries()); + if (maxRpcRetries <= md.getRetries()) { toDeviceRpcPendingMap.remove(responseMsg.getRequestId()); status = RpcStatus.FAILED; } else { diff --git a/application/src/main/java/org/thingsboard/server/controller/AbstractRpcController.java b/application/src/main/java/org/thingsboard/server/controller/AbstractRpcController.java index b7dbd8b3d9..98294b241a 100644 --- a/application/src/main/java/org/thingsboard/server/controller/AbstractRpcController.java +++ b/application/src/main/java/org/thingsboard/server/controller/AbstractRpcController.java @@ -80,6 +80,7 @@ public abstract class AbstractRpcController extends BaseController { UUID rpcRequestUUID = rpcRequestBody.has("requestUUID") ? UUID.fromString(rpcRequestBody.get("requestUUID").asText()) : UUID.randomUUID(); boolean persisted = rpcRequestBody.has(DataConstants.PERSISTENT) && rpcRequestBody.get(DataConstants.PERSISTENT).asBoolean(); String additionalInfo = JacksonUtil.toString(rpcRequestBody.get(DataConstants.ADDITIONAL_INFO)); + Integer retries = rpcRequestBody.has(DataConstants.RETRIES) ? rpcRequestBody.get(DataConstants.RETRIES).asInt() : null; accessValidator.validate(currentUser, Operation.RPC_CALL, deviceId, new HttpValidationCallback(response, new FutureCallback<>() { @Override public void onSuccess(@Nullable DeferredResult result) { @@ -90,6 +91,7 @@ public abstract class AbstractRpcController extends BaseController { expTime, body, persisted, + retries, additionalInfo ); deviceRpcService.processRestApiRpcRequest(rpcRequest, fromDeviceRpcResponse -> reply(new LocalRequestMetaData(rpcRequest, currentUser, result), fromDeviceRpcResponse, timeoutStatus, noActiveConnectionStatus), currentUser); diff --git a/application/src/main/java/org/thingsboard/server/service/rpc/DefaultTbCoreDeviceRpcService.java b/application/src/main/java/org/thingsboard/server/service/rpc/DefaultTbCoreDeviceRpcService.java index 46e49fcdc8..2b2ade02ee 100644 --- a/application/src/main/java/org/thingsboard/server/service/rpc/DefaultTbCoreDeviceRpcService.java +++ b/application/src/main/java/org/thingsboard/server/service/rpc/DefaultTbCoreDeviceRpcService.java @@ -166,6 +166,11 @@ public class DefaultTbCoreDeviceRpcService implements TbCoreDeviceRpcService { metaData.putValue("oneway", Boolean.toString(msg.isOneway())); metaData.putValue(DataConstants.PERSISTENT, Boolean.toString(msg.isPersisted())); + if (msg.getRetries() != null) { + metaData.putValue(DataConstants.RETRIES, msg.getRetries().toString()); + } + + Device device = deviceService.findDeviceById(msg.getTenantId(), msg.getDeviceId()); if (device != null) { metaData.putValue("deviceName", device.getName()); diff --git a/application/src/main/java/org/thingsboard/server/service/rpc/DefaultTbRuleEngineRpcService.java b/application/src/main/java/org/thingsboard/server/service/rpc/DefaultTbRuleEngineRpcService.java index 230f6e759e..eca8864fda 100644 --- a/application/src/main/java/org/thingsboard/server/service/rpc/DefaultTbRuleEngineRpcService.java +++ b/application/src/main/java/org/thingsboard/server/service/rpc/DefaultTbRuleEngineRpcService.java @@ -101,7 +101,7 @@ public class DefaultTbRuleEngineRpcService implements TbRuleEngineDeviceRpcServi @Override public void sendRpcRequestToDevice(RuleEngineDeviceRpcRequest src, Consumer consumer) { ToDeviceRpcRequest request = new ToDeviceRpcRequest(src.getRequestUUID(), src.getTenantId(), src.getDeviceId(), - src.isOneway(), src.getExpirationTime(), new ToDeviceRpcRequestBody(src.getMethod(), src.getBody()), src.isPersisted(), src.getAdditionalInfo()); + src.isOneway(), src.getExpirationTime(), new ToDeviceRpcRequestBody(src.getMethod(), src.getBody()), src.isPersisted(), src.getRetries(), src.getAdditionalInfo()); forwardRpcRequestToDeviceActor(request, response -> { if (src.isRestApiCall()) { sendRpcResponseToTbCore(src.getOriginServiceId(), response); diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/DataConstants.java b/common/data/src/main/java/org/thingsboard/server/common/data/DataConstants.java index 54ecc31f83..707a50dfb3 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/DataConstants.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/DataConstants.java @@ -39,6 +39,7 @@ public class DataConstants { public static final String TIMEOUT = "timeout"; public static final String EXPIRATION_TIME = "expirationTime"; public static final String ADDITIONAL_INFO = "additionalInfo"; + public static final String RETRIES = "retries"; public static final String COAP_TRANSPORT_NAME = "COAP"; public static final String LWM2M_TRANSPORT_NAME = "LWM2M"; public static final String MQTT_TRANSPORT_NAME = "MQTT"; diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/rpc/ToDeviceRpcRequest.java b/common/message/src/main/java/org/thingsboard/server/common/msg/rpc/ToDeviceRpcRequest.java index f9bb2b3810..f68f2cfdb8 100644 --- a/common/message/src/main/java/org/thingsboard/server/common/msg/rpc/ToDeviceRpcRequest.java +++ b/common/message/src/main/java/org/thingsboard/server/common/msg/rpc/ToDeviceRpcRequest.java @@ -36,6 +36,7 @@ public class ToDeviceRpcRequest implements Serializable { private final long expirationTime; private final ToDeviceRpcRequestBody body; private final boolean persisted; + private final Integer retries; @JsonIgnore private final String additionalInfo; } diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineDeviceRpcRequest.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineDeviceRpcRequest.java index afdee27b74..903cb291c2 100644 --- a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineDeviceRpcRequest.java +++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineDeviceRpcRequest.java @@ -41,5 +41,5 @@ public final class RuleEngineDeviceRpcRequest { private final long expirationTime; private final boolean restApiCall; private final String additionalInfo; - + private final Integer retries; } diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rpc/TbSendRPCRequestNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rpc/TbSendRPCRequestNode.java index 5b40f41857..1d0b727e5e 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rpc/TbSendRPCRequestNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rpc/TbSendRPCRequestNode.java @@ -92,6 +92,9 @@ public class TbSendRPCRequestNode implements TbNode { tmp = msg.getMetaData().getValue(DataConstants.EXPIRATION_TIME); long expirationTime = !StringUtils.isEmpty(tmp) ? Long.parseLong(tmp) : (System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(config.getTimeoutInSeconds())); + tmp = msg.getMetaData().getValue(DataConstants.RETRIES); + Integer retries = !StringUtils.isEmpty(tmp) ? Integer.parseInt(tmp) : null; + String params; JsonElement paramsEl = json.get("params"); if (paramsEl.isJsonPrimitive()) { @@ -112,6 +115,7 @@ public class TbSendRPCRequestNode implements TbNode { .requestUUID(requestUUID) .originServiceId(originServiceId) .expirationTime(expirationTime) + .retries(retries) .restApiCall(restApiCall) .persisted(persisted) .additionalInfo(additionalInfo)