doc-etl/doc_cleaner.py

775 lines
30 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import os
import re
import docx
import numpy as np
import requests
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
from typing import List, Tuple, Dict, Optional
from docx.shared import Pt
from docx.enum.text import WD_PARAGRAPH_ALIGNMENT
from docx.enum.table import WD_TABLE_ALIGNMENT
import subprocess
import tempfile
import json
from docx.table import Table, _Cell
from docx.text.paragraph import Paragraph
from copy import deepcopy
from docx.oxml import parse_xml
from docx.oxml.ns import nsdecls
class DocCleaner:
def __init__(self, ollama_host: str = "http://192.168.1.18:11434"):
    """
    Set up the regex tables, the TF-IDF vectorizer and the Ollama settings.

    Args:
        ollama_host: Base URL of the Ollama server.
    """
    # Ollama connection settings; embedding_model is the model name used
    # for text embeddings.
    self.ollama_host = ollama_host
    self.embedding_model = "bge-m3:latest"

    # TF-IDF vectorizer (English stop words, min_df of 1).
    self.vectorizer = TfidfVectorizer(stop_words='english', min_df=1)

    # Header / footer patterns (page numbering).
    page_number_patterns = [
        r'页码\s*\d+-\d+',  # "页码1-1", "页码2-1", ...
        r'\s*\d+\s*页\s*共\s*\d+\s*页',  # Chinese "page X of Y" form
        r'Page\s*\d+\s*of\s*\d+',  # English page numbers
    ]
    self.header_footer_patterns = page_number_patterns

    # Boilerplate markers that are stripped from paragraphs.
    boilerplate_patterns = [
        r'©\s*\d{4}.*?版权所有',  # copyright notice
        r'confidential',  # confidentiality marker
        r'draft|草稿',  # draft marker
        r'watermark',  # watermark marker
    ]
    self.special_char_patterns = boilerplate_patterns

    # Headings that start the appendix / references section.
    section_heading_patterns = [
        r'^附录\s*[A-Za-z]?[\s:]',
        r'^Appendix\s*[A-Za-z]?[\s:]',
        r'^参考文献$',
        r'^References$',
        r'^Bibliography$'
    ]
    self.appendix_patterns = section_heading_patterns
