diff --git a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttClientTest.java b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttClientTest.java index 1b1ed9bf0f..eb30ef8b9c 100644 --- a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttClientTest.java +++ b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttClientTest.java @@ -556,22 +556,6 @@ public class MqttClientTest extends AbstractContainerTest { assertThat(provisionResponse.get("status").asText()).isEqualTo("NOT_FOUND"); } - @Test - public void regularDisconnect() throws Exception { - DeviceCredentials deviceCredentials = testRestClient.getDeviceCredentialsByDeviceId(device.getId()); - - MqttMessageListener listener = new MqttMessageListener(); - MqttClient mqttClient = getMqttClient(deviceCredentials, listener, MqttVersion.MQTT_5); - final List returnCodeByteValue = new ArrayList<>(); - MqttClientCallback callbackForDisconnectWithReturnCode = getCallbackWrapperForDisconnectWithReturnCode(returnCodeByteValue); - mqttClient.setCallback(callbackForDisconnectWithReturnCode); - mqttClient.disconnect(); - Thread.sleep(1000); - assertThat(returnCodeByteValue.size()).isEqualTo(1); - MqttReasonCodes.Disconnect returnCode = MqttReasonCodes.Disconnect.valueOf(returnCodeByteValue.get(0)); - assertThat(returnCode).isEqualTo(MqttReasonCodes.Disconnect.NORMAL_DISCONNECT); - } - @Test public void clientSessionTakenOverDisconnect() throws Exception { DeviceCredentials deviceCredentials = testRestClient.getDeviceCredentialsByDeviceId(device.getId()); diff --git a/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClientImpl.java b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClientImpl.java index 543553951d..801470284b 100644 --- a/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClientImpl.java +++ b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClientImpl.java @@ -96,6 +96,8 @@ final class MqttClientImpl implements MqttClient { private final ListeningExecutor handlerExecutor; + private final static int DISCONNECT_FALLBACK_DELAY_SECS = 1; + /** * Construct the MqttClientImpl with default config */ @@ -456,16 +458,25 @@ final class MqttClientImpl implements MqttClient { @Override public void disconnect() { + if (disconnected) { + return; + } + log.trace("[{}] Disconnecting from server", channel != null ? channel.id() : "UNKNOWN"); - disconnected = true; if (this.channel != null) { MqttMessage message = new MqttMessage(new MqttFixedHeader(MqttMessageType.DISCONNECT, false, MqttQoS.AT_MOST_ONCE, false, 0)); - ChannelFuture channelFuture = this.sendAndFlushPacket(message); + + sendAndFlushPacket(message).addListener((ChannelFutureListener) future -> { + future.channel().close(); + disconnected = true; + }); eventLoop.schedule(() -> { - if (!channelFuture.isDone()) { + if (channel.isOpen()) { + log.trace("[{}] Channel still open after {} second; forcing close now", channel.id(), DISCONNECT_FALLBACK_DELAY_SECS); this.channel.close(); + disconnected = true; } - }, 500, TimeUnit.MILLISECONDS); + }, DISCONNECT_FALLBACK_DELAY_SECS, TimeUnit.SECONDS); } } diff --git a/netty-mqtt/src/test/java/org/thingsboard/mqtt/MqttClientTest.java b/netty-mqtt/src/test/java/org/thingsboard/mqtt/MqttClientTest.java index 1481b354ee..60e625aa8d 100644 --- a/netty-mqtt/src/test/java/org/thingsboard/mqtt/MqttClientTest.java +++ b/netty-mqtt/src/test/java/org/thingsboard/mqtt/MqttClientTest.java @@ -119,6 +119,26 @@ class MqttClientTest { assertThat(client.isConnected()).isTrue(); } + @Test + void testDisconnectFromBroker() { + // GIVEN + var clientConfig = new MqttClientConfig(); + clientConfig.setOwnerId("Test[Disconnect]"); + clientConfig.setClientId("disconnect"); + + client = MqttClient.create(clientConfig, null, handlerExecutor); + + connect(broker.getHost(), broker.getMqttPort()); + + // WHEN + client.disconnect(); + + // THEN + Awaitility.await("waiting for client to disconnect") + .atMost(Duration.ofSeconds(5)) + .untilAsserted(() -> assertThat(client.isConnected()).isFalse()); + } + @Test void testDisconnectDueToKeepAliveIfNoActivity() { // GIVEN