Resolved PR comments
This commit is contained in:
parent
e112077cb0
commit
9786e0a2f8
@ -96,6 +96,8 @@ final class MqttClientImpl implements MqttClient {
|
||||
|
||||
private final ListeningExecutor handlerExecutor;
|
||||
|
||||
private final static int DISCONNECT_FALLBACK_DELAY_SECS = 1;
|
||||
|
||||
/**
|
||||
* Construct the MqttClientImpl with default config
|
||||
*/
|
||||
@ -468,13 +470,13 @@ final class MqttClientImpl implements MqttClient {
|
||||
future.channel().close();
|
||||
disconnected = true;
|
||||
});
|
||||
|
||||
eventLoop.schedule(() -> {
|
||||
if (channel.isOpen()) {
|
||||
log.trace("[{}] Channel still open after {} second; forcing close now", channel.id(), DISCONNECT_FALLBACK_DELAY_SECS);
|
||||
this.channel.close();
|
||||
disconnected = true;
|
||||
}
|
||||
}, 1, TimeUnit.SECONDS);
|
||||
}, DISCONNECT_FALLBACK_DELAY_SECS, TimeUnit.SECONDS);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -123,22 +123,12 @@ class MqttClientTest {
|
||||
void testDisconnectFromBroker() {
|
||||
// GIVEN
|
||||
var clientConfig = new MqttClientConfig();
|
||||
clientConfig.setOwnerId("Test[ConnectToBroker]");
|
||||
clientConfig.setClientId("connect");
|
||||
clientConfig.setOwnerId("Test[Disconnect]");
|
||||
clientConfig.setClientId("disconnect");
|
||||
|
||||
client = MqttClient.create(clientConfig, null, handlerExecutor);
|
||||
|
||||
// WHEN
|
||||
Promise<MqttConnectResult> connectFuture = client.connect(broker.getHost(), broker.getMqttPort());
|
||||
|
||||
// THEN
|
||||
assertThat(connectFuture).isNotNull();
|
||||
|
||||
Awaitility.await("waiting for client to connect")
|
||||
.atMost(Duration.ofSeconds(10L))
|
||||
.until(connectFuture::isDone);
|
||||
|
||||
assertThat(connectFuture.isSuccess()).isTrue();
|
||||
connect(broker.getHost(), broker.getMqttPort());
|
||||
|
||||
// WHEN
|
||||
client.disconnect();
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user