diff --git a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java index bade75df00..3d4340f6c9 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java +++ b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java @@ -528,9 +528,13 @@ public class ActorSystemContext { @Getter private String debugPerTenantLimitsConfiguration; - @Value("${actors.rpc.sequential:false}") + @Value("${actors.rpc.submit_strategy:BURST}") @Getter - private boolean rpcSequential; + private String rpcSubmitStrategy; + + @Value("${actors.rpc.response_timeout_ms:30000}") + @Getter + private long rpcResponseTimeout; @Value("${actors.rpc.max_retries:5}") @Getter diff --git a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java index 0ce0ae7884..b838f0b86d 100644 --- a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java @@ -89,6 +89,7 @@ import org.thingsboard.server.gen.transport.TransportProtos.TransportToDeviceAct import org.thingsboard.server.gen.transport.TransportProtos.TsKvProto; import org.thingsboard.server.service.rpc.FromDeviceRpcResponseActorMsg; import org.thingsboard.server.service.rpc.RemoveRpcActorMsg; +import org.thingsboard.server.service.rpc.RpcSubmitStrategy; import org.thingsboard.server.service.rpc.ToDeviceRpcRequestActorMsg; import org.thingsboard.server.service.transport.msg.TransportToDeviceActorMsgWrapper; @@ -106,6 +107,9 @@ import java.util.Optional; import java.util.Set; import java.util.UUID; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; import java.util.function.Consumer; import java.util.stream.Collectors; @@ -124,22 +128,27 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso private final Map rpcSubscriptions; private final Map toDeviceRpcPendingMap; private final boolean rpcSequential; + private final RpcSubmitStrategy rpcSubmitStrategy; + private final ScheduledExecutorService scheduler; private int rpcSeq = 0; private String deviceName; private String deviceType; private TbMsgMetaData defaultMetaData; private EdgeId edgeId; + private ScheduledFuture awaitRpcResponseFuture; DeviceActorMessageProcessor(ActorSystemContext systemContext, TenantId tenantId, DeviceId deviceId) { super(systemContext); this.tenantId = tenantId; this.deviceId = deviceId; - this.rpcSequential = systemContext.isRpcSequential(); + this.rpcSubmitStrategy = RpcSubmitStrategy.parse(systemContext.getRpcSubmitStrategy()); + this.rpcSequential = !rpcSubmitStrategy.equals(RpcSubmitStrategy.BURST); this.attributeSubscriptions = new HashMap<>(); this.rpcSubscriptions = new HashMap<>(); this.toDeviceRpcPendingMap = new LinkedHashMap<>(); this.sessions = new LinkedHashMapRemoveEldest<>(systemContext.getMaxConcurrentSessionsPerDevice(), this::notifyTransportAboutClosedSessionMaxSessionsLimit); + this.scheduler = systemContext.getScheduler(); if (initAttributes()) { restoreSessions(); } @@ -183,7 +192,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso ToDeviceRpcRequest request = msg.getMsg(); UUID rpcId = request.getId(); log.debug("[{}][{}] Received RPC request to process ...", deviceId, rpcId); - ToDeviceRpcRequestMsg rpcRequest = creteToDeviceRpcRequestMsg(request); + ToDeviceRpcRequestMsg rpcRequest = createToDeviceRpcRequestMsg(request); long timeout = request.getExpirationTime() - System.currentTimeMillis(); boolean persisted = request.isPersisted(); @@ -225,24 +234,28 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso if (persisted) { ObjectNode response = JacksonUtil.newObjectNode(); response.put("rpcId", rpcId.toString()); - systemContext.getTbCoreDeviceRpcService().processRpcResponseFromDeviceActor(new FromDeviceRpcResponse(msg.getMsg().getId(), JacksonUtil.toString(response), null)); + systemContext.getTbCoreDeviceRpcService().processRpcResponseFromDeviceActor(new FromDeviceRpcResponse(rpcId, JacksonUtil.toString(response), null)); } if (!persisted && request.isOneway() && sent) { log.debug("[{}] RPC command response sent [{}][{}]!", deviceId, rpcId, requestId); - systemContext.getTbCoreDeviceRpcService().processRpcResponseFromDeviceActor(new FromDeviceRpcResponse(msg.getMsg().getId(), null, null)); + systemContext.getTbCoreDeviceRpcService().processRpcResponseFromDeviceActor(new FromDeviceRpcResponse(rpcId, null, null)); } else { registerPendingRpcRequest(context, msg, sent, rpcRequest, timeout); } - if (sent) { - log.debug("[{}][{}][{}] RPC request is sent!", deviceId, rpcId, requestId); - } else { - log.debug("[{}][{}][{}] RPC request is NOT sent!", deviceId, rpcId, requestId); - } + String rpcSent = sent ? "sent!" : "NOT sent!"; + log.debug("[{}][{}][{}] RPC request is {}", deviceId, rpcId, requestId, rpcSent); } private boolean isSendNewRpcAvailable() { - return !rpcSequential || toDeviceRpcPendingMap.values().stream().filter(md -> !md.isDelivered()).findAny().isEmpty(); + switch (rpcSubmitStrategy) { + case SEQUENTIAL_ON_ACK_FROM_DEVICE: + return toDeviceRpcPendingMap.values().stream().filter(md -> !md.isDelivered()).findAny().isEmpty(); + case SEQUENTIAL_ON_RESPONSE_FROM_DEVICE: + return toDeviceRpcPendingMap.values().stream().filter(ToDeviceRpcRequestMetadata::isDelivered).findAny().isEmpty(); + default: + return true; + } } private void createRpc(ToDeviceRpcRequest request, RpcStatus status) { @@ -257,7 +270,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso systemContext.getTbRpcService().save(tenantId, rpc); } - private ToDeviceRpcRequestMsg creteToDeviceRpcRequestMsg(ToDeviceRpcRequest request) { + private ToDeviceRpcRequestMsg createToDeviceRpcRequestMsg(ToDeviceRpcRequest request) { ToDeviceRpcRequestBody body = request.getBody(); return ToDeviceRpcRequestMsg.newBuilder() .setRequestId(rpcSeq++) @@ -283,28 +296,31 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso } void processRemoveRpc(RemoveRpcActorMsg msg) { - UUID requestId = msg.getRequestId(); - log.debug("[{}][{}] Received remove RPC request ...", deviceId, requestId); + UUID rpcId = msg.getRequestId(); + log.debug("[{}][{}] Received remove RPC request ...", deviceId, rpcId); Map.Entry entry = null; for (Map.Entry e : toDeviceRpcPendingMap.entrySet()) { - if (e.getValue().getMsg().getMsg().getId().equals(requestId)) { + if (e.getValue().getMsg().getMsg().getId().equals(rpcId)) { entry = e; break; } } if (entry != null) { - Integer key = entry.getKey(); + Integer requestId = entry.getKey(); if (entry.getValue().isDelivered()) { - toDeviceRpcPendingMap.remove(key); + toDeviceRpcPendingMap.remove(requestId); + if (rpcSubmitStrategy.equals(RpcSubmitStrategy.SEQUENTIAL_ON_RESPONSE_FROM_DEVICE)) { + clearAwaitRpcResponseScheduler(); + sendNextPendingRequest(rpcId, requestId, "Removed pending RPC!"); + } } else { Optional> firstRpc = getFirstRpc(); - if (firstRpc.isPresent() && key.equals(firstRpc.get().getKey())) { - toDeviceRpcPendingMap.remove(key); - log.debug("[{}][{}][{}] Removed pending RPC! Going to send next pending request ...", deviceId, requestId, key); - sendNextPendingRequest(); + if (firstRpc.isPresent() && requestId.equals(firstRpc.get().getKey())) { + toDeviceRpcPendingMap.remove(requestId); + sendNextPendingRequest(rpcId, requestId, "Removed pending RPC!"); } else { - toDeviceRpcPendingMap.remove(key); + toDeviceRpcPendingMap.remove(requestId); } } } @@ -321,9 +337,9 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso void processServerSideRpcTimeout(DeviceActorServerSideRpcTimeoutMsg msg) { Integer requestId = msg.getId(); - ToDeviceRpcRequestMetadata requestMd = toDeviceRpcPendingMap.remove(requestId); + var requestMd = toDeviceRpcPendingMap.remove(requestId); if (requestMd != null) { - ToDeviceRpcRequest toDeviceRpcRequest = requestMd.getMsg().getMsg(); + var toDeviceRpcRequest = requestMd.getMsg().getMsg(); UUID rpcId = toDeviceRpcRequest.getId(); log.debug("[{}][{}][{}] RPC request timeout detected!", deviceId, rpcId, requestId); if (toDeviceRpcRequest.isPersisted()) { @@ -332,8 +348,12 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso systemContext.getTbCoreDeviceRpcService().processRpcResponseFromDeviceActor(new FromDeviceRpcResponse(rpcId, null, requestMd.isSent() ? RpcError.TIMEOUT : RpcError.NO_ACTIVE_CONNECTION)); if (!requestMd.isDelivered()) { - log.debug("[{}][{}][{}] Pending RPC timeout detected! Going to send next pending request ...", deviceId, rpcId, requestId); - sendNextPendingRequest(); + sendNextPendingRequest(rpcId, requestId, "Pending RPC timeout detected!"); + return; + } + if (rpcSubmitStrategy.equals(RpcSubmitStrategy.SEQUENTIAL_ON_RESPONSE_FROM_DEVICE)) { + clearAwaitRpcResponseScheduler(); + sendNextPendingRequest(rpcId, requestId, "Pending RPC timeout detected!"); } } } @@ -363,10 +383,25 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso } private Optional> getFirstRpc() { + if (rpcSubmitStrategy.equals(RpcSubmitStrategy.SEQUENTIAL_ON_RESPONSE_FROM_DEVICE)) { + return toDeviceRpcPendingMap.entrySet().stream() + .findFirst().filter(entry -> { + var md = entry.getValue(); + if (md.isDelivered()) { + if (awaitRpcResponseFuture == null || awaitRpcResponseFuture.isCancelled()) { + var toDeviceRpcRequest = md.getMsg().getMsg(); + awaitRpcResponseFuture = scheduleAwaitRpcResponseFuture(toDeviceRpcRequest.getId(), entry.getKey()); + } + return false; + } + return true; + }); + } return toDeviceRpcPendingMap.entrySet().stream().filter(e -> !e.getValue().isDelivered()).findFirst(); } - private void sendNextPendingRequest() { + private void sendNextPendingRequest(UUID rpcId, int requestId, String logMessage) { + log.debug("[{}][{}][{}] {} Going to send next pending request ...", deviceId, rpcId, requestId, logMessage); if (rpcSequential) { rpcSubscriptions.forEach((id, s) -> sendPendingRequests(id, s.getNodeId())); } @@ -591,12 +626,13 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso boolean success = requestMd != null; if (success) { ToDeviceRpcRequest toDeviceRequestMsg = requestMd.getMsg().getMsg(); + UUID rpcId = toDeviceRequestMsg.getId(); boolean delivered = requestMd.isDelivered(); boolean hasError = StringUtils.isNotEmpty(responseMsg.getError()); try { String payload = hasError ? responseMsg.getError() : responseMsg.getPayload(); systemContext.getTbCoreDeviceRpcService().processRpcResponseFromDeviceActor( - new FromDeviceRpcResponse(toDeviceRequestMsg.getId(), payload, null)); + new FromDeviceRpcResponse(rpcId, payload, null)); if (toDeviceRequestMsg.isPersisted()) { RpcStatus status = hasError ? RpcStatus.FAILED : RpcStatus.SUCCESSFUL; JsonNode response; @@ -605,13 +641,17 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso } catch (IllegalArgumentException e) { response = JacksonUtil.newObjectNode().put("error", payload); } - systemContext.getTbRpcService().save(tenantId, new RpcId(toDeviceRequestMsg.getId()), status, response); + systemContext.getTbRpcService().save(tenantId, new RpcId(rpcId), status, response); } } finally { - if (!delivered) { - String errorResponse = hasError ? "error" : ""; - log.debug("[{}][{}][{}] Received {} response for undelivered RPC! Going to send next pending request ...", deviceId, sessionId, requestId, errorResponse); - sendNextPendingRequest(); + if (rpcSubmitStrategy.equals(RpcSubmitStrategy.SEQUENTIAL_ON_RESPONSE_FROM_DEVICE)) { + clearAwaitRpcResponseScheduler(); + String errorResponse = hasError ? "error response" : "response"; + String rpcState = delivered ? "" : "undelivered "; + sendNextPendingRequest(rpcId, requestId, String.format("Received %s for %sRPC!", errorResponse, rpcState)); + } else if (!delivered) { + String errorResponse = hasError ? "error response" : "response"; + sendNextPendingRequest(rpcId, requestId, String.format("Received %s for undelivered RPC!", errorResponse)); } } } else { @@ -626,36 +666,47 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso int requestId = responseMsg.getRequestId(); log.debug("[{}][{}][{}][{}] Processing RPC command response status: [{}]", deviceId, sessionId, rpcId, requestId, status); ToDeviceRpcRequestMetadata md = toDeviceRpcPendingMap.get(requestId); - if (md != null) { + var toDeviceRpcRequest = md.getMsg().getMsg(); + boolean persisted = toDeviceRpcRequest.isPersisted(); + boolean oneWayRpc = toDeviceRpcRequest.isOneway(); JsonNode response = null; if (status.equals(RpcStatus.DELIVERED)) { - if (md.getMsg().getMsg().isOneway()) { + if (oneWayRpc) { toDeviceRpcPendingMap.remove(requestId); if (rpcSequential) { - systemContext.getTbCoreDeviceRpcService().processRpcResponseFromDeviceActor(new FromDeviceRpcResponse(rpcId, null, null)); + var fromDeviceRpcResponse = new FromDeviceRpcResponse(rpcId, null, null); + systemContext.getTbCoreDeviceRpcService().processRpcResponseFromDeviceActor(fromDeviceRpcResponse); } } else { md.setDelivered(true); + if (rpcSubmitStrategy.equals(RpcSubmitStrategy.SEQUENTIAL_ON_RESPONSE_FROM_DEVICE)) { + awaitRpcResponseFuture = scheduleAwaitRpcResponseFuture(rpcId, requestId); + } } } else if (status.equals(RpcStatus.TIMEOUT)) { - Integer maxRpcRetries = md.getMsg().getMsg().getRetries(); - maxRpcRetries = maxRpcRetries == null ? systemContext.getMaxRpcRetries() : Math.min(maxRpcRetries, systemContext.getMaxRpcRetries()); + Integer maxRpcRetries = toDeviceRpcRequest.getRetries(); + maxRpcRetries = maxRpcRetries == null ? + systemContext.getMaxRpcRetries() : Math.min(maxRpcRetries, systemContext.getMaxRpcRetries()); if (maxRpcRetries <= md.getRetries()) { toDeviceRpcPendingMap.remove(requestId); status = RpcStatus.FAILED; - response = JacksonUtil.newObjectNode().put("error", "There was a Timeout and all retry attempts have been exhausted. Retry attempts set: " + maxRpcRetries); + response = JacksonUtil.newObjectNode().put("error", "There was a Timeout and all retry " + + "attempts have been exhausted. Retry attempts set: " + maxRpcRetries); } else { md.setRetries(md.getRetries() + 1); } } - if (md.getMsg().getMsg().isPersisted()) { + if (persisted) { systemContext.getTbRpcService().save(tenantId, new RpcId(rpcId), status, response); } - if (status != RpcStatus.SENT) { - log.debug("[{}][{}][{}][{}] RPC was {}! Going to send next pending request ...", deviceId, sessionId, rpcId, requestId, status.name().toLowerCase()); - sendNextPendingRequest(); + if (rpcSubmitStrategy.equals(RpcSubmitStrategy.SEQUENTIAL_ON_RESPONSE_FROM_DEVICE) + && status.equals(RpcStatus.DELIVERED) && !oneWayRpc) { + return; + } + if (!status.equals(RpcStatus.SENT)) { + sendNextPendingRequest(rpcId, requestId, String.format("RPC was %s!", status.name().toLowerCase())); } } else { log.warn("[{}][{}][{}][{}] RPC has already been removed from pending map.", deviceId, sessionId, rpcId, requestId); @@ -688,6 +739,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso if (subscribeCmd.getUnsubscribe()) { log.debug("[{}] Canceling RPC subscription for session: [{}]", deviceId, sessionId); rpcSubscriptions.remove(sessionId); + clearAwaitRpcResponseScheduler(); } else { SessionInfoMetaData sessionMD = sessions.get(sessionId); if (sessionMD == null) { @@ -722,6 +774,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso sessions.remove(sessionId); attributeSubscriptions.remove(sessionId); rpcSubscriptions.remove(sessionId); + clearAwaitRpcResponseScheduler(); if (sessions.isEmpty()) { reportSessionClose(); } @@ -729,6 +782,27 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso } } + private ScheduledFuture scheduleAwaitRpcResponseFuture(UUID rpcId, int requestId) { + return scheduler.schedule(() -> { + var md = toDeviceRpcPendingMap.remove(requestId); + if (md == null) { + return; + } + sendNextPendingRequest(rpcId, requestId, "RPC was removed from pending map due to await timeout on response from device!"); + var toDeviceRpcRequest = md.getMsg().getMsg(); + if (toDeviceRpcRequest.isPersisted()) { + var responseAwaitTimeout = JacksonUtil.newObjectNode().put("error", "There was a timeout awaiting for RPC response from device."); + systemContext.getTbRpcService().save(tenantId, new RpcId(rpcId), RpcStatus.FAILED, responseAwaitTimeout); + } + }, systemContext.getRpcResponseTimeout(), TimeUnit.MILLISECONDS); + } + + private void clearAwaitRpcResponseScheduler() { + if (rpcSubmitStrategy.equals(RpcSubmitStrategy.SEQUENTIAL_ON_RESPONSE_FROM_DEVICE) && awaitRpcResponseFuture != null) { + awaitRpcResponseFuture.cancel(true); + } + } + private void handleSessionActivity(SessionInfoProto sessionInfoProto, SubscriptionInfoProto subscriptionInfo) { UUID sessionId = getSessionId(sessionInfoProto); Objects.requireNonNull(sessionId); @@ -974,7 +1048,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso rpc.setStatus(RpcStatus.EXPIRED); systemContext.getTbRpcService().save(tenantId, rpc); } else { - registerPendingRpcRequest(ctx, new ToDeviceRpcRequestActorMsg(systemContext.getServiceId(), msg), false, creteToDeviceRpcRequestMsg(msg), timeout); + registerPendingRpcRequest(ctx, new ToDeviceRpcRequestActorMsg(systemContext.getServiceId(), msg), false, createToDeviceRpcRequestMsg(msg), timeout); } }); if (pageData.hasNext()) { diff --git a/application/src/main/java/org/thingsboard/server/controller/RpcV2Controller.java b/application/src/main/java/org/thingsboard/server/controller/RpcV2Controller.java index 451a5b7dc0..d8876eaa9c 100644 --- a/application/src/main/java/org/thingsboard/server/controller/RpcV2Controller.java +++ b/application/src/main/java/org/thingsboard/server/controller/RpcV2Controller.java @@ -230,7 +230,7 @@ public class RpcV2Controller extends AbstractRpcController { Rpc rpc = checkRpcId(rpcId, Operation.DELETE); if (rpc != null) { - if (rpc.getStatus().equals(RpcStatus.QUEUED)) { + if (rpc.getStatus().isPushDeleteNotificationToCore()) { RemoveRpcActorMsg removeMsg = new RemoveRpcActorMsg(getTenantId(), rpc.getDeviceId(), rpc.getUuidId()); log.trace("[{}] Forwarding msg {} to queue actor!", rpc.getDeviceId(), rpc); tbClusterService.pushMsgToCore(removeMsg, null); diff --git a/application/src/main/java/org/thingsboard/server/service/rpc/RpcSubmitStrategy.java b/application/src/main/java/org/thingsboard/server/service/rpc/RpcSubmitStrategy.java new file mode 100644 index 0000000000..09da40d026 --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/rpc/RpcSubmitStrategy.java @@ -0,0 +1,30 @@ +/** + * Copyright © 2016-2023 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.rpc; + +import java.util.Arrays; + +public enum RpcSubmitStrategy { + + BURST, SEQUENTIAL_ON_ACK_FROM_DEVICE, SEQUENTIAL_ON_RESPONSE_FROM_DEVICE; + + public static RpcSubmitStrategy parse(String strategyStr) { + return Arrays.stream(RpcSubmitStrategy.values()) + .filter(strategy -> strategy.name().equalsIgnoreCase(strategyStr)) + .findFirst() + .orElse(BURST); + } +} diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index bdfacbe884..6e745f063f 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -405,8 +405,12 @@ actors: # Enqueue the result of external node processing as a separate message to the rule engine. force_ack: "${ACTORS_RULE_EXTERNAL_NODE_FORCE_ACK:false}" rpc: + # Maximum number of persistent RPC call retries in case of failed requests delivery. max_retries: "${ACTORS_RPC_MAX_RETRIES:5}" - sequential: "${ACTORS_RPC_SEQUENTIAL:false}" + # RPC submit strategies. Allowed values: BURST, SEQUENTIAL_ON_ACK_FROM_DEVICE, SEQUENTIAL_ON_RESPONSE_FROM_DEVICE. + submit_strategy: "${ACTORS_RPC_SUBMIT_STRATEGY_TYPE:BURST}" + # Time in milliseconds for RPC to receive response after delivery. Used only for SEQUENTIAL_ON_RESPONSE_FROM_DEVICE submit strategy. + response_timeout_ms: "${ACTORS_RPC_RESPONSE_TIMEOUT_MS:30000}" statistics: # Enable/disable actor statistics enabled: "${ACTORS_STATISTICS_ENABLED:true}" diff --git a/application/src/test/java/org/thingsboard/server/controller/AbstractWebTest.java b/application/src/test/java/org/thingsboard/server/controller/AbstractWebTest.java index 888c9de367..151e779ae0 100644 --- a/application/src/test/java/org/thingsboard/server/controller/AbstractWebTest.java +++ b/application/src/test/java/org/thingsboard/server/controller/AbstractWebTest.java @@ -65,6 +65,7 @@ import org.thingsboard.server.actors.TbEntityActorId; import org.thingsboard.server.actors.device.DeviceActor; import org.thingsboard.server.actors.device.DeviceActorMessageProcessor; import org.thingsboard.server.actors.device.SessionInfo; +import org.thingsboard.server.actors.device.ToDeviceRpcRequestMetadata; import org.thingsboard.server.actors.service.DefaultActorService; import org.thingsboard.server.common.data.Customer; import org.thingsboard.server.common.data.Device; @@ -1008,6 +1009,15 @@ public abstract class AbstractWebTest extends AbstractInMemoryStorageTest { }); } + protected void awaitForDeviceActorToProcessAllRpcResponses(DeviceId deviceId) { + DeviceActorMessageProcessor processor = getDeviceActorProcessor(deviceId); + Map toDeviceRpcPendingMap = (Map) ReflectionTestUtils.getField(processor, "toDeviceRpcPendingMap"); + Awaitility.await("Device actor pending map is empty").atMost(5, TimeUnit.SECONDS).until(() -> { + log.warn("device {}, toDeviceRpcPendingMap.size() == {}", deviceId, toDeviceRpcPendingMap.size()); + return toDeviceRpcPendingMap.isEmpty(); + }); + } + protected static String getMapName(FeatureType featureType) { switch (featureType) { case ATTRIBUTES: diff --git a/application/src/test/java/org/thingsboard/server/service/rpc/RpcSubmitStrategyTest.java b/application/src/test/java/org/thingsboard/server/service/rpc/RpcSubmitStrategyTest.java new file mode 100644 index 0000000000..5754181381 --- /dev/null +++ b/application/src/test/java/org/thingsboard/server/service/rpc/RpcSubmitStrategyTest.java @@ -0,0 +1,38 @@ +/** + * Copyright © 2016-2023 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.rpc; + +import org.junit.jupiter.api.Test; +import org.thingsboard.server.common.data.StringUtils; + +import static org.assertj.core.api.Assertions.assertThat; + +class RpcSubmitStrategyTest { + + @Test + void givenRandomString_whenParse_thenReturnBurstStrategy() { + String randomString = StringUtils.randomAlphanumeric(10); + RpcSubmitStrategy parsed = RpcSubmitStrategy.parse(randomString); + assertThat(parsed).isEqualTo(RpcSubmitStrategy.BURST); + } + + @Test + void givenNull_whenParse_thenReturnBurstStrategy() { + RpcSubmitStrategy parsed = RpcSubmitStrategy.parse(null); + assertThat(parsed).isEqualTo(RpcSubmitStrategy.BURST); + } + +} diff --git a/application/src/test/java/org/thingsboard/server/transport/mqtt/AbstractMqttIntegrationTest.java b/application/src/test/java/org/thingsboard/server/transport/mqtt/AbstractMqttIntegrationTest.java index 55508d1b04..c23d1c615b 100644 --- a/application/src/test/java/org/thingsboard/server/transport/mqtt/AbstractMqttIntegrationTest.java +++ b/application/src/test/java/org/thingsboard/server/transport/mqtt/AbstractMqttIntegrationTest.java @@ -186,8 +186,12 @@ public abstract class AbstractMqttIntegrationTest extends AbstractTransportInteg } protected void subscribeAndWait(MqttTestClient client, String attrSubTopic, DeviceId deviceId, FeatureType featureType) throws MqttException { + subscribeAndWait(client, attrSubTopic, deviceId, featureType, MqttQoS.AT_MOST_ONCE); + } + + protected void subscribeAndWait(MqttTestClient client, String attrSubTopic, DeviceId deviceId, FeatureType featureType, MqttQoS mqttQoS) throws MqttException { int subscriptionCount = getDeviceActorSubscriptionCount(deviceId, featureType); - client.subscribeAndWait(attrSubTopic, MqttQoS.AT_MOST_ONCE); + client.subscribeAndWait(attrSubTopic, mqttQoS); // TODO: This test awaits for the device actor to receive the subscription. Ideally it should not happen. See details below: // The transport layer acknowledge subscription request once the message about subscription is in the queue. // Test sends data immediately after acknowledgement. diff --git a/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv3/MqttTestCallback.java b/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv3/MqttTestCallback.java index 208189ac4e..4b8646f8f5 100644 --- a/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv3/MqttTestCallback.java +++ b/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv3/MqttTestCallback.java @@ -30,7 +30,7 @@ public class MqttTestCallback implements MqttCallback { protected CountDownLatch subscribeLatch; protected final CountDownLatch deliveryLatch; - protected int qoS; + protected int messageArrivedQoS; protected byte[] payloadBytes; protected boolean pubAckReceived; @@ -53,7 +53,7 @@ public class MqttTestCallback implements MqttCallback { @Override public void messageArrived(String requestTopic, MqttMessage mqttMessage) { log.warn("messageArrived on topic: {}", requestTopic); - qoS = mqttMessage.getQos(); + messageArrivedQoS = mqttMessage.getQos(); payloadBytes = mqttMessage.getPayload(); subscribeLatch.countDown(); } @@ -63,6 +63,5 @@ public class MqttTestCallback implements MqttCallback { log.warn("delivery complete: {}", iMqttDeliveryToken.getResponse()); pubAckReceived = iMqttDeliveryToken.getResponse().getType() == MqttWireMessage.MESSAGE_TYPE_PUBACK; deliveryLatch.countDown(); - } } diff --git a/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv3/MqttTestSubscribeOnTopicCallback.java b/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv3/MqttTestSubscribeOnTopicCallback.java index 0f3cc14629..8227a3e596 100644 --- a/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv3/MqttTestSubscribeOnTopicCallback.java +++ b/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv3/MqttTestSubscribeOnTopicCallback.java @@ -36,7 +36,7 @@ public class MqttTestSubscribeOnTopicCallback extends MqttTestCallback { public void messageArrived(String requestTopic, MqttMessage mqttMessage) { log.warn("messageArrived on topic: {}, awaitSubTopic: {}", requestTopic, awaitSubTopic); if (awaitSubTopic.equals(requestTopic)) { - qoS = mqttMessage.getQos(); + messageArrivedQoS = mqttMessage.getQos(); payloadBytes = mqttMessage.getPayload(); subscribeLatch.countDown(); } diff --git a/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv3/attributes/AbstractMqttAttributesIntegrationTest.java b/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv3/attributes/AbstractMqttAttributesIntegrationTest.java index 71d8809e38..c184ccef7d 100644 --- a/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv3/attributes/AbstractMqttAttributesIntegrationTest.java +++ b/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv3/attributes/AbstractMqttAttributesIntegrationTest.java @@ -575,14 +575,14 @@ public abstract class AbstractMqttAttributesIntegrationTest extends AbstractMqtt protected void validateJsonResponse(MqttTestCallback callback, String expectedResponse) throws InterruptedException { assertThat(callback.getSubscribeLatch().await(DEFAULT_WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS)) .as("await callback").isTrue(); - assertEquals(MqttQoS.AT_MOST_ONCE.value(), callback.getQoS()); + assertEquals(MqttQoS.AT_MOST_ONCE.value(), callback.getMessageArrivedQoS()); assertEquals(JacksonUtil.toJsonNode(expectedResponse), JacksonUtil.fromBytes(callback.getPayloadBytes())); } protected void validateProtoResponse(MqttTestCallback callback, TransportProtos.GetAttributeResponseMsg expectedResponse) throws InterruptedException, InvalidProtocolBufferException { assertThat(callback.getSubscribeLatch().await(DEFAULT_WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS)) .as("await callback").isTrue(); - assertEquals(MqttQoS.AT_MOST_ONCE.value(), callback.getQoS()); + assertEquals(MqttQoS.AT_MOST_ONCE.value(), callback.getMessageArrivedQoS()); TransportProtos.GetAttributeResponseMsg actualAttributesResponse = TransportProtos.GetAttributeResponseMsg.parseFrom(callback.getPayloadBytes()); assertEquals(expectedResponse.getRequestId(), actualAttributesResponse.getRequestId()); List expectedClientKeyValueProtos = expectedResponse.getClientAttributeListList().stream().map(TransportProtos.TsKvProto::getKv).collect(Collectors.toList()); @@ -606,7 +606,7 @@ public abstract class AbstractMqttAttributesIntegrationTest extends AbstractMqtt protected void validateJsonResponseGateway(MqttTestCallback callback, String deviceName, String expectedValues) throws InterruptedException { assertThat(callback.getSubscribeLatch().await(DEFAULT_WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS)) .as("await callback").isTrue(); - assertEquals(MqttQoS.AT_LEAST_ONCE.value(), callback.getQoS()); + assertEquals(MqttQoS.AT_LEAST_ONCE.value(), callback.getMessageArrivedQoS()); String expectedRequestPayload = "{\"id\":1,\"device\":\"" + deviceName + "\",\"values\":" + expectedValues + "}"; assertEquals(JacksonUtil.toJsonNode(expectedRequestPayload), JacksonUtil.fromBytes(callback.getPayloadBytes())); } @@ -614,7 +614,7 @@ public abstract class AbstractMqttAttributesIntegrationTest extends AbstractMqtt protected void validateProtoClientResponseGateway(MqttTestCallback callback, String deviceName) throws InterruptedException, InvalidProtocolBufferException { assertThat(callback.getSubscribeLatch().await(DEFAULT_WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS)) .as("await callback").isTrue(); - assertEquals(MqttQoS.AT_LEAST_ONCE.value(), callback.getQoS()); + assertEquals(MqttQoS.AT_LEAST_ONCE.value(), callback.getMessageArrivedQoS()); TransportApiProtos.GatewayAttributeResponseMsg expectedGatewayAttributeResponseMsg = getExpectedGatewayAttributeResponseMsg(deviceName, true); TransportApiProtos.GatewayAttributeResponseMsg actualGatewayAttributeResponseMsg = TransportApiProtos.GatewayAttributeResponseMsg.parseFrom(callback.getPayloadBytes()); assertEquals(expectedGatewayAttributeResponseMsg.getDeviceName(), actualGatewayAttributeResponseMsg.getDeviceName()); @@ -631,7 +631,7 @@ public abstract class AbstractMqttAttributesIntegrationTest extends AbstractMqtt protected void validateProtoSharedResponseGateway(MqttTestCallback callback, String deviceName) throws InterruptedException, InvalidProtocolBufferException { assertThat(callback.getSubscribeLatch().await(DEFAULT_WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS)) .as("await callback").isTrue(); - assertEquals(MqttQoS.AT_LEAST_ONCE.value(), callback.getQoS()); + assertEquals(MqttQoS.AT_LEAST_ONCE.value(), callback.getMessageArrivedQoS()); TransportApiProtos.GatewayAttributeResponseMsg expectedGatewayAttributeResponseMsg = getExpectedGatewayAttributeResponseMsg(deviceName, false); TransportApiProtos.GatewayAttributeResponseMsg actualGatewayAttributeResponseMsg = TransportApiProtos.GatewayAttributeResponseMsg.parseFrom(callback.getPayloadBytes()); assertEquals(expectedGatewayAttributeResponseMsg.getDeviceName(), actualGatewayAttributeResponseMsg.getDeviceName()); diff --git a/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv3/rpc/AbstractMqttServerSideRpcIntegrationTest.java b/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv3/rpc/AbstractMqttServerSideRpcIntegrationTest.java index e1f20a3d95..c128ef2d26 100644 --- a/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv3/rpc/AbstractMqttServerSideRpcIntegrationTest.java +++ b/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv3/rpc/AbstractMqttServerSideRpcIntegrationTest.java @@ -37,12 +37,13 @@ import org.thingsboard.server.common.data.device.profile.DeviceProfileTransportC import org.thingsboard.server.common.data.device.profile.MqttDeviceProfileTransportConfiguration; import org.thingsboard.server.common.data.device.profile.ProtoTransportPayloadConfiguration; import org.thingsboard.server.common.data.device.profile.TransportPayloadTypeConfiguration; +import org.thingsboard.server.common.data.rpc.Rpc; import org.thingsboard.server.common.msg.session.FeatureType; import org.thingsboard.server.gen.transport.TransportApiProtos; import org.thingsboard.server.transport.mqtt.AbstractMqttIntegrationTest; import org.thingsboard.server.transport.mqtt.mqttv3.MqttTestCallback; -import org.thingsboard.server.transport.mqtt.mqttv3.MqttTestSubscribeOnTopicCallback; import org.thingsboard.server.transport.mqtt.mqttv3.MqttTestClient; +import org.thingsboard.server.transport.mqtt.mqttv3.MqttTestSubscribeOnTopicCallback; import java.util.ArrayList; import java.util.List; @@ -61,7 +62,7 @@ import static org.thingsboard.server.common.data.device.profile.MqttTopics.GATEW @Slf4j public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractMqttIntegrationTest { - protected static final String RPC_REQUEST_PROTO_SCHEMA = "syntax =\"proto3\";\n" + + protected static final String RPC_REQUEST_PROTO_SCHEMA = "syntax =\"proto3\";\n" + "package rpc;\n" + "\n" + "message RpcRequestMsg {\n" + @@ -105,7 +106,7 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractM } else { assertEquals(JacksonUtil.toJsonNode(setGpioRequest), JacksonUtil.fromBytes(callback.getPayloadBytes())); } - assertEquals(MqttQoS.AT_MOST_ONCE.value(), callback.getQoS()); + assertEquals(MqttQoS.AT_MOST_ONCE.value(), callback.getMessageArrivedQoS()); client.disconnect(); } @@ -176,9 +177,9 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractM return builder.build(); } - protected void processSequenceTwoWayRpcTest() throws Exception { - List expected = new ArrayList<>(); - List result = new ArrayList<>(); + protected void processSequenceOneWayRpcTest(MqttQoS mqttQoS) throws Exception { + List expectedRequest = new ArrayList<>(); + List actualRequests = new ArrayList<>(); String deviceId = savedDevice.getId().getId().toString(); @@ -186,20 +187,67 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractM ObjectNode request = JacksonUtil.newObjectNode(); request.put("method", "test"); request.put("params", i); - expected.add(JacksonUtil.toString(request)); + expectedRequest.add(JacksonUtil.toString(request)); request.put("persistent", true); - doPostAsync("/api/rpc/twoway/" + deviceId, JacksonUtil.toString(request), String.class, status().isOk()); + doPostAsync("/api/rpc/oneway/" + deviceId, JacksonUtil.toString(request), String.class, status().isOk()); } MqttTestClient client = new MqttTestClient(); client.connectAndWait(accessToken); - client.enableManualAcks(); - MqttTestSequenceCallback callback = new MqttTestSequenceCallback(client, 10, result); + MqttTestOneWaySequenceCallback callback = new MqttTestOneWaySequenceCallback(client, 10, actualRequests); client.setCallback(callback); - subscribeAndWait(client, DEVICE_RPC_REQUESTS_SUB_TOPIC, savedDevice.getId(), FeatureType.RPC); + subscribeAndWait(client, DEVICE_RPC_REQUESTS_SUB_TOPIC, savedDevice.getId(), FeatureType.RPC, mqttQoS); callback.getSubscribeLatch().await(DEFAULT_WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS); - assertEquals(expected, result); + assertEquals(expectedRequest, actualRequests); + client.disconnect(); + } + + protected void processSequenceTwoWayRpcTest(MqttQoS mqttQoS) throws Exception { + processSequenceTwoWayRpcTest(mqttQoS, false); + } + + protected void processSequenceTwoWayRpcTest(MqttQoS mqttQoS, boolean manualAcksEnabled) throws Exception { + List expectedRequest = new ArrayList<>(); + List actualRequests = new ArrayList<>(); + + List rpcIds = new ArrayList<>(); + + List expectedResponses = new ArrayList<>(); + List actualResponses = new ArrayList<>(); + + String deviceId = savedDevice.getId().getId().toString(); + + for (int i = 0; i < 10; i++) { + ObjectNode request = JacksonUtil.newObjectNode(); + request.put("method", "test"); + request.put("params", i); + expectedRequest.add(JacksonUtil.toString(request)); + request.put("persistent", true); + String response = doPostAsync("/api/rpc/twoway/" + deviceId, JacksonUtil.toString(request), String.class, status().isOk()); + var responseNode = JacksonUtil.toJsonNode(response); + rpcIds.add(responseNode.get("rpcId").asText()); + } + + MqttTestClient client = new MqttTestClient(); + client.connectAndWait(accessToken); + if (manualAcksEnabled) { + client.enableManualAcks(); + } + MqttTestTwoWaySequenceCallback callback = new MqttTestTwoWaySequenceCallback( + client, 10, actualRequests, expectedResponses, manualAcksEnabled); + client.setCallback(callback); + subscribeAndWait(client, DEVICE_RPC_REQUESTS_SUB_TOPIC, savedDevice.getId(), FeatureType.RPC, mqttQoS); + + callback.getSubscribeLatch().await(DEFAULT_WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS); + assertEquals(expectedRequest, actualRequests); + awaitForDeviceActorToProcessAllRpcResponses(savedDevice.getId()); + for (String rpcId : rpcIds) { + Rpc rpc = doGet("/api/rpc/persistent/" + rpcId, Rpc.class); + actualResponses.add(JacksonUtil.toString(rpc.getResponse())); + } + assertEquals(expectedResponses, actualResponses); + client.disconnect(); } protected void processJsonTwoWayRpcTestGateway(String deviceName) throws Exception { @@ -222,7 +270,7 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractM ); assertNotNull(savedDevice); - MqttTestCallback callback = new MqttTestSubscribeOnTopicCallback(GATEWAY_RPC_TOPIC); + MqttTestCallback callback = new MqttTestSubscribeOnTopicCallback(GATEWAY_RPC_TOPIC); client.setCallback(callback); subscribeAndCheckSubscription(client, GATEWAY_RPC_TOPIC, savedDevice.getId(), FeatureType.RPC); @@ -248,7 +296,7 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractM JsonNode expectedJsonRequestData = getExpectedGatewayJsonRequestData(deviceName, setGpioRequest); assertEquals(expectedJsonRequestData, JacksonUtil.fromBytes(callback.getPayloadBytes())); } - assertEquals(MqttQoS.AT_MOST_ONCE.value(), callback.getQoS()); + assertEquals(MqttQoS.AT_MOST_ONCE.value(), callback.getMessageArrivedQoS()); } private JsonNode getExpectedGatewayJsonRequestData(String deviceName, String requestStr) { @@ -280,7 +328,7 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractM callback.getSubscribeLatch().await(DEFAULT_WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS); log.warn("request payload: {}", JacksonUtil.fromBytes(callback.getPayloadBytes())); assertEquals("{\"success\":true}", actualRpcResponse); - assertEquals(MqttQoS.AT_MOST_ONCE.value(), callback.getQoS()); + assertEquals(MqttQoS.AT_MOST_ONCE.value(), callback.getMessageArrivedQoS()); } protected void validateProtoTwoWayRpcGatewayResponse(String deviceName, MqttTestClient client, byte[] connectPayloadBytes) throws Exception { @@ -302,7 +350,7 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractM String actualRpcResponse = doPostAsync("/api/rpc/twoway/" + deviceId, setGpioRequest, String.class, status().isOk()); callback.getSubscribeLatch().await(DEFAULT_WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS); assertEquals("{\"success\":true}", actualRpcResponse); - assertEquals(MqttQoS.AT_MOST_ONCE.value(), callback.getQoS()); + assertEquals(MqttQoS.AT_MOST_ONCE.value(), callback.getMessageArrivedQoS()); } private Device getDeviceByName(String deviceName) throws Exception { @@ -334,7 +382,7 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractM public void messageArrived(String requestTopic, MqttMessage mqttMessage) { log.warn("messageArrived on topic: {}, awaitSubTopic: {}", requestTopic, awaitSubTopic); if (awaitSubTopic.equals(requestTopic)) { - qoS = mqttMessage.getQos(); + messageArrivedQoS = mqttMessage.getQos(); payloadBytes = mqttMessage.getPayload(); String responseTopic; if (requestTopic.startsWith(BASE_DEVICE_API_TOPIC_V2)) { @@ -366,7 +414,7 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractM public void messageArrived(String requestTopic, MqttMessage mqttMessage) { log.warn("messageArrived on topic: {}, awaitSubTopic: {}", requestTopic, awaitSubTopic); if (awaitSubTopic.equals(requestTopic)) { - qoS = mqttMessage.getQos(); + messageArrivedQoS = mqttMessage.getQos(); payloadBytes = mqttMessage.getPayload(); String responseTopic; if (requestTopic.startsWith(BASE_DEVICE_API_TOPIC_V2)) { @@ -398,7 +446,7 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractM try { DynamicMessage dynamicMessage = DynamicMessage.parseFrom(rpcRequestMsgDescriptor, requestPayload); List fields = rpcRequestMsgDescriptor.getFields(); - for (Descriptors.FieldDescriptor fieldDescriptor: fields) { + for (Descriptors.FieldDescriptor fieldDescriptor : fields) { assertTrue(dynamicMessage.hasField(fieldDescriptor)); } ProtoFileElement rpcResponseProtoFileElement = DynamicProtoUtils.getProtoFileElement(protoTransportPayloadConfiguration.getDeviceRpcResponseProtoSchema()); @@ -436,30 +484,69 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractM return (ProtoTransportPayloadConfiguration) transportPayloadTypeConfiguration; } - protected class MqttTestSequenceCallback extends MqttTestCallback { + protected static class MqttTestOneWaySequenceCallback extends MqttTestCallback { - private final MqttTestClient client; - private final List expected; + private final List requests; - MqttTestSequenceCallback(MqttTestClient client, int subscribeCount, List expected) { + MqttTestOneWaySequenceCallback(MqttTestClient client, int subscribeCount, List requests) { super(subscribeCount); - this.client = client; - this.expected = expected; + this.requests = requests; } @Override public void messageArrived(String requestTopic, MqttMessage mqttMessage) { log.warn("messageArrived on topic: {}", requestTopic); - expected.add(new String(mqttMessage.getPayload())); - String responseTopic = requestTopic.replace("request", "response"); - qoS = mqttMessage.getQos(); - try { - client.messageArrivedComplete(mqttMessage); - client.publish(responseTopic, processJsonMessageArrived(requestTopic, mqttMessage)); - } catch (MqttException e) { - log.warn("Failed to publish response on topic: {} due to: ", responseTopic, e); - } + requests.add(new String(mqttMessage.getPayload())); + messageArrivedQoS = mqttMessage.getQos(); subscribeLatch.countDown(); } } + + protected class MqttTestTwoWaySequenceCallback extends MqttTestCallback { + + private final MqttTestClient client; + private final List requests; + private final List responses; + private final boolean manualAcksEnabled; + + MqttTestTwoWaySequenceCallback(MqttTestClient client, int subscribeCount, List requests, List responses, boolean manualAcksEnabled) { + super(subscribeCount); + this.client = client; + this.requests = requests; + this.responses = responses; + this.manualAcksEnabled = manualAcksEnabled; + } + + @Override + public void messageArrived(String requestTopic, MqttMessage mqttMessage) { + log.warn("messageArrived on topic: {}", requestTopic); + requests.add(new String(mqttMessage.getPayload())); + messageArrivedQoS = mqttMessage.getQos(); + if (manualAcksEnabled) { + try { + client.messageArrivedComplete(mqttMessage); + } catch (MqttException e) { + log.warn("Failed to ack message delivery on topic: {} due to: ", requestTopic, e); + } finally { + subscribeLatch.countDown(); + processResponse(requestTopic, mqttMessage); + } + return; + } + subscribeLatch.countDown(); + processResponse(requestTopic, mqttMessage); + } + + private void processResponse(String requestTopic, MqttMessage mqttMessage) { + String responseTopic = requestTopic.replace("request", "response"); + byte[] responsePayload = processJsonMessageArrived(requestTopic, mqttMessage); + responses.add(new String(responsePayload)); + try { + client.publish(responseTopic, responsePayload); + } catch (MqttException e) { + log.warn("Failed to publish response on topic: {} due to: ", responseTopic, e); + } + } + + } } diff --git a/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv3/rpc/MqttServerSideRpcDefaultIntegrationTest.java b/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv3/rpc/MqttServerSideRpcDefaultIntegrationTest.java index 10a7a9619f..46f9cc7cff 100644 --- a/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv3/rpc/MqttServerSideRpcDefaultIntegrationTest.java +++ b/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv3/rpc/MqttServerSideRpcDefaultIntegrationTest.java @@ -110,11 +110,6 @@ public class MqttServerSideRpcDefaultIntegrationTest extends AbstractMqttServerS processJsonTwoWayRpcTest(DEVICE_RPC_REQUESTS_SUB_SHORT_JSON_TOPIC); } - @Test - public void testSequenceServerMqttTwoWayRpc() throws Exception { - processSequenceTwoWayRpcTest(); - } - @Test public void testGatewayServerMqttOneWayRpc() throws Exception { processJsonOneWayRpcTestGateway("Gateway Device OneWay RPC"); diff --git a/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv3/rpc/MqttServerSideRpcSequenceOnAckIntegrationTest.java b/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv3/rpc/MqttServerSideRpcSequenceOnAckIntegrationTest.java new file mode 100644 index 0000000000..4476c6babb --- /dev/null +++ b/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv3/rpc/MqttServerSideRpcSequenceOnAckIntegrationTest.java @@ -0,0 +1,72 @@ +/** + * Copyright © 2016-2023 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.transport.mqtt.mqttv3.rpc; + +import io.netty.handler.codec.mqtt.MqttQoS; +import lombok.extern.slf4j.Slf4j; +import org.junit.Before; +import org.junit.Test; +import org.springframework.test.context.TestPropertySource; +import org.thingsboard.server.dao.service.DaoSqlTest; +import org.thingsboard.server.transport.mqtt.MqttTestConfigProperties; + +@Slf4j +@DaoSqlTest +@TestPropertySource(properties = { + "actors.rpc.submit_strategy=SEQUENTIAL_ON_ACK_FROM_DEVICE", +}) +public class MqttServerSideRpcSequenceOnAckIntegrationTest extends AbstractMqttServerSideRpcIntegrationTest { + + @Before + public void beforeTest() throws Exception { + MqttTestConfigProperties configProperties = MqttTestConfigProperties.builder() + .deviceName("RPC test device") + .gatewayName("RPC test gateway") + .build(); + processBeforeTest(configProperties); + } + + @Test + public void testSequenceServerMqttOneWayRpcQoSAtMostOnce() throws Exception { + processSequenceOneWayRpcTest(MqttQoS.AT_MOST_ONCE); + } + + @Test + public void testSequenceServerMqttOneWayRpcQoSAtLeastOnce() throws Exception { + processSequenceOneWayRpcTest(MqttQoS.AT_LEAST_ONCE); + } + + @Test + public void testSequenceServerMqttTwoWayRpcQoSAtMostOnce() throws Exception { + processSequenceTwoWayRpcTest(MqttQoS.AT_MOST_ONCE); + } + + @Test + public void testSequenceServerMqttTwoWayRpcQoSAtLeastOnce() throws Exception { + processSequenceTwoWayRpcTest(MqttQoS.AT_LEAST_ONCE); + } + + @Test + public void testSequenceServerMqttTwoWayRpcQoSAtMostOnceWithManualAcksEnabled() throws Exception { + processSequenceTwoWayRpcTest(MqttQoS.AT_MOST_ONCE, true); + } + + @Test + public void testSequenceServerMqttTwoWayRpcQoSAtLeastOnceWithoutManualAcksEnabled() throws Exception { + processSequenceTwoWayRpcTest(MqttQoS.AT_LEAST_ONCE, true); + } + +} diff --git a/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv3/rpc/MqttServerSideRpcSequenceOnResponseIntegrationTest.java b/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv3/rpc/MqttServerSideRpcSequenceOnResponseIntegrationTest.java new file mode 100644 index 0000000000..7a081ed023 --- /dev/null +++ b/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv3/rpc/MqttServerSideRpcSequenceOnResponseIntegrationTest.java @@ -0,0 +1,73 @@ +/** + * Copyright © 2016-2023 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.transport.mqtt.mqttv3.rpc; + +import io.netty.handler.codec.mqtt.MqttQoS; +import lombok.extern.slf4j.Slf4j; +import org.junit.Before; +import org.junit.Test; +import org.springframework.test.context.TestPropertySource; +import org.thingsboard.server.dao.service.DaoSqlTest; +import org.thingsboard.server.transport.mqtt.MqttTestConfigProperties; + +@Slf4j +@DaoSqlTest +@TestPropertySource(properties = { + "actors.rpc.submit_strategy=SEQUENTIAL_ON_RESPONSE_FROM_DEVICE", +}) +public class MqttServerSideRpcSequenceOnResponseIntegrationTest extends AbstractMqttServerSideRpcIntegrationTest { + + @Before + public void beforeTest() throws Exception { + MqttTestConfigProperties configProperties = MqttTestConfigProperties.builder() + .deviceName("RPC test device") + .gatewayName("RPC test gateway") + .build(); + processBeforeTest(configProperties); + } + + @Test + public void testSequenceServerMqttOneWayRpcQoSAtMostOnce() throws Exception { + processSequenceOneWayRpcTest(MqttQoS.AT_MOST_ONCE); + } + + @Test + public void testSequenceServerMqttOneWayRpcQoSAtLeastOnce() throws Exception { + processSequenceOneWayRpcTest(MqttQoS.AT_LEAST_ONCE); + } + + @Test + public void testSequenceServerMqttTwoWayRpcQoSAtMostOnce() throws Exception { + processSequenceTwoWayRpcTest(MqttQoS.AT_MOST_ONCE); + } + + @Test + public void testSequenceServerMqttTwoWayRpcQoSAtLeastOnce() throws Exception { + processSequenceTwoWayRpcTest(MqttQoS.AT_LEAST_ONCE); + } + + @Test + public void testSequenceServerMqttTwoWayRpcQoSAtMostOnceWithManualAcksEnabled() throws Exception { + processSequenceTwoWayRpcTest(MqttQoS.AT_MOST_ONCE, true); + } + + @Test + public void testSequenceServerMqttTwoWayRpcQoSAtLeastOnceWithoutManualAcksEnabled() throws Exception { + processSequenceTwoWayRpcTest(MqttQoS.AT_LEAST_ONCE, true); + } + + +} diff --git a/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv3/telemetry/timeseries/AbstractMqttTimeseriesIntegrationTest.java b/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv3/telemetry/timeseries/AbstractMqttTimeseriesIntegrationTest.java index b29c6844a7..40b2b90856 100644 --- a/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv3/telemetry/timeseries/AbstractMqttTimeseriesIntegrationTest.java +++ b/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv3/telemetry/timeseries/AbstractMqttTimeseriesIntegrationTest.java @@ -332,7 +332,7 @@ public abstract class AbstractMqttTimeseriesIntegrationTest extends AbstractMqtt doPostAsync("/api/plugins/telemetry/" + savedDevice.getId() + "/SHARED_SCOPE", payload, String.class, status().isOk()); callback.getSubscribeLatch().await(DEFAULT_WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS); assertEquals(payload.getBytes(), callback.getPayloadBytes()); - assertEquals(MqttQoS.AT_MOST_ONCE.value(), callback.getQoS()); + assertEquals(MqttQoS.AT_MOST_ONCE.value(), callback.getMessageArrivedQoS()); } } diff --git a/application/src/test/resources/application-test.properties b/application/src/test/resources/application-test.properties index d96af8a910..a964b0b700 100644 --- a/application/src/test/resources/application-test.properties +++ b/application/src/test/resources/application-test.properties @@ -13,7 +13,7 @@ transport.lwm2m.security.trust-credentials.keystore.store_file=lwm2m/credentials edges.enabled=false edges.storage.no_read_records_sleep=500 edges.storage.sleep_between_batches=500 -actors.rpc.sequential=true +actors.rpc.submit_strategy=BURST queue.rule-engine.stats.enabled=true # Transports disabled to speed up the context init. Particular transport will be enabled with @TestPropertySource in respective tests diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/rpc/RpcStatus.java b/common/data/src/main/java/org/thingsboard/server/common/data/rpc/RpcStatus.java index 58a7b3c707..be6d4767ec 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/rpc/RpcStatus.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/rpc/RpcStatus.java @@ -15,6 +15,24 @@ */ package org.thingsboard.server.common.data.rpc; +import lombok.Getter; + public enum RpcStatus { - QUEUED, SENT, DELIVERED, SUCCESSFUL, TIMEOUT, EXPIRED, FAILED, DELETED + + QUEUED(true), + SENT(true), + DELIVERED(true), + SUCCESSFUL(false), + TIMEOUT(false), + EXPIRED(false), + FAILED(false), + DELETED(false); + + @Getter + private final boolean pushDeleteNotificationToCore; + + RpcStatus(boolean pushDeleteNotificationToCore) { + this.pushDeleteNotificationToCore = pushDeleteNotificationToCore; + } + } diff --git a/common/data/src/test/java/org/thingsboard/server/common/data/rpc/RpcStatusTest.java b/common/data/src/test/java/org/thingsboard/server/common/data/rpc/RpcStatusTest.java new file mode 100644 index 0000000000..dc3c30d28b --- /dev/null +++ b/common/data/src/test/java/org/thingsboard/server/common/data/rpc/RpcStatusTest.java @@ -0,0 +1,47 @@ +/** + * Copyright © 2016-2023 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.common.data.rpc; + +import org.junit.jupiter.api.Test; + +import java.util.List; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.thingsboard.server.common.data.rpc.RpcStatus.DELIVERED; +import static org.thingsboard.server.common.data.rpc.RpcStatus.QUEUED; +import static org.thingsboard.server.common.data.rpc.RpcStatus.SENT; + +class RpcStatusTest { + + private static final List pushDeleteNotificationToCoreStatuses = List.of( + QUEUED, + SENT, + DELIVERED + ); + + @Test + void isPushDeleteNotificationToCoreStatusTest() { + var rpcStatuses = RpcStatus.values(); + for (var status : rpcStatuses) { + if (pushDeleteNotificationToCoreStatuses.contains(status)) { + assertThat(status.isPushDeleteNotificationToCore()).isTrue(); + } else { + assertThat(status.isPushDeleteNotificationToCore()).isFalse(); + } + } + } + +} diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/scheduler/DefaultSchedulerComponent.java b/common/queue/src/main/java/org/thingsboard/server/queue/scheduler/DefaultSchedulerComponent.java index 808e7c2c83..ffacea8076 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/scheduler/DefaultSchedulerComponent.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/scheduler/DefaultSchedulerComponent.java @@ -27,12 +27,12 @@ import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; @Component -public class DefaultSchedulerComponent implements SchedulerComponent{ +public class DefaultSchedulerComponent implements SchedulerComponent { protected ScheduledExecutorService schedulerExecutor; @PostConstruct - public void init(){ + public void init() { this.schedulerExecutor = Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName("queue-scheduler")); }