This commit is contained in:
方崇德 2025-04-16 16:51:10 +08:00
parent ec6af93437
commit f80a2ffef2
3 changed files with 513 additions and 64 deletions

View File

@ -12,6 +12,7 @@
- 自动跳过图片内容 - 自动跳过图片内容
- 支持doc格式自动转换为docx - 支持doc格式自动转换为docx
- 保持原始文档格式统一输出docx格式 - 保持原始文档格式统一输出docx格式
- 完整保留表格内容及格式
## 系统要求 ## 系统要求
@ -42,13 +43,13 @@ pip install -r requirements.txt
## 使用方法 ## 使用方法
```bash ```bash
python doc_cleaner.py 输入目录 输出目录 python doc_cleaner.py 输入目录
``` ```
### 示例 ### 示例
```bash ```bash
python doc_cleaner.py ./input_docs ./cleaned_docs python doc_cleaner.py ./input_docs
``` ```
## 输出说明 ## 输出说明
@ -82,3 +83,42 @@ python doc_cleaner.py ./input_docs ./cleaned_docs
- `^参考文献$` - `^参考文献$`
- `^References$` - `^References$`
- `^Bibliography$` - `^Bibliography$`
## 版本历史
### v1.1.0 (2024-01-09)
- 新增完整的表格支持
- 保留表格原始格式和样式
- 优化文档处理流程
### v1.0.0
- 初始版本发布
- 基础文档清理功能
## 更新日志
### 2024-03-21
- 修复了表格位置错误的问题
- 改进了表格占位符的处理机制
- 实现了基于索引的精确表格定位
- 确保表格按原文档位置正确插入
- 重构了文档处理核心逻辑
- 改进了文档元素的解析和存储方式
- 优化了正文和附录的分离逻辑
- 加强了表格位置的追踪机制
- 简化了文档结构处理流程
### 2024-03-xx
- 修复了表格在清理过程中位置错位的问题
- 改进了文本清理逻辑,确保表格占位符不被清理
- 优化了去重算法,保持表格在文档中的原始位置
- 分离表格和文本内容的处理流程,避免交叉影响
## 功能特性
- 支持doc和docx格式的文档处理
- 清理文档中的页眉页脚
- 保留文档中的表格并维持其原始位置
- 支持附录的单独处理
- 文本去重功能
- 批量处理目录下的所有文档

View File

