#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import os
import re
import docx
import numpy as np
import requests
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
from typing import List, Tuple, Dict, Optional
from docx.shared import Pt
from docx.enum.text import WD_PARAGRAPH_ALIGNMENT
import subprocess
import tempfile
from docx.table import Table, _Cell
from docx.text.paragraph import Paragraph
import io
class DocCleaner:
    def __init__(self, ollama_host: str = "http://192.168.1.24:11434"):
        """
        Initialize the document cleaner.
        Args:
            ollama_host: address of the Ollama server
        """
        # Header/footer patterns
        self.header_footer_patterns = [
            r'页码\s*\d+-\d+',  # page-number format: 页码1-1, 页码2-1, etc.
            r'\s*\d+\s*页\s*共\s*\d+\s*页',  # Chinese page numbers: page X of Y pages
            r'Page\s*\d+\s*of\s*\d+',  # English page numbers
        ]
        # Special-symbol patterns
        self.special_char_patterns = [
            r'©\s*\d{4}.*?版权所有',  # copyright notice
            r'confidential',  # confidentiality marker
            r'draft|草稿',  # draft marker
            r'watermark',  # watermark marker
        ]
        # Appendix and reference heading patterns
        self.appendix_patterns = [
            r'^附录\s*[A-Za-z]?[\s:]',
            r'^Appendix\s*[A-Za-z]?[\s:]',
            r'^参考文献$',
            r'^References$',
            r'^Bibliography$'
        ]
        # Initialize the TF-IDF vectorizer
        self.vectorizer = TfidfVectorizer(
            min_df=1,
            stop_words='english'
        )
        self.ollama_host = ollama_host
        self.embedding_model = "bge-m3:latest"  # model used for text embeddings
        # Image-related configuration
        self.extract_images = True  # whether to extract images
        self.image_extensions = ['.png', '.jpg', '.jpeg', '.gif', '.bmp']  # supported image extensions
        self.min_image_size = 100  # minimum image size (width and height); filters out tiny images
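    # Minimal usage sketch (assumes an Ollama server reachable at ollama_host
    # serving the bge-m3 embedding model; filenames are illustrative):
    #     cleaner = DocCleaner()
    #     main, appendix, tables, images = cleaner.clean_doc("report.docx")
    #     cleaner.save_as_docx(main, appendix, tables, images, "report_cleaned.docx")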
    def _convert_doc_to_docx(self, doc_path: str) -> str:
        """
        Convert a .doc file to .docx format.
        Args:
            doc_path: path to the .doc file
        Returns:
            str: path to the converted .docx file
        """
        # Create a temporary output directory
        temp_dir = tempfile.mkdtemp()
        # soffice names the converted file after the input file's base name
        base_name = os.path.splitext(os.path.basename(doc_path))[0]
        temp_docx = os.path.join(temp_dir, base_name + '.docx')
        try:
            # Convert with soffice (LibreOffice) in headless mode
            cmd = ['soffice', '--headless', '--convert-to', 'docx', '--outdir', temp_dir, doc_path]
            subprocess.run(cmd, check=True, capture_output=True)
            # Return the path of the converted file
            return temp_docx
        except subprocess.CalledProcessError as e:
            raise Exception(f"Failed to convert .doc file: {str(e)}")
    def clean_doc(self, file_path: str) -> Tuple[List[str], List[str], List[Table], List[Dict]]:
        """
        Clean a document and return the processed body text, appendix, tables and images.
        Args:
            file_path: path to the document file
        Returns:
            Tuple[List[str], List[str], List[Table], List[Dict]]: (cleaned body paragraphs, appendix paragraphs, tables, image info dicts)
        """
        print(f"\nProcessing document: {file_path}")
        # Detect the file type
        _, file_extension = os.path.splitext(file_path)
        file_extension = file_extension.lower()
        # If it is a .doc file, convert it to .docx first
        if file_extension == '.doc':
            temp_docx = self._convert_doc_to_docx(file_path)
            doc = docx.Document(temp_docx)
            # Clean up the temporary files
            os.remove(temp_docx)
            os.rmdir(os.path.dirname(temp_docx))
        else:
            doc = docx.Document(file_path)
        # Extract images (if enabled)
        images = []
        if self.extract_images:
            images = self._extract_document_images(doc)
        # Extract all content (paragraphs and tables)
        content = []
        tables = []
        table_count = 0
        try:
            print("\nParsing document structure...")
            # Iterate over all elements in the document body
            for element in doc._element.body:
                if element.tag.endswith('p'):
                    try:
                        paragraph = docx.text.paragraph.Paragraph(element, doc)
                        text = paragraph.text.strip()
                        # Only keep non-empty paragraphs
                        if text:
                            # Check whether this is an appendix heading
                            is_appendix = any(re.match(pattern, text, re.IGNORECASE)
                                              for pattern in self.appendix_patterns)
                            content.append({
                                'type': 'paragraph',
                                'content': text,
                                'is_appendix_start': is_appendix
                            })
                            if is_appendix:
                                print(f"Found appendix heading: {text}")
                    except Exception as e:
                        print(f"Warning: error while processing a paragraph: {str(e)}")
                        continue
                elif element.tag.endswith('tbl'):
                    try:
                        table = docx.table.Table(element, doc)
                        # Verify that the table is valid
                        if hasattr(table, 'rows') and hasattr(table, 'columns'):
                            tables.append(table)
                            content.append({
                                'type': 'table',
                                'index': table_count
                            })
                            print(f"Found table {table_count}: {len(table.rows)} rows x {len(table.columns)} columns")
                            table_count += 1
                    except Exception as e:
                        print(f"Warning: error while processing a table: {str(e)}")
                        continue
        except Exception as e:
            print(f"Warning: error while iterating document content: {str(e)}")
        print("\nDocument structure parsed:")
        print(f"- Total elements: {len(content)}")
        print(f"- Tables: {len(tables)}")
        print(f"- Images: {len(images)}")
        # Separate body text from appendix
        main_content = []
        appendix = []
        is_appendix = False
        print("\nSeparating body text and appendix...")
        for item in content:
            if item['type'] == 'paragraph':
                if item['is_appendix_start']:
                    is_appendix = True
                    print("Entering appendix section")
                if is_appendix:
                    appendix.append(item['content'])
                else:
                    main_content.append(item['content'])
            elif item['type'] == 'table':
                table_placeholder = f'TABLE_PLACEHOLDER_{item["index"]}'
                if is_appendix:
                    appendix.append(table_placeholder)
                    print(f"Added table to appendix: {table_placeholder}")
                else:
                    main_content.append(table_placeholder)
                    print(f"Added table to body: {table_placeholder}")
        print("\nSeparation finished:")
        print(f"- Body elements: {len(main_content)}")
        print(f"- Appendix elements: {len(appendix)}")
        # Clean the body text (keeping table placeholders)
        cleaned_content = []
        print("\nCleaning body text...")
        for item in main_content:
            if item.startswith('TABLE_PLACEHOLDER_'):
                cleaned_content.append(item)
                print(f"Kept table placeholder: {item}")
            else:
                cleaned = self._clean_text([item])
                if cleaned and cleaned[0]:
                    cleaned_content.append(cleaned[0])
        print("\nCleaning finished:")
        print(f"- Elements after cleaning: {len(cleaned_content)}")
        print("- Table placeholder positions:")
        for i, item in enumerate(cleaned_content):
            if item.startswith('TABLE_PLACEHOLDER_'):
                print(f"  position {i}: {item}")
        return cleaned_content, appendix, tables, images
    def _clean_text(self, text: List[str]) -> List[str]:
        """
        Clean text content.
        Args:
            text: list of paragraphs to clean
        Returns:
            List[str]: cleaned paragraphs
        """
        cleaned = []
        for paragraph in text:
            # Keep table placeholders untouched
            if paragraph.startswith('TABLE_PLACEHOLDER_'):
                cleaned.append(paragraph)
                continue
            # Skip empty paragraphs
            if not paragraph.strip():
                continue
            # Check whether this is a table-of-contents item (a line starting with a numeric index)
            is_toc_item = bool(re.match(r'^\s*(?:\d+\.)*\d+\s+.*', paragraph))
            if not is_toc_item:
                # Remove headers and footers
                for pattern in self.header_footer_patterns:
                    paragraph = re.sub(pattern, '', paragraph, flags=re.IGNORECASE)
                # Remove special symbols
                for pattern in self.special_char_patterns:
                    paragraph = re.sub(pattern, '', paragraph, flags=re.IGNORECASE)
            # Keep the paragraph if anything is left
            if paragraph.strip():
                cleaned.append(paragraph.strip())
        return cleaned
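    # The patterns above in action (hypothetical input):
    #     _clean_text(["Overview Page 3 of 12", "   ", "TABLE_PLACEHOLDER_0"])
    #     -> ["Overview", "TABLE_PLACEHOLDER_0"]
    # "Page 3 of 12" is stripped by header_footer_patterns, the blank paragraph
    # is dropped, and table placeholders pass through untouched.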
    def _split_content(self, paragraphs: List[str]) -> Tuple[List[str], List[str]]:
        """
        Separate the body text from the appendix / references.
        Args:
            paragraphs: list of document paragraphs
        Returns:
            Tuple[List[str], List[str]]: (body paragraphs, appendix paragraphs)
        """
        main_content = []
        appendix = []
        is_appendix = False
        for p in paragraphs:
            # Check whether the appendix starts here
            if any(re.match(pattern, p, re.IGNORECASE) for pattern in self.appendix_patterns):
                is_appendix = True
            if is_appendix:
                appendix.append(p)
            else:
                main_content.append(p)
        return main_content, appendix
    def _get_embeddings(self, texts: List[str]) -> np.ndarray:
        """
        Get text embeddings via Ollama.
        Args:
            texts: list of texts
        Returns:
            np.ndarray: matrix of embedding vectors
        """
        embeddings = []
        for text in texts:
            try:
                response = requests.post(
                    f"{self.ollama_host}/api/embeddings",
                    json={
                        "model": self.embedding_model,
                        "prompt": text
                    }
                )
                response.raise_for_status()
                embedding = response.json()["embedding"]
                embeddings.append(embedding)
            except Exception as e:
                print(f"Failed to get text embedding: {str(e)}")
                # Fall back to a zero vector on failure; bge-m3 produces
                # 1024-dimensional embeddings, so the fallback must match
                embeddings.append([0.0] * 1024)
        return np.array(embeddings)
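    # For reference, the Ollama embeddings endpoint used above exchanges JSON
    # of roughly this shape (one prompt per request):
    #     POST {ollama_host}/api/embeddings
    #     request:  {"model": "bge-m3:latest", "prompt": "some text"}
    #     response: {"embedding": [0.0123, -0.0456, ...]}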
    def _remove_duplicates(self, paragraphs: List[str], similarity_threshold: float = 0.92) -> List[str]:
        """
        Remove duplicate paragraphs while keeping table placeholders in place.
        Args:
            paragraphs: list of paragraphs
            similarity_threshold: similarity threshold; a higher value works well with the embedding model
        Returns:
            List[str]: deduplicated paragraphs
        """
        if not paragraphs:
            return []
        # Separate table placeholders from ordinary paragraphs
        table_placeholders = {}
        text_paragraphs = []
        for i, p in enumerate(paragraphs):
            if p.startswith('TABLE_PLACEHOLDER_'):
                table_placeholders[i] = p
            else:
                text_paragraphs.append((i, p))
        try:
            # Deduplicate only the non-table paragraphs
            if text_paragraphs:
                # Get text embeddings
                text_only = [p[1] for p in text_paragraphs]
                embeddings = self._get_embeddings(text_only)
                # Compute the cosine-similarity matrix
                similarity_matrix = cosine_similarity(embeddings)
                # Mark the paragraphs to keep
                keep_indices = []
                for i in range(len(text_paragraphs)):
                    # Keep the paragraph unless it is highly similar to one already kept
                    if not any(similarity_matrix[i][j] > similarity_threshold for j in keep_indices):
                        keep_indices.append(i)
                # Non-table paragraphs that survive deduplication
                kept_paragraphs = [(text_paragraphs[i][0], text_only[i]) for i in keep_indices]
            else:
                kept_paragraphs = []
            # Merge placeholders and kept paragraphs, sorted by original position
            all_kept = list(table_placeholders.items()) + kept_paragraphs
            all_kept.sort(key=lambda x: x[0])
            return [p[1] for p in all_kept]
        except Exception as e:
            print(f"Ollama embedding model failed, falling back to TF-IDF: {str(e)}")
            # Fall back to the original TF-IDF method if Ollama fails
            return self._remove_duplicates_tfidf(paragraphs)
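    # Greedy dedup in action (hypothetical similarities): given paragraphs
    # [A, A', B] where sim(A, A') = 0.95 and every other pair scores below the
    # 0.92 threshold, A is kept, A' is dropped as a near-duplicate of A, and B
    # is kept. Each paragraph is compared only against already-kept ones, so
    # the pass is O(n^2) in the number of text paragraphs.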
    def _remove_duplicates_tfidf(self, paragraphs: List[str], similarity_threshold: float = 0.85) -> List[str]:
        """
        Remove duplicate paragraphs using TF-IDF, as a fallback method.
        Args:
            paragraphs: list of paragraphs
            similarity_threshold: similarity threshold
        Returns:
            List[str]: deduplicated paragraphs
        """
        if not paragraphs:
            return []
        # Separate table placeholders from ordinary paragraphs
        table_placeholders = {}
        text_paragraphs = []
        for i, p in enumerate(paragraphs):
            if p.startswith('TABLE_PLACEHOLDER_'):
                table_placeholders[i] = p
            else:
                text_paragraphs.append((i, p))
        if text_paragraphs:
            # Compute the TF-IDF matrix
            text_only = [p[1] for p in text_paragraphs]
            tfidf_matrix = self.vectorizer.fit_transform(text_only)
            # Compute the cosine-similarity matrix
            similarity_matrix = cosine_similarity(tfidf_matrix)
            # Mark the paragraphs to keep
            keep_indices = []
            for i in range(len(text_paragraphs)):
                # Keep the paragraph unless it is highly similar to one already kept
                if not any(similarity_matrix[i][j] > similarity_threshold for j in keep_indices):
                    keep_indices.append(i)
            # Non-table paragraphs that survive deduplication
            kept_paragraphs = [(text_paragraphs[i][0], text_only[i]) for i in keep_indices]
        else:
            kept_paragraphs = []
        # Merge placeholders and kept paragraphs, sorted by original position
        all_kept = list(table_placeholders.items()) + kept_paragraphs
        all_kept.sort(key=lambda x: x[0])
        return [p[1] for p in all_kept]
    def save_as_docx(self, cleaned_content: List[str], appendix: List[str], tables: List[Table], images: List[Dict], output_path: str):
        """
        Save the cleaned content in both docx and txt formats.
        Args:
            cleaned_content: cleaned body paragraphs
            appendix: appendix paragraphs
            tables: list of tables
            images: list of image info dicts
            output_path: output file path
        """
        print(f"\nSaving document: {output_path}")
        print(f"- Body elements: {len(cleaned_content)}")
        print(f"- Appendix elements: {len(appendix)}")
        print(f"- Tables: {len(tables)}")
        print(f"- Images: {len(images)}")
        # Create a new document
        doc = docx.Document()
        # Collected text output, used to write the txt file
        text_output = []
        # Map paragraph indices to the images anchored at them
        paragraph_to_images = {}
        for img in images:
            if 'paragraph_index' in img and img['paragraph_index'] >= 0:
                if img['paragraph_index'] not in paragraph_to_images:
                    paragraph_to_images[img['paragraph_index']] = []
                paragraph_to_images[img['paragraph_index']].append(img)
        # Path of the generated HTML tables file
        html_file_path = os.path.splitext(output_path)[0] + '_tables.html'
        html_tables = []
        # Add body content and tables, preserving their relative order
        print("\nProcessing body content...")
        # Track the image counter and which images have been placed
        image_counter = 0
        added_images = set()
        # Map old paragraph indices to indices in the new document
        old_to_new_paragraph_map = {}
        new_paragraph_index = 0
        # Iterate over the cleaned content
        for i, content in enumerate(cleaned_content):
            try:
                # Check whether this is a table placeholder
                table_match = re.match(r'TABLE_PLACEHOLDER_(\d+)', content)
                if table_match:
                    table_index = int(table_match.group(1))
                    print(f"Processing table placeholder: {content} (index: {table_index})")
                    if table_index < len(tables):
                        source_table = tables[table_index]
                        try:
                            # Generate the table's HTML markup
                            html_tags = self._generate_table_html_tags(source_table, f"table_{table_index}")
                            # Add the HTML markup as plain text
                            p = doc.add_paragraph()
                            run = p.add_run(html_tags)
                            run.font.name = 'Courier New'  # use a monospaced font
                            run.font.size = Pt(10)  # set the font size
                            new_paragraph_index += 1
                            # Also collect the HTML for the standalone HTML file
                            try:
                                from table.table_to_html import TableToHtml
                                converter = TableToHtml(debug=False)
                                html_code = converter.table_to_html(source_table)
                                html_tables.append(html_code)
                            except Exception as e:
                                print(f"Warning: error while generating HTML table: {str(e)}")
                                html_tables.append(f"<div class='error'>Table {table_index + 1} failed: {str(e)}</div>")
                            # Add to the text output
                            text_output.append(f"Table {table_index + 1} start:")
                            # Use the HTML markup instead of raw table text in the txt output
                            text_output.append(html_tags)
                            text_output.append(f"Table {table_index + 1} end:")
                            # Add an empty line
                            doc.add_paragraph()
                            new_paragraph_index += 1
                        except Exception as e:
                            print(f"Warning: error while processing a table: {str(e)}")
                            doc.add_paragraph(f"[Table processing failed: {str(e)}]")
                            text_output.append("[Table processing failed]")
                            new_paragraph_index += 1
                else:
                    # Add an ordinary paragraph
                    p = doc.add_paragraph(content)
                    p.alignment = WD_PARAGRAPH_ALIGNMENT.JUSTIFY
                    old_to_new_paragraph_map[i] = new_paragraph_index
                    new_paragraph_index += 1
                    # Add to the text output
                    text_output.append(content)
                    # Check whether any images are anchored at this paragraph
                    if i in paragraph_to_images:
                        for img_data in paragraph_to_images[i]:
                            if img_data['index'] not in added_images:
                                try:
                                    # Create the picture directly from the image bytes
                                    image_stream = io.BytesIO(img_data['data'])
                                    # Add the picture to the document
                                    doc.add_picture(image_stream, width=docx.shared.Inches(6))  # 6-inch width
                                    new_paragraph_index += 1
                                    # Image captions intentionally omitted
                                    # Add to the text output
                                    text_output.append("[image]")
                                    print(f"Inserted image after paragraph {i}")
                                    image_counter += 1
                                    added_images.add(img_data['index'])
                                except Exception as e:
                                    print(f"Error while inserting an image: {str(e)}")
            except Exception as e:
                print(f"Warning: error while processing a paragraph or table: {str(e)}")
                continue
        # Insert images that have not been placed yet
        if len(added_images) < len(images):
            print("\nProcessing unplaced images...")
            # Append the remaining images at the end of the document
            for img in images:
                if img['index'] not in added_images:
                    try:
                        # Create the picture directly from the image bytes
                        image_stream = io.BytesIO(img['data'])
                        # Add the picture to the document
                        doc.add_picture(image_stream, width=docx.shared.Inches(6))  # 6-inch width
                        # Image captions intentionally omitted
                        # Add to the text output
                        text_output.append("[image]")
                        print("Appended image at the end of the document")
                        image_counter += 1
                        added_images.add(img['index'])
                    except Exception as e:
                        print(f"Error while inserting an image: {str(e)}")
        # If there is an appendix, add a separator and the appendix content
        if appendix:
            print("\nProcessing appendix content...")
            try:
                # Add a page break
                doc.add_page_break()
                # Add the appendix heading
                title = doc.add_paragraph("Appendix")
                title.alignment = WD_PARAGRAPH_ALIGNMENT.CENTER
                # Add to the text output
                text_output.append("Appendix")
                # Add the appendix content
                for content in appendix:
                    # Check whether this is a table placeholder
                    table_match = re.match(r'TABLE_PLACEHOLDER_(\d+)', content)
                    if table_match:
                        table_index = int(table_match.group(1))
                        print(f"Processing appendix table placeholder: {content} (index: {table_index})")
                        if table_index < len(tables):
                            source_table = tables[table_index]
                            try:
                                # Generate the table's HTML markup
                                html_tags = self._generate_table_html_tags(source_table, f"table_appendix_{table_index}")
                                # Add the HTML markup as plain text
                                p = doc.add_paragraph()
                                run = p.add_run(html_tags)
                                run.font.name = 'Courier New'  # use a monospaced font
                                run.font.size = Pt(10)  # set the font size
                                # Also collect the HTML for the standalone HTML file
                                try:
                                    from table.table_to_html import TableToHtml
                                    converter = TableToHtml(debug=False)
                                    html_code = converter.table_to_html(source_table)
                                    html_tables.append(html_code)
                                except Exception as e:
                                    print(f"Warning: error while generating HTML table: {str(e)}")
                                    html_tables.append(f"<div class='error'>Appendix table {table_index + 1} failed: {str(e)}</div>")
                                # Add to the text output
                                text_output.append(f"Appendix table {table_index + 1} start:")
                                # Use the HTML markup instead of raw table text in the txt output
                                text_output.append(html_tags)
                                text_output.append(f"Appendix table {table_index + 1} end:")
                            except Exception as e:
                                print(f"Warning: error while processing an appendix table: {str(e)}")
                                doc.add_paragraph(f"[Table processing failed: {str(e)}]")
                                text_output.append("[Table processing failed]")
                    else:
                        p = doc.add_paragraph(content)
                        p.alignment = WD_PARAGRAPH_ALIGNMENT.JUSTIFY
                        # Add to the text output
                        text_output.append(content)
            except Exception as e:
                print(f"Warning: error while processing the appendix: {str(e)}")
        # Save the HTML tables to a file
if html_tables:
try:
html_content = f'''<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<title>Table Preview</title>
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<style>
body {{
font-family: Arial, sans-serif;
line-height: 1.6;
margin: 0;
padding: 20px;
color: #333;
}}
h1 {{
color: #2c3e50;
border-bottom: 2px solid #eee;
padding-bottom: 10px;
}}
.docx-table {{
border-collapse: collapse;
width: 100%;
margin-bottom: 20px;
}}
.docx-table th, .docx-table td {{
border: 1px solid #ddd;
padding: 8px;
text-align: left;
}}
.docx-table th {{
background-color: #f2f2f2;
font-weight: bold;
}}
.docx-table tr:nth-child(even) {{
background-color: #f9f9f9;
}}
.docx-table tr:hover {{
background-color: #f5f5f5;
}}
@media print {{
body {{
padding: 0;
}}
.docx-table {{
page-break-inside: avoid;
}}
}}
</style>
</head>
<body>
<h1>Tables in the Document</h1>
{' '.join(html_tables)}
</body>
</html>'''
with open(html_file_path, 'w', encoding='utf-8') as f:
f.write(html_content)
print(f"\nHTML表格文件已保存到: {html_file_path}")
# 添加HTML文件引用提示到Word文档
notice = doc.add_paragraph()
notice.add_run("表格完整HTML版本可查看文件: ").font.bold = True
run = notice.add_run(os.path.basename(html_file_path))
run.font.color.rgb = docx.shared.RGBColor(0, 0, 255) # 蓝色
run.font.underline = True # 下划线
except Exception as e:
print(f"警告保存HTML表格文件时出错: {str(e)}")
        # Save the docx document and related files
        try:
            # Save the Word document
            doc.save(output_path)
            print("\nWord document saved successfully!")
        except Exception as e:
            print(f"Error while saving the Word document: {str(e)}")
            import traceback
            traceback.print_exc()
            raise
        # Save the text file
        try:
            text_file_path = os.path.splitext(output_path)[0] + '.txt'
            # Merge the text content, keeping HTML markup as-is
            text_content = []
            for t in text_output:
                if t.strip():
                    # HTML markup is added verbatim
                    if t.startswith('<table'):
                        text_content.append(t)
                    else:
                        # For ordinary text, strip newlines
                        text_content.append(t.replace('\n', ' ').strip())
            # Join everything with spaces
            final_text_content = ' '.join(text_content)
            with open(text_file_path, 'w', encoding='utf-8') as f:
                f.write(final_text_content)
            print(f"Text file saved: {text_file_path}")
        except Exception as e:
            print(f"Error while saving the text file: {str(e)}")
            raise
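    # For an output_path of "report_cleaned.docx" (illustrative name), this
    # method writes up to three artifacts side by side:
    #     report_cleaned.docx         cleaned document with inline HTML table markup
    #     report_cleaned.txt          flattened single-line text with the same markup
    #     report_cleaned_tables.html  standalone, styled preview of all tables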
    def _generate_table_html_tags(self, table: Table, table_id: str) -> str:
        """
        Generate an HTML markup string for a table.
        Args:
            table: source table
            table_id: unique id for the table
        Returns:
            str: HTML markup string
        """
        rows = len(table.rows)
        cols = len(table.columns)
        if rows == 0 or cols == 0:
            return "<table></table>"
        # Analyze the table structure (look for merged cells)
        merged_v_cells = set()  # cells covered by a vertical merge
        cell_map = {}  # (row, col) -> (merge type, span)
        # Detect merged cells
        for i in range(rows):
            for j in range(cols):
                try:
                    cell = table.cell(i, j)
                    # Check whether the cell is part of a merge
                    if cell._element.tcPr is not None:
                        # Check vertical merges
                        vmerge = cell._element.tcPr.xpath('.//w:vMerge')
                        if vmerge:
                            val = vmerge[0].get('{http://schemas.openxmlformats.org/wordprocessingml/2006/main}val', 'continue')
                            if val == 'restart':
                                # This is the first cell of a vertical merge
                                span = self._get_vertical_span(table, i, j)
                                cell_map[(i, j)] = ('vmerge', span)
                        # Check horizontal merges
                        gridspan = cell._element.tcPr.xpath('.//w:gridSpan')
                        if gridspan:
                            span = int(gridspan[0].get(qn('w:val')))
                            if span > 1:
                                cell_map[(i, j)] = ('hmerge', span)
                except Exception as e:
                    print(f"Warning: error while processing merged cell [{i},{j}]: {str(e)}")
        # Build the HTML table
        html = f'<table id="{table_id}" class="docx-table">\n'
        html += '<thead>\n'
        # Add the header row(s)
        header_rows = min(1, rows)  # assume the first row is the header
        for i in range(header_rows):
            html += '  <tr>\n'
            j = 0
            while j < cols:
                try:
                    cell = table.cell(i, j)
                    text = cell.text.strip()
                    # Check whether this is a merged cell
                    rowspan = 1
                    colspan = 1
                    if (i, j) in cell_map:
                        merge_type, span = cell_map[(i, j)]
                        if merge_type == 'vmerge':
                            rowspan = span
                        elif merge_type == 'hmerge':
                            colspan = span
                    # Add the header cell
                    attrs = []
                    if rowspan > 1:
                        attrs.append(f'rowspan="{rowspan}"')
                    if colspan > 1:
                        attrs.append(f'colspan="{colspan}"')
                    attrs_str = ' '.join(attrs)
                    if attrs_str:
                        attrs_str = ' ' + attrs_str
                    html += f'    <th{attrs_str}>{text}</th>\n'
                    # For horizontal merges, skip the merged columns
                    j += colspan
                except Exception as e:
                    print(f"Warning: error while processing header cell [{i},{j}]: {str(e)}")
                    html += f'    <th>Error: {str(e)}</th>\n'
                    j += 1
            html += '  </tr>\n'
        html += '</thead>\n<tbody>\n'
        # Add the data rows
        for i in range(header_rows, rows):
            html += '  <tr>\n'
            j = 0
            while j < cols:
                try:
                    # Skip cells covered by a vertical merge
                    if (i, j) in merged_v_cells:
                        j += 1
                        continue
                    cell = table.cell(i, j)
                    text = cell.text.strip()
                    # Check whether this is a merged cell
                    rowspan = 1
                    colspan = 1
                    if (i, j) in cell_map:
                        merge_type, span = cell_map[(i, j)]
                        if merge_type == 'vmerge':
                            rowspan = span
                            # Mark the cells covered by the vertical merge
                            for k in range(1, span):
                                if i + k < rows:
                                    merged_v_cells.add((i + k, j))
                        elif merge_type == 'hmerge':
                            colspan = span
                    # Add the data cell
                    attrs = []
                    if rowspan > 1:
                        attrs.append(f'rowspan="{rowspan}"')
                    if colspan > 1:
                        attrs.append(f'colspan="{colspan}"')
                    attrs_str = ' '.join(attrs)
                    if attrs_str:
                        attrs_str = ' ' + attrs_str
                    html += f'    <td{attrs_str}>{text}</td>\n'
                    # For horizontal merges, skip the merged columns
                    j += colspan
                except Exception as e:
                    print(f"Warning: error while processing data cell [{i},{j}]: {str(e)}")
                    html += f'    <td>Error: {str(e)}</td>\n'
                    j += 1
            html += '  </tr>\n'
        html += '</tbody>\n</table>'
        return html
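    # Sketch of the markup this produces for a table whose first header cell
    # spans two columns (values are hypothetical):
    #     <table id="table_0" class="docx-table">
    #     <thead>
    #       <tr>
    #         <th colspan="2">Name</th>
    #         <th>Age</th>
    #       </tr>
    #     </thead>
    #     <tbody>
    #       <tr>
    #         <td>First</td>
    #         <td>Last</td>
    #         <td>30</td>
    #       </tr>
    #     </tbody>
    #     </table>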
    def _get_vmerge_value(self, cell_element) -> Optional[str]:
        """
        Get a cell's vertical-merge attribute.
        Args:
            cell_element: the cell element
        Returns:
            Optional[str]: the vertical-merge value, or None if the cell is not merged
        """
        vmerge = cell_element.xpath('.//w:vMerge')
        if vmerge:
            return vmerge[0].get(qn('w:val'), 'continue')
        return None
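    # In WordprocessingML, a vertical merge is encoded on each covered cell:
    #     <w:tc><w:tcPr><w:vMerge w:val="restart"/></w:tcPr>...</w:tc>  first cell
    #     <w:tc><w:tcPr><w:vMerge/></w:tcPr>...</w:tc>                  continuation
    # A <w:vMerge> with no w:val attribute means "continue", which is why the
    # lookup above defaults to 'continue'.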
    def _get_gridspan_value(self, cell_element) -> int:
        """
        Get the number of columns a cell spans horizontally.
        Args:
            cell_element: the cell element
        Returns:
            int: the number of merged columns
        """
        try:
            gridspan = cell_element.xpath('.//w:gridSpan')
            if gridspan and gridspan[0].get(qn('w:val')):
                return int(gridspan[0].get(qn('w:val')))
        except (ValueError, TypeError, AttributeError) as e:
            print(f"Warning: error while reading the gridSpan value: {str(e)}")
        return 1  # default to 1, meaning no merge
    def _get_vertical_span(self, table: Table, start_row: int, col: int) -> int:
        """
        Count the number of rows in a vertical merge.
        Args:
            table: the table object
            start_row: starting row
            col: column index
        Returns:
            int: the number of vertically merged rows
        """
        span = 1
        for i in range(start_row + 1, len(table.rows)):
            cell = table.cell(i, col)
            if self._get_vmerge_value(cell._element) == 'continue':
                span += 1
            else:
                break
        return span
    def _convert_table_to_text(self, table: Table) -> str:
        """
        Convert a table to a text representation, handling both simple and
        complex table structures.
        Args:
            table: docx table object
        Returns:
            str: text representation of the table
        """
        try:
            # Get the table's row and column counts
            rows = len(table.rows)
            cols = len(table.columns)
            print(f"Processing table: {rows} rows x {cols} columns")
            if rows == 0 or cols == 0:
                return "[empty table]"
            # Holds the processed table data
            processed_data = []
            # Check whether this is a complex table (merged cells or a multi-level header)
            is_complex_table = False
            max_header_rows = min(4, rows)  # check at most the first 4 rows to widen detection
            # Enhanced table-type detection
            # 1. Check the aspect ratio - wide tables are usually more complex
            aspect_ratio = cols / rows if rows > 0 else 0
            if aspect_ratio > 3 or cols > 6:
                print("Table detection: wide table (more than 6 columns or aspect ratio > 3), marked as complex")
                is_complex_table = True
            # 2. Check the first few rows for merged cells
            if not is_complex_table:
                merge_count = 0
                for i in range(max_header_rows):
                    for j in range(cols):
                        try:
                            cell = table.cell(i, j)
                            if cell._element.tcPr is not None:
                                # Check vertical merges
                                vmerge = cell._element.tcPr.xpath('.//w:vMerge')
                                if vmerge:
                                    print(f"Table detection: vertically merged cell at [{i},{j}]")
                                    merge_count += 1
                                    if merge_count >= 2:  # threshold before classifying as complex
                                        is_complex_table = True
                                        break
                                # Check horizontal merges
                                gridspan = cell._element.tcPr.xpath('.//w:gridSpan')
                                if gridspan:
                                    span_val = self._get_gridspan_value(cell._element)
                                    print(f"Table detection: horizontally merged cell at [{i},{j}], span: {span_val}")
                                    if span_val > 1:
                                        merge_count += 1
                                        if merge_count >= 2:  # threshold before classifying as complex
                                            is_complex_table = True
                                            break
                        except Exception as e:
                            print(f"Table detection: error while checking cell [{i},{j}]: {str(e)}")
                            continue
                    if is_complex_table:
                        break
            # 3. Check whether every row has the same number of cells -
            # inconsistency usually indicates nesting or a special structure
            if not is_complex_table:
                cell_counts = []
                for i in range(min(5, rows)):  # check the first 5 rows
                    try:
                        actual_cells = 0
                        for j in range(cols):
                            cell = table.cell(i, j)
                            # Account for horizontal merges
                            if cell._element.tcPr is not None:
                                gridspan = cell._element.tcPr.xpath('.//w:gridSpan')
                                if gridspan:
                                    actual_cells += 1  # count once regardless of span
                                else:
                                    actual_cells += 1
                            else:
                                actual_cells += 1
                        cell_counts.append(actual_cells)
                    except Exception:
                        continue
                # Check whether the cell counts are consistent
                if len(cell_counts) > 1 and len(set(cell_counts)) > 1:
                    print(f"Table detection: inconsistent cell counts per row {cell_counts}, marked as complex")
                    is_complex_table = True
            print(f"Table classification: {'complex' if is_complex_table else 'simple'}")
            if is_complex_table:
                # Complex-table handling
                # Step 1: analyze the header structure
                header_structure = []  # hierarchical header structure per column
                header_merge_map = {}  # mapping between merged header cells
                # Analyze the header structure of every column
                print("Analyzing the complex table's header structure...")
                for j in range(cols):
                    column_headers = []
                    last_header = None
                    for i in range(max_header_rows):
                        try:
                            cell = table.cell(i, j)
                            text = cell.text.strip()
                            # Check vertical merges
                            if cell._element.tcPr is not None:
                                vmerge = cell._element.tcPr.xpath('.//w:vMerge')
                                if vmerge:
                                    val = vmerge[0].get(qn('w:val'), 'continue')
                                    if val == 'continue':
                                        # Reuse the previous non-empty header
                                        if last_header:
                                            print(f"Header analysis: vertically merged cell at [{i},{j}], reusing previous value: {last_header}")
                                            text = last_header
                                            # Record the merge relationship
                                            header_merge_map[(i, j)] = (i-1, j)
                                        else:
                                            # Search upwards for the first non-continue cell
                                            for k in range(i-1, -1, -1):
                                                try:
                                                    prev_cell = table.cell(k, j)
                                                    prev_text = prev_cell.text.strip()
                                                    if prev_text:
                                                        text = prev_text
                                                        print(f"Header analysis: vertically merged cell at [{i},{j}], using value from [{k},{j}]: {text}")
                                                        break
                                                except Exception:
                                                    continue
                            # Check horizontal merges
                            if cell._element.tcPr is not None:
                                gridspan = self._get_gridspan_value(cell._element)
                                if gridspan > 1:
                                    # Mark this as a header spanning several columns
                                    print(f"Header analysis: horizontally merged cell at [{i},{j}], span {gridspan}, value: {text}")
                                    if text:  # only handle cells with content
                                        text = f"SPAN_{gridspan}_{text}"
                                        # Record the columns affected by the horizontal merge
                                        for k in range(1, gridspan):
                                            if j + k < cols:
                                                header_merge_map[(i, j+k)] = (i, j)
                            if text:
                                column_headers.append(text)
                                last_header = text
                        except Exception as e:
                            print(f"Header analysis: error while processing header cell [{i},{j}]: {str(e)}")
                            continue
                    header_structure.append(column_headers)
                    print(f"Header structure of column {j}: {column_headers}")
                # Step 2: build the full header identifiers
                full_headers = []
                print("Building full header identifiers...")
                # Handle headers that span rows and columns
                # First pass: resolve merged cells
                for j, headers in enumerate(header_structure):
                    if not headers:
                        # Check whether this column was merged into another
                        is_merged = False
                        for i in range(max_header_rows):
                            if (i, j) in header_merge_map:
                                src_i, src_j = header_merge_map[(i, j)]
                                src_cell = table.cell(src_i, src_j)
                                src_text = src_cell.text.strip()
                                if src_text and src_j != j:  # make sure it is a horizontal merge
                                    print(f"Header completion: column {j} merged from column {src_j}, adding header: {src_text}")
                                    header_structure[j].append(src_text)
                                    is_merged = True
                                    break
                        if not is_merged:
                            print(f"Header completion: column {j} has no header, using default: column_{j+1}")
                            header_structure[j].append(f"column_{j+1}")
                # Build the full header of each column
                for j, headers in enumerate(header_structure):
                    if not headers:
                        full_headers.append(f"column_{j+1}")
                        continue
                    # Handle headers that span several columns
                    header_text = []
                    current_prefix = ""
                    for h in headers:
                        if h.startswith('SPAN_'):
                            parts = h.split('_', 2)
                            span = int(parts[1])
                            text = parts[2]
                            # Propagate the spanning header to the following columns
                            for k in range(span):
                                if j + k < cols:
                                    if k == 0:
                                        if text != current_prefix:  # avoid duplicate prefixes
                                            header_text.append(text)
                                            current_prefix = text
                                    else:
                                        if text not in header_structure[j + k]:
                                            header_structure[j + k].insert(0, text)
                        else:
                            if h != current_prefix:  # avoid duplicate prefixes
                                header_text.append(h)
                                current_prefix = h
                    # Drop duplicated header parts
                    unique_headers = []
                    seen = set()
                    for h in header_text:
                        if h not in seen:
                            unique_headers.append(h)
                            seen.add(h)
                    # Build the full header, joined with a separator
                    if unique_headers:
                        full_header = '_'.join(unique_headers)
                        print(f"Full header of column {j}: {full_header}")
                        full_headers.append(full_header)
                    else:
                        full_headers.append(f"column_{j+1}")
                # Determine the actual number of header rows
                header_row_count = max(len(headers) for headers in header_structure)
                if header_row_count == 0:
                    header_row_count = 1
                print(f"Header rows: {header_row_count}")
                print(f"Processing data rows starting at row {header_row_count}...")
                # Track vertically merged cells that were already processed
                processed_vmerge = set()
                # Process the data rows
                for i in range(header_row_count, rows):
                    try:
                        row_data = []
                        j = 0
                        while j < cols:
                            try:
                                cell = table.cell(i, j)
                                text = cell.text.strip()
                                # Handle vertically merged cells
                                if not text and cell._element.tcPr is not None:
                                    vmerge = cell._element.tcPr.xpath('.//w:vMerge')
                                    if vmerge:
                                        val = vmerge[0].get(qn('w:val'), 'continue')
                                        if val == 'continue':
                                            # Search upwards for a non-continue value
                                            for k in range(i-1, header_row_count-1, -1):
                                                if (k, j) in processed_vmerge:
                                                    continue
                                                try:
                                                    src_cell = table.cell(k, j)
                                                    src_text = src_cell.text.strip()
                                                    if src_text:
                                                        text = src_text
                                                        print(f"Data rows: vertically merged cell at [{i},{j}], using value from [{k},{j}]: {text}")
                                                        break
                                                except Exception:
                                                    continue
                                            processed_vmerge.add((i, j))
                                # Handle horizontal merges
                                gridspan = self._get_gridspan_value(cell._element)
                                # Copy the value into every merged column
                                for k in range(gridspan):
                                    if j + k < len(full_headers):
                                        # Separate header and value with a colon
                                        if text:
                                            row_data.append(f"{full_headers[j+k]}:{text}")
                                        else:
                                            row_data.append(f"{full_headers[j+k]}:")
                                j += gridspan
                            except Exception as e:
                                print(f"Data rows: error while processing data cell [{i},{j}]: {str(e)}")
                                if j < len(full_headers):
                                    row_data.append(f"{full_headers[j]}:")
                                j += 1
                        # Make sure the row has at least one non-empty value
                        if any(len(data.split(':', 1)) > 1 and data.split(':', 1)[1].strip() for data in row_data):
                            processed_line = " ".join(row_data)
                            print(f"Added row {i}: {processed_line[:100]}..." if len(processed_line) > 100 else f"Added row {i}: {processed_line}")
                            processed_data.append(processed_line)
                    except Exception as e:
                        print(f"Data rows: error while processing data row {i}: {str(e)}")
                        continue
            else:
                # Simple-table handling
                print("Using simple-table handling...")
                # Get the headers
                headers = []
                for j in range(cols):
                    try:
                        header_text = table.cell(0, j).text.strip()
                        if not header_text:  # use a default value if the header is empty
                            header_text = f"column_{j+1}"
                        headers.append(header_text)
                        print(f"Simple table header {j}: {header_text}")
                    except Exception as e:
                        print(f"Simple table: error while processing header cell [0,{j}]: {str(e)}")
                        headers.append(f"column_{j+1}")
                # Process the data rows
                for i in range(1, rows):
                    try:
                        row_data = []
                        for j in range(cols):
                            try:
                                text = table.cell(i, j).text.strip()
                                row_data.append(f"{headers[j]}:{text}")
                            except Exception as e:
                                print(f"Simple table: error while processing data cell [{i},{j}]: {str(e)}")
                                row_data.append(f"{headers[j]}:")
                        # Make sure the row has at least one non-empty value
                        if any(len(data.split(':', 1)) > 1 and data.split(':', 1)[1].strip() for data in row_data):
                            processed_line = " ".join(row_data)
                            print(f"Added simple-table row {i}: {processed_line[:100]}..." if len(processed_line) > 100 else f"Added simple-table row {i}: {processed_line}")
                            processed_data.append(processed_line)
                    except Exception as e:
                        print(f"Simple table: error while processing data row {i}: {str(e)}")
                        continue
            # Return the processed table text
            if processed_data:
                final_text = " ".join(processed_data)
                print(f"Table processed, {len(processed_data)} rows of data generated")
                print(f"Table text sample: {final_text[:200]}..." if len(final_text) > 200 else f"Table text: {final_text}")
                return final_text
            else:
                print("Table has no usable data")
                return "[table has no usable data]"
        except Exception as e:
            print(f"Table processing failed: {str(e)}")
            import traceback
            traceback.print_exc()
            return "[table processing failed]"
    def _extract_table_text(self, table: Table) -> str:
        """
        Extract the text content of a table as a formatted text representation.
        Args:
            table: docx table object
        Returns:
            str: text representation of the table content
        """
        return self._convert_table_to_text(table)
    def _extract_document_images(self, doc) -> List[Dict]:
        """
        Extract images from the document, recording their positions.
        Args:
            doc: docx document object
        Returns:
            List[Dict]: image info dicts with index, relationship id, filename, binary data, position, etc.
        """
        print("\nExtracting document images...")
        images = []
        image_index = 0
        # Map paragraph elements to their indices
        paragraph_indices = {}
        for i, paragraph in enumerate(doc.paragraphs):
            paragraph_indices[paragraph._p] = i
        try:
            # Locate inline images (w:drawing elements) per paragraph
            paragraph_with_images = {}
            for i, paragraph in enumerate(doc.paragraphs):
                # Check every run in the paragraph
                for run in paragraph.runs:
                    # Check whether the run contains a drawing
                    if hasattr(run, '_r') and run._r is not None:
                        for drawing in run._r.findall('.//w:drawing', namespaces={'w': 'http://schemas.openxmlformats.org/wordprocessingml/2006/main'}):
                            # Found an image; record its paragraph position
                            if i not in paragraph_with_images:
                                paragraph_with_images[i] = []
                            paragraph_with_images[i].append(True)
            # Method 1: process InlineShape objects
            for i, shape in enumerate(doc.inline_shapes):
                try:
                    if shape.type == 3:  # WD_INLINE_SHAPE.PICTURE
                        # Get the image's relationship id
                        rid = shape._inline.graphic.graphicData.pic.blipFill.blip.embed
                        image_part = doc.part.related_parts[rid]
                        image_data = image_part.blob
                        # Find the paragraph containing the image
                        paragraph_index = -1
                        parent_elem = shape._inline.getparent()
                        while parent_elem is not None:
                            if parent_elem.tag.endswith('p'):
                                if parent_elem in paragraph_indices:
                                    paragraph_index = paragraph_indices[parent_elem]
                                break
                            parent_elem = parent_elem.getparent()
                        # Check whether the image is large enough
                        if len(image_data) > 100:  # filter out tiny images
                            # Derive the extension from the content type
                            content_type = image_part.content_type
                            if 'png' in content_type:
                                image_ext = '.png'
                            elif 'jpeg' in content_type or 'jpg' in content_type:
                                image_ext = '.jpg'
                            elif 'gif' in content_type:
                                image_ext = '.gif'
                            elif 'bmp' in content_type:
                                image_ext = '.bmp'
                            else:
                                image_ext = '.img'
                            if image_ext in self.image_extensions:
                                # Generate a unique image filename
                                image_filename = f"image_{image_index}{image_ext}"
                                # Skip images whose relationship id was already added
                                duplicate = False
                                for img in images:
                                    if img['rel_id'] == rid:
                                        duplicate = True
                                        break
                                if not duplicate:
                                    images.append({
                                        'index': image_index,
                                        'rel_id': rid,
                                        'filename': image_filename,
                                        'data': image_data,
                                        'paragraph_index': paragraph_index,
                                        'ext': image_ext
                                    })
                                    print(f"Extracted image {image_index}: {image_filename} (size: {len(image_data) // 1024} KB, paragraph: {paragraph_index})")
                                    image_index += 1
                except Exception as e:
                    print(f"Error while extracting an image (method 1): {str(e)}")
            # Method 2: extract images possibly missed above from document.part.rels
            for rel in doc.part.rels.values():
                if "image" in rel.reltype:
                    try:
                        image_data = rel.target_part.blob
                        # Check the image size
                        if len(image_data) > 100:  # filter out tiny images
                            # Skip images whose relationship id was already added
                            duplicate = False
                            for img in images:
                                if img['rel_id'] == rel.rId:
                                    duplicate = True
                                    break
                            if not duplicate:
                                image_ext = os.path.splitext(rel.target_ref)[1].lower()
                                if image_ext in self.image_extensions:
                                    # Generate a unique image filename
                                    image_filename = f"image_{image_index}{image_ext}"
                                    # Try to locate this image in the document
                                    paragraph_index = -1  # position unknown by default
                                    images.append({
                                        'index': image_index,
                                        'rel_id': rel.rId,
                                        'filename': image_filename,
                                        'data': image_data,
                                        'paragraph_index': paragraph_index,
                                        'ext': image_ext
                                    })
                                    print(f"Extracted image {image_index}: {image_filename} (size: {len(image_data) // 1024} KB, position unknown)")
                                    image_index += 1
                    except Exception as e:
                        print(f"Error while extracting an image (method 2): {str(e)}")
            print(f"Image extraction finished, {len(images)} image(s) extracted")
        except Exception as e:
            print(f"Error while extracting document images: {str(e)}")
            import traceback
            traceback.print_exc()
        return images
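    # Design note: doc.inline_shapes only exposes inline pictures, so method 2
    # sweeps document.part.rels for image relationships missed by method 1
    # (for example floating/anchored pictures). Their position in the text is
    # unknown at that point, hence paragraph_index = -1, and save_as_docx
    # appends such images at the end of the document.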
def process_directory(input_dir: str, output_dir: Optional[str] = None):
    """
    Process all document files in a directory.
    Args:
        input_dir: input directory path
        output_dir: output directory path; defaults to the input directory if None
    """
    # Use the input directory if no output directory was given
    if output_dir is None:
        output_dir = input_dir
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)
    cleaner = DocCleaner()
    for root, _, files in os.walk(input_dir):
        for file in files:
            if file.endswith(('.doc', '.docx')):
                input_path = os.path.join(root, file)
                try:
                    # Clean the document
                    main_content, appendix, tables, images = cleaner.clean_doc(input_path)
                    # Build the output filename, always with a .docx extension
                    base_name = os.path.splitext(file)[0]
                    output_path = os.path.join(output_dir, f"{base_name}_cleaned.docx")
                    # Save in docx format
                    cleaner.save_as_docx(main_content, appendix, tables, images, output_path)
                except Exception as e:
                    print(f"Error while processing file {file}: {str(e)}")
                    # Print more detailed error information
                    if isinstance(e, subprocess.CalledProcessError):
                        print(f"Command failed: {e.output}")
                    elif isinstance(e, FileNotFoundError):
                        print("Make sure LibreOffice is installed and on the system PATH")
def qn(tag: str) -> str:
    """
    Convert a prefixed tag such as 'w:val' to its namespace-qualified (Clark) form.
    Args:
        tag: the prefixed tag
    Returns:
        str: the namespace-qualified tag
    """
    prefix = "{http://schemas.openxmlformats.org/wordprocessingml/2006/main}"
    # Strip the 'w:' prefix so lxml attribute lookups match '{ns}val', not '{ns}w:val'
    return prefix + tag.split(':')[-1]
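# For example, qn('w:val') returns
# '{http://schemas.openxmlformats.org/wordprocessingml/2006/main}val', the
# Clark notation lxml expects for attribute lookups. python-docx ships an
# equivalent helper as docx.oxml.ns.qn.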
if __name__ == '__main__':
    import argparse
    parser = argparse.ArgumentParser(description='Document cleaning tool')
    parser.add_argument('input_dir', help='input directory path')
    parser.add_argument('--output_dir', help='output directory path (optional, defaults to the input directory)', default=None)
    args = parser.parse_args()
    process_directory(args.input_dir, args.output_dir)