#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Data structures and helpers for extracting table data from Word documents."""

import os
import re
from collections import Counter
from copy import deepcopy
from datetime import datetime
from typing import Any, Dict, List, Optional, Union

from docx.enum.table import WD_TABLE_ALIGNMENT
from docx.enum.text import WD_PARAGRAPH_ALIGNMENT
from docx.oxml import parse_xml
from docx.oxml.ns import nsdecls
from docx.shared import Pt
from docx.table import Table, _Cell
from docx.text.paragraph import Paragraph


class TableData:
    """Container for the rows, columns and header metadata of one table.

    Each row is a list of cell dictionaries with the keys ``text``,
    ``gridspan`` (horizontal span) and ``vmerge`` (vertical-merge state).
    """

    def __init__(self):
        """Create an empty table."""
        self.rows = []  # one list of cell dicts per table row
        self.style = None  # table style identifier
        self.columns = []  # per-column attributes
        self.has_multi_level_header = False  # header spans several rows
        self.has_key_value_pairs = False  # table looks like key/value pairs
        self.header_rows = 1  # number of header rows, 1 by default

    @staticmethod
    def _blank_cell() -> Dict[str, Any]:
        """Return a fresh placeholder for a missing cell."""
        return {'text': '', 'gridspan': 1, 'vmerge': None}

    def _in_bounds(self, row_idx: int, col_idx: int) -> bool:
        """Return True when both indices address an existing cell.

        Negative indices are treated as out of range, for reads and writes
        alike.
        """
        return (0 <= row_idx < len(self.rows)
                and 0 <= col_idx < len(self.rows[row_idx]))

    def cell(self, row_idx: int, col_idx: int) -> Dict[str, Any]:
        """Return the cell at the given position.

        Args:
            row_idx: Row index.
            col_idx: Column index.

        Returns:
            The stored cell dictionary. A blank placeholder cell is returned
            when the position is out of range or the lookup fails; a stored
            value that is not a dict is wrapped into a cell dictionary.
        """
        try:
            if not self._in_bounds(row_idx, col_idx):
                return self._blank_cell()

            cell = self.rows[row_idx][col_idx]
            if not isinstance(cell, dict):
                print(f"警告:单元格数据格式错误 [{row_idx},{col_idx}]")
                return {
                    'text': str(cell) if cell is not None else '',
                    'gridspan': 1,
                    'vmerge': None,
                }
            return cell
        except Exception as e:
            # Best-effort accessor: report the problem and hand back a blank.
            print(f"获取单元格时出错 [{row_idx},{col_idx}]: {str(e)}")
            return self._blank_cell()

    def add_row(self, row_data: List[Dict[str, Any]]) -> None:
        """Append one row of cell dictionaries to the table.

        Args:
            row_data: Cells of the new row.
        """
        self.rows.append(row_data)

    def get_row_count(self) -> int:
        """Return the number of rows."""
        return len(self.rows)

    def get_column_count(self) -> int:
        """Return the number of entries in ``columns`` (0 when unset)."""
        return len(self.columns) if self.columns else 0

    def is_empty(self) -> bool:
        """Return True when the table has no rows or no column entries."""
        return len(self.rows) == 0 or len(self.columns) == 0

    def get_cell_text(self, row_idx: int, col_idx: int) -> str:
        """Return the stripped text of a cell.

        Args:
            row_idx: Row index.
            col_idx: Column index.

        Returns:
            The cell text without surrounding whitespace; an empty string
            when the cell is missing or its text is ``None``.
        """
        text = self.cell(row_idx, col_idx).get('text')
        return '' if text is None else str(text).strip()

    def set_cell_text(self, row_idx: int, col_idx: int, text: str) -> None:
        """Set the text of an existing cell.

        Out-of-range positions (including negative indices) are ignored.

        Args:
            row_idx: Row index.
            col_idx: Column index.
            text: New text content.
        """
        try:
            if self._in_bounds(row_idx, col_idx):
                self.rows[row_idx][col_idx]['text'] = text
        except Exception as e:
            print(f"设置单元格文本时出错 [{row_idx},{col_idx}]: {str(e)}")

    def get_cell_merge_info(self, row_idx: int, col_idx: int) -> Dict[str, Any]:
        """Return the merge attributes of a cell.

        Args:
            row_idx: Row index.
            col_idx: Column index.

        Returns:
            A dict with the keys ``gridspan`` and ``vmerge``.
        """
        cell = self.cell(row_idx, col_idx)
        return {
            'gridspan': cell.get('gridspan', 1),
            'vmerge': cell.get('vmerge', None),
        }

    def set_cell_merge_info(self, row_idx: int, col_idx: int,
                            gridspan: int = 1,
                            vmerge: Optional[str] = None) -> None:
        """Set the merge attributes of an existing cell.

        Out-of-range positions (including negative indices) are ignored.

        Args:
            row_idx: Row index.
            col_idx: Column index.
            gridspan: Number of columns spanned horizontally.
            vmerge: Vertical-merge state; left untouched when ``None``.
        """
        try:
            if self._in_bounds(row_idx, col_idx):
                cell = self.rows[row_idx][col_idx]
                cell['gridspan'] = gridspan
                if vmerge is not None:
                    cell['vmerge'] = vmerge
        except Exception as e:
            print(f"设置单元格合并信息时出错 [{row_idx},{col_idx}]: {str(e)}")

    def is_merged_cell(self, row_idx: int, col_idx: int) -> bool:
        """Return True when the cell takes part in a merge.

        A cell counts as merged when it spans more than one column or
        carries a truthy ``vmerge`` value. ``None``, ``False`` and ``0`` all
        mean "not vertically merged", so cells produced by the cleaning
        helpers (which store ``False``) are classified correctly.

        Args:
            row_idx: Row index.
            col_idx: Column index.
        """
        cell = self.cell(row_idx, col_idx)
        gridspan = cell.get('gridspan') or 1
        return gridspan > 1 or bool(cell.get('vmerge'))

    def get_header_rows(self) -> List[List[Dict[str, Any]]]:
        """Return the first ``header_rows`` rows."""
        return self.rows[:self.header_rows]

    def get_data_rows(self) -> List[List[Dict[str, Any]]]:
        """Return every row after the header rows."""
        return self.rows[self.header_rows:]

    def to_dict(self) -> Dict[str, Any]:
        """Return the table as a plain dictionary.

        The returned dictionary references the live ``rows`` and ``columns``
        lists; it is not a deep copy.
        """
        return {
            'rows': self.rows,
            'style': self.style,
            'columns': self.columns,
            'has_multi_level_header': self.has_multi_level_header,
            'has_key_value_pairs': self.has_key_value_pairs,
            'header_rows': self.header_rows,
        }

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> 'TableData':
        """Build a table from a dictionary as produced by ``to_dict``.

        Args:
            data: Table dictionary; missing keys fall back to the defaults
                of an empty table.

        Returns:
            The populated ``TableData`` instance.
        """
        table = cls()
        # ``or []`` keeps the list attributes usable when a key is present
        # but explicitly set to None.
        table.rows = data.get('rows') or []
        table.style = data.get('style')
        table.columns = data.get('columns') or []
        table.has_multi_level_header = data.get('has_multi_level_header', False)
        table.has_key_value_pairs = data.get('has_key_value_pairs', False)
        table.header_rows = data.get('header_rows', 1)
        return table


