#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import os
import re
import subprocess
import tempfile

import docx
import magic
import numpy as np
import requests
from docx.enum.text import WD_PARAGRAPH_ALIGNMENT
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
from typing import List, Tuple


class DocCleaner:
    def __init__(self, ollama_host: str = "http://192.168.1.18:11434"):
        """
        Initialize the document cleaner.

        Args:
            ollama_host: address of the Ollama server
        """
        # Header/footer patterns
        self.header_footer_patterns = [
            r'页码\s*\d+-\d+',                 # Chinese page numbers such as 页码1-1, 页码2-1
            r'第\s*\d+\s*页\s*共\s*\d+\s*页',   # Chinese "page X of Y"
            r'Page\s*\d+\s*of\s*\d+',          # English "Page X of Y"
        ]

        # Special-marker patterns
        self.special_char_patterns = [
            r'©\s*\d{4}.*?版权所有',  # copyright notices
            r'confidential',          # confidentiality markers
            r'draft|草稿',            # draft markers
            r'watermark',             # watermark markers
        ]

        # Appendix / reference heading patterns
        self.appendix_patterns = [
            r'^附录\s*[A-Za-z]?[\s::]',
            r'^Appendix\s*[A-Za-z]?[\s::]',
            r'^参考文献$',
            r'^References$',
            r'^Bibliography$'
        ]

        # Initialize the TF-IDF vectorizer (fallback deduplication path)
        self.vectorizer = TfidfVectorizer(
            min_df=1,
            stop_words='english'
        )

        self.ollama_host = ollama_host
        self.embedding_model = "bge-m3:latest"  # embedding model served by Ollama

    def _convert_doc_to_docx(self, doc_path: str) -> str:
        """
        Convert a .doc file to .docx.

        Args:
            doc_path: path to the .doc file

        Returns:
            str: path to the converted .docx file
        """
        # Create a temporary output directory
        temp_dir = tempfile.mkdtemp()

        try:
            # Convert with soffice (LibreOffice); the output file keeps the
            # original base name, so build the path from doc_path rather than
            # a fixed "temp.docx"
            cmd = ['soffice', '--headless', '--convert-to', 'docx', '--outdir', temp_dir, doc_path]
            subprocess.run(cmd, check=True, capture_output=True)

            base_name = os.path.splitext(os.path.basename(doc_path))[0]
            return os.path.join(temp_dir, f"{base_name}.docx")
        except subprocess.CalledProcessError as e:
            raise Exception(f"Failed to convert .doc file: {str(e)}")

    def clean_doc(self, file_path: str) -> Tuple[List[str], List[str]]:
        """
        Clean a document and return the processed body and appendix.

        Args:
            file_path: path to the document

        Returns:
            Tuple[List[str], List[str]]: (cleaned body paragraphs, appendix paragraphs)
        """
        # Detect the file type
        file_type = magic.from_file(file_path, mime=True)

        # If the file is in .doc format, convert it to .docx first
        if file_type == 'application/msword':
            temp_docx = self._convert_doc_to_docx(file_path)
            doc = docx.Document(temp_docx)
            # Remove the temporary files
            os.remove(temp_docx)
            os.rmdir(os.path.dirname(temp_docx))
        else:
            doc = docx.Document(file_path)

        # Extract all non-empty paragraph texts
        paragraphs = [p.text.strip() for p in doc.paragraphs if p.text.strip()]

        # Split body and appendix
        main_content, appendix = self._split_content(paragraphs)

        # Clean the body
        cleaned_content = self._clean_text(main_content)

        # Remove duplicate paragraphs
        #cleaned_content = self._remove_duplicates(cleaned_content)

        return cleaned_content, appendix

    def _clean_text(self, text: List[str]) -> List[str]:
        """
        Clean paragraph text.

        Args:
            text: paragraphs to clean

        Returns:
            List[str]: cleaned paragraphs
        """
        cleaned = []

        for paragraph in text:
            # Skip empty paragraphs
            if not paragraph.strip():
                continue

            # Check whether this is a table-of-contents entry (numbered heading line)
            is_toc_item = bool(re.match(r'^\s*(?:\d+\.)*\d+\s+.*', paragraph))

            if not is_toc_item:
                # Remove headers and footers
                for pattern in self.header_footer_patterns:
                    paragraph = re.sub(pattern, '', paragraph, flags=re.IGNORECASE)

                # Remove special markers
                for pattern in self.special_char_patterns:
                    paragraph = re.sub(pattern, '', paragraph, flags=re.IGNORECASE)

            # Keep the paragraph if anything is left after cleaning
            if paragraph.strip():
                cleaned.append(paragraph.strip())

        return cleaned

    def _split_content(self, paragraphs: List[str]) -> Tuple[List[str], List[str]]:
        """
        Split the body from the appendix / references.

        Args:
            paragraphs: document paragraphs

        Returns:
            Tuple[List[str], List[str]]: (body paragraphs, appendix paragraphs)
        """
        main_content = []
        appendix = []
        is_appendix = False

        for p in paragraphs:
            # Check whether the appendix starts here
            if any(re.match(pattern, p, re.IGNORECASE) for pattern in self.appendix_patterns):
                is_appendix = True

            if is_appendix:
                appendix.append(p)
            else:
                main_content.append(p)

        return main_content, appendix
    def _get_embeddings(self, texts: List[str]) -> np.ndarray:
        """
        Get text embeddings from Ollama.

        Args:
            texts: list of texts

        Returns:
            np.ndarray: embedding matrix
        """
        embeddings = []

        for text in texts:
            try:
                response = requests.post(
                    f"{self.ollama_host}/api/embeddings",
                    json={
                        "model": self.embedding_model,
                        "prompt": text
                    }
                )
                response.raise_for_status()
                embedding = response.json()["embedding"]
                embeddings.append(embedding)
            except Exception as e:
                print(f"Failed to get text embedding: {str(e)}")
                # Fall back to a zero vector if the request fails
                embeddings.append([0.0] * 1024)  # bge-m3 embeddings are 1024-dimensional

        return np.array(embeddings)

    def _remove_duplicates(self, paragraphs: List[str], similarity_threshold: float = 0.92) -> List[str]:
        """
        Remove duplicate paragraphs.

        Args:
            paragraphs: list of paragraphs
            similarity_threshold: similarity threshold; embeddings allow a higher
                threshold than TF-IDF

        Returns:
            List[str]: deduplicated paragraphs
        """
        if not paragraphs:
            return []

        try:
            # Get text embeddings
            embeddings = self._get_embeddings(paragraphs)

            # Compute the cosine-similarity matrix
            similarity_matrix = cosine_similarity(embeddings)

            # Mark the paragraphs to keep
            keep_indices = []

            for i in range(len(paragraphs)):
                # Keep the paragraph if it is not highly similar to any kept paragraph
                if not any(similarity_matrix[i][j] > similarity_threshold for j in keep_indices):
                    keep_indices.append(i)

            # Return the deduplicated paragraphs
            return [paragraphs[i] for i in keep_indices]
        except Exception as e:
            print(f"Ollama embedding model failed, falling back to TF-IDF: {str(e)}")
            # Fall back to the TF-IDF method if Ollama is unavailable
            return self._remove_duplicates_tfidf(paragraphs)

    def _remove_duplicates_tfidf(self, paragraphs: List[str], similarity_threshold: float = 0.85) -> List[str]:
        """
        Remove duplicate paragraphs with TF-IDF (fallback method).

        Args:
            paragraphs: list of paragraphs
            similarity_threshold: similarity threshold

        Returns:
            List[str]: deduplicated paragraphs
        """
        if not paragraphs:
            return []

        # Compute the TF-IDF matrix
        tfidf_matrix = self.vectorizer.fit_transform(paragraphs)

        # Compute the cosine-similarity matrix
        similarity_matrix = cosine_similarity(tfidf_matrix)

        # Mark the paragraphs to keep
        keep_indices = []

        for i in range(len(paragraphs)):
            # Keep the paragraph if it is not highly similar to any kept paragraph
            if not any(similarity_matrix[i][j] > similarity_threshold for j in keep_indices):
                keep_indices.append(i)

        # Return the deduplicated paragraphs
        return [paragraphs[i] for i in keep_indices]

    def save_as_docx(self, cleaned_content: List[str], appendix: List[str], output_path: str):
        """
        Save the cleaned content as a .docx file.

        Args:
            cleaned_content: cleaned body paragraphs
            appendix: appendix paragraphs
            output_path: output file path
        """
        # Create a new document
        doc = docx.Document()

        # Add the body content
        for paragraph in cleaned_content:
            p = doc.add_paragraph(paragraph)
            # Paragraph formatting (adjust as needed)
            p.alignment = WD_PARAGRAPH_ALIGNMENT.JUSTIFY

        # If there is an appendix, add a separator and the appendix content
        if appendix:
            # Add a page break
            doc.add_page_break()

            # Add the appendix title (kept in Chinese to match the source documents)
            title = doc.add_paragraph("附录")
            title.alignment = WD_PARAGRAPH_ALIGNMENT.CENTER

            # Add the appendix content
            for paragraph in appendix:
                p = doc.add_paragraph(paragraph)
                p.alignment = WD_PARAGRAPH_ALIGNMENT.JUSTIFY

        # Save the document
        doc.save(output_path)


def process_directory(input_dir: str, output_dir: str = None):
    """
    Process all document files in a directory.

    Args:
        input_dir: input directory path
        output_dir: output directory path; the input directory is used if None
    """
    # Use the input directory if no output directory was given
    if output_dir is None:
        output_dir = input_dir

    if not os.path.exists(output_dir):
        os.makedirs(output_dir)

    cleaner = DocCleaner()

    for root, _, files in os.walk(input_dir):
        for file in files:
            if file.endswith(('.doc', '.docx')):
                input_path = os.path.join(root, file)

                try:
                    # Clean the document
                    main_content, appendix = cleaner.clean_doc(input_path)

                    # Build the output file name (always with a .docx extension)
                    base_name = os.path.splitext(file)[0]
                    output_path = os.path.join(output_dir, f"{base_name}_cleaned.docx")

                    # Save as .docx
                    cleaner.save_as_docx(main_content, appendix, output_path)
                except Exception as e:
                    print(f"Error while processing {file}: {str(e)}")
                    # Print more detailed error information
                    if isinstance(e, subprocess.CalledProcessError):
                        print(f"Command output: {e.output}")
                    elif isinstance(e, FileNotFoundError):
                        print("Make sure LibreOffice is installed and on the system PATH")


if __name__ == '__main__':
    import argparse

    parser = argparse.ArgumentParser(description='Document cleaning tool')
    parser.add_argument('input_dir', help='Input directory path')
    parser.add_argument('--output_dir', help='Output directory path (optional, defaults to the input directory)', default=None)

    args = parser.parse_args()
    process_directory(args.input_dir, args.output_dir)
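
# A minimal programmatic usage sketch, shown as a comment so it does not affect
# the CLI above. The host, the file names, and the explicit deduplication call
# are illustrative assumptions (clean_doc ships with dedup commented out):
#
#   cleaner = DocCleaner(ollama_host="http://localhost:11434")
#   body, appendix = cleaner.clean_doc("sample.docx")
#   body = cleaner._remove_duplicates(body)   # optional embedding-based dedup pass
#   cleaner.save_as_docx(body, appendix, "sample_cleaned.docx")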