diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index f150574e66..4d08fe03c7 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -75,6 +75,10 @@ mqtt: bind_port: "${MQTT_BIND_PORT:1883}" adaptor: "${MQTT_ADAPTOR_NAME:JsonMqttAdaptor}" timeout: "${MQTT_TIMEOUT:10000}" + netty: + leak_detector_level: "${NETTY_LEASK_DETECTOR_LVL:DISABLED}" + boss_group_thread_count: "${NETTY_BOSS_GROUP_THREADS:1}" + worker_group_thread_count: "${NETTY_WORKER_GROUP_THREADS:12}" # Uncomment the following lines to enable ssl for MQTT # ssl: # key_store: keystore/mqttserver.jks diff --git a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportService.java b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportService.java index 1543400177..8710809b28 100644 --- a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportService.java +++ b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportService.java @@ -20,8 +20,7 @@ import io.netty.channel.Channel; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; -import io.netty.handler.logging.LogLevel; -import io.netty.handler.logging.LoggingHandler; +import io.netty.util.ResourceLeakDetector; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; @@ -67,6 +66,14 @@ public class MqttTransportService { @Value("${mqtt.adaptor}") private String adaptorName; + @Value("${mqtt.netty.leak_detector_level}") + private String leakDetectorLevel; + @Value("${mqtt.netty.boss_group_thread_count}") + private Integer bossGroupThreadCount; + @Value("${mqtt.netty.worker_group_thread_count}") + private Integer workerGroupThreadCount; + + private MqttTransportAdaptor adaptor; private Channel serverChannel; @@ -75,17 +82,19 @@ public class MqttTransportService { @PostConstruct public void init() throws Exception { + log.info("Setting resource leak detector level to {}", leakDetectorLevel); + ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.valueOf(leakDetectorLevel.toUpperCase())); + log.info("Starting MQTT transport..."); log.info("Lookup MQTT transport adaptor {}", adaptorName); this.adaptor = (MqttTransportAdaptor) appContext.getBean(adaptorName); log.info("Starting MQTT transport server"); - bossGroup = new NioEventLoopGroup(1); - workerGroup = new NioEventLoopGroup(); + bossGroup = new NioEventLoopGroup(bossGroupThreadCount); + workerGroup = new NioEventLoopGroup(workerGroupThreadCount); ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) - .handler(new LoggingHandler(LogLevel.TRACE)) .childHandler(new MqttTransportServerInitializer(processor, deviceService, authService, adaptor, sslHandlerProvider)); serverChannel = b.bind(host, port).sync().channel();