removed msg-processor-on-device-connect executor. will use transportCallbackExecutor instead

This commit is contained in:
Sergey Matvienko 2021-08-03 12:04:11 +03:00 committed by Andrew Shvayka
parent efe123d96b
commit 7dfed3e14a
2 changed files with 8 additions and 23 deletions

View File

@ -68,23 +68,4 @@ public class MqttTransportContext extends TransportContext {
@Value("${transport.mqtt.msg_queue_size_per_device_limit:100}") @Value("${transport.mqtt.msg_queue_size_per_device_limit:100}")
private int messageQueueSizePerDeviceLimit; private int messageQueueSizePerDeviceLimit;
@Getter
private ExecutorService msqProcessorExecutor;
@Override
@PostConstruct
public void init() {
super.init();
msqProcessorExecutor = ThingsBoardExecutors.newWorkStealingPool(Runtime.getRuntime().availableProcessors() + 1, "msg-processor-on-device-connect");
}
@Override
@PreDestroy
public void stop() {
super.stop();
if (msqProcessorExecutor != null) {
msqProcessorExecutor.shutdownNow();
}
}
} }

View File

@ -164,8 +164,8 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
} }
} }
private void processMqttMsg(ChannelHandlerContext ctx, MqttMessage msg) { void processMqttMsg(ChannelHandlerContext ctx, MqttMessage msg) {
address = (InetSocketAddress) ctx.channel().remoteAddress(); address = getAddress(ctx);
if (msg.fixedHeader() == null) { if (msg.fixedHeader() == null) {
log.info("[{}:{}] Invalid message received", address.getHostName(), address.getPort()); log.info("[{}:{}] Invalid message received", address.getHostName(), address.getPort());
processDisconnect(ctx); processDisconnect(ctx);
@ -181,6 +181,10 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
} }
} }
InetSocketAddress getAddress(ChannelHandlerContext ctx) {
return (InetSocketAddress) ctx.channel().remoteAddress();
}
private void processProvisionSessionMsg(ChannelHandlerContext ctx, MqttMessage msg) { private void processProvisionSessionMsg(ChannelHandlerContext ctx, MqttMessage msg) {
switch (msg.fixedHeader().messageType()) { switch (msg.fixedHeader().messageType()) {
case PUBLISH: case PUBLISH:
@ -236,7 +240,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
processMsgQueue(ctx); //Under the normal conditions the msg queue will contain 0 messages. Many messages will be processed on device connect event in separate thread pool processMsgQueue(ctx); //Under the normal conditions the msg queue will contain 0 messages. Many messages will be processed on device connect event in separate thread pool
} }
private void processMsgQueue(ChannelHandlerContext ctx) { void processMsgQueue(ChannelHandlerContext ctx) {
if (!deviceSessionCtx.isConnected()) { if (!deviceSessionCtx.isConnected()) {
log.trace("[{}][{}] Postpone processing msg due to device is not connected. Msg queue size is {}", sessionId, deviceSessionCtx.getDeviceId(), deviceSessionCtx.getMsgQueue().size()); log.trace("[{}][{}] Postpone processing msg due to device is not connected. Msg queue size is {}", sessionId, deviceSessionCtx.getDeviceId(), deviceSessionCtx.getMsgQueue().size());
return; return;
@ -821,7 +825,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_ACCEPTED, connectMessage)); ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_ACCEPTED, connectMessage));
deviceSessionCtx.setConnected(true); deviceSessionCtx.setConnected(true);
log.info("[{}] Client connected!", sessionId); log.info("[{}] Client connected!", sessionId);
context.getMsqProcessorExecutor().execute(()->processMsgQueue(ctx)); processMsgQueue(ctx);
} }
@Override @Override