Merge pull request #10078 from smatvienko-tb/feature/mqtt-client-promise
[3.6.3] MQTT client Promise instead Future to have full async capabilities
This commit is contained in:
commit
9e6b45ae3e
@ -21,6 +21,7 @@ import io.netty.channel.EventLoopGroup;
|
||||
import io.netty.channel.nio.NioEventLoopGroup;
|
||||
import io.netty.handler.codec.mqtt.MqttQoS;
|
||||
import io.netty.util.concurrent.Future;
|
||||
import io.netty.util.concurrent.Promise;
|
||||
import org.thingsboard.common.util.ListeningExecutor;
|
||||
|
||||
public interface MqttClient {
|
||||
@ -32,7 +33,7 @@ public interface MqttClient {
|
||||
* @param host The ip address or host to connect to
|
||||
* @return A future which will be completed when the connection is opened and we received an CONNACK
|
||||
*/
|
||||
Future<MqttConnectResult> connect(String host);
|
||||
Promise<MqttConnectResult> connect(String host);
|
||||
|
||||
/**
|
||||
* Connect to the specified hostname/ip using the specified port
|
||||
@ -41,7 +42,7 @@ public interface MqttClient {
|
||||
* @param port The tcp port to connect to
|
||||
* @return A future which will be completed when the connection is opened and we received an CONNACK
|
||||
*/
|
||||
Future<MqttConnectResult> connect(String host, int port);
|
||||
Promise<MqttConnectResult> connect(String host, int port);
|
||||
|
||||
/**
|
||||
*
|
||||
@ -55,7 +56,7 @@ public interface MqttClient {
|
||||
* @return A future which will be completed when the connection is opened and we received an CONNACK
|
||||
* @throws IllegalStateException if no previous {@link #connect(String, int)} calls were attempted
|
||||
*/
|
||||
Future<MqttConnectResult> reconnect();
|
||||
Promise<MqttConnectResult> reconnect();
|
||||
|
||||
/**
|
||||
* Retrieve the netty {@link EventLoopGroup} we are using
|
||||
|
||||
@ -118,7 +118,7 @@ final class MqttClientImpl implements MqttClient {
|
||||
* @return A future which will be completed when the connection is opened and we received an CONNACK
|
||||
*/
|
||||
@Override
|
||||
public Future<MqttConnectResult> connect(String host) {
|
||||
public Promise<MqttConnectResult> connect(String host) {
|
||||
return connect(host, 1883);
|
||||
}
|
||||
|
||||
@ -130,11 +130,11 @@ final class MqttClientImpl implements MqttClient {
|
||||
* @return A future which will be completed when the connection is opened and we received an CONNACK
|
||||
*/
|
||||
@Override
|
||||
public Future<MqttConnectResult> connect(String host, int port) {
|
||||
public Promise<MqttConnectResult> connect(String host, int port) {
|
||||
return connect(host, port, false);
|
||||
}
|
||||
|
||||
private Future<MqttConnectResult> connect(String host, int port, boolean reconnect) {
|
||||
private Promise<MqttConnectResult> connect(String host, int port, boolean reconnect) {
|
||||
log.trace("[{}] Connecting to server, isReconnect - {}", channel != null ? channel.id() : "UNKNOWN", reconnect);
|
||||
if (this.eventLoop == null) {
|
||||
this.eventLoop = new NioEventLoopGroup();
|
||||
@ -199,7 +199,7 @@ final class MqttClientImpl implements MqttClient {
|
||||
}
|
||||
|
||||
@Override
|
||||
public Future<MqttConnectResult> reconnect() {
|
||||
public Promise<MqttConnectResult> reconnect() {
|
||||
log.trace("[{}] Reconnecting to server, isReconnect - {}", channel != null ? channel.id() : "UNKNOWN", reconnect);
|
||||
if (host == null) {
|
||||
throw new IllegalStateException("Cannot reconnect. Call connect() first");
|
||||
|
||||
@ -21,6 +21,7 @@ import io.netty.channel.nio.NioEventLoopGroup;
|
||||
import io.netty.handler.codec.mqtt.MqttMessageType;
|
||||
import io.netty.handler.codec.mqtt.MqttQoS;
|
||||
import io.netty.util.concurrent.Future;
|
||||
import io.netty.util.concurrent.Promise;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.junit.After;
|
||||
import org.junit.Assert;
|
||||
@ -127,7 +128,7 @@ public class MqttIntegrationTest {
|
||||
config.setReconnectDelay(RECONNECT_DELAY_SECONDS);
|
||||
MqttClient client = MqttClient.create(config, null, handlerExecutor);
|
||||
client.setEventLoop(this.eventLoopGroup);
|
||||
Future<MqttConnectResult> connectFuture = client.connect(MQTT_HOST, this.mqttServer.getMqttPort());
|
||||
Promise<MqttConnectResult> connectFuture = client.connect(MQTT_HOST, this.mqttServer.getMqttPort());
|
||||
|
||||
String hostPort = MQTT_HOST + ":" + this.mqttServer.getMqttPort();
|
||||
MqttConnectResult result;
|
||||
|
||||
@ -18,7 +18,7 @@ package org.thingsboard.rule.engine.mqtt;
|
||||
import io.netty.buffer.Unpooled;
|
||||
import io.netty.handler.codec.mqtt.MqttQoS;
|
||||
import io.netty.handler.ssl.SslContext;
|
||||
import io.netty.util.concurrent.Future;
|
||||
import io.netty.util.concurrent.Promise;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.thingsboard.mqtt.MqttClient;
|
||||
import org.thingsboard.mqtt.MqttClientConfig;
|
||||
@ -121,7 +121,7 @@ public class TbMqttNode extends TbAbstractExternalNode {
|
||||
prepareMqttClientConfig(config);
|
||||
MqttClient client = MqttClient.create(config, null, ctx.getExternalCallExecutor());
|
||||
client.setEventLoop(ctx.getSharedEventLoop());
|
||||
Future<MqttConnectResult> connectFuture = client.connect(this.mqttNodeConfiguration.getHost(), this.mqttNodeConfiguration.getPort());
|
||||
Promise<MqttConnectResult> connectFuture = client.connect(this.mqttNodeConfiguration.getHost(), this.mqttNodeConfiguration.getPort());
|
||||
MqttConnectResult result;
|
||||
try {
|
||||
result = connectFuture.get(this.mqttNodeConfiguration.getConnectTimeoutSec(), TimeUnit.SECONDS);
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user