Merge pull request #13464 from artem-barysh-dev/fix-mqtt-disconnect

Improve MQTT client disconnect
This commit is contained in:
Viacheslav Klimov 2025-06-04 13:23:30 +03:00 committed by GitHub
commit 79e638ed54
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 35 additions and 20 deletions

View File

@ -556,22 +556,6 @@ public class MqttClientTest extends AbstractContainerTest {
assertThat(provisionResponse.get("status").asText()).isEqualTo("NOT_FOUND");
}
@Test
public void regularDisconnect() throws Exception {
DeviceCredentials deviceCredentials = testRestClient.getDeviceCredentialsByDeviceId(device.getId());
MqttMessageListener listener = new MqttMessageListener();
MqttClient mqttClient = getMqttClient(deviceCredentials, listener, MqttVersion.MQTT_5);
final List<Byte> returnCodeByteValue = new ArrayList<>();
MqttClientCallback callbackForDisconnectWithReturnCode = getCallbackWrapperForDisconnectWithReturnCode(returnCodeByteValue);
mqttClient.setCallback(callbackForDisconnectWithReturnCode);
mqttClient.disconnect();
Thread.sleep(1000);
assertThat(returnCodeByteValue.size()).isEqualTo(1);
MqttReasonCodes.Disconnect returnCode = MqttReasonCodes.Disconnect.valueOf(returnCodeByteValue.get(0));
assertThat(returnCode).isEqualTo(MqttReasonCodes.Disconnect.NORMAL_DISCONNECT);
}
@Test
public void clientSessionTakenOverDisconnect() throws Exception {
DeviceCredentials deviceCredentials = testRestClient.getDeviceCredentialsByDeviceId(device.getId());

View File

@ -96,6 +96,8 @@ final class MqttClientImpl implements MqttClient {
private final ListeningExecutor handlerExecutor;
private final static int DISCONNECT_FALLBACK_DELAY_SECS = 1;
/**
* Construct the MqttClientImpl with default config
*/
@ -456,16 +458,25 @@ final class MqttClientImpl implements MqttClient {
@Override
public void disconnect() {
if (disconnected) {
return;
}
log.trace("[{}] Disconnecting from server", channel != null ? channel.id() : "UNKNOWN");
disconnected = true;
if (this.channel != null) {
MqttMessage message = new MqttMessage(new MqttFixedHeader(MqttMessageType.DISCONNECT, false, MqttQoS.AT_MOST_ONCE, false, 0));
ChannelFuture channelFuture = this.sendAndFlushPacket(message);
sendAndFlushPacket(message).addListener((ChannelFutureListener) future -> {
future.channel().close();
disconnected = true;
});
eventLoop.schedule(() -> {
if (!channelFuture.isDone()) {
if (channel.isOpen()) {
log.trace("[{}] Channel still open after {} second; forcing close now", channel.id(), DISCONNECT_FALLBACK_DELAY_SECS);
this.channel.close();
disconnected = true;
}
}, 500, TimeUnit.MILLISECONDS);
}, DISCONNECT_FALLBACK_DELAY_SECS, TimeUnit.SECONDS);
}
}

View File

@ -119,6 +119,26 @@ class MqttClientTest {
assertThat(client.isConnected()).isTrue();
}
@Test
void testDisconnectFromBroker() {
// GIVEN
var clientConfig = new MqttClientConfig();
clientConfig.setOwnerId("Test[Disconnect]");
clientConfig.setClientId("disconnect");
client = MqttClient.create(clientConfig, null, handlerExecutor);
connect(broker.getHost(), broker.getMqttPort());
// WHEN
client.disconnect();
// THEN
Awaitility.await("waiting for client to disconnect")
.atMost(Duration.ofSeconds(5))
.untilAsserted(() -> assertThat(client.isConnected()).isFalse());
}
@Test
void testDisconnectDueToKeepAliveIfNoActivity() {
// GIVEN