mqtt handler test improved

This commit is contained in:
Sergey Matvienko 2021-08-04 07:41:38 +03:00 committed by Andrew Shvayka
parent 69a7779253
commit 33887ecb3b

View File

@ -97,18 +97,6 @@ public class MqttTransportHandlerTest {
} }
} }
@Test
public void givenMessageWithoutFixedHeader_whenProcessMqttMsg_thenProcessDisconnect() {
MqttFixedHeader mqttFixedHeader = null;
MqttMessage msg = new MqttMessage(mqttFixedHeader);
willDoNothing().given(handler).processDisconnect(ctx);
handler.processMqttMsg(ctx, msg);
assertThat(handler.address, is(IP_ADDR));
verify(handler, times(1)).processDisconnect(ctx);
}
MqttConnectMessage getMqttConnectMessage() { MqttConnectMessage getMqttConnectMessage() {
MqttFixedHeader mqttFixedHeader = new MqttFixedHeader(MqttMessageType.CONNECT, true, MqttQoS.AT_LEAST_ONCE, false, 123); MqttFixedHeader mqttFixedHeader = new MqttFixedHeader(MqttMessageType.CONNECT, true, MqttQoS.AT_LEAST_ONCE, false, 123);
MqttConnectVariableHeader variableHeader = new MqttConnectVariableHeader("device", packedId.incrementAndGet(), true, true, true, 1, true, false, 60); MqttConnectVariableHeader variableHeader = new MqttConnectVariableHeader("device", packedId.incrementAndGet(), true, true, true, 1, true, false, 60);
@ -123,6 +111,18 @@ public class MqttTransportHandlerTest {
return new MqttPublishMessage(mqttFixedHeader, variableHeader, payload); return new MqttPublishMessage(mqttFixedHeader, variableHeader, payload);
} }
@Test
public void givenMessageWithoutFixedHeader_whenProcessMqttMsg_thenProcessDisconnect() {
MqttFixedHeader mqttFixedHeader = null;
MqttMessage msg = new MqttMessage(mqttFixedHeader);
willDoNothing().given(handler).processDisconnect(ctx);
handler.processMqttMsg(ctx, msg);
assertThat(handler.address, is(IP_ADDR));
verify(handler, times(1)).processDisconnect(ctx);
}
@Test @Test
public void givenMqttConnectMessage_whenProcessMqttMsg_thenProcessConnect() { public void givenMqttConnectMessage_whenProcessMqttMsg_thenProcessConnect() {
MqttConnectMessage msg = getMqttConnectMessage(); MqttConnectMessage msg = getMqttConnectMessage();
@ -168,16 +168,18 @@ public class MqttTransportHandlerTest {
assertThat(handler.address, is(IP_ADDR)); assertThat(handler.address, is(IP_ADDR));
assertThat(handler.deviceSessionCtx.getChannel(), is(ctx)); assertThat(handler.deviceSessionCtx.getChannel(), is(ctx));
assertThat(handler.deviceSessionCtx.isConnected(), is(false));
assertThat(handler.deviceSessionCtx.getMsgQueueSize().get(), is(MSG_QUEUE_LIMIT)); assertThat(handler.deviceSessionCtx.getMsgQueueSize().get(), is(MSG_QUEUE_LIMIT));
assertThat(handler.deviceSessionCtx.getMsgQueue(), contains(messages.toArray())); assertThat(handler.deviceSessionCtx.getMsgQueue(), contains(messages.toArray()));
verify(handler, never()).processDisconnect(any()); verify(handler, never()).processDisconnect(any());
verify(handler, times(1)).processConnect(any(), any()); verify(handler, times(1)).processConnect(any(), any());
verify(handler, times(MSG_QUEUE_LIMIT)).enqueueRegularSessionMsg(any(), any()); verify(handler, times(MSG_QUEUE_LIMIT)).enqueueRegularSessionMsg(any(), any());
verify(handler, never()).processRegularSessionMsg(any(), any());
messages.forEach((msg) -> verify(handler, times(1)).enqueueRegularSessionMsg(ctx, msg)); messages.forEach((msg) -> verify(handler, times(1)).enqueueRegularSessionMsg(ctx, msg));
} }
@Test @Test
public void givenMessageQueue_whenProcessMqttMsg_thenEnqueueRegularSessionMsg() throws InterruptedException { public void givenMessageQueue_whenProcessMqttMsgConcurrently_thenEnqueueRegularSessionMsg() throws InterruptedException {
//given //given
assertThat(handler.deviceSessionCtx.isConnected(), is(false)); assertThat(handler.deviceSessionCtx.isConnected(), is(false));
assertThat(MSG_QUEUE_LIMIT, greaterThan(2)); assertThat(MSG_QUEUE_LIMIT, greaterThan(2));