#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Table data model and a processor that normalises Word-style tables.

The raw table handed to :class:`TableProcessor` is accessed through
``raw_table.rows``, ``raw_table.columns``, ``row.cells``, ``cell.text`` and
``cell._element.find(...)`` with WordprocessingML tag names, i.e. presumably a
python-docx ``Table`` -- confirm against the caller.
"""

import html
import json
import re
from copy import deepcopy
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any, Dict, List, Optional, Tuple, Union

# Fully qualified name of the WordprocessingML ``w:val`` attribute carried by
# ``w:vMerge`` and ``w:gridSpan`` elements.
_W_VAL = '{http://schemas.openxmlformats.org/wordprocessingml/2006/main}val'


@dataclass
class Cell:
    """A single normalised table cell."""

    text: str = ""  # cell text
    row_span: int = 1  # number of vertically merged rows
    col_span: int = 1  # number of horizontally merged columns
    is_header: bool = False  # whether this is a header cell
    data_type: str = "text"  # detected type: text, number, date, currency, ...
    original_value: Any = None  # parsed value (or the raw text)
    formatted_value: str = ""  # display form of the value
    # cell position, keys "row" and "col"
    position: Dict[str, int] = field(default_factory=lambda: {"row": 0, "col": 0})
    metadata: Dict[str, Any] = field(default_factory=dict)  # free-form metadata


@dataclass
class Row:
    """A table row made of :class:`Cell` objects."""

    cells: List[Cell] = field(default_factory=list)  # cells of the row
    is_header: bool = False  # whether this is a header row
    row_index: int = 0  # index of the row in the source table
    metadata: Dict[str, Any] = field(default_factory=dict)  # free-form metadata


@dataclass
class Table:
    """Normalised table produced by :class:`TableProcessor`."""

    rows: List[Row] = field(default_factory=list)  # header rows first, then data rows
    header_rows: int = 0  # number of header rows
    total_rows: int = 0  # total row count of the source table
    total_cols: int = 0  # total column count of the source table
    has_complex_header: bool = False  # header contains merged cells
    table_type: str = "normal"  # normal, key_value, matrix, ...
    metadata: Dict[str, Any] = field(default_factory=dict)  # free-form metadata


@dataclass
class TableData:
    """Dict-based table container: every cell is a ``dict`` with keys such as
    ``'text'``, ``'gridspan'`` and ``'vmerge'``."""

    rows: List[List[Dict[str, Any]]] = field(default_factory=list)  # row data
    style: Optional[str] = None  # table style
    columns: List[Dict[str, Any]] = field(default_factory=list)  # column attributes
    has_multi_level_header: bool = False  # whether the header has several levels
    has_key_value_pairs: bool = False  # whether the table holds key/value pairs
    header_rows: int = 1  # number of header rows, 1 by default
    table_type: str = "normal"  # normal, key_value, matrix, ...

    def _has_cell(self, row_idx: int, col_idx: int) -> bool:
        """Return True when ``(row_idx, col_idx)`` addresses an existing cell.

        Negative indices are rejected so that they cannot silently wrap
        around to a different cell.
        """
        return (0 <= row_idx < len(self.rows)
                and 0 <= col_idx < len(self.rows[row_idx]))

    def add_row(self, row_data: List[Dict[str, Any]]):
        """Append one row of cell dicts to the table."""
        self.rows.append(row_data)

    def get_row_count(self) -> int:
        """Return the number of rows."""
        return len(self.rows)

    def get_column_count(self) -> int:
        """Return the number of declared columns, or the widest row if none
        are declared."""
        if self.columns:
            return len(self.columns)
        return max((len(row) for row in self.rows), default=0)

    def is_empty(self) -> bool:
        """Return True when the table has no rows."""
        return not self.rows

    def get_cell_text(self, row_idx: int, col_idx: int) -> str:
        """Return the stripped text of a cell, or ``''`` if it does not exist
        or cannot be read."""
        try:
            if self._has_cell(row_idx, col_idx):
                cell = self.rows[row_idx][col_idx]
                return cell.get('text', '').strip()
        except Exception as e:
            print(f"获取单元格文本时出错 [{row_idx},{col_idx}]: {str(e)}")
        return ''

    def set_cell_text(self, row_idx: int, col_idx: int, text: str):
        """Set the text of an existing cell; out-of-range indices are ignored."""
        try:
            if self._has_cell(row_idx, col_idx):
                self.rows[row_idx][col_idx]['text'] = text
        except Exception as e:
            print(f"设置单元格文本时出错 [{row_idx},{col_idx}]: {str(e)}")

    def get_cell_merge_info(self, row_idx: int, col_idx: int) -> Dict[str, Any]:
        """Return ``{'gridspan': ..., 'vmerge': ...}`` for a cell.

        Missing cells and unreadable cells yield the unmerged defaults
        ``{'gridspan': 1, 'vmerge': None}``.
        """
        try:
            if self._has_cell(row_idx, col_idx):
                cell = self.rows[row_idx][col_idx]
                return {
                    'gridspan': cell.get('gridspan', 1),
                    'vmerge': cell.get('vmerge', None),
                }
        except Exception as e:
            print(f"获取单元格合并信息时出错 [{row_idx},{col_idx}]: {str(e)}")
        return {'gridspan': 1, 'vmerge': None}

    def set_cell_merge_info(self, row_idx: int, col_idx: int, gridspan: int = 1,
                            vmerge: Optional[str] = None):
        """Store merge information on an existing cell.

        ``gridspan`` is always written; ``vmerge`` only when it is not None.
        """
        try:
            if self._has_cell(row_idx, col_idx):
                cell = self.rows[row_idx][col_idx]
                cell['gridspan'] = gridspan
                if vmerge is not None:
                    cell['vmerge'] = vmerge
        except Exception as e:
            print(f"设置单元格合并信息时出错 [{row_idx},{col_idx}]: {str(e)}")

    def is_merged_cell(self, row_idx: int, col_idx: int) -> bool:
        """Return True when the cell spans several columns or carries a
        ``vmerge`` marker."""
        try:
            if self._has_cell(row_idx, col_idx):
                cell = self.rows[row_idx][col_idx]
                return cell.get('gridspan', 1) > 1 or cell.get('vmerge') is not None
        except Exception as e:
            print(f"检查单元格合并状态时出错 [{row_idx},{col_idx}]: {str(e)}")
        return False

    def get_header_rows(self) -> List[List[Dict[str, Any]]]:
        """Return the first ``header_rows`` rows."""
        return self.rows[:self.header_rows]

    def get_data_rows(self) -> List[List[Dict[str, Any]]]:
        """Return the rows after the header rows."""
        return self.rows[self.header_rows:]


class TableProcessor:
    """Convert a raw table into a normalised :class:`Table` and render it."""

    def __init__(self):
        # Data-type detection patterns. Insertion order matters: the first
        # matching pattern wins. The currency pattern requires a yen/yuan
        # sign; with an optional sign it would also match every plain number
        # and shadow the 'number' pattern below.
        self.patterns = {
            'currency': r'^\s*[¥￥]\s*(\d{1,3}(,\d{3})+|\d+)(\.\d{1,2})?\s*$',
            'percentage': r'^\s*\d+(\.\d+)?%\s*$',
            'date': r'^\d{4}[-/年]\d{1,2}[-/月]\d{1,2}日?$',
            'number': r'^\s*\d+(\.\d+)?\s*$',
            'time': r'^\d{1,2}:\d{2}(:\d{2})?$',
        }

        # Keywords whose presence marks a cell as header-like.
        self.header_keywords = [
            '序号', '编号', '项目', '名称', '类型', '说明', '备注',
            '金额', '时间', '日期', '地区', '部门', '人员'
        ]

    def process_table(self, raw_table: Any) -> Table:
        """Run the full pipeline on ``raw_table``.

        Steps: structure analysis, header rows, data rows, normalisation,
        table-type detection. Returns an empty :class:`Table` if an
        exception escapes one of the steps.
        """
        try:
            table = Table()
            self._analyze_table_structure(raw_table, table)
            self._process_headers(raw_table, table)
            self._process_data_rows(raw_table, table)
            self._normalize_table(table)
            self._identify_table_type(table)
            return table
        except Exception as e:
            print(f"处理表格时出错: {str(e)}")
            return Table()

    def _analyze_table_structure(self, raw_table: Any, table: Table):
        """Fill in row/column counts and header information on ``table`` and
        record them under ``table.metadata['structure_info']``."""
        try:
            rows = raw_table.rows
            table.total_rows = len(rows)
            table.total_cols = len(raw_table.columns)

            header_info = self._analyze_header_structure(raw_table)
            table.header_rows = header_info['header_rows']
            table.has_complex_header = header_info['is_complex']

            table.metadata['structure_info'] = {
                'total_rows': table.total_rows,
                'total_cols': table.total_cols,
                'header_rows': table.header_rows,
                'has_complex_header': table.has_complex_header,
                'analyzed_at': datetime.now().isoformat()
            }
        except Exception as e:
            print(f"分析表格结构时出错: {str(e)}")

    def _analyze_header_structure(self, raw_table: Any) -> Dict[str, Any]:
        """Estimate the header depth from the first three rows.

        A row extends the header when it contains a merged cell, a header
        keyword, or a cell that ``_analyze_cell_format`` classifies as
        ``'header'``. Returns ``{'header_rows': int, 'is_complex': bool}``;
        ``header_rows`` is at least 1, and ``is_complex`` is set when a
        merged cell was seen.
        """
        header_info = {
            'header_rows': 1,
            'is_complex': False
        }

        try:
            for i in range(min(3, len(raw_table.rows))):
                # Read the cell sequence once per row instead of once per check.
                cells = raw_table.rows[i].cells

                has_merged_cells = any(
                    cell._element.find('.//{*}vMerge') is not None or
                    cell._element.find('.//{*}gridSpan') is not None
                    for cell in cells
                )
                has_header_keywords = any(
                    any(keyword in cell.text for keyword in self.header_keywords)
                    for cell in cells
                )

                if has_merged_cells or has_header_keywords:
                    header_info['header_rows'] = max(header_info['header_rows'], i + 1)
                    if has_merged_cells:
                        header_info['is_complex'] = True

                if any(self._analyze_cell_format(cell) == 'header' for cell in cells):
                    header_info['header_rows'] = max(header_info['header_rows'], i + 1)

        except Exception as e:
            print(f"分析表头结构时出错: {str(e)}")

        return header_info

    def _analyze_cell_format(self, cell: Any) -> str:
        """Classify a raw cell as ``'header'``, one of the pattern names in
        ``self.patterns``, or ``'text'``."""
        try:
            text = cell.text.strip()

            # Header heuristics: any uppercase letter, or a header keyword.
            if text and any(char.isupper() for char in text):
                return 'header'
            if text and any(keyword in text for keyword in self.header_keywords):
                return 'header'

            for data_type, pattern in self.patterns.items():
                if re.match(pattern, text):
                    return data_type

            return 'text'

        except Exception as e:
            print(f"分析单元格格式时出错: {str(e)}")
            return 'text'

    @staticmethod
    def _blank_cell(row_index: int, col_index: int, is_header: bool) -> Cell:
        """Return an empty placeholder cell at the given position."""
        return Cell(text="", is_header=is_header,
                    position={'row': row_index, 'col': col_index})

    @staticmethod
    def _read_merge_info(cell: Any) -> Tuple[int, int]:
        """Return ``(row_span, col_span)`` read from the cell's XML element.

        The XML only marks where a vertical merge starts, not how far it
        extends, so a ``restart`` marker is reported as a span of 2 and every
        other ``vMerge`` value as 1. An unparsable or non-positive
        ``gridSpan`` falls back to 1.
        """
        row_span = 1
        col_span = 1

        vmerge = cell._element.find('.//{*}vMerge')
        gridspan = cell._element.find('.//{*}gridSpan')

        if vmerge is not None:
            val = vmerge.get(_W_VAL, 'continue')
            row_span = 2 if val == 'restart' else 1

        if gridspan is not None:
            try:
                col_span = max(1, int(gridspan.get(_W_VAL, '1')))
            except ValueError:
                col_span = 1

        return row_span, col_span

    def _classify_value(self, text: str) -> Tuple[str, Any, str]:
        """Return ``(data_type, original_value, formatted_value)`` for ``text``.

        The first matching entry of ``self.patterns`` decides the type.
        Currency, percentage and date values are parsed and re-formatted;
        if parsing fails, or for any other type, both values stay equal to
        ``text``.
        """
        for type_name, pattern in self.patterns.items():
            if not re.match(pattern, text):
                continue

            original_value: Any = text
            formatted_value = text
            try:
                if type_name == 'currency':
                    amount = float(re.sub(r'[¥￥,\s]', '', text))
                    original_value = amount
                    formatted_value = f"¥{amount:.2f}"
                elif type_name == 'percentage':
                    ratio = float(text.rstrip('%')) / 100
                    original_value = ratio
                    formatted_value = f"{ratio:.2%}"
                elif type_name == 'date':
                    # The date pattern accepts '-', '/' and 年/月/日 as
                    # separators, so all of them are normalised to '-'.
                    date_text = re.sub(r'[年月日/]', '-', text).rstrip('-')
                    date_obj = datetime.strptime(date_text, '%Y-%m-%d')
                    original_value = date_obj
                    formatted_value = date_obj.strftime('%Y-%m-%d')
            except ValueError:
                # Keep the detected type but leave the raw text in place.
                pass
            return type_name, original_value, formatted_value

        return 'text', text, text

    def _build_row(self, raw_row: Any, row_index: int, total_cols: int,
                   is_header: bool) -> Row:
        """Convert one raw row into a :class:`Row`.

        A cell whose conversion raises is replaced by a blank cell, and rows
        with fewer than ``total_cols`` cells are padded with blank cells.
        """
        if is_header:
            process_cell = self._process_header_cell
            error_label = "处理表头单元格时出错"
        else:
            process_cell = self._process_data_cell
            error_label = "处理单元格时出错"

        built = Row(is_header=is_header, row_index=row_index)
        col_index = 0

        # ``raw_row.cells`` is read once; indexing it again for every cell
        # would repeat the lookup on each iteration.
        for cell_idx, cell in enumerate(raw_row.cells):
            try:
                new_cell = process_cell(cell, row_index, col_index)
            except Exception as cell_error:
                print(f"{error_label} [行={row_index}, 列={cell_idx}]: {str(cell_error)}")
                new_cell = self._blank_cell(row_index, col_index, is_header)
            built.cells.append(new_cell)
            col_index += new_cell.col_span

        while len(built.cells) < total_cols:
            built.cells.append(self._blank_cell(row_index, col_index, is_header))
            col_index += 1

        return built

    def _blank_row(self, row_index: int, total_cols: int, is_header: bool) -> Row:
        """Return a row of ``total_cols`` blank cells."""
        empty_row = Row(is_header=is_header, row_index=row_index)
        for col in range(total_cols):
            empty_row.cells.append(self._blank_cell(row_index, col, is_header))
        return empty_row

    def _process_headers(self, raw_table: Any, table: Table):
        """Append the first ``table.header_rows`` raw rows to ``table.rows``
        as header rows; a row that fails is replaced by a blank row."""
        try:
            for i in range(min(table.header_rows, len(raw_table.rows))):
                try:
                    header_row = self._build_row(
                        raw_table.rows[i], i, table.total_cols, is_header=True)
                except Exception as row_error:
                    print(f"处理表头行时出错 [行={i}]: {str(row_error)}")
                    header_row = self._blank_row(i, table.total_cols, is_header=True)
                table.rows.append(header_row)
        except Exception as e:
            print(f"处理表头时出错: {str(e)}")

    def _process_header_cell(self, cell: Any, row_index: int, col_index: int) -> Cell:
        """Build a header :class:`Cell` from a raw cell.

        Returns a blank header cell if the raw cell cannot be read.
        """
        try:
            header_cell = Cell(
                text=cell.text.strip(),
                is_header=True,
                position={'row': row_index, 'col': col_index}
            )
            header_cell.row_span, header_cell.col_span = self._read_merge_info(cell)
            return header_cell
        except Exception as e:
            print(f"处理表头单元格时出错: {str(e)}")
            return Cell(text="", is_header=True,
                        position={'row': row_index, 'col': col_index})

    def _process_data_rows(self, raw_table: Any, table: Table):
        """Append the raw rows after the header rows to ``table.rows`` as
        data rows; a row that fails is replaced by a blank row."""
        try:
            for i in range(table.header_rows, table.total_rows):
                try:
                    data_row = self._build_row(
                        raw_table.rows[i], i, table.total_cols, is_header=False)
                except Exception as row_error:
                    print(f"处理数据行时出错 [行={i}]: {str(row_error)}")
                    data_row = self._blank_row(i, table.total_cols, is_header=False)
                table.rows.append(data_row)
        except Exception as e:
            print(f"处理数据行时出错: {str(e)}")

    def _process_data_cell(self, cell: Any, row_index: int, col_index: int) -> Cell:
        """Build a data :class:`Cell` with detected type, parsed value,
        display value and merge spans.

        Returns a blank cell if the raw cell cannot be read.
        """
        try:
            text = cell.text.strip()
            data_cell = Cell(
                text=text,
                position={'row': row_index, 'col': col_index}
            )

            (data_cell.data_type,
             data_cell.original_value,
             data_cell.formatted_value) = self._classify_value(text)

            data_cell.row_span, data_cell.col_span = self._read_merge_info(cell)
            return data_cell

        except Exception as e:
            print(f"处理数据单元格时出错: {str(e)}")
            return Cell(text="", position={'row': row_index, 'col': col_index})

    def _normalize_table(self, table: Table):
        """Pad or truncate every row so it has exactly ``table.total_cols``
        cells."""
        try:
            max_cols = table.total_cols

            for row in table.rows:
                current_cols = len(row.cells)
                if current_cols < max_cols:
                    for col in range(current_cols, max_cols):
                        row.cells.append(Cell(
                            text="",
                            is_header=row.is_header,
                            position={'row': row.row_index, 'col': col}
                        ))
                elif current_cols > max_cols:
                    row.cells = row.cells[:max_cols]

            table.total_cols = max_cols

        except Exception as e:
            print(f"规范化表格时出错: {str(e)}")

    def _identify_table_type(self, table: Table):
        """Set ``table.table_type`` to ``'key_value'``, ``'matrix'`` or
        ``'normal'``.

        A two-column table whose data rows all have a non-empty first cell is
        ``'key_value'``. A table with no data rows passes this check
        vacuously; that is deliberate, since a key/value table whose rows
        were all classified as headers would otherwise be missed.
        """
        try:
            if table.total_cols == 2:
                keys_present = all(
                    cell.text.strip() != ""
                    for row in table.rows if not row.is_header
                    for cell in row.cells[:1]
                )
                if keys_present:
                    table.table_type = "key_value"
                    return

            if table.has_complex_header and table.total_cols > 2:
                table.table_type = "matrix"
                return

            table.table_type = "normal"

        except Exception as e:
            print(f"识别表格类型时出错: {str(e)}")
            table.table_type = "normal"

    @staticmethod
    def _escape_markdown_cell(text: str) -> str:
        """Make ``text`` safe inside one Markdown table cell: pipes are
        escaped and line breaks become ``<br>`` so the row stays on a single
        line."""
        escaped = text.replace("|", "\\|")
        return escaped.replace("\r\n", "<br>").replace("\n", "<br>")

    def convert_to_markdown(self, table: Table) -> str:
        """Render ``table`` as a Markdown table.

        Header rows use the cell text and data rows prefer
        ``formatted_value``. The separator line follows the last header row.
        Returns ``''`` if rendering fails.
        """
        try:
            markdown_lines = []
            # Never index past the rows that actually exist.
            header_count = min(table.header_rows, len(table.rows))

            for i in range(header_count):
                header_cells = [
                    self._escape_markdown_cell(cell.text)
                    for cell in table.rows[i].cells
                ]
                markdown_lines.append("| " + " | ".join(header_cells) + " |")

                if i == header_count - 1:
                    separator = "|" + "|".join(["---" for _ in range(table.total_cols)]) + "|"
                    markdown_lines.append(separator)

            for row in table.rows[header_count:]:
                data_cells = [
                    self._escape_markdown_cell(
                        cell.formatted_value if cell.formatted_value else cell.text
                    )
                    for cell in row.cells
                ]
                markdown_lines.append("| " + " | ".join(data_cells) + " |")

            return "\n".join(markdown_lines)

        except Exception as e:
            print(f"转换为Markdown格式时出错: {str(e)}")
            return ""

    def convert_to_html(self, table: Table) -> str:
        """Render ``table`` as an HTML ``<table>`` fragment.

        Header rows use the cell text and data rows prefer
        ``formatted_value``. Cell content is HTML-escaped. Returns ``''`` if
        rendering fails.
        """
        # NOTE(review): the markup literals of this method were stripped in
        # the reviewed copy of the file; only the ``{cell.text}`` and
        # ``{display_value}`` placeholders and the ``html_lines.append(...)``
        # calls survived. The tag names, the absence of span attributes and
        # the escaping below are a minimal reconstruction -- confirm against
        # the original source before merging.
        try:
            html_lines = ['<table>']
            header_count = min(table.header_rows, len(table.rows))

            for row in table.rows[:header_count]:
                html_lines.append("<tr>")
                for cell in row.cells:
                    html_lines.append(f"<th>{html.escape(cell.text)}</th>")
                html_lines.append("</tr>")

            for row in table.rows[header_count:]:
                html_lines.append("<tr>")
                for cell in row.cells:
                    display_value = (
                        cell.formatted_value if cell.formatted_value else cell.text
                    )
                    html_lines.append(f"<td>{html.escape(display_value)}</td>")
                html_lines.append("</tr>")

            html_lines.append("</table>")
            return "\n".join(html_lines)

        except Exception as e:
            # NOTE(review): message modelled on convert_to_markdown's; the
            # original wording is not visible.
            print(f"转换为HTML格式时出错: {str(e)}")
            return ""