diff --git a/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClientImpl.java b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClientImpl.java index 5e2c5d44cf..801470284b 100644 --- a/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClientImpl.java +++ b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClientImpl.java @@ -96,6 +96,8 @@ final class MqttClientImpl implements MqttClient { private final ListeningExecutor handlerExecutor; + private final static int DISCONNECT_FALLBACK_DELAY_SECS = 1; + /** * Construct the MqttClientImpl with default config */ @@ -468,13 +470,13 @@ final class MqttClientImpl implements MqttClient { future.channel().close(); disconnected = true; }); - eventLoop.schedule(() -> { if (channel.isOpen()) { + log.trace("[{}] Channel still open after {} second; forcing close now", channel.id(), DISCONNECT_FALLBACK_DELAY_SECS); this.channel.close(); disconnected = true; } - }, 1, TimeUnit.SECONDS); + }, DISCONNECT_FALLBACK_DELAY_SECS, TimeUnit.SECONDS); } } diff --git a/netty-mqtt/src/test/java/org/thingsboard/mqtt/MqttClientTest.java b/netty-mqtt/src/test/java/org/thingsboard/mqtt/MqttClientTest.java index a65c9fb4c7..60e625aa8d 100644 --- a/netty-mqtt/src/test/java/org/thingsboard/mqtt/MqttClientTest.java +++ b/netty-mqtt/src/test/java/org/thingsboard/mqtt/MqttClientTest.java @@ -123,22 +123,12 @@ class MqttClientTest { void testDisconnectFromBroker() { // GIVEN var clientConfig = new MqttClientConfig(); - clientConfig.setOwnerId("Test[ConnectToBroker]"); - clientConfig.setClientId("connect"); + clientConfig.setOwnerId("Test[Disconnect]"); + clientConfig.setClientId("disconnect"); client = MqttClient.create(clientConfig, null, handlerExecutor); - // WHEN - Promise connectFuture = client.connect(broker.getHost(), broker.getMqttPort()); - - // THEN - assertThat(connectFuture).isNotNull(); - - Awaitility.await("waiting for client to connect") - .atMost(Duration.ofSeconds(10L)) - .until(connectFuture::isDone); - - assertThat(connectFuture.isSuccess()).isTrue(); + connect(broker.getHost(), broker.getMqttPort()); // WHEN client.disconnect();