From 1507ac69abe27bf76bf9b8cf9872c0c78210e8a7 Mon Sep 17 00:00:00 2001 From: Sergey Matvienko Date: Fri, 14 Jan 2022 11:50:36 +0200 Subject: [PATCH] Revert "MqttTransportHandler refactored to not reference on InetSocketAddress instances to free some heap space. IPv4 are stored and logged as int." This reverts commit 0ef9d5753cd271fd7c8729567b5a2765943538d1. --- .../transport/mqtt/MqttTransportHandler.java | 25 ++++++------------- .../mqtt/session/DeviceSessionCtx.java | 2 +- 2 files changed, 8 insertions(+), 19 deletions(-) diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java index c99b4e1a45..9849811c71 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java @@ -75,7 +75,6 @@ import org.thingsboard.server.transport.mqtt.session.MqttTopicMatcher; import javax.net.ssl.SSLPeerUnverifiedException; import java.io.IOException; import java.net.InetSocketAddress; -import java.nio.ByteBuffer; import java.security.cert.Certificate; import java.security.cert.X509Certificate; import java.util.ArrayList; @@ -113,6 +112,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement private static final Pattern FW_REQUEST_PATTERN = Pattern.compile(MqttTopics.DEVICE_FIRMWARE_REQUEST_TOPIC_PATTERN); private static final Pattern SW_REQUEST_PATTERN = Pattern.compile(MqttTopics.DEVICE_SOFTWARE_REQUEST_TOPIC_PATTERN); + private static final String PAYLOAD_TOO_LARGE = "PAYLOAD_TOO_LARGE"; private static final MqttQoS MAX_SUPPORTED_QOS_LVL = AT_LEAST_ONCE; @@ -125,8 +125,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement private final ConcurrentMap mqttQoSMap; final DeviceSessionCtx deviceSessionCtx; - volatile int ip = 0; - volatile int port = 0; + volatile InetSocketAddress address; volatile GatewaySessionHandler gatewaySessionHandler; private final ConcurrentHashMap otaPackSessions; @@ -185,14 +184,9 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement } void processMqttMsg(ChannelHandlerContext ctx, MqttMessage msg) { - if (port == 0) { - InetSocketAddress address = getAddress(ctx); - ip = getIpv4(address); //ipv6 will not appear in logs - port = address.getPort(); - } - + address = getAddress(ctx); if (msg.fixedHeader() == null) { - log.info("[{}:{}] Invalid message received", ip, port); + log.info("[{}:{}] Invalid message received", address.getHostName(), address.getPort()); ctx.close(); return; } @@ -206,11 +200,6 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement } } - int getIpv4(InetSocketAddress address) { - byte[] ipBytes = address.getAddress().getAddress(); - return ipBytes.length == 4 ? ByteBuffer.wrap(ipBytes).getInt() : -1; - } - InetSocketAddress getAddress(ChannelHandlerContext ctx) { return (InetSocketAddress) ctx.channel().remoteAddress(); } @@ -803,7 +792,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement @Override public void onError(Throwable e) { - log.trace("[{}] Failed to process credentials: {}", ip, userName, e); + log.trace("[{}] Failed to process credentials: {}", address, userName, e); ctx.writeAndFlush(createMqttConnAckMsg(MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE, connectMessage)); ctx.close(); } @@ -826,14 +815,14 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement @Override public void onError(Throwable e) { - log.trace("[{}] Failed to process credentials: {}", ip, sha3Hash, e); + log.trace("[{}] Failed to process credentials: {}", address, sha3Hash, e); ctx.writeAndFlush(createMqttConnAckMsg(MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE, connectMessage)); ctx.close(); } }); } catch (Exception e) { ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_REFUSED_NOT_AUTHORIZED, connectMessage)); - log.trace("[{}] X509 auth failure: {}", sessionId, ip, e); + log.trace("[{}] X509 auth failure: {}", sessionId, address, e); ctx.close(); } } diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/DeviceSessionCtx.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/DeviceSessionCtx.java index 76e2911793..9f0c89714a 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/DeviceSessionCtx.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/DeviceSessionCtx.java @@ -238,7 +238,7 @@ public class DeviceSessionCtx extends MqttDeviceAwareSessionContext { public void release() { if (!msgQueue.isEmpty()) { - log.warn("doDisconnect for device {} but unprocessed messages {} left in the msg queue. cleared", getDeviceId(), getMsgQueueSize()); + log.warn("doDisconnect for device {} but unprocessed messages {} left in the msg queue", getDeviceId(), msgQueue.size()); msgQueue.forEach(ReferenceCountUtil::safeRelease); msgQueue.clear(); }