diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java index ac1f07c680..d92b04e584 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java @@ -34,6 +34,7 @@ import io.netty.handler.codec.mqtt.MqttSubscribeMessage; import io.netty.handler.codec.mqtt.MqttTopicSubscription; import io.netty.handler.codec.mqtt.MqttUnsubscribeMessage; import io.netty.handler.ssl.SslHandler; +import io.netty.util.ReferenceCountUtil; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.GenericFutureListener; import lombok.extern.slf4j.Slf4j; @@ -112,10 +113,14 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { log.trace("[{}] Processing msg: {}", sessionId, msg); - if (msg instanceof MqttMessage) { - processMqttMsg(ctx, (MqttMessage) msg); - } else { - ctx.close(); + try { + if (msg instanceof MqttMessage) { + processMqttMsg(ctx, (MqttMessage) msg); + } else { + ctx.close(); + } + } finally { + ReferenceCountUtil.safeRelease(msg); } } @@ -421,6 +426,11 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement return new MqttConnAckMessage(mqttFixedHeader, mqttConnAckVariableHeader); } + @Override + public void channelReadComplete(ChannelHandlerContext ctx) { + ctx.flush(); + } + @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { log.error("[{}] Unexpected Exception", sessionId, cause); diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/JsonMqttAdaptor.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/JsonMqttAdaptor.java index db2c2193bc..990711a2f8 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/JsonMqttAdaptor.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/JsonMqttAdaptor.java @@ -212,18 +212,14 @@ public class JsonMqttAdaptor implements MqttTransportAdaptor { } private static String validatePayload(UUID sessionId, ByteBuf payloadData, boolean isEmptyPayloadAllowed) throws AdaptorException { - try { - String payload = payloadData.toString(UTF8); - if (payload == null) { - log.warn("[{}] Payload is empty!", sessionId); - if (!isEmptyPayloadAllowed) { - throw new AdaptorException(new IllegalArgumentException("Payload is empty!")); - } + String payload = payloadData.toString(UTF8); + if (payload == null) { + log.warn("[{}] Payload is empty!", sessionId); + if (!isEmptyPayloadAllowed) { + throw new AdaptorException(new IllegalArgumentException("Payload is empty!")); } - return payload; - } finally { - payloadData.release(); } + return payload; } }