179 lines
8.9 KiB
Python
179 lines
8.9 KiB
Python
|
|
'''配置文件解析模块'''
|
|||
|
|
|
|||
|
|
import ast
|
|||
|
|
import json
|
|||
|
|
from typing import Dict, Tuple, List, Any
|
|||
|
|
from models import ConfigField
|
|||
|
|
from utils import get_field_type_from_value
|
|||
|
|
|
|||
|
|
class ConfigParser:
|
|||
|
|
"""配置文件解析器"""
|
|||
|
|
|
|||
|
|
def __init__(self, rules: Dict):
|
|||
|
|
self.rules = rules
|
|||
|
|
|
|||
|
|
def parse_config_file(self, config_file_path: str) -> Tuple[Dict, Dict[str, ConfigField], str]:
|
|||
|
|
"""解析配置文件,精确匹配每个配置项的位置和注释"""
|
|||
|
|
try:
|
|||
|
|
with open(config_file_path, 'r', encoding='utf-8') as f:
|
|||
|
|
content = f.read()
|
|||
|
|
lines = content.split('\n')
|
|||
|
|
|
|||
|
|
tree = ast.parse(content)
|
|||
|
|
config_data = {}
|
|||
|
|
all_config_fields = {}
|
|||
|
|
|
|||
|
|
# 第一遍:识别所有配置项及其位置
|
|||
|
|
for node in ast.walk(tree):
|
|||
|
|
if isinstance(node, ast.Assign):
|
|||
|
|
for target in node.targets:
|
|||
|
|
if isinstance(target, ast.Name) and target.id.isupper():
|
|||
|
|
var_name = target.id
|
|||
|
|
|
|||
|
|
# 获取配置项在文件中的位置
|
|||
|
|
assignment_line = node.lineno - 1 # ast行号从1开始,我们使用0基索引
|
|||
|
|
|
|||
|
|
# 向上查找配置项上面的注释行
|
|||
|
|
comment_lines = []
|
|||
|
|
current_line = assignment_line - 1
|
|||
|
|
while current_line >= 0:
|
|||
|
|
line = lines[current_line].strip()
|
|||
|
|
# 如果是注释行或空行,则包含在注释中
|
|||
|
|
if line.startswith('#') or line == '':
|
|||
|
|
comment_lines.insert(0, lines[current_line])
|
|||
|
|
current_line -= 1
|
|||
|
|
# 如果是多行注释的开始
|
|||
|
|
elif line.startswith("'''") or line.startswith('"""'):
|
|||
|
|
# 找到多行注释的起始位置
|
|||
|
|
comment_start = current_line
|
|||
|
|
while comment_start >= 0 and not (lines[comment_start].strip().endswith("'''") or lines[comment_start].strip().endswith('"""')):
|
|||
|
|
comment_start -= 1
|
|||
|
|
if comment_start >= 0:
|
|||
|
|
for i in range(comment_start, current_line + 1):
|
|||
|
|
comment_lines.insert(0, lines[i])
|
|||
|
|
current_line = comment_start - 1
|
|||
|
|
else:
|
|||
|
|
break
|
|||
|
|
else:
|
|||
|
|
break
|
|||
|
|
|
|||
|
|
# 完整的配置项块(包括注释和赋值行)
|
|||
|
|
full_block = comment_lines + [lines[assignment_line]]
|
|||
|
|
|
|||
|
|
# 如果赋值行有后续行(如多行字符串),添加它们
|
|||
|
|
end_line = getattr(node, 'end_lineno', node.lineno) - 1
|
|||
|
|
if end_line > assignment_line:
|
|||
|
|
for i in range(assignment_line + 1, end_line + 1):
|
|||
|
|
full_block.append(lines[i])
|
|||
|
|
|
|||
|
|
try:
|
|||
|
|
# 获取配置项的值
|
|||
|
|
var_value = eval(compile(ast.Expression(node.value), '<string>', 'eval'))
|
|||
|
|
config_data[var_name] = var_value
|
|||
|
|
|
|||
|
|
# 创建ConfigField实例
|
|||
|
|
config_field = self._create_config_field(
|
|||
|
|
var_name, var_value, assignment_line,
|
|||
|
|
full_block, comment_lines
|
|||
|
|
)
|
|||
|
|
all_config_fields[var_name] = config_field
|
|||
|
|
|
|||
|
|
except:
|
|||
|
|
# 如果无法解析值,使用字符串表示
|
|||
|
|
try:
|
|||
|
|
value_str = ast.get_source_segment(content, node.value)
|
|||
|
|
config_data[var_name] = value_str
|
|||
|
|
|
|||
|
|
config_field = self._create_config_field(
|
|||
|
|
var_name, value_str, assignment_line,
|
|||
|
|
full_block, comment_lines, is_string=True
|
|||
|
|
)
|
|||
|
|
all_config_fields[var_name] = config_field
|
|||
|
|
|
|||
|
|
except:
|
|||
|
|
config_data[var_name] = "无法解析的值"
|
|||
|
|
|
|||
|
|
config_field = self._create_config_field(
|
|||
|
|
var_name, "无法解析的值", assignment_line,
|
|||
|
|
full_block, comment_lines, is_string=True
|
|||
|
|
)
|
|||
|
|
all_config_fields[var_name] = config_field
|
|||
|
|
|
|||
|
|
return config_data, all_config_fields, content
|
|||
|
|
|
|||
|
|
except Exception as e:
|
|||
|
|
raise Exception(f"解析配置文件失败:{str(e)}")
|
|||
|
|
|
|||
|
|
def _create_config_field(self, var_name: str, var_value: Any, assignment_line: int,
|
|||
|
|
full_block: List[str], comment_lines: List[str],
|
|||
|
|
is_string: bool = False) -> ConfigField:
|
|||
|
|
"""创建ConfigField实例"""
|
|||
|
|
# 获取字段类型
|
|||
|
|
field_type = self._get_field_type(var_name, var_value)
|
|||
|
|
|
|||
|
|
# 获取小数位数
|
|||
|
|
decimals = self._get_field_decimals(var_name) if field_type == "float" else None
|
|||
|
|
|
|||
|
|
# 获取上次保存的值
|
|||
|
|
last_saved_value = self._get_last_saved_value(var_name, var_value)
|
|||
|
|
|
|||
|
|
return ConfigField(
|
|||
|
|
name=var_name,
|
|||
|
|
value=var_value,
|
|||
|
|
category=self._categorize_field(var_name),
|
|||
|
|
display_name=self._get_display_name(var_name),
|
|||
|
|
field_type=field_type,
|
|||
|
|
decimals=decimals,
|
|||
|
|
tooltip=self._get_tooltip(var_name),
|
|||
|
|
hidden=self._is_hidden(var_name),
|
|||
|
|
encrypted=self._is_encrypted(var_name), # 加密状态
|
|||
|
|
line_number=assignment_line - len(comment_lines),
|
|||
|
|
original_lines=full_block,
|
|||
|
|
validation=self._get_validation(var_name),
|
|||
|
|
last_saved_value=last_saved_value
|
|||
|
|
)
|
|||
|
|
|
|||
|
|
def _categorize_field(self, field_name: str) -> str:
|
|||
|
|
"""为字段分类"""
|
|||
|
|
for category, fields in self.rules.get("categories", {}).items():
|
|||
|
|
if field_name in fields:
|
|||
|
|
return category
|
|||
|
|
return "未分类"
|
|||
|
|
|
|||
|
|
def _get_display_name(self, field_name: str) -> str:
|
|||
|
|
"""获取字段的显示名称"""
|
|||
|
|
return self.rules.get("display_names", {}).get(field_name, field_name)
|
|||
|
|
|
|||
|
|
def _get_tooltip(self, field_name: str) -> str:
|
|||
|
|
"""获取字段的提示信息"""
|
|||
|
|
return self.rules.get("tooltips", {}).get(field_name, f"配置项: {field_name}")
|
|||
|
|
|
|||
|
|
def _get_field_type(self, field_name: str, value: Any) -> str:
|
|||
|
|
"""获取字段类型"""
|
|||
|
|
if field_name in self.rules.get("field_types", {}):
|
|||
|
|
return self.rules["field_types"][field_name]
|
|||
|
|
|
|||
|
|
return get_field_type_from_value(value)
|
|||
|
|
|
|||
|
|
def _get_field_decimals(self, field_name: str) -> int:
|
|||
|
|
"""获取字段的小数位数"""
|
|||
|
|
return self.rules.get("field_decimals", {}).get(field_name, 6)
|
|||
|
|
|
|||
|
|
def _get_validation(self, field_name: str) -> Dict:
|
|||
|
|
"""获取配置项的校验规则"""
|
|||
|
|
return self.rules.get("validations", {}).get(field_name, {})
|
|||
|
|
|
|||
|
|
def _is_hidden(self, field_name: str) -> bool:
|
|||
|
|
"""检查配置项是否被标记为隐藏"""
|
|||
|
|
return field_name in self.rules.get("hidden", [])
|
|||
|
|
|
|||
|
|
def _is_encrypted(self, field_name: str) -> bool:
|
|||
|
|
"""检查配置项是否被标记为加密"""
|
|||
|
|
return field_name in self.rules.get("encrypted_fields", [])
|
|||
|
|
|
|||
|
|
def _get_last_saved_value(self, field_name: str, current_value: Any) -> Any:
|
|||
|
|
"""获取配置项的上次保存值"""
|
|||
|
|
last_saved_values = self.rules.get("last_saved_values", {})
|
|||
|
|
if field_name in last_saved_values:
|
|||
|
|
return last_saved_values[field_name]
|
|||
|
|
return None
|