Merge pull request #9027 from ShvaykaD/feature/rpc-sequential-strategies

Rpc sequential strategies
This commit is contained in:
Andrew Shvayka 2023-09-06 15:47:38 +03:00 committed by GitHub
commit ba48ed192f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
20 changed files with 556 additions and 101 deletions

View File

@ -528,9 +528,13 @@ public class ActorSystemContext {
@Getter
private String debugPerTenantLimitsConfiguration;
@Value("${actors.rpc.sequential:false}")
@Value("${actors.rpc.submit_strategy:BURST}")
@Getter
private boolean rpcSequential;
private String rpcSubmitStrategy;
@Value("${actors.rpc.response_timeout_ms:30000}")
@Getter
private long rpcResponseTimeout;
@Value("${actors.rpc.max_retries:5}")
@Getter

View File

@ -89,6 +89,7 @@ import org.thingsboard.server.gen.transport.TransportProtos.TransportToDeviceAct
import org.thingsboard.server.gen.transport.TransportProtos.TsKvProto;
import org.thingsboard.server.service.rpc.FromDeviceRpcResponseActorMsg;
import org.thingsboard.server.service.rpc.RemoveRpcActorMsg;
import org.thingsboard.server.service.rpc.RpcSubmitStrategy;
import org.thingsboard.server.service.rpc.ToDeviceRpcRequestActorMsg;
import org.thingsboard.server.service.transport.msg.TransportToDeviceActorMsgWrapper;
@ -106,6 +107,9 @@ import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.Collectors;
@ -124,22 +128,27 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
private final Map<UUID, SessionInfo> rpcSubscriptions;
private final Map<Integer, ToDeviceRpcRequestMetadata> toDeviceRpcPendingMap;
private final boolean rpcSequential;
private final RpcSubmitStrategy rpcSubmitStrategy;
private final ScheduledExecutorService scheduler;
private int rpcSeq = 0;
private String deviceName;
private String deviceType;
private TbMsgMetaData defaultMetaData;
private EdgeId edgeId;
private ScheduledFuture<?> awaitRpcResponseFuture;
DeviceActorMessageProcessor(ActorSystemContext systemContext, TenantId tenantId, DeviceId deviceId) {
super(systemContext);
this.tenantId = tenantId;
this.deviceId = deviceId;
this.rpcSequential = systemContext.isRpcSequential();
this.rpcSubmitStrategy = RpcSubmitStrategy.parse(systemContext.getRpcSubmitStrategy());
this.rpcSequential = !rpcSubmitStrategy.equals(RpcSubmitStrategy.BURST);
this.attributeSubscriptions = new HashMap<>();
this.rpcSubscriptions = new HashMap<>();
this.toDeviceRpcPendingMap = new LinkedHashMap<>();
this.sessions = new LinkedHashMapRemoveEldest<>(systemContext.getMaxConcurrentSessionsPerDevice(), this::notifyTransportAboutClosedSessionMaxSessionsLimit);
this.scheduler = systemContext.getScheduler();
if (initAttributes()) {
restoreSessions();
}
@ -183,7 +192,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
ToDeviceRpcRequest request = msg.getMsg();
UUID rpcId = request.getId();
log.debug("[{}][{}] Received RPC request to process ...", deviceId, rpcId);
ToDeviceRpcRequestMsg rpcRequest = creteToDeviceRpcRequestMsg(request);
ToDeviceRpcRequestMsg rpcRequest = createToDeviceRpcRequestMsg(request);
long timeout = request.getExpirationTime() - System.currentTimeMillis();
boolean persisted = request.isPersisted();
@ -225,24 +234,28 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
if (persisted) {
ObjectNode response = JacksonUtil.newObjectNode();
response.put("rpcId", rpcId.toString());
systemContext.getTbCoreDeviceRpcService().processRpcResponseFromDeviceActor(new FromDeviceRpcResponse(msg.getMsg().getId(), JacksonUtil.toString(response), null));
systemContext.getTbCoreDeviceRpcService().processRpcResponseFromDeviceActor(new FromDeviceRpcResponse(rpcId, JacksonUtil.toString(response), null));
}
if (!persisted && request.isOneway() && sent) {
log.debug("[{}] RPC command response sent [{}][{}]!", deviceId, rpcId, requestId);
systemContext.getTbCoreDeviceRpcService().processRpcResponseFromDeviceActor(new FromDeviceRpcResponse(msg.getMsg().getId(), null, null));
systemContext.getTbCoreDeviceRpcService().processRpcResponseFromDeviceActor(new FromDeviceRpcResponse(rpcId, null, null));
} else {
registerPendingRpcRequest(context, msg, sent, rpcRequest, timeout);
}
if (sent) {
log.debug("[{}][{}][{}] RPC request is sent!", deviceId, rpcId, requestId);
} else {
log.debug("[{}][{}][{}] RPC request is NOT sent!", deviceId, rpcId, requestId);
}
String rpcSent = sent ? "sent!" : "NOT sent!";
log.debug("[{}][{}][{}] RPC request is {}", deviceId, rpcId, requestId, rpcSent);
}
private boolean isSendNewRpcAvailable() {
return !rpcSequential || toDeviceRpcPendingMap.values().stream().filter(md -> !md.isDelivered()).findAny().isEmpty();
switch (rpcSubmitStrategy) {
case SEQUENTIAL_ON_ACK_FROM_DEVICE:
return toDeviceRpcPendingMap.values().stream().filter(md -> !md.isDelivered()).findAny().isEmpty();
case SEQUENTIAL_ON_RESPONSE_FROM_DEVICE:
return toDeviceRpcPendingMap.values().stream().filter(ToDeviceRpcRequestMetadata::isDelivered).findAny().isEmpty();
default:
return true;
}
}
private void createRpc(ToDeviceRpcRequest request, RpcStatus status) {
@ -257,7 +270,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
systemContext.getTbRpcService().save(tenantId, rpc);
}
private ToDeviceRpcRequestMsg creteToDeviceRpcRequestMsg(ToDeviceRpcRequest request) {
private ToDeviceRpcRequestMsg createToDeviceRpcRequestMsg(ToDeviceRpcRequest request) {
ToDeviceRpcRequestBody body = request.getBody();
return ToDeviceRpcRequestMsg.newBuilder()
.setRequestId(rpcSeq++)
@ -283,28 +296,31 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
}
void processRemoveRpc(RemoveRpcActorMsg msg) {
UUID requestId = msg.getRequestId();
log.debug("[{}][{}] Received remove RPC request ...", deviceId, requestId);
UUID rpcId = msg.getRequestId();
log.debug("[{}][{}] Received remove RPC request ...", deviceId, rpcId);
Map.Entry<Integer, ToDeviceRpcRequestMetadata> entry = null;
for (Map.Entry<Integer, ToDeviceRpcRequestMetadata> e : toDeviceRpcPendingMap.entrySet()) {
if (e.getValue().getMsg().getMsg().getId().equals(requestId)) {
if (e.getValue().getMsg().getMsg().getId().equals(rpcId)) {
entry = e;
break;
}
}
if (entry != null) {
Integer key = entry.getKey();
Integer requestId = entry.getKey();
if (entry.getValue().isDelivered()) {
toDeviceRpcPendingMap.remove(key);
toDeviceRpcPendingMap.remove(requestId);
if (rpcSubmitStrategy.equals(RpcSubmitStrategy.SEQUENTIAL_ON_RESPONSE_FROM_DEVICE)) {
clearAwaitRpcResponseScheduler();
sendNextPendingRequest(rpcId, requestId, "Removed pending RPC!");
}
} else {
Optional<Map.Entry<Integer, ToDeviceRpcRequestMetadata>> firstRpc = getFirstRpc();
if (firstRpc.isPresent() && key.equals(firstRpc.get().getKey())) {
toDeviceRpcPendingMap.remove(key);
log.debug("[{}][{}][{}] Removed pending RPC! Going to send next pending request ...", deviceId, requestId, key);
sendNextPendingRequest();
if (firstRpc.isPresent() && requestId.equals(firstRpc.get().getKey())) {
toDeviceRpcPendingMap.remove(requestId);
sendNextPendingRequest(rpcId, requestId, "Removed pending RPC!");
} else {
toDeviceRpcPendingMap.remove(key);
toDeviceRpcPendingMap.remove(requestId);
}
}
}
@ -321,9 +337,9 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
void processServerSideRpcTimeout(DeviceActorServerSideRpcTimeoutMsg msg) {
Integer requestId = msg.getId();
ToDeviceRpcRequestMetadata requestMd = toDeviceRpcPendingMap.remove(requestId);
var requestMd = toDeviceRpcPendingMap.remove(requestId);
if (requestMd != null) {
ToDeviceRpcRequest toDeviceRpcRequest = requestMd.getMsg().getMsg();
var toDeviceRpcRequest = requestMd.getMsg().getMsg();
UUID rpcId = toDeviceRpcRequest.getId();
log.debug("[{}][{}][{}] RPC request timeout detected!", deviceId, rpcId, requestId);
if (toDeviceRpcRequest.isPersisted()) {
@ -332,8 +348,12 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
systemContext.getTbCoreDeviceRpcService().processRpcResponseFromDeviceActor(new FromDeviceRpcResponse(rpcId,
null, requestMd.isSent() ? RpcError.TIMEOUT : RpcError.NO_ACTIVE_CONNECTION));
if (!requestMd.isDelivered()) {
log.debug("[{}][{}][{}] Pending RPC timeout detected! Going to send next pending request ...", deviceId, rpcId, requestId);
sendNextPendingRequest();
sendNextPendingRequest(rpcId, requestId, "Pending RPC timeout detected!");
return;
}
if (rpcSubmitStrategy.equals(RpcSubmitStrategy.SEQUENTIAL_ON_RESPONSE_FROM_DEVICE)) {
clearAwaitRpcResponseScheduler();
sendNextPendingRequest(rpcId, requestId, "Pending RPC timeout detected!");
}
}
}
@ -363,10 +383,25 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
}
private Optional<Map.Entry<Integer, ToDeviceRpcRequestMetadata>> getFirstRpc() {
if (rpcSubmitStrategy.equals(RpcSubmitStrategy.SEQUENTIAL_ON_RESPONSE_FROM_DEVICE)) {
return toDeviceRpcPendingMap.entrySet().stream()
.findFirst().filter(entry -> {
var md = entry.getValue();
if (md.isDelivered()) {
if (awaitRpcResponseFuture == null || awaitRpcResponseFuture.isCancelled()) {
var toDeviceRpcRequest = md.getMsg().getMsg();
awaitRpcResponseFuture = scheduleAwaitRpcResponseFuture(toDeviceRpcRequest.getId(), entry.getKey());
}
return false;
}
return true;
});
}
return toDeviceRpcPendingMap.entrySet().stream().filter(e -> !e.getValue().isDelivered()).findFirst();
}
private void sendNextPendingRequest() {
private void sendNextPendingRequest(UUID rpcId, int requestId, String logMessage) {
log.debug("[{}][{}][{}] {} Going to send next pending request ...", deviceId, rpcId, requestId, logMessage);
if (rpcSequential) {
rpcSubscriptions.forEach((id, s) -> sendPendingRequests(id, s.getNodeId()));
}
@ -591,12 +626,13 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
boolean success = requestMd != null;
if (success) {
ToDeviceRpcRequest toDeviceRequestMsg = requestMd.getMsg().getMsg();
UUID rpcId = toDeviceRequestMsg.getId();
boolean delivered = requestMd.isDelivered();
boolean hasError = StringUtils.isNotEmpty(responseMsg.getError());
try {
String payload = hasError ? responseMsg.getError() : responseMsg.getPayload();
systemContext.getTbCoreDeviceRpcService().processRpcResponseFromDeviceActor(
new FromDeviceRpcResponse(toDeviceRequestMsg.getId(), payload, null));
new FromDeviceRpcResponse(rpcId, payload, null));
if (toDeviceRequestMsg.isPersisted()) {
RpcStatus status = hasError ? RpcStatus.FAILED : RpcStatus.SUCCESSFUL;
JsonNode response;
@ -605,13 +641,17 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
} catch (IllegalArgumentException e) {
response = JacksonUtil.newObjectNode().put("error", payload);
}
systemContext.getTbRpcService().save(tenantId, new RpcId(toDeviceRequestMsg.getId()), status, response);
systemContext.getTbRpcService().save(tenantId, new RpcId(rpcId), status, response);
}
} finally {
if (!delivered) {
String errorResponse = hasError ? "error" : "";
log.debug("[{}][{}][{}] Received {} response for undelivered RPC! Going to send next pending request ...", deviceId, sessionId, requestId, errorResponse);
sendNextPendingRequest();
if (rpcSubmitStrategy.equals(RpcSubmitStrategy.SEQUENTIAL_ON_RESPONSE_FROM_DEVICE)) {
clearAwaitRpcResponseScheduler();
String errorResponse = hasError ? "error response" : "response";
String rpcState = delivered ? "" : "undelivered ";
sendNextPendingRequest(rpcId, requestId, String.format("Received %s for %sRPC!", errorResponse, rpcState));
} else if (!delivered) {
String errorResponse = hasError ? "error response" : "response";
sendNextPendingRequest(rpcId, requestId, String.format("Received %s for undelivered RPC!", errorResponse));
}
}
} else {
@ -626,36 +666,47 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
int requestId = responseMsg.getRequestId();
log.debug("[{}][{}][{}][{}] Processing RPC command response status: [{}]", deviceId, sessionId, rpcId, requestId, status);
ToDeviceRpcRequestMetadata md = toDeviceRpcPendingMap.get(requestId);
if (md != null) {
var toDeviceRpcRequest = md.getMsg().getMsg();
boolean persisted = toDeviceRpcRequest.isPersisted();
boolean oneWayRpc = toDeviceRpcRequest.isOneway();
JsonNode response = null;
if (status.equals(RpcStatus.DELIVERED)) {
if (md.getMsg().getMsg().isOneway()) {
if (oneWayRpc) {
toDeviceRpcPendingMap.remove(requestId);
if (rpcSequential) {
systemContext.getTbCoreDeviceRpcService().processRpcResponseFromDeviceActor(new FromDeviceRpcResponse(rpcId, null, null));
var fromDeviceRpcResponse = new FromDeviceRpcResponse(rpcId, null, null);
systemContext.getTbCoreDeviceRpcService().processRpcResponseFromDeviceActor(fromDeviceRpcResponse);
}
} else {
md.setDelivered(true);
if (rpcSubmitStrategy.equals(RpcSubmitStrategy.SEQUENTIAL_ON_RESPONSE_FROM_DEVICE)) {
awaitRpcResponseFuture = scheduleAwaitRpcResponseFuture(rpcId, requestId);
}
}
} else if (status.equals(RpcStatus.TIMEOUT)) {
Integer maxRpcRetries = md.getMsg().getMsg().getRetries();
maxRpcRetries = maxRpcRetries == null ? systemContext.getMaxRpcRetries() : Math.min(maxRpcRetries, systemContext.getMaxRpcRetries());
Integer maxRpcRetries = toDeviceRpcRequest.getRetries();
maxRpcRetries = maxRpcRetries == null ?
systemContext.getMaxRpcRetries() : Math.min(maxRpcRetries, systemContext.getMaxRpcRetries());
if (maxRpcRetries <= md.getRetries()) {
toDeviceRpcPendingMap.remove(requestId);
status = RpcStatus.FAILED;
response = JacksonUtil.newObjectNode().put("error", "There was a Timeout and all retry attempts have been exhausted. Retry attempts set: " + maxRpcRetries);
response = JacksonUtil.newObjectNode().put("error", "There was a Timeout and all retry " +
"attempts have been exhausted. Retry attempts set: " + maxRpcRetries);
} else {
md.setRetries(md.getRetries() + 1);
}
}
if (md.getMsg().getMsg().isPersisted()) {
if (persisted) {
systemContext.getTbRpcService().save(tenantId, new RpcId(rpcId), status, response);
}
if (status != RpcStatus.SENT) {
log.debug("[{}][{}][{}][{}] RPC was {}! Going to send next pending request ...", deviceId, sessionId, rpcId, requestId, status.name().toLowerCase());
sendNextPendingRequest();
if (rpcSubmitStrategy.equals(RpcSubmitStrategy.SEQUENTIAL_ON_RESPONSE_FROM_DEVICE)
&& status.equals(RpcStatus.DELIVERED) && !oneWayRpc) {
return;
}
if (!status.equals(RpcStatus.SENT)) {
sendNextPendingRequest(rpcId, requestId, String.format("RPC was %s!", status.name().toLowerCase()));
}
} else {
log.warn("[{}][{}][{}][{}] RPC has already been removed from pending map.", deviceId, sessionId, rpcId, requestId);
@ -688,6 +739,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
if (subscribeCmd.getUnsubscribe()) {
log.debug("[{}] Canceling RPC subscription for session: [{}]", deviceId, sessionId);
rpcSubscriptions.remove(sessionId);
clearAwaitRpcResponseScheduler();
} else {
SessionInfoMetaData sessionMD = sessions.get(sessionId);
if (sessionMD == null) {
@ -722,6 +774,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
sessions.remove(sessionId);
attributeSubscriptions.remove(sessionId);
rpcSubscriptions.remove(sessionId);
clearAwaitRpcResponseScheduler();
if (sessions.isEmpty()) {
reportSessionClose();
}
@ -729,6 +782,27 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
}
}
private ScheduledFuture<?> scheduleAwaitRpcResponseFuture(UUID rpcId, int requestId) {
return scheduler.schedule(() -> {
var md = toDeviceRpcPendingMap.remove(requestId);
if (md == null) {
return;
}
sendNextPendingRequest(rpcId, requestId, "RPC was removed from pending map due to await timeout on response from device!");
var toDeviceRpcRequest = md.getMsg().getMsg();
if (toDeviceRpcRequest.isPersisted()) {
var responseAwaitTimeout = JacksonUtil.newObjectNode().put("error", "There was a timeout awaiting for RPC response from device.");
systemContext.getTbRpcService().save(tenantId, new RpcId(rpcId), RpcStatus.FAILED, responseAwaitTimeout);
}
}, systemContext.getRpcResponseTimeout(), TimeUnit.MILLISECONDS);
}
private void clearAwaitRpcResponseScheduler() {
if (rpcSubmitStrategy.equals(RpcSubmitStrategy.SEQUENTIAL_ON_RESPONSE_FROM_DEVICE) && awaitRpcResponseFuture != null) {
awaitRpcResponseFuture.cancel(true);
}
}
private void handleSessionActivity(SessionInfoProto sessionInfoProto, SubscriptionInfoProto subscriptionInfo) {
UUID sessionId = getSessionId(sessionInfoProto);
Objects.requireNonNull(sessionId);
@ -974,7 +1048,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
rpc.setStatus(RpcStatus.EXPIRED);
systemContext.getTbRpcService().save(tenantId, rpc);
} else {
registerPendingRpcRequest(ctx, new ToDeviceRpcRequestActorMsg(systemContext.getServiceId(), msg), false, creteToDeviceRpcRequestMsg(msg), timeout);
registerPendingRpcRequest(ctx, new ToDeviceRpcRequestActorMsg(systemContext.getServiceId(), msg), false, createToDeviceRpcRequestMsg(msg), timeout);
}
});
if (pageData.hasNext()) {

View File

@ -230,7 +230,7 @@ public class RpcV2Controller extends AbstractRpcController {
Rpc rpc = checkRpcId(rpcId, Operation.DELETE);
if (rpc != null) {
if (rpc.getStatus().equals(RpcStatus.QUEUED)) {
if (rpc.getStatus().isPushDeleteNotificationToCore()) {
RemoveRpcActorMsg removeMsg = new RemoveRpcActorMsg(getTenantId(), rpc.getDeviceId(), rpc.getUuidId());
log.trace("[{}] Forwarding msg {} to queue actor!", rpc.getDeviceId(), rpc);
tbClusterService.pushMsgToCore(removeMsg, null);

View File

@ -0,0 +1,30 @@
/**
* Copyright © 2016-2023 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.service.rpc;
import java.util.Arrays;
public enum RpcSubmitStrategy {
BURST, SEQUENTIAL_ON_ACK_FROM_DEVICE, SEQUENTIAL_ON_RESPONSE_FROM_DEVICE;
public static RpcSubmitStrategy parse(String strategyStr) {
return Arrays.stream(RpcSubmitStrategy.values())
.filter(strategy -> strategy.name().equalsIgnoreCase(strategyStr))
.findFirst()
.orElse(BURST);
}
}

View File

@ -405,8 +405,12 @@ actors:
# Enqueue the result of external node processing as a separate message to the rule engine.
force_ack: "${ACTORS_RULE_EXTERNAL_NODE_FORCE_ACK:false}"
rpc:
# Maximum number of persistent RPC call retries in case of failed requests delivery.
max_retries: "${ACTORS_RPC_MAX_RETRIES:5}"
sequential: "${ACTORS_RPC_SEQUENTIAL:false}"
# RPC submit strategies. Allowed values: BURST, SEQUENTIAL_ON_ACK_FROM_DEVICE, SEQUENTIAL_ON_RESPONSE_FROM_DEVICE.
submit_strategy: "${ACTORS_RPC_SUBMIT_STRATEGY_TYPE:BURST}"
# Time in milliseconds for RPC to receive response after delivery. Used only for SEQUENTIAL_ON_RESPONSE_FROM_DEVICE submit strategy.
response_timeout_ms: "${ACTORS_RPC_RESPONSE_TIMEOUT_MS:30000}"
statistics:
# Enable/disable actor statistics
enabled: "${ACTORS_STATISTICS_ENABLED:true}"

View File

@ -65,6 +65,7 @@ import org.thingsboard.server.actors.TbEntityActorId;
import org.thingsboard.server.actors.device.DeviceActor;
import org.thingsboard.server.actors.device.DeviceActorMessageProcessor;
import org.thingsboard.server.actors.device.SessionInfo;
import org.thingsboard.server.actors.device.ToDeviceRpcRequestMetadata;
import org.thingsboard.server.actors.service.DefaultActorService;
import org.thingsboard.server.common.data.Customer;
import org.thingsboard.server.common.data.Device;
@ -1008,6 +1009,15 @@ public abstract class AbstractWebTest extends AbstractInMemoryStorageTest {
});
}
protected void awaitForDeviceActorToProcessAllRpcResponses(DeviceId deviceId) {
DeviceActorMessageProcessor processor = getDeviceActorProcessor(deviceId);
Map<Integer, ToDeviceRpcRequestMetadata> toDeviceRpcPendingMap = (Map<Integer, ToDeviceRpcRequestMetadata>) ReflectionTestUtils.getField(processor, "toDeviceRpcPendingMap");
Awaitility.await("Device actor pending map is empty").atMost(5, TimeUnit.SECONDS).until(() -> {
log.warn("device {}, toDeviceRpcPendingMap.size() == {}", deviceId, toDeviceRpcPendingMap.size());
return toDeviceRpcPendingMap.isEmpty();
});
}
protected static String getMapName(FeatureType featureType) {
switch (featureType) {
case ATTRIBUTES:

View File

@ -0,0 +1,38 @@
/**
* Copyright © 2016-2023 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.service.rpc;
import org.junit.jupiter.api.Test;
import org.thingsboard.server.common.data.StringUtils;
import static org.assertj.core.api.Assertions.assertThat;
class RpcSubmitStrategyTest {
@Test
void givenRandomString_whenParse_thenReturnBurstStrategy() {
String randomString = StringUtils.randomAlphanumeric(10);
RpcSubmitStrategy parsed = RpcSubmitStrategy.parse(randomString);
assertThat(parsed).isEqualTo(RpcSubmitStrategy.BURST);
}
@Test
void givenNull_whenParse_thenReturnBurstStrategy() {
RpcSubmitStrategy parsed = RpcSubmitStrategy.parse(null);
assertThat(parsed).isEqualTo(RpcSubmitStrategy.BURST);
}
}

View File

@ -186,8 +186,12 @@ public abstract class AbstractMqttIntegrationTest extends AbstractTransportInteg
}
protected void subscribeAndWait(MqttTestClient client, String attrSubTopic, DeviceId deviceId, FeatureType featureType) throws MqttException {
subscribeAndWait(client, attrSubTopic, deviceId, featureType, MqttQoS.AT_MOST_ONCE);
}
protected void subscribeAndWait(MqttTestClient client, String attrSubTopic, DeviceId deviceId, FeatureType featureType, MqttQoS mqttQoS) throws MqttException {
int subscriptionCount = getDeviceActorSubscriptionCount(deviceId, featureType);
client.subscribeAndWait(attrSubTopic, MqttQoS.AT_MOST_ONCE);
client.subscribeAndWait(attrSubTopic, mqttQoS);
// TODO: This test awaits for the device actor to receive the subscription. Ideally it should not happen. See details below:
// The transport layer acknowledge subscription request once the message about subscription is in the queue.
// Test sends data immediately after acknowledgement.

View File

@ -30,7 +30,7 @@ public class MqttTestCallback implements MqttCallback {
protected CountDownLatch subscribeLatch;
protected final CountDownLatch deliveryLatch;
protected int qoS;
protected int messageArrivedQoS;
protected byte[] payloadBytes;
protected boolean pubAckReceived;
@ -53,7 +53,7 @@ public class MqttTestCallback implements MqttCallback {
@Override
public void messageArrived(String requestTopic, MqttMessage mqttMessage) {
log.warn("messageArrived on topic: {}", requestTopic);
qoS = mqttMessage.getQos();
messageArrivedQoS = mqttMessage.getQos();
payloadBytes = mqttMessage.getPayload();
subscribeLatch.countDown();
}
@ -63,6 +63,5 @@ public class MqttTestCallback implements MqttCallback {
log.warn("delivery complete: {}", iMqttDeliveryToken.getResponse());
pubAckReceived = iMqttDeliveryToken.getResponse().getType() == MqttWireMessage.MESSAGE_TYPE_PUBACK;
deliveryLatch.countDown();
}
}

View File

@ -36,7 +36,7 @@ public class MqttTestSubscribeOnTopicCallback extends MqttTestCallback {
public void messageArrived(String requestTopic, MqttMessage mqttMessage) {
log.warn("messageArrived on topic: {}, awaitSubTopic: {}", requestTopic, awaitSubTopic);
if (awaitSubTopic.equals(requestTopic)) {
qoS = mqttMessage.getQos();
messageArrivedQoS = mqttMessage.getQos();
payloadBytes = mqttMessage.getPayload();
subscribeLatch.countDown();
}

View File

@ -575,14 +575,14 @@ public abstract class AbstractMqttAttributesIntegrationTest extends AbstractMqtt
protected void validateJsonResponse(MqttTestCallback callback, String expectedResponse) throws InterruptedException {
assertThat(callback.getSubscribeLatch().await(DEFAULT_WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS))
.as("await callback").isTrue();
assertEquals(MqttQoS.AT_MOST_ONCE.value(), callback.getQoS());
assertEquals(MqttQoS.AT_MOST_ONCE.value(), callback.getMessageArrivedQoS());
assertEquals(JacksonUtil.toJsonNode(expectedResponse), JacksonUtil.fromBytes(callback.getPayloadBytes()));
}
protected void validateProtoResponse(MqttTestCallback callback, TransportProtos.GetAttributeResponseMsg expectedResponse) throws InterruptedException, InvalidProtocolBufferException {
assertThat(callback.getSubscribeLatch().await(DEFAULT_WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS))
.as("await callback").isTrue();
assertEquals(MqttQoS.AT_MOST_ONCE.value(), callback.getQoS());
assertEquals(MqttQoS.AT_MOST_ONCE.value(), callback.getMessageArrivedQoS());
TransportProtos.GetAttributeResponseMsg actualAttributesResponse = TransportProtos.GetAttributeResponseMsg.parseFrom(callback.getPayloadBytes());
assertEquals(expectedResponse.getRequestId(), actualAttributesResponse.getRequestId());
List<TransportProtos.KeyValueProto> expectedClientKeyValueProtos = expectedResponse.getClientAttributeListList().stream().map(TransportProtos.TsKvProto::getKv).collect(Collectors.toList());
@ -606,7 +606,7 @@ public abstract class AbstractMqttAttributesIntegrationTest extends AbstractMqtt
protected void validateJsonResponseGateway(MqttTestCallback callback, String deviceName, String expectedValues) throws InterruptedException {
assertThat(callback.getSubscribeLatch().await(DEFAULT_WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS))
.as("await callback").isTrue();
assertEquals(MqttQoS.AT_LEAST_ONCE.value(), callback.getQoS());
assertEquals(MqttQoS.AT_LEAST_ONCE.value(), callback.getMessageArrivedQoS());
String expectedRequestPayload = "{\"id\":1,\"device\":\"" + deviceName + "\",\"values\":" + expectedValues + "}";
assertEquals(JacksonUtil.toJsonNode(expectedRequestPayload), JacksonUtil.fromBytes(callback.getPayloadBytes()));
}
@ -614,7 +614,7 @@ public abstract class AbstractMqttAttributesIntegrationTest extends AbstractMqtt
protected void validateProtoClientResponseGateway(MqttTestCallback callback, String deviceName) throws InterruptedException, InvalidProtocolBufferException {
assertThat(callback.getSubscribeLatch().await(DEFAULT_WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS))
.as("await callback").isTrue();
assertEquals(MqttQoS.AT_LEAST_ONCE.value(), callback.getQoS());
assertEquals(MqttQoS.AT_LEAST_ONCE.value(), callback.getMessageArrivedQoS());
TransportApiProtos.GatewayAttributeResponseMsg expectedGatewayAttributeResponseMsg = getExpectedGatewayAttributeResponseMsg(deviceName, true);
TransportApiProtos.GatewayAttributeResponseMsg actualGatewayAttributeResponseMsg = TransportApiProtos.GatewayAttributeResponseMsg.parseFrom(callback.getPayloadBytes());
assertEquals(expectedGatewayAttributeResponseMsg.getDeviceName(), actualGatewayAttributeResponseMsg.getDeviceName());
@ -631,7 +631,7 @@ public abstract class AbstractMqttAttributesIntegrationTest extends AbstractMqtt
protected void validateProtoSharedResponseGateway(MqttTestCallback callback, String deviceName) throws InterruptedException, InvalidProtocolBufferException {
assertThat(callback.getSubscribeLatch().await(DEFAULT_WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS))
.as("await callback").isTrue();
assertEquals(MqttQoS.AT_LEAST_ONCE.value(), callback.getQoS());
assertEquals(MqttQoS.AT_LEAST_ONCE.value(), callback.getMessageArrivedQoS());
TransportApiProtos.GatewayAttributeResponseMsg expectedGatewayAttributeResponseMsg = getExpectedGatewayAttributeResponseMsg(deviceName, false);
TransportApiProtos.GatewayAttributeResponseMsg actualGatewayAttributeResponseMsg = TransportApiProtos.GatewayAttributeResponseMsg.parseFrom(callback.getPayloadBytes());
assertEquals(expectedGatewayAttributeResponseMsg.getDeviceName(), actualGatewayAttributeResponseMsg.getDeviceName());

View File

@ -37,12 +37,13 @@ import org.thingsboard.server.common.data.device.profile.DeviceProfileTransportC
import org.thingsboard.server.common.data.device.profile.MqttDeviceProfileTransportConfiguration;
import org.thingsboard.server.common.data.device.profile.ProtoTransportPayloadConfiguration;
import org.thingsboard.server.common.data.device.profile.TransportPayloadTypeConfiguration;
import org.thingsboard.server.common.data.rpc.Rpc;
import org.thingsboard.server.common.msg.session.FeatureType;
import org.thingsboard.server.gen.transport.TransportApiProtos;
import org.thingsboard.server.transport.mqtt.AbstractMqttIntegrationTest;
import org.thingsboard.server.transport.mqtt.mqttv3.MqttTestCallback;
import org.thingsboard.server.transport.mqtt.mqttv3.MqttTestSubscribeOnTopicCallback;
import org.thingsboard.server.transport.mqtt.mqttv3.MqttTestClient;
import org.thingsboard.server.transport.mqtt.mqttv3.MqttTestSubscribeOnTopicCallback;
import java.util.ArrayList;
import java.util.List;
@ -105,7 +106,7 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractM
} else {
assertEquals(JacksonUtil.toJsonNode(setGpioRequest), JacksonUtil.fromBytes(callback.getPayloadBytes()));
}
assertEquals(MqttQoS.AT_MOST_ONCE.value(), callback.getQoS());
assertEquals(MqttQoS.AT_MOST_ONCE.value(), callback.getMessageArrivedQoS());
client.disconnect();
}
@ -176,9 +177,9 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractM
return builder.build();
}
protected void processSequenceTwoWayRpcTest() throws Exception {
List<String> expected = new ArrayList<>();
List<String> result = new ArrayList<>();
protected void processSequenceOneWayRpcTest(MqttQoS mqttQoS) throws Exception {
List<String> expectedRequest = new ArrayList<>();
List<String> actualRequests = new ArrayList<>();
String deviceId = savedDevice.getId().getId().toString();
@ -186,20 +187,67 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractM
ObjectNode request = JacksonUtil.newObjectNode();
request.put("method", "test");
request.put("params", i);
expected.add(JacksonUtil.toString(request));
expectedRequest.add(JacksonUtil.toString(request));
request.put("persistent", true);
doPostAsync("/api/rpc/twoway/" + deviceId, JacksonUtil.toString(request), String.class, status().isOk());
doPostAsync("/api/rpc/oneway/" + deviceId, JacksonUtil.toString(request), String.class, status().isOk());
}
MqttTestClient client = new MqttTestClient();
client.connectAndWait(accessToken);
client.enableManualAcks();
MqttTestSequenceCallback callback = new MqttTestSequenceCallback(client, 10, result);
MqttTestOneWaySequenceCallback callback = new MqttTestOneWaySequenceCallback(client, 10, actualRequests);
client.setCallback(callback);
subscribeAndWait(client, DEVICE_RPC_REQUESTS_SUB_TOPIC, savedDevice.getId(), FeatureType.RPC);
subscribeAndWait(client, DEVICE_RPC_REQUESTS_SUB_TOPIC, savedDevice.getId(), FeatureType.RPC, mqttQoS);
callback.getSubscribeLatch().await(DEFAULT_WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS);
assertEquals(expected, result);
assertEquals(expectedRequest, actualRequests);
client.disconnect();
}
protected void processSequenceTwoWayRpcTest(MqttQoS mqttQoS) throws Exception {
processSequenceTwoWayRpcTest(mqttQoS, false);
}
protected void processSequenceTwoWayRpcTest(MqttQoS mqttQoS, boolean manualAcksEnabled) throws Exception {
List<String> expectedRequest = new ArrayList<>();
List<String> actualRequests = new ArrayList<>();
List<String> rpcIds = new ArrayList<>();
List<String> expectedResponses = new ArrayList<>();
List<String> actualResponses = new ArrayList<>();
String deviceId = savedDevice.getId().getId().toString();
for (int i = 0; i < 10; i++) {
ObjectNode request = JacksonUtil.newObjectNode();
request.put("method", "test");
request.put("params", i);
expectedRequest.add(JacksonUtil.toString(request));
request.put("persistent", true);
String response = doPostAsync("/api/rpc/twoway/" + deviceId, JacksonUtil.toString(request), String.class, status().isOk());
var responseNode = JacksonUtil.toJsonNode(response);
rpcIds.add(responseNode.get("rpcId").asText());
}
MqttTestClient client = new MqttTestClient();
client.connectAndWait(accessToken);
if (manualAcksEnabled) {
client.enableManualAcks();
}
MqttTestTwoWaySequenceCallback callback = new MqttTestTwoWaySequenceCallback(
client, 10, actualRequests, expectedResponses, manualAcksEnabled);
client.setCallback(callback);
subscribeAndWait(client, DEVICE_RPC_REQUESTS_SUB_TOPIC, savedDevice.getId(), FeatureType.RPC, mqttQoS);
callback.getSubscribeLatch().await(DEFAULT_WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS);
assertEquals(expectedRequest, actualRequests);
awaitForDeviceActorToProcessAllRpcResponses(savedDevice.getId());
for (String rpcId : rpcIds) {
Rpc rpc = doGet("/api/rpc/persistent/" + rpcId, Rpc.class);
actualResponses.add(JacksonUtil.toString(rpc.getResponse()));
}
assertEquals(expectedResponses, actualResponses);
client.disconnect();
}
protected void processJsonTwoWayRpcTestGateway(String deviceName) throws Exception {
@ -248,7 +296,7 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractM
JsonNode expectedJsonRequestData = getExpectedGatewayJsonRequestData(deviceName, setGpioRequest);
assertEquals(expectedJsonRequestData, JacksonUtil.fromBytes(callback.getPayloadBytes()));
}
assertEquals(MqttQoS.AT_MOST_ONCE.value(), callback.getQoS());
assertEquals(MqttQoS.AT_MOST_ONCE.value(), callback.getMessageArrivedQoS());
}
private JsonNode getExpectedGatewayJsonRequestData(String deviceName, String requestStr) {
@ -280,7 +328,7 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractM
callback.getSubscribeLatch().await(DEFAULT_WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS);
log.warn("request payload: {}", JacksonUtil.fromBytes(callback.getPayloadBytes()));
assertEquals("{\"success\":true}", actualRpcResponse);
assertEquals(MqttQoS.AT_MOST_ONCE.value(), callback.getQoS());
assertEquals(MqttQoS.AT_MOST_ONCE.value(), callback.getMessageArrivedQoS());
}
protected void validateProtoTwoWayRpcGatewayResponse(String deviceName, MqttTestClient client, byte[] connectPayloadBytes) throws Exception {
@ -302,7 +350,7 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractM
String actualRpcResponse = doPostAsync("/api/rpc/twoway/" + deviceId, setGpioRequest, String.class, status().isOk());
callback.getSubscribeLatch().await(DEFAULT_WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS);
assertEquals("{\"success\":true}", actualRpcResponse);
assertEquals(MqttQoS.AT_MOST_ONCE.value(), callback.getQoS());
assertEquals(MqttQoS.AT_MOST_ONCE.value(), callback.getMessageArrivedQoS());
}
private Device getDeviceByName(String deviceName) throws Exception {
@ -334,7 +382,7 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractM
public void messageArrived(String requestTopic, MqttMessage mqttMessage) {
log.warn("messageArrived on topic: {}, awaitSubTopic: {}", requestTopic, awaitSubTopic);
if (awaitSubTopic.equals(requestTopic)) {
qoS = mqttMessage.getQos();
messageArrivedQoS = mqttMessage.getQos();
payloadBytes = mqttMessage.getPayload();
String responseTopic;
if (requestTopic.startsWith(BASE_DEVICE_API_TOPIC_V2)) {
@ -366,7 +414,7 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractM
public void messageArrived(String requestTopic, MqttMessage mqttMessage) {
log.warn("messageArrived on topic: {}, awaitSubTopic: {}", requestTopic, awaitSubTopic);
if (awaitSubTopic.equals(requestTopic)) {
qoS = mqttMessage.getQos();
messageArrivedQoS = mqttMessage.getQos();
payloadBytes = mqttMessage.getPayload();
String responseTopic;
if (requestTopic.startsWith(BASE_DEVICE_API_TOPIC_V2)) {
@ -398,7 +446,7 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractM
try {
DynamicMessage dynamicMessage = DynamicMessage.parseFrom(rpcRequestMsgDescriptor, requestPayload);
List<Descriptors.FieldDescriptor> fields = rpcRequestMsgDescriptor.getFields();
for (Descriptors.FieldDescriptor fieldDescriptor: fields) {
for (Descriptors.FieldDescriptor fieldDescriptor : fields) {
assertTrue(dynamicMessage.hasField(fieldDescriptor));
}
ProtoFileElement rpcResponseProtoFileElement = DynamicProtoUtils.getProtoFileElement(protoTransportPayloadConfiguration.getDeviceRpcResponseProtoSchema());
@ -436,30 +484,69 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractM
return (ProtoTransportPayloadConfiguration) transportPayloadTypeConfiguration;
}
protected class MqttTestSequenceCallback extends MqttTestCallback {
protected static class MqttTestOneWaySequenceCallback extends MqttTestCallback {
private final MqttTestClient client;
private final List<String> expected;
private final List<String> requests;
MqttTestSequenceCallback(MqttTestClient client, int subscribeCount, List<String> expected) {
MqttTestOneWaySequenceCallback(MqttTestClient client, int subscribeCount, List<String> requests) {
super(subscribeCount);
this.client = client;
this.expected = expected;
this.requests = requests;
}
@Override
public void messageArrived(String requestTopic, MqttMessage mqttMessage) {
log.warn("messageArrived on topic: {}", requestTopic);
expected.add(new String(mqttMessage.getPayload()));
String responseTopic = requestTopic.replace("request", "response");
qoS = mqttMessage.getQos();
try {
client.messageArrivedComplete(mqttMessage);
client.publish(responseTopic, processJsonMessageArrived(requestTopic, mqttMessage));
} catch (MqttException e) {
log.warn("Failed to publish response on topic: {} due to: ", responseTopic, e);
}
requests.add(new String(mqttMessage.getPayload()));
messageArrivedQoS = mqttMessage.getQos();
subscribeLatch.countDown();
}
}
protected class MqttTestTwoWaySequenceCallback extends MqttTestCallback {
private final MqttTestClient client;
private final List<String> requests;
private final List<String> responses;
private final boolean manualAcksEnabled;
MqttTestTwoWaySequenceCallback(MqttTestClient client, int subscribeCount, List<String> requests, List<String> responses, boolean manualAcksEnabled) {
super(subscribeCount);
this.client = client;
this.requests = requests;
this.responses = responses;
this.manualAcksEnabled = manualAcksEnabled;
}
@Override
public void messageArrived(String requestTopic, MqttMessage mqttMessage) {
log.warn("messageArrived on topic: {}", requestTopic);
requests.add(new String(mqttMessage.getPayload()));
messageArrivedQoS = mqttMessage.getQos();
if (manualAcksEnabled) {
try {
client.messageArrivedComplete(mqttMessage);
} catch (MqttException e) {
log.warn("Failed to ack message delivery on topic: {} due to: ", requestTopic, e);
} finally {
subscribeLatch.countDown();
processResponse(requestTopic, mqttMessage);
}
return;
}
subscribeLatch.countDown();
processResponse(requestTopic, mqttMessage);
}
private void processResponse(String requestTopic, MqttMessage mqttMessage) {
String responseTopic = requestTopic.replace("request", "response");
byte[] responsePayload = processJsonMessageArrived(requestTopic, mqttMessage);
responses.add(new String(responsePayload));
try {
client.publish(responseTopic, responsePayload);
} catch (MqttException e) {
log.warn("Failed to publish response on topic: {} due to: ", responseTopic, e);
}
}
}
}

View File

@ -110,11 +110,6 @@ public class MqttServerSideRpcDefaultIntegrationTest extends AbstractMqttServerS
processJsonTwoWayRpcTest(DEVICE_RPC_REQUESTS_SUB_SHORT_JSON_TOPIC);
}
@Test
public void testSequenceServerMqttTwoWayRpc() throws Exception {
processSequenceTwoWayRpcTest();
}
@Test
public void testGatewayServerMqttOneWayRpc() throws Exception {
processJsonOneWayRpcTestGateway("Gateway Device OneWay RPC");

View File

@ -0,0 +1,72 @@
/**
* Copyright © 2016-2023 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.transport.mqtt.mqttv3.rpc;
import io.netty.handler.codec.mqtt.MqttQoS;
import lombok.extern.slf4j.Slf4j;
import org.junit.Before;
import org.junit.Test;
import org.springframework.test.context.TestPropertySource;
import org.thingsboard.server.dao.service.DaoSqlTest;
import org.thingsboard.server.transport.mqtt.MqttTestConfigProperties;
@Slf4j
@DaoSqlTest
@TestPropertySource(properties = {
"actors.rpc.submit_strategy=SEQUENTIAL_ON_ACK_FROM_DEVICE",
})
public class MqttServerSideRpcSequenceOnAckIntegrationTest extends AbstractMqttServerSideRpcIntegrationTest {
@Before
public void beforeTest() throws Exception {
MqttTestConfigProperties configProperties = MqttTestConfigProperties.builder()
.deviceName("RPC test device")
.gatewayName("RPC test gateway")
.build();
processBeforeTest(configProperties);
}
@Test
public void testSequenceServerMqttOneWayRpcQoSAtMostOnce() throws Exception {
processSequenceOneWayRpcTest(MqttQoS.AT_MOST_ONCE);
}
@Test
public void testSequenceServerMqttOneWayRpcQoSAtLeastOnce() throws Exception {
processSequenceOneWayRpcTest(MqttQoS.AT_LEAST_ONCE);
}
@Test
public void testSequenceServerMqttTwoWayRpcQoSAtMostOnce() throws Exception {
processSequenceTwoWayRpcTest(MqttQoS.AT_MOST_ONCE);
}
@Test
public void testSequenceServerMqttTwoWayRpcQoSAtLeastOnce() throws Exception {
processSequenceTwoWayRpcTest(MqttQoS.AT_LEAST_ONCE);
}
@Test
public void testSequenceServerMqttTwoWayRpcQoSAtMostOnceWithManualAcksEnabled() throws Exception {
processSequenceTwoWayRpcTest(MqttQoS.AT_MOST_ONCE, true);
}
@Test
public void testSequenceServerMqttTwoWayRpcQoSAtLeastOnceWithoutManualAcksEnabled() throws Exception {
processSequenceTwoWayRpcTest(MqttQoS.AT_LEAST_ONCE, true);
}
}

View File

@ -0,0 +1,73 @@
/**
* Copyright © 2016-2023 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.transport.mqtt.mqttv3.rpc;
import io.netty.handler.codec.mqtt.MqttQoS;
import lombok.extern.slf4j.Slf4j;
import org.junit.Before;
import org.junit.Test;
import org.springframework.test.context.TestPropertySource;
import org.thingsboard.server.dao.service.DaoSqlTest;
import org.thingsboard.server.transport.mqtt.MqttTestConfigProperties;
@Slf4j
@DaoSqlTest
@TestPropertySource(properties = {
"actors.rpc.submit_strategy=SEQUENTIAL_ON_RESPONSE_FROM_DEVICE",
})
public class MqttServerSideRpcSequenceOnResponseIntegrationTest extends AbstractMqttServerSideRpcIntegrationTest {
@Before
public void beforeTest() throws Exception {
MqttTestConfigProperties configProperties = MqttTestConfigProperties.builder()
.deviceName("RPC test device")
.gatewayName("RPC test gateway")
.build();
processBeforeTest(configProperties);
}
@Test
public void testSequenceServerMqttOneWayRpcQoSAtMostOnce() throws Exception {
processSequenceOneWayRpcTest(MqttQoS.AT_MOST_ONCE);
}
@Test
public void testSequenceServerMqttOneWayRpcQoSAtLeastOnce() throws Exception {
processSequenceOneWayRpcTest(MqttQoS.AT_LEAST_ONCE);
}
@Test
public void testSequenceServerMqttTwoWayRpcQoSAtMostOnce() throws Exception {
processSequenceTwoWayRpcTest(MqttQoS.AT_MOST_ONCE);
}
@Test
public void testSequenceServerMqttTwoWayRpcQoSAtLeastOnce() throws Exception {
processSequenceTwoWayRpcTest(MqttQoS.AT_LEAST_ONCE);
}
@Test
public void testSequenceServerMqttTwoWayRpcQoSAtMostOnceWithManualAcksEnabled() throws Exception {
processSequenceTwoWayRpcTest(MqttQoS.AT_MOST_ONCE, true);
}
@Test
public void testSequenceServerMqttTwoWayRpcQoSAtLeastOnceWithoutManualAcksEnabled() throws Exception {
processSequenceTwoWayRpcTest(MqttQoS.AT_LEAST_ONCE, true);
}
}

View File

@ -332,7 +332,7 @@ public abstract class AbstractMqttTimeseriesIntegrationTest extends AbstractMqtt
doPostAsync("/api/plugins/telemetry/" + savedDevice.getId() + "/SHARED_SCOPE", payload, String.class, status().isOk());
callback.getSubscribeLatch().await(DEFAULT_WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS);
assertEquals(payload.getBytes(), callback.getPayloadBytes());
assertEquals(MqttQoS.AT_MOST_ONCE.value(), callback.getQoS());
assertEquals(MqttQoS.AT_MOST_ONCE.value(), callback.getMessageArrivedQoS());
}
}

View File

@ -13,7 +13,7 @@ transport.lwm2m.security.trust-credentials.keystore.store_file=lwm2m/credentials
edges.enabled=false
edges.storage.no_read_records_sleep=500
edges.storage.sleep_between_batches=500
actors.rpc.sequential=true
actors.rpc.submit_strategy=BURST
queue.rule-engine.stats.enabled=true
# Transports disabled to speed up the context init. Particular transport will be enabled with @TestPropertySource in respective tests

View File

@ -15,6 +15,24 @@
*/
package org.thingsboard.server.common.data.rpc;
import lombok.Getter;
public enum RpcStatus {
QUEUED, SENT, DELIVERED, SUCCESSFUL, TIMEOUT, EXPIRED, FAILED, DELETED
QUEUED(true),
SENT(true),
DELIVERED(true),
SUCCESSFUL(false),
TIMEOUT(false),
EXPIRED(false),
FAILED(false),
DELETED(false);
@Getter
private final boolean pushDeleteNotificationToCore;
RpcStatus(boolean pushDeleteNotificationToCore) {
this.pushDeleteNotificationToCore = pushDeleteNotificationToCore;
}
}

View File

@ -0,0 +1,47 @@
/**
* Copyright © 2016-2023 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.common.data.rpc;
import org.junit.jupiter.api.Test;
import java.util.List;
import static org.assertj.core.api.Assertions.assertThat;
import static org.thingsboard.server.common.data.rpc.RpcStatus.DELIVERED;
import static org.thingsboard.server.common.data.rpc.RpcStatus.QUEUED;
import static org.thingsboard.server.common.data.rpc.RpcStatus.SENT;
class RpcStatusTest {
private static final List<RpcStatus> pushDeleteNotificationToCoreStatuses = List.of(
QUEUED,
SENT,
DELIVERED
);
@Test
void isPushDeleteNotificationToCoreStatusTest() {
var rpcStatuses = RpcStatus.values();
for (var status : rpcStatuses) {
if (pushDeleteNotificationToCoreStatuses.contains(status)) {
assertThat(status.isPushDeleteNotificationToCore()).isTrue();
} else {
assertThat(status.isPushDeleteNotificationToCore()).isFalse();
}
}
}
}

View File

@ -27,12 +27,12 @@ import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
@Component
public class DefaultSchedulerComponent implements SchedulerComponent{
public class DefaultSchedulerComponent implements SchedulerComponent {
protected ScheduledExecutorService schedulerExecutor;
@PostConstruct
public void init(){
public void init() {
this.schedulerExecutor = Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName("queue-scheduler"));
}