fixed device session closing: should close every device session if message id wrong

This commit is contained in:
dashevchenko 2025-10-07 12:22:57 +03:00
parent 00700afd83
commit 6cfa3f1b0b

View File

@ -845,27 +845,31 @@ public abstract class AbstractGatewaySessionHandler<T extends AbstractGatewayDev
log.trace("[{}][{}][{}] Wrong msg id: [{}]", gateway.getTenantId(), gateway.getDeviceId(), sessionId, msgId); log.trace("[{}][{}][{}] Wrong msg id: [{}]", gateway.getTenantId(), gateway.getDeviceId(), sessionId, msgId);
ctx.writeAndFlush(MqttTransportHandler.createMqttPubAckMsg( ctx.writeAndFlush(MqttTransportHandler.createMqttPubAckMsg(
deviceSessionCtx, msgId, MqttReasonCodes.PubAck.UNSPECIFIED_ERROR.byteValue())); deviceSessionCtx, msgId, MqttReasonCodes.PubAck.UNSPECIFIED_ERROR.byteValue()));
closeDeviceSession(deviceName, MqttReasonCodes.Disconnect.MALFORMED_PACKET);
} }
} }
if (msgId <= 0) {
closeDeviceSession(deviceName, MqttReasonCodes.Disconnect.MALFORMED_PACKET);
}
} }
@Override @Override
public void onError(Throwable e) { public void onError(Throwable e) {
log.trace("[{}][{}][{}] Failed to publish msg: [{}] for device: [{}]", gateway.getTenantId(), gateway.getDeviceId(), sessionId, msg, deviceName, e); log.trace("[{}][{}][{}] Failed to publish msg: [{}] for device: [{}]", gateway.getTenantId(), gateway.getDeviceId(), sessionId, msg, deviceName, e);
if (ackSent.compareAndSet(false, true)) { if (e instanceof TbRateLimitsException) {
if (e instanceof TbRateLimitsException) { if (ackSent.compareAndSet(false, true)) {
ctx.writeAndFlush(MqttTransportHandler.createMqttPubAckMsg( ctx.writeAndFlush(MqttTransportHandler.createMqttPubAckMsg(
deviceSessionCtx, msgId, MqttReasonCodes.PubAck.QUOTA_EXCEEDED.byteValue())); deviceSessionCtx, msgId, MqttReasonCodes.PubAck.QUOTA_EXCEEDED.byteValue()));
closeDeviceSession(deviceName, MqttReasonCodes.Disconnect.MESSAGE_RATE_TOO_HIGH); ctx.close();
} else { }
closeDeviceSession(deviceName, MqttReasonCodes.Disconnect.MESSAGE_RATE_TOO_HIGH);
} else {
if (ackSent.compareAndSet(false, true)) {
ctx.writeAndFlush(MqttTransportHandler.createMqttPubAckMsg( ctx.writeAndFlush(MqttTransportHandler.createMqttPubAckMsg(
deviceSessionCtx, msgId, MqttReasonCodes.PubAck.UNSPECIFIED_ERROR.byteValue())); deviceSessionCtx, msgId, MqttReasonCodes.PubAck.UNSPECIFIED_ERROR.byteValue()));
closeDeviceSession(deviceName, MqttReasonCodes.Disconnect.UNSPECIFIED_ERROR); ctx.close();
} }
ctx.close(); closeDeviceSession(deviceName, MqttReasonCodes.Disconnect.UNSPECIFIED_ERROR);
} }
remaining.set(0);
} }
}; };
} }