@ -12,9 +12,15 @@ from sklearn.metrics.pairwise import cosine_similarity
from typing import List, Tuple, Dict, Optional from typing import List, Tuple, Dict, Optional
from docx.shared import Pt from docx.shared import Pt
from docx.enum.text import WD_PARAGRAPH_ALIGNMENT from docx.enum.text import WD_PARAGRAPH_ALIGNMENT
from docx.enum.table import WD_TABLE_ALIGNMENT
import subprocess import subprocess
import tempfile import tempfile
import json import json
from docx.table import Table, _Cell
from docx.text.paragraph import Paragraph
from copy import deepcopy
from docx.oxml import parse_xml
from docx.oxml.ns import nsdecls
class DocCleaner: class DocCleaner:
def __init__(self, ollama_host: str = "http://192.168.1.18:11434"): def __init__(self, ollama_host: str = "http://192.168.1.18:11434"):
@ -81,16 +87,18 @@ class DocCleaner:
except subprocess.CalledProcessError as e: except subprocess.CalledProcessError as e:
raise Exception(f"转换doc文件失败: {str(e)}") raise Exception(f"转换doc文件失败: {str(e)}")
def clean_doc(self, file_path: str) -> Tuple[List[str], List[str]]: def clean_doc(self, file_path: str) -> Tuple[List[str], List[str], List[Table]]:
""" """
清理文档并返回处理后的正文和附录 清理文档并返回处理后的正文附录和表格
Args: Args:
file_path: 文档文件路径 file_path: 文档文件路径
Returns: Returns:
Tuple[List[str], List[str]]: (清理后的正文段落列表, 附录段落列表) Tuple[List[str], List[str], List[Table]]: (清理后的正文段落列表, 附录段落列表, 表格列表)
""" """
print(f"\n开始处理文档: {file_path}")
# 检测文件类型 # 检测文件类型
file_type = magic.from_file(file_path, mime=True) file_type = magic.from_file(file_path, mime=True)
@ -104,19 +112,109 @@ class DocCleaner:
else: else:
doc = docx.Document(file_path) doc = docx.Document(file_path)
# 提取所有段落文本 # 提取所有内容(段落和表格)
paragraphs = [p.text.strip() for p in doc.paragraphs if p.text.strip()] content = []
tables = []
table_count = 0
try:
print("\n开始解析文档结构...")
# 遍历文档体中的所有元素
for element in doc._element.body:
if element.tag.endswith('p'):
try:
paragraph = docx.text.paragraph.Paragraph(element, doc)
text = paragraph.text.strip()
# 只添加非空段落
if text:
# 检查是否是附录标题
is_appendix = any(re.match(pattern, text, re.IGNORECASE)
for pattern in self.appendix_patterns)
content.append({
'type': 'paragraph',
'content': text,
'is_appendix_start': is_appendix
})
if is_appendix:
print(f"发现附录标题: {text}")
except Exception as e:
print(f"警告:处理段落时出错: {str(e)}")
continue
elif element.tag.endswith('tbl'):
try:
table = docx.table.Table(element, doc)
# 验证表格是否有效
if hasattr(table, 'rows') and hasattr(table, 'columns'):
tables.append(table)
content.append({
'type': 'table',
'index': table_count
})
print(f"发现表格 {table_count}: {len(table.rows)}行 x {len(table.columns)}")
table_count += 1
except Exception as e:
print(f"警告:处理表格时出错: {str(e)}")
continue
except Exception as e:
print(f"警告:遍历文档内容时出错: {str(e)}")
print(f"\n文档结构解析完成:")
print(f"- 总元素数: {len(content)}")
print(f"- 表格数量: {len(tables)}")
# 分离正文和附录 # 分离正文和附录
main_content, appendix = self._split_content(paragraphs) main_content = []
appendix = []
is_appendix = False
# 清理正文 print("\n开始分离正文和附录...")
cleaned_content = self._clean_text(main_content) for item in content:
if item['type'] == 'paragraph':
if item['is_appendix_start']:
is_appendix = True
print("进入附录部分")
# 删除重复段落 if is_appendix:
#cleaned_content = self._remove_duplicates(cleaned_content) appendix.append(item['content'])
else:
main_content.append(item['content'])
return cleaned_content, appendix elif item['type'] == 'table':
table_placeholder = f'TABLE_PLACEHOLDER_{item["index"]}'
if is_appendix:
appendix.append(table_placeholder)
print(f"添加表格到附录: {table_placeholder}")
else:
main_content.append(table_placeholder)
print(f"添加表格到正文: {table_placeholder}")
print(f"\n分离完成:")
print(f"- 正文元素数: {len(main_content)}")
print(f"- 附录元素数: {len(appendix)}")
# 清理正文(保留表格标记)
cleaned_content = []
print("\n开始清理正文...")
for item in main_content:
if item.startswith('TABLE_PLACEHOLDER_'):
cleaned_content.append(item)
print(f"保留表格标记: {item}")
else:
cleaned_text = self._clean_text([item])[0]
if cleaned_text:
cleaned_content.append(cleaned_text)
print(f"\n清理完成:")
print(f"- 清理后元素数: {len(cleaned_content)}")
print("- 表格标记位置:")
for i, item in enumerate(cleaned_content):
if item.startswith('TABLE_PLACEHOLDER_'):
print(f" 位置 {i}: {item}")
return cleaned_content, appendix, tables
def _clean_text(self, text: List[str]) -> List[str]: def _clean_text(self, text: List[str]) -> List[str]:
""" """
@ -130,6 +228,11 @@ class DocCleaner:
""" """
cleaned = [] cleaned = []
for paragraph in text: for paragraph in text:
# 如果是表格标记,直接保留
if paragraph.startswith('TABLE_PLACEHOLDER_'):
cleaned.append(paragraph)
continue
# 跳过空段落 # 跳过空段落
if not paragraph.strip(): if not paragraph.strip():
continue continue
@ -211,7 +314,7 @@ class DocCleaner:
def _remove_duplicates(self, paragraphs: List[str], similarity_threshold: float = 0.92) -> List[str]: def _remove_duplicates(self, paragraphs: List[str], similarity_threshold: float = 0.92) -> List[str]:
""" """
删除重复段落 删除重复段落保持表格占位符的位置不变
Args: Args:
paragraphs: 段落列表 paragraphs: 段落列表
@ -223,22 +326,42 @@ class DocCleaner:
if not paragraphs: if not paragraphs:
return [] return []
# 分离表格占位符和普通段落
table_placeholders = {}
text_paragraphs = []
for i, p in enumerate(paragraphs):
if p.startswith('TABLE_PLACEHOLDER_'):
table_placeholders[i] = p
else:
text_paragraphs.append((i, p))
try: try:
# 只对非表格段落进行去重
if text_paragraphs:
# 获取文本嵌入 # 获取文本嵌入
embeddings = self._get_embeddings(paragraphs) text_only = [p[1] for p in text_paragraphs]
embeddings = self._get_embeddings(text_only)
# 计算余弦相似度矩阵 # 计算余弦相似度矩阵
similarity_matrix = cosine_similarity(embeddings) similarity_matrix = cosine_similarity(embeddings)
# 标记要保留的段落 # 标记要保留的段落
keep_indices = [] keep_indices = []
for i in range(len(paragraphs)): for i in range(len(text_paragraphs)):
# 如果当前段落没有与之前的段落高度相似,则保留 # 如果当前段落没有与之前的段落高度相似,则保留
if not any(similarity_matrix[i][j] > similarity_threshold for j in keep_indices): if not any(similarity_matrix[i][j] > similarity_threshold for j in keep_indices):
keep_indices.append(i) keep_indices.append(i)
# 返回去重后的段落 # 保留的非表格段落
return [paragraphs[i] for i in keep_indices] kept_paragraphs = [(text_paragraphs[i][0], text_only[i]) for i in keep_indices]
else:
kept_paragraphs = []
# 合并表格占位符和保留的段落,按原始位置排序
all_kept = list(table_placeholders.items()) + kept_paragraphs
all_kept.sort(key=lambda x: x[0])
return [p[1] for p in all_kept]
except Exception as e: except Exception as e:
print(f"使用Ollama嵌入模型失败回退到TF-IDF方法: {str(e)}") print(f"使用Ollama嵌入模型失败回退到TF-IDF方法: {str(e)}")
@ -259,42 +382,112 @@ class DocCleaner:
if not paragraphs: if not paragraphs:
return [] return []
# 分离表格占位符和普通段落
table_placeholders = {}
text_paragraphs = []
for i, p in enumerate(paragraphs):
if p.startswith('TABLE_PLACEHOLDER_'):
table_placeholders[i] = p
else:
text_paragraphs.append((i, p))
if text_paragraphs:
# 计算TF-IDF矩阵 # 计算TF-IDF矩阵
tfidf_matrix = self.vectorizer.fit_transform(paragraphs) text_only = [p[1] for p in text_paragraphs]
tfidf_matrix = self.vectorizer.fit_transform(text_only)
# 计算余弦相似度矩阵 # 计算余弦相似度矩阵
similarity_matrix = cosine_similarity(tfidf_matrix) similarity_matrix = cosine_similarity(tfidf_matrix)
# 标记要保留的段落 # 标记要保留的段落
keep_indices = [] keep_indices = []
for i in range(len(paragraphs)): for i in range(len(text_paragraphs)):
# 如果当前段落没有与之前的段落高度相似,则保留 # 如果当前段落没有与之前的段落高度相似,则保留
if not any(similarity_matrix[i][j] > similarity_threshold for j in keep_indices): if not any(similarity_matrix[i][j] > similarity_threshold for j in keep_indices):
keep_indices.append(i) keep_indices.append(i)
# 返回去重后的段落 # 保留的非表格段落
return [paragraphs[i] for i in keep_indices] kept_paragraphs = [(text_paragraphs[i][0], text_only[i]) for i in keep_indices]
else:
kept_paragraphs = []
def save_as_docx(self, cleaned_content: List[str], appendix: List[str], output_path: str): # 合并表格占位符和保留的段落,按原始位置排序
all_kept = list(table_placeholders.items()) + kept_paragraphs
all_kept.sort(key=lambda x: x[0])
return [p[1] for p in all_kept]
def save_as_docx(self, cleaned_content: List[str], appendix: List[str], tables: List[Table], output_path: str):
""" """
将清理后的内容保存为docx格式 将清理后的内容保存为docx格式
Args: Args:
cleaned_content: 清理后的正文段落列表 cleaned_content: 清理后的正文段落列表
appendix: 附录段落列表 appendix: 附录段落列表
tables: 表格列表
output_path: 输出文件路径 output_path: 输出文件路径
""" """
print(f"\n开始保存文档: {output_path}")
print(f"- 正文元素数: {len(cleaned_content)}")
print(f"- 附录元素数: {len(appendix)}")
print(f"- 表格总数: {len(tables)}")
# 创建新文档 # 创建新文档
doc = docx.Document() doc = docx.Document()
# 添加正文内容 # 添加正文内容和表格,保持它们的相对位置
for paragraph in cleaned_content: print("\n处理正文内容...")
p = doc.add_paragraph(paragraph)
# 设置段落格式(可以根据需要调整) # 创建一个列表来存储所有要插入的元素
elements_to_insert = []
for i, content in enumerate(cleaned_content):
try:
# 检查是否是表格占位符
table_match = re.match(r'TABLE_PLACEHOLDER_(\d+)', content)
if table_match:
table_index = int(table_match.group(1))
print(f"正在处理表格占位符: {content} (索引: {table_index})")
if table_index < len(tables):
table = tables[table_index]
try:
# 直接在XML级别复制表格
new_tbl = deepcopy(table._element)
# 确保新表格有正确的命名空间
new_tbl.tbl = parse_xml(new_tbl.xml)
elements_to_insert.append(('table', new_tbl))
print(f"准备插入表格 {table_index} 在位置 {i}")
# 添加表格后的空行
elements_to_insert.append(('paragraph', doc.add_paragraph()._element))
except Exception as e:
print(f"警告:复制表格时出错: {str(e)}")
try:
print("尝试使用备用方法...")
p = doc.add_paragraph()
self._copy_table_fallback(p._parent, table)
elements_to_insert.append(('paragraph', p._element))
elements_to_insert.append(('paragraph', doc.add_paragraph()._element))
print("备用方法成功")
except Exception as e2:
print(f"警告:备用方法也失败: {str(e2)}")
elements_to_insert.append(('paragraph', doc.add_paragraph("【表格处理失败】")._element))
else:
# 添加普通段落
p = doc.add_paragraph(content)
p.alignment = WD_PARAGRAPH_ALIGNMENT.JUSTIFY p.alignment = WD_PARAGRAPH_ALIGNMENT.JUSTIFY
elements_to_insert.append(('paragraph', p._element))
except Exception as e:
print(f"警告:处理段落或表格时出错: {str(e)}")
continue
# 按顺序将所有元素插入文档
for element_type, element in elements_to_insert:
doc._body._element.append(element)
# 如果有附录,添加分隔符和附录内容 # 如果有附录,添加分隔符和附录内容
if appendix: if appendix:
print("\n处理附录内容...")
try:
# 添加分页符 # 添加分页符
doc.add_page_break() doc.add_page_break()
@ -303,12 +496,218 @@ class DocCleaner:
title.alignment = WD_PARAGRAPH_ALIGNMENT.CENTER title.alignment = WD_PARAGRAPH_ALIGNMENT.CENTER
# 添加附录内容 # 添加附录内容
for paragraph in appendix: appendix_elements = []
p = doc.add_paragraph(paragraph) for content in appendix:
# 检查是否是表格占位符
table_match = re.match(r'TABLE_PLACEHOLDER_(\d+)', content)
if table_match:
table_index = int(table_match.group(1))
print(f"正在处理附录中的表格占位符: {content} (索引: {table_index})")
if table_index < len(tables):
table = tables[table_index]
try:
# 直接在XML级别复制表格
new_tbl = deepcopy(table._element)
new_tbl.tbl = parse_xml(new_tbl.xml)
appendix_elements.append(('table', new_tbl))
print(f"准备插入附录表格 {table_index}")
appendix_elements.append(('paragraph', doc.add_paragraph()._element))
except Exception as e:
print(f"警告:复制附录中的表格时出错: {str(e)}")
try:
p = doc.add_paragraph()
self._copy_table_fallback(p._parent, table)
appendix_elements.append(('paragraph', p._element))
appendix_elements.append(('paragraph', doc.add_paragraph()._element))
print("备用方法成功")
except Exception as e2:
print(f"警告:附录表格的备用方法也失败: {str(e2)}")
appendix_elements.append(('paragraph', doc.add_paragraph("【表格处理失败】")._element))
else:
p = doc.add_paragraph(content)
p.alignment = WD_PARAGRAPH_ALIGNMENT.JUSTIFY p.alignment = WD_PARAGRAPH_ALIGNMENT.JUSTIFY
appendix_elements.append(('paragraph', p._element))
# 按顺序将附录元素插入文档
for element_type, element in appendix_elements:
doc._body._element.append(element)
except Exception as e:
print(f"警告:处理附录时出错: {str(e)}")
# 保存文档 # 保存文档
try:
doc.save(output_path) doc.save(output_path)
print("\n文档保存成功!")
except Exception as e:
print(f"错误:保存文档时出错: {str(e)}")
raise
def _copy_table_fallback(self, doc: docx.Document, table: Table):
"""
表格复制的备用方法
Args:
doc: 目标文档
table: 源表格
"""
# 获取表格的行数和列数
rows = len(table.rows)
cols = len(table.columns)
# 创建新表格
new_table = doc.add_table(rows=rows, cols=cols)
# 复制表格样式
if table.style:
new_table.style = table.style
# 复制表格属性
new_table._element.tblPr = deepcopy(table._element.tblPr)
# 复制网格信息
new_table._element.tblGrid = deepcopy(table._element.tblGrid)
# 创建单元格映射以跟踪合并
cell_map = {}
# 第一遍:标记合并的单元格
for i in range(rows):
for j in range(cols):
try:
src_cell = table.cell(i, j)
# 检查是否是合并单元格的一部分
if src_cell._element.tcPr is not None:
# 检查垂直合并
vmerge = src_cell._element.tcPr.xpath('.//w:vMerge')
if vmerge:
val = vmerge[0].get(qn('w:val'), 'continue')
if val == 'restart':
# 这是合并的起始单元格
span = self._get_vertical_span(table, i, j)
cell_map[(i, j)] = ('vmerge', span)
# 检查水平合并
gridspan = src_cell._element.tcPr.xpath('.//w:gridSpan')
if gridspan:
span = int(gridspan[0].get(qn('w:val')))
if span > 1:
cell_map[(i, j)] = ('hmerge', span)
except Exception as e:
print(f"警告:处理合并单元格时出错 [{i},{j}]: {str(e)}")
# 第二遍:复制内容并执行合并
for i in range(rows):
for j in range(cols):
try:
src_cell = table.cell(i, j)
dst_cell = new_table.cell(i, j)
# 检查是否需要合并
if (i, j) in cell_map:
merge_type, span = cell_map[(i, j)]
if merge_type == 'vmerge':
# 垂直合并
for k in range(1, span):
if i + k < rows:
dst_cell.merge(new_table.cell(i + k, j))
elif merge_type == 'hmerge':
# 水平合并
for k in range(1, span):
if j + k < cols:
dst_cell.merge(new_table.cell(i, j + k))
# 复制单元格属性
if src_cell._element.tcPr is not None:
dst_cell._element.tcPr = deepcopy(src_cell._element.tcPr)
# 复制单元格内容
dst_cell.text = "" # 清除默认内容
for src_paragraph in src_cell.paragraphs:
dst_paragraph = dst_cell.add_paragraph()
# 复制段落属性
if src_paragraph._element.pPr is not None:
dst_paragraph._element.pPr = deepcopy(src_paragraph._element.pPr)
# 复制文本和格式
for src_run in src_paragraph.runs:
dst_run = dst_paragraph.add_run(src_run.text)
# 复制运行属性
if src_run._element.rPr is not None:
dst_run._element.rPr = deepcopy(src_run._element.rPr)
except Exception as e:
print(f"警告:复制单元格时出错 [{i},{j}]: {str(e)}")
continue
def _get_vmerge_value(self, cell_element) -> str:
"""
获取单元格的垂直合并属性
Args:
cell_element: 单元格元素
Returns:
str: 垂直合并属性值
"""
vmerge = cell_element.xpath('.//w:vMerge')
if vmerge:
return vmerge[0].get(qn('w:val'), 'continue')
return None
def _get_gridspan_value(self, cell_element) -> int:
"""
获取单元格的水平合并数量
Args:
cell_element: 单元格元素
Returns:
int: 水平合并的列数
"""
gridspan = cell_element.xpath('.//w:gridSpan')
if gridspan:
return int(gridspan[0].get(qn('w:val'), '1'))
return 1
def _get_vertical_span(self, table: Table, start_row: int, col: int) -> int:
"""
计算垂直合并的行数
Args:
table: 表格对象
start_row: 起始行
col: 列号
Returns:
int: 垂直合并的行数
"""
span = 1
for i in range(start_row + 1, len(table.rows)):
cell = table.cell(i, col)
if self._get_vmerge_value(cell._element) == 'continue':
span += 1
else:
break
return span
def _extract_table_text(self, table: Table) -> str:
"""
提取表格中的文本内容
Args:
table: docx表格对象
Returns:
str: 表格内容的文本表示
"""
table_text = []
for row in table.rows:
for cell in row.cells:
cell_text = cell.text.strip()
if cell_text:
table_text.append(cell_text)
return ' '.join(table_text)
def process_directory(input_dir: str, output_dir: str = None): def process_directory(input_dir: str, output_dir: str = None):
""" """
@ -334,14 +733,14 @@ def process_directory(input_dir: str, output_dir: str = None):
try: try:
# 清理文档 # 清理文档
main_content, appendix = cleaner.clean_doc(input_path) main_content, appendix, tables = cleaner.clean_doc(input_path)
# 创建输出文件名统一使用docx扩展名 # 创建输出文件名统一使用docx扩展名
base_name = os.path.splitext(file)[0] base_name = os.path.splitext(file)[0]
output_path = os.path.join(output_dir, f"{base_name}_cleaned.docx") output_path = os.path.join(output_dir, f"{base_name}_cleaned.docx")
# 保存为docx格式 # 保存为docx格式
cleaner.save_as_docx(main_content, appendix, output_path) cleaner.save_as_docx(main_content, appendix, tables, output_path)
except Exception as e: except Exception as e:
print(f"处理文件 {file} 时出错: {str(e)}") print(f"处理文件 {file} 时出错: {str(e)}")
@ -351,6 +750,19 @@ def process_directory(input_dir: str, output_dir: str = None):
elif isinstance(e, FileNotFoundError): elif isinstance(e, FileNotFoundError):
print("请确保已安装LibreOffice并将其添加到系统PATH中") print("请确保已安装LibreOffice并将其添加到系统PATH中")
def qn(tag: str) -> str:
"""
将标签转换为带命名空间的格式
Args:
tag: 原始标签
Returns:
str: 带命名空间的标签
"""
prefix = "{http://schemas.openxmlformats.org/wordprocessingml/2006/main}"
return prefix + tag
if __name__ == '__main__': if __name__ == '__main__':
import argparse import argparse

View File

@ -1,9 +1,6 @@
python-docx>=0.8.11 python-docx>=0.8.11
regex>=2023.0.0 regex>=2023.0.0
nltk>=3.8.1
scikit-learn>=1.3.0 scikit-learn>=1.3.0
pandas>=2.0.0
numpy>=1.24.0 numpy>=1.24.0
python-magic>=0.4.27 python-magic>=0.4.27
chardet>=5.0.0
requests>=2.31.0 requests>=2.31.0