Merge pull request #12920 from smatvienko-tb/hotfix/mqtt-transport-handler-subscription-logging-with-tenantid

MQTT transport handler - log tenant on subscribe issue
This commit is contained in:
Viacheslav Klimov 2025-03-21 12:00:15 +02:00 committed by GitHub
commit 63697efd44
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -762,7 +762,8 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
ctx.writeAndFlush(createSubAckMessage(mqttMsg.variableHeader().messageId(), Collections.singletonList(MqttReasonCodes.SubAck.NOT_AUTHORIZED.byteValue() & 0xFF)));
return;
}
log.trace("[{}] Processing subscription [{}]!", sessionId, mqttMsg.variableHeader().messageId());
//TODO consume the rate limit
log.trace("[{}][{}] Processing subscription [{}]!", deviceSessionCtx.getTenantId(), sessionId, mqttMsg.variableHeader().messageId());
List<Integer> grantedQoSList = new ArrayList<>();
boolean activityReported = false;
for (MqttTopicSubscription subscription : mqttMsg.payload().topicSubscriptions()) {
@ -772,7 +773,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
if (MqttTopics.DEVICE_PROVISION_RESPONSE_TOPIC.equals(topic)) {
registerSubQoS(topic, grantedQoSList, reqQoS);
} else {
log.debug("[{}] Failed to subscribe to [{}][{}]", sessionId, topic, reqQoS);
log.debug("[{}][{}] Failed to subscribe because this session is provision only [{}][{}]", deviceSessionCtx.getTenantId(), sessionId, topic, reqQoS);
grantedQoSList.add(ReturnCodeResolver.getSubscriptionReturnCode(deviceSessionCtx.getMqttVersion(), MqttReasonCodes.SubAck.TOPIC_FILTER_INVALID));
}
activityReported = true;
@ -847,13 +848,14 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
registerSubQoS(topic, grantedQoSList, reqQoS);
break;
default:
log.warn("[{}] Failed to subscribe to [{}][{}]", sessionId, topic, reqQoS);
//TODO increment an error counter if any exists
log.warn("[{}][{}] Failed to subscribe because topic is not supported [{}][{}]", deviceSessionCtx.getTenantId(), sessionId, topic, reqQoS);
grantedQoSList.add(ReturnCodeResolver.getSubscriptionReturnCode(deviceSessionCtx.getMqttVersion(), MqttReasonCodes.SubAck.TOPIC_FILTER_INVALID));
break;
}
}
} catch (Exception e) {
log.warn("[{}] Failed to subscribe to [{}][{}]", sessionId, topic, reqQoS, e);
log.warn("[{}][{}] Failed to subscribe to [{}][{}]", deviceSessionCtx.getTenantId(), sessionId, topic, reqQoS, e);
grantedQoSList.add(ReturnCodeResolver.getSubscriptionReturnCode(deviceSessionCtx.getMqttVersion(), MqttReasonCodes.SubAck.IMPLEMENTATION_SPECIFIC_ERROR));
}
}