Merge pull request #6967 from dmytro-landiak/netty-mqtt-fix
[3.4.1] Netty mqtt client fix ping/pong logic
This commit is contained in:
commit
090f491eaa
@ -53,6 +53,42 @@
|
||||
<groupId>com.google.guava</groupId>
|
||||
<artifactId>guava</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.slf4j</groupId>
|
||||
<artifactId>slf4j-api</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.slf4j</groupId>
|
||||
<artifactId>log4j-over-slf4j</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>ch.qos.logback</groupId>
|
||||
<artifactId>logback-core</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>ch.qos.logback</groupId>
|
||||
<artifactId>logback-classic</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-starter-test</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.junit.vintage</groupId>
|
||||
<artifactId>junit-vintage-engine</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.awaitility</groupId>
|
||||
<artifactId>awaitility</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>io.takari.junit</groupId>
|
||||
<artifactId>takari-cpsuite</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
<build>
|
||||
|
||||
@ -45,6 +45,7 @@ import io.netty.handler.timeout.IdleStateHandler;
|
||||
import io.netty.util.concurrent.DefaultPromise;
|
||||
import io.netty.util.concurrent.Future;
|
||||
import io.netty.util.concurrent.Promise;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.HashSet;
|
||||
@ -60,6 +61,7 @@ import java.util.concurrent.atomic.AtomicInteger;
|
||||
* Represents an MqttClientImpl connected to a single MQTT server. Will try to keep the connection going at all times
|
||||
*/
|
||||
@SuppressWarnings({"WeakerAccess", "unused"})
|
||||
@Slf4j
|
||||
final class MqttClientImpl implements MqttClient {
|
||||
|
||||
private final Set<String> serverSubscriptions = new HashSet<>();
|
||||
@ -131,6 +133,7 @@ final class MqttClientImpl implements MqttClient {
|
||||
}
|
||||
|
||||
private Future<MqttConnectResult> connect(String host, int port, boolean reconnect) {
|
||||
log.trace("[{}] Connecting to server, isReconnect - {}", channel != null ? channel.id() : "UNKNOWN", reconnect);
|
||||
if (this.eventLoop == null) {
|
||||
this.eventLoop = new NioEventLoopGroup();
|
||||
}
|
||||
@ -147,10 +150,12 @@ final class MqttClientImpl implements MqttClient {
|
||||
future.addListener((ChannelFutureListener) f -> {
|
||||
if (f.isSuccess()) {
|
||||
MqttClientImpl.this.channel = f.channel();
|
||||
log.debug("[{}][{}] Connected successfully {}!", host, port, this.channel.id());
|
||||
MqttClientImpl.this.channel.closeFuture().addListener((ChannelFutureListener) channelFuture -> {
|
||||
if (isConnected()) {
|
||||
return;
|
||||
}
|
||||
log.debug("[{}][{}] Channel is closed {}!", host, port, this.channel.id());
|
||||
ChannelClosedException e = new ChannelClosedException("Channel is closed!");
|
||||
if (callback != null) {
|
||||
callback.connectionLost(e);
|
||||
@ -169,6 +174,7 @@ final class MqttClientImpl implements MqttClient {
|
||||
scheduleConnectIfRequired(host, port, true);
|
||||
});
|
||||
} else {
|
||||
log.debug("[{}][{}] Connect failed, trying reconnect!", host, port);
|
||||
scheduleConnectIfRequired(host, port, reconnect);
|
||||
}
|
||||
});
|
||||
@ -176,6 +182,7 @@ final class MqttClientImpl implements MqttClient {
|
||||
}
|
||||
|
||||
private void scheduleConnectIfRequired(String host, int port, boolean reconnect) {
|
||||
log.trace("[{}] Scheduling connect to server, isReconnect - {}", channel != null ? channel.id() : "UNKNOWN", reconnect);
|
||||
if (clientConfig.isReconnect() && !disconnected) {
|
||||
if (reconnect) {
|
||||
this.reconnect = true;
|
||||
@ -191,6 +198,7 @@ final class MqttClientImpl implements MqttClient {
|
||||
|
||||
@Override
|
||||
public Future<MqttConnectResult> reconnect() {
|
||||
log.trace("[{}] Reconnecting to server, isReconnect - {}", channel != null ? channel.id() : "UNKNOWN", reconnect);
|
||||
if (host == null) {
|
||||
throw new IllegalStateException("Cannot reconnect. Call connect() first");
|
||||
}
|
||||
@ -281,6 +289,7 @@ final class MqttClientImpl implements MqttClient {
|
||||
*/
|
||||
@Override
|
||||
public Future<Void> off(String topic, MqttHandler handler) {
|
||||
log.trace("[{}] Unsubscribing from {}", channel != null ? channel.id() : "UNKNOWN", topic);
|
||||
Promise<Void> future = new DefaultPromise<>(this.eventLoop.next());
|
||||
for (MqttSubscription subscription : this.handlerToSubscription.get(handler)) {
|
||||
this.subscriptions.remove(topic, subscription);
|
||||
@ -299,6 +308,7 @@ final class MqttClientImpl implements MqttClient {
|
||||
*/
|
||||
@Override
|
||||
public Future<Void> off(String topic) {
|
||||
log.trace("[{}] Unsubscribing from {}", channel != null ? channel.id() : "UNKNOWN", topic);
|
||||
Promise<Void> future = new DefaultPromise<>(this.eventLoop.next());
|
||||
ImmutableSet<MqttSubscription> subscriptions = ImmutableSet.copyOf(this.subscriptions.get(topic));
|
||||
for (MqttSubscription subscription : subscriptions) {
|
||||
@ -360,6 +370,7 @@ final class MqttClientImpl implements MqttClient {
|
||||
*/
|
||||
@Override
|
||||
public Future<Void> publish(String topic, ByteBuf payload, MqttQoS qos, boolean retain) {
|
||||
log.trace("[{}] Publishing message to {}", channel != null ? channel.id() : "UNKNOWN", topic);
|
||||
Promise<Void> future = new DefaultPromise<>(this.eventLoop.next());
|
||||
MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBLISH, false, qos, retain, 0);
|
||||
MqttPublishVariableHeader variableHeader = new MqttPublishVariableHeader(topic, getNewMessageId().messageId());
|
||||
@ -404,6 +415,7 @@ final class MqttClientImpl implements MqttClient {
|
||||
|
||||
@Override
|
||||
public void disconnect() {
|
||||
log.trace("[{}] Disconnecting from server", channel != null ? channel.id() : "UNKNOWN");
|
||||
disconnected = true;
|
||||
if (this.channel != null) {
|
||||
MqttMessage message = new MqttMessage(new MqttFixedHeader(MqttMessageType.DISCONNECT, false, MqttQoS.AT_MOST_ONCE, false, 0));
|
||||
@ -435,6 +447,7 @@ final class MqttClientImpl implements MqttClient {
|
||||
return null;
|
||||
}
|
||||
if (this.channel.isActive()) {
|
||||
log.trace("[{}] Sending message {}", channel != null ? channel.id() : "UNKNOWN", message);
|
||||
return this.channel.writeAndFlush(message);
|
||||
}
|
||||
return this.channel.newFailedFuture(new ChannelClosedException("Channel is closed!"));
|
||||
@ -450,6 +463,7 @@ final class MqttClientImpl implements MqttClient {
|
||||
}
|
||||
|
||||
private Future<Void> createSubscription(String topic, MqttHandler handler, boolean once, MqttQoS qos) {
|
||||
log.trace("[{}] Creating subscription to {}", channel != null ? channel.id() : "UNKNOWN", topic);
|
||||
if (this.pendingSubscribeTopics.contains(topic)) {
|
||||
Optional<Map.Entry<Integer, MqttPendingSubscription>> subscriptionEntry = this.pendingSubscriptions.entrySet().stream().filter((e) -> e.getValue().getTopic().equals(topic)).findAny();
|
||||
if (subscriptionEntry.isPresent()) {
|
||||
|
||||
@ -26,9 +26,11 @@ import io.netty.handler.codec.mqtt.MqttQoS;
|
||||
import io.netty.handler.timeout.IdleStateEvent;
|
||||
import io.netty.util.ReferenceCountUtil;
|
||||
import io.netty.util.concurrent.ScheduledFuture;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
@Slf4j
|
||||
final class MqttPingHandler extends ChannelInboundHandlerAdapter {
|
||||
|
||||
private final int keepaliveSeconds;
|
||||
@ -46,11 +48,11 @@ final class MqttPingHandler extends ChannelInboundHandlerAdapter {
|
||||
return;
|
||||
}
|
||||
MqttMessage message = (MqttMessage) msg;
|
||||
if(message.fixedHeader().messageType() == MqttMessageType.PINGREQ){
|
||||
if (message.fixedHeader().messageType() == MqttMessageType.PINGREQ) {
|
||||
this.handlePingReq(ctx.channel());
|
||||
} else if(message.fixedHeader().messageType() == MqttMessageType.PINGRESP){
|
||||
this.handlePingResp();
|
||||
}else{
|
||||
} else if (message.fixedHeader().messageType() == MqttMessageType.PINGRESP) {
|
||||
this.handlePingResp(ctx.channel());
|
||||
} else {
|
||||
ctx.fireChannelRead(ReferenceCountUtil.retain(msg));
|
||||
}
|
||||
}
|
||||
@ -59,23 +61,27 @@ final class MqttPingHandler extends ChannelInboundHandlerAdapter {
|
||||
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
|
||||
super.userEventTriggered(ctx, evt);
|
||||
|
||||
if(evt instanceof IdleStateEvent){
|
||||
if (evt instanceof IdleStateEvent) {
|
||||
IdleStateEvent event = (IdleStateEvent) evt;
|
||||
switch(event.state()){
|
||||
switch (event.state()) {
|
||||
case READER_IDLE:
|
||||
log.debug("[{}] No reads were performed for specified period for channel {}", event.state(), ctx.channel().id());
|
||||
this.sendPingReq(ctx.channel());
|
||||
break;
|
||||
case WRITER_IDLE:
|
||||
log.debug("[{}] No writes were performed for specified period for channel {}", event.state(), ctx.channel().id());
|
||||
this.sendPingReq(ctx.channel());
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void sendPingReq(Channel channel){
|
||||
private void sendPingReq(Channel channel) {
|
||||
log.trace("[{}] Sending ping request", channel.id());
|
||||
MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PINGREQ, false, MqttQoS.AT_MOST_ONCE, false, 0);
|
||||
channel.writeAndFlush(new MqttMessage(fixedHeader));
|
||||
|
||||
if(this.pingRespTimeout != null){
|
||||
if (this.pingRespTimeout == null) {
|
||||
this.pingRespTimeout = channel.eventLoop().schedule(() -> {
|
||||
MqttFixedHeader fixedHeader2 = new MqttFixedHeader(MqttMessageType.DISCONNECT, false, MqttQoS.AT_MOST_ONCE, false, 0);
|
||||
channel.writeAndFlush(new MqttMessage(fixedHeader2)).addListener(ChannelFutureListener.CLOSE);
|
||||
@ -84,13 +90,15 @@ final class MqttPingHandler extends ChannelInboundHandlerAdapter {
|
||||
}
|
||||
}
|
||||
|
||||
private void handlePingReq(Channel channel){
|
||||
private void handlePingReq(Channel channel) {
|
||||
log.trace("[{}] Handling ping request", channel.id());
|
||||
MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PINGRESP, false, MqttQoS.AT_MOST_ONCE, false, 0);
|
||||
channel.writeAndFlush(new MqttMessage(fixedHeader));
|
||||
}
|
||||
|
||||
private void handlePingResp(){
|
||||
if(this.pingRespTimeout != null && !this.pingRespTimeout.isCancelled() && !this.pingRespTimeout.isDone()){
|
||||
private void handlePingResp(Channel channel) {
|
||||
log.trace("[{}] Handling ping response", channel.id());
|
||||
if (this.pingRespTimeout != null && !this.pingRespTimeout.isCancelled() && !this.pingRespTimeout.isDone()) {
|
||||
this.pingRespTimeout.cancel(true);
|
||||
this.pingRespTimeout = null;
|
||||
}
|
||||
|
||||
@ -0,0 +1,63 @@
|
||||
/**
|
||||
* Copyright © 2016-2022 The Thingsboard Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.thingsboard.mqtt;
|
||||
|
||||
import io.netty.channel.Channel;
|
||||
import io.netty.channel.ChannelFuture;
|
||||
import io.netty.channel.ChannelFutureListener;
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.channel.DefaultEventLoop;
|
||||
import io.netty.handler.timeout.IdleStateEvent;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import static org.mockito.ArgumentMatchers.any;
|
||||
import static org.mockito.ArgumentMatchers.eq;
|
||||
import static org.mockito.Mockito.after;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.verify;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
class MqttPingHandlerTest {
|
||||
|
||||
static final int KEEP_ALIVE_SECONDS = 0;
|
||||
static final int PROCESS_SEND_DISCONNECT_MSG_TIME_MS = 500;
|
||||
|
||||
MqttPingHandler mqttPingHandler;
|
||||
|
||||
@BeforeEach
|
||||
void setUp() {
|
||||
mqttPingHandler = new MqttPingHandler(KEEP_ALIVE_SECONDS);
|
||||
}
|
||||
|
||||
@Test
|
||||
void givenChannelReaderIdleState_whenNoPingResponse_thenDisconnectClient() throws Exception {
|
||||
ChannelHandlerContext ctx = mock(ChannelHandlerContext.class);
|
||||
Channel channel = mock(Channel.class);
|
||||
when(ctx.channel()).thenReturn(channel);
|
||||
when(channel.eventLoop()).thenReturn(new DefaultEventLoop());
|
||||
ChannelFuture channelFuture = mock(ChannelFuture.class);
|
||||
when(channel.writeAndFlush(any())).thenReturn(channelFuture);
|
||||
|
||||
mqttPingHandler.userEventTriggered(ctx, IdleStateEvent.FIRST_READER_IDLE_STATE_EVENT);
|
||||
verify(
|
||||
channelFuture,
|
||||
after(TimeUnit.SECONDS.toMillis(KEEP_ALIVE_SECONDS) + PROCESS_SEND_DISCONNECT_MSG_TIME_MS)
|
||||
).addListener(eq(ChannelFutureListener.CLOSE));
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,27 @@
|
||||
/**
|
||||
* Copyright © 2016-2022 The Thingsboard Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.thingsboard.mqtt.integration;
|
||||
|
||||
import org.junit.extensions.cpsuite.ClasspathSuite;
|
||||
import org.junit.runner.RunWith;
|
||||
|
||||
@RunWith(ClasspathSuite.class)
|
||||
@ClasspathSuite.ClassnameFilters({
|
||||
"org.thingsboard.mqtt.integration.*Test",
|
||||
})
|
||||
public class IntegrationTestSuite {
|
||||
|
||||
}
|
||||
@ -0,0 +1,139 @@
|
||||
/**
|
||||
* Copyright © 2016-2022 The Thingsboard Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.thingsboard.mqtt.integration;
|
||||
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import io.netty.buffer.ByteBufAllocator;
|
||||
import io.netty.buffer.UnpooledByteBufAllocator;
|
||||
import io.netty.channel.EventLoopGroup;
|
||||
import io.netty.channel.nio.NioEventLoopGroup;
|
||||
import io.netty.handler.codec.mqtt.MqttMessageType;
|
||||
import io.netty.handler.codec.mqtt.MqttQoS;
|
||||
import io.netty.util.concurrent.Future;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.junit.After;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.thingsboard.mqtt.MqttClient;
|
||||
import org.thingsboard.mqtt.MqttClientConfig;
|
||||
import org.thingsboard.mqtt.MqttConnectResult;
|
||||
import org.thingsboard.mqtt.integration.server.MqttServer;
|
||||
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
@Slf4j
|
||||
public class MqttIntegrationTest {
|
||||
|
||||
static final String MQTT_HOST = "localhost";
|
||||
static final int KEEPALIVE_TIMEOUT_SECONDS = 2;
|
||||
static final ByteBufAllocator ALLOCATOR = new UnpooledByteBufAllocator(false);
|
||||
|
||||
EventLoopGroup eventLoopGroup;
|
||||
MqttServer mqttServer;
|
||||
|
||||
MqttClient mqttClient;
|
||||
|
||||
@Before
|
||||
public void init() throws Exception {
|
||||
this.eventLoopGroup = new NioEventLoopGroup();
|
||||
|
||||
this.mqttServer = new MqttServer();
|
||||
this.mqttServer.init();
|
||||
}
|
||||
|
||||
@After
|
||||
public void destroy() throws InterruptedException {
|
||||
if (this.mqttClient != null) {
|
||||
this.mqttClient.disconnect();
|
||||
}
|
||||
if (this.mqttServer != null) {
|
||||
this.mqttServer.shutdown();
|
||||
}
|
||||
if (this.eventLoopGroup != null) {
|
||||
this.eventLoopGroup.shutdownGracefully(0, 5, TimeUnit.SECONDS);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void givenActiveMqttClient_whenNoActivityForKeepAliveTimeout_thenDisconnectClient() throws Throwable {
|
||||
//given
|
||||
this.mqttClient = initClient();
|
||||
|
||||
log.warn("Sending publish messages...");
|
||||
CountDownLatch latch = new CountDownLatch(3);
|
||||
for (int i = 0; i < 3; i++) {
|
||||
Future<Void> pubFuture = publishMsg();
|
||||
pubFuture.addListener(future -> latch.countDown());
|
||||
}
|
||||
|
||||
log.warn("Waiting for messages acknowledgments...");
|
||||
boolean awaitResult = latch.await(10, TimeUnit.SECONDS);
|
||||
Assert.assertTrue(awaitResult);
|
||||
|
||||
//when
|
||||
CountDownLatch keepAliveLatch = new CountDownLatch(1);
|
||||
|
||||
log.warn("Starting idle period...");
|
||||
boolean keepaliveAwaitResult = keepAliveLatch.await(5, TimeUnit.SECONDS);
|
||||
Assert.assertFalse(keepaliveAwaitResult);
|
||||
|
||||
//then
|
||||
List<MqttMessageType> allReceivedEvents = this.mqttServer.getEventsFromClient();
|
||||
long pubCount = allReceivedEvents.stream().filter(mqttMessageType -> mqttMessageType == MqttMessageType.PUBLISH).count();
|
||||
long disconnectCount = allReceivedEvents.stream().filter(type -> type == MqttMessageType.DISCONNECT).count();
|
||||
|
||||
Assert.assertEquals(3, pubCount);
|
||||
Assert.assertEquals(1, disconnectCount);
|
||||
}
|
||||
|
||||
private Future<Void> publishMsg() {
|
||||
ByteBuf byteBuf = ALLOCATOR.buffer();
|
||||
byteBuf.writeBytes("payload".getBytes(StandardCharsets.UTF_8));
|
||||
return this.mqttClient.publish(
|
||||
"test/topic",
|
||||
byteBuf,
|
||||
MqttQoS.AT_LEAST_ONCE);
|
||||
}
|
||||
|
||||
private MqttClient initClient() throws Exception {
|
||||
MqttClientConfig config = new MqttClientConfig();
|
||||
config.setTimeoutSeconds(KEEPALIVE_TIMEOUT_SECONDS);
|
||||
MqttClient client = MqttClient.create(config, null);
|
||||
client.setEventLoop(this.eventLoopGroup);
|
||||
Future<MqttConnectResult> connectFuture = client.connect(MQTT_HOST, this.mqttServer.getMqttPort());
|
||||
|
||||
String hostPort = MQTT_HOST + ":" + this.mqttServer.getMqttPort();
|
||||
MqttConnectResult result;
|
||||
try {
|
||||
result = connectFuture.get(10, TimeUnit.SECONDS);
|
||||
} catch (TimeoutException ex) {
|
||||
connectFuture.cancel(true);
|
||||
client.disconnect();
|
||||
throw new RuntimeException(String.format("Failed to connect to MQTT server at %s.", hostPort));
|
||||
}
|
||||
if (!result.isSuccess()) {
|
||||
connectFuture.cancel(true);
|
||||
client.disconnect();
|
||||
throw new RuntimeException(String.format("Failed to connect to MQTT server at %s. Result code is: %s", hostPort, result.getReturnCode()));
|
||||
}
|
||||
return client;
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,84 @@
|
||||
/**
|
||||
* Copyright © 2016-2022 The Thingsboard Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.thingsboard.mqtt.integration.server;
|
||||
|
||||
import io.netty.bootstrap.ServerBootstrap;
|
||||
import io.netty.channel.Channel;
|
||||
import io.netty.channel.ChannelInitializer;
|
||||
import io.netty.channel.ChannelOption;
|
||||
import io.netty.channel.ChannelPipeline;
|
||||
import io.netty.channel.EventLoopGroup;
|
||||
import io.netty.channel.nio.NioEventLoopGroup;
|
||||
import io.netty.channel.socket.SocketChannel;
|
||||
import io.netty.channel.socket.nio.NioServerSocketChannel;
|
||||
import io.netty.handler.codec.mqtt.MqttDecoder;
|
||||
import io.netty.handler.codec.mqtt.MqttEncoder;
|
||||
import io.netty.handler.codec.mqtt.MqttMessageType;
|
||||
import lombok.Getter;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.concurrent.CopyOnWriteArrayList;
|
||||
|
||||
@Slf4j
|
||||
public class MqttServer {
|
||||
|
||||
@Getter
|
||||
private final List<MqttMessageType> eventsFromClient = new CopyOnWriteArrayList<>();
|
||||
@Getter
|
||||
private final int mqttPort = 8885;
|
||||
|
||||
private Channel serverChannel;
|
||||
private EventLoopGroup bossGroup;
|
||||
private EventLoopGroup workerGroup;
|
||||
|
||||
public void init() throws Exception {
|
||||
log.info("Starting MQTT server...");
|
||||
bossGroup = new NioEventLoopGroup();
|
||||
workerGroup = new NioEventLoopGroup();
|
||||
ServerBootstrap b = new ServerBootstrap();
|
||||
b.group(bossGroup, workerGroup)
|
||||
.channel(NioServerSocketChannel.class)
|
||||
.childHandler(new ChannelInitializer<SocketChannel>() {
|
||||
@Override
|
||||
protected void initChannel(SocketChannel ch) throws Exception {
|
||||
ChannelPipeline pipeline = ch.pipeline();
|
||||
pipeline.addLast("decoder", new MqttDecoder(65536));
|
||||
pipeline.addLast("encoder", MqttEncoder.INSTANCE);
|
||||
|
||||
MqttTransportHandler handler = new MqttTransportHandler(eventsFromClient);
|
||||
|
||||
pipeline.addLast(handler);
|
||||
ch.closeFuture().addListener(handler);
|
||||
}
|
||||
})
|
||||
.childOption(ChannelOption.SO_KEEPALIVE, true);
|
||||
|
||||
serverChannel = b.bind(mqttPort).sync().channel();
|
||||
log.info("Mqtt transport started!");
|
||||
}
|
||||
|
||||
public void shutdown() throws InterruptedException {
|
||||
log.info("Stopping MQTT transport!");
|
||||
try {
|
||||
serverChannel.close().sync();
|
||||
} finally {
|
||||
workerGroup.shutdownGracefully();
|
||||
bossGroup.shutdownGracefully();
|
||||
}
|
||||
log.info("MQTT transport stopped!");
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,141 @@
|
||||
/**
|
||||
* Copyright © 2016-2022 The Thingsboard Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.thingsboard.mqtt.integration.server;
|
||||
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.channel.ChannelInboundHandlerAdapter;
|
||||
import io.netty.handler.codec.mqtt.MqttConnAckMessage;
|
||||
import io.netty.handler.codec.mqtt.MqttConnAckVariableHeader;
|
||||
import io.netty.handler.codec.mqtt.MqttConnectMessage;
|
||||
import io.netty.handler.codec.mqtt.MqttConnectReturnCode;
|
||||
import io.netty.handler.codec.mqtt.MqttFixedHeader;
|
||||
import io.netty.handler.codec.mqtt.MqttMessage;
|
||||
import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader;
|
||||
import io.netty.handler.codec.mqtt.MqttMessageType;
|
||||
import io.netty.handler.codec.mqtt.MqttPubAckMessage;
|
||||
import io.netty.handler.codec.mqtt.MqttPublishMessage;
|
||||
import io.netty.util.ReferenceCountUtil;
|
||||
import io.netty.util.concurrent.Future;
|
||||
import io.netty.util.concurrent.GenericFutureListener;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.UUID;
|
||||
|
||||
import static io.netty.handler.codec.mqtt.MqttMessageType.CONNACK;
|
||||
import static io.netty.handler.codec.mqtt.MqttMessageType.CONNECT;
|
||||
import static io.netty.handler.codec.mqtt.MqttMessageType.DISCONNECT;
|
||||
import static io.netty.handler.codec.mqtt.MqttMessageType.PINGREQ;
|
||||
import static io.netty.handler.codec.mqtt.MqttMessageType.PUBACK;
|
||||
import static io.netty.handler.codec.mqtt.MqttMessageType.PUBLISH;
|
||||
import static io.netty.handler.codec.mqtt.MqttQoS.AT_MOST_ONCE;
|
||||
|
||||
@Slf4j
|
||||
public class MqttTransportHandler extends ChannelInboundHandlerAdapter implements GenericFutureListener<Future<? super Void>> {
|
||||
|
||||
private final List<MqttMessageType> eventsFromClient;
|
||||
private final UUID sessionId;
|
||||
|
||||
MqttTransportHandler(List<MqttMessageType> eventsFromClient) {
|
||||
this.sessionId = UUID.randomUUID();
|
||||
this.eventsFromClient = eventsFromClient;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void channelRead(ChannelHandlerContext ctx, Object msg) {
|
||||
log.trace("[{}] Processing msg: {}", sessionId, msg);
|
||||
try {
|
||||
if (msg instanceof MqttMessage) {
|
||||
MqttMessage message = (MqttMessage) msg;
|
||||
if (message.decoderResult().isSuccess()) {
|
||||
processMqttMsg(ctx, message);
|
||||
} else {
|
||||
log.error("[{}] Message decoding failed: {}", sessionId, message.decoderResult().cause().getMessage());
|
||||
ctx.close();
|
||||
}
|
||||
} else {
|
||||
log.debug("[{}] Received non mqtt message: {}", sessionId, msg.getClass().getSimpleName());
|
||||
ctx.close();
|
||||
}
|
||||
} finally {
|
||||
ReferenceCountUtil.safeRelease(msg);
|
||||
}
|
||||
}
|
||||
|
||||
void processMqttMsg(ChannelHandlerContext ctx, MqttMessage msg) {
|
||||
if (msg.fixedHeader() == null) {
|
||||
ctx.close();
|
||||
return;
|
||||
}
|
||||
switch (msg.fixedHeader().messageType()) {
|
||||
case CONNECT:
|
||||
eventsFromClient.add(CONNECT);
|
||||
processConnect(ctx, (MqttConnectMessage) msg);
|
||||
break;
|
||||
case DISCONNECT:
|
||||
eventsFromClient.add(DISCONNECT);
|
||||
ctx.close();
|
||||
break;
|
||||
case PUBLISH:
|
||||
// QoS 0 and 1 supported only here
|
||||
eventsFromClient.add(PUBLISH);
|
||||
MqttPublishMessage mqttPubMsg = (MqttPublishMessage) msg;
|
||||
ack(ctx, mqttPubMsg.variableHeader().packetId());
|
||||
break;
|
||||
case PINGREQ:
|
||||
// We will not handle PINGREQ and will not send any PINGRESP to simulate the MQTT server is down
|
||||
eventsFromClient.add(PINGREQ);
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
void processConnect(ChannelHandlerContext ctx, MqttConnectMessage msg) {
|
||||
String userName = msg.payload().userName();
|
||||
String clientId = msg.payload().clientIdentifier();
|
||||
|
||||
log.warn("[{}][{}] Processing connect msg for client: {}!", sessionId, userName, clientId);
|
||||
ctx.writeAndFlush(createMqttConnAckMsg(msg));
|
||||
}
|
||||
|
||||
private MqttConnAckMessage createMqttConnAckMsg(MqttConnectMessage msg) {
|
||||
MqttFixedHeader mqttFixedHeader =
|
||||
new MqttFixedHeader(CONNACK, false, AT_MOST_ONCE, false, 0);
|
||||
MqttConnAckVariableHeader mqttConnAckVariableHeader =
|
||||
new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_ACCEPTED, !msg.variableHeader().isCleanSession());
|
||||
return new MqttConnAckMessage(mqttFixedHeader, mqttConnAckVariableHeader);
|
||||
}
|
||||
|
||||
private void ack(ChannelHandlerContext ctx, int msgId) {
|
||||
if (msgId > 0) {
|
||||
ctx.writeAndFlush(createMqttPubAckMsg(msgId));
|
||||
}
|
||||
}
|
||||
|
||||
public static MqttPubAckMessage createMqttPubAckMsg(int requestId) {
|
||||
MqttFixedHeader mqttFixedHeader =
|
||||
new MqttFixedHeader(PUBACK, false, AT_MOST_ONCE, false, 0);
|
||||
MqttMessageIdVariableHeader mqttMsgIdVariableHeader =
|
||||
MqttMessageIdVariableHeader.from(requestId);
|
||||
return new MqttPubAckMessage(mqttFixedHeader, mqttMsgIdVariableHeader);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void operationComplete(Future<? super Void> future) {
|
||||
log.trace("[{}] Channel closed!", sessionId);
|
||||
}
|
||||
}
|
||||
Loading…
x
Reference in New Issue
Block a user