doc-etl/cxs/table_processor.py
2025-05-20 13:47:56 +08:00

705 lines
28 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import html
import json
import re
from copy import deepcopy
from dataclasses import dataclass, field
from datetime import datetime
from typing import List, Dict, Any, Optional, Tuple, Union
@dataclass
class Cell:
    """A single table cell: its text, merge spans, detected type and position."""

    # Text content of the cell.
    text: str = ""
    # Number of rows merged vertically.
    row_span: int = 1
    # Number of columns merged horizontally.
    col_span: int = 1
    # Whether this is a header cell.
    is_header: bool = False
    # Data type: text, number, date, currency, etc.
    data_type: str = "text"
    # Original (parsed) value.
    original_value: Any = None
    # Value after formatting.
    formatted_value: str = ""
    # Cell position, stored under the "row" and "col" keys.
    position: Dict[str, int] = field(default_factory=lambda: dict(row=0, col=0))
    # Free-form metadata.
    metadata: Dict[str, Any] = field(default_factory=dict)
@dataclass
class Row:
    """A table row: an ordered list of cells plus row-level flags."""

    # List of cells in the row.
    cells: List[Cell] = field(default_factory=list)
    # Whether this is a header row.
    is_header: bool = False
    # Index of the row.
    row_index: int = 0
    # Free-form metadata.
    metadata: Dict[str, Any] = field(default_factory=dict)
@dataclass
class Table:
    """A normalized table: its rows plus structural summary fields."""

    # List of rows.
    rows: List[Row] = field(default_factory=list)
    # Number of header rows.
    header_rows: int = 0
    # Total number of rows.
    total_rows: int = 0
    # Total number of columns.
    total_cols: int = 0
    # Whether the table has a complex header.
    has_complex_header: bool = False
    # Table type: normal, key_value, matrix, etc.
    table_type: str = "normal"
    # Free-form metadata.
    metadata: Dict[str, Any] = field(default_factory=dict)
