From 89713558cf62351b13226a0d8dbf4412a5372735 Mon Sep 17 00:00:00 2001 From: Sergey Matvienko Date: Sun, 28 Jan 2024 21:02:38 +0100 Subject: [PATCH] MqttClient use Promise extends Future to have full async capabilities provided by Netty (addListener, etc) --- .../src/main/java/org/thingsboard/mqtt/MqttClient.java | 7 ++++--- .../main/java/org/thingsboard/mqtt/MqttClientImpl.java | 8 ++++---- .../thingsboard/mqtt/integration/MqttIntegrationTest.java | 3 ++- .../java/org/thingsboard/rule/engine/mqtt/TbMqttNode.java | 4 ++-- 4 files changed, 12 insertions(+), 10 deletions(-) diff --git a/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClient.java b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClient.java index 536a76119f..93bcee181c 100644 --- a/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClient.java +++ b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClient.java @@ -21,6 +21,7 @@ import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.handler.codec.mqtt.MqttQoS; import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.Promise; import org.thingsboard.common.util.ListeningExecutor; public interface MqttClient { @@ -32,7 +33,7 @@ public interface MqttClient { * @param host The ip address or host to connect to * @return A future which will be completed when the connection is opened and we received an CONNACK */ - Future connect(String host); + Promise connect(String host); /** * Connect to the specified hostname/ip using the specified port @@ -41,7 +42,7 @@ public interface MqttClient { * @param port The tcp port to connect to * @return A future which will be completed when the connection is opened and we received an CONNACK */ - Future connect(String host, int port); + Promise connect(String host, int port); /** * @@ -55,7 +56,7 @@ public interface MqttClient { * @return A future which will be completed when the connection is opened and we received an CONNACK * @throws IllegalStateException if no previous {@link #connect(String, int)} calls were attempted */ - Future reconnect(); + Promise reconnect(); /** * Retrieve the netty {@link EventLoopGroup} we are using diff --git a/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClientImpl.java b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClientImpl.java index 63d65a1cc2..cf8287e904 100644 --- a/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClientImpl.java +++ b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClientImpl.java @@ -118,7 +118,7 @@ final class MqttClientImpl implements MqttClient { * @return A future which will be completed when the connection is opened and we received an CONNACK */ @Override - public Future connect(String host) { + public Promise connect(String host) { return connect(host, 1883); } @@ -130,11 +130,11 @@ final class MqttClientImpl implements MqttClient { * @return A future which will be completed when the connection is opened and we received an CONNACK */ @Override - public Future connect(String host, int port) { + public Promise connect(String host, int port) { return connect(host, port, false); } - private Future connect(String host, int port, boolean reconnect) { + private Promise connect(String host, int port, boolean reconnect) { log.trace("[{}] Connecting to server, isReconnect - {}", channel != null ? channel.id() : "UNKNOWN", reconnect); if (this.eventLoop == null) { this.eventLoop = new NioEventLoopGroup(); @@ -199,7 +199,7 @@ final class MqttClientImpl implements MqttClient { } @Override - public Future reconnect() { + public Promise reconnect() { log.trace("[{}] Reconnecting to server, isReconnect - {}", channel != null ? channel.id() : "UNKNOWN", reconnect); if (host == null) { throw new IllegalStateException("Cannot reconnect. Call connect() first"); diff --git a/netty-mqtt/src/test/java/org/thingsboard/mqtt/integration/MqttIntegrationTest.java b/netty-mqtt/src/test/java/org/thingsboard/mqtt/integration/MqttIntegrationTest.java index 04c2be1740..1f6f9c85a6 100644 --- a/netty-mqtt/src/test/java/org/thingsboard/mqtt/integration/MqttIntegrationTest.java +++ b/netty-mqtt/src/test/java/org/thingsboard/mqtt/integration/MqttIntegrationTest.java @@ -21,6 +21,7 @@ import io.netty.channel.nio.NioEventLoopGroup; import io.netty.handler.codec.mqtt.MqttMessageType; import io.netty.handler.codec.mqtt.MqttQoS; import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.Promise; import lombok.extern.slf4j.Slf4j; import org.junit.After; import org.junit.Assert; @@ -127,7 +128,7 @@ public class MqttIntegrationTest { config.setReconnectDelay(RECONNECT_DELAY_SECONDS); MqttClient client = MqttClient.create(config, null, handlerExecutor); client.setEventLoop(this.eventLoopGroup); - Future connectFuture = client.connect(MQTT_HOST, this.mqttServer.getMqttPort()); + Promise connectFuture = client.connect(MQTT_HOST, this.mqttServer.getMqttPort()); String hostPort = MQTT_HOST + ":" + this.mqttServer.getMqttPort(); MqttConnectResult result; diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/TbMqttNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/TbMqttNode.java index b9ee65675a..159c6a450e 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/TbMqttNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/TbMqttNode.java @@ -18,7 +18,7 @@ package org.thingsboard.rule.engine.mqtt; import io.netty.buffer.Unpooled; import io.netty.handler.codec.mqtt.MqttQoS; import io.netty.handler.ssl.SslContext; -import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.Promise; import lombok.extern.slf4j.Slf4j; import org.thingsboard.mqtt.MqttClient; import org.thingsboard.mqtt.MqttClientConfig; @@ -121,7 +121,7 @@ public class TbMqttNode extends TbAbstractExternalNode { prepareMqttClientConfig(config); MqttClient client = MqttClient.create(config, null, ctx.getExternalCallExecutor()); client.setEventLoop(ctx.getSharedEventLoop()); - Future connectFuture = client.connect(this.mqttNodeConfiguration.getHost(), this.mqttNodeConfiguration.getPort()); + Promise connectFuture = client.connect(this.mqttNodeConfiguration.getHost(), this.mqttNodeConfiguration.getPort()); MqttConnectResult result; try { result = connectFuture.get(this.mqttNodeConfiguration.getConnectTimeoutSec(), TimeUnit.SECONDS);