Merge branch 'hotfix/3.6.2'
This commit is contained in:
commit
f0a9948cda
@ -21,6 +21,7 @@ import io.netty.channel.EventLoopGroup;
|
|||||||
import io.netty.channel.nio.NioEventLoopGroup;
|
import io.netty.channel.nio.NioEventLoopGroup;
|
||||||
import io.netty.handler.codec.mqtt.MqttQoS;
|
import io.netty.handler.codec.mqtt.MqttQoS;
|
||||||
import io.netty.util.concurrent.Future;
|
import io.netty.util.concurrent.Future;
|
||||||
|
import io.netty.util.concurrent.Promise;
|
||||||
import org.thingsboard.common.util.ListeningExecutor;
|
import org.thingsboard.common.util.ListeningExecutor;
|
||||||
|
|
||||||
public interface MqttClient {
|
public interface MqttClient {
|
||||||
@ -32,7 +33,7 @@ public interface MqttClient {
|
|||||||
* @param host The ip address or host to connect to
|
* @param host The ip address or host to connect to
|
||||||
* @return A future which will be completed when the connection is opened and we received an CONNACK
|
* @return A future which will be completed when the connection is opened and we received an CONNACK
|
||||||
*/
|
*/
|
||||||
Future<MqttConnectResult> connect(String host);
|
Promise<MqttConnectResult> connect(String host);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Connect to the specified hostname/ip using the specified port
|
* Connect to the specified hostname/ip using the specified port
|
||||||
@ -41,7 +42,7 @@ public interface MqttClient {
|
|||||||
* @param port The tcp port to connect to
|
* @param port The tcp port to connect to
|
||||||
* @return A future which will be completed when the connection is opened and we received an CONNACK
|
* @return A future which will be completed when the connection is opened and we received an CONNACK
|
||||||
*/
|
*/
|
||||||
Future<MqttConnectResult> connect(String host, int port);
|
Promise<MqttConnectResult> connect(String host, int port);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
*
|
*
|
||||||
@ -55,7 +56,7 @@ public interface MqttClient {
|
|||||||
* @return A future which will be completed when the connection is opened and we received an CONNACK
|
* @return A future which will be completed when the connection is opened and we received an CONNACK
|
||||||
* @throws IllegalStateException if no previous {@link #connect(String, int)} calls were attempted
|
* @throws IllegalStateException if no previous {@link #connect(String, int)} calls were attempted
|
||||||
*/
|
*/
|
||||||
Future<MqttConnectResult> reconnect();
|
Promise<MqttConnectResult> reconnect();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Retrieve the netty {@link EventLoopGroup} we are using
|
* Retrieve the netty {@link EventLoopGroup} we are using
|
||||||
|
|||||||
@ -118,7 +118,7 @@ final class MqttClientImpl implements MqttClient {
|
|||||||
* @return A future which will be completed when the connection is opened and we received an CONNACK
|
* @return A future which will be completed when the connection is opened and we received an CONNACK
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public Future<MqttConnectResult> connect(String host) {
|
public Promise<MqttConnectResult> connect(String host) {
|
||||||
return connect(host, 1883);
|
return connect(host, 1883);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -130,11 +130,11 @@ final class MqttClientImpl implements MqttClient {
|
|||||||
* @return A future which will be completed when the connection is opened and we received an CONNACK
|
* @return A future which will be completed when the connection is opened and we received an CONNACK
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public Future<MqttConnectResult> connect(String host, int port) {
|
public Promise<MqttConnectResult> connect(String host, int port) {
|
||||||
return connect(host, port, false);
|
return connect(host, port, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
private Future<MqttConnectResult> connect(String host, int port, boolean reconnect) {
|
private Promise<MqttConnectResult> connect(String host, int port, boolean reconnect) {
|
||||||
log.trace("[{}] Connecting to server, isReconnect - {}", channel != null ? channel.id() : "UNKNOWN", reconnect);
|
log.trace("[{}] Connecting to server, isReconnect - {}", channel != null ? channel.id() : "UNKNOWN", reconnect);
|
||||||
if (this.eventLoop == null) {
|
if (this.eventLoop == null) {
|
||||||
this.eventLoop = new NioEventLoopGroup();
|
this.eventLoop = new NioEventLoopGroup();
|
||||||
@ -199,7 +199,7 @@ final class MqttClientImpl implements MqttClient {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Future<MqttConnectResult> reconnect() {
|
public Promise<MqttConnectResult> reconnect() {
|
||||||
log.trace("[{}] Reconnecting to server, isReconnect - {}", channel != null ? channel.id() : "UNKNOWN", reconnect);
|
log.trace("[{}] Reconnecting to server, isReconnect - {}", channel != null ? channel.id() : "UNKNOWN", reconnect);
|
||||||
if (host == null) {
|
if (host == null) {
|
||||||
throw new IllegalStateException("Cannot reconnect. Call connect() first");
|
throw new IllegalStateException("Cannot reconnect. Call connect() first");
|
||||||
|
|||||||
@ -21,6 +21,7 @@ import io.netty.channel.nio.NioEventLoopGroup;
|
|||||||
import io.netty.handler.codec.mqtt.MqttMessageType;
|
import io.netty.handler.codec.mqtt.MqttMessageType;
|
||||||
import io.netty.handler.codec.mqtt.MqttQoS;
|
import io.netty.handler.codec.mqtt.MqttQoS;
|
||||||
import io.netty.util.concurrent.Future;
|
import io.netty.util.concurrent.Future;
|
||||||
|
import io.netty.util.concurrent.Promise;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
@ -127,7 +128,7 @@ public class MqttIntegrationTest {
|
|||||||
config.setReconnectDelay(RECONNECT_DELAY_SECONDS);
|
config.setReconnectDelay(RECONNECT_DELAY_SECONDS);
|
||||||
MqttClient client = MqttClient.create(config, null, handlerExecutor);
|
MqttClient client = MqttClient.create(config, null, handlerExecutor);
|
||||||
client.setEventLoop(this.eventLoopGroup);
|
client.setEventLoop(this.eventLoopGroup);
|
||||||
Future<MqttConnectResult> connectFuture = client.connect(MQTT_HOST, this.mqttServer.getMqttPort());
|
Promise<MqttConnectResult> connectFuture = client.connect(MQTT_HOST, this.mqttServer.getMqttPort());
|
||||||
|
|
||||||
String hostPort = MQTT_HOST + ":" + this.mqttServer.getMqttPort();
|
String hostPort = MQTT_HOST + ":" + this.mqttServer.getMqttPort();
|
||||||
MqttConnectResult result;
|
MqttConnectResult result;
|
||||||
|
|||||||
@ -18,7 +18,7 @@ package org.thingsboard.rule.engine.mqtt;
|
|||||||
import io.netty.buffer.Unpooled;
|
import io.netty.buffer.Unpooled;
|
||||||
import io.netty.handler.codec.mqtt.MqttQoS;
|
import io.netty.handler.codec.mqtt.MqttQoS;
|
||||||
import io.netty.handler.ssl.SslContext;
|
import io.netty.handler.ssl.SslContext;
|
||||||
import io.netty.util.concurrent.Future;
|
import io.netty.util.concurrent.Promise;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.thingsboard.mqtt.MqttClient;
|
import org.thingsboard.mqtt.MqttClient;
|
||||||
import org.thingsboard.mqtt.MqttClientConfig;
|
import org.thingsboard.mqtt.MqttClientConfig;
|
||||||
@ -121,7 +121,7 @@ public class TbMqttNode extends TbAbstractExternalNode {
|
|||||||
prepareMqttClientConfig(config);
|
prepareMqttClientConfig(config);
|
||||||
MqttClient client = MqttClient.create(config, null, ctx.getExternalCallExecutor());
|
MqttClient client = MqttClient.create(config, null, ctx.getExternalCallExecutor());
|
||||||
client.setEventLoop(ctx.getSharedEventLoop());
|
client.setEventLoop(ctx.getSharedEventLoop());
|
||||||
Future<MqttConnectResult> connectFuture = client.connect(this.mqttNodeConfiguration.getHost(), this.mqttNodeConfiguration.getPort());
|
Promise<MqttConnectResult> connectFuture = client.connect(this.mqttNodeConfiguration.getHost(), this.mqttNodeConfiguration.getPort());
|
||||||
MqttConnectResult result;
|
MqttConnectResult result;
|
||||||
try {
|
try {
|
||||||
result = connectFuture.get(this.mqttNodeConfiguration.getConnectTimeoutSec(), TimeUnit.SECONDS);
|
result = connectFuture.get(this.mqttNodeConfiguration.getConnectTimeoutSec(), TimeUnit.SECONDS);
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user