Merge pull request #8173 from dmytro-landiak/mqtt-test-fix

[3.5] mqtt integration test improved
This commit is contained in:
Andrew Shvayka 2023-04-10 13:33:23 +03:00 committed by GitHub
commit 08952b2ef0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -15,9 +15,7 @@
*/
package org.thingsboard.mqtt.integration;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.UnpooledByteBufAllocator;
import io.netty.buffer.Unpooled;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.handler.codec.mqtt.MqttMessageType;
@ -44,7 +42,7 @@ public class MqttIntegrationTest {
static final String MQTT_HOST = "localhost";
static final int KEEPALIVE_TIMEOUT_SECONDS = 2;
static final ByteBufAllocator ALLOCATOR = new UnpooledByteBufAllocator(false);
static final long RECONNECT_DELAY_SECONDS = 10L;
EventLoopGroup eventLoopGroup;
MqttServer mqttServer;
@ -68,7 +66,7 @@ public class MqttIntegrationTest {
this.mqttServer.shutdown();
}
if (this.eventLoopGroup != null) {
this.eventLoopGroup.shutdownGracefully(0, 5, TimeUnit.SECONDS);
this.eventLoopGroup.shutdownGracefully(0, 0, TimeUnit.MILLISECONDS);
}
}
@ -80,6 +78,7 @@ public class MqttIntegrationTest {
log.warn("Sending publish messages...");
CountDownLatch latch = new CountDownLatch(3);
for (int i = 0; i < 3; i++) {
Thread.sleep(30);
Future<Void> pubFuture = publishMsg();
pubFuture.addListener(future -> latch.countDown());
}
@ -87,35 +86,30 @@ public class MqttIntegrationTest {
log.warn("Waiting for messages acknowledgments...");
boolean awaitResult = latch.await(10, TimeUnit.SECONDS);
Assert.assertTrue(awaitResult);
log.warn("Messages are delivered successfully...");
//when
CountDownLatch keepAliveLatch = new CountDownLatch(1);
log.warn("Starting idle period...");
boolean keepaliveAwaitResult = keepAliveLatch.await(5, TimeUnit.SECONDS);
Assert.assertFalse(keepaliveAwaitResult);
Thread.sleep(5000);
//then
List<MqttMessageType> allReceivedEvents = this.mqttServer.getEventsFromClient();
long pubCount = allReceivedEvents.stream().filter(mqttMessageType -> mqttMessageType == MqttMessageType.PUBLISH).count();
long disconnectCount = allReceivedEvents.stream().filter(type -> type == MqttMessageType.DISCONNECT).count();
Assert.assertEquals(3, pubCount);
Assert.assertEquals(1, disconnectCount);
}
private Future<Void> publishMsg() {
ByteBuf byteBuf = ALLOCATOR.buffer();
byteBuf.writeBytes("payload".getBytes(StandardCharsets.UTF_8));
return this.mqttClient.publish(
"test/topic",
byteBuf,
MqttQoS.AT_LEAST_ONCE);
Unpooled.wrappedBuffer("payload".getBytes(StandardCharsets.UTF_8)),
MqttQoS.AT_MOST_ONCE);
}
private MqttClient initClient() throws Exception {
MqttClientConfig config = new MqttClientConfig();
config.setTimeoutSeconds(KEEPALIVE_TIMEOUT_SECONDS);
config.setReconnectDelay(RECONNECT_DELAY_SECONDS);
MqttClient client = MqttClient.create(config, null);
client.setEventLoop(this.eventLoopGroup);
Future<MqttConnectResult> connectFuture = client.connect(MQTT_HOST, this.mqttServer.getMqttPort());