diff --git a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java index 185b7a82c3..091b45d3ec 100644 --- a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java +++ b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java @@ -51,6 +51,8 @@ import javax.security.cert.X509Certificate; import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.*; import static io.netty.handler.codec.mqtt.MqttMessageType.*; @@ -79,6 +81,8 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement private volatile InetSocketAddress address; private volatile GatewaySessionCtx gatewaySessionCtx; + private Map mqttQoSMap = new ConcurrentHashMap<>(); + public MqttTransportHandler(SessionMsgProcessor processor, DeviceService deviceService, DeviceAuthService authService, RelationService relationService, MqttTransportAdaptor adaptor, SslHandler sslHandler, QuotaService quotaService) { this.processor = processor; @@ -228,6 +232,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement String topicName = subscription.topicName(); //TODO: handle this qos level. MqttQoS reqQoS = subscription.qualityOfService(); + mqttQoSMap.put(topicName, reqQoS); try { if (topicName.equals(DEVICE_ATTRIBUTES_TOPIC)) { AdaptorToSessionActorMsg msg = adaptor.convertToActorMsg(deviceSessionCtx, SUBSCRIBE_ATTRIBUTES_REQUEST, mqttMsg); @@ -244,6 +249,8 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement grantedQoSList.add(getMinSupportedQos(reqQoS)); } else if (topicName.equals(GATEWAY_ATTRIBUTES_TOPIC)) { grantedQoSList.add(getMinSupportedQos(reqQoS)); + }else if (topicName.equals(GATEWAY_RPC_TOPIC)) { + grantedQoSList.add(getMinSupportedQos(reqQoS)); } else { log.warn("[{}] Failed to subscribe to [{}][{}]", sessionId, topicName, reqQoS); grantedQoSList.add(FAILURE.value()); @@ -262,6 +269,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement } log.trace("[{}] Processing subscription [{}]!", sessionId, mqttMsg.variableHeader().messageId()); for (String topicName : mqttMsg.payload().topics()) { + mqttQoSMap.remove(topicName); try { if (topicName.equals(DEVICE_ATTRIBUTES_TOPIC)) { AdaptorToSessionActorMsg msg = adaptor.convertToActorMsg(deviceSessionCtx, UNSUBSCRIBE_ATTRIBUTES_REQUEST, mqttMsg);