MqttClient use Promise<V> extends Future<V> to have full async capabilities provided by Netty (addListener, etc)

This commit is contained in:
Sergey Matvienko 2024-01-28 21:02:38 +01:00
parent a8866ba387
commit 89713558cf
4 changed files with 12 additions and 10 deletions

View File

@ -21,6 +21,7 @@ import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.handler.codec.mqtt.MqttQoS; import io.netty.handler.codec.mqtt.MqttQoS;
import io.netty.util.concurrent.Future; import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.Promise;
import org.thingsboard.common.util.ListeningExecutor; import org.thingsboard.common.util.ListeningExecutor;
public interface MqttClient { public interface MqttClient {
@ -32,7 +33,7 @@ public interface MqttClient {
* @param host The ip address or host to connect to * @param host The ip address or host to connect to
* @return A future which will be completed when the connection is opened and we received an CONNACK * @return A future which will be completed when the connection is opened and we received an CONNACK
*/ */
Future<MqttConnectResult> connect(String host); Promise<MqttConnectResult> connect(String host);
/** /**
* Connect to the specified hostname/ip using the specified port * Connect to the specified hostname/ip using the specified port
@ -41,7 +42,7 @@ public interface MqttClient {
* @param port The tcp port to connect to * @param port The tcp port to connect to
* @return A future which will be completed when the connection is opened and we received an CONNACK * @return A future which will be completed when the connection is opened and we received an CONNACK
*/ */
Future<MqttConnectResult> connect(String host, int port); Promise<MqttConnectResult> connect(String host, int port);
/** /**
* *
@ -55,7 +56,7 @@ public interface MqttClient {
* @return A future which will be completed when the connection is opened and we received an CONNACK * @return A future which will be completed when the connection is opened and we received an CONNACK
* @throws IllegalStateException if no previous {@link #connect(String, int)} calls were attempted * @throws IllegalStateException if no previous {@link #connect(String, int)} calls were attempted
*/ */
Future<MqttConnectResult> reconnect(); Promise<MqttConnectResult> reconnect();
/** /**
* Retrieve the netty {@link EventLoopGroup} we are using * Retrieve the netty {@link EventLoopGroup} we are using

View File

@ -118,7 +118,7 @@ final class MqttClientImpl implements MqttClient {
* @return A future which will be completed when the connection is opened and we received an CONNACK * @return A future which will be completed when the connection is opened and we received an CONNACK
*/ */
@Override @Override
public Future<MqttConnectResult> connect(String host) { public Promise<MqttConnectResult> connect(String host) {
return connect(host, 1883); return connect(host, 1883);
} }
@ -130,11 +130,11 @@ final class MqttClientImpl implements MqttClient {
* @return A future which will be completed when the connection is opened and we received an CONNACK * @return A future which will be completed when the connection is opened and we received an CONNACK
*/ */
@Override @Override
public Future<MqttConnectResult> connect(String host, int port) { public Promise<MqttConnectResult> connect(String host, int port) {
return connect(host, port, false); return connect(host, port, false);
} }
private Future<MqttConnectResult> connect(String host, int port, boolean reconnect) { private Promise<MqttConnectResult> connect(String host, int port, boolean reconnect) {
log.trace("[{}] Connecting to server, isReconnect - {}", channel != null ? channel.id() : "UNKNOWN", reconnect); log.trace("[{}] Connecting to server, isReconnect - {}", channel != null ? channel.id() : "UNKNOWN", reconnect);
if (this.eventLoop == null) { if (this.eventLoop == null) {
this.eventLoop = new NioEventLoopGroup(); this.eventLoop = new NioEventLoopGroup();
@ -199,7 +199,7 @@ final class MqttClientImpl implements MqttClient {
} }
@Override @Override
public Future<MqttConnectResult> reconnect() { public Promise<MqttConnectResult> reconnect() {
log.trace("[{}] Reconnecting to server, isReconnect - {}", channel != null ? channel.id() : "UNKNOWN", reconnect); log.trace("[{}] Reconnecting to server, isReconnect - {}", channel != null ? channel.id() : "UNKNOWN", reconnect);
if (host == null) { if (host == null) {
throw new IllegalStateException("Cannot reconnect. Call connect() first"); throw new IllegalStateException("Cannot reconnect. Call connect() first");

View File

@ -21,6 +21,7 @@ import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.handler.codec.mqtt.MqttMessageType; import io.netty.handler.codec.mqtt.MqttMessageType;
import io.netty.handler.codec.mqtt.MqttQoS; import io.netty.handler.codec.mqtt.MqttQoS;
import io.netty.util.concurrent.Future; import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.Promise;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.junit.After; import org.junit.After;
import org.junit.Assert; import org.junit.Assert;
@ -127,7 +128,7 @@ public class MqttIntegrationTest {
config.setReconnectDelay(RECONNECT_DELAY_SECONDS); config.setReconnectDelay(RECONNECT_DELAY_SECONDS);
MqttClient client = MqttClient.create(config, null, handlerExecutor); MqttClient client = MqttClient.create(config, null, handlerExecutor);
client.setEventLoop(this.eventLoopGroup); client.setEventLoop(this.eventLoopGroup);
Future<MqttConnectResult> connectFuture = client.connect(MQTT_HOST, this.mqttServer.getMqttPort()); Promise<MqttConnectResult> connectFuture = client.connect(MQTT_HOST, this.mqttServer.getMqttPort());
String hostPort = MQTT_HOST + ":" + this.mqttServer.getMqttPort(); String hostPort = MQTT_HOST + ":" + this.mqttServer.getMqttPort();
MqttConnectResult result; MqttConnectResult result;

View File

@ -18,7 +18,7 @@ package org.thingsboard.rule.engine.mqtt;
import io.netty.buffer.Unpooled; import io.netty.buffer.Unpooled;
import io.netty.handler.codec.mqtt.MqttQoS; import io.netty.handler.codec.mqtt.MqttQoS;
import io.netty.handler.ssl.SslContext; import io.netty.handler.ssl.SslContext;
import io.netty.util.concurrent.Future; import io.netty.util.concurrent.Promise;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.thingsboard.mqtt.MqttClient; import org.thingsboard.mqtt.MqttClient;
import org.thingsboard.mqtt.MqttClientConfig; import org.thingsboard.mqtt.MqttClientConfig;
@ -121,7 +121,7 @@ public class TbMqttNode extends TbAbstractExternalNode {
prepareMqttClientConfig(config); prepareMqttClientConfig(config);
MqttClient client = MqttClient.create(config, null, ctx.getExternalCallExecutor()); MqttClient client = MqttClient.create(config, null, ctx.getExternalCallExecutor());
client.setEventLoop(ctx.getSharedEventLoop()); client.setEventLoop(ctx.getSharedEventLoop());
Future<MqttConnectResult> connectFuture = client.connect(this.mqttNodeConfiguration.getHost(), this.mqttNodeConfiguration.getPort()); Promise<MqttConnectResult> connectFuture = client.connect(this.mqttNodeConfiguration.getHost(), this.mqttNodeConfiguration.getPort());
MqttConnectResult result; MqttConnectResult result;
try { try {
result = connectFuture.get(this.mqttNodeConfiguration.getConnectTimeoutSec(), TimeUnit.SECONDS); result = connectFuture.get(this.mqttNodeConfiguration.getConnectTimeoutSec(), TimeUnit.SECONDS);