Merge pull request #6614 from laziestcoder/develop/3.4

[3.4] fix: typo fix in variable name and method name
This commit is contained in:
Andrew Shvayka 2022-06-02 12:05:00 +03:00 committed by GitHub
commit d0b3d4ad29
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 13 additions and 14 deletions

View File

@ -166,7 +166,7 @@ final class MqttChannelHandler extends SimpleChannelInboundHandler<MqttMessage>
for (MqttPendingSubscription.MqttPendingHandler handler : pendingSubscription.getHandlers()) { for (MqttPendingSubscription.MqttPendingHandler handler : pendingSubscription.getHandlers()) {
MqttSubscription subscription = new MqttSubscription(pendingSubscription.getTopic(), handler.getHandler(), handler.isOnce()); MqttSubscription subscription = new MqttSubscription(pendingSubscription.getTopic(), handler.getHandler(), handler.isOnce());
this.client.getSubscriptions().put(pendingSubscription.getTopic(), subscription); this.client.getSubscriptions().put(pendingSubscription.getTopic(), subscription);
this.client.getHandlerToSubscribtion().put(handler.getHandler(), subscription); this.client.getHandlerToSubscription().put(handler.getHandler(), subscription);
} }
this.client.getPendingSubscribeTopics().remove(pendingSubscription.getTopic()); this.client.getPendingSubscribeTopics().remove(pendingSubscription.getTopic());

View File

@ -55,7 +55,6 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
/** /**
* Represents an MqttClientImpl connected to a single MQTT server. Will try to keep the connection going at all times * Represents an MqttClientImpl connected to a single MQTT server. Will try to keep the connection going at all times
@ -70,7 +69,7 @@ final class MqttClientImpl implements MqttClient {
private final HashMultimap<String, MqttSubscription> subscriptions = HashMultimap.create(); private final HashMultimap<String, MqttSubscription> subscriptions = HashMultimap.create();
private final ConcurrentMap<Integer, MqttPendingSubscription> pendingSubscriptions = new ConcurrentHashMap<>(); private final ConcurrentMap<Integer, MqttPendingSubscription> pendingSubscriptions = new ConcurrentHashMap<>();
private final Set<String> pendingSubscribeTopics = new HashSet<>(); private final Set<String> pendingSubscribeTopics = new HashSet<>();
private final HashMultimap<MqttHandler, MqttSubscription> handlerToSubscribtion = HashMultimap.create(); private final HashMultimap<MqttHandler, MqttSubscription> handlerToSubscription = HashMultimap.create();
private final AtomicInteger nextMessageId = new AtomicInteger(1); private final AtomicInteger nextMessageId = new AtomicInteger(1);
private final MqttClientConfig clientConfig; private final MqttClientConfig clientConfig;
@ -166,7 +165,7 @@ final class MqttClientImpl implements MqttClient {
pendingPublishes.forEach((id, mqttPendingPublish) -> mqttPendingPublish.onChannelClosed()); pendingPublishes.forEach((id, mqttPendingPublish) -> mqttPendingPublish.onChannelClosed());
pendingPublishes.clear(); pendingPublishes.clear();
pendingSubscribeTopics.clear(); pendingSubscribeTopics.clear();
handlerToSubscribtion.clear(); handlerToSubscription.clear();
scheduleConnectIfRequired(host, port, true); scheduleConnectIfRequired(host, port, true);
}); });
} else { } else {
@ -283,11 +282,11 @@ final class MqttClientImpl implements MqttClient {
@Override @Override
public Future<Void> off(String topic, MqttHandler handler) { public Future<Void> off(String topic, MqttHandler handler) {
Promise<Void> future = new DefaultPromise<>(this.eventLoop.next()); Promise<Void> future = new DefaultPromise<>(this.eventLoop.next());
for (MqttSubscription subscription : this.handlerToSubscribtion.get(handler)) { for (MqttSubscription subscription : this.handlerToSubscription.get(handler)) {
this.subscriptions.remove(topic, subscription); this.subscriptions.remove(topic, subscription);
} }
this.handlerToSubscribtion.removeAll(handler); this.handlerToSubscription.removeAll(handler);
this.checkSubscribtions(topic, future); this.checkSubscriptions(topic, future);
return future; return future;
} }
@ -303,12 +302,12 @@ final class MqttClientImpl implements MqttClient {
Promise<Void> future = new DefaultPromise<>(this.eventLoop.next()); Promise<Void> future = new DefaultPromise<>(this.eventLoop.next());
ImmutableSet<MqttSubscription> subscriptions = ImmutableSet.copyOf(this.subscriptions.get(topic)); ImmutableSet<MqttSubscription> subscriptions = ImmutableSet.copyOf(this.subscriptions.get(topic));
for (MqttSubscription subscription : subscriptions) { for (MqttSubscription subscription : subscriptions) {
for (MqttSubscription handSub : this.handlerToSubscribtion.get(subscription.getHandler())) { for (MqttSubscription handSub : this.handlerToSubscription.get(subscription.getHandler())) {
this.subscriptions.remove(topic, handSub); this.subscriptions.remove(topic, handSub);
} }
this.handlerToSubscribtion.remove(subscription.getHandler(), subscription); this.handlerToSubscription.remove(subscription.getHandler(), subscription);
} }
this.checkSubscribtions(topic, future); this.checkSubscriptions(topic, future);
return future; return future;
} }
@ -461,7 +460,7 @@ final class MqttClientImpl implements MqttClient {
if (this.serverSubscriptions.contains(topic)) { if (this.serverSubscriptions.contains(topic)) {
MqttSubscription subscription = new MqttSubscription(topic, handler, once); MqttSubscription subscription = new MqttSubscription(topic, handler, once);
this.subscriptions.put(topic, subscription); this.subscriptions.put(topic, subscription);
this.handlerToSubscribtion.put(handler, subscription); this.handlerToSubscription.put(handler, subscription);
return this.channel.newSucceededFuture(); return this.channel.newSucceededFuture();
} }
@ -484,7 +483,7 @@ final class MqttClientImpl implements MqttClient {
return future; return future;
} }
private void checkSubscribtions(String topic, Promise<Void> promise) { private void checkSubscriptions(String topic, Promise<Void> promise) {
if (!(this.subscriptions.containsKey(topic) && this.subscriptions.get(topic).size() != 0) && this.serverSubscriptions.contains(topic)) { if (!(this.subscriptions.containsKey(topic) && this.subscriptions.get(topic).size() != 0) && this.serverSubscriptions.contains(topic)) {
MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.UNSUBSCRIBE, false, MqttQoS.AT_LEAST_ONCE, false, 0); MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.UNSUBSCRIBE, false, MqttQoS.AT_LEAST_ONCE, false, 0);
MqttMessageIdVariableHeader variableHeader = getNewMessageId(); MqttMessageIdVariableHeader variableHeader = getNewMessageId();
@ -514,8 +513,8 @@ final class MqttClientImpl implements MqttClient {
return pendingSubscribeTopics; return pendingSubscribeTopics;
} }
HashMultimap<MqttHandler, MqttSubscription> getHandlerToSubscribtion() { HashMultimap<MqttHandler, MqttSubscription> getHandlerToSubscription() {
return handlerToSubscribtion; return handlerToSubscription;
} }
Set<String> getServerSubscriptions() { Set<String> getServerSubscriptions() {