表格处理优化

This commit is contained in:
cxs 2025-04-23 11:19:10 +08:00
parent 4ddbd7d510
commit 1ee4ed24d3
2 changed files with 413 additions and 60 deletions

113
README.md
View File

@ -13,6 +13,8 @@
- 支持doc格式自动转换为docx
- 保持原始文档格式统一输出docx格式
- 完整保留表格内容及格式
- 支持表格转换为格式化文本,便于大模型识别
- 同时输出docx和txt格式文件txt文件包含完整的文本内容和表格的文本表示
## 系统要求
@ -54,11 +56,21 @@ python doc_cleaner.py ./input_docs
## 输出说明
程序会为每个处理的文档生成一个清理后的docx文件:
程序会为每个处理的文档生成两个文件:
- `文档名_cleaned.docx`: 包含清理后的正文内容和附录(如果存在)
- 附录内容会自动添加分页符并在新页面开始
- 所有文件包括原始doc格式都会统一转换并保存为docx格式
- 保持文档格式为docx支持段落对齐等基本格式
- 附录内容会自动添加分页符并在新页面开始
- 所有文件包括原始doc格式都会统一转换并保存为docx格式
- 保持文档格式为docx支持段落对齐等基本格式
- 表格以文本格式显示使用ASCII字符绘制边框和分隔符
- 使用等宽字体Courier New确保表格对齐
- 自动调整列宽以适应内容
- 清晰标识表格序号和位置(正文/附录)
- `文档名_cleaned.txt`: 包含所有文本内容的纯文本文件
- 包含完整的正文内容
- 包含所有表格的文本表示使用ASCII字符绘制
- 保持文档的原始结构(正文、表格、附录的顺序)
- 使用空行分隔不同部分
- 清晰标注表格的序号和位置
## 注意事项
@ -120,6 +132,99 @@ python doc_cleaner.py ./input_docs
- 改用文件后缀名直接判断文件类型
- 简化了文件类型检测逻辑
### 2024-03-23
- 优化表格处理方式
- 将表格直接转换为文本格式显示在文档中
- 使用ASCII字符绘制表格边框和分隔符
- 采用等宽字体确保表格对齐
- 自动调整列宽以适应内容
- 优化了表格内容的文本对齐方式
- 保留表头样式,便于识别表格结构
- 清晰标识表格序号和位置(正文/附录)
- 移除了原始表格格式,提高可读性和兼容性
### 2024-03-24
- 新增文本文件输出功能
- 同时生成docx和txt格式的输出文件
- txt文件包含完整的文本内容和表格的文本表示
- 保持文档结构和内容的完整性
- 优化了表格在文本文件中的显示效果
### 2024-03-25
- 优化文本输出格式
- 移除文本输出中的换行符
- 使用空格分隔文本内容
- 提高文本的连续性和可读性
### 2024-03-26
- 优化表格文本格式
- 移除ASCII边框字符+和-
- 使用方括号[]标识单元格边界
- 避免JSON解析错误
- 简化表格文本表示方式
### 2024-03-27
- 进一步优化文本输出格式
- 移除所有换行符和多余空格
- 所有内容在单行内显示
- 优化表格文本的连续性
- 确保输出文本的JSON兼容性
### 2024-03-28
- 改进表格输出格式
- 采用表单形式输出表格内容
- 使用"键:值"格式表示每个单元格
- 按行组织表格数据
- 使用表头作为数据项的键名
- 提高表格数据的可读性和结构性
### 2024-03-29
- 完善表格输出格式
- 添加表头行作为第一行输出
- 确保每行数据包含所有字段
- 保持字段顺序与表头一致
- 空值字段显示为空字符串
- 提升数据的完整性和一致性
### 2024-03-30
- 增强复杂表格处理能力
- 支持多级表头的识别和处理
- 正确处理水平和垂直合并单元格
- 智能识别表头行数最多支持3行表头
- 优化表头和数据的关联方式
- 提供更清晰的数据结构表示
- 完善空值和特殊情况的处理
- 确保合并单元格数据的完整性
### 2024-03-31
- 优化表格处理错误处理机制
- 增加多层错误处理,确保程序稳定性
- 改进合并单元格处理逻辑
- 优化空值和异常值的处理方式
- 提供更详细的错误位置信息
- 确保部分错误不影响整体处理
- 完善日志输出,便于问题定位
### 2024-04-01
- 深度优化复杂表格处理
- 改进多级表头的识别和处理算法
- 优化表头层级结构的分析方法
- 增强跨行跨列单元格的处理能力
- 改进表头标识符的构建逻辑
- 提升数据与表头的关联准确性
- 支持更复杂的表格结构解析
- 完善异常情况的处理机制
### 2024-04-02
- 优化表头处理逻辑
- 消除表头中的重复前缀
- 优化多级表头的组合方式
- 改进跨列表头的处理机制
- 增加表头去重功能
- 保持表头层级的清晰性
- 提高表格数据的可读性
- 减少冗余信息
## 功能特性
- 支持doc和docx格式的文档处理

