'''配置文件解析模块''' import ast import json from typing import Dict, Tuple, List, Any from models import ConfigField from utils import get_field_type_from_value class ConfigParser: """配置文件解析器""" def __init__(self, rules: Dict): self.rules = rules def parse_config_file(self, config_file_path: str) -> Tuple[Dict, Dict[str, ConfigField], str]: """解析配置文件,精确匹配每个配置项的位置和注释""" try: with open(config_file_path, 'r', encoding='utf-8') as f: content = f.read() lines = content.split('\n') tree = ast.parse(content) config_data = {} all_config_fields = {} # 第一遍:识别所有配置项及其位置 for node in ast.walk(tree): if isinstance(node, ast.Assign): for target in node.targets: if isinstance(target, ast.Name) and target.id.isupper(): var_name = target.id # 获取配置项在文件中的位置 assignment_line = node.lineno - 1 # ast行号从1开始,我们使用0基索引 # 向上查找配置项上面的注释行 comment_lines = [] current_line = assignment_line - 1 while current_line >= 0: line = lines[current_line].strip() # 如果是注释行或空行,则包含在注释中 if line.startswith('#') or line == '': comment_lines.insert(0, lines[current_line]) current_line -= 1 # 如果是多行注释的开始 elif line.startswith("'''") or line.startswith('"""'): # 找到多行注释的起始位置 comment_start = current_line while comment_start >= 0 and not (lines[comment_start].strip().endswith("'''") or lines[comment_start].strip().endswith('"""')): comment_start -= 1 if comment_start >= 0: for i in range(comment_start, current_line + 1): comment_lines.insert(0, lines[i]) current_line = comment_start - 1 else: break else: break # 完整的配置项块(包括注释和赋值行) full_block = comment_lines + [lines[assignment_line]] # 如果赋值行有后续行(如多行字符串),添加它们 end_line = getattr(node, 'end_lineno', node.lineno) - 1 if end_line > assignment_line: for i in range(assignment_line + 1, end_line + 1): full_block.append(lines[i]) try: # 获取配置项的值 var_value = eval(compile(ast.Expression(node.value), '', 'eval')) config_data[var_name] = var_value # 创建ConfigField实例 config_field = self._create_config_field( var_name, var_value, assignment_line, full_block, comment_lines ) all_config_fields[var_name] = config_field except: # 如果无法解析值,使用字符串表示 try: value_str = ast.get_source_segment(content, node.value) config_data[var_name] = value_str config_field = self._create_config_field( var_name, value_str, assignment_line, full_block, comment_lines, is_string=True ) all_config_fields[var_name] = config_field except: config_data[var_name] = "无法解析的值" config_field = self._create_config_field( var_name, "无法解析的值", assignment_line, full_block, comment_lines, is_string=True ) all_config_fields[var_name] = config_field return config_data, all_config_fields, content except Exception as e: raise Exception(f"解析配置文件失败:{str(e)}") def _create_config_field(self, var_name: str, var_value: Any, assignment_line: int, full_block: List[str], comment_lines: List[str], is_string: bool = False) -> ConfigField: """创建ConfigField实例""" # 获取字段类型 field_type = self._get_field_type(var_name, var_value) # 获取小数位数 decimals = self._get_field_decimals(var_name) if field_type == "float" else None # 获取上次保存的值 last_saved_value = self._get_last_saved_value(var_name, var_value) return ConfigField( name=var_name, value=var_value, category=self._categorize_field(var_name), display_name=self._get_display_name(var_name), field_type=field_type, decimals=decimals, tooltip=self._get_tooltip(var_name), hidden=self._is_hidden(var_name), encrypted=self._is_encrypted(var_name), # 加密状态 line_number=assignment_line - len(comment_lines), original_lines=full_block, validation=self._get_validation(var_name), last_saved_value=last_saved_value ) def _categorize_field(self, field_name: str) -> str: """为字段分类""" for category, fields in self.rules.get("categories", {}).items(): if field_name in fields: return category return "未分类" def _get_display_name(self, field_name: str) -> str: """获取字段的显示名称""" return self.rules.get("display_names", {}).get(field_name, field_name) def _get_tooltip(self, field_name: str) -> str: """获取字段的提示信息""" return self.rules.get("tooltips", {}).get(field_name, f"配置项: {field_name}") def _get_field_type(self, field_name: str, value: Any) -> str: """获取字段类型""" if field_name in self.rules.get("field_types", {}): return self.rules["field_types"][field_name] return get_field_type_from_value(value) def _get_field_decimals(self, field_name: str) -> int: """获取字段的小数位数""" return self.rules.get("field_decimals", {}).get(field_name, 6) def _get_validation(self, field_name: str) -> Dict: """获取配置项的校验规则""" return self.rules.get("validations", {}).get(field_name, {}) def _is_hidden(self, field_name: str) -> bool: """检查配置项是否被标记为隐藏""" return field_name in self.rules.get("hidden", []) def _is_encrypted(self, field_name: str) -> bool: """检查配置项是否被标记为加密""" return field_name in self.rules.get("encrypted_fields", []) def _get_last_saved_value(self, field_name: str, current_value: Any) -> Any: """获取配置项的上次保存值""" last_saved_values = self.rules.get("last_saved_values", {}) if field_name in last_saved_values: return last_saved_values[field_name] return None