From cf9159db266813536de76732f84836ecc0ff30d1 Mon Sep 17 00:00:00 2001 From: dlandiak Date: Thu, 2 Mar 2023 15:46:50 +0200 Subject: [PATCH] mqtt integration test improved --- .../mqtt/integration/MqttIntegrationTest.java | 24 +++++++------------ 1 file changed, 9 insertions(+), 15 deletions(-) diff --git a/netty-mqtt/src/test/java/org/thingsboard/mqtt/integration/MqttIntegrationTest.java b/netty-mqtt/src/test/java/org/thingsboard/mqtt/integration/MqttIntegrationTest.java index 9778ac69f1..cb1b6b81fe 100644 --- a/netty-mqtt/src/test/java/org/thingsboard/mqtt/integration/MqttIntegrationTest.java +++ b/netty-mqtt/src/test/java/org/thingsboard/mqtt/integration/MqttIntegrationTest.java @@ -15,9 +15,7 @@ */ package org.thingsboard.mqtt.integration; -import io.netty.buffer.ByteBuf; -import io.netty.buffer.ByteBufAllocator; -import io.netty.buffer.UnpooledByteBufAllocator; +import io.netty.buffer.Unpooled; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.handler.codec.mqtt.MqttMessageType; @@ -44,7 +42,7 @@ public class MqttIntegrationTest { static final String MQTT_HOST = "localhost"; static final int KEEPALIVE_TIMEOUT_SECONDS = 2; - static final ByteBufAllocator ALLOCATOR = new UnpooledByteBufAllocator(false); + static final long RECONNECT_DELAY_SECONDS = 10L; EventLoopGroup eventLoopGroup; MqttServer mqttServer; @@ -68,7 +66,7 @@ public class MqttIntegrationTest { this.mqttServer.shutdown(); } if (this.eventLoopGroup != null) { - this.eventLoopGroup.shutdownGracefully(0, 5, TimeUnit.SECONDS); + this.eventLoopGroup.shutdownGracefully(0, 0, TimeUnit.MILLISECONDS); } } @@ -80,6 +78,7 @@ public class MqttIntegrationTest { log.warn("Sending publish messages..."); CountDownLatch latch = new CountDownLatch(3); for (int i = 0; i < 3; i++) { + Thread.sleep(30); Future pubFuture = publishMsg(); pubFuture.addListener(future -> latch.countDown()); } @@ -87,35 +86,30 @@ public class MqttIntegrationTest { log.warn("Waiting for messages acknowledgments..."); boolean awaitResult = latch.await(10, TimeUnit.SECONDS); Assert.assertTrue(awaitResult); + log.warn("Messages are delivered successfully..."); //when - CountDownLatch keepAliveLatch = new CountDownLatch(1); - log.warn("Starting idle period..."); - boolean keepaliveAwaitResult = keepAliveLatch.await(5, TimeUnit.SECONDS); - Assert.assertFalse(keepaliveAwaitResult); + Thread.sleep(5000); //then List allReceivedEvents = this.mqttServer.getEventsFromClient(); - long pubCount = allReceivedEvents.stream().filter(mqttMessageType -> mqttMessageType == MqttMessageType.PUBLISH).count(); long disconnectCount = allReceivedEvents.stream().filter(type -> type == MqttMessageType.DISCONNECT).count(); - Assert.assertEquals(3, pubCount); Assert.assertEquals(1, disconnectCount); } private Future publishMsg() { - ByteBuf byteBuf = ALLOCATOR.buffer(); - byteBuf.writeBytes("payload".getBytes(StandardCharsets.UTF_8)); return this.mqttClient.publish( "test/topic", - byteBuf, - MqttQoS.AT_LEAST_ONCE); + Unpooled.wrappedBuffer("payload".getBytes(StandardCharsets.UTF_8)), + MqttQoS.AT_MOST_ONCE); } private MqttClient initClient() throws Exception { MqttClientConfig config = new MqttClientConfig(); config.setTimeoutSeconds(KEEPALIVE_TIMEOUT_SECONDS); + config.setReconnectDelay(RECONNECT_DELAY_SECONDS); MqttClient client = MqttClient.create(config, null); client.setEventLoop(this.eventLoopGroup); Future connectFuture = client.connect(MQTT_HOST, this.mqttServer.getMqttPort());