From 0468cf8cf5c6f0e9c21d1e9ff1e0b59662c612c0 Mon Sep 17 00:00:00 2001 From: Andrii Shvaika Date: Mon, 8 Nov 2021 17:20:38 +0200 Subject: [PATCH] Fix duplication of MQTT packets in MQTT Client --- .../org/thingsboard/mqtt/MqttClientImpl.java | 9 ++++--- .../thingsboard/mqtt/MqttPendingPublish.java | 15 +++++++---- .../mqtt/MqttPendingSubscription.java | 15 +++++------ .../mqtt/MqttPendingUnsubscription.java | 5 ++-- .../thingsboard/mqtt/MqttSubscription.java | 6 ++--- .../thingsboard/mqtt/PendingOperation.java | 22 ++++++++++++++++ .../mqtt/RetransmissionHandler.java | 25 +++++++++++++------ 7 files changed, 70 insertions(+), 27 deletions(-) create mode 100644 netty-mqtt/src/main/java/org/thingsboard/mqtt/PendingOperation.java diff --git a/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClientImpl.java b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClientImpl.java index f30fc6e6ad..80a336cbb2 100644 --- a/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClientImpl.java +++ b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClientImpl.java @@ -365,7 +365,8 @@ final class MqttClientImpl implements MqttClient { MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBLISH, false, qos, retain, 0); MqttPublishVariableHeader variableHeader = new MqttPublishVariableHeader(topic, getNewMessageId().messageId()); MqttPublishMessage message = new MqttPublishMessage(fixedHeader, variableHeader, payload); - MqttPendingPublish pendingPublish = new MqttPendingPublish(variableHeader.packetId(), future, payload.retain(), message, qos); + MqttPendingPublish pendingPublish = new MqttPendingPublish(variableHeader.packetId(), future, + payload.retain(), message, qos, () -> !pendingPublishes.containsKey(variableHeader.packetId())); this.pendingPublishes.put(pendingPublish.getMessageId(), pendingPublish); ChannelFuture channelFuture = this.sendAndFlushPacket(message); @@ -471,7 +472,8 @@ final class MqttClientImpl implements MqttClient { MqttSubscribePayload payload = new MqttSubscribePayload(Collections.singletonList(subscription)); MqttSubscribeMessage message = new MqttSubscribeMessage(fixedHeader, variableHeader, payload); - final MqttPendingSubscription pendingSubscription = new MqttPendingSubscription(future, topic, message); + final MqttPendingSubscription pendingSubscription = new MqttPendingSubscription(future, topic, message, + () -> !pendingSubscriptions.containsKey(variableHeader.messageId())); pendingSubscription.addHandler(handler, once); this.pendingSubscriptions.put(variableHeader.messageId(), pendingSubscription); this.pendingSubscribeTopics.add(topic); @@ -489,7 +491,8 @@ final class MqttClientImpl implements MqttClient { MqttUnsubscribePayload payload = new MqttUnsubscribePayload(Collections.singletonList(topic)); MqttUnsubscribeMessage message = new MqttUnsubscribeMessage(fixedHeader, variableHeader, payload); - MqttPendingUnsubscription pendingUnsubscription = new MqttPendingUnsubscription(promise, topic, message); + MqttPendingUnsubscription pendingUnsubscription = new MqttPendingUnsubscription(promise, topic, message, + () -> !pendingServerUnsubscribes.containsKey(variableHeader.messageId())); this.pendingServerUnsubscribes.put(variableHeader.messageId(), pendingUnsubscription); pendingUnsubscription.startRetransmissionTimer(this.eventLoop.next(), this::sendAndFlushPacket); diff --git a/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttPendingPublish.java b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttPendingPublish.java index e19a60226a..8369d8f65e 100644 --- a/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttPendingPublish.java +++ b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttPendingPublish.java @@ -24,7 +24,7 @@ import io.netty.util.concurrent.Promise; import java.util.function.Consumer; -final class MqttPendingPublish{ +final class MqttPendingPublish { private final int messageId; private final Promise future; @@ -32,19 +32,21 @@ final class MqttPendingPublish{ private final MqttPublishMessage message; private final MqttQoS qos; - private final RetransmissionHandler publishRetransmissionHandler = new RetransmissionHandler<>(); - private final RetransmissionHandler pubrelRetransmissionHandler = new RetransmissionHandler<>(); + private final RetransmissionHandler publishRetransmissionHandler; + private final RetransmissionHandler pubrelRetransmissionHandler; private boolean sent = false; - MqttPendingPublish(int messageId, Promise future, ByteBuf payload, MqttPublishMessage message, MqttQoS qos) { + MqttPendingPublish(int messageId, Promise future, ByteBuf payload, MqttPublishMessage message, MqttQoS qos, PendingOperation operation) { this.messageId = messageId; this.future = future; this.payload = payload; this.message = message; this.qos = qos; + this.publishRetransmissionHandler = new RetransmissionHandler<>(operation); this.publishRetransmissionHandler.setOriginalMessage(message); + this.pubrelRetransmissionHandler = new RetransmissionHandler<>(operation); } int getMessageId() { @@ -99,8 +101,11 @@ final class MqttPendingPublish{ this.pubrelRetransmissionHandler.stop(); } - void onChannelClosed(){ + void onChannelClosed() { this.publishRetransmissionHandler.stop(); this.pubrelRetransmissionHandler.stop(); + if (payload != null) { + payload.release(); + } } } diff --git a/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttPendingSubscription.java b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttPendingSubscription.java index 975a399691..17bfa1139d 100644 --- a/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttPendingSubscription.java +++ b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttPendingSubscription.java @@ -23,22 +23,23 @@ import java.util.HashSet; import java.util.Set; import java.util.function.Consumer; -final class MqttPendingSubscription{ +final class MqttPendingSubscription { private final Promise future; private final String topic; private final Set handlers = new HashSet<>(); private final MqttSubscribeMessage subscribeMessage; - private final RetransmissionHandler retransmissionHandler = new RetransmissionHandler<>(); + private final RetransmissionHandler retransmissionHandler; private boolean sent = false; - MqttPendingSubscription(Promise future, String topic, MqttSubscribeMessage message) { + MqttPendingSubscription(Promise future, String topic, MqttSubscribeMessage message, PendingOperation operation) { this.future = future; this.topic = topic; this.subscribeMessage = message; + this.retransmissionHandler = new RetransmissionHandler<>(operation); this.retransmissionHandler.setOriginalMessage(message); } @@ -62,7 +63,7 @@ final class MqttPendingSubscription{ return subscribeMessage; } - void addHandler(MqttHandler handler, boolean once){ + void addHandler(MqttHandler handler, boolean once) { this.handlers.add(new MqttPendingHandler(handler, once)); } @@ -71,14 +72,14 @@ final class MqttPendingSubscription{ } void startRetransmitTimer(EventLoop eventLoop, Consumer sendPacket) { - if(this.sent){ //If the packet is sent, we can start the retransmit timer + if (this.sent) { //If the packet is sent, we can start the retransmit timer this.retransmissionHandler.setHandle((fixedHeader, originalMessage) -> sendPacket.accept(new MqttSubscribeMessage(fixedHeader, originalMessage.variableHeader(), originalMessage.payload()))); this.retransmissionHandler.start(eventLoop); } } - void onSubackReceived(){ + void onSubackReceived() { this.retransmissionHandler.stop(); } @@ -100,7 +101,7 @@ final class MqttPendingSubscription{ } } - void onChannelClosed(){ + void onChannelClosed() { this.retransmissionHandler.stop(); } } diff --git a/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttPendingUnsubscription.java b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttPendingUnsubscription.java index 9042aa268a..42d2b852b9 100644 --- a/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttPendingUnsubscription.java +++ b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttPendingUnsubscription.java @@ -26,12 +26,13 @@ final class MqttPendingUnsubscription{ private final Promise future; private final String topic; - private final RetransmissionHandler retransmissionHandler = new RetransmissionHandler<>(); + private final RetransmissionHandler retransmissionHandler; - MqttPendingUnsubscription(Promise future, String topic, MqttUnsubscribeMessage unsubscribeMessage) { + MqttPendingUnsubscription(Promise future, String topic, MqttUnsubscribeMessage unsubscribeMessage, PendingOperation operation) { this.future = future; this.topic = topic; + this.retransmissionHandler = new RetransmissionHandler<>(operation); this.retransmissionHandler.setOriginalMessage(unsubscribeMessage); } diff --git a/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttSubscription.java b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttSubscription.java index 9f06ff84e4..62b5e059f4 100644 --- a/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttSubscription.java +++ b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttSubscription.java @@ -28,10 +28,10 @@ final class MqttSubscription { private boolean called; MqttSubscription(String topic, MqttHandler handler, boolean once) { - if(topic == null){ + if (topic == null) { throw new NullPointerException("topic"); } - if(handler == null){ + if (handler == null) { throw new NullPointerException("handler"); } this.topic = topic; @@ -56,7 +56,7 @@ final class MqttSubscription { return called; } - boolean matches(String topic){ + boolean matches(String topic) { return this.topicRegex.matcher(topic).matches(); } diff --git a/netty-mqtt/src/main/java/org/thingsboard/mqtt/PendingOperation.java b/netty-mqtt/src/main/java/org/thingsboard/mqtt/PendingOperation.java new file mode 100644 index 0000000000..c9ab7c5f5e --- /dev/null +++ b/netty-mqtt/src/main/java/org/thingsboard/mqtt/PendingOperation.java @@ -0,0 +1,22 @@ +/** + * Copyright © 2016-2021 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.mqtt; + +public interface PendingOperation { + + boolean isCanceled(); + +} diff --git a/netty-mqtt/src/main/java/org/thingsboard/mqtt/RetransmissionHandler.java b/netty-mqtt/src/main/java/org/thingsboard/mqtt/RetransmissionHandler.java index 531d3bf19a..5d3f9b39d6 100644 --- a/netty-mqtt/src/main/java/org/thingsboard/mqtt/RetransmissionHandler.java +++ b/netty-mqtt/src/main/java/org/thingsboard/mqtt/RetransmissionHandler.java @@ -21,33 +21,43 @@ import io.netty.handler.codec.mqtt.MqttMessage; import io.netty.handler.codec.mqtt.MqttMessageType; import io.netty.handler.codec.mqtt.MqttQoS; import io.netty.util.concurrent.ScheduledFuture; +import lombok.RequiredArgsConstructor; import java.util.concurrent.TimeUnit; import java.util.function.BiConsumer; +@RequiredArgsConstructor final class RetransmissionHandler { + private volatile boolean stopped; + private final PendingOperation pendingOperation; private ScheduledFuture timer; private int timeout = 10; private BiConsumer handler; private T originalMessage; - void start(EventLoop eventLoop){ - if(eventLoop == null){ + void start(EventLoop eventLoop) { + if (eventLoop == null) { throw new NullPointerException("eventLoop"); } - if(this.handler == null){ + if (this.handler == null) { throw new NullPointerException("handler"); } this.timeout = 10; this.startTimer(eventLoop); } - private void startTimer(EventLoop eventLoop){ + private void startTimer(EventLoop eventLoop) { + if (stopped || pendingOperation.isCanceled()) { + return; + } this.timer = eventLoop.schedule(() -> { + if (stopped || pendingOperation.isCanceled()) { + return; + } this.timeout += 5; boolean isDup = this.originalMessage.fixedHeader().isDup(); - if(this.originalMessage.fixedHeader().messageType() == MqttMessageType.PUBLISH && this.originalMessage.fixedHeader().qosLevel() != MqttQoS.AT_MOST_ONCE){ + if (this.originalMessage.fixedHeader().messageType() == MqttMessageType.PUBLISH && this.originalMessage.fixedHeader().qosLevel() != MqttQoS.AT_MOST_ONCE) { isDup = true; } MqttFixedHeader fixedHeader = new MqttFixedHeader(this.originalMessage.fixedHeader().messageType(), isDup, this.originalMessage.fixedHeader().qosLevel(), this.originalMessage.fixedHeader().isRetain(), this.originalMessage.fixedHeader().remainingLength()); @@ -56,8 +66,9 @@ final class RetransmissionHandler { }, timeout, TimeUnit.SECONDS); } - void stop(){ - if(this.timer != null){ + void stop() { + stopped = true; + if (this.timer != null) { this.timer.cancel(true); } }