class TableProcessor:
    """Extracts, converts and cleans table data taken from Word documents."""

    # WordprocessingML main namespace, used to build qualified names.
    _W_NS = 'http://schemas.openxmlformats.org/wordprocessingml/2006/main'

    def __init__(self):
        """Create a table processor (no state is kept)."""
        pass

    def _is_valid_table(self, table) -> bool:
        """Return True when ``table`` looks like a usable docx table.

        Args:
            table: Table object to check.

        Returns:
            False when the object lacks ``_element``, ``rows`` or
            ``columns``, has no rows, has a row without ``_tr`` or a row
            without cells, or when the inspection itself raises.
        """
        try:
            if not hasattr(table, '_element'):
                return False
            if not hasattr(table, 'rows') or not hasattr(table, 'columns'):
                return False
            if len(table.rows) == 0:
                return False
            for row in table.rows:
                if not hasattr(row, '_tr'):
                    return False
                if len(row._tr.tc_lst) == 0:
                    return False
            return True
        except Exception as e:
            print(f"验证表格时出错: {str(e)}")
            return False

    def _tc_property(self, cell_element, local_name: str,
                     w_ns: Optional[str] = None):
        """Return the ``w:tcPr/w:<local_name>`` element of a cell, or None.

        Only the cell's own property element is consulted, so properties of
        cells in a nested table are never mistaken for the outer cell's.
        """
        ns = self._W_NS if w_ns is None else w_ns
        return cell_element.find('{%s}tcPr/{%s}%s' % (ns, ns, local_name))

    def _iter_own_descendants(self, parent, tag: str):
        """Yield descendants with ``tag`` without descending into matches.

        Wrapper elements around a match are traversed, but the search never
        continues inside a match, so rows and cells of nested tables are not
        reported as belonging to the outer table or row.
        """
        for child in parent:
            if child.tag == tag:
                yield child
            else:
                yield from self._iter_own_descendants(child, tag)

    def _tc_at_grid_col(self, tr, col: int):
        """Return the ``w:tc`` of row ``tr`` covering grid column ``col``.

        Horizontal spans are honoured. Returns None when the row has no
        cell at that grid column.
        NOTE(review): ``w:gridBefore`` offsets are not considered.
        """
        grid_idx = 0
        for tc in tr.tc_lst:
            width = self._get_gridspan_value(tc)
            if grid_idx <= col < grid_idx + width:
                return tc
            grid_idx += width
        return None

    def _cell_text(self, cell_element, w_ns: str) -> str:
        """Return the text of a cell element.

        Runs belonging to one paragraph are concatenated without a
        separator, because a single word is frequently split across several
        ``w:t`` runs; paragraphs are joined with a single space.
        """
        p_tag = '{%s}p' % w_ns
        t_tag = '{%s}t' % w_ns
        paragraphs: List[List[str]] = []
        for node in cell_element.iter():
            if node.tag == p_tag:
                paragraphs.append([])
            elif node.tag == t_tag and node.text:
                if not paragraphs:
                    # Text outside any paragraph: keep it rather than drop it.
                    paragraphs.append([])
                paragraphs[-1].append(node.text)
        joined = (''.join(parts).strip() for parts in paragraphs)
        return ' '.join(text for text in joined if text)

    def _get_vmerge_value(self, cell_element) -> Optional[str]:
        """Return the vertical-merge state of a cell.

        Args:
            cell_element: ``w:tc`` element.

        Returns:
            The ``w:val`` of the cell's ``w:vMerge`` element, ``'continue'``
            when the element exists without a value, or None when the cell
            has no ``w:vMerge``.
        """
        vmerge = self._tc_property(cell_element, 'vMerge')
        if vmerge is None:
            return None
        return vmerge.get(self.qn('w:val'), 'continue')

    def _get_gridspan_value(self, cell_element) -> int:
        """Return the number of grid columns a cell spans horizontally.

        Args:
            cell_element: ``w:tc`` element.

        Returns:
            The cell's ``w:gridSpan`` value (at least 1); 1 when the element
            is absent or its value cannot be parsed.
        """
        try:
            gridspan = self._tc_property(cell_element, 'gridSpan')
            if gridspan is not None and gridspan.get(self.qn('w:val')):
                return max(1, int(gridspan.get(self.qn('w:val'))))
        except (ValueError, TypeError, AttributeError) as e:
            print(f"警告:获取gridspan值时出错: {str(e)}")
        return 1  # no horizontal merge

    def _get_vertical_span(self, table: Table, start_row: int, col: int) -> int:
        """Count the rows covered by a vertical merge.

        The raw ``w:tc`` of each following row is inspected directly:
        ``table.cell()`` resolves merged positions to the cell that starts
        the merge, so it never exposes the ``'continue'`` markers.

        Args:
            table: Table object.
            start_row: Index of the row where the merge starts.
            col: Grid column index.

        Returns:
            1 plus the number of directly following rows whose cell at
            ``col`` continues a vertical merge.
        """
        rows = list(table.rows)
        span = 1
        for row in rows[start_row + 1:]:
            tc = self._tc_at_grid_col(row._tr, col)
            if tc is None or self._get_vmerge_value(tc) != 'continue':
                break
            span += 1
        return span

    def _convert_table_to_text(self, table: Table) -> str:
        """Render a table as a single line of text.

        Non-empty cells of a row are joined with ``' | '`` and the rows are
        joined with a space.

        Args:
            table: docx table object.

        Returns:
            The text rendering, or a bracketed placeholder when the table is
            invalid, empty, without text, or when processing fails.
        """
        try:
            if not (hasattr(table, '_element') and hasattr(table, 'rows')
                    and hasattr(table, 'columns')):
                return "【无效表格】"

            if len(table.rows) == 0 or len(table.columns) == 0:
                return "【空表格】"

            row_lines = []
            for row in table.rows:
                cell_texts = []
                for cell in row.cells:
                    stripped = (p.text.strip() for p in cell.paragraphs)
                    cell_text = ' '.join(text for text in stripped if text)
                    if cell_text:
                        cell_texts.append(cell_text)
                if cell_texts:
                    row_lines.append(' | '.join(cell_texts))

            if row_lines:
                return " ".join(row_lines)
            return "【表格无有效数据】"
        except Exception as e:
            print(f"警告:处理表格时出错: {str(e)}")
            return "【表格处理失败】"

    def _extract_plain_text_from_table(self, table: Table) -> str:
        """Return the plain-text rendering of a table.

        Args:
            table: Table object.

        Returns:
            Same result as ``_convert_table_to_text``.
        """
        return self._convert_table_to_text(table)

    def _extract_table_row(self, row_element, namespace):
        """Extract the cells of one table row.

        Only cells belonging to this row are returned; cells of tables
        nested inside a cell are not flattened into the row (their text is
        still part of the containing cell's text).

        Args:
            row_element: ``w:tr`` element.
            namespace: Prefix-to-URI mapping; the ``'w'`` entry is used.

        Returns:
            List[Dict]: One dict per cell with the keys ``text``,
            ``gridspan`` and ``vmerge``. A cell that fails to parse is
            replaced by a blank cell so column positions stay aligned.
        """
        w_ns = namespace['w']
        val_attr = self.qn('w:val')
        cells = []
        for cell_element in self._iter_own_descendants(row_element, '{%s}tc' % w_ns):
            try:
                gridspan = 1
                gridspan_elem = self._tc_property(cell_element, 'gridSpan', w_ns)
                if gridspan_elem is not None:
                    try:
                        gridspan = max(1, int(gridspan_elem.get(val_attr, 1)))
                    except (ValueError, TypeError):
                        gridspan = 1

                vmerge = None
                vmerge_elem = self._tc_property(cell_element, 'vMerge', w_ns)
                if vmerge_elem is not None:
                    # A vMerge element without a value continues a merge.
                    vmerge = vmerge_elem.get(val_attr, 'continue')

                cells.append({
                    'text': self._cell_text(cell_element, w_ns),
                    'gridspan': gridspan,
                    'vmerge': vmerge,
                })
            except Exception as e:
                print(f"处理单元格时出错: {str(e)}")
                cells.append({'text': '', 'gridspan': 1, 'vmerge': None})
        return cells

    def _preprocess_table(self, element, namespace):
        """Build a ``TableData`` from a table XML element.

        Reads the table style and grid columns, extracts every row, guesses
        the number of header rows from merges in the first three rows, and
        flags two-column tables whose first column is always filled as
        key/value tables.

        Args:
            element: ``w:tbl`` element.
            namespace: Prefix-to-URI mapping; the ``'w'`` entry is used.

        Returns:
            TableData: The extracted table; an empty ``TableData`` when
            processing fails.
        """
        try:
            table_data = TableData()
            w_ns = namespace['w']

            # The first tblPr / tblGrid in document order belong to this
            # table; only their direct children are read so that a nested
            # table's style or grid is never picked up.
            tbl_pr = element.find('.//{%s}tblPr' % w_ns)
            style_elem = None
            if tbl_pr is not None:
                style_elem = tbl_pr.find('{%s}tblStyle' % w_ns)
            if style_elem is not None:
                table_data.style = style_elem.get(self.qn('w:val'))

            tbl_grid = element.find('.//{%s}tblGrid' % w_ns)
            grid_cols = []
            if tbl_grid is not None:
                grid_cols = tbl_grid.findall('{%s}gridCol' % w_ns)
            table_data.columns = [
                {'width': col.get(self.qn('w:w'))} for col in grid_cols
            ]

            # Extract every row exactly once; the header analysis below
            # reuses the result.
            extracted_rows = [
                self._extract_table_row(row, namespace)
                for row in self._iter_own_descendants(element, '{%s}tr' % w_ns)
            ]

            header_row_count = 0
            has_multi_level_header = False
            for i, cells in enumerate(extracted_rows[:3]):  # first 3 rows only
                if any(cell.get('vmerge') == 'restart' for cell in cells):
                    # A merge starting here reaches at least the next row.
                    has_multi_level_header = True
                    header_row_count = max(header_row_count, i + 2)
                elif any(cell.get('gridspan', 1) > 1 for cell in cells):
                    has_multi_level_header = True
                    header_row_count = max(header_row_count, i + 1)

            # Without a detected multi-level header the first row is the header.
            if not has_multi_level_header:
                header_row_count = 1

            table_data.has_multi_level_header = has_multi_level_header
            table_data.header_rows = header_row_count

            for cells in extracted_rows:
                table_data.add_row(cells)

            # Key/value table: exactly two cells per row, first one never blank.
            if table_data.rows and len(table_data.rows[0]) == 2:
                table_data.has_key_value_pairs = all(
                    len(row) == 2 and bool(row[0]['text'].strip())
                    for row in table_data.rows
                )

            return table_data
        except Exception as e:
            print(f"预处理表格时出错: {str(e)}")
            return TableData()

    def qn(self, tag: str) -> str:
        """Return the namespace-qualified (Clark notation) form of a tag.

        A leading ``w:`` prefix is stripped before the namespace is applied,
        so ``qn('w:val')`` yields ``'{<w-namespace>}val'``. Keeping the
        prefix would produce a name that matches no element or attribute.

        Args:
            tag: Tag or attribute name, with or without the ``w:`` prefix.

        Returns:
            str: ``'{namespace}local-name'``.
        """
        local_name = tag[2:] if tag.startswith('w:') else tag
        return '{%s}%s' % (self._W_NS, local_name)

    def _clean_table_data(self, table: Dict[str, Any]) -> Dict[str, Any]:
        """Clean and normalise a table dictionary.

        Every row is padded with blank cells to the width of the widest
        row, each cell is cleaned and given a ``position`` entry, and a
        ``cleaning_info`` record is added to the metadata of the result.
        The input dictionary and its metadata are not modified.

        Args:
            table: Raw table dictionary with a ``rows`` list whose items
                hold a ``cells`` list, and optional ``metadata``.

        Returns:
            Dict[str, Any]: The cleaned table dictionary.

        Raises:
            ValueError: If ``table`` is not a dict or cleaning fails.
        """
        if not isinstance(table, dict):
            raise ValueError("表格数据必须是字典类型")

        cleaned_table = {
            'rows': [],
            # Copy so that adding cleaning_info does not alter the input.
            'metadata': dict(table.get('metadata') or {}),
        }

        rows = table.get('rows', [])
        if not rows:
            return cleaned_table

        try:
            max_cols = max(len(row.get('cells') or []) for row in rows)

            for row_idx, row in enumerate(rows):
                cells = row.get('cells') or []
                cleaned_cells = []
                for col_idx in range(max_cols):
                    if col_idx < len(cells):
                        cleaned_cell = self._clean_cell_data(cells[col_idx])
                    else:
                        # Pad short rows; same shape as a cleaned cell.
                        cleaned_cell = {'text': '', 'gridspan': 1, 'vmerge': False}
                    cleaned_cell['position'] = {'row': row_idx, 'col': col_idx}
                    cleaned_cells.append(cleaned_cell)

                cleaned_table['rows'].append({
                    'cells': cleaned_cells,
                    'metadata': dict(row.get('metadata') or {}),
                })

            cleaned_table['metadata']['cleaning_info'] = {
                'cleaned_at': datetime.now().isoformat(),
                'original_rows': len(rows),
                'cleaned_rows': len(cleaned_table['rows']),
                'standardized_columns': max_cols,
            }
            return cleaned_table
        except Exception as e:
            raise ValueError(f"清理表格数据时发生错误: {str(e)}") from e

    def _clean_cell_data(self, cell: Dict[str, Any]) -> Dict[str, Any]:
        """Clean a single cell dictionary.

        Args:
            cell: Raw cell data. A value that is not a dict is treated as
                the cell text (``None`` as empty text).

        Returns:
            Dict[str, Any]: A new dict with ``text`` (non-printable
            characters removed, whitespace collapsed to single spaces),
            ``gridspan`` (integer, at least 1) and ``vmerge`` (a bool,
            ``'restart'``, ``'continue'``, or ``False`` for anything else).
        """
        if not isinstance(cell, dict):
            cell = {'text': cell}

        cleaned_cell = {'text': '', 'gridspan': 1, 'vmerge': False}

        raw_text = cell.get('text')
        if raw_text is not None:
            # Drop zero-width and control characters, but keep every kind
            # of whitespace (no-break and ideographic spaces included) so
            # it collapses to a space instead of gluing words together.
            kept = ''.join(
                ch for ch in str(raw_text) if ch.isprintable() or ch.isspace()
            )
            cleaned_cell['text'] = ' '.join(kept.split())

        try:
            cleaned_cell['gridspan'] = max(1, int(cell.get('gridspan', 1)))
        except (ValueError, TypeError):
            cleaned_cell['gridspan'] = 1

        vmerge = cell.get('vmerge')
        if isinstance(vmerge, bool) or vmerge in ('restart', 'continue'):
            cleaned_cell['vmerge'] = vmerge

        return cleaned_cell