TB-33: Merge with master

This commit is contained in:
Andrew Shvayka 2017-01-20 17:34:06 +02:00
commit 3a15b3ff86
2 changed files with 18 additions and 5 deletions

View File

@ -75,6 +75,10 @@ mqtt:
bind_port: "${MQTT_BIND_PORT:1883}" bind_port: "${MQTT_BIND_PORT:1883}"
adaptor: "${MQTT_ADAPTOR_NAME:JsonMqttAdaptor}" adaptor: "${MQTT_ADAPTOR_NAME:JsonMqttAdaptor}"
timeout: "${MQTT_TIMEOUT:10000}" timeout: "${MQTT_TIMEOUT:10000}"
netty:
leak_detector_level: "${NETTY_LEASK_DETECTOR_LVL:DISABLED}"
boss_group_thread_count: "${NETTY_BOSS_GROUP_THREADS:1}"
worker_group_thread_count: "${NETTY_WORKER_GROUP_THREADS:12}"
# Uncomment the following lines to enable ssl for MQTT # Uncomment the following lines to enable ssl for MQTT
# ssl: # ssl:
# key_store: keystore/mqttserver.jks # key_store: keystore/mqttserver.jks

View File

@ -20,8 +20,7 @@ import io.netty.channel.Channel;
import io.netty.channel.EventLoopGroup; import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel; import io.netty.util.ResourceLeakDetector;
import io.netty.handler.logging.LoggingHandler;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Value;
@ -67,6 +66,14 @@ public class MqttTransportService {
@Value("${mqtt.adaptor}") @Value("${mqtt.adaptor}")
private String adaptorName; private String adaptorName;
@Value("${mqtt.netty.leak_detector_level}")
private String leakDetectorLevel;
@Value("${mqtt.netty.boss_group_thread_count}")
private Integer bossGroupThreadCount;
@Value("${mqtt.netty.worker_group_thread_count}")
private Integer workerGroupThreadCount;
private MqttTransportAdaptor adaptor; private MqttTransportAdaptor adaptor;
private Channel serverChannel; private Channel serverChannel;
@ -75,17 +82,19 @@ public class MqttTransportService {
@PostConstruct @PostConstruct
public void init() throws Exception { public void init() throws Exception {
log.info("Setting resource leak detector level to {}", leakDetectorLevel);
ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.valueOf(leakDetectorLevel.toUpperCase()));
log.info("Starting MQTT transport..."); log.info("Starting MQTT transport...");
log.info("Lookup MQTT transport adaptor {}", adaptorName); log.info("Lookup MQTT transport adaptor {}", adaptorName);
this.adaptor = (MqttTransportAdaptor) appContext.getBean(adaptorName); this.adaptor = (MqttTransportAdaptor) appContext.getBean(adaptorName);
log.info("Starting MQTT transport server"); log.info("Starting MQTT transport server");
bossGroup = new NioEventLoopGroup(1); bossGroup = new NioEventLoopGroup(bossGroupThreadCount);
workerGroup = new NioEventLoopGroup(); workerGroup = new NioEventLoopGroup(workerGroupThreadCount);
ServerBootstrap b = new ServerBootstrap(); ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup) b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class) .channel(NioServerSocketChannel.class)
.handler(new LoggingHandler(LogLevel.TRACE))
.childHandler(new MqttTransportServerInitializer(processor, deviceService, authService, adaptor, sslHandlerProvider)); .childHandler(new MqttTransportServerInitializer(processor, deviceService, authService, adaptor, sslHandlerProvider));
serverChannel = b.bind(host, port).sync().channel(); serverChannel = b.bind(host, port).sync().channel();