#!/usr/bin/env python3 # -*- coding: utf-8 -*- import os import re import docx import numpy as np import requests from sklearn.feature_extraction.text import TfidfVectorizer from sklearn.metrics.pairwise import cosine_similarity from typing import List, Tuple, Dict, Optional from docx.shared import Pt from docx.enum.text import WD_PARAGRAPH_ALIGNMENT from docx.enum.table import WD_TABLE_ALIGNMENT import subprocess import tempfile import json from docx.table import Table, _Cell from docx.text.paragraph import Paragraph from copy import deepcopy from docx.oxml import parse_xml from docx.oxml.ns import nsdecls import io class DocCleaner: def __init__(self, ollama_host: str = "http://192.168.1.24:11434"): """ 初始化文档清理器 Args: ollama_host: Ollama服务器地址 """ # 页眉页脚模式 self.header_footer_patterns = [ r'页码\s*\d+-\d+', # 页码格式:页码1-1, 页码2-1等 r'第\s*\d+\s*页\s*共\s*\d+\s*页', # 中文页码(第X页共Y页) r'Page\s*\d+\s*of\s*\d+', # 英文页码 ] # 特殊符号模式 self.special_char_patterns = [ r'©\s*\d{4}.*?版权所有', # 版权信息 r'confidential', # 机密标记 r'draft|草稿', # 草稿标记 r'watermark', # 水印标记 ] # 附录和参考文献标题模式 self.appendix_patterns = [ r'^附录\s*[A-Za-z]?[\s::]', r'^Appendix\s*[A-Za-z]?[\s::]', r'^参考文献$', r'^References$', r'^Bibliography$' ] # 初始化TF-IDF向量化器 self.vectorizer = TfidfVectorizer( min_df=1, stop_words='english' ) self.ollama_host = ollama_host self.embedding_model = "bge-m3:latest" # 使用nomic-embed-text模型进行文本嵌入 # 图片相关配置 self.extract_images = True # 是否提取图片 self.image_extensions = ['.png', '.jpg', '.jpeg', '.gif', '.bmp'] # 支持的图片扩展名 self.min_image_size = 100 # 最小图片尺寸(宽和高),过滤掉太小的图片 def _convert_doc_to_docx(self, doc_path: str) -> str: """ 将doc格式转换为docx格式 Args: doc_path: doc文件路径 Returns: str: 转换后的docx文件路径 """ # 创建临时文件路径 temp_dir = tempfile.mkdtemp() temp_docx = os.path.join(temp_dir, 'temp.docx') try: # 使用soffice(LibreOffice)进行转换 cmd = ['soffice', '--headless', '--convert-to', 'docx', '--outdir', temp_dir, doc_path] subprocess.run(cmd, check=True, capture_output=True) # 返回转换后的文件路径 return temp_docx except subprocess.CalledProcessError as e: raise Exception(f"转换doc文件失败: {str(e)}") def clean_doc(self, file_path: str) -> Tuple[List[str], List[str], List[Table], List[Dict]]: """ 清理文档并返回处理后的正文、附录、表格和图片 Args: file_path: 文档文件路径 Returns: Tuple[List[str], List[str], List[Table], List[Dict]]: (清理后的正文段落列表, 附录段落列表, 表格列表, 图片信息列表) """ print(f"\n开始处理文档: {file_path}") # 检测文件类型 _, file_extension = os.path.splitext(file_path) file_extension = file_extension.lower() # 如果是doc格式,先转换为docx if file_extension == '.doc': temp_docx = self._convert_doc_to_docx(file_path) doc = docx.Document(temp_docx) # 清理临时文件 os.remove(temp_docx) os.rmdir(os.path.dirname(temp_docx)) else: doc = docx.Document(file_path) # 提取图片(如果启用) images = [] if self.extract_images: images = self._extract_document_images(doc) # 提取所有内容(段落和表格) content = [] tables = [] table_count = 0 try: print("\n开始解析文档结构...") # 遍历文档体中的所有元素 for element in doc._element.body: if element.tag.endswith('p'): try: paragraph = docx.text.paragraph.Paragraph(element, doc) text = paragraph.text.strip() # 只添加非空段落 if text: # 检查是否是附录标题 is_appendix = any(re.match(pattern, text, re.IGNORECASE) for pattern in self.appendix_patterns) content.append({ 'type': 'paragraph', 'content': text, 'is_appendix_start': is_appendix }) if is_appendix: print(f"发现附录标题: {text}") except Exception as e: print(f"警告:处理段落时出错: {str(e)}") continue elif element.tag.endswith('tbl'): try: table = docx.table.Table(element, doc) # 验证表格是否有效 if hasattr(table, 'rows') and hasattr(table, 'columns'): tables.append(table) content.append({ 'type': 'table', 'index': table_count }) print(f"发现表格 {table_count}: {len(table.rows)}行 x {len(table.columns)}列") table_count += 1 except Exception as e: print(f"警告:处理表格时出错: {str(e)}") continue except Exception as e: print(f"警告:遍历文档内容时出错: {str(e)}") print(f"\n文档结构解析完成:") print(f"- 总元素数: {len(content)}") print(f"- 表格数量: {len(tables)}") print(f"- 图片数量: {len(images)}") # 分离正文和附录 main_content = [] appendix = [] is_appendix = False print("\n开始分离正文和附录...") for item in content: if item['type'] == 'paragraph': if item['is_appendix_start']: is_appendix = True print("进入附录部分") if is_appendix: appendix.append(item['content']) else: main_content.append(item['content']) elif item['type'] == 'table': table_placeholder = f'TABLE_PLACEHOLDER_{item["index"]}' if is_appendix: appendix.append(table_placeholder) print(f"添加表格到附录: {table_placeholder}") else: main_content.append(table_placeholder) print(f"添加表格到正文: {table_placeholder}") print(f"\n分离完成:") print(f"- 正文元素数: {len(main_content)}") print(f"- 附录元素数: {len(appendix)}") # 清理正文(保留表格标记) cleaned_content = [] print("\n开始清理正文...") for item in main_content: if item.startswith('TABLE_PLACEHOLDER_'): cleaned_content.append(item) print(f"保留表格标记: {item}") else: cleaned_text = self._clean_text([item])[0] if cleaned_text: cleaned_content.append(cleaned_text) print(f"\n清理完成:") print(f"- 清理后元素数: {len(cleaned_content)}") print("- 表格标记位置:") for i, item in enumerate(cleaned_content): if item.startswith('TABLE_PLACEHOLDER_'): print(f" 位置 {i}: {item}") return cleaned_content, appendix, tables, images def _clean_text(self, text: List[str]) -> List[str]: """ 清理文本内容 Args: text: 待清理的文本段落列表 Returns: List[str]: 清理后的文本段落列表 """ cleaned = [] for paragraph in text: # 如果是表格标记,直接保留 if paragraph.startswith('TABLE_PLACEHOLDER_'): cleaned.append(paragraph) continue # 跳过空段落 if not paragraph.strip(): continue # 检查是否是目录项(包含数字序号的行) is_toc_item = bool(re.match(r'^\s*(?:\d+\.)*\d+\s+.*', paragraph)) if not is_toc_item: # 移除页眉页脚 for pattern in self.header_footer_patterns: paragraph = re.sub(pattern, '', paragraph, flags=re.IGNORECASE) # 移除特殊符号 for pattern in self.special_char_patterns: paragraph = re.sub(pattern, '', paragraph, flags=re.IGNORECASE) # 如果段落不为空,添加到结果中 if paragraph.strip(): cleaned.append(paragraph.strip()) return cleaned def _split_content(self, paragraphs: List[str]) -> Tuple[List[str], List[str]]: """ 分离正文与附录/参考文献 Args: paragraphs: 文档段落列表 Returns: Tuple[List[str], List[str]]: (正文段落列表, 附录段落列表) """ main_content = [] appendix = [] is_appendix = False for p in paragraphs: # 检查是否是附录开始 if any(re.match(pattern, p, re.IGNORECASE) for pattern in self.appendix_patterns): is_appendix = True if is_appendix: appendix.append(p) else: main_content.append(p) return main_content, appendix def _get_embeddings(self, texts: List[str]) -> np.ndarray: """ 使用Ollama获取文本嵌入向量 Args: texts: 文本列表 Returns: np.ndarray: 嵌入向量矩阵 """ embeddings = [] for text in texts: try: response = requests.post( f"{self.ollama_host}/api/embeddings", json={ "model": self.embedding_model, "prompt": text } ) response.raise_for_status() embedding = response.json()["embedding"] embeddings.append(embedding) except Exception as e: print(f"获取文本嵌入失败: {str(e)}") # 如果获取嵌入失败,使用零向量 embeddings.append([0.0] * 768) # nomic-embed-text 模型输出维度为768 return np.array(embeddings) def _remove_duplicates(self, paragraphs: List[str], similarity_threshold: float = 0.92) -> List[str]: """ 删除重复段落,保持表格占位符的位置不变 Args: paragraphs: 段落列表 similarity_threshold: 相似度阈值,使用嵌入模型后可以设置更高的阈值 Returns: List[str]: 去重后的段落列表 """ if not paragraphs: return [] # 分离表格占位符和普通段落 table_placeholders = {} text_paragraphs = [] for i, p in enumerate(paragraphs): if p.startswith('TABLE_PLACEHOLDER_'): table_placeholders[i] = p else: text_paragraphs.append((i, p)) try: # 只对非表格段落进行去重 if text_paragraphs: # 获取文本嵌入 text_only = [p[1] for p in text_paragraphs] embeddings = self._get_embeddings(text_only) # 计算余弦相似度矩阵 similarity_matrix = cosine_similarity(embeddings) # 标记要保留的段落 keep_indices = [] for i in range(len(text_paragraphs)): # 如果当前段落没有与之前的段落高度相似,则保留 if not any(similarity_matrix[i][j] > similarity_threshold for j in keep_indices): keep_indices.append(i) # 保留的非表格段落 kept_paragraphs = [(text_paragraphs[i][0], text_only[i]) for i in keep_indices] else: kept_paragraphs = [] # 合并表格占位符和保留的段落,按原始位置排序 all_kept = list(table_placeholders.items()) + kept_paragraphs all_kept.sort(key=lambda x: x[0]) return [p[1] for p in all_kept] except Exception as e: print(f"使用Ollama嵌入模型失败,回退到TF-IDF方法: {str(e)}") # 如果使用Ollama失败,回退到原来的TF-IDF方法 return self._remove_duplicates_tfidf(paragraphs) def _remove_duplicates_tfidf(self, paragraphs: List[str], similarity_threshold: float = 0.85) -> List[str]: """ 使用TF-IDF方法删除重复段落(作为备选方案) Args: paragraphs: 段落列表 similarity_threshold: 相似度阈值 Returns: List[str]: 去重后的段落列表 """ if not paragraphs: return [] # 分离表格占位符和普通段落 table_placeholders = {} text_paragraphs = [] for i, p in enumerate(paragraphs): if p.startswith('TABLE_PLACEHOLDER_'): table_placeholders[i] = p else: text_paragraphs.append((i, p)) if text_paragraphs: # 计算TF-IDF矩阵 text_only = [p[1] for p in text_paragraphs] tfidf_matrix = self.vectorizer.fit_transform(text_only) # 计算余弦相似度矩阵 similarity_matrix = cosine_similarity(tfidf_matrix) # 标记要保留的段落 keep_indices = [] for i in range(len(text_paragraphs)): # 如果当前段落没有与之前的段落高度相似,则保留 if not any(similarity_matrix[i][j] > similarity_threshold for j in keep_indices): keep_indices.append(i) # 保留的非表格段落 kept_paragraphs = [(text_paragraphs[i][0], text_only[i]) for i in keep_indices] else: kept_paragraphs = [] # 合并表格占位符和保留的段落,按原始位置排序 all_kept = list(table_placeholders.items()) + kept_paragraphs all_kept.sort(key=lambda x: x[0]) return [p[1] for p in all_kept] def save_as_docx(self, cleaned_content: List[str], appendix: List[str], tables: List[Table], images: List[Dict], output_path: str): """ 将清理后的内容保存为docx格式和txt格式 Args: cleaned_content: 清理后的正文段落列表 appendix: 附录段落列表 tables: 表格列表 images: 图片信息列表 output_path: 输出文件路径 """ print(f"\n开始保存文档: {output_path}") print(f"- 正文元素数: {len(cleaned_content)}") print(f"- 附录元素数: {len(appendix)}") print(f"- 表格总数: {len(tables)}") print(f"- 图片总数: {len(images)}") # 创建新文档 doc = docx.Document() # 创建文本输出内容列表(用于保存txt文件) text_output = [] # 构建段落索引到图片索引的映射 paragraph_to_images = {} for img in images: if 'paragraph_index' in img and img['paragraph_index'] >= 0: if img['paragraph_index'] not in paragraph_to_images: paragraph_to_images[img['paragraph_index']] = [] paragraph_to_images[img['paragraph_index']].append(img) # 生成HTML表格文件 html_file_path = os.path.splitext(output_path)[0] + '_tables.html' html_tables = [] # 添加正文内容和表格,保持它们的相对位置 print("\n处理正文内容...") # 使用图片索引和已添加图片跟踪 image_counter = 0 added_images = set() # 创建段落索引到新文档索引的映射 old_to_new_paragraph_map = {} new_paragraph_index = 0 # 遍历清理后的内容 for i, content in enumerate(cleaned_content): try: # 检查是否是表格占位符 table_match = re.match(r'TABLE_PLACEHOLDER_(\d+)', content) if table_match: table_index = int(table_match.group(1)) print(f"正在处理表格占位符: {content} (索引: {table_index})") if table_index < len(tables): source_table = tables[table_index] try: # 生成表格的HTML标签 html_tags = self._generate_table_html_tags(source_table, f"table_{table_index}") # 添加HTML标签作为普通文本 p = doc.add_paragraph() run = p.add_run(html_tags) run.font.name = 'Courier New' # 使用等宽字体 run.font.size = Pt(10) # 设置字体大小 new_paragraph_index += 1 # 保存HTML到列表,用于生成HTML文件 try: from table.table_to_html import TableToHtml converter = TableToHtml(debug=False) html_code = converter.table_to_html(source_table) html_tables.append(html_code) except Exception as e: print(f"警告:生成HTML表格时出错: {str(e)}") html_tables.append(f"
表格 {table_index + 1} 处理失败: {str(e)}
") # 添加到文本输出 text_output.append(f"表格 {table_index + 1} 开始:") # 使用HTML标签代替表格文本用于txt输出 text_output.append(html_tags) text_output.append(f"表格 {table_index + 1} 结束:") # 添加空行 doc.add_paragraph() new_paragraph_index += 1 except Exception as e: print(f"警告:处理表格时出错: {str(e)}") doc.add_paragraph(f"【表格处理失败: {str(e)}】") text_output.append("【表格处理失败】") new_paragraph_index += 1 else: # 添加普通段落 p = doc.add_paragraph(content) p.alignment = WD_PARAGRAPH_ALIGNMENT.JUSTIFY old_to_new_paragraph_map[i] = new_paragraph_index new_paragraph_index += 1 # 添加到文本输出 text_output.append(content) # 检查此段落是否有关联的图片 if i in paragraph_to_images: for img_data in paragraph_to_images[i]: if img_data['index'] not in added_images: try: # 直接从图片数据创建图片 image_stream = io.BytesIO(img_data['data']) # 添加图片到文档 doc.add_picture(image_stream, width=docx.shared.Inches(6)) # 设置宽度为6英寸 new_paragraph_index += 1 # 移除图片标题 # 添加到文本输出 text_output.append(f"[图片]") print(f"在段落 {i} 后插入图片") image_counter += 1 added_images.add(img_data['index']) except Exception as e: print(f"插入图片时出错: {str(e)}") except Exception as e: print(f"警告:处理段落或表格时出错: {str(e)}") continue # 插入未放置的图片 if len(added_images) < len(images): print("\n处理未放置的图片...") # 添加未放置的图片到文档末尾 for img in images: if img['index'] not in added_images: try: # 直接从图片数据创建图片 image_stream = io.BytesIO(img['data']) # 添加图片到文档 doc.add_picture(image_stream, width=docx.shared.Inches(6)) # 设置宽度为6英寸 # 移除图片标题 # 添加到文本输出 text_output.append(f"[图片]") print(f"在文档末尾添加图片") image_counter += 1 added_images.add(img['index']) except Exception as e: print(f"插入图片时出错: {str(e)}") # 如果有附录,添加分隔符和附录内容 if appendix: print("\n处理附录内容...") try: # 添加分页符 doc.add_page_break() # 添加附录标题 title = doc.add_paragraph("附录") title.alignment = WD_PARAGRAPH_ALIGNMENT.CENTER # 添加到文本输出 text_output.append("附录") # 添加附录内容 for content in appendix: # 检查是否是表格占位符 table_match = re.match(r'TABLE_PLACEHOLDER_(\d+)', content) if table_match: table_index = int(table_match.group(1)) print(f"正在处理附录中的表格占位符: {content} (索引: {table_index})") if table_index < len(tables): source_table = tables[table_index] try: # 生成表格的HTML标签 html_tags = self._generate_table_html_tags(source_table, f"table_appendix_{table_index}") # 添加HTML标签作为普通文本 p = doc.add_paragraph() run = p.add_run(html_tags) run.font.name = 'Courier New' # 使用等宽字体 run.font.size = Pt(10) # 设置字体大小 # 保存HTML到列表,用于生成HTML文件 try: from table.table_to_html import TableToHtml converter = TableToHtml(debug=False) html_code = converter.table_to_html(source_table) html_tables.append(html_code) except Exception as e: print(f"警告:生成HTML表格时出错: {str(e)}") html_tables.append(f"
附录表格 {table_index + 1} 处理失败: {str(e)}
") # 添加到文本输出 text_output.append(f"附录表格 {table_index + 1} 开始:") # 使用HTML标签代替表格文本用于txt输出 text_output.append(html_tags) text_output.append(f"附录表格 {table_index + 1} 结束:") except Exception as e: print(f"警告:处理附录表格时出错: {str(e)}") doc.add_paragraph(f"【表格处理失败: {str(e)}】") text_output.append("【表格处理失败】") else: p = doc.add_paragraph(content) p.alignment = WD_PARAGRAPH_ALIGNMENT.JUSTIFY # 添加到文本输出 text_output.append(content) except Exception as e: print(f"警告:处理附录时出错: {str(e)}") # 保存HTML表格到文件 if html_tables: try: html_content = f''' 表格预览