View File

@ -419,7 +419,7 @@ class DocCleaner:
def save_as_docx(self, cleaned_content: List[str], appendix: List[str], tables: List[Table], output_path: str):
"""
将清理后的内容保存为docx格式
将清理后的内容保存为docx格式和txt格式
Args:
cleaned_content: 清理后的正文段落列表
@ -435,6 +435,9 @@ class DocCleaner:
# 创建新文档
doc = docx.Document()
# 创建文本输出内容列表
text_output = []
# 添加正文内容和表格,保持它们的相对位置
print("\n处理正文内容...")
@ -451,31 +454,39 @@ class DocCleaner:
if table_index < len(tables):
table = tables[table_index]
try:
# 直接在XML级别复制表格
new_tbl = deepcopy(table._element)
# 确保新表格有正确的命名空间
new_tbl.tbl = parse_xml(new_tbl.xml)
elements_to_insert.append(('table', new_tbl))
print(f"准备插入表格 {table_index} 在位置 {i}")
# 添加表格后的空行
# 转换表格为文本格式
table_text = self._convert_table_to_text(table)
# 添加表格标题
title = doc.add_paragraph(f"表格 {table_index + 1}:")
title.alignment = WD_PARAGRAPH_ALIGNMENT.LEFT
elements_to_insert.append(('paragraph', title._element))
# 添加表格文本内容,使用等宽字体
p = doc.add_paragraph()
run = p.add_run(table_text)
run.font.name = 'Courier New' # 使用等宽字体
run.font.size = Pt(10) # 设置字体大小
elements_to_insert.append(('paragraph', p._element))
# 添加空行
elements_to_insert.append(('paragraph', doc.add_paragraph()._element))
# 添加到文本输出
text_output.append(f"表格 {table_index + 1}:")
text_output.append(table_text)
except Exception as e:
print(f"警告:复制表格时出错: {str(e)}")
try:
print("尝试使用备用方法...")
p = doc.add_paragraph()
self._copy_table_fallback(p._parent, table)
elements_to_insert.append(('paragraph', p._element))
elements_to_insert.append(('paragraph', doc.add_paragraph()._element))
print("备用方法成功")
except Exception as e2:
print(f"警告:备用方法也失败: {str(e2)}")
elements_to_insert.append(('paragraph', doc.add_paragraph("【表格处理失败】")._element))
print(f"警告:处理表格时出错: {str(e)}")
elements_to_insert.append(('paragraph', doc.add_paragraph("【表格处理失败】")._element))
text_output.append("【表格处理失败】")
else:
# 添加普通段落
p = doc.add_paragraph(content)
p.alignment = WD_PARAGRAPH_ALIGNMENT.JUSTIFY
elements_to_insert.append(('paragraph', p._element))
# 添加到文本输出
text_output.append(content)
except Exception as e:
print(f"警告:处理段落或表格时出错: {str(e)}")
continue
@ -495,6 +506,9 @@ class DocCleaner:
title = doc.add_paragraph("附录")
title.alignment = WD_PARAGRAPH_ALIGNMENT.CENTER
# 添加到文本输出
text_output.append("附录")
# 添加附录内容
appendix_elements = []
for content in appendix:
@ -506,27 +520,35 @@ class DocCleaner:
if table_index < len(tables):
table = tables[table_index]
try:
# 直接在XML级别复制表格
new_tbl = deepcopy(table._element)
new_tbl.tbl = parse_xml(new_tbl.xml)
appendix_elements.append(('table', new_tbl))
print(f"准备插入附录表格 {table_index}")
appendix_elements.append(('paragraph', doc.add_paragraph()._element))
# 转换表格为文本格式
table_text = self._convert_table_to_text(table)
# 添加表格标题
title = doc.add_paragraph(f"附录表格 {table_index + 1}:")
title.alignment = WD_PARAGRAPH_ALIGNMENT.LEFT
appendix_elements.append(('paragraph', title._element))
# 添加表格文本内容,使用等宽字体
p = doc.add_paragraph()
run = p.add_run(table_text)
run.font.name = 'Courier New' # 使用等宽字体
run.font.size = Pt(10) # 设置字体大小
appendix_elements.append(('paragraph', p._element))
# 添加到文本输出
text_output.append(f"附录表格 {table_index + 1}:")
text_output.append(table_text)
except Exception as e:
print(f"警告:复制附录中的表格时出错: {str(e)}")
try:
p = doc.add_paragraph()
self._copy_table_fallback(p._parent, table)
appendix_elements.append(('paragraph', p._element))
appendix_elements.append(('paragraph', doc.add_paragraph()._element))
print("备用方法成功")
except Exception as e2:
print(f"警告:附录表格的备用方法也失败: {str(e2)}")
appendix_elements.append(('paragraph', doc.add_paragraph("【表格处理失败】")._element))
print(f"警告:处理附录表格时出错: {str(e)}")
appendix_elements.append(('paragraph', doc.add_paragraph("【表格处理失败】")._element))
text_output.append("【表格处理失败】")
else:
p = doc.add_paragraph(content)
p.alignment = WD_PARAGRAPH_ALIGNMENT.JUSTIFY
appendix_elements.append(('paragraph', p._element))
# 添加到文本输出
text_output.append(content)
# 按顺序将附录元素插入文档
for element_type, element in appendix_elements:
@ -535,12 +557,24 @@ class DocCleaner:
except Exception as e:
print(f"警告:处理附录时出错: {str(e)}")
# 保存文档
# 保存docx文档
try:
doc.save(output_path)
print("\n文档保存成功!")
print("\nWord文档保存成功!")
except Exception as e:
print(f"错误:保存文档时出错: {str(e)}")
print(f"错误保存Word文档时出错: {str(e)}")
raise
# 保存文本文件
try:
text_file_path = os.path.splitext(output_path)[0] + '.txt'
# 移除所有换行符并用空格连接
text_content = ' '.join([t.replace('\n', ' ').strip() for t in text_output if t.strip()])
with open(text_file_path, 'w', encoding='utf-8') as f:
f.write(text_content)
print(f"文本文件保存成功: {text_file_path}")
except Exception as e:
print(f"错误:保存文本文件时出错: {str(e)}")
raise
def _copy_table_fallback(self, doc: docx.Document, table: Table):
@ -665,10 +699,13 @@ class DocCleaner:
Returns:
int: 水平合并的列数
"""
gridspan = cell_element.xpath('.//w:gridSpan')
if gridspan:
return int(gridspan[0].get(qn('w:val'), '1'))
return 1
try:
gridspan = cell_element.xpath('.//w:gridSpan')
if gridspan and gridspan[0].get(qn('w:val')):
return int(gridspan[0].get(qn('w:val')))
except (ValueError, TypeError, AttributeError) as e:
print(f"警告获取gridspan值时出错: {str(e)}")
return 1 # 默认返回1表示没有合并
def _get_vertical_span(self, table: Table, start_row: int, col: int) -> int:
"""
@ -691,9 +728,226 @@ class DocCleaner:
break
return span
def _convert_table_to_text(self, table: Table) -> str:
"""
将表格转换为文本格式智能处理简单和复杂表格结构
Args:
table: docx表格对象
Returns:
str: 表格的文本表示
"""
try:
# 获取表格的行数和列数
rows = len(table.rows)
cols = len(table.columns)
if rows == 0 or cols == 0:
return "【空表格】"
# 存储处理后的表格数据
processed_data = []
# 检查是否是复杂表格(具有合并单元格或多级表头)
is_complex_table = False
max_header_rows = min(3, rows) # 最多检查前3行
# 检查前几行是否存在合并单元格
for i in range(max_header_rows):
for j in range(cols):
try:
cell = table.cell(i, j)
if cell._element.tcPr is not None:
# 检查垂直合并
vmerge = cell._element.tcPr.xpath('.//w:vMerge')
if vmerge:
is_complex_table = True
break
# 检查水平合并
gridspan = cell._element.tcPr.xpath('.//w:gridSpan')
if gridspan:
is_complex_table = True
break
except Exception:
continue
if is_complex_table:
break
if is_complex_table:
# 使用复杂表格处理逻辑
# 第一步:分析表头结构
header_structure = [] # 存储表头的层级结构
# 分析每一列的表头结构
for j in range(cols):
column_headers = []
last_header = None
for i in range(max_header_rows):
try:
cell = table.cell(i, j)
text = cell.text.strip()
# 检查垂直合并
if cell._element.tcPr is not None:
vmerge = cell._element.tcPr.xpath('.//w:vMerge')
if vmerge:
val = vmerge[0].get(qn('w:val'), 'continue')
if val == 'continue':
# 使用上一个非空表头
if last_header:
text = last_header
# 检查水平合并
if cell._element.tcPr is not None:
gridspan = self._get_gridspan_value(cell._element)
if gridspan > 1:
# 标记这是一个跨列的表头
text = f"SPAN_{gridspan}_{text}"
if text:
column_headers.append(text)
last_header = text
except Exception as e:
print(f"警告:分析表头单元格 [{i},{j}] 时出错: {str(e)}")
continue
header_structure.append(column_headers)
# 第二步:构建完整的表头标识符
full_headers = []
for j, headers in enumerate(header_structure):
if not headers:
full_headers.append(f"{j+1}")
continue
# 处理跨列的表头
header_text = []
current_prefix = ""
for h in headers:
if h.startswith('SPAN_'):
parts = h.split('_', 2)
span = int(parts[1])
text = parts[2]
# 将跨列的表头添加到后续的列
for k in range(span):
if j + k < cols:
if k == 0:
if text != current_prefix: # 避免重复前缀
header_text.append(text)
current_prefix = text
else:
if text not in header_structure[j + k]:
header_structure[j + k].insert(0, text)
else:
if h != current_prefix: # 避免重复前缀
header_text.append(h)
current_prefix = h
# 移除重复的表头部分
unique_headers = []
seen = set()
for h in header_text:
if h not in seen:
unique_headers.append(h)
seen.add(h)
full_headers.append('_'.join(unique_headers))
# 确定实际的表头行数
header_row_count = max(len(headers) for headers in header_structure)
if header_row_count == 0:
header_row_count = 1
# 处理数据行
for i in range(header_row_count, rows):
try:
row_data = []
j = 0
while j < cols:
try:
cell = table.cell(i, j)
text = cell.text.strip()
# 处理垂直合并
if not text and cell._element.tcPr is not None:
vmerge = cell._element.tcPr.xpath('.//w:vMerge')
if vmerge and vmerge[0].get(qn('w:val')) == 'continue':
# 使用上一行的值
text = table.cell(i-1, j).text.strip()
# 处理水平合并
gridspan = self._get_gridspan_value(cell._element)
# 将值复制到所有合并的列
for k in range(gridspan):
if j + k < len(full_headers):
row_data.append(f"{full_headers[j+k]}:{text}")
j += gridspan
except Exception as e:
print(f"警告:处理数据单元格 [{i},{j}] 时出错: {str(e)}")
if j < len(full_headers):
row_data.append(f"{full_headers[j]}:")
j += 1
# 确保行中至少有一个非空值
if any(data.split(':')[1].strip() for data in row_data):
processed_data.append(" ".join(row_data))
except Exception as e:
print(f"警告:处理数据行 {i} 时出错: {str(e)}")
continue
else:
# 使用简单表格处理逻辑
# 获取表头
headers = []
for j in range(cols):
try:
header_text = table.cell(0, j).text.strip()
if not header_text: # 如果表头为空,使用默认值
header_text = f"{j+1}"
headers.append(header_text)
except Exception as e:
print(f"警告:处理表头单元格 [0,{j}] 时出错: {str(e)}")
headers.append(f"{j+1}")
# 处理数据行
for i in range(1, rows):
try:
row_data = []
for j in range(cols):
try:
text = table.cell(i, j).text.strip()
row_data.append(f"{headers[j]}:{text}")
except Exception as e:
print(f"警告:处理数据单元格 [{i},{j}] 时出错: {str(e)}")
row_data.append(f"{headers[j]}:")
# 确保行中至少有一个非空值
if any(data.split(':')[1].strip() for data in row_data):
processed_data.append(" ".join(row_data))
except Exception as e:
print(f"警告:处理数据行 {i} 时出错: {str(e)}")
continue
# 返回处理后的表格文本
if processed_data:
return " ".join(processed_data)
else:
return "【表格无有效数据】"
except Exception as e:
print(f"警告:处理表格时出错: {str(e)}")
return "【表格处理失败】"
def _extract_table_text(self, table: Table) -> str:
"""
提取表格中的文本内容
提取表格中的文本内容现在会返回格式化的文本表示
Args:
table: docx表格对象
@ -701,13 +955,7 @@ class DocCleaner:
Returns:
str: 表格内容的文本表示
"""
table_text = []
for row in table.rows:
for cell in row.cells:
cell_text = cell.text.strip()
if cell_text:
table_text.append(cell_text)
return ' '.join(table_text)
return self._convert_table_to_text(table)
def process_directory(input_dir: str, output_dir: str = None):
"""
@ -766,10 +1014,10 @@ def qn(tag: str) -> str:
if __name__ == '__main__':
import argparse
parser = argparse.ArgumentParser(description='文档清理工具')
parser.add_argument('input_dir', help='输入目录路径')
parser.add_argument('--output_dir', help='输出目录路径(可选,默认为输入目录)', default=None)
# parser = argparse.ArgumentParser(description='文档清理工具')
# parser.add_argument('input_dir', help='输入目录路径')
# parser.add_argument('--output_dir', help='输出目录路径(可选,默认为输入目录)', default=None)
#
# args = parser.parse_args()
args = parser.parse_args()
process_directory(args.input_dir, args.output_dir)
process_directory("D:/rzData/poject/AI项目/中烟/后台服务/es数据/数据验证", "D:/rzData/poject/AI项目/中烟/后台服务/es数据/数据验证")