diff --git a/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClientImpl.java b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClientImpl.java index aef2cad684..c493f2b860 100644 --- a/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClientImpl.java +++ b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClientImpl.java @@ -340,6 +340,7 @@ final class MqttClientImpl implements MqttClient { MqttPublishVariableHeader variableHeader = new MqttPublishVariableHeader(topic, getNewMessageId().messageId()); MqttPublishMessage message = new MqttPublishMessage(fixedHeader, variableHeader, payload); MqttPendingPublish pendingPublish = new MqttPendingPublish(variableHeader.packetId(), future, payload.retain(), message, qos); + this.pendingPublishes.put(pendingPublish.getMessageId(), pendingPublish); ChannelFuture channelFuture = this.sendAndFlushPacket(message); if (channelFuture != null) { @@ -350,9 +351,9 @@ final class MqttClientImpl implements MqttClient { } } if (pendingPublish.isSent() && pendingPublish.getQos() == MqttQoS.AT_MOST_ONCE) { + this.pendingPublishes.remove(pendingPublish.getMessageId()); pendingPublish.getFuture().setSuccess(null); //We don't get an ACK for QOS 0 } else if (pendingPublish.isSent()) { - this.pendingPublishes.put(pendingPublish.getMessageId(), pendingPublish); pendingPublish.startPublishRetransmissionTimer(this.eventLoop.next(), this::sendAndFlushPacket); } return future;