def _convert_doc_to_docx(self, doc_path: str) -> str:
"""
将doc格式转换为docx格式
Args:
doc_path: doc文件路径
Returns:
str: 转换后的docx文件路径
"""
# 创建临时文件路径
temp_dir = tempfile.mkdtemp()
temp_docx = os.path.join(temp_dir, 'temp.docx')
try:
# 使用sofficeLibreOffice进行转换
cmd = ['soffice', '--headless', '--convert-to', 'docx', '--outdir', temp_dir, doc_path]
subprocess.run(cmd, check=True, capture_output=True)
# 返回转换后的文件路径
return temp_docx
except subprocess.CalledProcessError as e:
raise Exception(f"转换doc文件失败: {str(e)}")
def clean_doc(self, file_path: str) -> Tuple[List[str], List[str], List[Table]]:
    """
    Parse a document, split it into main text and appendix, and clean the main text.

    Body paragraphs and tables are read in document order. Each table is
    represented in the returned text lists by the marker
    ``TABLE_PLACEHOLDER_<n>``, where ``n`` is the table's index in the
    returned table list. Everything from the first appendix/references
    heading onwards goes to the appendix and is returned uncleaned.

    Args:
        file_path: Path of a .doc or .docx file. A .doc file is converted
            to .docx first.

    Returns:
        Tuple[List[str], List[str], List[Table]]: (cleaned main-text items,
        appendix items, tables).
    """
    print(f"\n开始处理文档: {file_path}")
    _, file_extension = os.path.splitext(file_path)
    file_extension = file_extension.lower()
    if file_extension == '.doc':
        temp_docx = self._convert_doc_to_docx(file_path)
        try:
            doc = docx.Document(temp_docx)
        finally:
            # Remove the temporary file and its directory even when parsing
            # fails. A cleanup problem is reported but must not abort the
            # processing of a document that was read successfully.
            for remove, target in ((os.remove, temp_docx),
                                   (os.rmdir, os.path.dirname(temp_docx))):
                try:
                    remove(target)
                except OSError as cleanup_error:
                    print(f"警告:清理临时文件时出错: {str(cleanup_error)}")
    else:
        doc = docx.Document(file_path)

    # Collect paragraphs and tables in document order.
    content = []
    tables = []
    try:
        print("\n开始解析文档结构...")
        for element in doc._element.body:
            tag = element.tag
            # Comments / processing instructions have a non-string tag.
            if not isinstance(tag, str):
                continue
            if tag.endswith('}p'):
                try:
                    paragraph = Paragraph(element, doc)
                    text = paragraph.text.strip()
                    # Only non-empty paragraphs are kept.
                    if text:
                        starts_appendix = any(re.match(pattern, text, re.IGNORECASE)
                                              for pattern in self.appendix_patterns)
                        content.append({
                            'type': 'paragraph',
                            'content': text,
                            'is_appendix_start': starts_appendix
                        })
                        if starts_appendix:
                            print(f"发现附录标题: {text}")
                except Exception as e:
                    print(f"警告:处理段落时出错: {str(e)}")
                    continue
            elif tag.endswith('}tbl'):
                try:
                    table = Table(element, doc)
                    # The placeholder index is the table's real position in
                    # `tables`, so it stays correct even if the log line
                    # below fails for a malformed table.
                    table_index = len(tables)
                    tables.append(table)
                    content.append({
                        'type': 'table',
                        'index': table_index
                    })
                    print(f"发现表格 {table_index}: {len(table.rows)}行 x {len(table.columns)}")
                except Exception as e:
                    print(f"警告:处理表格时出错: {str(e)}")
                    continue
    except Exception as e:
        print(f"警告:遍历文档内容时出错: {str(e)}")
    print(f"\n文档结构解析完成:")
    print(f"- 总元素数: {len(content)}")
    print(f"- 表格数量: {len(tables)}")

    # Split into main text and appendix.
    main_content = []
    appendix = []
    in_appendix = False
    print("\n开始分离正文和附录...")
    for item in content:
        if item['type'] == 'paragraph':
            if item['is_appendix_start']:
                in_appendix = True
                print("进入附录部分")
            if in_appendix:
                appendix.append(item['content'])
            else:
                main_content.append(item['content'])
        elif item['type'] == 'table':
            table_placeholder = f'TABLE_PLACEHOLDER_{item["index"]}'
            if in_appendix:
                appendix.append(table_placeholder)
                print(f"添加表格到附录: {table_placeholder}")
            else:
                main_content.append(table_placeholder)
                print(f"添加表格到正文: {table_placeholder}")
    print(f"\n分离完成:")
    print(f"- 正文元素数: {len(main_content)}")
    print(f"- 附录元素数: {len(appendix)}")

    # Clean the main text; table markers pass through untouched.
    cleaned_content = []
    print("\n开始清理正文...")
    for item in main_content:
        if item.startswith('TABLE_PLACEHOLDER_'):
            cleaned_content.append(item)
            print(f"保留表格标记: {item}")
        else:
            # _clean_text returns an empty list when the paragraph consists
            # only of removable text, so extend instead of indexing [0].
            cleaned_content.extend(self._clean_text([item]))
    print(f"\n清理完成:")
    print(f"- 清理后元素数: {len(cleaned_content)}")
    print("- 表格标记位置:")
    for i, item in enumerate(cleaned_content):
        if item.startswith('TABLE_PLACEHOLDER_'):
            print(f"  位置 {i}: {item}")
    return cleaned_content, appendix, tables