文档中的表格

{' '.join(html_tables)} ''' with open(html_file_path, 'w', encoding='utf-8') as f: f.write(html_content) print(f"\nHTML表格文件已保存到: {html_file_path}") # 添加HTML文件引用提示到Word文档 notice = doc.add_paragraph() notice.add_run("表格完整HTML版本可查看文件: ").font.bold = True run = notice.add_run(os.path.basename(html_file_path)) run.font.color.rgb = docx.shared.RGBColor(0, 0, 255) # 蓝色 run.font.underline = True # 下划线 except Exception as e: print(f"警告:保存HTML表格文件时出错: {str(e)}") # 保存docx文档和相关文件 try: # 保存Word文档 doc.save(output_path) print("\nWord文档保存成功!") except Exception as e: print(f"错误:保存Word文档时出错: {str(e)}") import traceback traceback.print_exc() raise # 保存文本文件 try: text_file_path = os.path.splitext(output_path)[0] + '.txt' # 合并文本内容,保留HTML标签 text_content = [] for t in text_output: if t.strip(): # 对于HTML标签内容不做特殊处理,直接添加 if t.startswith(' str: """ 生成表格的HTML标签字符串 Args: table: 源表格 table_id: 表格的唯一ID Returns: str: HTML标签字符串 """ rows = len(table.rows) cols = len(table.columns) if rows == 0 or cols == 0: return "
" # 分析表格结构(查找合并单元格) merged_cells = {} merged_v_cells = set() # 记录被垂直合并的单元格 cell_map = {} # 添加cell_map的定义 # 检测合并单元格 for i in range(rows): for j in range(cols): try: cell = table.cell(i, j) # 检查是否是合并单元格的一部分 if cell._element.tcPr is not None: # 检查垂直合并 vmerge = cell._element.tcPr.xpath('.//w:vMerge') if vmerge: val = vmerge[0].get('{http://schemas.openxmlformats.org/wordprocessingml/2006/main}val', 'continue') if val == 'restart': # 这是合并的起始单元格 span = self._get_vertical_span(table, i, j) cell_map[(i, j)] = ('vmerge', span) # 检查水平合并 gridspan = cell._element.tcPr.xpath('.//w:gridSpan') if gridspan: span = int(gridspan[0].get(qn('w:val'))) if span > 1: cell_map[(i, j)] = ('hmerge', span) except Exception as e: print(f"警告:处理合并单元格时出错 [{i},{j}]: {str(e)}") # 构建HTML表格 html = f'\n' html += '\n' # 添加表头行 header_rows = min(1, rows) # 假设第一行是表头 for i in range(header_rows): html += ' \n' j = 0 while j < cols: try: cell = table.cell(i, j) text = cell.text.strip() # 检查是否是合并单元格 rowspan = 1 colspan = 1 if (i, j) in cell_map: merge_type, span = cell_map[(i, j)] if merge_type == 'vmerge': rowspan = span elif merge_type == 'hmerge': colspan = span # 添加表头单元格 attrs = [] if rowspan > 1: attrs.append(f'rowspan="{rowspan}"') if colspan > 1: attrs.append(f'colspan="{colspan}"') attrs_str = ' '.join(attrs) if attrs_str: attrs_str = ' ' + attrs_str html += f' {text}\n' # 如果是水平合并,跳过合并的列 j += colspan except Exception as e: print(f"警告:处理表头单元格时出错 [{i},{j}]: {str(e)}") html += f' \n' j += 1 html += ' \n' html += '\n\n' # 添加数据行 for i in range(header_rows, rows): html += ' \n' j = 0 while j < cols: try: # 跳过已经被垂直合并的单元格 if (i, j) in merged_v_cells: j += 1 continue cell = table.cell(i, j) text = cell.text.strip() # 检查是否是合并单元格 rowspan = 1 colspan = 1 if (i, j) in cell_map: merge_type, span = cell_map[(i, j)] if merge_type == 'vmerge': rowspan = span # 标记被垂直合并的单元格 for k in range(1, span): if i + k < rows: merged_v_cells.add((i + k, j)) elif merge_type == 'hmerge': colspan = span # 添加数据单元格 attrs = [] if rowspan > 1: attrs.append(f'rowspan="{rowspan}"') if colspan > 1: attrs.append(f'colspan="{colspan}"') attrs_str = ' '.join(attrs) if attrs_str: attrs_str = ' ' + attrs_str html += f' {text}\n' # 如果是水平合并,跳过合并的列 j += colspan except Exception as e: print(f"警告:处理数据单元格时出错 [{i},{j}]: {str(e)}") html += f' \n' j += 1 html += ' \n' html += '\n
错误: {str(e)}
错误: {str(e)}
' return html def _get_vmerge_value(self, cell_element) -> str: """ 获取单元格的垂直合并属性 Args: cell_element: 单元格元素 Returns: str: 垂直合并属性值 """ vmerge = cell_element.xpath('.//w:vMerge') if vmerge: return vmerge[0].get(qn('w:val'), 'continue') return None def _get_gridspan_value(self, cell_element) -> int: """ 获取单元格的水平合并数量 Args: cell_element: 单元格元素 Returns: int: 水平合并的列数 """ try: gridspan = cell_element.xpath('.//w:gridSpan') if gridspan and gridspan[0].get(qn('w:val')): return int(gridspan[0].get(qn('w:val'))) except (ValueError, TypeError, AttributeError) as e: print(f"警告:获取gridspan值时出错: {str(e)}") return 1 # 默认返回1,表示没有合并 def _get_vertical_span(self, table: Table, start_row: int, col: int) -> int: """ 计算垂直合并的行数 Args: table: 表格对象 start_row: 起始行 col: 列号 Returns: int: 垂直合并的行数 """ span = 1 for i in range(start_row + 1, len(table.rows)): cell = table.cell(i, col) if self._get_vmerge_value(cell._element) == 'continue': span += 1 else: break return span def _convert_table_to_text(self, table: Table) -> str: """ 将表格转换为文本格式,智能处理简单和复杂表格结构 Args: table: docx表格对象 Returns: str: 表格的文本表示 """ try: # 获取表格的行数和列数 rows = len(table.rows) cols = len(table.columns) print(f"开始处理表格: {rows}行 x {cols}列") if rows == 0 or cols == 0: return "【空表格】" # 存储处理后的表格数据 processed_data = [] # 检查是否是复杂表格(具有合并单元格或多级表头) is_complex_table = False max_header_rows = min(4, rows) # 最多检查前4行,增加检测范围 # 表格类型检测增强 # 1. 检查表格宽高比 - 宽表格通常更复杂 aspect_ratio = cols / rows if rows > 0 else 0 if aspect_ratio > 3 or cols > 6: print("表格检测: 宽表格(列数>6或宽高比>3),标记为复杂表格") is_complex_table = True # 2. 检查前几行是否存在合并单元格 if not is_complex_table: merge_count = 0 for i in range(max_header_rows): for j in range(cols): try: cell = table.cell(i, j) if cell._element.tcPr is not None: # 检查垂直合并 vmerge = cell._element.tcPr.xpath('.//w:vMerge') if vmerge: print(f"表格检测: 发现垂直合并单元格 at [{i},{j}]") merge_count += 1 if merge_count >= 2: # 增加阈值判断 is_complex_table = True break # 检查水平合并 gridspan = cell._element.tcPr.xpath('.//w:gridSpan') if gridspan: span_val = self._get_gridspan_value(cell._element) print(f"表格检测: 发现水平合并单元格 at [{i},{j}], 跨度: {span_val}") if span_val > 1: merge_count += 1 if merge_count >= 2: # 增加阈值判断 is_complex_table = True break except Exception as e: print(f"表格检测: 检查单元格 [{i},{j}] 时出错: {str(e)}") continue if is_complex_table: break # 3. 检查每行的单元格数是否一致 - 不一致通常表示嵌套或特殊结构 if not is_complex_table: cell_counts = [] for i in range(min(5, rows)): # 检查前5行 try: actual_cells = 0 for j in range(cols): cell = table.cell(i, j) # 考虑水平合并 if cell._element.tcPr is not None: gridspan = cell._element.tcPr.xpath('.//w:gridSpan') if gridspan: actual_cells += 1 # 只计算一次,不管跨度 else: actual_cells += 1 else: actual_cells += 1 cell_counts.append(actual_cells) except Exception: continue # 检查单元格数是否一致 if len(cell_counts) > 1 and len(set(cell_counts)) > 1: print(f"表格检测: 各行单元格数不一致 {cell_counts},标记为复杂表格") is_complex_table = True print(f"表格分类: {'复杂表格' if is_complex_table else '简单表格'}") if is_complex_table: # 使用复杂表格处理逻辑 # 第一步:分析表头结构 header_structure = [] # 存储表头的层级结构 header_merge_map = {} # 记录合并单元格的映射关系 # 分析每一列的表头结构 print("开始分析复杂表格表头结构...") for j in range(cols): column_headers = [] last_header = None for i in range(max_header_rows): try: cell = table.cell(i, j) text = cell.text.strip() # 检查垂直合并 if cell._element.tcPr is not None: vmerge = cell._element.tcPr.xpath('.//w:vMerge') if vmerge: val = vmerge[0].get(qn('w:val'), 'continue') if val == 'continue': # 使用上一个非空表头 if last_header: print(f"表头分析: 垂直合并单元格 at [{i},{j}],使用上一行值: {last_header}") text = last_header # 记录合并关系 header_merge_map[(i, j)] = (i-1, j) else: # 向上查找第一个非continue的单元格 for k in range(i-1, -1, -1): try: prev_cell = table.cell(k, j) prev_text = prev_cell.text.strip() if prev_text: text = prev_text print(f"表头分析: 垂直合并单元格 at [{i},{j}],使用上方值 [{k},{j}]: {text}") break except Exception: continue # 检查水平合并 if cell._element.tcPr is not None: gridspan = self._get_gridspan_value(cell._element) if gridspan > 1: # 标记这是一个跨列的表头 print(f"表头分析: 水平合并单元格 at [{i},{j}],跨度 {gridspan},值: {text}") if text: # 只处理有内容的单元格 text = f"SPAN_{gridspan}_{text}" # 记录水平合并影响的列 for k in range(1, gridspan): if j + k < cols: header_merge_map[(i, j+k)] = (i, j) if text: column_headers.append(text) last_header = text except Exception as e: print(f"表头分析: 处理表头单元格 [{i},{j}] 时出错: {str(e)}") continue header_structure.append(column_headers) print(f"列 {j} 的表头结构: {column_headers}") # 第二步:构建完整的表头标识符 full_headers = [] print("开始构建完整表头标识符...") # 处理跨行跨列的表头 # 先进行一次预处理,处理合并单元格 for j, headers in enumerate(header_structure): if not headers: # 检查是否是被合并的列 is_merged = False for i in range(max_header_rows): if (i, j) in header_merge_map: src_i, src_j = header_merge_map[(i, j)] src_cell = table.cell(src_i, src_j) src_text = src_cell.text.strip() if src_text and src_j != j: # 确保是水平合并 print(f"表头补全: 列 {j} 被列 {src_j} 合并,添加表头: {src_text}") header_structure[j].append(src_text) is_merged = True break if not is_merged: print(f"表头补全: 列 {j} 无表头,使用默认值: 列{j+1}") header_structure[j].append(f"列{j+1}") # 构建每列的完整表头 for j, headers in enumerate(header_structure): if not headers: full_headers.append(f"列{j+1}") continue # 处理跨列的表头 header_text = [] current_prefix = "" for h in headers: if h.startswith('SPAN_'): parts = h.split('_', 2) span = int(parts[1]) text = parts[2] # 将跨列的表头添加到后续的列 for k in range(span): if j + k < cols: if k == 0: if text != current_prefix: # 避免重复前缀 header_text.append(text) current_prefix = text else: if text not in header_structure[j + k]: header_structure[j + k].insert(0, text) else: if h != current_prefix: # 避免重复前缀 header_text.append(h) current_prefix = h # 移除重复的表头部分 unique_headers = [] seen = set() for h in header_text: if h not in seen: unique_headers.append(h) seen.add(h) # 构建完整表头,使用特殊分隔符 if unique_headers: full_header = '_'.join(unique_headers) print(f"列 {j} 的完整表头: {full_header}") full_headers.append(full_header) else: full_headers.append(f"列{j+1}") # 确定实际的表头行数 header_row_count = max(len(headers) for headers in header_structure) if header_row_count == 0: header_row_count = 1 print(f"表头行数: {header_row_count}") print(f"开始处理数据行,从第 {header_row_count} 行开始...") # 创建跟踪已处理垂直合并单元格的集合 processed_vmerge = set() # 处理数据行 for i in range(header_row_count, rows): try: row_data = [] j = 0 while j < cols: try: cell = table.cell(i, j) text = cell.text.strip() # 处理垂直合并单元格 if not text and cell._element.tcPr is not None: vmerge = cell._element.tcPr.xpath('.//w:vMerge') if vmerge: val = vmerge[0].get(qn('w:val'), 'continue') if val == 'continue': # 向上查找非continue的值 for k in range(i-1, header_row_count-1, -1): if (k, j) in processed_vmerge: continue try: src_cell = table.cell(k, j) src_text = src_cell.text.strip() if src_text: text = src_text print(f"数据行处理: 垂直合并单元格 at [{i},{j}],使用上方值 [{k},{j}]: {text}") break except Exception: continue processed_vmerge.add((i, j)) # 处理水平合并 gridspan = self._get_gridspan_value(cell._element) # 将值复制到所有合并的列 for k in range(gridspan): if j + k < len(full_headers): # 使用冒号分隔表头和值 if text: row_data.append(f"{full_headers[j+k]}:{text}") else: row_data.append(f"{full_headers[j+k]}:") j += gridspan except Exception as e: print(f"数据行处理: 处理数据单元格 [{i},{j}] 时出错: {str(e)}") if j < len(full_headers): row_data.append(f"{full_headers[j]}:") j += 1 # 确保行中至少有一个非空值 if any(len(data.split(':', 1)) > 1 and data.split(':', 1)[1].strip() for data in row_data): processed_line = " ".join(row_data) print(f"添加处理行 {i}: {processed_line[:100]}..." if len(processed_line) > 100 else f"添加处理行 {i}: {processed_line}") processed_data.append(processed_line) except Exception as e: print(f"数据行处理: 处理数据行 {i} 时出错: {str(e)}") continue else: # 使用简单表格处理逻辑 print("使用简单表格处理逻辑...") # 获取表头 headers = [] for j in range(cols): try: header_text = table.cell(0, j).text.strip() if not header_text: # 如果表头为空,使用默认值 header_text = f"列{j+1}" headers.append(header_text) print(f"简单表格表头 {j}: {header_text}") except Exception as e: print(f"简单表格处理: 处理表头单元格 [0,{j}] 时出错: {str(e)}") headers.append(f"列{j+1}") # 处理数据行 for i in range(1, rows): try: row_data = [] for j in range(cols): try: text = table.cell(i, j).text.strip() row_data.append(f"{headers[j]}:{text}") except Exception as e: print(f"简单表格处理: 处理数据单元格 [{i},{j}] 时出错: {str(e)}") row_data.append(f"{headers[j]}:") # 确保行中至少有一个非空值 if any(len(data.split(':', 1)) > 1 and data.split(':', 1)[1].strip() for data in row_data): processed_line = " ".join(row_data) print(f"添加简单表格行 {i}: {processed_line[:100]}..." if len(processed_line) > 100 else f"添加简单表格行 {i}: {processed_line}") processed_data.append(processed_line) except Exception as e: print(f"简单表格处理: 处理数据行 {i} 时出错: {str(e)}") continue # 返回处理后的表格文本 if processed_data: final_text = " ".join(processed_data) print(f"表格处理完成,生成 {len(processed_data)} 行数据") print(f"表格文本示例: {final_text[:200]}..." if len(final_text) > 200 else f"表格文本: {final_text}") return final_text else: print("表格无有效数据") return "【表格无有效数据】" except Exception as e: print(f"表格处理失败: {str(e)}") import traceback traceback.print_exc() return "【表格处理失败】" def _extract_table_text(self, table: Table) -> str: """ 提取表格中的文本内容,现在会返回格式化的文本表示 Args: table: docx表格对象 Returns: str: 表格内容的文本表示 """ return self._convert_table_to_text(table) def _extract_document_images(self, doc) -> List[Dict]: """ 从文档中提取图片,同时记录图片位置信息 Args: doc: docx文档对象 Returns: List[Dict]: 图片信息列表,包含索引、关系ID、文件名、二进制数据、位置信息等 """ print("\n开始提取文档图片...") images = [] image_index = 0 # 创建段落到索引的映射 paragraph_indices = {} for i, paragraph in enumerate(doc.paragraphs): paragraph_indices[paragraph._p] = i try: # 处理嵌入式图片 (InlineShape) paragraph_with_images = {} for i, paragraph in enumerate(doc.paragraphs): # 检查段落中的所有run for run in paragraph.runs: # 检查run中是否有InlineShape if hasattr(run, '_r') and run._r is not None: for drawing in run._r.findall('.//w:drawing', namespaces={'w': 'http://schemas.openxmlformats.org/wordprocessingml/2006/main'}): # 找到了图片,记录它的段落位置 if i not in paragraph_with_images: paragraph_with_images[i] = [] paragraph_with_images[i].append(True) # 方法1: 处理InlineShape对象 for i, shape in enumerate(doc.inline_shapes): try: if shape.type == 3: # PICTURE type # 获取图片关系ID rid = shape._inline.graphic.graphicData.pic.blipFill.blip.embed image_part = doc.part.related_parts[rid] image_data = image_part.blob # 找到图片所在的段落 paragraph_index = -1 parent_elem = shape._inline.getparent() while parent_elem is not None: if parent_elem.tag.endswith('p'): if parent_elem in paragraph_indices: paragraph_index = paragraph_indices[parent_elem] break parent_elem = parent_elem.getparent() # 检查图片大小是否合适 if len(image_data) > 100: # 过滤掉太小的图片 # 从内容类型中获取扩展名 content_type = image_part.content_type if 'png' in content_type: image_ext = '.png' elif 'jpeg' in content_type or 'jpg' in content_type: image_ext = '.jpg' elif 'gif' in content_type: image_ext = '.gif' elif 'bmp' in content_type: image_ext = '.bmp' else: image_ext = '.img' if image_ext in self.image_extensions: # 生成唯一的图片文件名 image_filename = f"image_{image_index}{image_ext}" # 检查是否已添加过相同关系ID的图片 duplicate = False for img in images: if img['rel_id'] == rid: duplicate = True break if not duplicate: images.append({ 'index': image_index, 'rel_id': rid, 'filename': image_filename, 'data': image_data, 'paragraph_index': paragraph_index, 'ext': image_ext }) print(f"提取图片 {image_index}: {image_filename} (大小: {len(image_data) // 1024} KB, 段落位置: {paragraph_index})") image_index += 1 except Exception as e: print(f"提取图片时出错(方法1): {str(e)}") # 方法2: 从document.part.rels提取可能遗漏的图片 for rel in doc.part.rels.values(): if "image" in rel.reltype: try: image_data = rel.target_part.blob # 检查图片大小 if len(image_data) > 100: # 过滤掉太小的图片 # 检查是否已添加过相同关系ID的图片 duplicate = False for img in images: if img['rel_id'] == rel.rId: duplicate = True break if not duplicate: image_ext = os.path.splitext(rel.target_ref)[1].lower() if image_ext in self.image_extensions: # 生成唯一的图片文件名 image_filename = f"image_{image_index}{image_ext}" # 尝试找到此图片在文档中的位置 paragraph_index = -1 # 默认位置标记为未知 images.append({ 'index': image_index, 'rel_id': rel.rId, 'filename': image_filename, 'data': image_data, 'paragraph_index': paragraph_index, 'ext': image_ext }) print(f"提取图片 {image_index}: {image_filename} (大小: {len(image_data) // 1024} KB, 位置未知)") image_index += 1 except Exception as e: print(f"提取图片时出错(方法2): {str(e)}") print(f"文档图片提取完成, 共提取 {len(images)} 张图片") except Exception as e: print(f"提取文档图片时出错: {str(e)}") import traceback traceback.print_exc() return images def process_directory(input_dir: str, output_dir: str = None): """ 处理指定目录下的所有文档文件 Args: input_dir: 输入目录路径 output_dir: 输出目录路径,如果为None则使用输入目录 """ # 如果未指定输出目录,使用输入目录 if output_dir is None: output_dir = input_dir if not os.path.exists(output_dir): os.makedirs(output_dir) cleaner = DocCleaner() for root, _, files in os.walk(input_dir): for file in files: if file.endswith(('.doc', '.docx')): input_path = os.path.join(root, file) try: # 清理文档 main_content, appendix, tables, images = cleaner.clean_doc(input_path) # 创建输出文件名(统一使用docx扩展名) base_name = os.path.splitext(file)[0] output_path = os.path.join(output_dir, f"{base_name}_cleaned.docx") # 保存为docx格式 cleaner.save_as_docx(main_content, appendix, tables, images, output_path) except Exception as e: print(f"处理文件 {file} 时出错: {str(e)}") # 添加更详细的错误信息 if isinstance(e, subprocess.CalledProcessError): print(f"命令执行错误: {e.output}") elif isinstance(e, FileNotFoundError): print("请确保已安装LibreOffice并将其添加到系统PATH中") def qn(tag: str) -> str: """ 将标签转换为带命名空间的格式 Args: tag: 原始标签 Returns: str: 带命名空间的标签 """ prefix = "{http://schemas.openxmlformats.org/wordprocessingml/2006/main}" return prefix + tag if __name__ == '__main__': import argparse # parser = argparse.ArgumentParser(description='文档清理工具') # parser.add_argument('input_dir', help='输入目录路径') # parser.add_argument('--output_dir', help='输出目录路径(可选,默认为输入目录)', default=None) # # args = parser.parse_args() process_directory("D:\\rzData\\poject\\AI项目\\UDI智能体\\测试文档", "D:\\rzData\\poject\\AI项目\\UDI智能体\\测试文档") # 确保目录存在,如果不存在则创建 # 创建基础目录(使用更安全的方式) # base_dir = 'D:\Desktop\DEMO' # text_dir = os.path.join(base_dir, "测试") # # os.makedirs(text_dir, exist_ok=True, mode=0o777) # # print(f"目录是否存在: {os.path.exists(text_dir)}") # print(f"完整路径: {os.path.abspath(text_dir)}") # 或者直接 print(f"完整路径: {text_dir}")