doc-etl/cxs/cxs_table_processor.py
2025-05-16 11:30:02 +08:00

961 lines
41 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import re
from typing import List, Dict, Any, Optional
import os
from docx.oxml import parse_xml
from docx.oxml.ns import nsdecls
# 自定义TableData类用于存储表格数据
class TableData:
def __init__(self):
"""
初始化表格数据结构
"""
self.rows = []
self.style = None
self.columns = [] # 添加列属性
def cell(self, row_idx: int, col_idx: int) -> Dict[str, Any]:
"""
获取表格单元格
Args:
row_idx: 行索引
col_idx: 列索引
Returns:
Dict: 单元格数据
"""
try:
# 首先检查行索引是否有效
if row_idx < 0 or row_idx >= len(self.rows):
return {'text': '', 'gridspan': 1, 'vmerge': None}
# 然后检查列索引是否有效
if col_idx < 0 or col_idx >= len(self.rows[row_idx]):
return {'text': '', 'gridspan': 1, 'vmerge': None}
# 如果需要,进行额外的安全检查
cell = self.rows[row_idx][col_idx]
if not isinstance(cell, dict):
print(f"警告:单元格数据格式错误 [{row_idx},{col_idx}]")
return {'text': str(cell) if cell is not None else '', 'gridspan': 1, 'vmerge': None}
return cell
except Exception as e:
print(f"获取单元格时出错 [{row_idx},{col_idx}]: {str(e)}")
return {'text': '', 'gridspan': 1, 'vmerge': None}
class TableProcessor:
def __init__(self):
"""
初始化表格处理器
"""
print("初始化表格处理器")
def _extract_table_row(self, row_element, namespace):
"""
提取表格行数据,增强的表格行处理
Args:
row_element: 行元素
namespace: XML命名空间
Returns:
List: 行数据列表
"""
row = []
try:
# 处理单元格
for cell_element in row_element.findall('.//w:tc', namespaces=namespace):
cell_text = ''
# 提取单元格中的所有文本
for paragraph in cell_element.findall('.//w:p', namespaces=namespace):
for run in paragraph.findall('.//w:t', namespaces=namespace):
if run.text:
cell_text += run.text
# 在段落后添加换行符
cell_text += '\n'
# 移除末尾换行
cell_text = cell_text.rstrip('\n')
# 检查单元格合并属性
gridspan = self._get_gridspan_value(cell_element)
vmerge = self._get_vmerge_value(cell_element)
# 创建单元格数据
cell = {
'text': cell_text,
'gridspan': gridspan,
'vmerge': vmerge
}
row.append(cell)
# 如果行为空,创建至少一个空单元格
if not row:
row.append({'text': '', 'gridspan': 1, 'vmerge': None})
return row
except Exception as e:
print(f"提取表格行数据时出错: {str(e)}")
# 返回至少有一个单元格的行
return [{'text': '', 'gridspan': 1, 'vmerge': None}]
def _preprocess_table(self, element, namespace):
"""
对表格进行预处理,加强特殊表格的识别能力
Args:
element: 表格元素
namespace: XML命名空间
Returns:
TableData: 预处理后的表格数据
"""
table = TableData()
# 检查并处理表格行
rows_elements = element.findall('.//w:tr', namespaces=namespace)
# 表格为空的特殊处理
if not rows_elements:
# 尝试寻找更深层次的表格元素,可能是嵌套在其他元素中的表格
nested_rows = element.findall('.//*//w:tr', namespaces=namespace)
if nested_rows:
rows_elements = nested_rows
print(f"已找到嵌套表格行:{len(rows_elements)}")
else:
# 创建一个默认行,避免表格为空
print("未找到表格行,创建默认行")
table.rows.append([{'text': '', 'gridspan': 1, 'vmerge': None}])
return table
# 处理每一行
for row_element in rows_elements:
row = self._extract_table_row(row_element, namespace)
table.rows.append(row)
# 如果表格为空,创建默认行
if not table.rows:
table.rows.append([{'text': '', 'gridspan': 1, 'vmerge': None}])
# 分析表格,确定列数
max_cols = 0
for row in table.rows:
# 计算考虑gridspan的实际列数
effective_cols = sum(cell.get('gridspan', 1) for cell in row)
max_cols = max(max_cols, effective_cols)
# 确保每行都有足够的列
for i, row in enumerate(table.rows):
current_cols = sum(cell.get('gridspan', 1) for cell in row)
if current_cols < max_cols:
# 添加空单元格来填充行
padding_cells = max_cols - current_cols
for _ in range(padding_cells):
row.append({'text': '', 'gridspan': 1, 'vmerge': None})
# 设置列索引
table.columns = [i for i in range(max_cols)]
# 增强对垂直合并单元格的处理
self._enhance_vertical_merges(table)
# 额外执行一次垂直合并内容传播,修复复杂表格中的合并单元格
self._propagate_vertical_merges(table)
return table
def _propagate_vertical_merges(self, table: TableData):
"""
专门处理复杂表格中的垂直合并单元格,向下传播内容
Args:
table: TableData对象
"""
rows = len(table.rows)
cols = len(table.columns) if table.columns else 0
if rows <= 1 or cols == 0:
return
# 创建一个矩阵记录每个单元格位置的内容
matrix = []
for i in range(rows):
row = []
for j in range(cols):
try:
if j < len(table.rows[i]):
cell = table.rows[i][j]
row.append(cell.get('text', '').strip())
else:
row.append('')
except (IndexError, KeyError):
row.append('') # 防止索引越界
matrix.append(row)
# 对每一列进行垂直合并检查
for j in range(cols):
# 从上到下传播非空内容
last_non_empty = None
last_non_empty_idx = -1
for i in range(rows):
try:
# 安全访问表格单元格
current_text = ''
if j < len(table.rows[i]):
cell = table.rows[i][j]
current_text = cell.get('text', '').strip()
# 如果当前单元格为空,但上方有非空单元格,考虑垂直合并
if not current_text and last_non_empty:
# 检查这是否可能是垂直合并
if i - last_non_empty_idx <= 3: # 限制垂直检查范围,避免过度填充
# 根据上下文判断是否真的是合并单元格
# 1. 检查该列其他单元格是否有相似模式
pattern_match = False
for k in range(rows):
if k != i and k != last_non_empty_idx:
# 查找相似模式:空单元格下方接非空单元格
if k > 0 and not matrix[k-1][j] and matrix[k][j]:
pattern_match = True
break
# 2. 检查第一列特殊情况 - 可能是分类表
is_first_columns = j < 2 # 前两列更可能是分类信息
if pattern_match or is_first_columns:
if j < len(table.rows[i]):
# 安全地更新当前单元格
table.rows[i][j]['text'] = last_non_empty
table.rows[i][j]['is_inferred_merge'] = True
matrix[i][j] = last_non_empty # 更新矩阵
print(f"传播合并内容到位置 [{i},{j}]: {last_non_empty[:20]}...")
# 更新最后一个非空单元格
if current_text:
last_non_empty = current_text
last_non_empty_idx = i
except Exception as e:
print(f"处理垂直合并传播时出错 [{i},{j}]: {str(e)}")
# 第二轮:处理常见的分类表格模式(第一列相同值表示同一类别)
for j in range(min(2, cols)): # 只处理前两列
# 查找具有相同值的行组
groups = {}
for i in range(rows):
try:
if j < len(table.rows[i]):
value = table.rows[i][j].get('text', '').strip()
if value:
if value not in groups:
groups[value] = []
groups[value].append(i)
except Exception as e:
print(f"分组时出错 [{i},{j}]: {str(e)}")
# 处理每个组
for value, indices in groups.items():
if len(indices) >= 2: # 至少有两行具有相同值
# 检查这些行之间是否有空行
indices.sort()
for idx in range(len(indices) - 1):
start_row = indices[idx]
end_row = indices[idx + 1]
# 如果两行不相邻,检查中间行
if end_row - start_row > 1:
for mid_row in range(start_row + 1, end_row):
try:
# 检查中间行的单元格是否为空
if j < len(table.rows[mid_row]):
mid_cell = table.rows[mid_row][j]
if not mid_cell.get('text', '').strip():
# 这可能是被合并的单元格,填充内容
mid_cell['text'] = value
mid_cell['is_inferred_merge'] = True
print(f"填充中间行合并单元格 [{mid_row},{j}]: {value[:20]}...")
except Exception as e:
print(f"填充中间行时出错 [{mid_row},{j}]: {str(e)}")
def _enhance_vertical_merges(self, table: TableData):
"""
增强对垂直合并单元格的处理
处理逻辑包括:
1. 检查并处理第一列和第二列的特殊情况
2. 在表格中识别内容相似的单元格
Args:
table: TableData对象
"""
rows = len(table.rows)
cols = len(table.columns) if table.columns else 0
if rows <= 1 or cols == 0:
return
# 检查第一列和第二列的特殊情况
for j in range(min(2, cols)): # 检查前两列,因为合并单元格可能出现在这两列中
# 检查是否有垂直合并单元格
for i in range(1, rows):
try:
if j < len(table.rows[i]): # 确保索引有效
cell = table.rows[i][j]
# 如果单元格为空且没有标记为合并单元格,检查上面行的内容
if not cell.get('text', '').strip() and cell.get('vmerge') is None:
# 安全访问上一行
if j < len(table.rows[i-1]):
prev_cell = table.rows[i-1][j]
if prev_cell.get('text', '').strip():
# 如果上面行有内容,这可能是合并单元格
print(f"在位置 [{i},{j}] 检测到可能的垂直合并单元格")
# 将内容复制到当前单元格
cell['text'] = prev_cell['text']
cell['is_inferred_merge'] = True # 标记为推导出的合并单元格
except IndexError as e:
print(f"增强垂直合并处理索引错误 [{i},{j}]: {str(e)}")
except Exception as e:
print(f"增强垂直合并处理一般错误 [{i},{j}]: {str(e)}")
# 特殊情况:检查分类表格中的模式
try:
# 在分类表格中,同一列的内容如果重复出现,可能是合并单元格
content_groups = self._identify_content_groups(table, j)
# 处理内容相似的单元格
for group_indices in content_groups:
if len(group_indices) > 1: # 如果有多个相同的单元格
if group_indices[0] < len(table.rows) and j < len(table.rows[group_indices[0]]):
group_text = table.rows[group_indices[0]][j].get('text', '')
if group_text.strip(): # 如果单元格有内容
print(f"在列 {j} 中发现可能的内容合并组: {group_indices}")
# 将这些单元格标记为具有相同的内容
for idx in group_indices:
if idx < len(table.rows) and j < len(table.rows[idx]):
table.rows[idx][j]['content_group'] = group_indices
except Exception as e:
print(f"处理内容组时出错 [列 {j}]: {str(e)}")
def _identify_content_groups(self, table: TableData, col_idx: int) -> List[List[int]]:
"""
根据内容相似性识别合并单元格
Args:
table: TableData对象
col_idx: 要分析的列索引
Returns:
List[List[int]]: 可能合并单元格的行索引组
"""
rows = len(table.rows)
# 存储每个唯一内容的所有行索引
content_groups = {}
for i in range(rows):
try:
if col_idx < len(table.rows[i]):
cell_text = table.rows[i][col_idx].get('text', '').strip()
if cell_text:
if cell_text not in content_groups:
content_groups[cell_text] = []
content_groups[cell_text].append(i)
except IndexError:
# 安全跳过索引越界情况
continue
except Exception as e:
print(f"识别内容组时出错 [{i},{col_idx}]: {str(e)}")
# 返回包含多个行索引的组
return [indices for text, indices in content_groups.items() if len(indices) > 1]
def _is_valid_table(self, table: TableData) -> bool:
"""
检查表格是否有效(至少有一行一列且含有有意义的内容)
Args:
table: TableData对象
Returns:
bool: 表格是否有效
"""
try:
# 检查表格尺寸
rows = len(table.rows)
cols = len(table.columns) if table.columns else 0
# 如果没有行或列,表格无效
if rows < 1 or cols < 1:
print(f"表格无效: 没有行或列 (行数={rows}, 列数={cols})")
return False
# 检查表格XML结构是否包含表格标记
# 此步骤可以简单检测表格是否有表格相关的XML标记
try:
# 以下逻辑是为了特殊处理可能被误判的表格
# 判断是否是特殊表格(如药品分类表)
first_cell_text = ""
if rows > 0 and len(table.rows[0]) > 0:
first_cell_text = table.cell(0, 0).get('text', '').strip()
# 检查首行首列是否包含特定文本模式(如编号、分类名称等)
# 这些模式暗示这可能是一个重要表格
special_patterns = [
r'^\d{2}-\d{2}', # 类似 01-01 的编码
r'^[一二三四五六七八九十]+级', # 中文级别(一级、二级等)
r'^\d+\.\d+', # 类似 1.1 的编号格式
r'类[别型]|分类|编码', # 包含分类相关词汇
r'\s*\d+', # 表格编号(如"表1"
r'产品|器械|设备|材料' # 常见医疗或药品分类术语
]
for pattern in special_patterns:
if re.search(pattern, first_cell_text):
print(f"检测到特殊表格模式: '{first_cell_text}',强制视为有效表格")
return True
except Exception as e:
# 特殊检测失败,继续常规检测
print(f"特殊表格检测时出错: {str(e)}")
# 计算表格中的有效内容
total_cells = 0
non_empty_cells = 0
total_text_length = 0
for i in range(rows):
for j in range(min(cols, len(table.rows[i]))): # 防止越界
total_cells += 1
cell_text = table.cell(i, j)['text'].strip()
if cell_text:
non_empty_cells += 1
total_text_length += len(cell_text)
# 计算非空单元格比例
non_empty_ratio = non_empty_cells / total_cells if total_cells > 0 else 0
# 表格行列数检查 - 如果行数或列数足够多,更可能是有效表格
has_multiple_rows = rows >= 3
has_multiple_cols = cols >= 3
# 实际单元格内容检查
# 进一步放宽标准,只要有内容就视为可能有效
is_meaningful = (
# 1. 标准条件至少有2个单元格有内容
non_empty_cells >= 2 or
# 2. 极低门槛至少有1个单元格有内容且文本长度>=1个字符
(non_empty_cells > 0 and total_text_length >= 1) or
# 3. 表格足够大至少有3行3列可能是重要表格
(has_multiple_rows and has_multiple_cols) or
# 4. 非空率较高:即使单元格少,但如果填充率高,也可能是有意义的
(non_empty_ratio >= 0.5 and total_text_length > 0)
)
if not is_meaningful:
print(f"表格无效: 内容不足 (非空单元格={non_empty_cells}/{total_cells}, 文本长度={total_text_length})")
return is_meaningful
except Exception as e:
print(f"警告:检查表格有效性时出错: {str(e)}")
import traceback
traceback.print_exc()
# 出错时默认认为有效,避免丢失潜在有用的表格
return True
def _extract_plain_text_from_table(self, table: TableData) -> str:
"""
从表格中提取纯文本,用于将无效表格作为普通文本处理
Args:
table: docx表格对象
Returns:
str: 表格内容的纯文本表示
"""
try:
text_parts = []
for row in table.rows:
for cell in row:
cell_text = cell['text'].strip()
if cell_text:
text_parts.append(cell_text)
return " ".join(text_parts)
except Exception as e:
print(f"警告:从表格提取文本时出错: {str(e)}")
return "【表格文本提取失败】"
def _convert_table_to_text(self, table: TableData) -> str:
"""
将表格转换为文本格式,使用简化易读的表格表示
Args:
table: TableData对象
Returns:
str: 表格的文本表示
"""
try:
# 获取表格的行数和列数
rows = len(table.rows)
cols = len(table.columns) if table.columns else 0
if rows == 0 or cols == 0:
return "【空表格】"
# 构建一个完整的表格矩阵,处理合并单元格
matrix = []
for i in range(rows):
row = [""] * cols
matrix.append(row)
# 首先安全地处理所有已知的单元格内容
for i in range(rows):
for j in range(cols):
try:
if j < len(table.rows[i]):
cell = table.rows[i][j]
text = cell.get('text', '').strip()
matrix[i][j] = text
except IndexError:
continue # 跳过索引越界
# 填充矩阵,处理合并单元格
for i in range(rows):
j = 0
while j < cols:
try:
if j >= len(table.rows[i]):
j += 1
continue
cell = table.rows[i][j]
text = cell.get('text', '').strip()
# 特殊处理:检查是否有内容组标记
content_group = cell.get('content_group', [])
if content_group:
# 如果这是内容组的一部分
if i in content_group and content_group[0] < len(table.rows) and j < len(table.rows[content_group[0]]):
group_text = table.rows[content_group[0]][j].get('text', '').strip()
if group_text:
text = group_text
# 处理水平合并(gridspan)
gridspan = cell.get('gridspan', 1)
# 处理垂直合并(vmerge)和推断的合并
if cell.get('vmerge') == 'continue' or cell.get('is_inferred_merge'):
# 如果是继续合并的单元格或推断的合并,使用当前已有的文本
if not text:
# 如果当前单元格文本为空,尝试从上面行查找
for prev_i in range(i-1, -1, -1):
if prev_i < len(table.rows) and j < len(table.rows[prev_i]):
prev_cell = table.rows[prev_i][j]
prev_text = prev_cell.get('text', '').strip()
if prev_text:
text = prev_text
break
# 填充当前单元格
matrix[i][j] = text
# 处理水平合并,将内容复制到被合并的单元格
for k in range(1, gridspan):
if j + k < cols:
matrix[i][j+k] = text
# 如果这是垂直合并的起始单元格,复制内容到下面被合并的单元格
if text and (cell.get('vmerge') == 'restart' or not cell.get('vmerge')):
for next_i in range(i+1, rows):
if next_i < len(table.rows) and j < len(table.rows[next_i]):
next_cell = table.rows[next_i][j]
if next_cell.get('vmerge') == 'continue' or not next_cell.get('text', '').strip():
# 复制到下面被合并的单元格
matrix[next_i][j] = text
# 处理水平合并
next_gridspan = next_cell.get('gridspan', 1)
for k in range(1, next_gridspan):
if j + k < cols:
matrix[next_i][j+k] = text
else:
break
j += max(1, gridspan)
except IndexError as e:
print(f"表格转文本处理索引错误 [{i},{j}]: {str(e)}")
j += 1 # 确保进度
except Exception as e:
print(f"表格转文本一般错误 [{i},{j}]: {str(e)}")
j += 1
# 再次处理第一列和第二列中的空白单元格 - 增强垂直合并处理
for j in range(min(3, cols)): # 扩展到前三列
# 自上而下扫描
last_content = ""
for i in range(rows):
if matrix[i][j]:
last_content = matrix[i][j]
elif last_content and i > 0 and matrix[i-1][j]:
# 如果当前为空且上一行不为空,填充内容
matrix[i][j] = last_content
# 自下而上扫描,填充孤立的空单元格
for i in range(rows-2, 0, -1): # 从倒数第二行开始向上
if not matrix[i][j] and matrix[i-1][j] and matrix[i+1][j] and matrix[i-1][j] == matrix[i+1][j]:
# 如果当前为空且上下行内容相同,填充内容
matrix[i][j] = matrix[i-1][j]
# 如果有表头,提取它们
headers = matrix[0] if rows > 0 else ["" + str(j+1) for j in range(cols)]
# 确保表头不为空
for j in range(cols):
if not headers[j]:
headers[j] = "" + str(j+1)
# 构建结构化输出 - 使用统一简化格式
result = []
result.append("表格内容(简化格式):")
# 添加表头行
header_line = []
# 计算每列最大宽度
col_widths = [0] * cols
for j in range(cols):
col_widths[j] = max(len(headers[j]), col_widths[j])
# 计算数据行的宽度
for i in range(1, rows):
for j in range(cols):
if matrix[i][j]:
col_widths[j] = max(col_widths[j], len(matrix[i][j]))
# 加入表头与分隔线
for j in range(cols):
header_line.append(headers[j].ljust(col_widths[j]))
result.append(" | ".join(header_line))
# 添加分隔线
separator = []
for j in range(cols):
separator.append("-" * col_widths[j])
result.append(" | ".join(separator))
# 添加数据行
for i in range(1, rows):
row_line = []
has_content = False
for j in range(cols):
cell_text = matrix[i][j]
if cell_text:
has_content = True
# 始终添加单元格内容,即使为空
row_line.append(cell_text.ljust(col_widths[j]))
if has_content:
result.append(" | ".join(row_line))
return "\n".join(result)
except Exception as e:
print(f"警告:处理表格时出错: {str(e)}")
import traceback
traceback.print_exc()
return "【表格处理失败】"
def _convert_table_to_markdown(self, table: TableData) -> str:
"""
将表格转换为Markdown格式使用简化易读的表格表示
Args:
table: TableData对象
Returns:
str: 表格的Markdown表示
"""
try:
# 获取表格的行数和列数
rows = len(table.rows)
cols = len(table.columns) if table.columns else 0
if rows == 0 or cols == 0:
return "| 空表格 |"
# 构建一个完整的表格矩阵,处理合并单元格
matrix = []
for i in range(rows):
row = [""] * cols
matrix.append(row)
# 首先安全地处理所有已知的单元格内容
for i in range(rows):
for j in range(cols):
try:
if j < len(table.rows[i]):
cell = table.rows[i][j]
text = cell.get('text', '').strip()
matrix[i][j] = text
except IndexError:
continue # 跳过索引越界
# 填充矩阵,处理合并单元格
for i in range(rows):
j = 0
while j < cols:
try:
if j >= len(table.rows[i]):
j += 1
continue
cell = table.rows[i][j]
text = cell.get('text', '').strip()
# 特殊处理:检查是否有内容组标记
content_group = cell.get('content_group', [])
if content_group and i in content_group:
# 如果这是内容组的一部分,保证内容的一致性
if content_group[0] < len(table.rows) and j < len(table.rows[content_group[0]]):
group_text = table.rows[content_group[0]][j].get('text', '').strip()
if group_text:
text = group_text
# 处理水平合并(gridspan)
gridspan = cell.get('gridspan', 1)
# 处理垂直合并(vmerge)和推断的合并
if cell.get('vmerge') == 'continue' or cell.get('is_inferred_merge'):
# 如果是继续合并的单元格或推断的合并,使用当前已有的文本
if not text:
# 如果当前单元格文本为空,尝试从上面行查找
for prev_i in range(i-1, -1, -1):
if prev_i < len(table.rows) and j < len(table.rows[prev_i]):
prev_cell = table.rows[prev_i][j]
prev_text = prev_cell.get('text', '').strip()
if prev_text:
text = prev_text
break
# 填充当前单元格
matrix[i][j] = text
# 处理水平合并,将内容复制到被合并的单元格
for k in range(1, gridspan):
if j + k < cols:
matrix[i][j+k] = text
# 如果这是垂直合并的起始单元格,复制内容到下面被合并的单元格
if text and (cell.get('vmerge') == 'restart' or not cell.get('vmerge')):
for next_i in range(i+1, rows):
if next_i < len(table.rows) and j < len(table.rows[next_i]):
next_cell = table.rows[next_i][j]
if next_cell.get('vmerge') == 'continue' or not next_cell.get('text', '').strip():
# 复制到下面被合并的单元格
matrix[next_i][j] = text
# 处理水平合并
next_gridspan = next_cell.get('gridspan', 1)
for k in range(1, next_gridspan):
if j + k < cols:
matrix[next_i][j+k] = text
else:
break
j += max(1, gridspan)
except Exception as e:
print(f"Markdown表格处理错误 [{i},{j}]: {str(e)}")
j += 1
# 再次处理第一列中的空白单元格 - 增强垂直合并处理
for j in range(min(3, cols)): # 扩展到前三列
# 自上而下扫描
last_content = ""
for i in range(rows):
if matrix[i][j]:
last_content = matrix[i][j]
elif last_content and i > 0 and matrix[i-1][j]:
# 如果当前为空且上一行不为空,填充内容
matrix[i][j] = last_content
# 确保表头不为空
headers = matrix[0] if rows > 0 else []
for j in range(cols):
if j >= len(headers) or not headers[j]:
headers.append("" + str(j+1))
# 构建Markdown表格
markdown_rows = []
# 添加表头行
header_row = "| " + " | ".join(headers) + " |"
markdown_rows.append(header_row)
# 添加分隔行
separator = "| " + " | ".join(["---"] * cols) + " |"
markdown_rows.append(separator)
# 添加数据行
for i in range(1, rows):
row_data = []
has_content = False
for j in range(cols):
cell_text = matrix[i][j]
if cell_text:
has_content = True
row_data.append(cell_text)
if has_content:
markdown_rows.append("| " + " | ".join(row_data) + " |")
return "\n".join(markdown_rows)
except Exception as e:
print(f"警告处理Markdown表格时出错: {str(e)}")
import traceback
traceback.print_exc()
return "| 表格处理失败 |"
def _extract_table_text(self, table: TableData) -> str:
"""
提取表格中的文本内容,返回格式化的文本表示
Args:
table: docx表格对象
Returns:
str: 表格内容的文本表示
"""
# 调用优化后的表格处理函数,确保合并单元格被正确处理
return self._convert_table_to_text(table)
def _convert_small_table_to_text(self, table: TableData) -> str:
"""
将小型表格转换为更简洁的文本格式
Args:
table: TableData对象
Returns:
str: 表格的文本表示
"""
rows = len(table.rows)
cols = len(table.columns) if table.columns else 0
if rows == 0 or cols == 0:
return "【空表格】"
# 提取所有单元格文本
cell_texts = []
for i in range(rows):
row_texts = []
for j in range(min(cols, len(table.rows[i]))):
cell_text = table.cell(i, j)['text'].strip().replace('\n', ' ')
row_texts.append(cell_text)
cell_texts.append(row_texts)
# 计算每列的最大宽度
col_widths = [0] * cols
for i in range(rows):
for j in range(len(cell_texts[i])):
col_widths[j] = max(col_widths[j], len(cell_texts[i][j]))
# 生成表格文本
result = []
# 添加表头
header_row = cell_texts[0]
header_line = []
for j, text in enumerate(header_row):
width = min(col_widths[j], 30) # 限制最大宽度
header_line.append(text.ljust(width))
result.append(" | ".join(header_line))
# 添加分隔线
separator = []
for j in range(cols):
width = min(col_widths[j], 30)
separator.append("-" * width)
result.append(" | ".join(separator))
# 添加数据行
for i in range(1, rows):
row_line = []
for j, text in enumerate(cell_texts[i]):
width = min(col_widths[j], 30) # 限制最大宽度
row_line.append(text.ljust(width))
result.append(" | ".join(row_line))
return "\n".join(result)
def _get_vmerge_value(self, cell_element) -> str:
"""
获取单元格的垂直合并属性
Args:
cell_element: 单元格元素
Returns:
str: 垂直合并属性值
"""
vmerge = cell_element.xpath('.//w:vMerge')
if vmerge:
return vmerge[0].get(self._qn('w:val'), 'continue')
return None
def _get_gridspan_value(self, cell_element) -> int:
"""
获取单元格的水平合并数量
Args:
cell_element: 单元格元素
Returns:
int: 水平合并的列数
"""
try:
gridspan = cell_element.xpath('.//w:gridSpan')
if gridspan and gridspan[0].get(self._qn('w:val')):
return int(gridspan[0].get(self._qn('w:val')))
except (ValueError, TypeError, AttributeError) as e:
print(f"警告获取gridspan值时出错: {str(e)}")
return 1 # 默认返回1表示没有合并
def _get_vertical_span(self, table: TableData, start_row: int, col: int) -> int:
"""
计算垂直合并的行数
Args:
table: 表格对象
start_row: 起始行
col: 列号
Returns:
int: 垂直合并的行数
"""
span = 1
for i in range(start_row + 1, len(table.rows)):
cell = table.cell(i, col)
if cell.get('vmerge') == 'continue':
span += 1
else:
break
return span
def _qn(self, tag: str) -> str:
"""
将标签转换为带命名空间的格式
Args:
tag: 原始标签
Returns:
str: 带命名空间的标签
"""
prefix = "{http://schemas.openxmlformats.org/wordprocessingml/2006/main}"
return prefix + tag