def _clean_text(self, text: List[str]) -> List[str]:
"""
清理文本内容
Args:
text: 待清理的文本段落列表
Returns:
List[str]: 清理后的文本段落列表
"""
cleaned = []
for paragraph in text:
# 如果是表格标记,直接保留
if paragraph.startswith('TABLE_PLACEHOLDER_'):
cleaned.append(paragraph)
continue
# 跳过空段落
if not paragraph.strip():
continue
# 检查是否是目录项(包含数字序号的行)
is_toc_item = bool(re.match(r'^\s*(?:\d+\.)*\d+\s+.*', paragraph))
if not is_toc_item:
# 移除页眉页脚
for pattern in self.header_footer_patterns:
paragraph = re.sub(pattern, '', paragraph, flags=re.IGNORECASE)
# 移除特殊符号
for pattern in self.special_char_patterns:
paragraph = re.sub(pattern, '', paragraph, flags=re.IGNORECASE)
# 如果段落不为空,添加到结果中
if paragraph.strip():
cleaned.append(paragraph.strip())
return cleaned
def _split_content(self, paragraphs: List[str]) -> Tuple[List[str], List[str]]:
"""
分离正文与附录/参考文献
Args:
paragraphs: 文档段落列表
Returns:
Tuple[List[str], List[str]]: (正文段落列表, 附录段落列表)
"""
main_content = []
appendix = []
is_appendix = False
for p in paragraphs:
# 检查是否是附录开始
if any(re.match(pattern, p, re.IGNORECASE) for pattern in self.appendix_patterns):
is_appendix = True
if is_appendix:
appendix.append(p)
else:
main_content.append(p)
return main_content, appendix
def _get_embeddings(self, texts: List[str]) -> np.ndarray:
"""
使用Ollama获取文本嵌入向量
Args:
texts: 文本列表
Returns:
np.ndarray: 嵌入向量矩阵
"""
embeddings = []
for text in texts:
try:
response = requests.post(
f"{self.ollama_host}/api/embeddings",
json={
"model": self.embedding_model,
"prompt": text
}
)
response.raise_for_status()
embedding = response.json()["embedding"]
embeddings.append(embedding)
except Exception as e:
print(f"获取文本嵌入失败: {str(e)}")
# 如果获取嵌入失败,使用零向量
embeddings.append([0.0] * 768) # nomic-embed-text 模型输出维度为768
return np.array(embeddings)
def _remove_duplicates(self, paragraphs: List[str], similarity_threshold: float = 0.92) -> List[str]:
"""
删除重复段落,保持表格占位符的位置不变
Args:
paragraphs: 段落列表
similarity_threshold: 相似度阈值,使用嵌入模型后可以设置更高的阈值
Returns:
List[str]: 去重后的段落列表
"""
if not paragraphs:
return []
# 分离表格占位符和普通段落
table_placeholders = {}
text_paragraphs = []
for i, p in enumerate(paragraphs):
if p.startswith('TABLE_PLACEHOLDER_'):
table_placeholders[i] = p
else:
text_paragraphs.append((i, p))
try:
# 只对非表格段落进行去重
if text_paragraphs:
# 获取文本嵌入
text_only = [p[1] for p in text_paragraphs]
embeddings = self._get_embeddings(text_only)
# 计算余弦相似度矩阵
similarity_matrix = cosine_similarity(embeddings)
# 标记要保留的段落
keep_indices = []
for i in range(len(text_paragraphs)):
# 如果当前段落没有与之前的段落高度相似,则保留
if not any(similarity_matrix[i][j] > similarity_threshold for j in keep_indices):
keep_indices.append(i)
# 保留的非表格段落
kept_paragraphs = [(text_paragraphs[i][0], text_only[i]) for i in keep_indices]
else:
kept_paragraphs = []
# 合并表格占位符和保留的段落,按原始位置排序
all_kept = list(table_placeholders.items()) + kept_paragraphs
all_kept.sort(key=lambda x: x[0])
return [p[1] for p in all_kept]
except Exception as e:
print(f"使用Ollama嵌入模型失败回退到TF-IDF方法: {str(e)}")
# 如果使用Ollama失败回退到原来的TF-IDF方法
return self._remove_duplicates_tfidf(paragraphs)
def _remove_duplicates_tfidf(self, paragraphs: List[str], similarity_threshold: float = 0.85) -> List[str]:
"""
使用TF-IDF方法删除重复段落作为备选方案
Args:
paragraphs: 段落列表
similarity_threshold: 相似度阈值
Returns:
List[str]: 去重后的段落列表
"""
if not paragraphs:
return []
# 分离表格占位符和普通段落
table_placeholders = {}
text_paragraphs = []
for i, p in enumerate(paragraphs):
if p.startswith('TABLE_PLACEHOLDER_'):
table_placeholders[i] = p
else:
text_paragraphs.append((i, p))
if text_paragraphs:
# 计算TF-IDF矩阵
text_only = [p[1] for p in text_paragraphs]
tfidf_matrix = self.vectorizer.fit_transform(text_only)
# 计算余弦相似度矩阵
similarity_matrix = cosine_similarity(tfidf_matrix)
# 标记要保留的段落
keep_indices = []
for i in range(len(text_paragraphs)):
# 如果当前段落没有与之前的段落高度相似,则保留
if not any(similarity_matrix[i][j] > similarity_threshold for j in keep_indices):
keep_indices.append(i)
# 保留的非表格段落
kept_paragraphs = [(text_paragraphs[i][0], text_only[i]) for i in keep_indices]
else:
kept_paragraphs = []
# 合并表格占位符和保留的段落,按原始位置排序
all_kept = list(table_placeholders.items()) + kept_paragraphs
all_kept.sort(key=lambda x: x[0])
return [p[1] for p in all_kept]
def save_as_docx(self, cleaned_content: List[str], appendix: List[str], tables: List[Table], output_path: str):
    """
    Write the cleaned main text, the appendix and the tables to a new .docx file.

    Items are added strictly in order at the end of the document content:
    first the main text (with each ``TABLE_PLACEHOLDER_<n>`` replaced by a
    copy of ``tables[n]`` followed by an empty paragraph), then - if there
    is an appendix - a page break, an "附录" heading and the appendix items.
    A placeholder whose index is outside ``tables`` is skipped.

    Args:
        cleaned_content: Cleaned main-text items (paragraphs and table markers).
        appendix: Appendix items (paragraphs and table markers).
        tables: Source tables referenced by the markers.
        output_path: Path of the .docx file to write.

    Raises:
        Exception: Any error raised while saving is reported and re-raised.
    """
    print(f"\n开始保存文档: {output_path}")
    print(f"- 正文元素数: {len(cleaned_content)}")
    print(f"- 附录元素数: {len(appendix)}")
    print(f"- 表格总数: {len(tables)}")
    doc = docx.Document()
    body = doc._body._element

    def insert_table_copy(table):
        """Copy the table's XML to the end of the content, plus a blank paragraph."""
        new_tbl = deepcopy(table._element)
        # doc.add_paragraph() places the spacer at the end of the content
        # (ahead of the body's trailing section properties); the table is
        # then slotted in directly before it. Appending to the body element
        # itself would put the table after the section properties.
        spacer = doc.add_paragraph()
        spacer._element.addprevious(new_tbl)

    def insert_table_rebuilt(table):
        """Rebuild the table cell by cell; undo partial output on failure."""
        snapshot = list(body)  # keeps the element proxies (and their ids) alive
        known = {id(child) for child in snapshot}
        try:
            # The fallback calls doc.add_table(rows=..., cols=...), so it
            # needs the document object itself.
            self._copy_table_fallback(doc, table)
            doc.add_paragraph()
        except Exception:
            # Do not leave a half-built table behind.
            for child in list(body):
                if id(child) not in known:
                    body.remove(child)
            raise

    print("\n处理正文内容...")
    for i, content in enumerate(cleaned_content):
        try:
            table_match = re.match(r'TABLE_PLACEHOLDER_(\d+)', content)
            if table_match:
                table_index = int(table_match.group(1))
                print(f"正在处理表格占位符: {content} (索引: {table_index})")
                if table_index < len(tables):
                    table = tables[table_index]
                    try:
                        insert_table_copy(table)
                        print(f"准备插入表格 {table_index} 在位置 {i}")
                    except Exception as e:
                        print(f"警告:复制表格时出错: {str(e)}")
                        try:
                            print("尝试使用备用方法...")
                            insert_table_rebuilt(table)
                            print("备用方法成功")
                        except Exception as e2:
                            print(f"警告:备用方法也失败: {str(e2)}")
                            doc.add_paragraph("【表格处理失败】")
            else:
                paragraph = doc.add_paragraph(content)
                paragraph.alignment = WD_PARAGRAPH_ALIGNMENT.JUSTIFY
        except Exception as e:
            print(f"警告:处理段落或表格时出错: {str(e)}")
            continue

    if appendix:
        print("\n处理附录内容...")
        try:
            # Page break and heading now follow the main text, because every
            # earlier item was added through the same end-of-content path.
            doc.add_page_break()
            title = doc.add_paragraph("附录")
            title.alignment = WD_PARAGRAPH_ALIGNMENT.CENTER
            for content in appendix:
                table_match = re.match(r'TABLE_PLACEHOLDER_(\d+)', content)
                if table_match:
                    table_index = int(table_match.group(1))
                    print(f"正在处理附录中的表格占位符: {content} (索引: {table_index})")
                    if table_index < len(tables):
                        table = tables[table_index]
                        try:
                            insert_table_copy(table)
                            print(f"准备插入附录表格 {table_index}")
                        except Exception as e:
                            print(f"警告:复制附录中的表格时出错: {str(e)}")
                            try:
                                insert_table_rebuilt(table)
                                print("备用方法成功")
                            except Exception as e2:
                                print(f"警告:附录表格的备用方法也失败: {str(e2)}")
                                doc.add_paragraph("【表格处理失败】")
                else:
                    paragraph = doc.add_paragraph(content)
                    paragraph.alignment = WD_PARAGRAPH_ALIGNMENT.JUSTIFY
        except Exception as e:
            print(f"警告:处理附录时出错: {str(e)}")

    try:
        doc.save(output_path)
        print("\n文档保存成功!")
    except Exception as e:
        print(f"错误:保存文档时出错: {str(e)}")
        raise
