mqtt integration test improved

This commit is contained in:
dlandiak 2023-03-02 15:46:50 +02:00
parent 08cd535751
commit cf9159db26

View File

@ -15,9 +15,7 @@
*/ */
package org.thingsboard.mqtt.integration; package org.thingsboard.mqtt.integration;
import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.UnpooledByteBufAllocator;
import io.netty.channel.EventLoopGroup; import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.handler.codec.mqtt.MqttMessageType; import io.netty.handler.codec.mqtt.MqttMessageType;
@ -44,7 +42,7 @@ public class MqttIntegrationTest {
static final String MQTT_HOST = "localhost"; static final String MQTT_HOST = "localhost";
static final int KEEPALIVE_TIMEOUT_SECONDS = 2; static final int KEEPALIVE_TIMEOUT_SECONDS = 2;
static final ByteBufAllocator ALLOCATOR = new UnpooledByteBufAllocator(false); static final long RECONNECT_DELAY_SECONDS = 10L;
EventLoopGroup eventLoopGroup; EventLoopGroup eventLoopGroup;
MqttServer mqttServer; MqttServer mqttServer;
@ -68,7 +66,7 @@ public class MqttIntegrationTest {
this.mqttServer.shutdown(); this.mqttServer.shutdown();
} }
if (this.eventLoopGroup != null) { if (this.eventLoopGroup != null) {
this.eventLoopGroup.shutdownGracefully(0, 5, TimeUnit.SECONDS); this.eventLoopGroup.shutdownGracefully(0, 0, TimeUnit.MILLISECONDS);
} }
} }
@ -80,6 +78,7 @@ public class MqttIntegrationTest {
log.warn("Sending publish messages..."); log.warn("Sending publish messages...");
CountDownLatch latch = new CountDownLatch(3); CountDownLatch latch = new CountDownLatch(3);
for (int i = 0; i < 3; i++) { for (int i = 0; i < 3; i++) {
Thread.sleep(30);
Future<Void> pubFuture = publishMsg(); Future<Void> pubFuture = publishMsg();
pubFuture.addListener(future -> latch.countDown()); pubFuture.addListener(future -> latch.countDown());
} }
@ -87,35 +86,30 @@ public class MqttIntegrationTest {
log.warn("Waiting for messages acknowledgments..."); log.warn("Waiting for messages acknowledgments...");
boolean awaitResult = latch.await(10, TimeUnit.SECONDS); boolean awaitResult = latch.await(10, TimeUnit.SECONDS);
Assert.assertTrue(awaitResult); Assert.assertTrue(awaitResult);
log.warn("Messages are delivered successfully...");
//when //when
CountDownLatch keepAliveLatch = new CountDownLatch(1);
log.warn("Starting idle period..."); log.warn("Starting idle period...");
boolean keepaliveAwaitResult = keepAliveLatch.await(5, TimeUnit.SECONDS); Thread.sleep(5000);
Assert.assertFalse(keepaliveAwaitResult);
//then //then
List<MqttMessageType> allReceivedEvents = this.mqttServer.getEventsFromClient(); List<MqttMessageType> allReceivedEvents = this.mqttServer.getEventsFromClient();
long pubCount = allReceivedEvents.stream().filter(mqttMessageType -> mqttMessageType == MqttMessageType.PUBLISH).count();
long disconnectCount = allReceivedEvents.stream().filter(type -> type == MqttMessageType.DISCONNECT).count(); long disconnectCount = allReceivedEvents.stream().filter(type -> type == MqttMessageType.DISCONNECT).count();
Assert.assertEquals(3, pubCount);
Assert.assertEquals(1, disconnectCount); Assert.assertEquals(1, disconnectCount);
} }
private Future<Void> publishMsg() { private Future<Void> publishMsg() {
ByteBuf byteBuf = ALLOCATOR.buffer();
byteBuf.writeBytes("payload".getBytes(StandardCharsets.UTF_8));
return this.mqttClient.publish( return this.mqttClient.publish(
"test/topic", "test/topic",
byteBuf, Unpooled.wrappedBuffer("payload".getBytes(StandardCharsets.UTF_8)),
MqttQoS.AT_LEAST_ONCE); MqttQoS.AT_MOST_ONCE);
} }
private MqttClient initClient() throws Exception { private MqttClient initClient() throws Exception {
MqttClientConfig config = new MqttClientConfig(); MqttClientConfig config = new MqttClientConfig();
config.setTimeoutSeconds(KEEPALIVE_TIMEOUT_SECONDS); config.setTimeoutSeconds(KEEPALIVE_TIMEOUT_SECONDS);
config.setReconnectDelay(RECONNECT_DELAY_SECONDS);
MqttClient client = MqttClient.create(config, null); MqttClient client = MqttClient.create(config, null);
client.setEventLoop(this.eventLoopGroup); client.setEventLoop(this.eventLoopGroup);
Future<MqttConnectResult> connectFuture = client.connect(MQTT_HOST, this.mqttServer.getMqttPort()); Future<MqttConnectResult> connectFuture = client.connect(MQTT_HOST, this.mqttServer.getMqttPort());