mqtt transport handler refactored for test

This commit is contained in:
Sergey Matvienko 2021-08-03 18:54:07 +03:00 committed by Andrew Shvayka
parent 43fc44f071
commit 607fd7a74f

View File

@ -123,9 +123,9 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
private final SslHandler sslHandler; private final SslHandler sslHandler;
private final ConcurrentMap<MqttTopicMatcher, Integer> mqttQoSMap; private final ConcurrentMap<MqttTopicMatcher, Integer> mqttQoSMap;
private final DeviceSessionCtx deviceSessionCtx; final DeviceSessionCtx deviceSessionCtx;
private volatile InetSocketAddress address; volatile InetSocketAddress address;
private volatile GatewaySessionHandler gatewaySessionHandler; volatile GatewaySessionHandler gatewaySessionHandler;
private final ConcurrentHashMap<String, String> otaPackSessions; private final ConcurrentHashMap<String, String> otaPackSessions;
private final ConcurrentHashMap<String, Integer> chunkSizes; private final ConcurrentHashMap<String, Integer> chunkSizes;
@ -227,7 +227,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
} }
} }
private void enqueueRegularSessionMsg(ChannelHandlerContext ctx, MqttMessage msg) { void enqueueRegularSessionMsg(ChannelHandlerContext ctx, MqttMessage msg) {
final int queueSize = deviceSessionCtx.getMsgQueueSize().incrementAndGet(); final int queueSize = deviceSessionCtx.getMsgQueueSize().incrementAndGet();
if (queueSize > context.getMessageQueueSizePerDeviceLimit()) { if (queueSize > context.getMessageQueueSizePerDeviceLimit()) {
log.warn("Closing current session because msq queue size for device {} exceed limit {} with msgQueueSize counter {} and actual queue size {}", log.warn("Closing current session because msq queue size for device {} exceed limit {} with msgQueueSize counter {} and actual queue size {}",
@ -262,7 +262,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
} }
} }
private void processRegularSessionMsg(ChannelHandlerContext ctx, MqttMessage msg) { void processRegularSessionMsg(ChannelHandlerContext ctx, MqttMessage msg) {
switch (msg.fixedHeader().messageType()) { switch (msg.fixedHeader().messageType()) {
case PUBLISH: case PUBLISH:
processPublish(ctx, (MqttPublishMessage) msg); processPublish(ctx, (MqttPublishMessage) msg);
@ -628,7 +628,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
return new MqttMessage(mqttFixedHeader, mqttMessageIdVariableHeader); return new MqttMessage(mqttFixedHeader, mqttMessageIdVariableHeader);
} }
private void processConnect(ChannelHandlerContext ctx, MqttConnectMessage msg) { void processConnect(ChannelHandlerContext ctx, MqttConnectMessage msg) {
log.info("[{}] Processing connect msg for client: {}!", sessionId, msg.payload().clientIdentifier()); log.info("[{}] Processing connect msg for client: {}!", sessionId, msg.payload().clientIdentifier());
String userName = msg.payload().userName(); String userName = msg.payload().userName();
String clientId = msg.payload().clientIdentifier(); String clientId = msg.payload().clientIdentifier();
@ -714,7 +714,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
return null; return null;
} }
private void processDisconnect(ChannelHandlerContext ctx) { void processDisconnect(ChannelHandlerContext ctx) {
ctx.close(); ctx.close();
log.info("[{}] Client disconnected!", sessionId); log.info("[{}] Client disconnected!", sessionId);
doDisconnect(); doDisconnect();