Added test
This commit is contained in:
		
							parent
							
								
									fac2e013e2
								
							
						
					
					
						commit
						a6b4a20c63
					
				@ -16,8 +16,7 @@
 | 
			
		||||
package org.thingsboard.server.transport.mqtt;
 | 
			
		||||
 | 
			
		||||
import io.netty.buffer.ByteBuf;
 | 
			
		||||
import io.netty.buffer.EmptyByteBuf;
 | 
			
		||||
import io.netty.buffer.PooledByteBufAllocator;
 | 
			
		||||
import io.netty.buffer.Unpooled;
 | 
			
		||||
import io.netty.channel.ChannelHandlerContext;
 | 
			
		||||
import io.netty.handler.codec.mqtt.MqttConnectMessage;
 | 
			
		||||
import io.netty.handler.codec.mqtt.MqttConnectPayload;
 | 
			
		||||
@ -34,8 +33,19 @@ import org.junit.Before;
 | 
			
		||||
import org.junit.Test;
 | 
			
		||||
import org.junit.runner.RunWith;
 | 
			
		||||
import org.mockito.Mock;
 | 
			
		||||
import org.mockito.Spy;
 | 
			
		||||
import org.mockito.junit.MockitoJUnitRunner;
 | 
			
		||||
import org.thingsboard.common.util.ThingsBoardThreadFactory;
 | 
			
		||||
import org.thingsboard.server.common.data.DataConstants;
 | 
			
		||||
import org.thingsboard.server.common.data.DeviceProfile;
 | 
			
		||||
import org.thingsboard.server.common.data.DeviceTransportType;
 | 
			
		||||
import org.thingsboard.server.common.data.device.profile.DeviceProfileData;
 | 
			
		||||
import org.thingsboard.server.common.data.device.profile.JsonTransportPayloadConfiguration;
 | 
			
		||||
import org.thingsboard.server.common.data.device.profile.MqttDeviceProfileTransportConfiguration;
 | 
			
		||||
import org.thingsboard.server.common.msg.TbMsgMetaData;
 | 
			
		||||
import org.thingsboard.server.common.transport.TransportService;
 | 
			
		||||
import org.thingsboard.server.gen.transport.TransportProtos;
 | 
			
		||||
import org.thingsboard.server.transport.mqtt.adaptors.JsonMqttAdaptor;
 | 
			
		||||
 | 
			
		||||
import java.net.InetSocketAddress;
 | 
			
		||||
import java.nio.charset.StandardCharsets;
 | 
			
		||||
@ -55,12 +65,14 @@ import static org.hamcrest.Matchers.greaterThan;
 | 
			
		||||
import static org.hamcrest.Matchers.is;
 | 
			
		||||
import static org.junit.Assert.fail;
 | 
			
		||||
import static org.mockito.ArgumentMatchers.any;
 | 
			
		||||
import static org.mockito.ArgumentMatchers.eq;
 | 
			
		||||
import static org.mockito.BDDMockito.willDoNothing;
 | 
			
		||||
import static org.mockito.BDDMockito.willReturn;
 | 
			
		||||
import static org.mockito.Mockito.never;
 | 
			
		||||
import static org.mockito.Mockito.spy;
 | 
			
		||||
import static org.mockito.Mockito.times;
 | 
			
		||||
import static org.mockito.Mockito.verify;
 | 
			
		||||
import static org.mockito.Mockito.when;
 | 
			
		||||
 | 
			
		||||
@Slf4j
 | 
			
		||||
@RunWith(MockitoJUnitRunner.class)
 | 
			
		||||