@dataclass
class TableData:
    """Dictionary-based table container: every row is a list of cell dicts.

    The accessors read and write the ``text``, ``gridspan`` and ``vmerge``
    keys of those cell dicts. All cell accessors treat an out-of-range index
    (including a negative one) as "no such cell" instead of raising.
    """

    # Table row data.
    rows: List[List[Dict[str, Any]]] = field(default_factory=list)
    # Table style.
    style: Optional[str] = None
    # Column properties.
    columns: List[Dict[str, Any]] = field(default_factory=list)
    # Whether the table has a multi-level header.
    has_multi_level_header: bool = False
    # Whether the table contains a key/value structure.
    has_key_value_pairs: bool = False
    # Number of header rows; defaults to 1.
    header_rows: int = 1
    # Table type: normal, key_value, matrix, etc.
    table_type: str = "normal"

    def _cell_at(self, row_idx: int, col_idx: int) -> Optional[Dict[str, Any]]:
        """Return the cell dict at (row_idx, col_idx), or None when out of range.

        Negative indices are rejected on purpose so that reads and writes
        agree: they never wrap around to the end of the table.
        """
        if 0 <= row_idx < len(self.rows):
            row = self.rows[row_idx]
            if 0 <= col_idx < len(row):
                return row[col_idx]
        return None

    def add_row(self, row_data: List[Dict[str, Any]]):
        """Append one row of cell dicts to the table."""
        self.rows.append(row_data)

    def get_row_count(self) -> int:
        """Return the number of rows."""
        return len(self.rows)

    def get_column_count(self) -> int:
        """Return the column count.

        Uses ``columns`` when it is non-empty, otherwise the length of the
        longest row (0 for a table without rows).
        """
        if self.columns:
            return len(self.columns)
        return max((len(row) for row in self.rows), default=0)

    def is_empty(self) -> bool:
        """Return True when the table has no rows."""
        return not self.rows

    def get_cell_text(self, row_idx: int, col_idx: int) -> str:
        """Return the stripped ``text`` of a cell, or '' if it is missing."""
        try:
            cell = self._cell_at(row_idx, col_idx)
            if cell is not None:
                return cell.get('text', '').strip()
        except Exception as e:
            # Best effort: a malformed cell is reported and read as empty.
            print(f"获取单元格文本时出错 [{row_idx},{col_idx}]: {str(e)}")
        return ''

    def set_cell_text(self, row_idx: int, col_idx: int, text: str):
        """Set the ``text`` of a cell; out-of-range positions are ignored."""
        try:
            cell = self._cell_at(row_idx, col_idx)
            if cell is not None:
                cell['text'] = text
        except Exception as e:
            print(f"设置单元格文本时出错 [{row_idx},{col_idx}]: {str(e)}")

    def get_cell_merge_info(self, row_idx: int, col_idx: int) -> Dict[str, Any]:
        """Return the ``gridspan`` / ``vmerge`` values of a cell.

        Falls back to ``{'gridspan': 1, 'vmerge': None}`` when the cell does
        not exist or cannot be read.
        """
        try:
            cell = self._cell_at(row_idx, col_idx)
            if cell is not None:
                return {
                    'gridspan': cell.get('gridspan', 1),
                    'vmerge': cell.get('vmerge', None)
                }
        except Exception as e:
            print(f"获取单元格合并信息时出错 [{row_idx},{col_idx}]: {str(e)}")
        return {'gridspan': 1, 'vmerge': None}

    def set_cell_merge_info(self, row_idx: int, col_idx: int, gridspan: int = 1, vmerge: Optional[str] = None):
        """Set ``gridspan`` on a cell, and ``vmerge`` only when it is not None."""
        try:
            cell = self._cell_at(row_idx, col_idx)
            if cell is not None:
                cell['gridspan'] = gridspan
                if vmerge is not None:
                    cell['vmerge'] = vmerge
        except Exception as e:
            print(f"设置单元格合并信息时出错 [{row_idx},{col_idx}]: {str(e)}")

    def is_merged_cell(self, row_idx: int, col_idx: int) -> bool:
        """Return True when the cell has ``gridspan`` > 1 or a ``vmerge`` value."""
        try:
            cell = self._cell_at(row_idx, col_idx)
            if cell is not None:
                return cell.get('gridspan', 1) > 1 or cell.get('vmerge') is not None
        except Exception as e:
            print(f"检查单元格合并状态时出错 [{row_idx},{col_idx}]: {str(e)}")
        return False

    def get_header_rows(self) -> List[List[Dict[str, Any]]]:
        """Return the first ``header_rows`` rows."""
        return self.rows[:self.header_rows]

    def get_data_rows(self) -> List[List[Dict[str, Any]]]:
        """Return the rows after the header rows."""
        return self.rows[self.header_rows:]
