Merge pull request #11994 from ShvaykaD/feature/close-transport-session-on-rpc-delivery-timeout

Added ability to close transport session on RPC delivery timeout
This commit is contained in:
Andrew Shvayka 2024-12-02 11:48:10 +01:00 committed by GitHub
commit 1cd4122037
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 251 additions and 36 deletions

View File

@ -570,6 +570,10 @@ public class ActorSystemContext {
@Getter
private String rpcSubmitStrategy;
@Value("${actors.rpc.close_session_on_rpc_delivery_timeout:false}")
@Getter
private boolean closeTransportSessionOnRpcDeliveryTimeout;
@Value("${actors.rpc.response_timeout_ms:30000}")
@Getter
private long rpcResponseTimeout;

View File

@ -132,6 +132,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
private final boolean rpcSequential;
private final RpcSubmitStrategy rpcSubmitStrategy;
private final ScheduledExecutorService scheduler;
private final boolean closeTransportSessionOnRpcDeliveryTimeout;
private int rpcSeq = 0;
private String deviceName;
@ -145,6 +146,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
this.tenantId = tenantId;
this.deviceId = deviceId;
this.rpcSubmitStrategy = RpcSubmitStrategy.parse(systemContext.getRpcSubmitStrategy());
this.closeTransportSessionOnRpcDeliveryTimeout = systemContext.isCloseTransportSessionOnRpcDeliveryTimeout();
this.rpcSequential = !rpcSubmitStrategy.equals(RpcSubmitStrategy.BURST);
this.attributeSubscriptions = new HashMap<>();
this.rpcSubscriptions = new HashMap<>();
@ -223,7 +225,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
log.error("[{}][{}][{}] Failed to save RPC request to edge queue {}", tenantId, deviceId, edgeId.getId(), request, e);
}
} else if (isSendNewRpcAvailable()) {
sent = rpcSubscriptions.size() > 0;
sent = !rpcSubscriptions.isEmpty();
Set<UUID> syncSessionSet = new HashSet<>();
rpcSubscriptions.forEach((sessionId, sessionInfo) -> {
log.debug("[{}][{}][{}][{}] send RPC request to transport ...", deviceId, sessionId, rpcId, requestId);
@ -255,8 +257,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
private boolean isSendNewRpcAvailable() {
return switch (rpcSubmitStrategy) {
case SEQUENTIAL_ON_ACK_FROM_DEVICE -> toDeviceRpcPendingMap.values().stream().filter(md -> !md.isDelivered()).findAny().isEmpty();
case SEQUENTIAL_ON_RESPONSE_FROM_DEVICE ->
toDeviceRpcPendingMap.values().stream().filter(ToDeviceRpcRequestMetadata::isDelivered).findAny().isEmpty();
case SEQUENTIAL_ON_RESPONSE_FROM_DEVICE -> toDeviceRpcPendingMap.isEmpty();
default -> true;
};
}
@ -598,7 +599,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
}
void processAttributesUpdate(DeviceAttributesEventNotificationMsg msg) {
if (attributeSubscriptions.size() > 0) {
if (!attributeSubscriptions.isEmpty()) {
boolean hasNotificationData = false;
AttributeUpdateNotificationMsg.Builder notification = AttributeUpdateNotificationMsg.newBuilder();
if (msg.isDeleted()) {
@ -613,7 +614,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
} else {
if (DataConstants.SHARED_SCOPE.equals(msg.getScope())) {
List<AttributeKvEntry> attributes = new ArrayList<>(msg.getValues());
if (attributes.size() > 0) {
if (!attributes.isEmpty()) {
List<TsKvProto> sharedUpdated = msg.getValues().stream().map(t -> KvProtoUtil.toTsKvProto(t.getLastUpdateTs(), t))
.collect(Collectors.toList());
if (!sharedUpdated.isEmpty()) {
@ -705,10 +706,16 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
maxRpcRetries = maxRpcRetries == null ?
systemContext.getMaxRpcRetries() : Math.min(maxRpcRetries, systemContext.getMaxRpcRetries());
if (maxRpcRetries <= md.getRetries()) {
toDeviceRpcPendingMap.remove(requestId);
status = RpcStatus.FAILED;
response = JacksonUtil.newObjectNode().put("error", "There was a Timeout and all retry " +
"attempts have been exhausted. Retry attempts set: " + maxRpcRetries);
if (closeTransportSessionOnRpcDeliveryTimeout) {
md.setRetries(0);
status = RpcStatus.QUEUED;
notifyTransportAboutSessionsCloseAndDumpSessions(TransportSessionCloseReason.RPC_DELIVERY_TIMEOUT);
} else {
toDeviceRpcPendingMap.remove(requestId);
status = RpcStatus.FAILED;
response = JacksonUtil.newObjectNode().put("error", "There was a Timeout and all retry " +
"attempts have been exhausted. Retry attempts set: " + maxRpcRetries);
}
} else {
md.setRetries(md.getRetries() + 1);
}
@ -843,28 +850,30 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
void processCredentialsUpdate(TbActorMsg msg) {
if (((DeviceCredentialsUpdateNotificationMsg) msg).getDeviceCredentials().getCredentialsType() == DeviceCredentialsType.LWM2M_CREDENTIALS) {
sessions.forEach((k, v) -> {
notifyTransportAboutDeviceCredentialsUpdate(k, v, ((DeviceCredentialsUpdateNotificationMsg) msg).getDeviceCredentials());
});
sessions.forEach((k, v) ->
notifyTransportAboutDeviceCredentialsUpdate(k, v, ((DeviceCredentialsUpdateNotificationMsg) msg).getDeviceCredentials()));
} else {
sessions.forEach((sessionId, sessionMd) -> notifyTransportAboutClosedSession(sessionId, sessionMd, "device credentials updated!", SessionCloseReason.CREDENTIALS_UPDATED));
attributeSubscriptions.clear();
rpcSubscriptions.clear();
dumpSessions();
notifyTransportAboutSessionsCloseAndDumpSessions(TransportSessionCloseReason.CREDENTIALS_UPDATED);
}
}
private void notifyTransportAboutClosedSessionMaxSessionsLimit(UUID sessionId, SessionInfoMetaData sessionMd) {
log.debug("remove eldest session (max concurrent sessions limit reached per device) sessionId: [{}] sessionMd: [{}]", sessionId, sessionMd);
notifyTransportAboutClosedSession(sessionId, sessionMd, "max concurrent sessions limit reached per device!", SessionCloseReason.MAX_CONCURRENT_SESSIONS_LIMIT_REACHED);
private void notifyTransportAboutSessionsCloseAndDumpSessions(TransportSessionCloseReason transportSessionCloseReason) {
sessions.forEach((sessionId, sessionMd) -> notifyTransportAboutClosedSession(sessionId, sessionMd, transportSessionCloseReason));
attributeSubscriptions.clear();
rpcSubscriptions.clear();
dumpSessions();
}
private void notifyTransportAboutClosedSession(UUID sessionId, SessionInfoMetaData sessionMd, String message, SessionCloseReason reason) {
private void notifyTransportAboutClosedSessionMaxSessionsLimit(UUID sessionId, SessionInfoMetaData sessionMd) {
notifyTransportAboutClosedSession(sessionId, sessionMd, TransportSessionCloseReason.MAX_CONCURRENT_SESSIONS_LIMIT_REACHED);
}
private void notifyTransportAboutClosedSession(UUID sessionId, SessionInfoMetaData sessionMd, TransportSessionCloseReason transportSessionCloseReason) {
log.debug("{} sessionId: [{}] sessionMd: [{}]", transportSessionCloseReason.getLogMessage(), sessionId, sessionMd);
SessionCloseNotificationProto sessionCloseNotificationProto = SessionCloseNotificationProto
.newBuilder()
.setMessage(message)
.setReason(reason)
.setMessage(transportSessionCloseReason.getNotificationMessage())
.setReason(SessionCloseReason.forNumber(transportSessionCloseReason.getProtoNumber()))
.build();
ToTransportMsg msg = ToTransportMsg.newBuilder()
.setSessionIdMSB(sessionId.getMostSignificantBits())
@ -1048,7 +1057,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
attributeSubscriptions.remove(id);
if (session != null) {
removed++;
notifyTransportAboutClosedSession(id, session, SESSION_TIMEOUT_MESSAGE, SessionCloseReason.SESSION_TIMEOUT);
notifyTransportAboutClosedSession(id, session, TransportSessionCloseReason.SESSION_TIMEOUT);
}
}
if (removed != 0) {

View File

@ -0,0 +1,39 @@
/**
* Copyright © 2016-2024 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.actors.device;
import lombok.Getter;
@Getter
public enum TransportSessionCloseReason {
UNKNOWN_REASON(0, "Unknown Reason.", "Session closed with unknown reason."),
CREDENTIALS_UPDATED(1, "device credentials updated!", "Close session due to device credentials update."),
MAX_CONCURRENT_SESSIONS_LIMIT_REACHED(2, "max concurrent sessions limit reached per device!", "Remove eldest session (max concurrent sessions limit reached per device)."),
SESSION_TIMEOUT(3, "session timeout!", "Close session due to session timeout."),
RPC_DELIVERY_TIMEOUT(4, "RPC delivery failed!", "Close session due to RPC delivery failure.");
private final int protoNumber;
private final String notificationMessage;
private final String logMessage;
TransportSessionCloseReason(int protoNumber, String notificationMessage, String logMessage) {
this.protoNumber = protoNumber;
this.notificationMessage = notificationMessage;
this.logMessage = logMessage;
}
}

View File

@ -486,6 +486,17 @@ actors:
submit_strategy: "${ACTORS_RPC_SUBMIT_STRATEGY_TYPE:BURST}"
# Time in milliseconds for RPC to receive a response after delivery. Used only for SEQUENTIAL_ON_RESPONSE_FROM_DEVICE submit strategy.
response_timeout_ms: "${ACTORS_RPC_RESPONSE_TIMEOUT_MS:30000}"
# Close transport session if RPC delivery timed out. If enabled, RPC will be reverted to the queued state.
# Note:
# - For MQTT transport:
# - QoS level 0: This feature does not apply, as no acknowledgment is expected, and therefore no timeout is triggered.
# - QoS level 1: This feature applies, as an acknowledgment is expected.
# - QoS level 2: Unsupported.
# - For CoAP transport:
# - Confirmable requests: This feature applies, as delivery confirmation is expected.
# - Non-confirmable requests: This feature does not apply, as no delivery acknowledgment is expected.
# - For HTTP and SNPM transports: RPC is considered delivered immediately, and there is no logic to await acknowledgment.
close_session_on_rpc_delivery_timeout: "${ACTORS_RPC_CLOSE_SESSION_ON_RPC_DELIVERY_TIMEOUT:false}"
statistics:
# Enable/disable actor statistics
enabled: "${ACTORS_STATISTICS_ENABLED:true}"

View File

@ -0,0 +1,157 @@
/**
* Copyright © 2016-2024 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.transport.mqtt.mqttv5.rpc;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.NullNode;
import io.netty.handler.codec.mqtt.MqttQoS;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.mqttv5.client.IMqttToken;
import org.eclipse.paho.mqttv5.client.MqttDisconnectResponse;
import org.eclipse.paho.mqttv5.common.MqttException;
import org.eclipse.paho.mqttv5.common.MqttMessage;
import org.eclipse.paho.mqttv5.common.packet.MqttProperties;
import org.eclipse.paho.mqttv5.common.packet.MqttReturnCode;
import org.eclipse.paho.mqttv5.common.packet.MqttWireMessage;
import org.junit.Before;
import org.junit.Test;
import org.springframework.test.context.TestPropertySource;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.server.common.data.rpc.Rpc;
import org.thingsboard.server.common.data.rpc.RpcStatus;
import org.thingsboard.server.dao.service.DaoSqlTest;
import org.thingsboard.server.transport.mqtt.MqttTestConfigProperties;
import org.thingsboard.server.transport.mqtt.mqttv5.MqttV5TestCallback;
import org.thingsboard.server.transport.mqtt.mqttv5.MqttV5TestClient;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import static org.assertj.core.api.Assertions.assertThat;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
import static org.thingsboard.server.common.data.device.profile.MqttTopics.DEVICE_RPC_REQUESTS_SUB_TOPIC;
@Slf4j
@DaoSqlTest
@TestPropertySource(properties = {
"actors.rpc.close_session_on_rpc_delivery_timeout=true",
"transport.mqtt.timeout=100",
})
public class MqttV5CloseTransportSessionOnRpcDeliveryTimeoutIntegrationTest extends AbstractMqttV5RpcTest {
@Before
public void beforeTest() throws Exception {
MqttTestConfigProperties configProperties = MqttTestConfigProperties.builder()
.deviceName("RPC test device")
.build();
processBeforeTest(configProperties);
}
@Test
public void testOneWayRpcCloseSessionOnRpcDeliveryTimeout() throws Exception {
testCloseSessionOnRpcDeliveryTimeout("oneway");
}
@Test
public void testTwoWayRpcCloseSessionOnRpcDeliveryTimeout() throws Exception {
testCloseSessionOnRpcDeliveryTimeout("twoway");
}
private void testCloseSessionOnRpcDeliveryTimeout(String rpcType) throws Exception {
MqttV5TestClient client = new MqttV5TestClient();
client.enableManualAcks();
client.connectAndWait(accessToken);
MqttV5NoAckTestCallback callback = new MqttV5NoAckTestCallback();
client.setCallback(callback);
client.subscribeAndWait(DEVICE_RPC_REQUESTS_SUB_TOPIC, MqttQoS.AT_LEAST_ONCE);
String expectedReceivedPayload = "{\"method\":\"setGpio\",\"params\":{\"pin\": \"23\",\"value\": 1}}";
long expirationTime = System.currentTimeMillis() + TimeUnit.DAYS.toMillis(1);
String setGpioRequest = "{\"method\":\"setGpio\",\"params\":{\"pin\": \"23\",\"value\": 1},\"persistent\":true,\"retries\":0,\"expirationTime\": " + expirationTime + "}";
String result = doPostAsync("/api/rpc/" + rpcType + "/" + savedDevice.getId(), setGpioRequest, String.class, status().isOk());
assertThat(result).isNotNull();
JsonNode response = JacksonUtil.toJsonNode(result);
assertThat(response).isNotNull();
assertThat(response.hasNonNull("rpcId")).isTrue();
callback.getSubscribeLatch().await(DEFAULT_WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS);
assertThat(callback.getQoS()).isEqualTo(MqttQoS.AT_LEAST_ONCE.value());
assertThat(JacksonUtil.fromBytes(callback.getPayloadBytes()))
.isEqualTo(JacksonUtil.toJsonNode(expectedReceivedPayload));
callback.getDisconnectLatch().await(DEFAULT_WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS);
assertThat(callback.getReturnCode()).isEqualTo(MqttReturnCode.RETURN_CODE_ADMINISTRITIVE_ACTION);
Rpc persistedRpc = doGet("/api/rpc/persistent/" + response.get("rpcId").asText(), Rpc.class);
assertThat(persistedRpc).isNotNull();
assertThat(persistedRpc.getStatus()).isEqualTo(RpcStatus.QUEUED);
assertThat(persistedRpc.getResponse()).isInstanceOf(NullNode.class);
assertThat(client.isConnected()).isFalse();
}
@Getter
private static class MqttV5NoAckTestCallback extends MqttV5TestCallback {
private int returnCode;
public CountDownLatch getDisconnectLatch() {
return super.getDeliveryLatch();
}
@Override
public void disconnected(MqttDisconnectResponse mqttDisconnectResponse) {
log.warn("MqttDisconnectResponse: {}", mqttDisconnectResponse);
returnCode = mqttDisconnectResponse.getReturnCode();
getDisconnectLatch().countDown();
}
@Override
public void mqttErrorOccurred(MqttException e) {
log.warn("Error occurred:", e);
}
@Override
public void messageArrived(String requestTopic, MqttMessage mqttMessage) {
log.warn("messageArrived on topic: {}", requestTopic);
qoS = mqttMessage.getQos();
payloadBytes = mqttMessage.getPayload();
subscribeLatch.countDown();
}
@Override
public void deliveryComplete(IMqttToken iMqttToken) {
// should be never called, Since we're never going to send a response back to server.
log.warn("delivery complete: {}", iMqttToken.getResponse());
pubAckReceived = iMqttToken.getResponse().getType() == MqttWireMessage.MESSAGE_TYPE_PUBACK;
getDisconnectLatch().countDown();
}
@Override
public void connectComplete(boolean reconnect, String serverURI) {
log.warn("Connect completed: reconnect - {}, serverURI - {}", reconnect, serverURI);
}
@Override
public void authPacketArrived(int reasonCode, MqttProperties mqttProperties) {
log.warn("Auth package received: reasonCode - {}, mqtt properties - {}", reasonCode, mqttProperties);
}
}
}

View File

@ -608,6 +608,7 @@ enum SessionCloseReason {
CREDENTIALS_UPDATED = 1;
MAX_CONCURRENT_SESSIONS_LIMIT_REACHED = 2;
SESSION_TIMEOUT = 3;
RPC_DELIVERY_TIMEOUT = 4;
}
message SessionCloseNotificationProto {

View File

@ -1328,18 +1328,12 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
public void onRemoteSessionCloseCommand(UUID sessionId, TransportProtos.SessionCloseNotificationProto sessionCloseNotification) {
log.trace("[{}] Received the remote command to close the session: {}", sessionId, sessionCloseNotification.getMessage());
transportService.deregisterSession(deviceSessionCtx.getSessionInfo());
MqttReasonCodes.Disconnect returnCode = MqttReasonCodes.Disconnect.IMPLEMENTATION_SPECIFIC_ERROR;
switch (sessionCloseNotification.getReason()) {
case CREDENTIALS_UPDATED:
returnCode = MqttReasonCodes.Disconnect.ADMINISTRATIVE_ACTION;
break;
case MAX_CONCURRENT_SESSIONS_LIMIT_REACHED:
returnCode = MqttReasonCodes.Disconnect.SESSION_TAKEN_OVER;
break;
case SESSION_TIMEOUT:
returnCode = MqttReasonCodes.Disconnect.MAXIMUM_CONNECT_TIME;
break;
}
MqttReasonCodes.Disconnect returnCode = switch (sessionCloseNotification.getReason()) {
case CREDENTIALS_UPDATED, RPC_DELIVERY_TIMEOUT -> MqttReasonCodes.Disconnect.ADMINISTRATIVE_ACTION;
case MAX_CONCURRENT_SESSIONS_LIMIT_REACHED -> MqttReasonCodes.Disconnect.SESSION_TAKEN_OVER;
case SESSION_TIMEOUT -> MqttReasonCodes.Disconnect.MAXIMUM_CONNECT_TIME;
default -> MqttReasonCodes.Disconnect.IMPLEMENTATION_SPECIFIC_ERROR;
};
closeCtx(deviceSessionCtx.getChannel(), returnCode);
}