@ -81,9 +93,14 @@ public class MqttTransportHandlerTest {
 | 
			
		||||
    ExecutorService executor;
 | 
			
		||||
    MqttTransportHandler handler;
 | 
			
		||||
 | 
			
		||||
    @Spy
 | 
			
		||||
    TransportService transportService;
 | 
			
		||||
 | 
			
		||||
    @Before
 | 
			
		||||
    public void setUp() throws Exception {
 | 
			
		||||
 | 
			
		||||
        willReturn(MSG_QUEUE_LIMIT).given(context).getMessageQueueSizePerDeviceLimit();
 | 
			
		||||
        willReturn(transportService).given(context).getTransportService();
 | 
			
		||||
 | 
			
		||||
        handler = spy(new MqttTransportHandler(context, sslHandler));
 | 
			
		||||
        willReturn(IP_ADDR).given(handler).getAddress(any());
 | 
			
		||||
@ -104,9 +121,17 @@ public class MqttTransportHandlerTest {
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    MqttPublishMessage getMqttPublishMessage() {
 | 
			
		||||
        return getMqttPublishMessage("v1/gateway/telemetry");
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    MqttPublishMessage getDeviceMqttPublishMessage() {
 | 
			
		||||
        return getMqttPublishMessage("v1/devices/me/telemetry");
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    MqttPublishMessage getMqttPublishMessage(String topicName) {
 | 
			
		||||
        MqttFixedHeader mqttFixedHeader = new MqttFixedHeader(MqttMessageType.PUBLISH, true, MqttQoS.AT_LEAST_ONCE, false, 123);
 | 
			
		||||
        MqttPublishVariableHeader variableHeader = new MqttPublishVariableHeader("v1/gateway/telemetry", packedId.incrementAndGet());
 | 
			
		||||
        ByteBuf payload = new EmptyByteBuf(new PooledByteBufAllocator());
 | 
			
		||||
        MqttPublishVariableHeader variableHeader = new MqttPublishVariableHeader(topicName, packedId.incrementAndGet());
 | 
			
		||||
        ByteBuf payload = Unpooled.wrappedBuffer("{\"testKey\":\"testValue\"}".getBytes());
 | 
			
		||||
        return new MqttPublishMessage(mqttFixedHeader, variableHeader, payload);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
@ -205,4 +230,26 @@ public class MqttTransportHandlerTest {
 | 
			
		||||
        messages.forEach((msg) -> verify(handler, times(1)).processRegularSessionMsg(ctx, msg));
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Test
 | 
			
		||||
    public void givenMqttMessage_whenDeviceProfileMqttTransport_thenTopicAddedToMetadata() {
 | 
			
		||||
        MqttPublishMessage message = getDeviceMqttPublishMessage();
 | 
			
		||||
        when(context.getJsonMqttAdaptor()).thenReturn(new JsonMqttAdaptor());
 | 
			
		||||
        handler.deviceSessionCtx.setConnected(true);
 | 
			
		||||
        DeviceProfile deviceProfile = new DeviceProfile();
 | 
			
		||||
        DeviceProfileData deviceProfileData = new DeviceProfileData();
 | 
			
		||||
        MqttDeviceProfileTransportConfiguration mqttDeviceProfileTransportConfiguration = new MqttDeviceProfileTransportConfiguration();
 | 
			
		||||
        mqttDeviceProfileTransportConfiguration.setTransportPayloadTypeConfiguration(new JsonTransportPayloadConfiguration());
 | 
			
		||||
        deviceProfileData.setTransportConfiguration(mqttDeviceProfileTransportConfiguration);
 | 
			
		||||
        deviceProfile.setProfileData(deviceProfileData);
 | 
			
		||||
        deviceProfile.setTransportType(DeviceTransportType.MQTT);
 | 
			
		||||
        handler.deviceSessionCtx.setDeviceProfile(deviceProfile);
 | 
			
		||||
 | 
			
		||||
        handler.processRegularSessionMsg(ctx, message);
 | 
			
		||||
 | 
			
		||||
        TbMsgMetaData expectedMd = new TbMsgMetaData();
 | 
			
		||||
        expectedMd.putValue(DataConstants.MQTT_TOPIC, message.variableHeader().topicName());
 | 
			
		||||
 | 
			
		||||
        verify(transportService, times(1)).process(any(), (TransportProtos.PostTelemetryMsg) any(), eq(expectedMd), any());
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user