diff --git a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java index f6f2aef6f1..1c18a31f68 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java +++ b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java @@ -570,6 +570,10 @@ public class ActorSystemContext { @Getter private String rpcSubmitStrategy; + @Value("${actors.rpc.close_session_on_rpc_delivery_timeout:false}") + @Getter + private boolean closeTransportSessionOnRpcDeliveryTimeout; + @Value("${actors.rpc.response_timeout_ms:30000}") @Getter private long rpcResponseTimeout; diff --git a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java index 3e8cd9592f..3dca3417b4 100644 --- a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java @@ -132,6 +132,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso private final boolean rpcSequential; private final RpcSubmitStrategy rpcSubmitStrategy; private final ScheduledExecutorService scheduler; + private final boolean closeTransportSessionOnRpcDeliveryTimeout; private int rpcSeq = 0; private String deviceName; @@ -145,6 +146,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso this.tenantId = tenantId; this.deviceId = deviceId; this.rpcSubmitStrategy = RpcSubmitStrategy.parse(systemContext.getRpcSubmitStrategy()); + this.closeTransportSessionOnRpcDeliveryTimeout = systemContext.isCloseTransportSessionOnRpcDeliveryTimeout(); this.rpcSequential = !rpcSubmitStrategy.equals(RpcSubmitStrategy.BURST); this.attributeSubscriptions = new HashMap<>(); this.rpcSubscriptions = new HashMap<>(); @@ -223,7 +225,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso log.error("[{}][{}][{}] Failed to save RPC request to edge queue {}", tenantId, deviceId, edgeId.getId(), request, e); } } else if (isSendNewRpcAvailable()) { - sent = rpcSubscriptions.size() > 0; + sent = !rpcSubscriptions.isEmpty(); Set syncSessionSet = new HashSet<>(); rpcSubscriptions.forEach((sessionId, sessionInfo) -> { log.debug("[{}][{}][{}][{}] send RPC request to transport ...", deviceId, sessionId, rpcId, requestId); @@ -255,8 +257,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso private boolean isSendNewRpcAvailable() { return switch (rpcSubmitStrategy) { case SEQUENTIAL_ON_ACK_FROM_DEVICE -> toDeviceRpcPendingMap.values().stream().filter(md -> !md.isDelivered()).findAny().isEmpty(); - case SEQUENTIAL_ON_RESPONSE_FROM_DEVICE -> - toDeviceRpcPendingMap.values().stream().filter(ToDeviceRpcRequestMetadata::isDelivered).findAny().isEmpty(); + case SEQUENTIAL_ON_RESPONSE_FROM_DEVICE -> toDeviceRpcPendingMap.isEmpty(); default -> true; }; } @@ -598,7 +599,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso } void processAttributesUpdate(DeviceAttributesEventNotificationMsg msg) { - if (attributeSubscriptions.size() > 0) { + if (!attributeSubscriptions.isEmpty()) { boolean hasNotificationData = false; AttributeUpdateNotificationMsg.Builder notification = AttributeUpdateNotificationMsg.newBuilder(); if (msg.isDeleted()) { @@ -613,7 +614,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso } else { if (DataConstants.SHARED_SCOPE.equals(msg.getScope())) { List attributes = new ArrayList<>(msg.getValues()); - if (attributes.size() > 0) { + if (!attributes.isEmpty()) { List sharedUpdated = msg.getValues().stream().map(t -> KvProtoUtil.toTsKvProto(t.getLastUpdateTs(), t)) .collect(Collectors.toList()); if (!sharedUpdated.isEmpty()) { @@ -705,10 +706,16 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso maxRpcRetries = maxRpcRetries == null ? systemContext.getMaxRpcRetries() : Math.min(maxRpcRetries, systemContext.getMaxRpcRetries()); if (maxRpcRetries <= md.getRetries()) { - toDeviceRpcPendingMap.remove(requestId); - status = RpcStatus.FAILED; - response = JacksonUtil.newObjectNode().put("error", "There was a Timeout and all retry " + - "attempts have been exhausted. Retry attempts set: " + maxRpcRetries); + if (closeTransportSessionOnRpcDeliveryTimeout) { + md.setRetries(0); + status = RpcStatus.QUEUED; + notifyTransportAboutSessionsCloseAndDumpSessions(TransportSessionCloseReason.RPC_DELIVERY_TIMEOUT); + } else { + toDeviceRpcPendingMap.remove(requestId); + status = RpcStatus.FAILED; + response = JacksonUtil.newObjectNode().put("error", "There was a Timeout and all retry " + + "attempts have been exhausted. Retry attempts set: " + maxRpcRetries); + } } else { md.setRetries(md.getRetries() + 1); } @@ -843,28 +850,30 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso void processCredentialsUpdate(TbActorMsg msg) { if (((DeviceCredentialsUpdateNotificationMsg) msg).getDeviceCredentials().getCredentialsType() == DeviceCredentialsType.LWM2M_CREDENTIALS) { - sessions.forEach((k, v) -> { - notifyTransportAboutDeviceCredentialsUpdate(k, v, ((DeviceCredentialsUpdateNotificationMsg) msg).getDeviceCredentials()); - }); + sessions.forEach((k, v) -> + notifyTransportAboutDeviceCredentialsUpdate(k, v, ((DeviceCredentialsUpdateNotificationMsg) msg).getDeviceCredentials())); } else { - sessions.forEach((sessionId, sessionMd) -> notifyTransportAboutClosedSession(sessionId, sessionMd, "device credentials updated!", SessionCloseReason.CREDENTIALS_UPDATED)); - attributeSubscriptions.clear(); - rpcSubscriptions.clear(); - dumpSessions(); - + notifyTransportAboutSessionsCloseAndDumpSessions(TransportSessionCloseReason.CREDENTIALS_UPDATED); } } - private void notifyTransportAboutClosedSessionMaxSessionsLimit(UUID sessionId, SessionInfoMetaData sessionMd) { - log.debug("remove eldest session (max concurrent sessions limit reached per device) sessionId: [{}] sessionMd: [{}]", sessionId, sessionMd); - notifyTransportAboutClosedSession(sessionId, sessionMd, "max concurrent sessions limit reached per device!", SessionCloseReason.MAX_CONCURRENT_SESSIONS_LIMIT_REACHED); + private void notifyTransportAboutSessionsCloseAndDumpSessions(TransportSessionCloseReason transportSessionCloseReason) { + sessions.forEach((sessionId, sessionMd) -> notifyTransportAboutClosedSession(sessionId, sessionMd, transportSessionCloseReason)); + attributeSubscriptions.clear(); + rpcSubscriptions.clear(); + dumpSessions(); } - private void notifyTransportAboutClosedSession(UUID sessionId, SessionInfoMetaData sessionMd, String message, SessionCloseReason reason) { + private void notifyTransportAboutClosedSessionMaxSessionsLimit(UUID sessionId, SessionInfoMetaData sessionMd) { + notifyTransportAboutClosedSession(sessionId, sessionMd, TransportSessionCloseReason.MAX_CONCURRENT_SESSIONS_LIMIT_REACHED); + } + + private void notifyTransportAboutClosedSession(UUID sessionId, SessionInfoMetaData sessionMd, TransportSessionCloseReason transportSessionCloseReason) { + log.debug("{} sessionId: [{}] sessionMd: [{}]", transportSessionCloseReason.getLogMessage(), sessionId, sessionMd); SessionCloseNotificationProto sessionCloseNotificationProto = SessionCloseNotificationProto .newBuilder() - .setMessage(message) - .setReason(reason) + .setMessage(transportSessionCloseReason.getNotificationMessage()) + .setReason(SessionCloseReason.forNumber(transportSessionCloseReason.getProtoNumber())) .build(); ToTransportMsg msg = ToTransportMsg.newBuilder() .setSessionIdMSB(sessionId.getMostSignificantBits()) @@ -1048,7 +1057,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso attributeSubscriptions.remove(id); if (session != null) { removed++; - notifyTransportAboutClosedSession(id, session, SESSION_TIMEOUT_MESSAGE, SessionCloseReason.SESSION_TIMEOUT); + notifyTransportAboutClosedSession(id, session, TransportSessionCloseReason.SESSION_TIMEOUT); } } if (removed != 0) { diff --git a/application/src/main/java/org/thingsboard/server/actors/device/TransportSessionCloseReason.java b/application/src/main/java/org/thingsboard/server/actors/device/TransportSessionCloseReason.java new file mode 100644 index 0000000000..ce8c936515 --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/actors/device/TransportSessionCloseReason.java @@ -0,0 +1,39 @@ +/** + * Copyright © 2016-2024 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.actors.device; + +import lombok.Getter; + +@Getter +public enum TransportSessionCloseReason { + + UNKNOWN_REASON(0, "Unknown Reason.", "Session closed with unknown reason."), + CREDENTIALS_UPDATED(1, "device credentials updated!", "Close session due to device credentials update."), + MAX_CONCURRENT_SESSIONS_LIMIT_REACHED(2, "max concurrent sessions limit reached per device!", "Remove eldest session (max concurrent sessions limit reached per device)."), + SESSION_TIMEOUT(3, "session timeout!", "Close session due to session timeout."), + RPC_DELIVERY_TIMEOUT(4, "RPC delivery failed!", "Close session due to RPC delivery failure."); + + private final int protoNumber; + private final String notificationMessage; + private final String logMessage; + + TransportSessionCloseReason(int protoNumber, String notificationMessage, String logMessage) { + this.protoNumber = protoNumber; + this.notificationMessage = notificationMessage; + this.logMessage = logMessage; + } + +} diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index 36dfe0635e..a5e3532258 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -486,6 +486,17 @@ actors: submit_strategy: "${ACTORS_RPC_SUBMIT_STRATEGY_TYPE:BURST}" # Time in milliseconds for RPC to receive a response after delivery. Used only for SEQUENTIAL_ON_RESPONSE_FROM_DEVICE submit strategy. response_timeout_ms: "${ACTORS_RPC_RESPONSE_TIMEOUT_MS:30000}" + # Close transport session if RPC delivery timed out. If enabled, RPC will be reverted to the queued state. + # Note: + # - For MQTT transport: + # - QoS level 0: This feature does not apply, as no acknowledgment is expected, and therefore no timeout is triggered. + # - QoS level 1: This feature applies, as an acknowledgment is expected. + # - QoS level 2: Unsupported. + # - For CoAP transport: + # - Confirmable requests: This feature applies, as delivery confirmation is expected. + # - Non-confirmable requests: This feature does not apply, as no delivery acknowledgment is expected. + # - For HTTP and SNPM transports: RPC is considered delivered immediately, and there is no logic to await acknowledgment. + close_session_on_rpc_delivery_timeout: "${ACTORS_RPC_CLOSE_SESSION_ON_RPC_DELIVERY_TIMEOUT:false}" statistics: # Enable/disable actor statistics enabled: "${ACTORS_STATISTICS_ENABLED:true}" diff --git a/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv5/rpc/MqttV5CloseTransportSessionOnRpcDeliveryTimeoutIntegrationTest.java b/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv5/rpc/MqttV5CloseTransportSessionOnRpcDeliveryTimeoutIntegrationTest.java new file mode 100644 index 0000000000..dc366c11f2 --- /dev/null +++ b/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv5/rpc/MqttV5CloseTransportSessionOnRpcDeliveryTimeoutIntegrationTest.java @@ -0,0 +1,157 @@ +/** + * Copyright © 2016-2024 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.transport.mqtt.mqttv5.rpc; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.NullNode; +import io.netty.handler.codec.mqtt.MqttQoS; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; +import org.eclipse.paho.mqttv5.client.IMqttToken; +import org.eclipse.paho.mqttv5.client.MqttDisconnectResponse; +import org.eclipse.paho.mqttv5.common.MqttException; +import org.eclipse.paho.mqttv5.common.MqttMessage; +import org.eclipse.paho.mqttv5.common.packet.MqttProperties; +import org.eclipse.paho.mqttv5.common.packet.MqttReturnCode; +import org.eclipse.paho.mqttv5.common.packet.MqttWireMessage; +import org.junit.Before; +import org.junit.Test; +import org.springframework.test.context.TestPropertySource; +import org.thingsboard.common.util.JacksonUtil; +import org.thingsboard.server.common.data.rpc.Rpc; +import org.thingsboard.server.common.data.rpc.RpcStatus; +import org.thingsboard.server.dao.service.DaoSqlTest; +import org.thingsboard.server.transport.mqtt.MqttTestConfigProperties; +import org.thingsboard.server.transport.mqtt.mqttv5.MqttV5TestCallback; +import org.thingsboard.server.transport.mqtt.mqttv5.MqttV5TestClient; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status; +import static org.thingsboard.server.common.data.device.profile.MqttTopics.DEVICE_RPC_REQUESTS_SUB_TOPIC; + +@Slf4j +@DaoSqlTest +@TestPropertySource(properties = { + "actors.rpc.close_session_on_rpc_delivery_timeout=true", + "transport.mqtt.timeout=100", +}) +public class MqttV5CloseTransportSessionOnRpcDeliveryTimeoutIntegrationTest extends AbstractMqttV5RpcTest { + + @Before + public void beforeTest() throws Exception { + MqttTestConfigProperties configProperties = MqttTestConfigProperties.builder() + .deviceName("RPC test device") + .build(); + processBeforeTest(configProperties); + } + + @Test + public void testOneWayRpcCloseSessionOnRpcDeliveryTimeout() throws Exception { + testCloseSessionOnRpcDeliveryTimeout("oneway"); + } + + @Test + public void testTwoWayRpcCloseSessionOnRpcDeliveryTimeout() throws Exception { + testCloseSessionOnRpcDeliveryTimeout("twoway"); + } + + private void testCloseSessionOnRpcDeliveryTimeout(String rpcType) throws Exception { + MqttV5TestClient client = new MqttV5TestClient(); + client.enableManualAcks(); + client.connectAndWait(accessToken); + MqttV5NoAckTestCallback callback = new MqttV5NoAckTestCallback(); + client.setCallback(callback); + client.subscribeAndWait(DEVICE_RPC_REQUESTS_SUB_TOPIC, MqttQoS.AT_LEAST_ONCE); + + String expectedReceivedPayload = "{\"method\":\"setGpio\",\"params\":{\"pin\": \"23\",\"value\": 1}}"; + long expirationTime = System.currentTimeMillis() + TimeUnit.DAYS.toMillis(1); + String setGpioRequest = "{\"method\":\"setGpio\",\"params\":{\"pin\": \"23\",\"value\": 1},\"persistent\":true,\"retries\":0,\"expirationTime\": " + expirationTime + "}"; + String result = doPostAsync("/api/rpc/" + rpcType + "/" + savedDevice.getId(), setGpioRequest, String.class, status().isOk()); + + assertThat(result).isNotNull(); + JsonNode response = JacksonUtil.toJsonNode(result); + assertThat(response).isNotNull(); + assertThat(response.hasNonNull("rpcId")).isTrue(); + + callback.getSubscribeLatch().await(DEFAULT_WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS); + assertThat(callback.getQoS()).isEqualTo(MqttQoS.AT_LEAST_ONCE.value()); + assertThat(JacksonUtil.fromBytes(callback.getPayloadBytes())) + .isEqualTo(JacksonUtil.toJsonNode(expectedReceivedPayload)); + + callback.getDisconnectLatch().await(DEFAULT_WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS); + assertThat(callback.getReturnCode()).isEqualTo(MqttReturnCode.RETURN_CODE_ADMINISTRITIVE_ACTION); + + Rpc persistedRpc = doGet("/api/rpc/persistent/" + response.get("rpcId").asText(), Rpc.class); + assertThat(persistedRpc).isNotNull(); + assertThat(persistedRpc.getStatus()).isEqualTo(RpcStatus.QUEUED); + assertThat(persistedRpc.getResponse()).isInstanceOf(NullNode.class); + assertThat(client.isConnected()).isFalse(); + } + + @Getter + private static class MqttV5NoAckTestCallback extends MqttV5TestCallback { + + private int returnCode; + + public CountDownLatch getDisconnectLatch() { + return super.getDeliveryLatch(); + } + + @Override + public void disconnected(MqttDisconnectResponse mqttDisconnectResponse) { + log.warn("MqttDisconnectResponse: {}", mqttDisconnectResponse); + returnCode = mqttDisconnectResponse.getReturnCode(); + getDisconnectLatch().countDown(); + } + + @Override + public void mqttErrorOccurred(MqttException e) { + log.warn("Error occurred:", e); + } + + @Override + public void messageArrived(String requestTopic, MqttMessage mqttMessage) { + log.warn("messageArrived on topic: {}", requestTopic); + qoS = mqttMessage.getQos(); + payloadBytes = mqttMessage.getPayload(); + subscribeLatch.countDown(); + } + + @Override + public void deliveryComplete(IMqttToken iMqttToken) { + // should be never called, Since we're never going to send a response back to server. + log.warn("delivery complete: {}", iMqttToken.getResponse()); + pubAckReceived = iMqttToken.getResponse().getType() == MqttWireMessage.MESSAGE_TYPE_PUBACK; + getDisconnectLatch().countDown(); + } + + @Override + public void connectComplete(boolean reconnect, String serverURI) { + log.warn("Connect completed: reconnect - {}, serverURI - {}", reconnect, serverURI); + } + + @Override + public void authPacketArrived(int reasonCode, MqttProperties mqttProperties) { + log.warn("Auth package received: reasonCode - {}, mqtt properties - {}", reasonCode, mqttProperties); + } + + + } + +} diff --git a/common/proto/src/main/proto/queue.proto b/common/proto/src/main/proto/queue.proto index 380910787c..228a4039d2 100644 --- a/common/proto/src/main/proto/queue.proto +++ b/common/proto/src/main/proto/queue.proto @@ -608,6 +608,7 @@ enum SessionCloseReason { CREDENTIALS_UPDATED = 1; MAX_CONCURRENT_SESSIONS_LIMIT_REACHED = 2; SESSION_TIMEOUT = 3; + RPC_DELIVERY_TIMEOUT = 4; } message SessionCloseNotificationProto { diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java index 54df37c3f6..2b7e2a51dc 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java @@ -1328,18 +1328,12 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement public void onRemoteSessionCloseCommand(UUID sessionId, TransportProtos.SessionCloseNotificationProto sessionCloseNotification) { log.trace("[{}] Received the remote command to close the session: {}", sessionId, sessionCloseNotification.getMessage()); transportService.deregisterSession(deviceSessionCtx.getSessionInfo()); - MqttReasonCodes.Disconnect returnCode = MqttReasonCodes.Disconnect.IMPLEMENTATION_SPECIFIC_ERROR; - switch (sessionCloseNotification.getReason()) { - case CREDENTIALS_UPDATED: - returnCode = MqttReasonCodes.Disconnect.ADMINISTRATIVE_ACTION; - break; - case MAX_CONCURRENT_SESSIONS_LIMIT_REACHED: - returnCode = MqttReasonCodes.Disconnect.SESSION_TAKEN_OVER; - break; - case SESSION_TIMEOUT: - returnCode = MqttReasonCodes.Disconnect.MAXIMUM_CONNECT_TIME; - break; - } + MqttReasonCodes.Disconnect returnCode = switch (sessionCloseNotification.getReason()) { + case CREDENTIALS_UPDATED, RPC_DELIVERY_TIMEOUT -> MqttReasonCodes.Disconnect.ADMINISTRATIVE_ACTION; + case MAX_CONCURRENT_SESSIONS_LIMIT_REACHED -> MqttReasonCodes.Disconnect.SESSION_TAKEN_OVER; + case SESSION_TIMEOUT -> MqttReasonCodes.Disconnect.MAXIMUM_CONNECT_TIME; + default -> MqttReasonCodes.Disconnect.IMPLEMENTATION_SPECIFIC_ERROR; + }; closeCtx(deviceSessionCtx.getChannel(), returnCode); }