From c2ce40cd39b60e163ae0fb27624c7e19520586d9 Mon Sep 17 00:00:00 2001 From: ShvaykaD Date: Wed, 2 Aug 2023 16:05:18 +0300 Subject: [PATCH 1/5] added rpc sequential strategies: init commit --- .../server/actors/ActorSystemContext.java | 9 +- .../device/DeviceActorMessageProcessor.java | 321 +++++++++++------- .../device/ToDeviceRpcRequestMetadata.java | 3 + .../server/controller/RpcV2Controller.java | 2 +- .../server/service/rpc/RpcSubmitStrategy.java | 30 ++ .../src/main/resources/thingsboard.yml | 6 +- .../server/controller/AbstractWebTest.java | 10 + .../service/rpc/RpcSubmitStrategyTest.java | 38 +++ .../mqtt/AbstractMqttIntegrationTest.java | 6 +- .../mqtt/mqttv3/MqttTestCallback.java | 5 +- .../MqttTestSubscribeOnTopicCallback.java | 2 +- ...AbstractMqttAttributesIntegrationTest.java | 10 +- ...tractMqttServerSideRpcIntegrationTest.java | 155 +++++++-- ...ttServerSideRpcDefaultIntegrationTest.java | 5 - ...erSideRpcSequenceOnAckIntegrationTest.java | 72 ++++ ...eRpcSequenceOnResponseIntegrationTest.java | 73 ++++ ...AbstractMqttTimeseriesIntegrationTest.java | 2 +- .../resources/application-test.properties | 2 +- .../server/common/data/rpc/RpcStatus.java | 20 +- .../server/common/data/rpc/RpcStatusTest.java | 47 +++ .../scheduler/DefaultSchedulerComponent.java | 4 +- 21 files changed, 644 insertions(+), 178 deletions(-) create mode 100644 application/src/main/java/org/thingsboard/server/service/rpc/RpcSubmitStrategy.java create mode 100644 application/src/test/java/org/thingsboard/server/service/rpc/RpcSubmitStrategyTest.java create mode 100644 application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv3/rpc/MqttServerSideRpcSequenceOnAckIntegrationTest.java create mode 100644 application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv3/rpc/MqttServerSideRpcSequenceOnResponseIntegrationTest.java create mode 100644 common/data/src/test/java/org/thingsboard/server/common/data/rpc/RpcStatusTest.java diff --git a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java index fc822ce226..1e223d19cf 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java +++ b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java @@ -115,7 +115,6 @@ import org.thingsboard.server.service.transport.TbCoreToTransportService; import javax.annotation.Nullable; import javax.annotation.PostConstruct; -import java.io.IOException; import java.io.PrintWriter; import java.io.StringWriter; import java.util.concurrent.ConcurrentHashMap; @@ -528,9 +527,13 @@ public class ActorSystemContext { @Getter private String debugPerTenantLimitsConfiguration; - @Value("${actors.rpc.sequential:false}") + @Value("${actors.rpc.submit_strategy:BURST}") @Getter - private boolean rpcSequential; + private String rpcSubmitStrategy; + + @Value("${actors.rpc.response_timeout_ms:60000}") + @Getter + private long rpcResponseTimeout; @Value("${actors.rpc.max_retries:5}") @Getter diff --git a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java index 0ce0ae7884..615600a57f 100644 --- a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java @@ -89,6 +89,7 @@ import org.thingsboard.server.gen.transport.TransportProtos.TransportToDeviceAct import org.thingsboard.server.gen.transport.TransportProtos.TsKvProto; import org.thingsboard.server.service.rpc.FromDeviceRpcResponseActorMsg; import org.thingsboard.server.service.rpc.RemoveRpcActorMsg; +import org.thingsboard.server.service.rpc.RpcSubmitStrategy; import org.thingsboard.server.service.rpc.ToDeviceRpcRequestActorMsg; import org.thingsboard.server.service.transport.msg.TransportToDeviceActorMsgWrapper; @@ -106,6 +107,9 @@ import java.util.Optional; import java.util.Set; import java.util.UUID; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; import java.util.function.Consumer; import java.util.stream.Collectors; @@ -124,6 +128,8 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso private final Map rpcSubscriptions; private final Map toDeviceRpcPendingMap; private final boolean rpcSequential; + private final RpcSubmitStrategy rpcSubmitStrategy; + private final ScheduledExecutorService scheduler; private int rpcSeq = 0; private String deviceName; @@ -135,11 +141,13 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso super(systemContext); this.tenantId = tenantId; this.deviceId = deviceId; - this.rpcSequential = systemContext.isRpcSequential(); + this.rpcSubmitStrategy = RpcSubmitStrategy.parse(systemContext.getRpcSubmitStrategy()); + this.rpcSequential = !rpcSubmitStrategy.equals(RpcSubmitStrategy.BURST); this.attributeSubscriptions = new HashMap<>(); this.rpcSubscriptions = new HashMap<>(); this.toDeviceRpcPendingMap = new LinkedHashMap<>(); this.sessions = new LinkedHashMapRemoveEldest<>(systemContext.getMaxConcurrentSessionsPerDevice(), this::notifyTransportAboutClosedSessionMaxSessionsLimit); + this.scheduler = systemContext.getScheduler(); if (initAttributes()) { restoreSessions(); } @@ -225,24 +233,29 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso if (persisted) { ObjectNode response = JacksonUtil.newObjectNode(); response.put("rpcId", rpcId.toString()); - systemContext.getTbCoreDeviceRpcService().processRpcResponseFromDeviceActor(new FromDeviceRpcResponse(msg.getMsg().getId(), JacksonUtil.toString(response), null)); + systemContext.getTbCoreDeviceRpcService().processRpcResponseFromDeviceActor(new FromDeviceRpcResponse(rpcId, JacksonUtil.toString(response), null)); } if (!persisted && request.isOneway() && sent) { log.debug("[{}] RPC command response sent [{}][{}]!", deviceId, rpcId, requestId); - systemContext.getTbCoreDeviceRpcService().processRpcResponseFromDeviceActor(new FromDeviceRpcResponse(msg.getMsg().getId(), null, null)); + systemContext.getTbCoreDeviceRpcService().processRpcResponseFromDeviceActor(new FromDeviceRpcResponse(rpcId, null, null)); } else { registerPendingRpcRequest(context, msg, sent, rpcRequest, timeout); } - if (sent) { - log.debug("[{}][{}][{}] RPC request is sent!", deviceId, rpcId, requestId); - } else { - log.debug("[{}][{}][{}] RPC request is NOT sent!", deviceId, rpcId, requestId); - } + String rpcSent = sent ? "" : "NOT "; + log.debug("[{}][{}][{}] RPC request is {}sent!", deviceId, rpcId, requestId, rpcSent); } private boolean isSendNewRpcAvailable() { - return !rpcSequential || toDeviceRpcPendingMap.values().stream().filter(md -> !md.isDelivered()).findAny().isEmpty(); + if (rpcSequential) { + if (rpcSubmitStrategy.equals(RpcSubmitStrategy.SEQUENTIAL_ON_ACK_FROM_DEVICE)) { + return toDeviceRpcPendingMap.values().stream().filter(md -> !md.isDelivered()).findAny().isEmpty(); + } + if (rpcSubmitStrategy.equals(RpcSubmitStrategy.SEQUENTIAL_ON_RESPONSE_FROM_DEVICE)) { + return toDeviceRpcPendingMap.values().stream().filter(ToDeviceRpcRequestMetadata::isDelivered).findAny().isEmpty(); + } + } + return true; } private void createRpc(ToDeviceRpcRequest request, RpcStatus status) { @@ -283,31 +296,35 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso } void processRemoveRpc(RemoveRpcActorMsg msg) { - UUID requestId = msg.getRequestId(); - log.debug("[{}][{}] Received remove RPC request ...", deviceId, requestId); + UUID rpcId = msg.getRequestId(); + log.debug("[{}][{}] Received remove RPC request ...", deviceId, rpcId); Map.Entry entry = null; for (Map.Entry e : toDeviceRpcPendingMap.entrySet()) { - if (e.getValue().getMsg().getMsg().getId().equals(requestId)) { + if (e.getValue().getMsg().getMsg().getId().equals(rpcId)) { entry = e; break; } } - if (entry != null) { - Integer key = entry.getKey(); - if (entry.getValue().isDelivered()) { - toDeviceRpcPendingMap.remove(key); - } else { - Optional> firstRpc = getFirstRpc(); - if (firstRpc.isPresent() && key.equals(firstRpc.get().getKey())) { - toDeviceRpcPendingMap.remove(key); - log.debug("[{}][{}][{}] Removed pending RPC! Going to send next pending request ...", deviceId, requestId, key); - sendNextPendingRequest(); - } else { - toDeviceRpcPendingMap.remove(key); - } - } + if (entry == null) { + return; } + Integer requestId = entry.getKey(); + if (entry.getValue().isDelivered()) { + var md = toDeviceRpcPendingMap.remove(requestId); + if (rpcSubmitStrategy.equals(RpcSubmitStrategy.SEQUENTIAL_ON_RESPONSE_FROM_DEVICE)) { + clearAwaitRpcResponseScheduler(md); + sendNextPendingRequest(rpcId, requestId, "Removed pending RPC!"); + } + return; + } + Optional> firstRpc = getFirstRpc(); + if (firstRpc.isPresent() && requestId.equals(firstRpc.get().getKey())) { + toDeviceRpcPendingMap.remove(requestId); + sendNextPendingRequest(rpcId, requestId, "Removed pending RPC!"); + return; + } + toDeviceRpcPendingMap.remove(requestId); } private void registerPendingRpcRequest(TbActorCtx context, ToDeviceRpcRequestActorMsg msg, boolean sent, ToDeviceRpcRequestMsg rpcRequest, long timeout) { @@ -321,20 +338,25 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso void processServerSideRpcTimeout(DeviceActorServerSideRpcTimeoutMsg msg) { Integer requestId = msg.getId(); - ToDeviceRpcRequestMetadata requestMd = toDeviceRpcPendingMap.remove(requestId); - if (requestMd != null) { - ToDeviceRpcRequest toDeviceRpcRequest = requestMd.getMsg().getMsg(); - UUID rpcId = toDeviceRpcRequest.getId(); - log.debug("[{}][{}][{}] RPC request timeout detected!", deviceId, rpcId, requestId); - if (toDeviceRpcRequest.isPersisted()) { - systemContext.getTbRpcService().save(tenantId, new RpcId(rpcId), RpcStatus.EXPIRED, null); - } - systemContext.getTbCoreDeviceRpcService().processRpcResponseFromDeviceActor(new FromDeviceRpcResponse(rpcId, - null, requestMd.isSent() ? RpcError.TIMEOUT : RpcError.NO_ACTIVE_CONNECTION)); - if (!requestMd.isDelivered()) { - log.debug("[{}][{}][{}] Pending RPC timeout detected! Going to send next pending request ...", deviceId, rpcId, requestId); - sendNextPendingRequest(); - } + var requestMd = toDeviceRpcPendingMap.remove(requestId); + if (requestMd == null) { + return; + } + var toDeviceRpcRequest = requestMd.getMsg().getMsg(); + UUID rpcId = toDeviceRpcRequest.getId(); + log.debug("[{}][{}][{}] RPC request timeout detected!", deviceId, rpcId, requestId); + if (toDeviceRpcRequest.isPersisted()) { + systemContext.getTbRpcService().save(tenantId, new RpcId(rpcId), RpcStatus.EXPIRED, null); + } + systemContext.getTbCoreDeviceRpcService().processRpcResponseFromDeviceActor(new FromDeviceRpcResponse(rpcId, + null, requestMd.isSent() ? RpcError.TIMEOUT : RpcError.NO_ACTIVE_CONNECTION)); + if (!requestMd.isDelivered()) { + sendNextPendingRequest(rpcId, requestId, "Pending RPC timeout detected!"); + return; + } + if (rpcSubmitStrategy.equals(RpcSubmitStrategy.SEQUENTIAL_ON_RESPONSE_FROM_DEVICE)) { + clearAwaitRpcResponseScheduler(requestMd); + sendNextPendingRequest(rpcId, requestId, "Pending RPC timeout detected!"); } } @@ -363,10 +385,25 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso } private Optional> getFirstRpc() { + if (rpcSubmitStrategy.equals(RpcSubmitStrategy.SEQUENTIAL_ON_RESPONSE_FROM_DEVICE)) { + return toDeviceRpcPendingMap.entrySet().stream() + .findFirst().filter(entry -> { + var md = entry.getValue(); + if (md.isDelivered()) { + if (md.getAwaitRpcResponseFuture() == null || md.getAwaitRpcResponseFuture().isCancelled()) { + var toDeviceRpcRequest = md.getMsg().getMsg(); + scheduleAwaitRpcResponseFuture(toDeviceRpcRequest.getId(), entry.getKey()); + } + return false; + } + return true; + }); + } return toDeviceRpcPendingMap.entrySet().stream().filter(e -> !e.getValue().isDelivered()).findFirst(); } - private void sendNextPendingRequest() { + private void sendNextPendingRequest(UUID rpcId, int requestId, String logMessage) { + log.debug("[{}][{}][{}] {} Going to send next pending request ...", deviceId, rpcId, requestId, logMessage); if (rpcSequential) { rpcSubscriptions.forEach((id, s) -> sendPendingRequests(id, s.getNodeId())); } @@ -588,34 +625,38 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso log.debug("[{}][{}] Processing RPC command response: {}", deviceId, sessionId, responseMsg); int requestId = responseMsg.getRequestId(); ToDeviceRpcRequestMetadata requestMd = toDeviceRpcPendingMap.remove(requestId); - boolean success = requestMd != null; - if (success) { - ToDeviceRpcRequest toDeviceRequestMsg = requestMd.getMsg().getMsg(); - boolean delivered = requestMd.isDelivered(); - boolean hasError = StringUtils.isNotEmpty(responseMsg.getError()); - try { - String payload = hasError ? responseMsg.getError() : responseMsg.getPayload(); - systemContext.getTbCoreDeviceRpcService().processRpcResponseFromDeviceActor( - new FromDeviceRpcResponse(toDeviceRequestMsg.getId(), payload, null)); - if (toDeviceRequestMsg.isPersisted()) { - RpcStatus status = hasError ? RpcStatus.FAILED : RpcStatus.SUCCESSFUL; - JsonNode response; - try { - response = JacksonUtil.toJsonNode(payload); - } catch (IllegalArgumentException e) { - response = JacksonUtil.newObjectNode().put("error", payload); - } - systemContext.getTbRpcService().save(tenantId, new RpcId(toDeviceRequestMsg.getId()), status, response); - } - } finally { - if (!delivered) { - String errorResponse = hasError ? "error" : ""; - log.debug("[{}][{}][{}] Received {} response for undelivered RPC! Going to send next pending request ...", deviceId, sessionId, requestId, errorResponse); - sendNextPendingRequest(); - } - } - } else { + if (requestMd == null) { log.debug("[{}][{}][{}] RPC command response is stale!", deviceId, sessionId, requestId); + return; + } + ToDeviceRpcRequest toDeviceRequestMsg = requestMd.getMsg().getMsg(); + UUID rpcId = toDeviceRequestMsg.getId(); + boolean delivered = requestMd.isDelivered(); + boolean hasError = StringUtils.isNotEmpty(responseMsg.getError()); + try { + String payload = hasError ? responseMsg.getError() : responseMsg.getPayload(); + systemContext.getTbCoreDeviceRpcService().processRpcResponseFromDeviceActor( + new FromDeviceRpcResponse(rpcId, payload, null)); + if (toDeviceRequestMsg.isPersisted()) { + RpcStatus status = hasError ? RpcStatus.FAILED : RpcStatus.SUCCESSFUL; + JsonNode response; + try { + response = JacksonUtil.toJsonNode(payload); + } catch (IllegalArgumentException e) { + response = JacksonUtil.newObjectNode().put("error", payload); + } + systemContext.getTbRpcService().save(tenantId, new RpcId(rpcId), status, response); + } + } finally { + if (rpcSubmitStrategy.equals(RpcSubmitStrategy.SEQUENTIAL_ON_RESPONSE_FROM_DEVICE)) { + clearAwaitRpcResponseScheduler(requestMd); + String errorResponse = hasError ? "error " : ""; + String rpcState = delivered ? "" : "undelivered "; + sendNextPendingRequest(rpcId, requestId, String.format("Received %sresponse for %sRPC!", errorResponse, rpcState)); + } else if (!delivered) { + String errorResponse = hasError ? "error " : ""; + sendNextPendingRequest(rpcId, requestId, String.format("Received %sresponse for undelivered RPC!", errorResponse)); + } } } @@ -626,39 +667,50 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso int requestId = responseMsg.getRequestId(); log.debug("[{}][{}][{}][{}] Processing RPC command response status: [{}]", deviceId, sessionId, rpcId, requestId, status); ToDeviceRpcRequestMetadata md = toDeviceRpcPendingMap.get(requestId); - - if (md != null) { - JsonNode response = null; - if (status.equals(RpcStatus.DELIVERED)) { - if (md.getMsg().getMsg().isOneway()) { - toDeviceRpcPendingMap.remove(requestId); - if (rpcSequential) { - systemContext.getTbCoreDeviceRpcService().processRpcResponseFromDeviceActor(new FromDeviceRpcResponse(rpcId, null, null)); - } - } else { - md.setDelivered(true); - } - } else if (status.equals(RpcStatus.TIMEOUT)) { - Integer maxRpcRetries = md.getMsg().getMsg().getRetries(); - maxRpcRetries = maxRpcRetries == null ? systemContext.getMaxRpcRetries() : Math.min(maxRpcRetries, systemContext.getMaxRpcRetries()); - if (maxRpcRetries <= md.getRetries()) { - toDeviceRpcPendingMap.remove(requestId); - status = RpcStatus.FAILED; - response = JacksonUtil.newObjectNode().put("error", "There was a Timeout and all retry attempts have been exhausted. Retry attempts set: " + maxRpcRetries); - } else { - md.setRetries(md.getRetries() + 1); - } - } - - if (md.getMsg().getMsg().isPersisted()) { - systemContext.getTbRpcService().save(tenantId, new RpcId(rpcId), status, response); - } - if (status != RpcStatus.SENT) { - log.debug("[{}][{}][{}][{}] RPC was {}! Going to send next pending request ...", deviceId, sessionId, rpcId, requestId, status.name().toLowerCase()); - sendNextPendingRequest(); - } - } else { + if (md == null) { log.warn("[{}][{}][{}][{}] RPC has already been removed from pending map.", deviceId, sessionId, rpcId, requestId); + return; + } + var toDeviceRpcRequest = md.getMsg().getMsg(); + boolean persisted = toDeviceRpcRequest.isPersisted(); + boolean oneWayRpc = toDeviceRpcRequest.isOneway(); + JsonNode response = null; + if (status.equals(RpcStatus.DELIVERED)) { + if (oneWayRpc) { + toDeviceRpcPendingMap.remove(requestId); + if (rpcSequential) { + var fromDeviceRpcResponse = new FromDeviceRpcResponse(rpcId, null, null); + systemContext.getTbCoreDeviceRpcService().processRpcResponseFromDeviceActor(fromDeviceRpcResponse); + } + } else { + md.setDelivered(true); + if (rpcSubmitStrategy.equals(RpcSubmitStrategy.SEQUENTIAL_ON_RESPONSE_FROM_DEVICE)) { + md.setAwaitRpcResponseFuture(scheduleAwaitRpcResponseFuture(rpcId, requestId)); + } + } + } else if (status.equals(RpcStatus.TIMEOUT)) { + Integer maxRpcRetries = toDeviceRpcRequest.getRetries(); + maxRpcRetries = maxRpcRetries == null ? + systemContext.getMaxRpcRetries() : Math.min(maxRpcRetries, systemContext.getMaxRpcRetries()); + if (maxRpcRetries <= md.getRetries()) { + toDeviceRpcPendingMap.remove(requestId); + status = RpcStatus.FAILED; + response = JacksonUtil.newObjectNode().put("error", "There was a Timeout and all retry " + + "attempts have been exhausted. Retry attempts set: " + maxRpcRetries); + } else { + md.setRetries(md.getRetries() + 1); + } + } + + if (persisted) { + systemContext.getTbRpcService().save(tenantId, new RpcId(rpcId), status, response); + } + if (rpcSubmitStrategy.equals(RpcSubmitStrategy.SEQUENTIAL_ON_RESPONSE_FROM_DEVICE) + && status.equals(RpcStatus.DELIVERED) && !oneWayRpc) { + return; + } + if (!status.equals(RpcStatus.SENT)) { + sendNextPendingRequest(rpcId, requestId, String.format("RPC was %s!", status.name().toLowerCase())); } } @@ -667,16 +719,16 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso if (subscribeCmd.getUnsubscribe()) { log.debug("[{}] Canceling attributes subscription for session: [{}]", deviceId, sessionId); attributeSubscriptions.remove(sessionId); - } else { - SessionInfoMetaData sessionMD = sessions.get(sessionId); - if (sessionMD == null) { - sessionMD = new SessionInfoMetaData(new SessionInfo(subscribeCmd.getSessionType(), sessionInfo.getNodeId())); - } - sessionMD.setSubscribedToAttributes(true); - log.debug("[{}] Registering attributes subscription for session: [{}]", deviceId, sessionId); - attributeSubscriptions.put(sessionId, sessionMD.getSessionInfo()); - dumpSessions(); + return; } + SessionInfoMetaData sessionMD = sessions.get(sessionId); + if (sessionMD == null) { + sessionMD = new SessionInfoMetaData(new SessionInfo(subscribeCmd.getSessionType(), sessionInfo.getNodeId())); + } + sessionMD.setSubscribedToAttributes(true); + log.debug("[{}] Registering attributes subscription for session: [{}]", deviceId, sessionId); + attributeSubscriptions.put(sessionId, sessionMD.getSessionInfo()); + dumpSessions(); } private UUID getSessionId(SessionInfoProto sessionInfo) { @@ -688,17 +740,18 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso if (subscribeCmd.getUnsubscribe()) { log.debug("[{}] Canceling RPC subscription for session: [{}]", deviceId, sessionId); rpcSubscriptions.remove(sessionId); - } else { - SessionInfoMetaData sessionMD = sessions.get(sessionId); - if (sessionMD == null) { - sessionMD = new SessionInfoMetaData(new SessionInfo(subscribeCmd.getSessionType(), sessionInfo.getNodeId())); - } - sessionMD.setSubscribedToRPC(true); - rpcSubscriptions.put(sessionId, sessionMD.getSessionInfo()); - log.debug("[{}] Registered RPC subscription for session: [{}] Going to check for pending requests ...", deviceId, sessionId); - sendPendingRequests(sessionId, sessionInfo.getNodeId()); - dumpSessions(); + clearAwaitRpcResponseSchedulers(); + return; } + SessionInfoMetaData sessionMD = sessions.get(sessionId); + if (sessionMD == null) { + sessionMD = new SessionInfoMetaData(new SessionInfo(subscribeCmd.getSessionType(), sessionInfo.getNodeId())); + } + sessionMD.setSubscribedToRPC(true); + rpcSubscriptions.put(sessionId, sessionMD.getSessionInfo()); + log.debug("[{}] Registered RPC subscription for session: [{}] Going to check for pending requests ...", deviceId, sessionId); + sendPendingRequests(sessionId, sessionInfo.getNodeId()); + dumpSessions(); } private void processSessionStateMsgs(SessionInfoProto sessionInfo, SessionEventMsg msg) { @@ -722,6 +775,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso sessions.remove(sessionId); attributeSubscriptions.remove(sessionId); rpcSubscriptions.remove(sessionId); + clearAwaitRpcResponseSchedulers(); if (sessions.isEmpty()) { reportSessionClose(); } @@ -729,6 +783,35 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso } } + private ScheduledFuture scheduleAwaitRpcResponseFuture(UUID rpcId, int requestId) { + return scheduler.schedule(() -> { + var md = toDeviceRpcPendingMap.remove(requestId); + if (md == null) { + return; + } + sendNextPendingRequest(rpcId, requestId, "RPC was removed from pending map due to await timeout on response from device!"); + var toDeviceRpcRequest = md.getMsg().getMsg(); + if (toDeviceRpcRequest.isPersisted()) { + var responseAwaitTimeout = JacksonUtil.newObjectNode().put("error", "There was a timeout awaiting for RPC response from device."); + systemContext.getTbRpcService().save(tenantId, new RpcId(rpcId), RpcStatus.FAILED, responseAwaitTimeout); + } + }, systemContext.getRpcResponseTimeout(), TimeUnit.MILLISECONDS); + } + + private void clearAwaitRpcResponseSchedulers() { + if (rpcSubmitStrategy.equals(RpcSubmitStrategy.SEQUENTIAL_ON_RESPONSE_FROM_DEVICE)) { + toDeviceRpcPendingMap.forEach((integer, md) -> clearAwaitRpcResponseScheduler(md)); + } + } + + private void clearAwaitRpcResponseScheduler(ToDeviceRpcRequestMetadata md) { + var awaitRpcResponseFuture = md.getAwaitRpcResponseFuture(); + if (awaitRpcResponseFuture == null) { + return; + } + awaitRpcResponseFuture.cancel(true); + } + private void handleSessionActivity(SessionInfoProto sessionInfoProto, SubscriptionInfoProto subscriptionInfo) { UUID sessionId = getSessionId(sessionInfoProto); Objects.requireNonNull(sessionId); diff --git a/application/src/main/java/org/thingsboard/server/actors/device/ToDeviceRpcRequestMetadata.java b/application/src/main/java/org/thingsboard/server/actors/device/ToDeviceRpcRequestMetadata.java index f876408d24..b4f6795af7 100644 --- a/application/src/main/java/org/thingsboard/server/actors/device/ToDeviceRpcRequestMetadata.java +++ b/application/src/main/java/org/thingsboard/server/actors/device/ToDeviceRpcRequestMetadata.java @@ -18,6 +18,8 @@ package org.thingsboard.server.actors.device; import lombok.Data; import org.thingsboard.server.service.rpc.ToDeviceRpcRequestActorMsg; +import java.util.concurrent.ScheduledFuture; + /** * @author Andrew Shvayka */ @@ -27,4 +29,5 @@ public class ToDeviceRpcRequestMetadata { private final boolean sent; private int retries; private boolean delivered; + private ScheduledFuture awaitRpcResponseFuture; } diff --git a/application/src/main/java/org/thingsboard/server/controller/RpcV2Controller.java b/application/src/main/java/org/thingsboard/server/controller/RpcV2Controller.java index 464dc528a5..827788b34b 100644 --- a/application/src/main/java/org/thingsboard/server/controller/RpcV2Controller.java +++ b/application/src/main/java/org/thingsboard/server/controller/RpcV2Controller.java @@ -230,7 +230,7 @@ public class RpcV2Controller extends AbstractRpcController { Rpc rpc = checkRpcId(rpcId, Operation.DELETE); if (rpc != null) { - if (rpc.getStatus().equals(RpcStatus.QUEUED)) { + if (rpc.getStatus().isPushDeleteNotificationToCore()) { RemoveRpcActorMsg removeMsg = new RemoveRpcActorMsg(getTenantId(), rpc.getDeviceId(), rpc.getUuidId()); log.trace("[{}] Forwarding msg {} to queue actor!", rpc.getDeviceId(), rpc); tbClusterService.pushMsgToCore(removeMsg, null); diff --git a/application/src/main/java/org/thingsboard/server/service/rpc/RpcSubmitStrategy.java b/application/src/main/java/org/thingsboard/server/service/rpc/RpcSubmitStrategy.java new file mode 100644 index 0000000000..09da40d026 --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/rpc/RpcSubmitStrategy.java @@ -0,0 +1,30 @@ +/** + * Copyright © 2016-2023 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.rpc; + +import java.util.Arrays; + +public enum RpcSubmitStrategy { + + BURST, SEQUENTIAL_ON_ACK_FROM_DEVICE, SEQUENTIAL_ON_RESPONSE_FROM_DEVICE; + + public static RpcSubmitStrategy parse(String strategyStr) { + return Arrays.stream(RpcSubmitStrategy.values()) + .filter(strategy -> strategy.name().equalsIgnoreCase(strategyStr)) + .findFirst() + .orElse(BURST); + } +} diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index c8dfd1b029..faeaade41b 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -398,8 +398,12 @@ actors: # Enqueue the result of external node processing as a separate message to the rule engine. force_ack: "${ACTORS_RULE_EXTERNAL_NODE_FORCE_ACK:false}" rpc: + # Maximum number of persistent RPC call retries in case of failed requests delivery. max_retries: "${ACTORS_RPC_MAX_RETRIES:5}" - sequential: "${ACTORS_RPC_SEQUENTIAL:false}" + # RPC submit strategies. Allowed values: BURST, SEQUENTIAL_ON_ACK_FROM_DEVICE, SEQUENTIAL_ON_RESPONSE_FROM_DEVICE. + submit_strategy: "${ACTORS_RPC_SUBMIT_STRATEGY_TYPE:BURST}" + # Time in milliseconds for RPC to receive response after delivery. Used only for SEQUENTIAL_ON_RESPONSE_FROM_DEVICE submit strategy. + response_timeout_ms: "${ACTORS_RPC_RESPONSE_TIMEOUT_MS:30000}" statistics: # Enable/disable actor statistics enabled: "${ACTORS_STATISTICS_ENABLED:true}" diff --git a/application/src/test/java/org/thingsboard/server/controller/AbstractWebTest.java b/application/src/test/java/org/thingsboard/server/controller/AbstractWebTest.java index 7f57fbbb3f..ec32ba47ef 100644 --- a/application/src/test/java/org/thingsboard/server/controller/AbstractWebTest.java +++ b/application/src/test/java/org/thingsboard/server/controller/AbstractWebTest.java @@ -65,6 +65,7 @@ import org.thingsboard.server.actors.TbEntityActorId; import org.thingsboard.server.actors.device.DeviceActor; import org.thingsboard.server.actors.device.DeviceActorMessageProcessor; import org.thingsboard.server.actors.device.SessionInfo; +import org.thingsboard.server.actors.device.ToDeviceRpcRequestMetadata; import org.thingsboard.server.actors.service.DefaultActorService; import org.thingsboard.server.common.data.Customer; import org.thingsboard.server.common.data.Device; @@ -991,6 +992,15 @@ public abstract class AbstractWebTest extends AbstractInMemoryStorageTest { }); } + protected void awaitForDeviceActorToProcessAllRpcResponses(DeviceId deviceId) { + DeviceActorMessageProcessor processor = getDeviceActorProcessor(deviceId); + Map toDeviceRpcPendingMap = (Map) ReflectionTestUtils.getField(processor, "toDeviceRpcPendingMap"); + Awaitility.await("Device actor pending map is empty").atMost(5, TimeUnit.SECONDS).until(() -> { + log.warn("device {}, toDeviceRpcPendingMap.size() == {}", deviceId, toDeviceRpcPendingMap.size()); + return toDeviceRpcPendingMap.isEmpty(); + }); + } + protected static String getMapName(FeatureType featureType) { switch (featureType) { case ATTRIBUTES: diff --git a/application/src/test/java/org/thingsboard/server/service/rpc/RpcSubmitStrategyTest.java b/application/src/test/java/org/thingsboard/server/service/rpc/RpcSubmitStrategyTest.java new file mode 100644 index 0000000000..5754181381 --- /dev/null +++ b/application/src/test/java/org/thingsboard/server/service/rpc/RpcSubmitStrategyTest.java @@ -0,0 +1,38 @@ +/** + * Copyright © 2016-2023 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.rpc; + +import org.junit.jupiter.api.Test; +import org.thingsboard.server.common.data.StringUtils; + +import static org.assertj.core.api.Assertions.assertThat; + +class RpcSubmitStrategyTest { + + @Test + void givenRandomString_whenParse_thenReturnBurstStrategy() { + String randomString = StringUtils.randomAlphanumeric(10); + RpcSubmitStrategy parsed = RpcSubmitStrategy.parse(randomString); + assertThat(parsed).isEqualTo(RpcSubmitStrategy.BURST); + } + + @Test + void givenNull_whenParse_thenReturnBurstStrategy() { + RpcSubmitStrategy parsed = RpcSubmitStrategy.parse(null); + assertThat(parsed).isEqualTo(RpcSubmitStrategy.BURST); + } + +} diff --git a/application/src/test/java/org/thingsboard/server/transport/mqtt/AbstractMqttIntegrationTest.java b/application/src/test/java/org/thingsboard/server/transport/mqtt/AbstractMqttIntegrationTest.java index 55508d1b04..c23d1c615b 100644 --- a/application/src/test/java/org/thingsboard/server/transport/mqtt/AbstractMqttIntegrationTest.java +++ b/application/src/test/java/org/thingsboard/server/transport/mqtt/AbstractMqttIntegrationTest.java @@ -186,8 +186,12 @@ public abstract class AbstractMqttIntegrationTest extends AbstractTransportInteg } protected void subscribeAndWait(MqttTestClient client, String attrSubTopic, DeviceId deviceId, FeatureType featureType) throws MqttException { + subscribeAndWait(client, attrSubTopic, deviceId, featureType, MqttQoS.AT_MOST_ONCE); + } + + protected void subscribeAndWait(MqttTestClient client, String attrSubTopic, DeviceId deviceId, FeatureType featureType, MqttQoS mqttQoS) throws MqttException { int subscriptionCount = getDeviceActorSubscriptionCount(deviceId, featureType); - client.subscribeAndWait(attrSubTopic, MqttQoS.AT_MOST_ONCE); + client.subscribeAndWait(attrSubTopic, mqttQoS); // TODO: This test awaits for the device actor to receive the subscription. Ideally it should not happen. See details below: // The transport layer acknowledge subscription request once the message about subscription is in the queue. // Test sends data immediately after acknowledgement. diff --git a/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv3/MqttTestCallback.java b/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv3/MqttTestCallback.java index 208189ac4e..4b8646f8f5 100644 --- a/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv3/MqttTestCallback.java +++ b/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv3/MqttTestCallback.java @@ -30,7 +30,7 @@ public class MqttTestCallback implements MqttCallback { protected CountDownLatch subscribeLatch; protected final CountDownLatch deliveryLatch; - protected int qoS; + protected int messageArrivedQoS; protected byte[] payloadBytes; protected boolean pubAckReceived; @@ -53,7 +53,7 @@ public class MqttTestCallback implements MqttCallback { @Override public void messageArrived(String requestTopic, MqttMessage mqttMessage) { log.warn("messageArrived on topic: {}", requestTopic); - qoS = mqttMessage.getQos(); + messageArrivedQoS = mqttMessage.getQos(); payloadBytes = mqttMessage.getPayload(); subscribeLatch.countDown(); } @@ -63,6 +63,5 @@ public class MqttTestCallback implements MqttCallback { log.warn("delivery complete: {}", iMqttDeliveryToken.getResponse()); pubAckReceived = iMqttDeliveryToken.getResponse().getType() == MqttWireMessage.MESSAGE_TYPE_PUBACK; deliveryLatch.countDown(); - } } diff --git a/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv3/MqttTestSubscribeOnTopicCallback.java b/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv3/MqttTestSubscribeOnTopicCallback.java index 0f3cc14629..8227a3e596 100644 --- a/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv3/MqttTestSubscribeOnTopicCallback.java +++ b/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv3/MqttTestSubscribeOnTopicCallback.java @@ -36,7 +36,7 @@ public class MqttTestSubscribeOnTopicCallback extends MqttTestCallback { public void messageArrived(String requestTopic, MqttMessage mqttMessage) { log.warn("messageArrived on topic: {}, awaitSubTopic: {}", requestTopic, awaitSubTopic); if (awaitSubTopic.equals(requestTopic)) { - qoS = mqttMessage.getQos(); + messageArrivedQoS = mqttMessage.getQos(); payloadBytes = mqttMessage.getPayload(); subscribeLatch.countDown(); } diff --git a/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv3/attributes/AbstractMqttAttributesIntegrationTest.java b/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv3/attributes/AbstractMqttAttributesIntegrationTest.java index 71d8809e38..c184ccef7d 100644 --- a/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv3/attributes/AbstractMqttAttributesIntegrationTest.java +++ b/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv3/attributes/AbstractMqttAttributesIntegrationTest.java @@ -575,14 +575,14 @@ public abstract class AbstractMqttAttributesIntegrationTest extends AbstractMqtt protected void validateJsonResponse(MqttTestCallback callback, String expectedResponse) throws InterruptedException { assertThat(callback.getSubscribeLatch().await(DEFAULT_WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS)) .as("await callback").isTrue(); - assertEquals(MqttQoS.AT_MOST_ONCE.value(), callback.getQoS()); + assertEquals(MqttQoS.AT_MOST_ONCE.value(), callback.getMessageArrivedQoS()); assertEquals(JacksonUtil.toJsonNode(expectedResponse), JacksonUtil.fromBytes(callback.getPayloadBytes())); } protected void validateProtoResponse(MqttTestCallback callback, TransportProtos.GetAttributeResponseMsg expectedResponse) throws InterruptedException, InvalidProtocolBufferException { assertThat(callback.getSubscribeLatch().await(DEFAULT_WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS)) .as("await callback").isTrue(); - assertEquals(MqttQoS.AT_MOST_ONCE.value(), callback.getQoS()); + assertEquals(MqttQoS.AT_MOST_ONCE.value(), callback.getMessageArrivedQoS()); TransportProtos.GetAttributeResponseMsg actualAttributesResponse = TransportProtos.GetAttributeResponseMsg.parseFrom(callback.getPayloadBytes()); assertEquals(expectedResponse.getRequestId(), actualAttributesResponse.getRequestId()); List expectedClientKeyValueProtos = expectedResponse.getClientAttributeListList().stream().map(TransportProtos.TsKvProto::getKv).collect(Collectors.toList()); @@ -606,7 +606,7 @@ public abstract class AbstractMqttAttributesIntegrationTest extends AbstractMqtt protected void validateJsonResponseGateway(MqttTestCallback callback, String deviceName, String expectedValues) throws InterruptedException { assertThat(callback.getSubscribeLatch().await(DEFAULT_WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS)) .as("await callback").isTrue(); - assertEquals(MqttQoS.AT_LEAST_ONCE.value(), callback.getQoS()); + assertEquals(MqttQoS.AT_LEAST_ONCE.value(), callback.getMessageArrivedQoS()); String expectedRequestPayload = "{\"id\":1,\"device\":\"" + deviceName + "\",\"values\":" + expectedValues + "}"; assertEquals(JacksonUtil.toJsonNode(expectedRequestPayload), JacksonUtil.fromBytes(callback.getPayloadBytes())); } @@ -614,7 +614,7 @@ public abstract class AbstractMqttAttributesIntegrationTest extends AbstractMqtt protected void validateProtoClientResponseGateway(MqttTestCallback callback, String deviceName) throws InterruptedException, InvalidProtocolBufferException { assertThat(callback.getSubscribeLatch().await(DEFAULT_WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS)) .as("await callback").isTrue(); - assertEquals(MqttQoS.AT_LEAST_ONCE.value(), callback.getQoS()); + assertEquals(MqttQoS.AT_LEAST_ONCE.value(), callback.getMessageArrivedQoS()); TransportApiProtos.GatewayAttributeResponseMsg expectedGatewayAttributeResponseMsg = getExpectedGatewayAttributeResponseMsg(deviceName, true); TransportApiProtos.GatewayAttributeResponseMsg actualGatewayAttributeResponseMsg = TransportApiProtos.GatewayAttributeResponseMsg.parseFrom(callback.getPayloadBytes()); assertEquals(expectedGatewayAttributeResponseMsg.getDeviceName(), actualGatewayAttributeResponseMsg.getDeviceName()); @@ -631,7 +631,7 @@ public abstract class AbstractMqttAttributesIntegrationTest extends AbstractMqtt protected void validateProtoSharedResponseGateway(MqttTestCallback callback, String deviceName) throws InterruptedException, InvalidProtocolBufferException { assertThat(callback.getSubscribeLatch().await(DEFAULT_WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS)) .as("await callback").isTrue(); - assertEquals(MqttQoS.AT_LEAST_ONCE.value(), callback.getQoS()); + assertEquals(MqttQoS.AT_LEAST_ONCE.value(), callback.getMessageArrivedQoS()); TransportApiProtos.GatewayAttributeResponseMsg expectedGatewayAttributeResponseMsg = getExpectedGatewayAttributeResponseMsg(deviceName, false); TransportApiProtos.GatewayAttributeResponseMsg actualGatewayAttributeResponseMsg = TransportApiProtos.GatewayAttributeResponseMsg.parseFrom(callback.getPayloadBytes()); assertEquals(expectedGatewayAttributeResponseMsg.getDeviceName(), actualGatewayAttributeResponseMsg.getDeviceName()); diff --git a/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv3/rpc/AbstractMqttServerSideRpcIntegrationTest.java b/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv3/rpc/AbstractMqttServerSideRpcIntegrationTest.java index e1f20a3d95..c128ef2d26 100644 --- a/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv3/rpc/AbstractMqttServerSideRpcIntegrationTest.java +++ b/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv3/rpc/AbstractMqttServerSideRpcIntegrationTest.java @@ -37,12 +37,13 @@ import org.thingsboard.server.common.data.device.profile.DeviceProfileTransportC import org.thingsboard.server.common.data.device.profile.MqttDeviceProfileTransportConfiguration; import org.thingsboard.server.common.data.device.profile.ProtoTransportPayloadConfiguration; import org.thingsboard.server.common.data.device.profile.TransportPayloadTypeConfiguration; +import org.thingsboard.server.common.data.rpc.Rpc; import org.thingsboard.server.common.msg.session.FeatureType; import org.thingsboard.server.gen.transport.TransportApiProtos; import org.thingsboard.server.transport.mqtt.AbstractMqttIntegrationTest; import org.thingsboard.server.transport.mqtt.mqttv3.MqttTestCallback; -import org.thingsboard.server.transport.mqtt.mqttv3.MqttTestSubscribeOnTopicCallback; import org.thingsboard.server.transport.mqtt.mqttv3.MqttTestClient; +import org.thingsboard.server.transport.mqtt.mqttv3.MqttTestSubscribeOnTopicCallback; import java.util.ArrayList; import java.util.List; @@ -61,7 +62,7 @@ import static org.thingsboard.server.common.data.device.profile.MqttTopics.GATEW @Slf4j public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractMqttIntegrationTest { - protected static final String RPC_REQUEST_PROTO_SCHEMA = "syntax =\"proto3\";\n" + + protected static final String RPC_REQUEST_PROTO_SCHEMA = "syntax =\"proto3\";\n" + "package rpc;\n" + "\n" + "message RpcRequestMsg {\n" + @@ -105,7 +106,7 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractM } else { assertEquals(JacksonUtil.toJsonNode(setGpioRequest), JacksonUtil.fromBytes(callback.getPayloadBytes())); } - assertEquals(MqttQoS.AT_MOST_ONCE.value(), callback.getQoS()); + assertEquals(MqttQoS.AT_MOST_ONCE.value(), callback.getMessageArrivedQoS()); client.disconnect(); } @@ -176,9 +177,9 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractM return builder.build(); } - protected void processSequenceTwoWayRpcTest() throws Exception { - List expected = new ArrayList<>(); - List result = new ArrayList<>(); + protected void processSequenceOneWayRpcTest(MqttQoS mqttQoS) throws Exception { + List expectedRequest = new ArrayList<>(); + List actualRequests = new ArrayList<>(); String deviceId = savedDevice.getId().getId().toString(); @@ -186,20 +187,67 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractM ObjectNode request = JacksonUtil.newObjectNode(); request.put("method", "test"); request.put("params", i); - expected.add(JacksonUtil.toString(request)); + expectedRequest.add(JacksonUtil.toString(request)); request.put("persistent", true); - doPostAsync("/api/rpc/twoway/" + deviceId, JacksonUtil.toString(request), String.class, status().isOk()); + doPostAsync("/api/rpc/oneway/" + deviceId, JacksonUtil.toString(request), String.class, status().isOk()); } MqttTestClient client = new MqttTestClient(); client.connectAndWait(accessToken); - client.enableManualAcks(); - MqttTestSequenceCallback callback = new MqttTestSequenceCallback(client, 10, result); + MqttTestOneWaySequenceCallback callback = new MqttTestOneWaySequenceCallback(client, 10, actualRequests); client.setCallback(callback); - subscribeAndWait(client, DEVICE_RPC_REQUESTS_SUB_TOPIC, savedDevice.getId(), FeatureType.RPC); + subscribeAndWait(client, DEVICE_RPC_REQUESTS_SUB_TOPIC, savedDevice.getId(), FeatureType.RPC, mqttQoS); callback.getSubscribeLatch().await(DEFAULT_WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS); - assertEquals(expected, result); + assertEquals(expectedRequest, actualRequests); + client.disconnect(); + } + + protected void processSequenceTwoWayRpcTest(MqttQoS mqttQoS) throws Exception { + processSequenceTwoWayRpcTest(mqttQoS, false); + } + + protected void processSequenceTwoWayRpcTest(MqttQoS mqttQoS, boolean manualAcksEnabled) throws Exception { + List expectedRequest = new ArrayList<>(); + List actualRequests = new ArrayList<>(); + + List rpcIds = new ArrayList<>(); + + List expectedResponses = new ArrayList<>(); + List actualResponses = new ArrayList<>(); + + String deviceId = savedDevice.getId().getId().toString(); + + for (int i = 0; i < 10; i++) { + ObjectNode request = JacksonUtil.newObjectNode(); + request.put("method", "test"); + request.put("params", i); + expectedRequest.add(JacksonUtil.toString(request)); + request.put("persistent", true); + String response = doPostAsync("/api/rpc/twoway/" + deviceId, JacksonUtil.toString(request), String.class, status().isOk()); + var responseNode = JacksonUtil.toJsonNode(response); + rpcIds.add(responseNode.get("rpcId").asText()); + } + + MqttTestClient client = new MqttTestClient(); + client.connectAndWait(accessToken); + if (manualAcksEnabled) { + client.enableManualAcks(); + } + MqttTestTwoWaySequenceCallback callback = new MqttTestTwoWaySequenceCallback( + client, 10, actualRequests, expectedResponses, manualAcksEnabled); + client.setCallback(callback); + subscribeAndWait(client, DEVICE_RPC_REQUESTS_SUB_TOPIC, savedDevice.getId(), FeatureType.RPC, mqttQoS); + + callback.getSubscribeLatch().await(DEFAULT_WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS); + assertEquals(expectedRequest, actualRequests); + awaitForDeviceActorToProcessAllRpcResponses(savedDevice.getId()); + for (String rpcId : rpcIds) { + Rpc rpc = doGet("/api/rpc/persistent/" + rpcId, Rpc.class); + actualResponses.add(JacksonUtil.toString(rpc.getResponse())); + } + assertEquals(expectedResponses, actualResponses); + client.disconnect(); } protected void processJsonTwoWayRpcTestGateway(String deviceName) throws Exception { @@ -222,7 +270,7 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractM ); assertNotNull(savedDevice); - MqttTestCallback callback = new MqttTestSubscribeOnTopicCallback(GATEWAY_RPC_TOPIC); + MqttTestCallback callback = new MqttTestSubscribeOnTopicCallback(GATEWAY_RPC_TOPIC); client.setCallback(callback); subscribeAndCheckSubscription(client, GATEWAY_RPC_TOPIC, savedDevice.getId(), FeatureType.RPC); @@ -248,7 +296,7 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractM JsonNode expectedJsonRequestData = getExpectedGatewayJsonRequestData(deviceName, setGpioRequest); assertEquals(expectedJsonRequestData, JacksonUtil.fromBytes(callback.getPayloadBytes())); } - assertEquals(MqttQoS.AT_MOST_ONCE.value(), callback.getQoS()); + assertEquals(MqttQoS.AT_MOST_ONCE.value(), callback.getMessageArrivedQoS()); } private JsonNode getExpectedGatewayJsonRequestData(String deviceName, String requestStr) { @@ -280,7 +328,7 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractM callback.getSubscribeLatch().await(DEFAULT_WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS); log.warn("request payload: {}", JacksonUtil.fromBytes(callback.getPayloadBytes())); assertEquals("{\"success\":true}", actualRpcResponse); - assertEquals(MqttQoS.AT_MOST_ONCE.value(), callback.getQoS()); + assertEquals(MqttQoS.AT_MOST_ONCE.value(), callback.getMessageArrivedQoS()); } protected void validateProtoTwoWayRpcGatewayResponse(String deviceName, MqttTestClient client, byte[] connectPayloadBytes) throws Exception { @@ -302,7 +350,7 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractM String actualRpcResponse = doPostAsync("/api/rpc/twoway/" + deviceId, setGpioRequest, String.class, status().isOk()); callback.getSubscribeLatch().await(DEFAULT_WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS); assertEquals("{\"success\":true}", actualRpcResponse); - assertEquals(MqttQoS.AT_MOST_ONCE.value(), callback.getQoS()); + assertEquals(MqttQoS.AT_MOST_ONCE.value(), callback.getMessageArrivedQoS()); } private Device getDeviceByName(String deviceName) throws Exception { @@ -334,7 +382,7 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractM public void messageArrived(String requestTopic, MqttMessage mqttMessage) { log.warn("messageArrived on topic: {}, awaitSubTopic: {}", requestTopic, awaitSubTopic); if (awaitSubTopic.equals(requestTopic)) { - qoS = mqttMessage.getQos(); + messageArrivedQoS = mqttMessage.getQos(); payloadBytes = mqttMessage.getPayload(); String responseTopic; if (requestTopic.startsWith(BASE_DEVICE_API_TOPIC_V2)) { @@ -366,7 +414,7 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractM public void messageArrived(String requestTopic, MqttMessage mqttMessage) { log.warn("messageArrived on topic: {}, awaitSubTopic: {}", requestTopic, awaitSubTopic); if (awaitSubTopic.equals(requestTopic)) { - qoS = mqttMessage.getQos(); + messageArrivedQoS = mqttMessage.getQos(); payloadBytes = mqttMessage.getPayload(); String responseTopic; if (requestTopic.startsWith(BASE_DEVICE_API_TOPIC_V2)) { @@ -398,7 +446,7 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractM try { DynamicMessage dynamicMessage = DynamicMessage.parseFrom(rpcRequestMsgDescriptor, requestPayload); List fields = rpcRequestMsgDescriptor.getFields(); - for (Descriptors.FieldDescriptor fieldDescriptor: fields) { + for (Descriptors.FieldDescriptor fieldDescriptor : fields) { assertTrue(dynamicMessage.hasField(fieldDescriptor)); } ProtoFileElement rpcResponseProtoFileElement = DynamicProtoUtils.getProtoFileElement(protoTransportPayloadConfiguration.getDeviceRpcResponseProtoSchema()); @@ -436,30 +484,69 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractM return (ProtoTransportPayloadConfiguration) transportPayloadTypeConfiguration; } - protected class MqttTestSequenceCallback extends MqttTestCallback { + protected static class MqttTestOneWaySequenceCallback extends MqttTestCallback { - private final MqttTestClient client; - private final List expected; + private final List requests; - MqttTestSequenceCallback(MqttTestClient client, int subscribeCount, List expected) { + MqttTestOneWaySequenceCallback(MqttTestClient client, int subscribeCount, List requests) { super(subscribeCount); - this.client = client; - this.expected = expected; + this.requests = requests; } @Override public void messageArrived(String requestTopic, MqttMessage mqttMessage) { log.warn("messageArrived on topic: {}", requestTopic); - expected.add(new String(mqttMessage.getPayload())); - String responseTopic = requestTopic.replace("request", "response"); - qoS = mqttMessage.getQos(); - try { - client.messageArrivedComplete(mqttMessage); - client.publish(responseTopic, processJsonMessageArrived(requestTopic, mqttMessage)); - } catch (MqttException e) { - log.warn("Failed to publish response on topic: {} due to: ", responseTopic, e); - } + requests.add(new String(mqttMessage.getPayload())); + messageArrivedQoS = mqttMessage.getQos(); subscribeLatch.countDown(); } } + + protected class MqttTestTwoWaySequenceCallback extends MqttTestCallback { + + private final MqttTestClient client; + private final List requests; + private final List responses; + private final boolean manualAcksEnabled; + + MqttTestTwoWaySequenceCallback(MqttTestClient client, int subscribeCount, List requests, List responses, boolean manualAcksEnabled) { + super(subscribeCount); + this.client = client; + this.requests = requests; + this.responses = responses; + this.manualAcksEnabled = manualAcksEnabled; + } + + @Override + public void messageArrived(String requestTopic, MqttMessage mqttMessage) { + log.warn("messageArrived on topic: {}", requestTopic); + requests.add(new String(mqttMessage.getPayload())); + messageArrivedQoS = mqttMessage.getQos(); + if (manualAcksEnabled) { + try { + client.messageArrivedComplete(mqttMessage); + } catch (MqttException e) { + log.warn("Failed to ack message delivery on topic: {} due to: ", requestTopic, e); + } finally { + subscribeLatch.countDown(); + processResponse(requestTopic, mqttMessage); + } + return; + } + subscribeLatch.countDown(); + processResponse(requestTopic, mqttMessage); + } + + private void processResponse(String requestTopic, MqttMessage mqttMessage) { + String responseTopic = requestTopic.replace("request", "response"); + byte[] responsePayload = processJsonMessageArrived(requestTopic, mqttMessage); + responses.add(new String(responsePayload)); + try { + client.publish(responseTopic, responsePayload); + } catch (MqttException e) { + log.warn("Failed to publish response on topic: {} due to: ", responseTopic, e); + } + } + + } } diff --git a/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv3/rpc/MqttServerSideRpcDefaultIntegrationTest.java b/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv3/rpc/MqttServerSideRpcDefaultIntegrationTest.java index 10a7a9619f..46f9cc7cff 100644 --- a/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv3/rpc/MqttServerSideRpcDefaultIntegrationTest.java +++ b/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv3/rpc/MqttServerSideRpcDefaultIntegrationTest.java @@ -110,11 +110,6 @@ public class MqttServerSideRpcDefaultIntegrationTest extends AbstractMqttServerS processJsonTwoWayRpcTest(DEVICE_RPC_REQUESTS_SUB_SHORT_JSON_TOPIC); } - @Test - public void testSequenceServerMqttTwoWayRpc() throws Exception { - processSequenceTwoWayRpcTest(); - } - @Test public void testGatewayServerMqttOneWayRpc() throws Exception { processJsonOneWayRpcTestGateway("Gateway Device OneWay RPC"); diff --git a/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv3/rpc/MqttServerSideRpcSequenceOnAckIntegrationTest.java b/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv3/rpc/MqttServerSideRpcSequenceOnAckIntegrationTest.java new file mode 100644 index 0000000000..4476c6babb --- /dev/null +++ b/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv3/rpc/MqttServerSideRpcSequenceOnAckIntegrationTest.java @@ -0,0 +1,72 @@ +/** + * Copyright © 2016-2023 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.transport.mqtt.mqttv3.rpc; + +import io.netty.handler.codec.mqtt.MqttQoS; +import lombok.extern.slf4j.Slf4j; +import org.junit.Before; +import org.junit.Test; +import org.springframework.test.context.TestPropertySource; +import org.thingsboard.server.dao.service.DaoSqlTest; +import org.thingsboard.server.transport.mqtt.MqttTestConfigProperties; + +@Slf4j +@DaoSqlTest +@TestPropertySource(properties = { + "actors.rpc.submit_strategy=SEQUENTIAL_ON_ACK_FROM_DEVICE", +}) +public class MqttServerSideRpcSequenceOnAckIntegrationTest extends AbstractMqttServerSideRpcIntegrationTest { + + @Before + public void beforeTest() throws Exception { + MqttTestConfigProperties configProperties = MqttTestConfigProperties.builder() + .deviceName("RPC test device") + .gatewayName("RPC test gateway") + .build(); + processBeforeTest(configProperties); + } + + @Test + public void testSequenceServerMqttOneWayRpcQoSAtMostOnce() throws Exception { + processSequenceOneWayRpcTest(MqttQoS.AT_MOST_ONCE); + } + + @Test + public void testSequenceServerMqttOneWayRpcQoSAtLeastOnce() throws Exception { + processSequenceOneWayRpcTest(MqttQoS.AT_LEAST_ONCE); + } + + @Test + public void testSequenceServerMqttTwoWayRpcQoSAtMostOnce() throws Exception { + processSequenceTwoWayRpcTest(MqttQoS.AT_MOST_ONCE); + } + + @Test + public void testSequenceServerMqttTwoWayRpcQoSAtLeastOnce() throws Exception { + processSequenceTwoWayRpcTest(MqttQoS.AT_LEAST_ONCE); + } + + @Test + public void testSequenceServerMqttTwoWayRpcQoSAtMostOnceWithManualAcksEnabled() throws Exception { + processSequenceTwoWayRpcTest(MqttQoS.AT_MOST_ONCE, true); + } + + @Test + public void testSequenceServerMqttTwoWayRpcQoSAtLeastOnceWithoutManualAcksEnabled() throws Exception { + processSequenceTwoWayRpcTest(MqttQoS.AT_LEAST_ONCE, true); + } + +} diff --git a/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv3/rpc/MqttServerSideRpcSequenceOnResponseIntegrationTest.java b/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv3/rpc/MqttServerSideRpcSequenceOnResponseIntegrationTest.java new file mode 100644 index 0000000000..7a081ed023 --- /dev/null +++ b/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv3/rpc/MqttServerSideRpcSequenceOnResponseIntegrationTest.java @@ -0,0 +1,73 @@ +/** + * Copyright © 2016-2023 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.transport.mqtt.mqttv3.rpc; + +import io.netty.handler.codec.mqtt.MqttQoS; +import lombok.extern.slf4j.Slf4j; +import org.junit.Before; +import org.junit.Test; +import org.springframework.test.context.TestPropertySource; +import org.thingsboard.server.dao.service.DaoSqlTest; +import org.thingsboard.server.transport.mqtt.MqttTestConfigProperties; + +@Slf4j +@DaoSqlTest +@TestPropertySource(properties = { + "actors.rpc.submit_strategy=SEQUENTIAL_ON_RESPONSE_FROM_DEVICE", +}) +public class MqttServerSideRpcSequenceOnResponseIntegrationTest extends AbstractMqttServerSideRpcIntegrationTest { + + @Before + public void beforeTest() throws Exception { + MqttTestConfigProperties configProperties = MqttTestConfigProperties.builder() + .deviceName("RPC test device") + .gatewayName("RPC test gateway") + .build(); + processBeforeTest(configProperties); + } + + @Test + public void testSequenceServerMqttOneWayRpcQoSAtMostOnce() throws Exception { + processSequenceOneWayRpcTest(MqttQoS.AT_MOST_ONCE); + } + + @Test + public void testSequenceServerMqttOneWayRpcQoSAtLeastOnce() throws Exception { + processSequenceOneWayRpcTest(MqttQoS.AT_LEAST_ONCE); + } + + @Test + public void testSequenceServerMqttTwoWayRpcQoSAtMostOnce() throws Exception { + processSequenceTwoWayRpcTest(MqttQoS.AT_MOST_ONCE); + } + + @Test + public void testSequenceServerMqttTwoWayRpcQoSAtLeastOnce() throws Exception { + processSequenceTwoWayRpcTest(MqttQoS.AT_LEAST_ONCE); + } + + @Test + public void testSequenceServerMqttTwoWayRpcQoSAtMostOnceWithManualAcksEnabled() throws Exception { + processSequenceTwoWayRpcTest(MqttQoS.AT_MOST_ONCE, true); + } + + @Test + public void testSequenceServerMqttTwoWayRpcQoSAtLeastOnceWithoutManualAcksEnabled() throws Exception { + processSequenceTwoWayRpcTest(MqttQoS.AT_LEAST_ONCE, true); + } + + +} diff --git a/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv3/telemetry/timeseries/AbstractMqttTimeseriesIntegrationTest.java b/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv3/telemetry/timeseries/AbstractMqttTimeseriesIntegrationTest.java index b29c6844a7..40b2b90856 100644 --- a/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv3/telemetry/timeseries/AbstractMqttTimeseriesIntegrationTest.java +++ b/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv3/telemetry/timeseries/AbstractMqttTimeseriesIntegrationTest.java @@ -332,7 +332,7 @@ public abstract class AbstractMqttTimeseriesIntegrationTest extends AbstractMqtt doPostAsync("/api/plugins/telemetry/" + savedDevice.getId() + "/SHARED_SCOPE", payload, String.class, status().isOk()); callback.getSubscribeLatch().await(DEFAULT_WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS); assertEquals(payload.getBytes(), callback.getPayloadBytes()); - assertEquals(MqttQoS.AT_MOST_ONCE.value(), callback.getQoS()); + assertEquals(MqttQoS.AT_MOST_ONCE.value(), callback.getMessageArrivedQoS()); } } diff --git a/application/src/test/resources/application-test.properties b/application/src/test/resources/application-test.properties index 99055e0e5f..b72c0c9800 100644 --- a/application/src/test/resources/application-test.properties +++ b/application/src/test/resources/application-test.properties @@ -13,7 +13,7 @@ transport.lwm2m.security.trust-credentials.keystore.store_file=lwm2m/credentials edges.enabled=false edges.storage.no_read_records_sleep=500 edges.storage.sleep_between_batches=500 -actors.rpc.sequential=true +actors.rpc.submit_strategy=BURST queue.rule-engine.stats.enabled=true # Transports disabled to speed up the context init. Particular transport will be enabled with @TestPropertySource in respective tests diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/rpc/RpcStatus.java b/common/data/src/main/java/org/thingsboard/server/common/data/rpc/RpcStatus.java index 58a7b3c707..be6d4767ec 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/rpc/RpcStatus.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/rpc/RpcStatus.java @@ -15,6 +15,24 @@ */ package org.thingsboard.server.common.data.rpc; +import lombok.Getter; + public enum RpcStatus { - QUEUED, SENT, DELIVERED, SUCCESSFUL, TIMEOUT, EXPIRED, FAILED, DELETED + + QUEUED(true), + SENT(true), + DELIVERED(true), + SUCCESSFUL(false), + TIMEOUT(false), + EXPIRED(false), + FAILED(false), + DELETED(false); + + @Getter + private final boolean pushDeleteNotificationToCore; + + RpcStatus(boolean pushDeleteNotificationToCore) { + this.pushDeleteNotificationToCore = pushDeleteNotificationToCore; + } + } diff --git a/common/data/src/test/java/org/thingsboard/server/common/data/rpc/RpcStatusTest.java b/common/data/src/test/java/org/thingsboard/server/common/data/rpc/RpcStatusTest.java new file mode 100644 index 0000000000..dc3c30d28b --- /dev/null +++ b/common/data/src/test/java/org/thingsboard/server/common/data/rpc/RpcStatusTest.java @@ -0,0 +1,47 @@ +/** + * Copyright © 2016-2023 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.common.data.rpc; + +import org.junit.jupiter.api.Test; + +import java.util.List; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.thingsboard.server.common.data.rpc.RpcStatus.DELIVERED; +import static org.thingsboard.server.common.data.rpc.RpcStatus.QUEUED; +import static org.thingsboard.server.common.data.rpc.RpcStatus.SENT; + +class RpcStatusTest { + + private static final List pushDeleteNotificationToCoreStatuses = List.of( + QUEUED, + SENT, + DELIVERED + ); + + @Test + void isPushDeleteNotificationToCoreStatusTest() { + var rpcStatuses = RpcStatus.values(); + for (var status : rpcStatuses) { + if (pushDeleteNotificationToCoreStatuses.contains(status)) { + assertThat(status.isPushDeleteNotificationToCore()).isTrue(); + } else { + assertThat(status.isPushDeleteNotificationToCore()).isFalse(); + } + } + } + +} diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/scheduler/DefaultSchedulerComponent.java b/common/queue/src/main/java/org/thingsboard/server/queue/scheduler/DefaultSchedulerComponent.java index 808e7c2c83..ffacea8076 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/scheduler/DefaultSchedulerComponent.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/scheduler/DefaultSchedulerComponent.java @@ -27,12 +27,12 @@ import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; @Component -public class DefaultSchedulerComponent implements SchedulerComponent{ +public class DefaultSchedulerComponent implements SchedulerComponent { protected ScheduledExecutorService schedulerExecutor; @PostConstruct - public void init(){ + public void init() { this.schedulerExecutor = Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName("queue-scheduler")); } From d46bd36c87ca0e33c5fe973077b53f83b9e7d961 Mon Sep 17 00:00:00 2001 From: ShvaykaD Date: Wed, 2 Aug 2023 16:10:19 +0300 Subject: [PATCH 2/5] fixed default timeout value in the ActorSystemContext --- .../java/org/thingsboard/server/actors/ActorSystemContext.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java index 1e223d19cf..d6f1385732 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java +++ b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java @@ -531,7 +531,7 @@ public class ActorSystemContext { @Getter private String rpcSubmitStrategy; - @Value("${actors.rpc.response_timeout_ms:60000}") + @Value("${actors.rpc.response_timeout_ms:30000}") @Getter private long rpcResponseTimeout; From ad912e1d28c4619b3bf342b8cc1d20748dc2bddf Mon Sep 17 00:00:00 2001 From: ShvaykaD Date: Thu, 3 Aug 2023 12:05:05 +0300 Subject: [PATCH 3/5] removed ScheduledFuture from ToDeviceRpcRequestMetadata and updated logic in DeviceActorMessageProcessor --- .../device/DeviceActorMessageProcessor.java | 33 ++++++++----------- .../device/ToDeviceRpcRequestMetadata.java | 3 -- 2 files changed, 13 insertions(+), 23 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java index 615600a57f..7567130a7d 100644 --- a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java @@ -136,6 +136,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso private String deviceType; private TbMsgMetaData defaultMetaData; private EdgeId edgeId; + private ScheduledFuture awaitRpcResponseFuture; DeviceActorMessageProcessor(ActorSystemContext systemContext, TenantId tenantId, DeviceId deviceId) { super(systemContext); @@ -311,9 +312,9 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso } Integer requestId = entry.getKey(); if (entry.getValue().isDelivered()) { - var md = toDeviceRpcPendingMap.remove(requestId); + toDeviceRpcPendingMap.remove(requestId); if (rpcSubmitStrategy.equals(RpcSubmitStrategy.SEQUENTIAL_ON_RESPONSE_FROM_DEVICE)) { - clearAwaitRpcResponseScheduler(md); + clearAwaitRpcResponseScheduler(); sendNextPendingRequest(rpcId, requestId, "Removed pending RPC!"); } return; @@ -355,7 +356,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso return; } if (rpcSubmitStrategy.equals(RpcSubmitStrategy.SEQUENTIAL_ON_RESPONSE_FROM_DEVICE)) { - clearAwaitRpcResponseScheduler(requestMd); + clearAwaitRpcResponseScheduler(); sendNextPendingRequest(rpcId, requestId, "Pending RPC timeout detected!"); } } @@ -390,9 +391,9 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso .findFirst().filter(entry -> { var md = entry.getValue(); if (md.isDelivered()) { - if (md.getAwaitRpcResponseFuture() == null || md.getAwaitRpcResponseFuture().isCancelled()) { + if (awaitRpcResponseFuture == null || awaitRpcResponseFuture.isCancelled()) { var toDeviceRpcRequest = md.getMsg().getMsg(); - scheduleAwaitRpcResponseFuture(toDeviceRpcRequest.getId(), entry.getKey()); + awaitRpcResponseFuture = scheduleAwaitRpcResponseFuture(toDeviceRpcRequest.getId(), entry.getKey()); } return false; } @@ -649,7 +650,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso } } finally { if (rpcSubmitStrategy.equals(RpcSubmitStrategy.SEQUENTIAL_ON_RESPONSE_FROM_DEVICE)) { - clearAwaitRpcResponseScheduler(requestMd); + clearAwaitRpcResponseScheduler(); String errorResponse = hasError ? "error " : ""; String rpcState = delivered ? "" : "undelivered "; sendNextPendingRequest(rpcId, requestId, String.format("Received %sresponse for %sRPC!", errorResponse, rpcState)); @@ -685,7 +686,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso } else { md.setDelivered(true); if (rpcSubmitStrategy.equals(RpcSubmitStrategy.SEQUENTIAL_ON_RESPONSE_FROM_DEVICE)) { - md.setAwaitRpcResponseFuture(scheduleAwaitRpcResponseFuture(rpcId, requestId)); + awaitRpcResponseFuture = scheduleAwaitRpcResponseFuture(rpcId, requestId); } } } else if (status.equals(RpcStatus.TIMEOUT)) { @@ -740,7 +741,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso if (subscribeCmd.getUnsubscribe()) { log.debug("[{}] Canceling RPC subscription for session: [{}]", deviceId, sessionId); rpcSubscriptions.remove(sessionId); - clearAwaitRpcResponseSchedulers(); + clearAwaitRpcResponseScheduler(); return; } SessionInfoMetaData sessionMD = sessions.get(sessionId); @@ -775,7 +776,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso sessions.remove(sessionId); attributeSubscriptions.remove(sessionId); rpcSubscriptions.remove(sessionId); - clearAwaitRpcResponseSchedulers(); + clearAwaitRpcResponseScheduler(); if (sessions.isEmpty()) { reportSessionClose(); } @@ -798,20 +799,12 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso }, systemContext.getRpcResponseTimeout(), TimeUnit.MILLISECONDS); } - private void clearAwaitRpcResponseSchedulers() { - if (rpcSubmitStrategy.equals(RpcSubmitStrategy.SEQUENTIAL_ON_RESPONSE_FROM_DEVICE)) { - toDeviceRpcPendingMap.forEach((integer, md) -> clearAwaitRpcResponseScheduler(md)); + private void clearAwaitRpcResponseScheduler() { + if (rpcSubmitStrategy.equals(RpcSubmitStrategy.SEQUENTIAL_ON_RESPONSE_FROM_DEVICE) && awaitRpcResponseFuture != null) { + awaitRpcResponseFuture.cancel(true); } } - private void clearAwaitRpcResponseScheduler(ToDeviceRpcRequestMetadata md) { - var awaitRpcResponseFuture = md.getAwaitRpcResponseFuture(); - if (awaitRpcResponseFuture == null) { - return; - } - awaitRpcResponseFuture.cancel(true); - } - private void handleSessionActivity(SessionInfoProto sessionInfoProto, SubscriptionInfoProto subscriptionInfo) { UUID sessionId = getSessionId(sessionInfoProto); Objects.requireNonNull(sessionId); diff --git a/application/src/main/java/org/thingsboard/server/actors/device/ToDeviceRpcRequestMetadata.java b/application/src/main/java/org/thingsboard/server/actors/device/ToDeviceRpcRequestMetadata.java index b4f6795af7..f876408d24 100644 --- a/application/src/main/java/org/thingsboard/server/actors/device/ToDeviceRpcRequestMetadata.java +++ b/application/src/main/java/org/thingsboard/server/actors/device/ToDeviceRpcRequestMetadata.java @@ -18,8 +18,6 @@ package org.thingsboard.server.actors.device; import lombok.Data; import org.thingsboard.server.service.rpc.ToDeviceRpcRequestActorMsg; -import java.util.concurrent.ScheduledFuture; - /** * @author Andrew Shvayka */ @@ -29,5 +27,4 @@ public class ToDeviceRpcRequestMetadata { private final boolean sent; private int retries; private boolean delivered; - private ScheduledFuture awaitRpcResponseFuture; } From d14ed53b6494223b849a1a235a103999cb0a8339 Mon Sep 17 00:00:00 2001 From: ShvaykaD Date: Sat, 12 Aug 2023 10:14:03 +0300 Subject: [PATCH 4/5] rollback if-return refactoring to simplify review and save git history --- .../device/DeviceActorMessageProcessor.java | 247 +++++++++--------- 1 file changed, 123 insertions(+), 124 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java index 7567130a7d..349e0daeb4 100644 --- a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java @@ -307,25 +307,24 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso } } - if (entry == null) { - return; - } - Integer requestId = entry.getKey(); - if (entry.getValue().isDelivered()) { - toDeviceRpcPendingMap.remove(requestId); - if (rpcSubmitStrategy.equals(RpcSubmitStrategy.SEQUENTIAL_ON_RESPONSE_FROM_DEVICE)) { - clearAwaitRpcResponseScheduler(); - sendNextPendingRequest(rpcId, requestId, "Removed pending RPC!"); + if (entry != null) { + Integer requestId = entry.getKey(); + if (entry.getValue().isDelivered()) { + toDeviceRpcPendingMap.remove(requestId); + if (rpcSubmitStrategy.equals(RpcSubmitStrategy.SEQUENTIAL_ON_RESPONSE_FROM_DEVICE)) { + clearAwaitRpcResponseScheduler(); + sendNextPendingRequest(rpcId, requestId, "Removed pending RPC!"); + } + } else { + Optional> firstRpc = getFirstRpc(); + if (firstRpc.isPresent() && requestId.equals(firstRpc.get().getKey())) { + toDeviceRpcPendingMap.remove(requestId); + sendNextPendingRequest(rpcId, requestId, "Removed pending RPC!"); + } else { + toDeviceRpcPendingMap.remove(requestId); + } } - return; } - Optional> firstRpc = getFirstRpc(); - if (firstRpc.isPresent() && requestId.equals(firstRpc.get().getKey())) { - toDeviceRpcPendingMap.remove(requestId); - sendNextPendingRequest(rpcId, requestId, "Removed pending RPC!"); - return; - } - toDeviceRpcPendingMap.remove(requestId); } private void registerPendingRpcRequest(TbActorCtx context, ToDeviceRpcRequestActorMsg msg, boolean sent, ToDeviceRpcRequestMsg rpcRequest, long timeout) { @@ -340,24 +339,23 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso void processServerSideRpcTimeout(DeviceActorServerSideRpcTimeoutMsg msg) { Integer requestId = msg.getId(); var requestMd = toDeviceRpcPendingMap.remove(requestId); - if (requestMd == null) { - return; - } - var toDeviceRpcRequest = requestMd.getMsg().getMsg(); - UUID rpcId = toDeviceRpcRequest.getId(); - log.debug("[{}][{}][{}] RPC request timeout detected!", deviceId, rpcId, requestId); - if (toDeviceRpcRequest.isPersisted()) { - systemContext.getTbRpcService().save(tenantId, new RpcId(rpcId), RpcStatus.EXPIRED, null); - } - systemContext.getTbCoreDeviceRpcService().processRpcResponseFromDeviceActor(new FromDeviceRpcResponse(rpcId, - null, requestMd.isSent() ? RpcError.TIMEOUT : RpcError.NO_ACTIVE_CONNECTION)); - if (!requestMd.isDelivered()) { - sendNextPendingRequest(rpcId, requestId, "Pending RPC timeout detected!"); - return; - } - if (rpcSubmitStrategy.equals(RpcSubmitStrategy.SEQUENTIAL_ON_RESPONSE_FROM_DEVICE)) { - clearAwaitRpcResponseScheduler(); - sendNextPendingRequest(rpcId, requestId, "Pending RPC timeout detected!"); + if (requestMd != null) { + var toDeviceRpcRequest = requestMd.getMsg().getMsg(); + UUID rpcId = toDeviceRpcRequest.getId(); + log.debug("[{}][{}][{}] RPC request timeout detected!", deviceId, rpcId, requestId); + if (toDeviceRpcRequest.isPersisted()) { + systemContext.getTbRpcService().save(tenantId, new RpcId(rpcId), RpcStatus.EXPIRED, null); + } + systemContext.getTbCoreDeviceRpcService().processRpcResponseFromDeviceActor(new FromDeviceRpcResponse(rpcId, + null, requestMd.isSent() ? RpcError.TIMEOUT : RpcError.NO_ACTIVE_CONNECTION)); + if (!requestMd.isDelivered()) { + sendNextPendingRequest(rpcId, requestId, "Pending RPC timeout detected!"); + return; + } + if (rpcSubmitStrategy.equals(RpcSubmitStrategy.SEQUENTIAL_ON_RESPONSE_FROM_DEVICE)) { + clearAwaitRpcResponseScheduler(); + sendNextPendingRequest(rpcId, requestId, "Pending RPC timeout detected!"); + } } } @@ -626,38 +624,39 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso log.debug("[{}][{}] Processing RPC command response: {}", deviceId, sessionId, responseMsg); int requestId = responseMsg.getRequestId(); ToDeviceRpcRequestMetadata requestMd = toDeviceRpcPendingMap.remove(requestId); - if (requestMd == null) { - log.debug("[{}][{}][{}] RPC command response is stale!", deviceId, sessionId, requestId); - return; - } - ToDeviceRpcRequest toDeviceRequestMsg = requestMd.getMsg().getMsg(); - UUID rpcId = toDeviceRequestMsg.getId(); - boolean delivered = requestMd.isDelivered(); - boolean hasError = StringUtils.isNotEmpty(responseMsg.getError()); - try { - String payload = hasError ? responseMsg.getError() : responseMsg.getPayload(); - systemContext.getTbCoreDeviceRpcService().processRpcResponseFromDeviceActor( - new FromDeviceRpcResponse(rpcId, payload, null)); - if (toDeviceRequestMsg.isPersisted()) { - RpcStatus status = hasError ? RpcStatus.FAILED : RpcStatus.SUCCESSFUL; - JsonNode response; - try { - response = JacksonUtil.toJsonNode(payload); - } catch (IllegalArgumentException e) { - response = JacksonUtil.newObjectNode().put("error", payload); + boolean success = requestMd != null; + if (success) { + ToDeviceRpcRequest toDeviceRequestMsg = requestMd.getMsg().getMsg(); + UUID rpcId = toDeviceRequestMsg.getId(); + boolean delivered = requestMd.isDelivered(); + boolean hasError = StringUtils.isNotEmpty(responseMsg.getError()); + try { + String payload = hasError ? responseMsg.getError() : responseMsg.getPayload(); + systemContext.getTbCoreDeviceRpcService().processRpcResponseFromDeviceActor( + new FromDeviceRpcResponse(rpcId, payload, null)); + if (toDeviceRequestMsg.isPersisted()) { + RpcStatus status = hasError ? RpcStatus.FAILED : RpcStatus.SUCCESSFUL; + JsonNode response; + try { + response = JacksonUtil.toJsonNode(payload); + } catch (IllegalArgumentException e) { + response = JacksonUtil.newObjectNode().put("error", payload); + } + systemContext.getTbRpcService().save(tenantId, new RpcId(rpcId), status, response); + } + } finally { + if (rpcSubmitStrategy.equals(RpcSubmitStrategy.SEQUENTIAL_ON_RESPONSE_FROM_DEVICE)) { + clearAwaitRpcResponseScheduler(); + String errorResponse = hasError ? "error " : ""; + String rpcState = delivered ? "" : "undelivered "; + sendNextPendingRequest(rpcId, requestId, String.format("Received %sresponse for %sRPC!", errorResponse, rpcState)); + } else if (!delivered) { + String errorResponse = hasError ? "error " : ""; + sendNextPendingRequest(rpcId, requestId, String.format("Received %sresponse for undelivered RPC!", errorResponse)); } - systemContext.getTbRpcService().save(tenantId, new RpcId(rpcId), status, response); - } - } finally { - if (rpcSubmitStrategy.equals(RpcSubmitStrategy.SEQUENTIAL_ON_RESPONSE_FROM_DEVICE)) { - clearAwaitRpcResponseScheduler(); - String errorResponse = hasError ? "error " : ""; - String rpcState = delivered ? "" : "undelivered "; - sendNextPendingRequest(rpcId, requestId, String.format("Received %sresponse for %sRPC!", errorResponse, rpcState)); - } else if (!delivered) { - String errorResponse = hasError ? "error " : ""; - sendNextPendingRequest(rpcId, requestId, String.format("Received %sresponse for undelivered RPC!", errorResponse)); } + } else { + log.debug("[{}][{}][{}] RPC command response is stale!", deviceId, sessionId, requestId); } } @@ -668,50 +667,50 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso int requestId = responseMsg.getRequestId(); log.debug("[{}][{}][{}][{}] Processing RPC command response status: [{}]", deviceId, sessionId, rpcId, requestId, status); ToDeviceRpcRequestMetadata md = toDeviceRpcPendingMap.get(requestId); - if (md == null) { - log.warn("[{}][{}][{}][{}] RPC has already been removed from pending map.", deviceId, sessionId, rpcId, requestId); - return; - } - var toDeviceRpcRequest = md.getMsg().getMsg(); - boolean persisted = toDeviceRpcRequest.isPersisted(); - boolean oneWayRpc = toDeviceRpcRequest.isOneway(); - JsonNode response = null; - if (status.equals(RpcStatus.DELIVERED)) { - if (oneWayRpc) { - toDeviceRpcPendingMap.remove(requestId); - if (rpcSequential) { - var fromDeviceRpcResponse = new FromDeviceRpcResponse(rpcId, null, null); - systemContext.getTbCoreDeviceRpcService().processRpcResponseFromDeviceActor(fromDeviceRpcResponse); + if (md != null) { + var toDeviceRpcRequest = md.getMsg().getMsg(); + boolean persisted = toDeviceRpcRequest.isPersisted(); + boolean oneWayRpc = toDeviceRpcRequest.isOneway(); + JsonNode response = null; + if (status.equals(RpcStatus.DELIVERED)) { + if (oneWayRpc) { + toDeviceRpcPendingMap.remove(requestId); + if (rpcSequential) { + var fromDeviceRpcResponse = new FromDeviceRpcResponse(rpcId, null, null); + systemContext.getTbCoreDeviceRpcService().processRpcResponseFromDeviceActor(fromDeviceRpcResponse); + } + } else { + md.setDelivered(true); + if (rpcSubmitStrategy.equals(RpcSubmitStrategy.SEQUENTIAL_ON_RESPONSE_FROM_DEVICE)) { + awaitRpcResponseFuture = scheduleAwaitRpcResponseFuture(rpcId, requestId); + } } - } else { - md.setDelivered(true); - if (rpcSubmitStrategy.equals(RpcSubmitStrategy.SEQUENTIAL_ON_RESPONSE_FROM_DEVICE)) { - awaitRpcResponseFuture = scheduleAwaitRpcResponseFuture(rpcId, requestId); + } else if (status.equals(RpcStatus.TIMEOUT)) { + Integer maxRpcRetries = toDeviceRpcRequest.getRetries(); + maxRpcRetries = maxRpcRetries == null ? + systemContext.getMaxRpcRetries() : Math.min(maxRpcRetries, systemContext.getMaxRpcRetries()); + if (maxRpcRetries <= md.getRetries()) { + toDeviceRpcPendingMap.remove(requestId); + status = RpcStatus.FAILED; + response = JacksonUtil.newObjectNode().put("error", "There was a Timeout and all retry " + + "attempts have been exhausted. Retry attempts set: " + maxRpcRetries); + } else { + md.setRetries(md.getRetries() + 1); } } - } else if (status.equals(RpcStatus.TIMEOUT)) { - Integer maxRpcRetries = toDeviceRpcRequest.getRetries(); - maxRpcRetries = maxRpcRetries == null ? - systemContext.getMaxRpcRetries() : Math.min(maxRpcRetries, systemContext.getMaxRpcRetries()); - if (maxRpcRetries <= md.getRetries()) { - toDeviceRpcPendingMap.remove(requestId); - status = RpcStatus.FAILED; - response = JacksonUtil.newObjectNode().put("error", "There was a Timeout and all retry " + - "attempts have been exhausted. Retry attempts set: " + maxRpcRetries); - } else { - md.setRetries(md.getRetries() + 1); - } - } - if (persisted) { - systemContext.getTbRpcService().save(tenantId, new RpcId(rpcId), status, response); - } - if (rpcSubmitStrategy.equals(RpcSubmitStrategy.SEQUENTIAL_ON_RESPONSE_FROM_DEVICE) - && status.equals(RpcStatus.DELIVERED) && !oneWayRpc) { - return; - } - if (!status.equals(RpcStatus.SENT)) { - sendNextPendingRequest(rpcId, requestId, String.format("RPC was %s!", status.name().toLowerCase())); + if (persisted) { + systemContext.getTbRpcService().save(tenantId, new RpcId(rpcId), status, response); + } + if (rpcSubmitStrategy.equals(RpcSubmitStrategy.SEQUENTIAL_ON_RESPONSE_FROM_DEVICE) + && status.equals(RpcStatus.DELIVERED) && !oneWayRpc) { + return; + } + if (!status.equals(RpcStatus.SENT)) { + sendNextPendingRequest(rpcId, requestId, String.format("RPC was %s!", status.name().toLowerCase())); + } + } else { + log.warn("[{}][{}][{}][{}] RPC has already been removed from pending map.", deviceId, sessionId, rpcId, requestId); } } @@ -720,16 +719,16 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso if (subscribeCmd.getUnsubscribe()) { log.debug("[{}] Canceling attributes subscription for session: [{}]", deviceId, sessionId); attributeSubscriptions.remove(sessionId); - return; + } else { + SessionInfoMetaData sessionMD = sessions.get(sessionId); + if (sessionMD == null) { + sessionMD = new SessionInfoMetaData(new SessionInfo(subscribeCmd.getSessionType(), sessionInfo.getNodeId())); + } + sessionMD.setSubscribedToAttributes(true); + log.debug("[{}] Registering attributes subscription for session: [{}]", deviceId, sessionId); + attributeSubscriptions.put(sessionId, sessionMD.getSessionInfo()); + dumpSessions(); } - SessionInfoMetaData sessionMD = sessions.get(sessionId); - if (sessionMD == null) { - sessionMD = new SessionInfoMetaData(new SessionInfo(subscribeCmd.getSessionType(), sessionInfo.getNodeId())); - } - sessionMD.setSubscribedToAttributes(true); - log.debug("[{}] Registering attributes subscription for session: [{}]", deviceId, sessionId); - attributeSubscriptions.put(sessionId, sessionMD.getSessionInfo()); - dumpSessions(); } private UUID getSessionId(SessionInfoProto sessionInfo) { @@ -742,17 +741,17 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso log.debug("[{}] Canceling RPC subscription for session: [{}]", deviceId, sessionId); rpcSubscriptions.remove(sessionId); clearAwaitRpcResponseScheduler(); - return; + } else { + SessionInfoMetaData sessionMD = sessions.get(sessionId); + if (sessionMD == null) { + sessionMD = new SessionInfoMetaData(new SessionInfo(subscribeCmd.getSessionType(), sessionInfo.getNodeId())); + } + sessionMD.setSubscribedToRPC(true); + rpcSubscriptions.put(sessionId, sessionMD.getSessionInfo()); + log.debug("[{}] Registered RPC subscription for session: [{}] Going to check for pending requests ...", deviceId, sessionId); + sendPendingRequests(sessionId, sessionInfo.getNodeId()); + dumpSessions(); } - SessionInfoMetaData sessionMD = sessions.get(sessionId); - if (sessionMD == null) { - sessionMD = new SessionInfoMetaData(new SessionInfo(subscribeCmd.getSessionType(), sessionInfo.getNodeId())); - } - sessionMD.setSubscribedToRPC(true); - rpcSubscriptions.put(sessionId, sessionMD.getSessionInfo()); - log.debug("[{}] Registered RPC subscription for session: [{}] Going to check for pending requests ...", deviceId, sessionId); - sendPendingRequests(sessionId, sessionInfo.getNodeId()); - dumpSessions(); } private void processSessionStateMsgs(SessionInfoProto sessionInfo, SessionEventMsg msg) { From 69902f3cd632e2c54cce8c87bc5c5f6ad9f6cfaf Mon Sep 17 00:00:00 2001 From: ShvaykaD Date: Wed, 6 Sep 2023 14:39:54 +0300 Subject: [PATCH 5/5] refactoring after review --- .../device/DeviceActorMessageProcessor.java | 29 +++++++++---------- 1 file changed, 14 insertions(+), 15 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java index 349e0daeb4..b838f0b86d 100644 --- a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java @@ -192,7 +192,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso ToDeviceRpcRequest request = msg.getMsg(); UUID rpcId = request.getId(); log.debug("[{}][{}] Received RPC request to process ...", deviceId, rpcId); - ToDeviceRpcRequestMsg rpcRequest = creteToDeviceRpcRequestMsg(request); + ToDeviceRpcRequestMsg rpcRequest = createToDeviceRpcRequestMsg(request); long timeout = request.getExpirationTime() - System.currentTimeMillis(); boolean persisted = request.isPersisted(); @@ -243,20 +243,19 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso } else { registerPendingRpcRequest(context, msg, sent, rpcRequest, timeout); } - String rpcSent = sent ? "" : "NOT "; - log.debug("[{}][{}][{}] RPC request is {}sent!", deviceId, rpcId, requestId, rpcSent); + String rpcSent = sent ? "sent!" : "NOT sent!"; + log.debug("[{}][{}][{}] RPC request is {}", deviceId, rpcId, requestId, rpcSent); } private boolean isSendNewRpcAvailable() { - if (rpcSequential) { - if (rpcSubmitStrategy.equals(RpcSubmitStrategy.SEQUENTIAL_ON_ACK_FROM_DEVICE)) { + switch (rpcSubmitStrategy) { + case SEQUENTIAL_ON_ACK_FROM_DEVICE: return toDeviceRpcPendingMap.values().stream().filter(md -> !md.isDelivered()).findAny().isEmpty(); - } - if (rpcSubmitStrategy.equals(RpcSubmitStrategy.SEQUENTIAL_ON_RESPONSE_FROM_DEVICE)) { + case SEQUENTIAL_ON_RESPONSE_FROM_DEVICE: return toDeviceRpcPendingMap.values().stream().filter(ToDeviceRpcRequestMetadata::isDelivered).findAny().isEmpty(); - } + default: + return true; } - return true; } private void createRpc(ToDeviceRpcRequest request, RpcStatus status) { @@ -271,7 +270,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso systemContext.getTbRpcService().save(tenantId, rpc); } - private ToDeviceRpcRequestMsg creteToDeviceRpcRequestMsg(ToDeviceRpcRequest request) { + private ToDeviceRpcRequestMsg createToDeviceRpcRequestMsg(ToDeviceRpcRequest request) { ToDeviceRpcRequestBody body = request.getBody(); return ToDeviceRpcRequestMsg.newBuilder() .setRequestId(rpcSeq++) @@ -647,12 +646,12 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso } finally { if (rpcSubmitStrategy.equals(RpcSubmitStrategy.SEQUENTIAL_ON_RESPONSE_FROM_DEVICE)) { clearAwaitRpcResponseScheduler(); - String errorResponse = hasError ? "error " : ""; + String errorResponse = hasError ? "error response" : "response"; String rpcState = delivered ? "" : "undelivered "; - sendNextPendingRequest(rpcId, requestId, String.format("Received %sresponse for %sRPC!", errorResponse, rpcState)); + sendNextPendingRequest(rpcId, requestId, String.format("Received %s for %sRPC!", errorResponse, rpcState)); } else if (!delivered) { - String errorResponse = hasError ? "error " : ""; - sendNextPendingRequest(rpcId, requestId, String.format("Received %sresponse for undelivered RPC!", errorResponse)); + String errorResponse = hasError ? "error response" : "response"; + sendNextPendingRequest(rpcId, requestId, String.format("Received %s for undelivered RPC!", errorResponse)); } } } else { @@ -1049,7 +1048,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso rpc.setStatus(RpcStatus.EXPIRED); systemContext.getTbRpcService().save(tenantId, rpc); } else { - registerPendingRpcRequest(ctx, new ToDeviceRpcRequestActorMsg(systemContext.getServiceId(), msg), false, creteToDeviceRpcRequestMsg(msg), timeout); + registerPendingRpcRequest(ctx, new ToDeviceRpcRequestActorMsg(systemContext.getServiceId(), msg), false, createToDeviceRpcRequestMsg(msg), timeout); } }); if (pageData.hasNext()) {