Revert "MqttTransportHandler refactored to not reference on InetSocketAddress instances to free some heap space. IPv4 are stored and logged as int."

This reverts commit 0ef9d5753cd271fd7c8729567b5a2765943538d1.
This commit is contained in:
Sergey Matvienko 2022-01-14 11:50:36 +02:00
parent 565b6284d2
commit 1507ac69ab
2 changed files with 8 additions and 19 deletions

View File

@ -75,7 +75,6 @@ import org.thingsboard.server.transport.mqtt.session.MqttTopicMatcher;
import javax.net.ssl.SSLPeerUnverifiedException; import javax.net.ssl.SSLPeerUnverifiedException;
import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.security.cert.Certificate; import java.security.cert.Certificate;
import java.security.cert.X509Certificate; import java.security.cert.X509Certificate;
import java.util.ArrayList; import java.util.ArrayList;
@ -113,6 +112,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
private static final Pattern FW_REQUEST_PATTERN = Pattern.compile(MqttTopics.DEVICE_FIRMWARE_REQUEST_TOPIC_PATTERN); private static final Pattern FW_REQUEST_PATTERN = Pattern.compile(MqttTopics.DEVICE_FIRMWARE_REQUEST_TOPIC_PATTERN);
private static final Pattern SW_REQUEST_PATTERN = Pattern.compile(MqttTopics.DEVICE_SOFTWARE_REQUEST_TOPIC_PATTERN); private static final Pattern SW_REQUEST_PATTERN = Pattern.compile(MqttTopics.DEVICE_SOFTWARE_REQUEST_TOPIC_PATTERN);
private static final String PAYLOAD_TOO_LARGE = "PAYLOAD_TOO_LARGE"; private static final String PAYLOAD_TOO_LARGE = "PAYLOAD_TOO_LARGE";
private static final MqttQoS MAX_SUPPORTED_QOS_LVL = AT_LEAST_ONCE; private static final MqttQoS MAX_SUPPORTED_QOS_LVL = AT_LEAST_ONCE;
@ -125,8 +125,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
private final ConcurrentMap<MqttTopicMatcher, Integer> mqttQoSMap; private final ConcurrentMap<MqttTopicMatcher, Integer> mqttQoSMap;
final DeviceSessionCtx deviceSessionCtx; final DeviceSessionCtx deviceSessionCtx;
volatile int ip = 0; volatile InetSocketAddress address;
volatile int port = 0;
volatile GatewaySessionHandler gatewaySessionHandler; volatile GatewaySessionHandler gatewaySessionHandler;
private final ConcurrentHashMap<String, String> otaPackSessions; private final ConcurrentHashMap<String, String> otaPackSessions;
@ -185,14 +184,9 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
} }
void processMqttMsg(ChannelHandlerContext ctx, MqttMessage msg) { void processMqttMsg(ChannelHandlerContext ctx, MqttMessage msg) {
if (port == 0) { address = getAddress(ctx);
InetSocketAddress address = getAddress(ctx);
ip = getIpv4(address); //ipv6 will not appear in logs
port = address.getPort();
}
if (msg.fixedHeader() == null) { if (msg.fixedHeader() == null) {
log.info("[{}:{}] Invalid message received", ip, port); log.info("[{}:{}] Invalid message received", address.getHostName(), address.getPort());
ctx.close(); ctx.close();
return; return;
} }
@ -206,11 +200,6 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
} }
} }
int getIpv4(InetSocketAddress address) {
byte[] ipBytes = address.getAddress().getAddress();
return ipBytes.length == 4 ? ByteBuffer.wrap(ipBytes).getInt() : -1;
}
InetSocketAddress getAddress(ChannelHandlerContext ctx) { InetSocketAddress getAddress(ChannelHandlerContext ctx) {
return (InetSocketAddress) ctx.channel().remoteAddress(); return (InetSocketAddress) ctx.channel().remoteAddress();
} }
@ -803,7 +792,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
@Override @Override
public void onError(Throwable e) { public void onError(Throwable e) {
log.trace("[{}] Failed to process credentials: {}", ip, userName, e); log.trace("[{}] Failed to process credentials: {}", address, userName, e);
ctx.writeAndFlush(createMqttConnAckMsg(MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE, connectMessage)); ctx.writeAndFlush(createMqttConnAckMsg(MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE, connectMessage));
ctx.close(); ctx.close();
} }
@ -826,14 +815,14 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
@Override @Override
public void onError(Throwable e) { public void onError(Throwable e) {
log.trace("[{}] Failed to process credentials: {}", ip, sha3Hash, e); log.trace("[{}] Failed to process credentials: {}", address, sha3Hash, e);
ctx.writeAndFlush(createMqttConnAckMsg(MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE, connectMessage)); ctx.writeAndFlush(createMqttConnAckMsg(MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE, connectMessage));
ctx.close(); ctx.close();
} }
}); });
} catch (Exception e) { } catch (Exception e) {
ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_REFUSED_NOT_AUTHORIZED, connectMessage)); ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_REFUSED_NOT_AUTHORIZED, connectMessage));
log.trace("[{}] X509 auth failure: {}", sessionId, ip, e); log.trace("[{}] X509 auth failure: {}", sessionId, address, e);
ctx.close(); ctx.close();
} }
} }

View File

@ -238,7 +238,7 @@ public class DeviceSessionCtx extends MqttDeviceAwareSessionContext {
public void release() { public void release() {
if (!msgQueue.isEmpty()) { if (!msgQueue.isEmpty()) {
log.warn("doDisconnect for device {} but unprocessed messages {} left in the msg queue. cleared", getDeviceId(), getMsgQueueSize()); log.warn("doDisconnect for device {} but unprocessed messages {} left in the msg queue", getDeviceId(), msgQueue.size());
msgQueue.forEach(ReferenceCountUtil::safeRelease); msgQueue.forEach(ReferenceCountUtil::safeRelease);
msgQueue.clear(); msgQueue.clear();
} }