def _copy_table_fallback(self, doc: docx.Document, table: Table):
    """
    Fallback table copy: rebuild `table` inside `doc` cell by cell.

    A new table with the same number of rows and columns is added through
    ``doc.add_table(rows=..., cols=...)``. Style, table properties and grid
    are copied, merged cells are re-merged, and each cell's paragraphs and
    runs are re-created together with their property elements.

    Errors raised while setting up the table propagate to the caller;
    errors for an individual cell are printed and that cell is skipped.

    Args:
        doc: Target document; must provide ``add_table(rows=, cols=)``.
        table: Source table.
    """
    # Row and column counts of the source table
    rows = len(table.rows)
    cols = len(table.columns)
    # Create the destination table
    new_table = doc.add_table(rows=rows, cols=cols)
    # Copy the table style
    if table.style:
        new_table.style = table.style
    # Copy the table properties
    # NOTE(review): python-docx appears to expose tblPr / tblGrid (and tcPr,
    # pPr, rPr further down) as read-only element properties. If so, these
    # assignments raise AttributeError instead of replacing the child
    # element - confirm against the installed python-docx version.
    new_table._element.tblPr = deepcopy(table._element.tblPr)
    # Copy the grid definition
    new_table._element.tblGrid = deepcopy(table._element.tblGrid)
    # (row, col) -> (merge kind, span) for cells that start a merged region.
    # A cell that is both a vertical-merge start and a horizontal span keeps
    # only the 'hmerge' entry, because the second assignment overwrites the
    # first.
    cell_map = {}
    # First pass: record merged cells
    for i in range(rows):
        for j in range(cols):
            try:
                src_cell = table.cell(i, j)
                # Check whether the cell takes part in a merge
                if src_cell._element.tcPr is not None:
                    # Vertical merge
                    vmerge = src_cell._element.tcPr.xpath('.//w:vMerge')
                    if vmerge:
                        # NOTE(review): `qn` here is this module's own
                        # helper, not docx.oxml.ns.qn - verify that it
                        # resolves 'w:val' to the attribute's qualified
                        # name, otherwise the default 'continue' is always
                        # returned.
                        val = vmerge[0].get(qn('w:val'), 'continue')
                        if val == 'restart':
                            # This cell starts the merged region
                            span = self._get_vertical_span(table, i, j)
                            cell_map[(i, j)] = ('vmerge', span)
                    # Horizontal merge
                    gridspan = src_cell._element.tcPr.xpath('.//w:gridSpan')
                    if gridspan:
                        # No default here: a missing attribute makes int()
                        # raise, which is reported by the except below.
                        span = int(gridspan[0].get(qn('w:val')))
                        if span > 1:
                            cell_map[(i, j)] = ('hmerge', span)
            except Exception as e:
                print(f"警告:处理合并单元格时出错 [{i},{j}]: {str(e)}")
    # Second pass: perform the merges and copy the content
    # NOTE(review): presumably table.cell(i, j) returns the same cell object
    # for every grid position covered by a merged cell; if so, spanned
    # positions are visited (and possibly merged) more than once - verify.
    for i in range(rows):
        for j in range(cols):
            try:
                src_cell = table.cell(i, j)
                dst_cell = new_table.cell(i, j)
                # Merge first if this position starts a merged region
                if (i, j) in cell_map:
                    merge_type, span = cell_map[(i, j)]
                    if merge_type == 'vmerge':
                        # Vertical merge, clipped to the table height
                        for k in range(1, span):
                            if i + k < rows:
                                dst_cell.merge(new_table.cell(i + k, j))
                    elif merge_type == 'hmerge':
                        # Horizontal merge, clipped to the table width
                        for k in range(1, span):
                            if j + k < cols:
                                dst_cell.merge(new_table.cell(i, j + k))
                # Copy the cell properties
                if src_cell._element.tcPr is not None:
                    dst_cell._element.tcPr = deepcopy(src_cell._element.tcPr)
                # Copy the cell content
                dst_cell.text = ""  # clear the default content
                for src_paragraph in src_cell.paragraphs:
                    dst_paragraph = dst_cell.add_paragraph()
                    # Copy the paragraph properties
                    if src_paragraph._element.pPr is not None:
                        dst_paragraph._element.pPr = deepcopy(src_paragraph._element.pPr)
                    # Copy text and formatting run by run
                    for src_run in src_paragraph.runs:
                        dst_run = dst_paragraph.add_run(src_run.text)
                        # Copy the run properties
                        if src_run._element.rPr is not None:
                            dst_run._element.rPr = deepcopy(src_run._element.rPr)
            except Exception as e:
                print(f"警告:复制单元格时出错 [{i},{j}]: {str(e)}")
                continue
