diff --git a/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttChannelHandler.java b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttChannelHandler.java index 1525510814..63e1ac3e63 100644 --- a/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttChannelHandler.java +++ b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttChannelHandler.java @@ -185,21 +185,21 @@ final class MqttChannelHandler extends SimpleChannelInboundHandler case AT_LEAST_ONCE: invokeHandlersForIncomingPublish(message); - if (message.variableHeader().messageId() != -1) { + if (message.variableHeader().packetId() != -1) { MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0); - MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(message.variableHeader().messageId()); + MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(message.variableHeader().packetId()); channel.writeAndFlush(new MqttPubAckMessage(fixedHeader, variableHeader)); } break; case EXACTLY_ONCE: - if (message.variableHeader().messageId() != -1) { + if (message.variableHeader().packetId() != -1) { MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBREC, false, MqttQoS.AT_MOST_ONCE, false, 0); - MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(message.variableHeader().messageId()); + MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(message.variableHeader().packetId()); MqttMessage pubrecMessage = new MqttMessage(fixedHeader, variableHeader); MqttIncomingQos2Publish incomingQos2Publish = new MqttIncomingQos2Publish(message, pubrecMessage); - this.client.getQos2PendingIncomingPublishes().put(message.variableHeader().messageId(), incomingQos2Publish); + this.client.getQos2PendingIncomingPublishes().put(message.variableHeader().packetId(), incomingQos2Publish); message.payload().retain(); incomingQos2Publish.startPubrecRetransmitTimer(this.client.getEventLoop().next(), this.client::sendAndFlushPacket); @@ -249,7 +249,7 @@ final class MqttChannelHandler extends SimpleChannelInboundHandler MqttIncomingQos2Publish incomingQos2Publish = this.client.getQos2PendingIncomingPublishes().get(((MqttMessageIdVariableHeader) message.variableHeader()).messageId()); this.invokeHandlersForIncomingPublish(incomingQos2Publish.getIncomingPublish()); incomingQos2Publish.onPubrelReceived(); - this.client.getQos2PendingIncomingPublishes().remove(incomingQos2Publish.getIncomingPublish().variableHeader().messageId()); + this.client.getQos2PendingIncomingPublishes().remove(incomingQos2Publish.getIncomingPublish().variableHeader().packetId()); } MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBCOMP, false, MqttQoS.AT_MOST_ONCE, false, 0); MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(((MqttMessageIdVariableHeader) message.variableHeader()).messageId()); diff --git a/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClientImpl.java b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClientImpl.java index da497e2d70..85b3abeeee 100644 --- a/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClientImpl.java +++ b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClientImpl.java @@ -339,7 +339,7 @@ final class MqttClientImpl implements MqttClient { MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBLISH, false, qos, retain, 0); MqttPublishVariableHeader variableHeader = new MqttPublishVariableHeader(topic, getNewMessageId().messageId()); MqttPublishMessage message = new MqttPublishMessage(fixedHeader, variableHeader, payload); - MqttPendingPublish pendingPublish = new MqttPendingPublish(variableHeader.messageId(), future, payload.retain(), message, qos); + MqttPendingPublish pendingPublish = new MqttPendingPublish(variableHeader.packetId(), future, payload.retain(), message, qos); ChannelFuture channelFuture = this.sendAndFlushPacket(message); if (channelFuture != null) {