class TableProcessor:
    """Normalize a raw document table into a ``Table`` and render it.

    The raw table must expose ``rows`` and ``columns``; each row exposes
    ``cells`` and each cell exposes ``text`` and ``_element`` (an XML element
    that is searched for ``vMerge`` / ``gridSpan`` children).
    NOTE(review): this looks like a python-docx table -- confirm with callers.

    Every step is best effort: failures are printed and replaced by empty
    cells, rows or results instead of being raised.
    """

    # Qualified name of the WordprocessingML ``val`` attribute that is read
    # from ``vMerge`` / ``gridSpan`` elements.
    _W_VAL = '{http://schemas.openxmlformats.org/wordprocessingml/2006/main}val'

    def __init__(self):
        """Set up the data-type regexes and the header keyword list."""
        # Patterns are tried in insertion order and the first match wins.
        # The currency pattern therefore requires a yen sign: with an
        # optional sign every plain integer would be claimed as currency
        # before 'number' is ever tried.
        self.patterns = {
            'currency': r'^\s*[¥￥]\s*\d[\d,]*(\.\d{1,2})?\s*$',  # currency amount
            'percentage': r'^\s*\d+(\.\d+)?%\s*$',  # percentage
            'date': r'^\d{4}[-/年]\d{1,2}[-/月]\d{1,2}日?$',  # date
            'number': r'^\s*\d+(\.\d+)?\s*$',  # number
            'time': r'^\d{1,2}:\d{2}(:\d{2})?$'  # time
        }
        # Keywords whose presence marks a cell as header-like.
        self.header_keywords = [
            '序号', '编号', '项目', '名称', '类型', '说明', '备注',
            '金额', '时间', '日期', '地区', '部门', '人员'
        ]

    def process_table(self, raw_table: Any) -> "Table":
        """Build a normalized ``Table`` from ``raw_table``.

        Returns an empty ``Table`` when an unexpected error escapes the
        individual steps.
        """
        try:
            table = Table()
            self._analyze_table_structure(raw_table, table)
            self._process_headers(raw_table, table)
            self._process_data_rows(raw_table, table)
            self._normalize_table(table)
            self._identify_table_type(table)
            return table
        except Exception as e:
            print(f"处理表格时出错: {str(e)}")
            return Table()

    def _analyze_table_structure(self, raw_table: Any, table: "Table"):
        """Record row/column counts and header information on ``table``."""
        try:
            rows = raw_table.rows
            table.total_rows = len(rows)
            table.total_cols = len(raw_table.columns)

            header_info = self._analyze_header_structure(raw_table)
            # Header detection defaults to one header row; clamp it so a
            # table never claims more header rows than it has rows.
            table.header_rows = min(header_info['header_rows'], table.total_rows)
            table.has_complex_header = header_info['is_complex']

            table.metadata['structure_info'] = {
                'total_rows': table.total_rows,
                'total_cols': table.total_cols,
                'header_rows': table.header_rows,
                'has_complex_header': table.has_complex_header,
                'analyzed_at': datetime.now().isoformat()
            }
        except Exception as e:
            print(f"分析表格结构时出错: {str(e)}")

    @staticmethod
    def _has_merge_markup(cell: Any) -> bool:
        """Return True when the cell's XML contains ``vMerge`` or ``gridSpan``."""
        element = cell._element
        return (element.find('.//{*}vMerge') is not None
                or element.find('.//{*}gridSpan') is not None)

    def _analyze_header_structure(self, raw_table: Any) -> Dict[str, Any]:
        """Inspect the first three rows and estimate the header layout.

        Returns a dict with ``header_rows`` (at least 1) and ``is_complex``
        (True when a candidate row contains merged cells).
        """
        header_info = {
            'header_rows': 1,
            'is_complex': False
        }
        try:
            rows = raw_table.rows
            for i in range(min(3, len(rows))):
                # Fetch the cell list once per row instead of per check.
                cells = rows[i].cells
                has_merged_cells = any(self._has_merge_markup(cell) for cell in cells)
                has_header_keywords = any(
                    any(keyword in cell.text for keyword in self.header_keywords)
                    for cell in cells
                )
                if has_merged_cells or has_header_keywords:
                    header_info['header_rows'] = max(header_info['header_rows'], i + 1)
                if has_merged_cells:
                    header_info['is_complex'] = True
                # A row also counts as a header when any cell looks header-like.
                if any(self._analyze_cell_format(cell) == 'header' for cell in cells):
                    header_info['header_rows'] = max(header_info['header_rows'], i + 1)
        except Exception as e:
            print(f"分析表头结构时出错: {str(e)}")
        return header_info

    def _analyze_cell_format(self, cell: Any) -> str:
        """Classify a cell as 'header', one of the pattern names, or 'text'."""
        try:
            text = cell.text.strip()
            # Header heuristics: an uppercase letter or a header keyword.
            if text and any(char.isupper() for char in text):
                return 'header'
            if text and any(keyword in text for keyword in self.header_keywords):
                return 'header'
            for data_type, pattern in self.patterns.items():
                if re.match(pattern, text):
                    return data_type
            return 'text'
        except Exception as e:
            print(f"分析单元格格式时出错: {str(e)}")
            return 'text'

    def _convert_row(self, raw_table: Any, table: "Table", row_index: int, is_header: bool) -> "Row":
        """Convert one raw row into a ``Row`` padded to ``table.total_cols``.

        A cell that fails becomes an empty cell; a row that fails entirely
        becomes a row of ``table.total_cols`` empty cells.
        """
        cell_label = "表头单元格" if is_header else "单元格"
        row_label = "表头行" if is_header else "数据行"
        convert_cell = self._process_header_cell if is_header else self._process_data_cell
        try:
            # Read the cell list once; re-reading it for every index would
            # repeat the lookup per cell.
            raw_cells = raw_table.rows[row_index].cells
            new_row = Row(is_header=is_header, row_index=row_index)
            col_index = 0
            for cell_idx, raw_cell in enumerate(raw_cells):
                try:
                    new_cell = convert_cell(raw_cell, row_index, col_index)
                    step = new_cell.col_span
                except Exception as cell_error:
                    print(f"处理{cell_label}时出错 [行={row_index}, 列={cell_idx}]: {str(cell_error)}")
                    new_cell = Cell(text="", is_header=is_header,
                                    position={'row': row_index, 'col': col_index})
                    step = 1
                new_row.cells.append(new_cell)
                col_index += step
            # Pad short rows with empty cells.
            while len(new_row.cells) < table.total_cols:
                new_row.cells.append(Cell(text="", is_header=is_header,
                                          position={'row': row_index, 'col': col_index}))
                col_index += 1
            return new_row
        except Exception as row_error:
            print(f"处理{row_label}时出错 [行={row_index}]: {str(row_error)}")
            empty_row = Row(is_header=is_header, row_index=row_index)
            for col in range(table.total_cols):
                empty_row.cells.append(Cell(text="", is_header=is_header,
                                            position={'row': row_index, 'col': col}))
            return empty_row

    def _process_headers(self, raw_table: Any, table: "Table"):
        """Append the first ``table.header_rows`` raw rows as header rows."""
        try:
            for i in range(min(table.header_rows, len(raw_table.rows))):
                table.rows.append(self._convert_row(raw_table, table, i, is_header=True))
        except Exception as e:
            print(f"处理表头时出错: {str(e)}")

    def _apply_merge_info(self, raw_cell: Any, target: "Cell"):
        """Copy ``vMerge`` / ``gridSpan`` information from the raw cell.

        ``row_span`` becomes 2 for a ``vMerge`` with value 'restart' and 1
        for any other ``vMerge``; ``col_span`` is the integer ``gridSpan``
        value, or 1 when that value is not an integer.
        """
        vmerge = raw_cell._element.find('.//{*}vMerge')
        gridspan = raw_cell._element.find('.//{*}gridSpan')
        if vmerge is not None:
            val = vmerge.get(self._W_VAL, 'continue')
            target.row_span = 2 if val == 'restart' else 1
        if gridspan is not None:
            try:
                target.col_span = int(gridspan.get(self._W_VAL, '1'))
            except ValueError:
                target.col_span = 1

    def _process_header_cell(self, cell: Any, row_index: int, col_index: int) -> "Cell":
        """Convert a raw header cell; returns an empty header cell on error."""
        try:
            header_cell = Cell(
                text=cell.text.strip(),
                is_header=True,
                position={'row': row_index, 'col': col_index}
            )
            self._apply_merge_info(cell, header_cell)
            return header_cell
        except Exception as e:
            print(f"处理表头单元格时出错: {str(e)}")
            return Cell(text="", is_header=True, position={'row': row_index, 'col': col_index})

    def _process_data_rows(self, raw_table: Any, table: "Table"):
        """Append every row after the header rows as a data row."""
        try:
            for i in range(table.header_rows, table.total_rows):
                table.rows.append(self._convert_row(raw_table, table, i, is_header=False))
        except Exception as e:
            print(f"处理数据行时出错: {str(e)}")

    def _parse_typed_value(self, text: str) -> Tuple[str, Any, str]:
        """Detect the data type of ``text`` and parse/format it.

        Returns ``(data_type, original_value, formatted_value)``. The first
        matching pattern decides the type. When parsing fails, or the type
        has no parser ('number', 'time', 'text'), both values are ``text``.
        """
        for type_name, pattern in self.patterns.items():
            if not re.match(pattern, text):
                continue
            original_value: Any = text
            formatted_value = text
            try:
                if type_name == 'currency':
                    value = float(re.sub(r'[¥￥,\s]', '', text))
                    original_value = value
                    formatted_value = f"¥{value:.2f}"
                elif type_name == 'percentage':
                    value = float(text.rstrip('%')) / 100
                    original_value = value
                    formatted_value = f"{value:.2%}"
                elif type_name == 'date':
                    # Normalize every separator the date pattern accepts
                    # (including '/') before parsing.
                    date_text = re.sub(r'[年月日/]', '-', text).rstrip('-')
                    date_obj = datetime.strptime(date_text, '%Y-%m-%d')
                    original_value = date_obj
                    formatted_value = date_obj.strftime('%Y-%m-%d')
            except ValueError:
                # Keep the detected type but fall back to the raw text.
                pass
            return type_name, original_value, formatted_value
        return 'text', text, text

    def _process_data_cell(self, cell: Any, row_index: int, col_index: int) -> "Cell":
        """Convert a raw data cell; returns an empty cell on error."""
        try:
            text = cell.text.strip()
            data_cell = Cell(
                text=text,
                position={'row': row_index, 'col': col_index}
            )
            (data_cell.data_type,
             data_cell.original_value,
             data_cell.formatted_value) = self._parse_typed_value(text)
            self._apply_merge_info(cell, data_cell)
            return data_cell
        except Exception as e:
            print(f"处理数据单元格时出错: {str(e)}")
            return Cell(text="", position={'row': row_index, 'col': col_index})

    def _normalize_table(self, table: "Table"):
        """Pad or truncate every row to exactly ``table.total_cols`` cells."""
        try:
            max_cols = table.total_cols
            for row in table.rows:
                current_cols = len(row.cells)
                if current_cols < max_cols:
                    for col in range(current_cols, max_cols):
                        row.cells.append(Cell(
                            text="",
                            is_header=row.is_header,
                            position={'row': row.row_index, 'col': col}
                        ))
                elif current_cols > max_cols:
                    row.cells = row.cells[:max_cols]
        except Exception as e:
            print(f"规范化表格时出错: {str(e)}")

    def _identify_table_type(self, table: "Table"):
        """Set ``table.table_type`` to 'key_value', 'matrix' or 'normal'."""
        try:
            # Key/value table: two columns and no data row with an empty
            # first cell.
            if table.total_cols == 2:
                key_col_pattern = all(
                    cell.text.strip() != ""
                    for row in table.rows if not row.is_header
                    for cell in row.cells[:1]
                )
                if key_col_pattern:
                    table.table_type = "key_value"
                    return
            # Matrix table: complex header and more than two columns.
            if table.has_complex_header and table.total_cols > 2:
                table.table_type = "matrix"
                return
            table.table_type = "normal"
        except Exception as e:
            print(f"识别表格类型时出错: {str(e)}")
            table.table_type = "normal"

    @staticmethod
    def _markdown_row(values) -> str:
        """Render one Markdown table row, escaping '|' and line breaks."""
        escaped = [
            value.replace("|", "\\|").replace("\r\n", "<br>").replace("\n", "<br>")
            for value in values
        ]
        return "| " + " | ".join(escaped) + " |"

    def convert_to_markdown(self, table: "Table") -> str:
        """Render the table as Markdown; returns '' on error.

        The delimiter row is written after the last header row. Pipes and
        line breaks inside cell text are escaped so that a cell cannot split
        or terminate its row.
        """
        try:
            markdown_lines = []
            # Slicing (not indexing) keeps this safe when header_rows
            # exceeds the number of rows actually present.
            header_rows = table.rows[:table.header_rows]
            for row in header_rows:
                markdown_lines.append(self._markdown_row(cell.text for cell in row.cells))
            if header_rows:
                markdown_lines.append("|" + "|".join(["---"] * table.total_cols) + "|")
            for row in table.rows[table.header_rows:]:
                markdown_lines.append(self._markdown_row(
                    cell.formatted_value if cell.formatted_value else cell.text
                    for cell in row.cells
                ))
            return "\n".join(markdown_lines)
        except Exception as e:
            print(f"转换为Markdown格式时出错: {str(e)}")
            return ""

    @staticmethod
    def _html_cell(tag: str, cell: Any, content: str) -> str:
        """Render one ``<th>``/``<td>`` element with span attributes."""
        attrs = ""
        if cell.row_span > 1:
            attrs += f' rowspan="{cell.row_span}"'
        if cell.col_span > 1:
            attrs += f' colspan="{cell.col_span}"'
        # Escape the text so '<', '>' and '&' cannot break the markup.
        return f"<{tag}{attrs}>{html.escape(content, quote=False)}</{tag}>"

    def convert_to_html(self, table: "Table") -> str:
        """Render the table as an HTML ``<table>``; returns '' on error.

        Header rows go into ``<thead>`` as ``<th>`` cells, the remaining rows
        into ``<tbody>`` as ``<td>`` cells showing the formatted value when
        there is one. Cell text is HTML-escaped.
        """
        try:
            html_lines = ['<table border="1">']
            header_rows = table.rows[:table.header_rows]
            if header_rows:
                html_lines.append("<thead>")
                for row in header_rows:
                    html_lines.append("<tr>")
                    for cell in row.cells:
                        html_lines.append(self._html_cell("th", cell, cell.text))
                    html_lines.append("</tr>")
                html_lines.append("</thead>")
            html_lines.append("<tbody>")
            for row in table.rows[table.header_rows:]:
                html_lines.append("<tr>")
                for cell in row.cells:
                    display_value = cell.formatted_value if cell.formatted_value else cell.text
                    html_lines.append(self._html_cell("td", cell, display_value))
                html_lines.append("</tr>")
            html_lines.append("</tbody>")
            html_lines.append("</table>")
            return "\n".join(html_lines)
        except Exception as e:
            print(f"转换为HTML格式时出错: {str(e)}")
            return ""

    def convert_to_dict(self, table: "Table") -> Dict[str, Any]:
        """Convert the table into a plain dict; returns {} on error.

        The result has the keys ``metadata``, ``structure``, ``headers`` and
        ``data``; ``headers`` and ``data`` are lists of rows of cell dicts.
        """
        try:
            result = {
                'metadata': table.metadata,
                'structure': {
                    'total_rows': table.total_rows,
                    'total_cols': table.total_cols,
                    'header_rows': table.header_rows,
                    'has_complex_header': table.has_complex_header,
                    'table_type': table.table_type
                },
                'headers': [],
                'data': []
            }
            for row in table.rows[:table.header_rows]:
                result['headers'].append([
                    {
                        'text': cell.text,
                        'row_span': cell.row_span,
                        'col_span': cell.col_span,
                        'position': cell.position
                    }
                    for cell in row.cells
                ])
            for row in table.rows[table.header_rows:]:
                result['data'].append([
                    {
                        'text': cell.text,
                        'data_type': cell.data_type,
                        'original_value': cell.original_value,
                        'formatted_value': cell.formatted_value,
                        'position': cell.position
                    }
                    for cell in row.cells
                ])
            return result
        except Exception as e:
            print(f"转换为字典格式时出错: {str(e)}")
            return {}

    def convert_to_text(self, table: "Table") -> str:
        """
        Convert the table to text made of "header:content" pairs.

        Multi-level headers of one column are joined with underscores. Each
        data row becomes one line whose pairs are separated by ",". Empty
        cells and columns without header text are skipped.

        Args:
            table: Table object

        Returns:
            str: text representation of the table; "【空表格】" for a missing
            or empty table and "【表格处理失败】" when conversion fails
        """
        if not table or not table.rows:
            return "【空表格】"
        try:
            # Collect the non-empty header texts of every column, top down.
            header_texts = {}
            for row in table.rows[:table.header_rows]:
                for col_idx, cell in enumerate(row.cells):
                    levels = header_texts.setdefault(col_idx, [])
                    text = cell.text.strip()
                    if text:
                        levels.append(text)
            final_headers = {
                col_idx: "_".join(levels)
                for col_idx, levels in header_texts.items()
            }

            text_parts = []
            for row in table.rows[table.header_rows:]:
                row_parts = []
                for col_idx, cell in enumerate(row.cells):
                    content = cell.text.strip()
                    if content and final_headers.get(col_idx):
                        row_parts.append(f"{final_headers[col_idx]}:{content}")
                if row_parts:
                    # Separate the pairs; joining with '' ran the fields
                    # of a row together.
                    text_parts.append(",".join(row_parts))
            return "\n".join(text_parts)
        except Exception as e:
            print(f"转换表格为文本时出错: {str(e)}")
            return "【表格处理失败】"

    def _convert_table_to_text(self, table: "Table") -> str:
        """
        Convert the table to text (compatibility alias of convert_to_text).

        Args:
            table: Table object

        Returns:
            str: text representation of the table
        """
        return self.convert_to_text(table)