From b96a7fcbd22aa11c6d1822f1f7d2bae36bd9fdd4 Mon Sep 17 00:00:00 2001 From: YevhenBondarenko Date: Thu, 16 May 2024 18:45:50 +0200 Subject: [PATCH] added ackOrClose instead of chanel.close and refactoring --- .../AbstractGatewaySessionHandler.java | 54 ++++++++++++------- 1 file changed, 34 insertions(+), 20 deletions(-) diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/AbstractGatewaySessionHandler.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/AbstractGatewaySessionHandler.java index 49f2f1cc63..b4644ac3ac 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/AbstractGatewaySessionHandler.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/AbstractGatewaySessionHandler.java @@ -32,7 +32,9 @@ import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.mqtt.MqttMessage; +import io.netty.handler.codec.mqtt.MqttMessageBuilders; import io.netty.handler.codec.mqtt.MqttPublishMessage; +import io.netty.handler.codec.mqtt.MqttVersion; import jakarta.annotation.Nullable; import lombok.Getter; import lombok.Setter; @@ -83,6 +85,7 @@ import static org.thingsboard.server.common.transport.service.DefaultTransportSe import static org.thingsboard.server.common.transport.service.DefaultTransportService.SESSION_EVENT_MSG_OPEN; import static org.thingsboard.server.common.transport.service.DefaultTransportService.SUBSCRIBE_TO_ATTRIBUTE_UPDATES_ASYNC_MSG; import static org.thingsboard.server.common.transport.service.DefaultTransportService.SUBSCRIBE_TO_RPC_ASYNC_MSG; +import static org.thingsboard.server.transport.mqtt.util.ReturnCode.PAYLOAD_FORMAT_INVALID; import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugConnectionState.OFFLINE; import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMessageType.STATE; import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMessageType.messageName; @@ -365,10 +368,7 @@ public abstract class AbstractGatewaySessionHandler deviceEntry : json.getAsJsonObject().entrySet()) { if (!deviceEntry.getValue().isJsonArray()) { log.warn("{}[{}]", CAN_T_PARSE_VALUE, json); @@ -402,7 +402,7 @@ public abstract class AbstractGatewaySessionHandler deviceEntry : jsonObj.entrySet()) { + validateJsonObject(json); + for (Map.Entry deviceEntry : json.getAsJsonObject().entrySet()) { if (!deviceEntry.getValue().isJsonObject()) { log.warn("{}[{}]", CAN_T_PARSE_VALUE, json); continue; @@ -500,6 +497,7 @@ public abstract class AbstractGatewaySessionHandler deviceEntry : jsonObj.entrySet()) { + validateJsonObject(json); + for (Map.Entry deviceEntry : json.getAsJsonObject().entrySet()) { if (!deviceEntry.getValue().isJsonObject()) { log.warn("{}[{}]", CAN_T_PARSE_VALUE, json); continue; @@ -611,6 +607,7 @@ public abstract class AbstractGatewaySessionHandler 0) { writeAndFlush(MqttTransportHandler.createMqttPubAckMsg(deviceSessionCtx, msgId, returnCode)); } } + private void ackOrClose(int msgId) { + if (MqttVersion.MQTT_5.equals(deviceSessionCtx.getMqttVersion())) { + ack(msgId, PAYLOAD_FORMAT_INVALID); + } else { + channel.close(); + } + } + private void deregisterSession(String deviceName, MqttDeviceAwareSessionContext deviceSessionCtx) { if (this.deviceSessionCtx.isSparkplug()) { sendSparkplugStateOnTelemetry(deviceSessionCtx.getSessionInfo(),