This commit is contained in:
Artem Barysh 2025-05-29 13:52:08 +03:00
parent 0bbec75e75
commit e112077cb0
2 changed files with 42 additions and 3 deletions

View File

@ -456,16 +456,25 @@ final class MqttClientImpl implements MqttClient {
@Override
public void disconnect() {
if (disconnected) {
return;
}
log.trace("[{}] Disconnecting from server", channel != null ? channel.id() : "UNKNOWN");
disconnected = true;
if (this.channel != null) {
MqttMessage message = new MqttMessage(new MqttFixedHeader(MqttMessageType.DISCONNECT, false, MqttQoS.AT_MOST_ONCE, false, 0));
ChannelFuture channelFuture = this.sendAndFlushPacket(message);
sendAndFlushPacket(message).addListener((ChannelFutureListener) future -> {
future.channel().close();
disconnected = true;
});
eventLoop.schedule(() -> {
if (channel.isOpen()) {
this.channel.close();
disconnected = true;
}
}, 500, TimeUnit.MILLISECONDS);
}, 1, TimeUnit.SECONDS);
}
}

View File

@ -119,6 +119,36 @@ class MqttClientTest {
assertThat(client.isConnected()).isTrue();
}
@Test
void testDisconnectFromBroker() {
// GIVEN
var clientConfig = new MqttClientConfig();
clientConfig.setOwnerId("Test[ConnectToBroker]");
clientConfig.setClientId("connect");
client = MqttClient.create(clientConfig, null, handlerExecutor);
// WHEN
Promise<MqttConnectResult> connectFuture = client.connect(broker.getHost(), broker.getMqttPort());
// THEN
assertThat(connectFuture).isNotNull();
Awaitility.await("waiting for client to connect")
.atMost(Duration.ofSeconds(10L))
.until(connectFuture::isDone);
assertThat(connectFuture.isSuccess()).isTrue();
// WHEN
client.disconnect();
// THEN
Awaitility.await("waiting for client to disconnect")
.atMost(Duration.ofSeconds(5))
.untilAsserted(() -> assertThat(client.isConnected()).isFalse());
}
@Test
void testDisconnectDueToKeepAliveIfNoActivity() {
// GIVEN