diff --git a/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClientImpl.java b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClientImpl.java index 5fbdb883af..f30fc6e6ad 100644 --- a/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClientImpl.java +++ b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClientImpl.java @@ -156,9 +156,11 @@ final class MqttClientImpl implements MqttClient { if (callback != null) { callback.connectionLost(e); } + pendingSubscriptions.forEach((id, mqttPendingSubscription) -> mqttPendingSubscription.onChannelClosed()); pendingSubscriptions.clear(); serverSubscriptions.clear(); subscriptions.clear(); + pendingServerUnsubscribes.forEach((id, mqttPendingServerUnsubscribes) -> mqttPendingServerUnsubscribes.onChannelClosed()); pendingServerUnsubscribes.clear(); qos2PendingIncomingPublishes.clear(); pendingPublishes.forEach((id, mqttPendingPublish) -> mqttPendingPublish.onChannelClosed()); diff --git a/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttPendingPublish.java b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttPendingPublish.java index f647569553..e19a60226a 100644 --- a/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttPendingPublish.java +++ b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttPendingPublish.java @@ -24,7 +24,7 @@ import io.netty.util.concurrent.Promise; import java.util.function.Consumer; -final class MqttPendingPublish { +final class MqttPendingPublish{ private final int messageId; private final Promise future; @@ -99,7 +99,7 @@ final class MqttPendingPublish { this.pubrelRetransmissionHandler.stop(); } - void onChannelClosed() { + void onChannelClosed(){ this.publishRetransmissionHandler.stop(); this.pubrelRetransmissionHandler.stop(); } diff --git a/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttPendingSubscription.java b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttPendingSubscription.java index d0d396d784..975a399691 100644 --- a/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttPendingSubscription.java +++ b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttPendingSubscription.java @@ -23,7 +23,7 @@ import java.util.HashSet; import java.util.Set; import java.util.function.Consumer; -final class MqttPendingSubscription { +final class MqttPendingSubscription{ private final Promise future; private final String topic; @@ -99,4 +99,8 @@ final class MqttPendingSubscription { return once; } } + + void onChannelClosed(){ + this.retransmissionHandler.stop(); + } } diff --git a/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttPendingUnsubscription.java b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttPendingUnsubscription.java index ca9d0b6e77..9042aa268a 100644 --- a/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttPendingUnsubscription.java +++ b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttPendingUnsubscription.java @@ -21,7 +21,7 @@ import io.netty.util.concurrent.Promise; import java.util.function.Consumer; -final class MqttPendingUnsubscription { +final class MqttPendingUnsubscription{ private final Promise future; private final String topic; @@ -52,4 +52,8 @@ final class MqttPendingUnsubscription { void onUnsubackReceived(){ this.retransmissionHandler.stop(); } + + void onChannelClosed(){ + this.retransmissionHandler.stop(); + } }