#!/usr/bin/env python3
# -*- coding: utf-8 -*-
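"""
Document cleaning utilities (doc-etl).

DocCleaner loads .doc/.docx files (converting .doc via LibreOffice), strips
header/footer text and special markers, normalizes full-width punctuation,
splits off appendix/reference sections, and removes near-duplicate paragraphs
using Ollama embeddings (bge-m3) with a TF-IDF fallback.
"""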
import os
import re
import docx
import magic
import numpy as np
import requests
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
from typing import List, Tuple, Dict, Optional
from docx.shared import Pt
from docx.enum.text import WD_PARAGRAPH_ALIGNMENT
import subprocess
import tempfile
import json
class DocCleaner:
    def __init__(self, ollama_host: str = "http://192.168.1.18:11434"):
        """
        Initialize the document cleaner.

        Args:
            ollama_host: Address of the Ollama server
        """
        # Header/footer patterns
        self.header_footer_patterns = [
            r'\d+-\d+',  # Page numbers such as 1-1, 2-1, ...
            r'\s*\d+\s*页',  # Chinese page numbers
            r'Page\s*\d+\s*of\s*\d+',  # English page numbers
        ]

        # Special-marker patterns
        self.special_char_patterns = [
            r'©\s*\d{4}.*?版权所有',  # Copyright notices
            r'confidential',  # Confidentiality markers
            r'draft|草稿',  # Draft markers
            r'watermark',  # Watermark markers
        ]

        # Appendix / reference section heading patterns
        self.appendix_patterns = [
            r'^附录\s*[A-Za-z]?[\s:]',
            r'^Appendix\s*[A-Za-z]?[\s:]',
            r'^参考文献$',
            r'^References$',
            r'^Bibliography$'
        ]

        # Mapping from full-width (and curly) punctuation to half-width ASCII
        self.full_to_half = {
            '，': ',', '。': '.', '！': '!', '？': '?',
            '；': ';', '：': ':', '（': '(', '）': ')',
            '“': '"', '”': '"', '‘': "'", '’': "'",
            '【': '[', '】': ']', '《': '<', '》': '>',
            '～': '~', '｛': '{', '｝': '}', '、': ','
        }

        # TF-IDF vectorizer (fallback for duplicate detection)
        self.vectorizer = TfidfVectorizer(
            min_df=1,
            stop_words='english'
        )

        self.ollama_host = ollama_host
        self.embedding_model = "bge-m3:latest"  # Embedding model used for text embeddings

    def _convert_doc_to_docx(self, doc_path: str) -> str:
        """
        Convert a .doc file to .docx format.

        Args:
            doc_path: Path to the .doc file

        Returns:
            str: Path to the converted .docx file
        """
        # Create a temporary output directory
        temp_dir = tempfile.mkdtemp()

        try:
            # Convert with soffice (LibreOffice) in headless mode
            cmd = ['soffice', '--headless', '--convert-to', 'docx', '--outdir', temp_dir, doc_path]
            subprocess.run(cmd, check=True, capture_output=True)

            # Build the path of the converted file written by soffice
            base_name = os.path.splitext(os.path.basename(doc_path))[0]
            return os.path.join(temp_dir, f"{base_name}.docx")
        except subprocess.CalledProcessError as e:
            raise Exception(f"Failed to convert .doc file: {str(e)}")

    def clean_doc(self, file_path: str) -> Tuple[List[str], List[str]]:
        """
        Clean a document and return the processed body text and appendix.

        Args:
            file_path: Path to the document file

        Returns:
            Tuple[List[str], List[str]]: (cleaned body paragraphs, appendix paragraphs)
        """
        # Detect the file type
        file_type = magic.from_file(file_path, mime=True)

        # If the file is in .doc format, convert it to .docx first
        if file_type == 'application/msword':
            temp_docx = self._convert_doc_to_docx(file_path)
            doc = docx.Document(temp_docx)
            # Remove the temporary files
            os.remove(temp_docx)
            os.rmdir(os.path.dirname(temp_docx))
        else:
            doc = docx.Document(file_path)

        # Extract all non-empty paragraph texts
        paragraphs = [p.text.strip() for p in doc.paragraphs if p.text.strip()]

        # Separate body text from the appendix
        main_content, appendix = self._split_content(paragraphs)

        # Clean the body text
        cleaned_content = self._clean_text(main_content)

        # Remove duplicate paragraphs
        cleaned_content = self._remove_duplicates(cleaned_content)

        return cleaned_content, appendix

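    # A minimal usage sketch of clean_doc, with a hypothetical input path:
    #
    #   cleaner = DocCleaner(ollama_host="http://192.168.1.18:11434")
    #   body, appendix = cleaner.clean_doc("input/sample.docx")
    #   print(len(body), "body paragraphs,", len(appendix), "appendix paragraphs")
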
    def _clean_text(self, text: List[str]) -> List[str]:
        """
        Clean the text content.

        Args:
            text: List of paragraphs to clean

        Returns:
            List[str]: Cleaned paragraphs
        """
        cleaned = []
        for paragraph in text:
            # Skip empty paragraphs
            if not paragraph.strip():
                continue

            # Remove headers and footers
            for pattern in self.header_footer_patterns:
                paragraph = re.sub(pattern, '', paragraph, flags=re.IGNORECASE)

            # Remove special markers
            for pattern in self.special_char_patterns:
                paragraph = re.sub(pattern, '', paragraph, flags=re.IGNORECASE)

            # Normalize punctuation
            paragraph = self._normalize_punctuation(paragraph)

            # Keep the paragraph if anything is left after cleaning
            if paragraph.strip():
                cleaned.append(paragraph.strip())

        return cleaned

    def _normalize_punctuation(self, text: str) -> str:
        """
        Normalize punctuation (full-width to half-width).

        Args:
            text: Input text

        Returns:
            str: Converted text
        """
        for full, half in self.full_to_half.items():
            text = text.replace(full, half)
        return text

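    # Example of the normalization above (based on the full_to_half mapping):
    #
    #   self._normalize_punctuation('你好，世界！（测试）')
    #   # -> '你好,世界!(测试)'
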
    def _split_content(self, paragraphs: List[str]) -> Tuple[List[str], List[str]]:
        """
        Separate the body text from the appendix / references.

        Args:
            paragraphs: List of document paragraphs

        Returns:
            Tuple[List[str], List[str]]: (body paragraphs, appendix paragraphs)
        """
        main_content = []
        appendix = []
        is_appendix = False

        for p in paragraphs:
            # Check whether this paragraph starts the appendix
            if any(re.match(pattern, p, re.IGNORECASE) for pattern in self.appendix_patterns):
                is_appendix = True

            if is_appendix:
                appendix.append(p)
            else:
                main_content.append(p)

        return main_content, appendix

    def _get_embeddings(self, texts: List[str]) -> np.ndarray:
        """
        Get text embeddings from Ollama.

        Args:
            texts: List of texts

        Returns:
            np.ndarray: Embedding matrix
        """
        embeddings = []
        for text in texts:
            try:
                response = requests.post(
                    f"{self.ollama_host}/api/embeddings",
                    json={
                        "model": self.embedding_model,
                        "prompt": text
                    }
                )
                response.raise_for_status()
                embedding = response.json()["embedding"]
                embeddings.append(embedding)
            except Exception as e:
                print(f"Failed to get text embedding: {str(e)}")
                # Fall back to a zero vector on failure
                # (bge-m3 produces 1024-dimensional embeddings)
                embeddings.append([0.0] * 1024)
        return np.array(embeddings)

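    # Equivalent request, e.g. with curl (host and model taken from the defaults above):
    #
    #   curl http://192.168.1.18:11434/api/embeddings \
    #     -d '{"model": "bge-m3:latest", "prompt": "some paragraph text"}'
    #
    # The response JSON contains an "embedding" list, which is what the code reads
    # via response.json()["embedding"].
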
    def _remove_duplicates(self, paragraphs: List[str], similarity_threshold: float = 0.92) -> List[str]:
        """
        Remove duplicate paragraphs.

        Args:
            paragraphs: List of paragraphs
            similarity_threshold: Similarity threshold; the embedding model allows a higher threshold

        Returns:
            List[str]: Deduplicated paragraphs
        """
        if not paragraphs:
            return []

        try:
            # Get text embeddings
            embeddings = self._get_embeddings(paragraphs)

            # Compute the cosine similarity matrix
            similarity_matrix = cosine_similarity(embeddings)

            # Mark the paragraphs to keep
            keep_indices = []
            for i in range(len(paragraphs)):
                # Keep the paragraph if it is not highly similar to any kept paragraph
                if not any(similarity_matrix[i][j] > similarity_threshold for j in keep_indices):
                    keep_indices.append(i)

            # Return the deduplicated paragraphs
            return [paragraphs[i] for i in keep_indices]
        except Exception as e:
            print(f"Ollama embedding failed, falling back to TF-IDF: {str(e)}")
            # Fall back to the original TF-IDF approach if Ollama fails
            return self._remove_duplicates_tfidf(paragraphs)

    def _remove_duplicates_tfidf(self, paragraphs: List[str], similarity_threshold: float = 0.85) -> List[str]:
        """
        Remove duplicate paragraphs using TF-IDF (fallback method).

        Args:
            paragraphs: List of paragraphs
            similarity_threshold: Similarity threshold

        Returns:
            List[str]: Deduplicated paragraphs
        """
        if not paragraphs:
            return []

        # Compute the TF-IDF matrix
        tfidf_matrix = self.vectorizer.fit_transform(paragraphs)

        # Compute the cosine similarity matrix
        similarity_matrix = cosine_similarity(tfidf_matrix)

        # Mark the paragraphs to keep
        keep_indices = []
        for i in range(len(paragraphs)):
            # Keep the paragraph if it is not highly similar to any kept paragraph
            if not any(similarity_matrix[i][j] > similarity_threshold for j in keep_indices):
                keep_indices.append(i)

        # Return the deduplicated paragraphs
        return [paragraphs[i] for i in keep_indices]

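    # A small worked example of the greedy selection used by both dedup methods,
    # with a hypothetical 3x3 similarity matrix and threshold 0.92:
    #
    #   sim = [[1.00, 0.95, 0.10],
    #          [0.95, 1.00, 0.12],
    #          [0.10, 0.12, 1.00]]
    #
    #   i=0: keep_indices is empty      -> keep 0
    #   i=1: sim[1][0] = 0.95 > 0.92    -> drop 1 (duplicate of 0)
    #   i=2: sim[2][0] = 0.10 <= 0.92   -> keep 2
    #
    #   Result: paragraphs 0 and 2 are kept.
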
    def save_as_docx(self, cleaned_content: List[str], appendix: List[str], output_path: str):
        """
        Save the cleaned content in .docx format.

        Args:
            cleaned_content: Cleaned body paragraphs
            appendix: Appendix paragraphs
            output_path: Output file path
        """
        # Create a new document
        doc = docx.Document()

        # Add the body content
        for paragraph in cleaned_content:
            p = doc.add_paragraph(paragraph)
            # Paragraph formatting (adjust as needed)
            p.alignment = WD_PARAGRAPH_ALIGNMENT.JUSTIFY

        # If there is an appendix, add a separator and the appendix content
        if appendix:
            # Add a page break
            doc.add_page_break()

            # Add the appendix heading
            title = doc.add_paragraph("附录")
            title.alignment = WD_PARAGRAPH_ALIGNMENT.CENTER

            # Add the appendix content
            for paragraph in appendix:
                p = doc.add_paragraph(paragraph)
                p.alignment = WD_PARAGRAPH_ALIGNMENT.JUSTIFY

        # Save the document
        doc.save(output_path)

def process_directory(input_dir: str, output_dir: str):
    """
    Process all document files in the given directory.

    Args:
        input_dir: Input directory path
        output_dir: Output directory path
    """
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)

    cleaner = DocCleaner()

    for root, _, files in os.walk(input_dir):
        for file in files:
            if file.endswith(('.doc', '.docx')):
                input_path = os.path.join(root, file)
                try:
                    # Clean the document
                    main_content, appendix = cleaner.clean_doc(input_path)

                    # Build the output file name, always with a .docx extension
                    base_name = os.path.splitext(file)[0]
                    output_path = os.path.join(output_dir, f"{base_name}_cleaned.docx")

                    # Save as .docx
                    cleaner.save_as_docx(main_content, appendix, output_path)
                except Exception as e:
                    print(f"Error while processing {file}: {str(e)}")
                    # Print more detailed error information
                    if isinstance(e, subprocess.CalledProcessError):
                        print(f"Command failed: {e.output}")
                    elif isinstance(e, FileNotFoundError):
                        print("Make sure LibreOffice is installed and on the system PATH")

if __name__ == '__main__':
    import argparse

    parser = argparse.ArgumentParser(description='Document cleaning tool')
    parser.add_argument('input_dir', help='Input directory path')
    parser.add_argument('output_dir', help='Output directory path')

    args = parser.parse_args()
    process_directory(args.input_dir, args.output_dir)
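
# Example invocation (hypothetical paths):
#
#   python doc_cleaner.py ./raw_docs ./cleaned_docs
#
# Each *.doc / *.docx found under ./raw_docs is written to ./cleaned_docs as
# <name>_cleaned.docx.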