diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java index a7c4cb8c6c..5aea93528a 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java @@ -762,7 +762,8 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement ctx.writeAndFlush(createSubAckMessage(mqttMsg.variableHeader().messageId(), Collections.singletonList(MqttReasonCodes.SubAck.NOT_AUTHORIZED.byteValue() & 0xFF))); return; } - log.trace("[{}] Processing subscription [{}]!", sessionId, mqttMsg.variableHeader().messageId()); + //TODO consume the rate limit + log.trace("[{}][{}] Processing subscription [{}]!", deviceSessionCtx.getTenantId(), sessionId, mqttMsg.variableHeader().messageId()); List grantedQoSList = new ArrayList<>(); boolean activityReported = false; for (MqttTopicSubscription subscription : mqttMsg.payload().topicSubscriptions()) { @@ -772,7 +773,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement if (MqttTopics.DEVICE_PROVISION_RESPONSE_TOPIC.equals(topic)) { registerSubQoS(topic, grantedQoSList, reqQoS); } else { - log.debug("[{}] Failed to subscribe to [{}][{}]", sessionId, topic, reqQoS); + log.debug("[{}][{}] Failed to subscribe because this session is provision only [{}][{}]", deviceSessionCtx.getTenantId(), sessionId, topic, reqQoS); grantedQoSList.add(ReturnCodeResolver.getSubscriptionReturnCode(deviceSessionCtx.getMqttVersion(), MqttReasonCodes.SubAck.TOPIC_FILTER_INVALID)); } activityReported = true; @@ -847,13 +848,14 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement registerSubQoS(topic, grantedQoSList, reqQoS); break; default: - log.warn("[{}] Failed to subscribe to [{}][{}]", sessionId, topic, reqQoS); + //TODO increment an error counter if any exists + log.warn("[{}][{}] Failed to subscribe because topic is not supported [{}][{}]", deviceSessionCtx.getTenantId(), sessionId, topic, reqQoS); grantedQoSList.add(ReturnCodeResolver.getSubscriptionReturnCode(deviceSessionCtx.getMqttVersion(), MqttReasonCodes.SubAck.TOPIC_FILTER_INVALID)); break; } } } catch (Exception e) { - log.warn("[{}] Failed to subscribe to [{}][{}]", sessionId, topic, reqQoS, e); + log.warn("[{}][{}] Failed to subscribe to [{}][{}]", deviceSessionCtx.getTenantId(), sessionId, topic, reqQoS, e); grantedQoSList.add(ReturnCodeResolver.getSubscriptionReturnCode(deviceSessionCtx.getMqttVersion(), MqttReasonCodes.SubAck.IMPLEMENTATION_SPECIFIC_ERROR)); } }