diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportContext.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportContext.java index 82c98c0fac..11b46696da 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportContext.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportContext.java @@ -68,23 +68,4 @@ public class MqttTransportContext extends TransportContext { @Value("${transport.mqtt.msg_queue_size_per_device_limit:100}") private int messageQueueSizePerDeviceLimit; - @Getter - private ExecutorService msqProcessorExecutor; - - @Override - @PostConstruct - public void init() { - super.init(); - msqProcessorExecutor = ThingsBoardExecutors.newWorkStealingPool(Runtime.getRuntime().availableProcessors() + 1, "msg-processor-on-device-connect"); - } - - @Override - @PreDestroy - public void stop() { - super.stop(); - if (msqProcessorExecutor != null) { - msqProcessorExecutor.shutdownNow(); - } - } - } diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java index 0004f0dcf2..52583c521f 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java @@ -164,8 +164,8 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement } } - private void processMqttMsg(ChannelHandlerContext ctx, MqttMessage msg) { - address = (InetSocketAddress) ctx.channel().remoteAddress(); + void processMqttMsg(ChannelHandlerContext ctx, MqttMessage msg) { + address = getAddress(ctx); if (msg.fixedHeader() == null) { log.info("[{}:{}] Invalid message received", address.getHostName(), address.getPort()); processDisconnect(ctx); @@ -181,6 +181,10 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement } } + InetSocketAddress getAddress(ChannelHandlerContext ctx) { + return (InetSocketAddress) ctx.channel().remoteAddress(); + } + private void processProvisionSessionMsg(ChannelHandlerContext ctx, MqttMessage msg) { switch (msg.fixedHeader().messageType()) { case PUBLISH: @@ -236,7 +240,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement processMsgQueue(ctx); //Under the normal conditions the msg queue will contain 0 messages. Many messages will be processed on device connect event in separate thread pool } - private void processMsgQueue(ChannelHandlerContext ctx) { + void processMsgQueue(ChannelHandlerContext ctx) { if (!deviceSessionCtx.isConnected()) { log.trace("[{}][{}] Postpone processing msg due to device is not connected. Msg queue size is {}", sessionId, deviceSessionCtx.getDeviceId(), deviceSessionCtx.getMsgQueue().size()); return; @@ -821,7 +825,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_ACCEPTED, connectMessage)); deviceSessionCtx.setConnected(true); log.info("[{}] Client connected!", sessionId); - context.getMsqProcessorExecutor().execute(()->processMsgQueue(ctx)); + processMsgQueue(ctx); } @Override