diff --git a/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClientImpl.java b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClientImpl.java index 65c195ea03..5e2c5d44cf 100644 --- a/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClientImpl.java +++ b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClientImpl.java @@ -456,16 +456,25 @@ final class MqttClientImpl implements MqttClient { @Override public void disconnect() { + if (disconnected) { + return; + } + log.trace("[{}] Disconnecting from server", channel != null ? channel.id() : "UNKNOWN"); - disconnected = true; if (this.channel != null) { MqttMessage message = new MqttMessage(new MqttFixedHeader(MqttMessageType.DISCONNECT, false, MqttQoS.AT_MOST_ONCE, false, 0)); - ChannelFuture channelFuture = this.sendAndFlushPacket(message); + + sendAndFlushPacket(message).addListener((ChannelFutureListener) future -> { + future.channel().close(); + disconnected = true; + }); + eventLoop.schedule(() -> { if (channel.isOpen()) { this.channel.close(); + disconnected = true; } - }, 500, TimeUnit.MILLISECONDS); + }, 1, TimeUnit.SECONDS); } } diff --git a/netty-mqtt/src/test/java/org/thingsboard/mqtt/MqttClientTest.java b/netty-mqtt/src/test/java/org/thingsboard/mqtt/MqttClientTest.java index 1481b354ee..a65c9fb4c7 100644 --- a/netty-mqtt/src/test/java/org/thingsboard/mqtt/MqttClientTest.java +++ b/netty-mqtt/src/test/java/org/thingsboard/mqtt/MqttClientTest.java @@ -119,6 +119,36 @@ class MqttClientTest { assertThat(client.isConnected()).isTrue(); } + @Test + void testDisconnectFromBroker() { + // GIVEN + var clientConfig = new MqttClientConfig(); + clientConfig.setOwnerId("Test[ConnectToBroker]"); + clientConfig.setClientId("connect"); + + client = MqttClient.create(clientConfig, null, handlerExecutor); + + // WHEN + Promise connectFuture = client.connect(broker.getHost(), broker.getMqttPort()); + + // THEN + assertThat(connectFuture).isNotNull(); + + Awaitility.await("waiting for client to connect") + .atMost(Duration.ofSeconds(10L)) + .until(connectFuture::isDone); + + assertThat(connectFuture.isSuccess()).isTrue(); + + // WHEN + client.disconnect(); + + // THEN + Awaitility.await("waiting for client to disconnect") + .atMost(Duration.ofSeconds(5)) + .untilAsserted(() -> assertThat(client.isConnected()).isFalse()); + } + @Test void testDisconnectDueToKeepAliveIfNoActivity() { // GIVEN