def _get_vmerge_value(self, cell_element) -> str:
"""
获取单元格的垂直合并属性
Args:
cell_element: 单元格元素
Returns:
str: 垂直合并属性值
"""
vmerge = cell_element.xpath('.//w:vMerge')
if vmerge:
return vmerge[0].get(qn('w:val'), 'continue')
return None
def _get_gridspan_value(self, cell_element) -> int:
"""
获取单元格的水平合并数量
Args:
cell_element: 单元格元素
Returns:
int: 水平合并的列数
"""
gridspan = cell_element.xpath('.//w:gridSpan')
if gridspan:
return int(gridspan[0].get(qn('w:val'), '1'))
return 1
def _get_vertical_span(self, table: Table, start_row: int, col: int) -> int:
"""
计算垂直合并的行数
Args:
table: 表格对象
start_row: 起始行
col: 列号
Returns:
int: 垂直合并的行数
"""
span = 1
for i in range(start_row + 1, len(table.rows)):
cell = table.cell(i, col)
if self._get_vmerge_value(cell._element) == 'continue':
span += 1
else:
break
return span
def _extract_table_text(self, table: Table) -> str:
"""
提取表格中的文本内容
Args:
table: docx表格对象
Returns:
str: 表格内容的文本表示
"""
table_text = []
for row in table.rows:
for cell in row.cells:
cell_text = cell.text.strip()
if cell_text:
table_text.append(cell_text)
return ' '.join(table_text)
def process_directory(input_dir: str, output_dir: Optional[str] = None):
    """
    Clean every Word document found under a directory tree.

    Each ``.doc`` / ``.docx`` file (extension matched case-insensitively) is
    cleaned and written to ``<output_dir>/<name>_cleaned.docx``. Two kinds
    of file are skipped: Word lock files (``~$...``), which are not real
    documents, and files already named ``*_cleaned.docx``, so that a re-run
    over the same directory does not clean its own output again. A failure
    on one file is reported and does not stop the others.

    Args:
        input_dir: Directory that is searched recursively.
        output_dir: Directory that receives the results; defaults to
            `input_dir`. It is created if missing.
    """
    if output_dir is None:
        output_dir = input_dir
    os.makedirs(output_dir, exist_ok=True)

    cleaner = None  # built on first use; nothing to set up for an empty tree
    for root, _, files in os.walk(input_dir):
        for file in files:
            lowered = file.lower()
            if not lowered.endswith(('.doc', '.docx')):
                continue
            if file.startswith('~$') or lowered.endswith('_cleaned.docx'):
                continue
            if cleaner is None:
                cleaner = DocCleaner()
            input_path = os.path.join(root, file)
            try:
                main_content, appendix, tables = cleaner.clean_doc(input_path)
                # Output always uses the .docx extension.
                base_name = os.path.splitext(file)[0]
                output_path = os.path.join(output_dir, f"{base_name}_cleaned.docx")
                cleaner.save_as_docx(main_content, appendix, tables, output_path)
            except Exception as e:
                print(f"处理文件 {file} 时出错: {str(e)}")
                # The converter wraps a failed soffice run in a plain
                # Exception, so look at the chained error as well.
                cause = e
                if not isinstance(cause, subprocess.CalledProcessError):
                    cause = e.__cause__ or e.__context__
                if isinstance(cause, subprocess.CalledProcessError):
                    print(f"命令执行错误: {cause.stderr or cause.output}")
                elif isinstance(e, FileNotFoundError):
                    print("请确保已安装LibreOffice并将其添加到系统PATH中")
def qn(tag: str) -> str:
    """
    Return the namespace-qualified (Clark notation) name for a WordprocessingML tag.

    Both ``'w:val'`` and ``'val'`` yield
    ``'{http://schemas.openxmlformats.org/wordprocessingml/2006/main}val'``.
    Only the main WordprocessingML namespace is known here, so any prefix in
    front of the colon is dropped rather than looked up.

    Args:
        tag: Tag or attribute name, with or without a prefix such as ``w:``.

    Returns:
        str: The name in ``{namespace}local`` form.
    """
    prefix = "{http://schemas.openxmlformats.org/wordprocessingml/2006/main}"
    # The prefix must not end up in the result: '{ns}w:val' is not the name
    # of the 'val' attribute, so lookups with it never match.
    _, _, local_name = tag.rpartition(':')
    return prefix + local_name
if __name__ == '__main__':
    import argparse

    # Command-line entry point: one required input directory and an
    # optional output directory, both handed to process_directory.
    cli = argparse.ArgumentParser(description='文档清理工具')
    cli.add_argument('input_dir', help='输入目录路径')
    cli.add_argument('--output_dir', default=None, help='输出目录路径(可选,默认为输入目录)')
    options = cli.parse_args()
    process_directory(options.input_dir, options.output_dir)