diff --git a/common/transport/mqtt/src/test/java/org/thingsboard/server/transport/mqtt/MqttTransportHandlerTest.java b/common/transport/mqtt/src/test/java/org/thingsboard/server/transport/mqtt/MqttTransportHandlerTest.java index 65035e61fe..9b6367da30 100644 --- a/common/transport/mqtt/src/test/java/org/thingsboard/server/transport/mqtt/MqttTransportHandlerTest.java +++ b/common/transport/mqtt/src/test/java/org/thingsboard/server/transport/mqtt/MqttTransportHandlerTest.java @@ -97,18 +97,6 @@ public class MqttTransportHandlerTest { } } - @Test - public void givenMessageWithoutFixedHeader_whenProcessMqttMsg_thenProcessDisconnect() { - MqttFixedHeader mqttFixedHeader = null; - MqttMessage msg = new MqttMessage(mqttFixedHeader); - willDoNothing().given(handler).processDisconnect(ctx); - - handler.processMqttMsg(ctx, msg); - - assertThat(handler.address, is(IP_ADDR)); - verify(handler, times(1)).processDisconnect(ctx); - } - MqttConnectMessage getMqttConnectMessage() { MqttFixedHeader mqttFixedHeader = new MqttFixedHeader(MqttMessageType.CONNECT, true, MqttQoS.AT_LEAST_ONCE, false, 123); MqttConnectVariableHeader variableHeader = new MqttConnectVariableHeader("device", packedId.incrementAndGet(), true, true, true, 1, true, false, 60); @@ -123,6 +111,18 @@ public class MqttTransportHandlerTest { return new MqttPublishMessage(mqttFixedHeader, variableHeader, payload); } + @Test + public void givenMessageWithoutFixedHeader_whenProcessMqttMsg_thenProcessDisconnect() { + MqttFixedHeader mqttFixedHeader = null; + MqttMessage msg = new MqttMessage(mqttFixedHeader); + willDoNothing().given(handler).processDisconnect(ctx); + + handler.processMqttMsg(ctx, msg); + + assertThat(handler.address, is(IP_ADDR)); + verify(handler, times(1)).processDisconnect(ctx); + } + @Test public void givenMqttConnectMessage_whenProcessMqttMsg_thenProcessConnect() { MqttConnectMessage msg = getMqttConnectMessage(); @@ -168,16 +168,18 @@ public class MqttTransportHandlerTest { assertThat(handler.address, is(IP_ADDR)); assertThat(handler.deviceSessionCtx.getChannel(), is(ctx)); + assertThat(handler.deviceSessionCtx.isConnected(), is(false)); assertThat(handler.deviceSessionCtx.getMsgQueueSize().get(), is(MSG_QUEUE_LIMIT)); assertThat(handler.deviceSessionCtx.getMsgQueue(), contains(messages.toArray())); verify(handler, never()).processDisconnect(any()); verify(handler, times(1)).processConnect(any(), any()); verify(handler, times(MSG_QUEUE_LIMIT)).enqueueRegularSessionMsg(any(), any()); + verify(handler, never()).processRegularSessionMsg(any(), any()); messages.forEach((msg) -> verify(handler, times(1)).enqueueRegularSessionMsg(ctx, msg)); } @Test - public void givenMessageQueue_whenProcessMqttMsg_thenEnqueueRegularSessionMsg() throws InterruptedException { + public void givenMessageQueue_whenProcessMqttMsgConcurrently_thenEnqueueRegularSessionMsg() throws InterruptedException { //given assertThat(handler.deviceSessionCtx.isConnected(), is(false)); assertThat(MSG_QUEUE_LIMIT, greaterThan(2));