Removed resending of PUBREC msgs

This commit is contained in:
vzikratyi 2020-08-31 12:40:49 +03:00 committed by Andrew Shvayka
parent 4df660fe03
commit 8e8c1ae860
3 changed files with 4 additions and 23 deletions

View File

@ -515,7 +515,7 @@ js:
# Specify thread pool size for JavaScript sandbox resource monitor # Specify thread pool size for JavaScript sandbox resource monitor
monitor_thread_pool_size: "${LOCAL_JS_SANDBOX_MONITOR_THREAD_POOL_SIZE:4}" monitor_thread_pool_size: "${LOCAL_JS_SANDBOX_MONITOR_THREAD_POOL_SIZE:4}"
# Maximum CPU time in milliseconds allowed for script execution # Maximum CPU time in milliseconds allowed for script execution
max_cpu_time: "${LOCAL_JS_SANDBOX_MAX_CPU_TIME:10000}" max_cpu_time: "${LOCAL_JS_SANDBOX_MAX_CPU_TIME:8000}"
# Maximum allowed JavaScript execution errors before JavaScript will be blacklisted # Maximum allowed JavaScript execution errors before JavaScript will be blacklisted
max_errors: "${LOCAL_JS_SANDBOX_MAX_ERRORS:3}" max_errors: "${LOCAL_JS_SANDBOX_MAX_ERRORS:3}"
# JS Eval max request timeout. 0 - no timeout # JS Eval max request timeout. 0 - no timeout

View File

@ -198,10 +198,9 @@ final class MqttChannelHandler extends SimpleChannelInboundHandler<MqttMessage>
MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(message.variableHeader().packetId()); MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(message.variableHeader().packetId());
MqttMessage pubrecMessage = new MqttMessage(fixedHeader, variableHeader); MqttMessage pubrecMessage = new MqttMessage(fixedHeader, variableHeader);
MqttIncomingQos2Publish incomingQos2Publish = new MqttIncomingQos2Publish(message, pubrecMessage); MqttIncomingQos2Publish incomingQos2Publish = new MqttIncomingQos2Publish(message);
this.client.getQos2PendingIncomingPublishes().put(message.variableHeader().packetId(), incomingQos2Publish); this.client.getQos2PendingIncomingPublishes().put(message.variableHeader().packetId(), incomingQos2Publish);
message.payload().retain(); message.payload().retain();
incomingQos2Publish.startPubrecRetransmitTimer(this.client.getEventLoop().next(), this.client::sendAndFlushPacket);
channel.writeAndFlush(pubrecMessage); channel.writeAndFlush(pubrecMessage);
} }
@ -248,7 +247,6 @@ final class MqttChannelHandler extends SimpleChannelInboundHandler<MqttMessage>
if (this.client.getQos2PendingIncomingPublishes().containsKey(((MqttMessageIdVariableHeader) message.variableHeader()).messageId())) { if (this.client.getQos2PendingIncomingPublishes().containsKey(((MqttMessageIdVariableHeader) message.variableHeader()).messageId())) {
MqttIncomingQos2Publish incomingQos2Publish = this.client.getQos2PendingIncomingPublishes().get(((MqttMessageIdVariableHeader) message.variableHeader()).messageId()); MqttIncomingQos2Publish incomingQos2Publish = this.client.getQos2PendingIncomingPublishes().get(((MqttMessageIdVariableHeader) message.variableHeader()).messageId());
this.invokeHandlersForIncomingPublish(incomingQos2Publish.getIncomingPublish()); this.invokeHandlersForIncomingPublish(incomingQos2Publish.getIncomingPublish());
incomingQos2Publish.onPubrelReceived();
this.client.getQos2PendingIncomingPublishes().remove(incomingQos2Publish.getIncomingPublish().variableHeader().packetId()); this.client.getQos2PendingIncomingPublishes().remove(incomingQos2Publish.getIncomingPublish().variableHeader().packetId());
} }
MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBCOMP, false, MqttQoS.AT_MOST_ONCE, false, 0); MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBCOMP, false, MqttQoS.AT_MOST_ONCE, false, 0);

View File

@ -15,34 +15,17 @@
*/ */
package org.thingsboard.mqtt; package org.thingsboard.mqtt;
import io.netty.channel.EventLoop; import io.netty.handler.codec.mqtt.MqttPublishMessage;
import io.netty.handler.codec.mqtt.*;
import java.util.function.Consumer;
final class MqttIncomingQos2Publish { final class MqttIncomingQos2Publish {
private final MqttPublishMessage incomingPublish; private final MqttPublishMessage incomingPublish;
private final RetransmissionHandler<MqttMessage> retransmissionHandler = new RetransmissionHandler<>(); MqttIncomingQos2Publish(MqttPublishMessage incomingPublish) {
MqttIncomingQos2Publish(MqttPublishMessage incomingPublish, MqttMessage originalMessage) {
this.incomingPublish = incomingPublish; this.incomingPublish = incomingPublish;
this.retransmissionHandler.setOriginalMessage(originalMessage);
} }
MqttPublishMessage getIncomingPublish() { MqttPublishMessage getIncomingPublish() {
return incomingPublish; return incomingPublish;
} }
void startPubrecRetransmitTimer(EventLoop eventLoop, Consumer<Object> sendPacket) {
this.retransmissionHandler.setHandle((fixedHeader, originalMessage) ->
sendPacket.accept(new MqttMessage(fixedHeader, originalMessage.variableHeader())));
this.retransmissionHandler.start(eventLoop);
}
void onPubrelReceived() {
this.retransmissionHandler.stop();
}
} }