diff --git a/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClientImpl.java b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClientImpl.java index c493f2b860..f72b8d0f4e 100644 --- a/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClientImpl.java +++ b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClientImpl.java @@ -19,18 +19,39 @@ import com.google.common.collect.HashMultimap; import com.google.common.collect.ImmutableSet; import io.netty.bootstrap.Bootstrap; import io.netty.buffer.ByteBuf; -import io.netty.channel.*; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; -import io.netty.handler.codec.mqtt.*; +import io.netty.handler.codec.mqtt.MqttDecoder; +import io.netty.handler.codec.mqtt.MqttEncoder; +import io.netty.handler.codec.mqtt.MqttFixedHeader; +import io.netty.handler.codec.mqtt.MqttMessage; +import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader; +import io.netty.handler.codec.mqtt.MqttMessageType; +import io.netty.handler.codec.mqtt.MqttPublishMessage; +import io.netty.handler.codec.mqtt.MqttPublishVariableHeader; +import io.netty.handler.codec.mqtt.MqttQoS; +import io.netty.handler.codec.mqtt.MqttSubscribeMessage; +import io.netty.handler.codec.mqtt.MqttSubscribePayload; +import io.netty.handler.codec.mqtt.MqttTopicSubscription; +import io.netty.handler.codec.mqtt.MqttUnsubscribeMessage; +import io.netty.handler.codec.mqtt.MqttUnsubscribePayload; import io.netty.handler.ssl.SslContext; import io.netty.handler.timeout.IdleStateHandler; -import io.netty.util.collection.IntObjectHashMap; import io.netty.util.concurrent.DefaultPromise; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.Promise; -import java.util.*; +import java.util.Collections; +import java.util.HashSet; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -41,11 +62,11 @@ import java.util.concurrent.atomic.AtomicInteger; final class MqttClientImpl implements MqttClient { private final Set serverSubscriptions = new HashSet<>(); - private final IntObjectHashMap pendingServerUnsubscribes = new IntObjectHashMap<>(); - private final IntObjectHashMap qos2PendingIncomingPublishes = new IntObjectHashMap<>(); - private final IntObjectHashMap pendingPublishes = new IntObjectHashMap<>(); + private final ConcurrentHashMap pendingServerUnsubscribes = new ConcurrentHashMap<>(); + private final ConcurrentHashMap qos2PendingIncomingPublishes = new ConcurrentHashMap<>(); + private final ConcurrentHashMap pendingPublishes = new ConcurrentHashMap<>(); private final HashMultimap subscriptions = HashMultimap.create(); - private final IntObjectHashMap pendingSubscriptions = new IntObjectHashMap<>(); + private final ConcurrentHashMap pendingSubscriptions = new ConcurrentHashMap<>(); private final Set pendingSubscribeTopics = new HashSet<>(); private final HashMultimap handlerToSubscribtion = HashMultimap.create(); private final AtomicInteger nextMessageId = new AtomicInteger(1); @@ -355,6 +376,8 @@ final class MqttClientImpl implements MqttClient { pendingPublish.getFuture().setSuccess(null); //We don't get an ACK for QOS 0 } else if (pendingPublish.isSent()) { pendingPublish.startPublishRetransmissionTimer(this.eventLoop.next(), this::sendAndFlushPacket); + } else { + this.pendingPublishes.remove(pendingPublish.getMessageId()); } return future; } @@ -466,7 +489,7 @@ final class MqttClientImpl implements MqttClient { } } - IntObjectHashMap getPendingSubscriptions() { + ConcurrentHashMap getPendingSubscriptions() { return pendingSubscriptions; } @@ -486,15 +509,15 @@ final class MqttClientImpl implements MqttClient { return serverSubscriptions; } - IntObjectHashMap getPendingServerUnsubscribes() { + ConcurrentHashMap getPendingServerUnsubscribes() { return pendingServerUnsubscribes; } - IntObjectHashMap getPendingPublishes() { + ConcurrentHashMap getPendingPublishes() { return pendingPublishes; } - IntObjectHashMap getQos2PendingIncomingPublishes() { + ConcurrentHashMap getQos2PendingIncomingPublishes() { return qos2PendingIncomingPublishes; }