From 0bbec75e7550eb427f9f62b67d2001cec14cd8c4 Mon Sep 17 00:00:00 2001 From: Artem Barysh Date: Wed, 28 May 2025 18:16:54 +0300 Subject: [PATCH 1/4] Fixed channel disconnection --- .../src/main/java/org/thingsboard/mqtt/MqttClientImpl.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClientImpl.java b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClientImpl.java index 543553951d..65c195ea03 100644 --- a/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClientImpl.java +++ b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClientImpl.java @@ -462,7 +462,7 @@ final class MqttClientImpl implements MqttClient { MqttMessage message = new MqttMessage(new MqttFixedHeader(MqttMessageType.DISCONNECT, false, MqttQoS.AT_MOST_ONCE, false, 0)); ChannelFuture channelFuture = this.sendAndFlushPacket(message); eventLoop.schedule(() -> { - if (!channelFuture.isDone()) { + if (channel.isOpen()) { this.channel.close(); } }, 500, TimeUnit.MILLISECONDS); From e112077cb0da9dbbd74dab20aa076c8997a3794a Mon Sep 17 00:00:00 2001 From: Artem Barysh Date: Thu, 29 May 2025 13:52:08 +0300 Subject: [PATCH 2/4] fixed --- .../org/thingsboard/mqtt/MqttClientImpl.java | 15 ++++++++-- .../org/thingsboard/mqtt/MqttClientTest.java | 30 +++++++++++++++++++ 2 files changed, 42 insertions(+), 3 deletions(-) diff --git a/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClientImpl.java b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClientImpl.java index 65c195ea03..5e2c5d44cf 100644 --- a/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClientImpl.java +++ b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClientImpl.java @@ -456,16 +456,25 @@ final class MqttClientImpl implements MqttClient { @Override public void disconnect() { + if (disconnected) { + return; + } + log.trace("[{}] Disconnecting from server", channel != null ? channel.id() : "UNKNOWN"); - disconnected = true; if (this.channel != null) { MqttMessage message = new MqttMessage(new MqttFixedHeader(MqttMessageType.DISCONNECT, false, MqttQoS.AT_MOST_ONCE, false, 0)); - ChannelFuture channelFuture = this.sendAndFlushPacket(message); + + sendAndFlushPacket(message).addListener((ChannelFutureListener) future -> { + future.channel().close(); + disconnected = true; + }); + eventLoop.schedule(() -> { if (channel.isOpen()) { this.channel.close(); + disconnected = true; } - }, 500, TimeUnit.MILLISECONDS); + }, 1, TimeUnit.SECONDS); } } diff --git a/netty-mqtt/src/test/java/org/thingsboard/mqtt/MqttClientTest.java b/netty-mqtt/src/test/java/org/thingsboard/mqtt/MqttClientTest.java index 1481b354ee..a65c9fb4c7 100644 --- a/netty-mqtt/src/test/java/org/thingsboard/mqtt/MqttClientTest.java +++ b/netty-mqtt/src/test/java/org/thingsboard/mqtt/MqttClientTest.java @@ -119,6 +119,36 @@ class MqttClientTest { assertThat(client.isConnected()).isTrue(); } + @Test + void testDisconnectFromBroker() { + // GIVEN + var clientConfig = new MqttClientConfig(); + clientConfig.setOwnerId("Test[ConnectToBroker]"); + clientConfig.setClientId("connect"); + + client = MqttClient.create(clientConfig, null, handlerExecutor); + + // WHEN + Promise connectFuture = client.connect(broker.getHost(), broker.getMqttPort()); + + // THEN + assertThat(connectFuture).isNotNull(); + + Awaitility.await("waiting for client to connect") + .atMost(Duration.ofSeconds(10L)) + .until(connectFuture::isDone); + + assertThat(connectFuture.isSuccess()).isTrue(); + + // WHEN + client.disconnect(); + + // THEN + Awaitility.await("waiting for client to disconnect") + .atMost(Duration.ofSeconds(5)) + .untilAsserted(() -> assertThat(client.isConnected()).isFalse()); + } + @Test void testDisconnectDueToKeepAliveIfNoActivity() { // GIVEN From 9786e0a2f884d77dbdc55a7c4410795f79325d3a Mon Sep 17 00:00:00 2001 From: Artem Barysh Date: Mon, 2 Jun 2025 16:21:35 +0300 Subject: [PATCH 3/4] Resolved PR comments --- .../org/thingsboard/mqtt/MqttClientImpl.java | 6 ++++-- .../org/thingsboard/mqtt/MqttClientTest.java | 16 +++------------- 2 files changed, 7 insertions(+), 15 deletions(-) diff --git a/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClientImpl.java b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClientImpl.java index 5e2c5d44cf..801470284b 100644 --- a/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClientImpl.java +++ b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClientImpl.java @@ -96,6 +96,8 @@ final class MqttClientImpl implements MqttClient { private final ListeningExecutor handlerExecutor; + private final static int DISCONNECT_FALLBACK_DELAY_SECS = 1; + /** * Construct the MqttClientImpl with default config */ @@ -468,13 +470,13 @@ final class MqttClientImpl implements MqttClient { future.channel().close(); disconnected = true; }); - eventLoop.schedule(() -> { if (channel.isOpen()) { + log.trace("[{}] Channel still open after {} second; forcing close now", channel.id(), DISCONNECT_FALLBACK_DELAY_SECS); this.channel.close(); disconnected = true; } - }, 1, TimeUnit.SECONDS); + }, DISCONNECT_FALLBACK_DELAY_SECS, TimeUnit.SECONDS); } } diff --git a/netty-mqtt/src/test/java/org/thingsboard/mqtt/MqttClientTest.java b/netty-mqtt/src/test/java/org/thingsboard/mqtt/MqttClientTest.java index a65c9fb4c7..60e625aa8d 100644 --- a/netty-mqtt/src/test/java/org/thingsboard/mqtt/MqttClientTest.java +++ b/netty-mqtt/src/test/java/org/thingsboard/mqtt/MqttClientTest.java @@ -123,22 +123,12 @@ class MqttClientTest { void testDisconnectFromBroker() { // GIVEN var clientConfig = new MqttClientConfig(); - clientConfig.setOwnerId("Test[ConnectToBroker]"); - clientConfig.setClientId("connect"); + clientConfig.setOwnerId("Test[Disconnect]"); + clientConfig.setClientId("disconnect"); client = MqttClient.create(clientConfig, null, handlerExecutor); - // WHEN - Promise connectFuture = client.connect(broker.getHost(), broker.getMqttPort()); - - // THEN - assertThat(connectFuture).isNotNull(); - - Awaitility.await("waiting for client to connect") - .atMost(Duration.ofSeconds(10L)) - .until(connectFuture::isDone); - - assertThat(connectFuture.isSuccess()).isTrue(); + connect(broker.getHost(), broker.getMqttPort()); // WHEN client.disconnect(); From 16a1d65c286a9bdf1b0a76f3d713413a2d4fc20b Mon Sep 17 00:00:00 2001 From: Artem Barysh Date: Mon, 2 Jun 2025 18:02:07 +0300 Subject: [PATCH 4/4] Removed test --- .../server/msa/connectivity/MqttClientTest.java | 16 ---------------- 1 file changed, 16 deletions(-) diff --git a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttClientTest.java b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttClientTest.java index 1b1ed9bf0f..eb30ef8b9c 100644 --- a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttClientTest.java +++ b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttClientTest.java @@ -556,22 +556,6 @@ public class MqttClientTest extends AbstractContainerTest { assertThat(provisionResponse.get("status").asText()).isEqualTo("NOT_FOUND"); } - @Test - public void regularDisconnect() throws Exception { - DeviceCredentials deviceCredentials = testRestClient.getDeviceCredentialsByDeviceId(device.getId()); - - MqttMessageListener listener = new MqttMessageListener(); - MqttClient mqttClient = getMqttClient(deviceCredentials, listener, MqttVersion.MQTT_5); - final List returnCodeByteValue = new ArrayList<>(); - MqttClientCallback callbackForDisconnectWithReturnCode = getCallbackWrapperForDisconnectWithReturnCode(returnCodeByteValue); - mqttClient.setCallback(callbackForDisconnectWithReturnCode); - mqttClient.disconnect(); - Thread.sleep(1000); - assertThat(returnCodeByteValue.size()).isEqualTo(1); - MqttReasonCodes.Disconnect returnCode = MqttReasonCodes.Disconnect.valueOf(returnCodeByteValue.get(0)); - assertThat(returnCode).isEqualTo(MqttReasonCodes.Disconnect.NORMAL_DISCONNECT); - } - @Test public void clientSessionTakenOverDisconnect() throws Exception { DeviceCredentials deviceCredentials = testRestClient.getDeviceCredentialsByDeviceId(device.getId());