#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Table data structures and a processor for Word-style (docx) tables.

The processor reads a raw table object (anything exposing ``rows``,
``columns`` and per-row ``cells`` whose items have ``text`` and, optionally,
an XML ``_element``), normalizes it into :class:`Table` and converts it to
Markdown, HTML, dict or "title:content" text.
"""

import html
import json
import logging
import re
from copy import deepcopy
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any, Dict, List, Optional, Tuple, Union

logger = logging.getLogger(__name__)

# Fully-qualified name of the WordprocessingML ``w:val`` attribute.
_W_VAL = '{http://schemas.openxmlformats.org/wordprocessingml/2006/main}val'


@dataclass
class Cell:
    """A single normalized table cell."""

    text: str = ""  # cell text content
    row_span: int = 1  # number of rows merged vertically
    col_span: int = 1  # number of columns merged horizontally
    is_header: bool = False  # whether this is a header cell
    data_type: str = "text"  # text, number, date, currency, ...
    original_value: Any = None  # parsed value (or the raw text)
    formatted_value: str = ""  # display value after formatting
    position: Dict[str, int] = field(
        default_factory=lambda: {"row": 0, "col": 0}
    )  # cell position
    metadata: Dict[str, Any] = field(default_factory=dict)  # free-form metadata


@dataclass
class Row:
    """A single normalized table row."""

    cells: List[Cell] = field(default_factory=list)  # cells of the row
    is_header: bool = False  # whether this is a header row
    row_index: int = 0  # index of the row in the source table
    metadata: Dict[str, Any] = field(default_factory=dict)  # free-form metadata


@dataclass
class Table:
    """A normalized table made of :class:`Row` objects."""

    rows: List[Row] = field(default_factory=list)  # header rows first
    header_rows: int = 0  # number of header rows
    total_rows: int = 0  # total number of rows
    total_cols: int = 0  # total number of columns
    has_complex_header: bool = False  # header contains merged cells
    table_type: str = "normal"  # normal, key_value, matrix, ...
    metadata: Dict[str, Any] = field(default_factory=dict)  # free-form metadata


@dataclass
class TableData:
    """Lightweight table container whose cells are plain dictionaries."""

    rows: List[List[Dict[str, Any]]] = field(default_factory=list)  # row data
    style: Optional[str] = None  # table style
    columns: List[Dict[str, Any]] = field(default_factory=list)  # column attributes
    has_multi_level_header: bool = False  # multi-level header present
    has_key_value_pairs: bool = False  # table holds key/value pairs
    header_rows: int = 1  # number of header rows, 1 by default
    table_type: str = "normal"  # normal, key_value, matrix, ...

    def add_row(self, row_data: List[Dict[str, Any]]):
        """Append one row of cell dictionaries to the table."""
        self.rows.append(row_data)

    def get_row_count(self) -> int:
        """Return the number of rows."""
        return len(self.rows)

    def get_column_count(self) -> int:
        """Return the column count.

        Uses ``columns`` when it is non-empty, otherwise the length of the
        longest row (0 for an empty table).
        """
        if self.columns:
            return len(self.columns)
        return max((len(row) for row in self.rows), default=0)

    def is_empty(self) -> bool:
        """Return True when the table has no rows."""
        return not self.rows

    def _cell_at(self, row_idx: int, col_idx: int) -> Optional[Dict[str, Any]]:
        """Return the cell dict at the given position, or None if out of range.

        Negative indices are rejected so that every accessor agrees on what
        "out of range" means instead of silently wrapping around.
        """
        if 0 <= row_idx < len(self.rows):
            row = self.rows[row_idx]
            if 0 <= col_idx < len(row):
                return row[col_idx]
        return None

    def get_cell_text(self, row_idx: int, col_idx: int) -> str:
        """Return the stripped text of a cell, or '' if unavailable."""
        try:
            cell = self._cell_at(row_idx, col_idx)
            if cell is not None:
                return cell.get('text', '').strip()
        except Exception as e:  # best-effort accessor: never raise
            logger.error("获取单元格文本时出错 [%s,%s]: %s", row_idx, col_idx, e)
        return ''

    def set_cell_text(self, row_idx: int, col_idx: int, text: str):
        """Set the text of a cell; out-of-range positions are ignored."""
        try:
            cell = self._cell_at(row_idx, col_idx)
            if cell is not None:
                cell['text'] = text
        except Exception as e:  # best-effort accessor: never raise
            logger.error("设置单元格文本时出错 [%s,%s]: %s", row_idx, col_idx, e)

    def get_cell_merge_info(self, row_idx: int, col_idx: int) -> Dict[str, Any]:
        """Return ``{'gridspan', 'vmerge'}`` for a cell.

        Falls back to ``{'gridspan': 1, 'vmerge': None}`` when the position
        is out of range or the cell cannot be read.
        """
        try:
            cell = self._cell_at(row_idx, col_idx)
            if cell is not None:
                return {
                    'gridspan': cell.get('gridspan', 1),
                    'vmerge': cell.get('vmerge', None),
                }
        except Exception as e:  # best-effort accessor: never raise
            logger.error("获取单元格合并信息时出错 [%s,%s]: %s", row_idx, col_idx, e)
        return {'gridspan': 1, 'vmerge': None}

    def set_cell_merge_info(self, row_idx: int, col_idx: int,
                            gridspan: int = 1, vmerge: Optional[str] = None):
        """Set merge info of a cell; ``vmerge`` is only written when not None."""
        try:
            cell = self._cell_at(row_idx, col_idx)
            if cell is not None:
                cell['gridspan'] = gridspan
                if vmerge is not None:
                    cell['vmerge'] = vmerge
        except Exception as e:  # best-effort accessor: never raise
            logger.error("设置单元格合并信息时出错 [%s,%s]: %s", row_idx, col_idx, e)

    def is_merged_cell(self, row_idx: int, col_idx: int) -> bool:
        """Return True when the cell has ``gridspan > 1`` or any ``vmerge``."""
        try:
            cell = self._cell_at(row_idx, col_idx)
            if cell is not None:
                return cell.get('gridspan', 1) > 1 or cell.get('vmerge') is not None
        except Exception as e:  # best-effort accessor: never raise
            logger.error("检查单元格合并状态时出错 [%s,%s]: %s", row_idx, col_idx, e)
        return False

    def get_header_rows(self) -> List[List[Dict[str, Any]]]:
        """Return the first ``header_rows`` rows."""
        return self.rows[:self.header_rows]

    def get_data_rows(self) -> List[List[Dict[str, Any]]]:
        """Return the rows after the header rows."""
        return self.rows[self.header_rows:]


class TableProcessor:
    """Normalizes raw tables into :class:`Table` and converts them to text formats."""

    def __init__(self):
        # Data-type recognition patterns, tried in insertion order.
        # Currency requires a yen/yuan sign or exactly two decimals; otherwise
        # every bare integer (e.g. a serial number) would be classified as
        # currency and the 'number' pattern would never be reached for it.
        self.patterns = {
            'currency': r'^\s*(?:[¥¥]\s*\d+(\.\d{2})?|\d+\.\d{2})\s*$',
            'percentage': r'^\s*\d+(\.\d+)?%\s*$',
            'date': r'^\d{4}[-/年]\d{1,2}[-/月]\d{1,2}日?$',
            'number': r'^\s*\d+(\.\d+)?\s*$',
            'time': r'^\d{1,2}:\d{2}(:\d{2})?$',
        }

        # Keywords whose presence marks a cell as header-like.
        self.header_keywords = [
            '序号', '编号', '项目', '名称', '类型', '说明', '备注',
            '金额', '时间', '日期', '地区', '部门', '人员'
        ]

    # ------------------------------------------------------------------
    # Pipeline
    # ------------------------------------------------------------------

    def process_table(self, raw_table: Any) -> Table:
        """Convert a raw table into a normalized :class:`Table`.

        Returns an empty ``Table()`` if an unexpected error escapes the
        individual (already best-effort) processing steps.
        """
        try:
            table = Table()
            self._analyze_table_structure(raw_table, table)
            self._process_headers(raw_table, table)
            self._process_data_rows(raw_table, table)
            self._normalize_table(table)
            self._identify_table_type(table)
            return table
        except Exception as e:
            logger.error("处理表格时出错: %s", e)
            return Table()

    def _analyze_table_structure(self, raw_table: Any, table: Table):
        """Fill in dimensions and header information on ``table``."""
        try:
            table.total_rows = len(raw_table.rows)
            table.total_cols = len(raw_table.columns)

            header_info = self._analyze_header_structure(raw_table)
            table.header_rows = header_info['header_rows']
            table.has_complex_header = header_info['is_complex']

            table.metadata['structure_info'] = {
                'total_rows': table.total_rows,
                'total_cols': table.total_cols,
                'header_rows': table.header_rows,
                'has_complex_header': table.has_complex_header,
                'analyzed_at': datetime.now().isoformat()
            }
        except Exception as e:
            logger.error("分析表格结构时出错: %s", e)

    @staticmethod
    def _find_merge_elements(cell: Any) -> Tuple[Any, Any]:
        """Return the ``(vMerge, gridSpan)`` XML elements of a raw cell.

        Cells that expose no ``_element`` are treated as unmerged and yield
        ``(None, None)``.
        """
        element = getattr(cell, '_element', None)
        if element is None:
            return None, None
        return element.find('.//{*}vMerge'), element.find('.//{*}gridSpan')

    def _analyze_header_structure(self, raw_table: Any) -> Dict[str, Any]:
        """Guess the header depth from the first three rows.

        A row counts as a header row when it contains merged cells, header
        keywords, or a cell that ``_analyze_cell_format`` reports as
        ``'header'``. Returns ``{'header_rows': int, 'is_complex': bool}``;
        ``header_rows`` is at least 1.
        """
        header_info = {
            'header_rows': 1,
            'is_complex': False
        }

        try:
            rows = raw_table.rows
            for i in range(min(3, len(rows))):
                # Read the cells once: on some table objects this property
                # rebuilds the list on every access.
                cells = rows[i].cells

                has_merged_cells = any(
                    any(found is not None for found in self._find_merge_elements(cell))
                    for cell in cells
                )
                has_header_keywords = any(
                    any(keyword in cell.text for keyword in self.header_keywords)
                    for cell in cells
                )

                if has_merged_cells or has_header_keywords:
                    header_info['header_rows'] = max(header_info['header_rows'], i + 1)
                    if has_merged_cells:
                        header_info['is_complex'] = True

                if any(self._analyze_cell_format(cell) == 'header' for cell in cells):
                    header_info['header_rows'] = max(header_info['header_rows'], i + 1)
        except Exception as e:
            logger.error("分析表头结构时出错: %s", e)

        return header_info

    def _detect_data_type(self, text: str) -> str:
        """Return the name of the first pattern matching ``text``, else 'text'."""
        for type_name, pattern in self.patterns.items():
            if re.match(pattern, text):
                return type_name
        return 'text'

    def _analyze_cell_format(self, cell: Any) -> str:
        """Classify a raw cell as 'header', a data type name, or 'text'."""
        try:
            text = cell.text.strip()

            # Header heuristics: any upper-case letter or a header keyword.
            if text and any(char.isupper() for char in text):
                return 'header'
            if text and any(keyword in text for keyword in self.header_keywords):
                return 'header'

            return self._detect_data_type(text)
        except Exception as e:
            logger.error("分析单元格格式时出错: %s", e)
            return 'text'

    # ------------------------------------------------------------------
    # Row / cell conversion
    # ------------------------------------------------------------------

    @staticmethod
    def _empty_row(row_index: int, is_header: bool, total_cols: int) -> Row:
        """Build a row of ``total_cols`` empty cells (fallback for bad rows)."""
        return Row(
            is_header=is_header,
            row_index=row_index,
            cells=[
                Cell(text="", is_header=is_header,
                     position={'row': row_index, 'col': col})
                for col in range(total_cols)
            ],
        )

    def _build_row(self, raw_row: Any, row_index: int, is_header: bool,
                   total_cols: int) -> Row:
        """Convert one raw row, padding with empty cells up to ``total_cols``."""
        convert = self._process_header_cell if is_header else self._process_data_cell
        new_row = Row(is_header=is_header, row_index=row_index)

        col_index = 0
        for raw_cell in raw_row.cells:
            new_cell = convert(raw_cell, row_index, col_index)
            new_row.cells.append(new_cell)
            # Advance by the span so positions reflect grid columns.
            col_index += new_cell.col_span

        while len(new_row.cells) < total_cols:
            new_row.cells.append(
                Cell(text="", is_header=is_header,
                     position={'row': row_index, 'col': col_index})
            )
            col_index += 1
        return new_row

    def _convert_row(self, raw_rows: Any, row_index: int, is_header: bool,
                     total_cols: int) -> Row:
        """Convert ``raw_rows[row_index]``; on failure log and return an empty row."""
        try:
            return self._build_row(raw_rows[row_index], row_index, is_header, total_cols)
        except Exception as row_error:
            if is_header:
                logger.error("处理表头行时出错 [行=%s]: %s", row_index, row_error)
            else:
                logger.error("处理数据行时出错 [行=%s]: %s", row_index, row_error)
            return self._empty_row(row_index, is_header, total_cols)

    def _process_headers(self, raw_table: Any, table: Table):
        """Append the first ``table.header_rows`` raw rows as header rows."""
        try:
            raw_rows = raw_table.rows
            for i in range(min(table.header_rows, len(raw_rows))):
                table.rows.append(self._convert_row(raw_rows, i, True, table.total_cols))
        except Exception as e:
            logger.error("处理表头时出错: %s", e)

    def _process_data_rows(self, raw_table: Any, table: Table):
        """Append raw rows ``header_rows .. total_rows - 1`` as data rows."""
        try:
            raw_rows = raw_table.rows
            for i in range(table.header_rows, table.total_rows):
                table.rows.append(self._convert_row(raw_rows, i, False, table.total_cols))
        except Exception as e:
            logger.error("处理数据行时出错: %s", e)

    def _apply_merge_info(self, raw_cell: Any, target: Cell) -> None:
        """Copy vertical/horizontal merge information from a raw cell.

        ``row_span`` is set to 2 for a ``vMerge`` with ``val == 'restart'``
        and 1 otherwise (the real extent of the merge is not computed here).
        ``col_span`` is taken from ``gridSpan`` and clamped to at least 1.
        """
        vmerge, gridspan = self._find_merge_elements(raw_cell)

        if vmerge is not None:
            val = vmerge.get(_W_VAL, 'continue')
            target.row_span = 2 if val == 'restart' else 1

        if gridspan is not None:
            try:
                # A span below 1 would stop the column counter from advancing.
                target.col_span = max(1, int(gridspan.get(_W_VAL, '1')))
            except ValueError:
                target.col_span = 1

    def _process_header_cell(self, cell: Any, row_index: int, col_index: int) -> Cell:
        """Convert a raw header cell; returns an empty header cell on error."""
        try:
            header_cell = Cell(
                text=cell.text.strip(),
                is_header=True,
                position={'row': row_index, 'col': col_index}
            )
            self._apply_merge_info(cell, header_cell)
            return header_cell
        except Exception as e:
            logger.error("处理表头单元格时出错: %s", e)
            return Cell(text="", is_header=True,
                        position={'row': row_index, 'col': col_index})

    @staticmethod
    def _format_value(data_type: str, text: str) -> Tuple[Any, str]:
        """Return ``(original_value, formatted_value)`` for a typed cell text.

        Currency becomes a float shown as ``¥x.xx``, percentage a fraction
        shown with two decimals, date a ``datetime`` shown as ``YYYY-MM-DD``.
        Other types, and values that fail to parse, keep the text for both.
        """
        try:
            if data_type == 'currency':
                value = float(re.sub(r'[¥¥,\s]', '', text))
                return value, f"¥{value:.2f}"
            if data_type == 'percentage':
                value = float(text.rstrip('%')) / 100
                return value, f"{value:.2%}"
            if data_type == 'date':
                # Unify separators; '/' must be rewritten too, otherwise
                # slash dates match the pattern but never parse.
                date_text = re.sub(r'[年月日/]', '-', text).rstrip('-')
                date_obj = datetime.strptime(date_text, '%Y-%m-%d')
                return date_obj, date_obj.strftime('%Y-%m-%d')
        except ValueError:
            pass
        return text, text

    def _process_data_cell(self, cell: Any, row_index: int, col_index: int) -> Cell:
        """Convert a raw data cell, detecting its type and merge info.

        Returns an empty cell at the same position on error.
        """
        try:
            text = cell.text.strip()
            data_cell = Cell(
                text=text,
                position={'row': row_index, 'col': col_index}
            )

            data_cell.data_type = self._detect_data_type(text)
            data_cell.original_value, data_cell.formatted_value = self._format_value(
                data_cell.data_type, text
            )

            self._apply_merge_info(cell, data_cell)
            return data_cell
        except Exception as e:
            logger.error("处理数据单元格时出错: %s", e)
            return Cell(text="", position={'row': row_index, 'col': col_index})

    def _normalize_table(self, table: Table):
        """Pad or truncate every row to exactly ``table.total_cols`` cells."""
        try:
            max_cols = table.total_cols

            for row in table.rows:
                current_cols = len(row.cells)
                if current_cols < max_cols:
                    row.cells.extend(
                        Cell(text="", is_header=row.is_header,
                             position={'row': row.row_index, 'col': col})
                        for col in range(current_cols, max_cols)
                    )
                elif current_cols > max_cols:
                    row.cells = row.cells[:max_cols]

            table.total_cols = max_cols
        except Exception as e:
            logger.error("规范化表格时出错: %s", e)

    def _identify_table_type(self, table: Table):
        """Set ``table.table_type`` to 'key_value', 'matrix' or 'normal'."""
        try:
            # Key/value: two columns and every data row has a non-empty key.
            if table.total_cols == 2:
                keys_present = all(
                    cell.text.strip() != ""
                    for row in table.rows if not row.is_header
                    for cell in row.cells[:1]
                )
                if keys_present:
                    table.table_type = "key_value"
                    return

            # Matrix: merged header cells and more than two columns.
            if table.has_complex_header and table.total_cols > 2:
                table.table_type = "matrix"
                return

            table.table_type = "normal"
        except Exception as e:
            logger.error("识别表格类型时出错: %s", e)
            table.table_type = "normal"

    # ------------------------------------------------------------------
    # Converters
    # ------------------------------------------------------------------

    @staticmethod
    def _split_rows(table: Table) -> Tuple[List[Row], List[Row]]:
        """Return ``(header_rows, data_rows)``.

        Slicing tolerates ``header_rows`` larger than the number of rows.
        """
        header_count = max(table.header_rows, 0)
        return table.rows[:header_count], table.rows[header_count:]

    @staticmethod
    def _markdown_escape(text: str) -> str:
        """Make cell text safe inside a Markdown table cell."""
        return text.replace("|", "\\|").replace("\r\n", "<br>").replace("\n", "<br>")

    @staticmethod
    def _span_attrs(cell: Cell) -> str:
        """Return ' rowspan="…" colspan="…"' (with leading space) or ''."""
        attrs = []
        if cell.row_span > 1:
            attrs.append(f'rowspan="{cell.row_span}"')
        if cell.col_span > 1:
            attrs.append(f'colspan="{cell.col_span}"')
        return (" " + " ".join(attrs)) if attrs else ""

    def convert_to_markdown(self, table: Table) -> str:
        """Render the table as Markdown; returns '' on error.

        Header rows are emitted first, followed by one separator line of
        ``table.total_cols`` columns, then the data rows (formatted value if
        present, raw text otherwise).
        """
        try:
            header_rows, data_rows = self._split_rows(table)
            escape = self._markdown_escape
            markdown_lines = []

            for row in header_rows:
                header_cells = [escape(cell.text) for cell in row.cells]
                markdown_lines.append("| " + " | ".join(header_cells) + " |")

            if header_rows:
                markdown_lines.append("|" + "|".join(["---"] * table.total_cols) + "|")

            for row in data_rows:
                data_cells = [
                    escape(cell.formatted_value if cell.formatted_value else cell.text)
                    for cell in row.cells
                ]
                markdown_lines.append("| " + " | ".join(data_cells) + " |")

            return "\n".join(markdown_lines)
        except Exception as e:
            logger.error("转换为Markdown格式时出错: %s", e)
            return ""

    def convert_to_html(self, table: Table) -> str:
        """Render the table as an HTML ``<table>``; returns '' on error.

        Header rows go into ``<thead>`` as ``<th>`` cells, data rows into
        ``<tbody>`` as ``<td>`` cells. Cell text is HTML-escaped and
        ``rowspan``/``colspan`` are emitted for spans greater than 1.
        """
        try:
            header_rows, data_rows = self._split_rows(table)
            html_lines = ['<table>']

            if header_rows:
                html_lines.append("<thead>")
                for row in header_rows:
                    html_lines.append("<tr>")
                    for cell in row.cells:
                        attrs = self._span_attrs(cell)
                        html_lines.append(f"<th{attrs}>{html.escape(cell.text)}</th>")
                    html_lines.append("</tr>")
                html_lines.append("</thead>")

            html_lines.append("<tbody>")
            for row in data_rows:
                html_lines.append("<tr>")
                for cell in row.cells:
                    attrs = self._span_attrs(cell)
                    # Prefer the formatted value, fall back to the raw text.
                    display_value = cell.formatted_value if cell.formatted_value else cell.text
                    html_lines.append(f"<td{attrs}>{html.escape(display_value)}</td>")
                html_lines.append("</tr>")
            html_lines.append("</tbody>")
            html_lines.append("</table>")

            return "\n".join(html_lines)
        except Exception as e:
            logger.error("转换为HTML格式时出错: %s", e)
            return ""

    def convert_to_dict(self, table: Table) -> Dict[str, Any]:
        """Render the table as a dict with metadata, structure, headers, data.

        Returns ``{}`` on error.
        """
        try:
            header_rows, data_rows = self._split_rows(table)
            return {
                'metadata': table.metadata,
                'structure': {
                    'total_rows': table.total_rows,
                    'total_cols': table.total_cols,
                    'header_rows': table.header_rows,
                    'has_complex_header': table.has_complex_header,
                    'table_type': table.table_type
                },
                'headers': [
                    [
                        {
                            'text': cell.text,
                            'row_span': cell.row_span,
                            'col_span': cell.col_span,
                            'position': cell.position
                        }
                        for cell in row.cells
                    ]
                    for row in header_rows
                ],
                'data': [
                    [
                        {
                            'text': cell.text,
                            'data_type': cell.data_type,
                            'original_value': cell.original_value,
                            'formatted_value': cell.formatted_value,
                            'position': cell.position
                        }
                        for cell in row.cells
                    ]
                    for row in data_rows
                ]
            }
        except Exception as e:
            logger.error("转换为字典格式时出错: %s", e)
            return {}

    def convert_to_text(self, table: Table) -> str:
        """Render the table as "title:content" lines.

        Multi-level header texts of a column are joined with underscores.
        Each data row becomes one line whose parts are joined with "、".
        Empty cells are skipped, as are cells whose column has no header
        text. A table without any header rows emits the bare cell contents.

        Args:
            table: The table to render.

        Returns:
            The text representation, "【空表格】" for an empty table, or
            "【表格处理失败】" when rendering fails.
        """
        if not table or not table.rows:
            return "【空表格】"

        try:
            header_rows, data_rows = self._split_rows(table)

            # Collect the non-empty header texts of every column, top-down.
            header_texts: Dict[int, List[str]] = {}
            for row in header_rows:
                for col_idx, cell in enumerate(row.cells):
                    parts = header_texts.setdefault(col_idx, [])
                    label = cell.text.strip()
                    if label:
                        parts.append(label)

            final_headers = {
                col_idx: "_".join(parts) for col_idx, parts in header_texts.items()
            }

            text_parts = []
            for row in data_rows:
                row_parts = []
                for col_idx, cell in enumerate(row.cells):
                    content = cell.text.strip()
                    if not content:
                        continue
                    if not header_rows:
                        # No header at all: keep the content rather than drop it.
                        row_parts.append(content)
                    elif final_headers.get(col_idx):
                        row_parts.append(f"{final_headers[col_idx]}:{content}")
                if row_parts:
                    text_parts.append("、".join(row_parts))

            return "\n".join(text_parts)
        except Exception as e:
            logger.error("转换表格为文本时出错: %s", e)
            return "【表格处理失败】"

    def _convert_table_to_text(self, table: Table) -> str:
        """Compatibility alias for :meth:`convert_to_text`.

        Args:
            table: The table to render.

        Returns:
            The text representation of the table.
        """
        return self.convert_to_text(table)