MessageId to PacketId refactoring
This commit is contained in:
parent
f05f5947bf
commit
f41cd97cbb
@ -185,21 +185,21 @@ final class MqttChannelHandler extends SimpleChannelInboundHandler<MqttMessage>
|
|||||||
|
|
||||||
case AT_LEAST_ONCE:
|
case AT_LEAST_ONCE:
|
||||||
invokeHandlersForIncomingPublish(message);
|
invokeHandlersForIncomingPublish(message);
|
||||||
if (message.variableHeader().messageId() != -1) {
|
if (message.variableHeader().packetId() != -1) {
|
||||||
MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0);
|
MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0);
|
||||||
MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(message.variableHeader().messageId());
|
MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(message.variableHeader().packetId());
|
||||||
channel.writeAndFlush(new MqttPubAckMessage(fixedHeader, variableHeader));
|
channel.writeAndFlush(new MqttPubAckMessage(fixedHeader, variableHeader));
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
|
|
||||||
case EXACTLY_ONCE:
|
case EXACTLY_ONCE:
|
||||||
if (message.variableHeader().messageId() != -1) {
|
if (message.variableHeader().packetId() != -1) {
|
||||||
MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBREC, false, MqttQoS.AT_MOST_ONCE, false, 0);
|
MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBREC, false, MqttQoS.AT_MOST_ONCE, false, 0);
|
||||||
MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(message.variableHeader().messageId());
|
MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(message.variableHeader().packetId());
|
||||||
MqttMessage pubrecMessage = new MqttMessage(fixedHeader, variableHeader);
|
MqttMessage pubrecMessage = new MqttMessage(fixedHeader, variableHeader);
|
||||||
|
|
||||||
MqttIncomingQos2Publish incomingQos2Publish = new MqttIncomingQos2Publish(message, pubrecMessage);
|
MqttIncomingQos2Publish incomingQos2Publish = new MqttIncomingQos2Publish(message, pubrecMessage);
|
||||||
this.client.getQos2PendingIncomingPublishes().put(message.variableHeader().messageId(), incomingQos2Publish);
|
this.client.getQos2PendingIncomingPublishes().put(message.variableHeader().packetId(), incomingQos2Publish);
|
||||||
message.payload().retain();
|
message.payload().retain();
|
||||||
incomingQos2Publish.startPubrecRetransmitTimer(this.client.getEventLoop().next(), this.client::sendAndFlushPacket);
|
incomingQos2Publish.startPubrecRetransmitTimer(this.client.getEventLoop().next(), this.client::sendAndFlushPacket);
|
||||||
|
|
||||||
@ -249,7 +249,7 @@ final class MqttChannelHandler extends SimpleChannelInboundHandler<MqttMessage>
|
|||||||
MqttIncomingQos2Publish incomingQos2Publish = this.client.getQos2PendingIncomingPublishes().get(((MqttMessageIdVariableHeader) message.variableHeader()).messageId());
|
MqttIncomingQos2Publish incomingQos2Publish = this.client.getQos2PendingIncomingPublishes().get(((MqttMessageIdVariableHeader) message.variableHeader()).messageId());
|
||||||
this.invokeHandlersForIncomingPublish(incomingQos2Publish.getIncomingPublish());
|
this.invokeHandlersForIncomingPublish(incomingQos2Publish.getIncomingPublish());
|
||||||
incomingQos2Publish.onPubrelReceived();
|
incomingQos2Publish.onPubrelReceived();
|
||||||
this.client.getQos2PendingIncomingPublishes().remove(incomingQos2Publish.getIncomingPublish().variableHeader().messageId());
|
this.client.getQos2PendingIncomingPublishes().remove(incomingQos2Publish.getIncomingPublish().variableHeader().packetId());
|
||||||
}
|
}
|
||||||
MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBCOMP, false, MqttQoS.AT_MOST_ONCE, false, 0);
|
MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBCOMP, false, MqttQoS.AT_MOST_ONCE, false, 0);
|
||||||
MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(((MqttMessageIdVariableHeader) message.variableHeader()).messageId());
|
MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(((MqttMessageIdVariableHeader) message.variableHeader()).messageId());
|
||||||
|
|||||||
@ -339,7 +339,7 @@ final class MqttClientImpl implements MqttClient {
|
|||||||
MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBLISH, false, qos, retain, 0);
|
MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBLISH, false, qos, retain, 0);
|
||||||
MqttPublishVariableHeader variableHeader = new MqttPublishVariableHeader(topic, getNewMessageId().messageId());
|
MqttPublishVariableHeader variableHeader = new MqttPublishVariableHeader(topic, getNewMessageId().messageId());
|
||||||
MqttPublishMessage message = new MqttPublishMessage(fixedHeader, variableHeader, payload);
|
MqttPublishMessage message = new MqttPublishMessage(fixedHeader, variableHeader, payload);
|
||||||
MqttPendingPublish pendingPublish = new MqttPendingPublish(variableHeader.messageId(), future, payload.retain(), message, qos);
|
MqttPendingPublish pendingPublish = new MqttPendingPublish(variableHeader.packetId(), future, payload.retain(), message, qos);
|
||||||
ChannelFuture channelFuture = this.sendAndFlushPacket(message);
|
ChannelFuture channelFuture = this.sendAndFlushPacket(message);
|
||||||
|
|
||||||
if (channelFuture != null) {
|
if (channelFuture != null) {
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user