From 9786e0a2f884d77dbdc55a7c4410795f79325d3a Mon Sep 17 00:00:00 2001 From: Artem Barysh Date: Mon, 2 Jun 2025 16:21:35 +0300 Subject: [PATCH] Resolved PR comments --- .../org/thingsboard/mqtt/MqttClientImpl.java | 6 ++++-- .../org/thingsboard/mqtt/MqttClientTest.java | 16 +++------------- 2 files changed, 7 insertions(+), 15 deletions(-) diff --git a/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClientImpl.java b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClientImpl.java index 5e2c5d44cf..801470284b 100644 --- a/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClientImpl.java +++ b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClientImpl.java @@ -96,6 +96,8 @@ final class MqttClientImpl implements MqttClient { private final ListeningExecutor handlerExecutor; + private final static int DISCONNECT_FALLBACK_DELAY_SECS = 1; + /** * Construct the MqttClientImpl with default config */ @@ -468,13 +470,13 @@ final class MqttClientImpl implements MqttClient { future.channel().close(); disconnected = true; }); - eventLoop.schedule(() -> { if (channel.isOpen()) { + log.trace("[{}] Channel still open after {} second; forcing close now", channel.id(), DISCONNECT_FALLBACK_DELAY_SECS); this.channel.close(); disconnected = true; } - }, 1, TimeUnit.SECONDS); + }, DISCONNECT_FALLBACK_DELAY_SECS, TimeUnit.SECONDS); } } diff --git a/netty-mqtt/src/test/java/org/thingsboard/mqtt/MqttClientTest.java b/netty-mqtt/src/test/java/org/thingsboard/mqtt/MqttClientTest.java index a65c9fb4c7..60e625aa8d 100644 --- a/netty-mqtt/src/test/java/org/thingsboard/mqtt/MqttClientTest.java +++ b/netty-mqtt/src/test/java/org/thingsboard/mqtt/MqttClientTest.java @@ -123,22 +123,12 @@ class MqttClientTest { void testDisconnectFromBroker() { // GIVEN var clientConfig = new MqttClientConfig(); - clientConfig.setOwnerId("Test[ConnectToBroker]"); - clientConfig.setClientId("connect"); + clientConfig.setOwnerId("Test[Disconnect]"); + clientConfig.setClientId("disconnect"); client = MqttClient.create(clientConfig, null, handlerExecutor); - // WHEN - Promise connectFuture = client.connect(broker.getHost(), broker.getMqttPort()); - - // THEN - assertThat(connectFuture).isNotNull(); - - Awaitility.await("waiting for client to connect") - .atMost(Duration.ofSeconds(10L)) - .until(connectFuture::isDone); - - assertThat(connectFuture.isSuccess()).isTrue(); + connect(broker.getHost(), broker.getMqttPort()); // WHEN client.disconnect();