diff --git a/common/transport/mqtt/src/test/java/org/thingsboard/server/transport/mqtt/MqttTransportHandlerTest.java b/common/transport/mqtt/src/test/java/org/thingsboard/server/transport/mqtt/MqttTransportHandlerTest.java index 4892acb161..f2ee512760 100644 --- a/common/transport/mqtt/src/test/java/org/thingsboard/server/transport/mqtt/MqttTransportHandlerTest.java +++ b/common/transport/mqtt/src/test/java/org/thingsboard/server/transport/mqtt/MqttTransportHandlerTest.java @@ -16,8 +16,7 @@ package org.thingsboard.server.transport.mqtt; import io.netty.buffer.ByteBuf; -import io.netty.buffer.EmptyByteBuf; -import io.netty.buffer.PooledByteBufAllocator; +import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.mqtt.MqttConnectMessage; import io.netty.handler.codec.mqtt.MqttConnectPayload; @@ -34,8 +33,19 @@ import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.Mock; +import org.mockito.Spy; import org.mockito.junit.MockitoJUnitRunner; import org.thingsboard.common.util.ThingsBoardThreadFactory; +import org.thingsboard.server.common.data.DataConstants; +import org.thingsboard.server.common.data.DeviceProfile; +import org.thingsboard.server.common.data.DeviceTransportType; +import org.thingsboard.server.common.data.device.profile.DeviceProfileData; +import org.thingsboard.server.common.data.device.profile.JsonTransportPayloadConfiguration; +import org.thingsboard.server.common.data.device.profile.MqttDeviceProfileTransportConfiguration; +import org.thingsboard.server.common.msg.TbMsgMetaData; +import org.thingsboard.server.common.transport.TransportService; +import org.thingsboard.server.gen.transport.TransportProtos; +import org.thingsboard.server.transport.mqtt.adaptors.JsonMqttAdaptor; import java.net.InetSocketAddress; import java.nio.charset.StandardCharsets; @@ -55,12 +65,14 @@ import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.is; import static org.junit.Assert.fail; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.BDDMockito.willDoNothing; import static org.mockito.BDDMockito.willReturn; import static org.mockito.Mockito.never; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; @Slf4j @RunWith(MockitoJUnitRunner.class) @@ -81,9 +93,14 @@ public class MqttTransportHandlerTest { ExecutorService executor; MqttTransportHandler handler; + @Spy + TransportService transportService; + @Before public void setUp() throws Exception { + willReturn(MSG_QUEUE_LIMIT).given(context).getMessageQueueSizePerDeviceLimit(); + willReturn(transportService).given(context).getTransportService(); handler = spy(new MqttTransportHandler(context, sslHandler)); willReturn(IP_ADDR).given(handler).getAddress(any()); @@ -104,9 +121,17 @@ public class MqttTransportHandlerTest { } MqttPublishMessage getMqttPublishMessage() { + return getMqttPublishMessage("v1/gateway/telemetry"); + } + + MqttPublishMessage getDeviceMqttPublishMessage() { + return getMqttPublishMessage("v1/devices/me/telemetry"); + } + + MqttPublishMessage getMqttPublishMessage(String topicName) { MqttFixedHeader mqttFixedHeader = new MqttFixedHeader(MqttMessageType.PUBLISH, true, MqttQoS.AT_LEAST_ONCE, false, 123); - MqttPublishVariableHeader variableHeader = new MqttPublishVariableHeader("v1/gateway/telemetry", packedId.incrementAndGet()); - ByteBuf payload = new EmptyByteBuf(new PooledByteBufAllocator()); + MqttPublishVariableHeader variableHeader = new MqttPublishVariableHeader(topicName, packedId.incrementAndGet()); + ByteBuf payload = Unpooled.wrappedBuffer("{\"testKey\":\"testValue\"}".getBytes()); return new MqttPublishMessage(mqttFixedHeader, variableHeader, payload); } @@ -205,4 +230,26 @@ public class MqttTransportHandlerTest { messages.forEach((msg) -> verify(handler, times(1)).processRegularSessionMsg(ctx, msg)); } + @Test + public void givenMqttMessage_whenDeviceProfileMqttTransport_thenTopicAddedToMetadata() { + MqttPublishMessage message = getDeviceMqttPublishMessage(); + when(context.getJsonMqttAdaptor()).thenReturn(new JsonMqttAdaptor()); + handler.deviceSessionCtx.setConnected(true); + DeviceProfile deviceProfile = new DeviceProfile(); + DeviceProfileData deviceProfileData = new DeviceProfileData(); + MqttDeviceProfileTransportConfiguration mqttDeviceProfileTransportConfiguration = new MqttDeviceProfileTransportConfiguration(); + mqttDeviceProfileTransportConfiguration.setTransportPayloadTypeConfiguration(new JsonTransportPayloadConfiguration()); + deviceProfileData.setTransportConfiguration(mqttDeviceProfileTransportConfiguration); + deviceProfile.setProfileData(deviceProfileData); + deviceProfile.setTransportType(DeviceTransportType.MQTT); + handler.deviceSessionCtx.setDeviceProfile(deviceProfile); + + handler.processRegularSessionMsg(ctx, message); + + TbMsgMetaData expectedMd = new TbMsgMetaData(); + expectedMd.putValue(DataConstants.MQTT_TOPIC, message.variableHeader().topicName()); + + verify(transportService, times(1)).process(any(), (TransportProtos.PostTelemetryMsg) any(), eq(expectedMd), any()); + } + } \ No newline at end of file