1539 lines
67 KiB
Python
1539 lines
67 KiB
Python
#!/usr/bin/env python3
|
||
# -*- coding: utf-8 -*-
|
||
|
||
import os
|
||
import re
|
||
import docx
|
||
import numpy as np
|
||
import requests
|
||
from sklearn.feature_extraction.text import TfidfVectorizer
|
||
from sklearn.metrics.pairwise import cosine_similarity
|
||
from typing import List, Tuple, Dict, Optional
|
||
from docx.shared import Pt
|
||
from docx.enum.text import WD_PARAGRAPH_ALIGNMENT
|
||
from docx.enum.table import WD_TABLE_ALIGNMENT
|
||
import subprocess
|
||
import tempfile
|
||
import json
|
||
from docx.table import Table, _Cell
|
||
from docx.text.paragraph import Paragraph
|
||
from copy import deepcopy
|
||
from docx.oxml import parse_xml
|
||
from docx.oxml.ns import nsdecls
|
||
import io
|
||
|
||
class DocCleaner:
|
||
def __init__(self, ollama_host: str = "http://192.168.1.24:11434"):
|
||
"""
|
||
初始化文档清理器
|
||
|
||
Args:
|
||
ollama_host: Ollama服务器地址
|
||
"""
|
||
# 页眉页脚模式
|
||
self.header_footer_patterns = [
|
||
r'页码\s*\d+-\d+', # 页码格式:页码1-1, 页码2-1等
|
||
r'第\s*\d+\s*页\s*共\s*\d+\s*页', # 中文页码(第X页共Y页)
|
||
r'Page\s*\d+\s*of\s*\d+', # 英文页码
|
||
]
|
||
|
||
# 特殊符号模式
|
||
self.special_char_patterns = [
|
||
r'©\s*\d{4}.*?版权所有', # 版权信息
|
||
r'confidential', # 机密标记
|
||
r'draft|草稿', # 草稿标记
|
||
r'watermark', # 水印标记
|
||
]
|
||
|
||
# 附录和参考文献标题模式
|
||
self.appendix_patterns = [
|
||
r'^附录\s*[A-Za-z]?[\s::]',
|
||
r'^Appendix\s*[A-Za-z]?[\s::]',
|
||
r'^参考文献$',
|
||
r'^References$',
|
||
r'^Bibliography$'
|
||
]
|
||
|
||
# 初始化TF-IDF向量化器
|
||
self.vectorizer = TfidfVectorizer(
|
||
min_df=1,
|
||
stop_words='english'
|
||
)
|
||
|
||
self.ollama_host = ollama_host
|
||
self.embedding_model = "bge-m3:latest" # 使用nomic-embed-text模型进行文本嵌入
|
||
|
||
# 图片相关配置
|
||
self.extract_images = True # 是否提取图片
|
||
self.image_extensions = ['.png', '.jpg', '.jpeg', '.gif', '.bmp'] # 支持的图片扩展名
|
||
self.min_image_size = 100 # 最小图片尺寸(宽和高),过滤掉太小的图片
|
||
|
||
def _convert_doc_to_docx(self, doc_path: str) -> str:
|
||
"""
|
||
将doc格式转换为docx格式
|
||
|
||
Args:
|
||
doc_path: doc文件路径
|
||
|
||
Returns:
|
||
str: 转换后的docx文件路径
|
||
"""
|
||
# 创建临时文件路径
|
||
temp_dir = tempfile.mkdtemp()
|
||
temp_docx = os.path.join(temp_dir, 'temp.docx')
|
||
|
||
try:
|
||
# 使用soffice(LibreOffice)进行转换
|
||
cmd = ['soffice', '--headless', '--convert-to', 'docx', '--outdir', temp_dir, doc_path]
|
||
subprocess.run(cmd, check=True, capture_output=True)
|
||
|
||
# 返回转换后的文件路径
|
||
return temp_docx
|
||
except subprocess.CalledProcessError as e:
|
||
raise Exception(f"转换doc文件失败: {str(e)}")
|
||
|
||
def clean_doc(self, file_path: str) -> Tuple[List[str], List[str], List[Table], List[Dict]]:
|
||
"""
|
||
清理文档并返回处理后的正文、附录、表格和图片
|
||
|
||
Args:
|
||
file_path: 文档文件路径
|
||
|
||
Returns:
|
||
Tuple[List[str], List[str], List[Table], List[Dict]]: (清理后的正文段落列表, 附录段落列表, 表格列表, 图片信息列表)
|
||
"""
|
||
print(f"\n开始处理文档: {file_path}")
|
||
|
||
# 检测文件类型
|
||
_, file_extension = os.path.splitext(file_path)
|
||
file_extension = file_extension.lower()
|
||
|
||
# 如果是doc格式,先转换为docx
|
||
if file_extension == '.doc':
|
||
temp_docx = self._convert_doc_to_docx(file_path)
|
||
doc = docx.Document(temp_docx)
|
||
# 清理临时文件
|
||
os.remove(temp_docx)
|
||
os.rmdir(os.path.dirname(temp_docx))
|
||
else:
|
||
doc = docx.Document(file_path)
|
||
|
||
# 提取图片(如果启用)
|
||
images = []
|
||
if self.extract_images:
|
||
images = self._extract_document_images(doc)
|
||
|
||
# 提取所有内容(段落和表格)
|
||
content = []
|
||
tables = []
|
||
table_count = 0
|
||
|
||
try:
|
||
print("\n开始解析文档结构...")
|
||
# 遍历文档体中的所有元素
|
||
for element in doc._element.body:
|
||
if element.tag.endswith('p'):
|
||
try:
|
||
paragraph = docx.text.paragraph.Paragraph(element, doc)
|
||
text = paragraph.text.strip()
|
||
|
||
# 只添加非空段落
|
||
if text:
|
||
# 检查是否是附录标题
|
||
is_appendix = any(re.match(pattern, text, re.IGNORECASE)
|
||
for pattern in self.appendix_patterns)
|
||
content.append({
|
||
'type': 'paragraph',
|
||
'content': text,
|
||
'is_appendix_start': is_appendix
|
||
})
|
||
if is_appendix:
|
||
print(f"发现附录标题: {text}")
|
||
except Exception as e:
|
||
print(f"警告:处理段落时出错: {str(e)}")
|
||
continue
|
||
|
||
elif element.tag.endswith('tbl'):
|
||
try:
|
||
table = docx.table.Table(element, doc)
|
||
# 验证表格是否有效
|
||
if hasattr(table, 'rows') and hasattr(table, 'columns'):
|
||
tables.append(table)
|
||
content.append({
|
||
'type': 'table',
|
||
'index': table_count
|
||
})
|
||
print(f"发现表格 {table_count}: {len(table.rows)}行 x {len(table.columns)}列")
|
||
table_count += 1
|
||
except Exception as e:
|
||
print(f"警告:处理表格时出错: {str(e)}")
|
||
continue
|
||
|
||
except Exception as e:
|
||
print(f"警告:遍历文档内容时出错: {str(e)}")
|
||
|
||
print(f"\n文档结构解析完成:")
|
||
print(f"- 总元素数: {len(content)}")
|
||
print(f"- 表格数量: {len(tables)}")
|
||
print(f"- 图片数量: {len(images)}")
|
||
|
||
# 分离正文和附录
|
||
main_content = []
|
||
appendix = []
|
||
is_appendix = False
|
||
|
||
print("\n开始分离正文和附录...")
|
||
for item in content:
|
||
if item['type'] == 'paragraph':
|
||
if item['is_appendix_start']:
|
||
is_appendix = True
|
||
print("进入附录部分")
|
||
|
||
if is_appendix:
|
||
appendix.append(item['content'])
|
||
else:
|
||
main_content.append(item['content'])
|
||
|
||
elif item['type'] == 'table':
|
||
table_placeholder = f'TABLE_PLACEHOLDER_{item["index"]}'
|
||
if is_appendix:
|
||
appendix.append(table_placeholder)
|
||
print(f"添加表格到附录: {table_placeholder}")
|
||
else:
|
||
main_content.append(table_placeholder)
|
||
print(f"添加表格到正文: {table_placeholder}")
|
||
|
||
print(f"\n分离完成:")
|
||
print(f"- 正文元素数: {len(main_content)}")
|
||
print(f"- 附录元素数: {len(appendix)}")
|
||
|
||
# 清理正文(保留表格标记)
|
||
cleaned_content = []
|
||
print("\n开始清理正文...")
|
||
for item in main_content:
|
||
if item.startswith('TABLE_PLACEHOLDER_'):
|
||
cleaned_content.append(item)
|
||
print(f"保留表格标记: {item}")
|
||
else:
|
||
cleaned_text = self._clean_text([item])[0]
|
||
if cleaned_text:
|
||
cleaned_content.append(cleaned_text)
|
||
|
||
print(f"\n清理完成:")
|
||
print(f"- 清理后元素数: {len(cleaned_content)}")
|
||
print("- 表格标记位置:")
|
||
for i, item in enumerate(cleaned_content):
|
||
if item.startswith('TABLE_PLACEHOLDER_'):
|
||
print(f" 位置 {i}: {item}")
|
||
|
||
return cleaned_content, appendix, tables, images
|
||
|
||
def _clean_text(self, text: List[str]) -> List[str]:
|
||
"""
|
||
清理文本内容
|
||
|
||
Args:
|
||
text: 待清理的文本段落列表
|
||
|
||
Returns:
|
||
List[str]: 清理后的文本段落列表
|
||
"""
|
||
cleaned = []
|
||
for paragraph in text:
|
||
# 如果是表格标记,直接保留
|
||
if paragraph.startswith('TABLE_PLACEHOLDER_'):
|
||
cleaned.append(paragraph)
|
||
continue
|
||
|
||
# 跳过空段落
|
||
if not paragraph.strip():
|
||
continue
|
||
|
||
# 检查是否是目录项(包含数字序号的行)
|
||
is_toc_item = bool(re.match(r'^\s*(?:\d+\.)*\d+\s+.*', paragraph))
|
||
|
||
if not is_toc_item:
|
||
# 移除页眉页脚
|
||
for pattern in self.header_footer_patterns:
|
||
paragraph = re.sub(pattern, '', paragraph, flags=re.IGNORECASE)
|
||
|
||
# 移除特殊符号
|
||
for pattern in self.special_char_patterns:
|
||
paragraph = re.sub(pattern, '', paragraph, flags=re.IGNORECASE)
|
||
|
||
# 如果段落不为空,添加到结果中
|
||
if paragraph.strip():
|
||
cleaned.append(paragraph.strip())
|
||
|
||
return cleaned
|
||
|
||
def _split_content(self, paragraphs: List[str]) -> Tuple[List[str], List[str]]:
|
||
"""
|
||
分离正文与附录/参考文献
|
||
|
||
Args:
|
||
paragraphs: 文档段落列表
|
||
|
||
Returns:
|
||
Tuple[List[str], List[str]]: (正文段落列表, 附录段落列表)
|
||
"""
|
||
main_content = []
|
||
appendix = []
|
||
is_appendix = False
|
||
|
||
for p in paragraphs:
|
||
# 检查是否是附录开始
|
||
if any(re.match(pattern, p, re.IGNORECASE) for pattern in self.appendix_patterns):
|
||
is_appendix = True
|
||
|
||
if is_appendix:
|
||
appendix.append(p)
|
||
else:
|
||
main_content.append(p)
|
||
|
||
return main_content, appendix
|
||
|
||
def _get_embeddings(self, texts: List[str]) -> np.ndarray:
|
||
"""
|
||
使用Ollama获取文本嵌入向量
|
||
|
||
Args:
|
||
texts: 文本列表
|
||
|
||
Returns:
|
||
np.ndarray: 嵌入向量矩阵
|
||
"""
|
||
embeddings = []
|
||
|
||
for text in texts:
|
||
try:
|
||
response = requests.post(
|
||
f"{self.ollama_host}/api/embeddings",
|
||
json={
|
||
"model": self.embedding_model,
|
||
"prompt": text
|
||
}
|
||
)
|
||
response.raise_for_status()
|
||
embedding = response.json()["embedding"]
|
||
embeddings.append(embedding)
|
||
except Exception as e:
|
||
print(f"获取文本嵌入失败: {str(e)}")
|
||
# 如果获取嵌入失败,使用零向量
|
||
embeddings.append([0.0] * 768) # nomic-embed-text 模型输出维度为768
|
||
|
||
return np.array(embeddings)
|
||
|
||
def _remove_duplicates(self, paragraphs: List[str], similarity_threshold: float = 0.92) -> List[str]:
|
||
"""
|
||
删除重复段落,保持表格占位符的位置不变
|
||
|
||
Args:
|
||
paragraphs: 段落列表
|
||
similarity_threshold: 相似度阈值,使用嵌入模型后可以设置更高的阈值
|
||
|
||
Returns:
|
||
List[str]: 去重后的段落列表
|
||
"""
|
||
if not paragraphs:
|
||
return []
|
||
|
||
# 分离表格占位符和普通段落
|
||
table_placeholders = {}
|
||
text_paragraphs = []
|
||
for i, p in enumerate(paragraphs):
|
||
if p.startswith('TABLE_PLACEHOLDER_'):
|
||
table_placeholders[i] = p
|
||
else:
|
||
text_paragraphs.append((i, p))
|
||
|
||
try:
|
||
# 只对非表格段落进行去重
|
||
if text_paragraphs:
|
||
# 获取文本嵌入
|
||
text_only = [p[1] for p in text_paragraphs]
|
||
embeddings = self._get_embeddings(text_only)
|
||
|
||
# 计算余弦相似度矩阵
|
||
similarity_matrix = cosine_similarity(embeddings)
|
||
|
||
# 标记要保留的段落
|
||
keep_indices = []
|
||
for i in range(len(text_paragraphs)):
|
||
# 如果当前段落没有与之前的段落高度相似,则保留
|
||
if not any(similarity_matrix[i][j] > similarity_threshold for j in keep_indices):
|
||
keep_indices.append(i)
|
||
|
||
# 保留的非表格段落
|
||
kept_paragraphs = [(text_paragraphs[i][0], text_only[i]) for i in keep_indices]
|
||
else:
|
||
kept_paragraphs = []
|
||
|
||
# 合并表格占位符和保留的段落,按原始位置排序
|
||
all_kept = list(table_placeholders.items()) + kept_paragraphs
|
||
all_kept.sort(key=lambda x: x[0])
|
||
|
||
return [p[1] for p in all_kept]
|
||
|
||
except Exception as e:
|
||
print(f"使用Ollama嵌入模型失败,回退到TF-IDF方法: {str(e)}")
|
||
# 如果使用Ollama失败,回退到原来的TF-IDF方法
|
||
return self._remove_duplicates_tfidf(paragraphs)
|
||
|
||
def _remove_duplicates_tfidf(self, paragraphs: List[str], similarity_threshold: float = 0.85) -> List[str]:
|
||
"""
|
||
使用TF-IDF方法删除重复段落(作为备选方案)
|
||
|
||
Args:
|
||
paragraphs: 段落列表
|
||
similarity_threshold: 相似度阈值
|
||
|
||
Returns:
|
||
List[str]: 去重后的段落列表
|
||
"""
|
||
if not paragraphs:
|
||
return []
|
||
|
||
# 分离表格占位符和普通段落
|
||
table_placeholders = {}
|
||
text_paragraphs = []
|
||
for i, p in enumerate(paragraphs):
|
||
if p.startswith('TABLE_PLACEHOLDER_'):
|
||
table_placeholders[i] = p
|
||
else:
|
||
text_paragraphs.append((i, p))
|
||
|
||
if text_paragraphs:
|
||
# 计算TF-IDF矩阵
|
||
text_only = [p[1] for p in text_paragraphs]
|
||
tfidf_matrix = self.vectorizer.fit_transform(text_only)
|
||
|
||
# 计算余弦相似度矩阵
|
||
similarity_matrix = cosine_similarity(tfidf_matrix)
|
||
|
||
# 标记要保留的段落
|
||
keep_indices = []
|
||
for i in range(len(text_paragraphs)):
|
||
# 如果当前段落没有与之前的段落高度相似,则保留
|
||
if not any(similarity_matrix[i][j] > similarity_threshold for j in keep_indices):
|
||
keep_indices.append(i)
|
||
|
||
# 保留的非表格段落
|
||
kept_paragraphs = [(text_paragraphs[i][0], text_only[i]) for i in keep_indices]
|
||
else:
|
||
kept_paragraphs = []
|
||
|
||
# 合并表格占位符和保留的段落,按原始位置排序
|
||
all_kept = list(table_placeholders.items()) + kept_paragraphs
|
||
all_kept.sort(key=lambda x: x[0])
|
||
|
||
return [p[1] for p in all_kept]
|
||
|
||
def save_as_docx(self, cleaned_content: List[str], appendix: List[str], tables: List[Table], images: List[Dict], output_path: str):
|
||
"""
|
||
将清理后的内容保存为docx格式和txt格式
|
||
|
||
Args:
|
||
cleaned_content: 清理后的正文段落列表
|
||
appendix: 附录段落列表
|
||
tables: 表格列表
|
||
images: 图片信息列表
|
||
output_path: 输出文件路径
|
||
"""
|
||
print(f"\n开始保存文档: {output_path}")
|
||
print(f"- 正文元素数: {len(cleaned_content)}")
|
||
print(f"- 附录元素数: {len(appendix)}")
|
||
print(f"- 表格总数: {len(tables)}")
|
||
print(f"- 图片总数: {len(images)}")
|
||
|
||
# 创建新文档
|
||
doc = docx.Document()
|
||
|
||
# 创建文本输出内容列表(用于保存txt文件)
|
||
text_output = []
|
||
|
||
# 构建段落索引到图片索引的映射
|
||
paragraph_to_images = {}
|
||
for img in images:
|
||
if 'paragraph_index' in img and img['paragraph_index'] >= 0:
|
||
if img['paragraph_index'] not in paragraph_to_images:
|
||
paragraph_to_images[img['paragraph_index']] = []
|
||
paragraph_to_images[img['paragraph_index']].append(img)
|
||
|
||
# 生成HTML表格文件
|
||
html_file_path = os.path.splitext(output_path)[0] + '_tables.html'
|
||
html_tables = []
|
||
|
||
# 添加正文内容和表格,保持它们的相对位置
|
||
print("\n处理正文内容...")
|
||
|
||
# 使用图片索引和已添加图片跟踪
|
||
image_counter = 0
|
||
added_images = set()
|
||
|
||
# 创建段落索引到新文档索引的映射
|
||
old_to_new_paragraph_map = {}
|
||
new_paragraph_index = 0
|
||
|
||
# 遍历清理后的内容
|
||
for i, content in enumerate(cleaned_content):
|
||
try:
|
||
# 检查是否是表格占位符
|
||
table_match = re.match(r'TABLE_PLACEHOLDER_(\d+)', content)
|
||
if table_match:
|
||
table_index = int(table_match.group(1))
|
||
print(f"正在处理表格占位符: {content} (索引: {table_index})")
|
||
if table_index < len(tables):
|
||
source_table = tables[table_index]
|
||
try:
|
||
# 生成表格的HTML标签
|
||
html_tags = self._generate_table_html_tags(source_table, f"table_{table_index}")
|
||
|
||
# 添加HTML标签作为普通文本
|
||
p = doc.add_paragraph()
|
||
run = p.add_run(html_tags)
|
||
run.font.name = 'Courier New' # 使用等宽字体
|
||
run.font.size = Pt(10) # 设置字体大小
|
||
new_paragraph_index += 1
|
||
|
||
# 保存HTML到列表,用于生成HTML文件
|
||
try:
|
||
from table.table_to_html import TableToHtml
|
||
converter = TableToHtml(debug=False)
|
||
html_code = converter.table_to_html(source_table)
|
||
html_tables.append(html_code)
|
||
except Exception as e:
|
||
print(f"警告:生成HTML表格时出错: {str(e)}")
|
||
html_tables.append(f"<div class='error'>表格 {table_index + 1} 处理失败: {str(e)}</div>")
|
||
|
||
# 添加到文本输出
|
||
text_output.append(f"表格 {table_index + 1} 开始:")
|
||
|
||
# 使用HTML标签代替表格文本用于txt输出
|
||
text_output.append(html_tags)
|
||
text_output.append(f"表格 {table_index + 1} 结束:")
|
||
|
||
# 添加空行
|
||
doc.add_paragraph()
|
||
new_paragraph_index += 1
|
||
|
||
except Exception as e:
|
||
print(f"警告:处理表格时出错: {str(e)}")
|
||
doc.add_paragraph(f"【表格处理失败: {str(e)}】")
|
||
text_output.append("【表格处理失败】")
|
||
new_paragraph_index += 1
|
||
else:
|
||
# 添加普通段落
|
||
p = doc.add_paragraph(content)
|
||
p.alignment = WD_PARAGRAPH_ALIGNMENT.JUSTIFY
|
||
old_to_new_paragraph_map[i] = new_paragraph_index
|
||
new_paragraph_index += 1
|
||
|
||
# 添加到文本输出
|
||
text_output.append(content)
|
||
|
||
# 检查此段落是否有关联的图片
|
||
if i in paragraph_to_images:
|
||
for img_data in paragraph_to_images[i]:
|
||
if img_data['index'] not in added_images:
|
||
try:
|
||
# 直接从图片数据创建图片
|
||
image_stream = io.BytesIO(img_data['data'])
|
||
|
||
# 添加图片到文档
|
||
doc.add_picture(image_stream, width=docx.shared.Inches(6)) # 设置宽度为6英寸
|
||
new_paragraph_index += 1
|
||
|
||
# 移除图片标题
|
||
# 添加到文本输出
|
||
text_output.append(f"[图片]")
|
||
|
||
print(f"在段落 {i} 后插入图片")
|
||
image_counter += 1
|
||
added_images.add(img_data['index'])
|
||
except Exception as e:
|
||
print(f"插入图片时出错: {str(e)}")
|
||
except Exception as e:
|
||
print(f"警告:处理段落或表格时出错: {str(e)}")
|
||
continue
|
||
|
||
# 插入未放置的图片
|
||
if len(added_images) < len(images):
|
||
print("\n处理未放置的图片...")
|
||
|
||
# 添加未放置的图片到文档末尾
|
||
for img in images:
|
||
if img['index'] not in added_images:
|
||
try:
|
||
# 直接从图片数据创建图片
|
||
image_stream = io.BytesIO(img['data'])
|
||
|
||
# 添加图片到文档
|
||
doc.add_picture(image_stream, width=docx.shared.Inches(6)) # 设置宽度为6英寸
|
||
|
||
# 移除图片标题
|
||
# 添加到文本输出
|
||
text_output.append(f"[图片]")
|
||
|
||
print(f"在文档末尾添加图片")
|
||
image_counter += 1
|
||
added_images.add(img['index'])
|
||
except Exception as e:
|
||
print(f"插入图片时出错: {str(e)}")
|
||
|
||
# 如果有附录,添加分隔符和附录内容
|
||
if appendix:
|
||
print("\n处理附录内容...")
|
||
try:
|
||
# 添加分页符
|
||
doc.add_page_break()
|
||
|
||
# 添加附录标题
|
||
title = doc.add_paragraph("附录")
|
||
title.alignment = WD_PARAGRAPH_ALIGNMENT.CENTER
|
||
|
||
# 添加到文本输出
|
||
text_output.append("附录")
|
||
|
||
# 添加附录内容
|
||
for content in appendix:
|
||
# 检查是否是表格占位符
|
||
table_match = re.match(r'TABLE_PLACEHOLDER_(\d+)', content)
|
||
if table_match:
|
||
table_index = int(table_match.group(1))
|
||
print(f"正在处理附录中的表格占位符: {content} (索引: {table_index})")
|
||
if table_index < len(tables):
|
||
source_table = tables[table_index]
|
||
try:
|
||
# 生成表格的HTML标签
|
||
html_tags = self._generate_table_html_tags(source_table, f"table_appendix_{table_index}")
|
||
|
||
# 添加HTML标签作为普通文本
|
||
p = doc.add_paragraph()
|
||
run = p.add_run(html_tags)
|
||
run.font.name = 'Courier New' # 使用等宽字体
|
||
run.font.size = Pt(10) # 设置字体大小
|
||
|
||
# 保存HTML到列表,用于生成HTML文件
|
||
try:
|
||
from table.table_to_html import TableToHtml
|
||
converter = TableToHtml(debug=False)
|
||
html_code = converter.table_to_html(source_table)
|
||
html_tables.append(html_code)
|
||
except Exception as e:
|
||
print(f"警告:生成HTML表格时出错: {str(e)}")
|
||
html_tables.append(f"<div class='error'>附录表格 {table_index + 1} 处理失败: {str(e)}</div>")
|
||
|
||
# 添加到文本输出
|
||
text_output.append(f"附录表格 {table_index + 1} 开始:")
|
||
|
||
# 使用HTML标签代替表格文本用于txt输出
|
||
text_output.append(html_tags)
|
||
text_output.append(f"附录表格 {table_index + 1} 结束:")
|
||
|
||
except Exception as e:
|
||
print(f"警告:处理附录表格时出错: {str(e)}")
|
||
doc.add_paragraph(f"【表格处理失败: {str(e)}】")
|
||
text_output.append("【表格处理失败】")
|
||
else:
|
||
p = doc.add_paragraph(content)
|
||
p.alignment = WD_PARAGRAPH_ALIGNMENT.JUSTIFY
|
||
# 添加到文本输出
|
||
text_output.append(content)
|
||
|
||
except Exception as e:
|
||
print(f"警告:处理附录时出错: {str(e)}")
|
||
|
||
# 保存HTML表格到文件
|
||
if html_tables:
|
||
try:
|
||
html_content = f'''<!DOCTYPE html>
|
||
<html>
|
||
<head>
|
||
<meta charset="UTF-8">
|
||
<title>表格预览</title>
|
||
<meta name="viewport" content="width=device-width, initial-scale=1.0">
|
||
<style>
|
||
body {{
|
||
font-family: Arial, sans-serif;
|
||
line-height: 1.6;
|
||
margin: 0;
|
||
padding: 20px;
|
||
color: #333;
|
||
}}
|
||
h1 {{
|
||
color: #2c3e50;
|
||
border-bottom: 2px solid #eee;
|
||
padding-bottom: 10px;
|
||
}}
|
||
.docx-table {{
|
||
border-collapse: collapse;
|
||
width: 100%;
|
||
margin-bottom: 20px;
|
||
}}
|
||
.docx-table th, .docx-table td {{
|
||
border: 1px solid #ddd;
|
||
padding: 8px;
|
||
text-align: left;
|
||
}}
|
||
.docx-table th {{
|
||
background-color: #f2f2f2;
|
||
font-weight: bold;
|
||
}}
|
||
.docx-table tr:nth-child(even) {{
|
||
background-color: #f9f9f9;
|
||
}}
|
||
.docx-table tr:hover {{
|
||
background-color: #f5f5f5;
|
||
}}
|
||
@media print {{
|
||
body {{
|
||
padding: 0;
|
||
}}
|
||
.docx-table {{
|
||
page-break-inside: avoid;
|
||
}}
|
||
}}
|
||
</style>
|
||
</head>
|
||
<body>
|
||
<h1>文档中的表格</h1>
|
||
{' '.join(html_tables)}
|
||
</body>
|
||
</html>'''
|
||
|
||
with open(html_file_path, 'w', encoding='utf-8') as f:
|
||
f.write(html_content)
|
||
print(f"\nHTML表格文件已保存到: {html_file_path}")
|
||
|
||
# 添加HTML文件引用提示到Word文档
|
||
notice = doc.add_paragraph()
|
||
notice.add_run("表格完整HTML版本可查看文件: ").font.bold = True
|
||
run = notice.add_run(os.path.basename(html_file_path))
|
||
run.font.color.rgb = docx.shared.RGBColor(0, 0, 255) # 蓝色
|
||
run.font.underline = True # 下划线
|
||
|
||
except Exception as e:
|
||
print(f"警告:保存HTML表格文件时出错: {str(e)}")
|
||
|
||
# 保存docx文档和相关文件
|
||
try:
|
||
# 保存Word文档
|
||
doc.save(output_path)
|
||
print("\nWord文档保存成功!")
|
||
|
||
except Exception as e:
|
||
print(f"错误:保存Word文档时出错: {str(e)}")
|
||
import traceback
|
||
traceback.print_exc()
|
||
raise
|
||
|
||
# 保存文本文件
|
||
try:
|
||
text_file_path = os.path.splitext(output_path)[0] + '.txt'
|
||
# 合并文本内容,保留HTML标签
|
||
text_content = []
|
||
for t in text_output:
|
||
if t.strip():
|
||
# 对于HTML标签内容不做特殊处理,直接添加
|
||
if t.startswith('<table'):
|
||
text_content.append(t)
|
||
else:
|
||
# 对于普通文本,移除换行符
|
||
text_content.append(t.replace('\n', ' ').strip())
|
||
|
||
# 使用空格连接所有内容
|
||
final_text_content = ' '.join(text_content)
|
||
|
||
with open(text_file_path, 'w', encoding='utf-8') as f:
|
||
f.write(final_text_content)
|
||
print(f"文本文件保存成功: {text_file_path}")
|
||
except Exception as e:
|
||
print(f"错误:保存文本文件时出错: {str(e)}")
|
||
raise
|
||
|
||
def _generate_table_html_tags(self, table: Table, table_id: str) -> str:
|
||
"""
|
||
生成表格的HTML标签字符串
|
||
|
||
Args:
|
||
table: 源表格
|
||
table_id: 表格的唯一ID
|
||
|
||
Returns:
|
||
str: HTML标签字符串
|
||
"""
|
||
rows = len(table.rows)
|
||
cols = len(table.columns)
|
||
|
||
if rows == 0 or cols == 0:
|
||
return "<table></table>"
|
||
|
||
# 分析表格结构(查找合并单元格)
|
||
merged_cells = {}
|
||
merged_v_cells = set() # 记录被垂直合并的单元格
|
||
cell_map = {} # 添加cell_map的定义
|
||
|
||
# 检测合并单元格
|
||
for i in range(rows):
|
||
for j in range(cols):
|
||
try:
|
||
cell = table.cell(i, j)
|
||
|
||
# 检查是否是合并单元格的一部分
|
||
if cell._element.tcPr is not None:
|
||
# 检查垂直合并
|
||
vmerge = cell._element.tcPr.xpath('.//w:vMerge')
|
||
if vmerge:
|
||
val = vmerge[0].get('{http://schemas.openxmlformats.org/wordprocessingml/2006/main}val', 'continue')
|
||
if val == 'restart':
|
||
# 这是合并的起始单元格
|
||
span = self._get_vertical_span(table, i, j)
|
||
cell_map[(i, j)] = ('vmerge', span)
|
||
|
||
# 检查水平合并
|
||
gridspan = cell._element.tcPr.xpath('.//w:gridSpan')
|
||
if gridspan:
|
||
span = int(gridspan[0].get(qn('w:val')))
|
||
if span > 1:
|
||
cell_map[(i, j)] = ('hmerge', span)
|
||
except Exception as e:
|
||
print(f"警告:处理合并单元格时出错 [{i},{j}]: {str(e)}")
|
||
|
||
# 构建HTML表格
|
||
html = f'<table id="{table_id}" class="docx-table">\n'
|
||
html += '<thead>\n'
|
||
|
||
# 添加表头行
|
||
header_rows = min(1, rows) # 假设第一行是表头
|
||
for i in range(header_rows):
|
||
html += ' <tr>\n'
|
||
j = 0
|
||
while j < cols:
|
||
try:
|
||
cell = table.cell(i, j)
|
||
text = cell.text.strip()
|
||
|
||
# 检查是否是合并单元格
|
||
rowspan = 1
|
||
colspan = 1
|
||
|
||
if (i, j) in cell_map:
|
||
merge_type, span = cell_map[(i, j)]
|
||
if merge_type == 'vmerge':
|
||
rowspan = span
|
||
elif merge_type == 'hmerge':
|
||
colspan = span
|
||
|
||
# 添加表头单元格
|
||
attrs = []
|
||
if rowspan > 1:
|
||
attrs.append(f'rowspan="{rowspan}"')
|
||
if colspan > 1:
|
||
attrs.append(f'colspan="{colspan}"')
|
||
|
||
attrs_str = ' '.join(attrs)
|
||
if attrs_str:
|
||
attrs_str = ' ' + attrs_str
|
||
|
||
html += f' <th{attrs_str}>{text}</th>\n'
|
||
|
||
# 如果是水平合并,跳过合并的列
|
||
j += colspan
|
||
except Exception as e:
|
||
print(f"警告:处理表头单元格时出错 [{i},{j}]: {str(e)}")
|
||
html += f' <th>错误: {str(e)}</th>\n'
|
||
j += 1
|
||
html += ' </tr>\n'
|
||
|
||
html += '</thead>\n<tbody>\n'
|
||
|
||
# 添加数据行
|
||
for i in range(header_rows, rows):
|
||
html += ' <tr>\n'
|
||
j = 0
|
||
while j < cols:
|
||
try:
|
||
# 跳过已经被垂直合并的单元格
|
||
if (i, j) in merged_v_cells:
|
||
j += 1
|
||
continue
|
||
|
||
cell = table.cell(i, j)
|
||
text = cell.text.strip()
|
||
|
||
# 检查是否是合并单元格
|
||
rowspan = 1
|
||
colspan = 1
|
||
|
||
if (i, j) in cell_map:
|
||
merge_type, span = cell_map[(i, j)]
|
||
if merge_type == 'vmerge':
|
||
rowspan = span
|
||
# 标记被垂直合并的单元格
|
||
for k in range(1, span):
|
||
if i + k < rows:
|
||
merged_v_cells.add((i + k, j))
|
||
elif merge_type == 'hmerge':
|
||
colspan = span
|
||
|
||
# 添加数据单元格
|
||
attrs = []
|
||
if rowspan > 1:
|
||
attrs.append(f'rowspan="{rowspan}"')
|
||
if colspan > 1:
|
||
attrs.append(f'colspan="{colspan}"')
|
||
|
||
attrs_str = ' '.join(attrs)
|
||
if attrs_str:
|
||
attrs_str = ' ' + attrs_str
|
||
|
||
html += f' <td{attrs_str}>{text}</td>\n'
|
||
|
||
# 如果是水平合并,跳过合并的列
|
||
j += colspan
|
||
except Exception as e:
|
||
print(f"警告:处理数据单元格时出错 [{i},{j}]: {str(e)}")
|
||
html += f' <td>错误: {str(e)}</td>\n'
|
||
j += 1
|
||
html += ' </tr>\n'
|
||
|
||
html += '</tbody>\n</table>'
|
||
return html
|
||
|
||
def _get_vmerge_value(self, cell_element) -> str:
|
||
"""
|
||
获取单元格的垂直合并属性
|
||
|
||
Args:
|
||
cell_element: 单元格元素
|
||
|
||
Returns:
|
||
str: 垂直合并属性值
|
||
"""
|
||
vmerge = cell_element.xpath('.//w:vMerge')
|
||
if vmerge:
|
||
return vmerge[0].get(qn('w:val'), 'continue')
|
||
return None
|
||
|
||
def _get_gridspan_value(self, cell_element) -> int:
|
||
"""
|
||
获取单元格的水平合并数量
|
||
|
||
Args:
|
||
cell_element: 单元格元素
|
||
|
||
Returns:
|
||
int: 水平合并的列数
|
||
"""
|
||
try:
|
||
gridspan = cell_element.xpath('.//w:gridSpan')
|
||
if gridspan and gridspan[0].get(qn('w:val')):
|
||
return int(gridspan[0].get(qn('w:val')))
|
||
except (ValueError, TypeError, AttributeError) as e:
|
||
print(f"警告:获取gridspan值时出错: {str(e)}")
|
||
return 1 # 默认返回1,表示没有合并
|
||
|
||
def _get_vertical_span(self, table: Table, start_row: int, col: int) -> int:
|
||
"""
|
||
计算垂直合并的行数
|
||
|
||
Args:
|
||
table: 表格对象
|
||
start_row: 起始行
|
||
col: 列号
|
||
|
||
Returns:
|
||
int: 垂直合并的行数
|
||
"""
|
||
span = 1
|
||
for i in range(start_row + 1, len(table.rows)):
|
||
cell = table.cell(i, col)
|
||
if self._get_vmerge_value(cell._element) == 'continue':
|
||
span += 1
|
||
else:
|
||
break
|
||
return span
|
||
|
||
def _convert_table_to_text(self, table: Table) -> str:
|
||
"""
|
||
将表格转换为文本格式,智能处理简单和复杂表格结构
|
||
|
||
Args:
|
||
table: docx表格对象
|
||
|
||
Returns:
|
||
str: 表格的文本表示
|
||
"""
|
||
try:
|
||
# 获取表格的行数和列数
|
||
rows = len(table.rows)
|
||
cols = len(table.columns)
|
||
|
||
print(f"开始处理表格: {rows}行 x {cols}列")
|
||
|
||
if rows == 0 or cols == 0:
|
||
return "【空表格】"
|
||
|
||
# 存储处理后的表格数据
|
||
processed_data = []
|
||
|
||
# 检查是否是复杂表格(具有合并单元格或多级表头)
|
||
is_complex_table = False
|
||
max_header_rows = min(4, rows) # 最多检查前4行,增加检测范围
|
||
|
||
# 表格类型检测增强
|
||
# 1. 检查表格宽高比 - 宽表格通常更复杂
|
||
aspect_ratio = cols / rows if rows > 0 else 0
|
||
if aspect_ratio > 3 or cols > 6:
|
||
print("表格检测: 宽表格(列数>6或宽高比>3),标记为复杂表格")
|
||
is_complex_table = True
|
||
|
||
# 2. 检查前几行是否存在合并单元格
|
||
if not is_complex_table:
|
||
merge_count = 0
|
||
for i in range(max_header_rows):
|
||
for j in range(cols):
|
||
try:
|
||
cell = table.cell(i, j)
|
||
if cell._element.tcPr is not None:
|
||
# 检查垂直合并
|
||
vmerge = cell._element.tcPr.xpath('.//w:vMerge')
|
||
if vmerge:
|
||
print(f"表格检测: 发现垂直合并单元格 at [{i},{j}]")
|
||
merge_count += 1
|
||
if merge_count >= 2: # 增加阈值判断
|
||
is_complex_table = True
|
||
break
|
||
# 检查水平合并
|
||
gridspan = cell._element.tcPr.xpath('.//w:gridSpan')
|
||
if gridspan:
|
||
span_val = self._get_gridspan_value(cell._element)
|
||
print(f"表格检测: 发现水平合并单元格 at [{i},{j}], 跨度: {span_val}")
|
||
if span_val > 1:
|
||
merge_count += 1
|
||
if merge_count >= 2: # 增加阈值判断
|
||
is_complex_table = True
|
||
break
|
||
except Exception as e:
|
||
print(f"表格检测: 检查单元格 [{i},{j}] 时出错: {str(e)}")
|
||
continue
|
||
if is_complex_table:
|
||
break
|
||
|
||
# 3. 检查每行的单元格数是否一致 - 不一致通常表示嵌套或特殊结构
|
||
if not is_complex_table:
|
||
cell_counts = []
|
||
for i in range(min(5, rows)): # 检查前5行
|
||
try:
|
||
actual_cells = 0
|
||
for j in range(cols):
|
||
cell = table.cell(i, j)
|
||
# 考虑水平合并
|
||
if cell._element.tcPr is not None:
|
||
gridspan = cell._element.tcPr.xpath('.//w:gridSpan')
|
||
if gridspan:
|
||
actual_cells += 1 # 只计算一次,不管跨度
|
||
else:
|
||
actual_cells += 1
|
||
else:
|
||
actual_cells += 1
|
||
cell_counts.append(actual_cells)
|
||
except Exception:
|
||
continue
|
||
|
||
# 检查单元格数是否一致
|
||
if len(cell_counts) > 1 and len(set(cell_counts)) > 1:
|
||
print(f"表格检测: 各行单元格数不一致 {cell_counts},标记为复杂表格")
|
||
is_complex_table = True
|
||
|
||
print(f"表格分类: {'复杂表格' if is_complex_table else '简单表格'}")
|
||
|
||
if is_complex_table:
|
||
# 使用复杂表格处理逻辑
|
||
# 第一步:分析表头结构
|
||
header_structure = [] # 存储表头的层级结构
|
||
header_merge_map = {} # 记录合并单元格的映射关系
|
||
|
||
# 分析每一列的表头结构
|
||
print("开始分析复杂表格表头结构...")
|
||
for j in range(cols):
|
||
column_headers = []
|
||
last_header = None
|
||
for i in range(max_header_rows):
|
||
try:
|
||
cell = table.cell(i, j)
|
||
text = cell.text.strip()
|
||
|
||
# 检查垂直合并
|
||
if cell._element.tcPr is not None:
|
||
vmerge = cell._element.tcPr.xpath('.//w:vMerge')
|
||
if vmerge:
|
||
val = vmerge[0].get(qn('w:val'), 'continue')
|
||
if val == 'continue':
|
||
# 使用上一个非空表头
|
||
if last_header:
|
||
print(f"表头分析: 垂直合并单元格 at [{i},{j}],使用上一行值: {last_header}")
|
||
text = last_header
|
||
# 记录合并关系
|
||
header_merge_map[(i, j)] = (i-1, j)
|
||
else:
|
||
# 向上查找第一个非continue的单元格
|
||
for k in range(i-1, -1, -1):
|
||
try:
|
||
prev_cell = table.cell(k, j)
|
||
prev_text = prev_cell.text.strip()
|
||
if prev_text:
|
||
text = prev_text
|
||
print(f"表头分析: 垂直合并单元格 at [{i},{j}],使用上方值 [{k},{j}]: {text}")
|
||
break
|
||
except Exception:
|
||
continue
|
||
|
||
# 检查水平合并
|
||
if cell._element.tcPr is not None:
|
||
gridspan = self._get_gridspan_value(cell._element)
|
||
if gridspan > 1:
|
||
# 标记这是一个跨列的表头
|
||
print(f"表头分析: 水平合并单元格 at [{i},{j}],跨度 {gridspan},值: {text}")
|
||
if text: # 只处理有内容的单元格
|
||
text = f"SPAN_{gridspan}_{text}"
|
||
# 记录水平合并影响的列
|
||
for k in range(1, gridspan):
|
||
if j + k < cols:
|
||
header_merge_map[(i, j+k)] = (i, j)
|
||
|
||
if text:
|
||
column_headers.append(text)
|
||
last_header = text
|
||
|
||
except Exception as e:
|
||
print(f"表头分析: 处理表头单元格 [{i},{j}] 时出错: {str(e)}")
|
||
continue
|
||
|
||
header_structure.append(column_headers)
|
||
print(f"列 {j} 的表头结构: {column_headers}")
|
||
|
||
# 第二步:构建完整的表头标识符
|
||
full_headers = []
|
||
print("开始构建完整表头标识符...")
|
||
|
||
# 处理跨行跨列的表头
|
||
# 先进行一次预处理,处理合并单元格
|
||
for j, headers in enumerate(header_structure):
|
||
if not headers:
|
||
# 检查是否是被合并的列
|
||
is_merged = False
|
||
for i in range(max_header_rows):
|
||
if (i, j) in header_merge_map:
|
||
src_i, src_j = header_merge_map[(i, j)]
|
||
src_cell = table.cell(src_i, src_j)
|
||
src_text = src_cell.text.strip()
|
||
if src_text and src_j != j: # 确保是水平合并
|
||
print(f"表头补全: 列 {j} 被列 {src_j} 合并,添加表头: {src_text}")
|
||
header_structure[j].append(src_text)
|
||
is_merged = True
|
||
break
|
||
|
||
if not is_merged:
|
||
print(f"表头补全: 列 {j} 无表头,使用默认值: 列{j+1}")
|
||
header_structure[j].append(f"列{j+1}")
|
||
|
||
# 构建每列的完整表头
|
||
for j, headers in enumerate(header_structure):
|
||
if not headers:
|
||
full_headers.append(f"列{j+1}")
|
||
continue
|
||
|
||
# 处理跨列的表头
|
||
header_text = []
|
||
current_prefix = ""
|
||
for h in headers:
|
||
if h.startswith('SPAN_'):
|
||
parts = h.split('_', 2)
|
||
span = int(parts[1])
|
||
text = parts[2]
|
||
# 将跨列的表头添加到后续的列
|
||
for k in range(span):
|
||
if j + k < cols:
|
||
if k == 0:
|
||
if text != current_prefix: # 避免重复前缀
|
||
header_text.append(text)
|
||
current_prefix = text
|
||
else:
|
||
if text not in header_structure[j + k]:
|
||
header_structure[j + k].insert(0, text)
|
||
else:
|
||
if h != current_prefix: # 避免重复前缀
|
||
header_text.append(h)
|
||
current_prefix = h
|
||
|
||
# 移除重复的表头部分
|
||
unique_headers = []
|
||
seen = set()
|
||
for h in header_text:
|
||
if h not in seen:
|
||
unique_headers.append(h)
|
||
seen.add(h)
|
||
|
||
# 构建完整表头,使用特殊分隔符
|
||
if unique_headers:
|
||
full_header = '_'.join(unique_headers)
|
||
print(f"列 {j} 的完整表头: {full_header}")
|
||
full_headers.append(full_header)
|
||
else:
|
||
full_headers.append(f"列{j+1}")
|
||
|
||
# 确定实际的表头行数
|
||
header_row_count = max(len(headers) for headers in header_structure)
|
||
if header_row_count == 0:
|
||
header_row_count = 1
|
||
|
||
print(f"表头行数: {header_row_count}")
|
||
print(f"开始处理数据行,从第 {header_row_count} 行开始...")
|
||
|
||
# 创建跟踪已处理垂直合并单元格的集合
|
||
processed_vmerge = set()
|
||
|
||
# 处理数据行
|
||
for i in range(header_row_count, rows):
|
||
try:
|
||
row_data = []
|
||
j = 0
|
||
while j < cols:
|
||
try:
|
||
cell = table.cell(i, j)
|
||
text = cell.text.strip()
|
||
|
||
# 处理垂直合并单元格
|
||
if not text and cell._element.tcPr is not None:
|
||
vmerge = cell._element.tcPr.xpath('.//w:vMerge')
|
||
if vmerge:
|
||
val = vmerge[0].get(qn('w:val'), 'continue')
|
||
if val == 'continue':
|
||
# 向上查找非continue的值
|
||
for k in range(i-1, header_row_count-1, -1):
|
||
if (k, j) in processed_vmerge:
|
||
continue
|
||
try:
|
||
src_cell = table.cell(k, j)
|
||
src_text = src_cell.text.strip()
|
||
if src_text:
|
||
text = src_text
|
||
print(f"数据行处理: 垂直合并单元格 at [{i},{j}],使用上方值 [{k},{j}]: {text}")
|
||
break
|
||
except Exception:
|
||
continue
|
||
processed_vmerge.add((i, j))
|
||
|
||
# 处理水平合并
|
||
gridspan = self._get_gridspan_value(cell._element)
|
||
|
||
# 将值复制到所有合并的列
|
||
for k in range(gridspan):
|
||
if j + k < len(full_headers):
|
||
# 使用冒号分隔表头和值
|
||
if text:
|
||
row_data.append(f"{full_headers[j+k]}:{text}")
|
||
else:
|
||
row_data.append(f"{full_headers[j+k]}:")
|
||
|
||
j += gridspan
|
||
|
||
except Exception as e:
|
||
print(f"数据行处理: 处理数据单元格 [{i},{j}] 时出错: {str(e)}")
|
||
if j < len(full_headers):
|
||
row_data.append(f"{full_headers[j]}:")
|
||
j += 1
|
||
|
||
# 确保行中至少有一个非空值
|
||
if any(len(data.split(':', 1)) > 1 and data.split(':', 1)[1].strip() for data in row_data):
|
||
processed_line = " ".join(row_data)
|
||
print(f"添加处理行 {i}: {processed_line[:100]}..." if len(processed_line) > 100 else f"添加处理行 {i}: {processed_line}")
|
||
processed_data.append(processed_line)
|
||
|
||
except Exception as e:
|
||
print(f"数据行处理: 处理数据行 {i} 时出错: {str(e)}")
|
||
continue
|
||
|
||
else:
|
||
# 使用简单表格处理逻辑
|
||
print("使用简单表格处理逻辑...")
|
||
# 获取表头
|
||
headers = []
|
||
for j in range(cols):
|
||
try:
|
||
header_text = table.cell(0, j).text.strip()
|
||
if not header_text: # 如果表头为空,使用默认值
|
||
header_text = f"列{j+1}"
|
||
headers.append(header_text)
|
||
print(f"简单表格表头 {j}: {header_text}")
|
||
except Exception as e:
|
||
print(f"简单表格处理: 处理表头单元格 [0,{j}] 时出错: {str(e)}")
|
||
headers.append(f"列{j+1}")
|
||
|
||
# 处理数据行
|
||
for i in range(1, rows):
|
||
try:
|
||
row_data = []
|
||
for j in range(cols):
|
||
try:
|
||
text = table.cell(i, j).text.strip()
|
||
row_data.append(f"{headers[j]}:{text}")
|
||
except Exception as e:
|
||
print(f"简单表格处理: 处理数据单元格 [{i},{j}] 时出错: {str(e)}")
|
||
row_data.append(f"{headers[j]}:")
|
||
|
||
# 确保行中至少有一个非空值
|
||
if any(len(data.split(':', 1)) > 1 and data.split(':', 1)[1].strip() for data in row_data):
|
||
processed_line = " ".join(row_data)
|
||
print(f"添加简单表格行 {i}: {processed_line[:100]}..." if len(processed_line) > 100 else f"添加简单表格行 {i}: {processed_line}")
|
||
processed_data.append(processed_line)
|
||
|
||
except Exception as e:
|
||
print(f"简单表格处理: 处理数据行 {i} 时出错: {str(e)}")
|
||
continue
|
||
|
||
# 返回处理后的表格文本
|
||
if processed_data:
|
||
final_text = " ".join(processed_data)
|
||
print(f"表格处理完成,生成 {len(processed_data)} 行数据")
|
||
print(f"表格文本示例: {final_text[:200]}..." if len(final_text) > 200 else f"表格文本: {final_text}")
|
||
return final_text
|
||
else:
|
||
print("表格无有效数据")
|
||
return "【表格无有效数据】"
|
||
|
||
except Exception as e:
|
||
print(f"表格处理失败: {str(e)}")
|
||
import traceback
|
||
traceback.print_exc()
|
||
return "【表格处理失败】"
|
||
|
||
def _extract_table_text(self, table: Table) -> str:
|
||
"""
|
||
提取表格中的文本内容,现在会返回格式化的文本表示
|
||
|
||
Args:
|
||
table: docx表格对象
|
||
|
||
Returns:
|
||
str: 表格内容的文本表示
|
||
"""
|
||
return self._convert_table_to_text(table)
|
||
|
||
def _extract_document_images(self, doc) -> List[Dict]:
|
||
"""
|
||
从文档中提取图片,同时记录图片位置信息
|
||
|
||
Args:
|
||
doc: docx文档对象
|
||
|
||
Returns:
|
||
List[Dict]: 图片信息列表,包含索引、关系ID、文件名、二进制数据、位置信息等
|
||
"""
|
||
print("\n开始提取文档图片...")
|
||
images = []
|
||
image_index = 0
|
||
|
||
# 创建段落到索引的映射
|
||
paragraph_indices = {}
|
||
for i, paragraph in enumerate(doc.paragraphs):
|
||
paragraph_indices[paragraph._p] = i
|
||
|
||
try:
|
||
# 处理嵌入式图片 (InlineShape)
|
||
paragraph_with_images = {}
|
||
|
||
for i, paragraph in enumerate(doc.paragraphs):
|
||
# 检查段落中的所有run
|
||
for run in paragraph.runs:
|
||
# 检查run中是否有InlineShape
|
||
if hasattr(run, '_r') and run._r is not None:
|
||
for drawing in run._r.findall('.//w:drawing', namespaces={'w': 'http://schemas.openxmlformats.org/wordprocessingml/2006/main'}):
|
||
# 找到了图片,记录它的段落位置
|
||
if i not in paragraph_with_images:
|
||
paragraph_with_images[i] = []
|
||
paragraph_with_images[i].append(True)
|
||
|
||
# 方法1: 处理InlineShape对象
|
||
for i, shape in enumerate(doc.inline_shapes):
|
||
try:
|
||
if shape.type == 3: # PICTURE type
|
||
# 获取图片关系ID
|
||
rid = shape._inline.graphic.graphicData.pic.blipFill.blip.embed
|
||
image_part = doc.part.related_parts[rid]
|
||
image_data = image_part.blob
|
||
|
||
# 找到图片所在的段落
|
||
paragraph_index = -1
|
||
parent_elem = shape._inline.getparent()
|
||
while parent_elem is not None:
|
||
if parent_elem.tag.endswith('p'):
|
||
if parent_elem in paragraph_indices:
|
||
paragraph_index = paragraph_indices[parent_elem]
|
||
break
|
||
parent_elem = parent_elem.getparent()
|
||
|
||
# 检查图片大小是否合适
|
||
if len(image_data) > 100: # 过滤掉太小的图片
|
||
# 从内容类型中获取扩展名
|
||
content_type = image_part.content_type
|
||
if 'png' in content_type:
|
||
image_ext = '.png'
|
||
elif 'jpeg' in content_type or 'jpg' in content_type:
|
||
image_ext = '.jpg'
|
||
elif 'gif' in content_type:
|
||
image_ext = '.gif'
|
||
elif 'bmp' in content_type:
|
||
image_ext = '.bmp'
|
||
else:
|
||
image_ext = '.img'
|
||
|
||
if image_ext in self.image_extensions:
|
||
# 生成唯一的图片文件名
|
||
image_filename = f"image_{image_index}{image_ext}"
|
||
|
||
# 检查是否已添加过相同关系ID的图片
|
||
duplicate = False
|
||
for img in images:
|
||
if img['rel_id'] == rid:
|
||
duplicate = True
|
||
break
|
||
|
||
if not duplicate:
|
||
images.append({
|
||
'index': image_index,
|
||
'rel_id': rid,
|
||
'filename': image_filename,
|
||
'data': image_data,
|
||
'paragraph_index': paragraph_index,
|
||
'ext': image_ext
|
||
})
|
||
|
||
print(f"提取图片 {image_index}: {image_filename} (大小: {len(image_data) // 1024} KB, 段落位置: {paragraph_index})")
|
||
image_index += 1
|
||
except Exception as e:
|
||
print(f"提取图片时出错(方法1): {str(e)}")
|
||
|
||
# 方法2: 从document.part.rels提取可能遗漏的图片
|
||
for rel in doc.part.rels.values():
|
||
if "image" in rel.reltype:
|
||
try:
|
||
image_data = rel.target_part.blob
|
||
|
||
# 检查图片大小
|
||
if len(image_data) > 100: # 过滤掉太小的图片
|
||
# 检查是否已添加过相同关系ID的图片
|
||
duplicate = False
|
||
for img in images:
|
||
if img['rel_id'] == rel.rId:
|
||
duplicate = True
|
||
break
|
||
|
||
if not duplicate:
|
||
image_ext = os.path.splitext(rel.target_ref)[1].lower()
|
||
if image_ext in self.image_extensions:
|
||
# 生成唯一的图片文件名
|
||
image_filename = f"image_{image_index}{image_ext}"
|
||
|
||
# 尝试找到此图片在文档中的位置
|
||
paragraph_index = -1 # 默认位置标记为未知
|
||
|
||
images.append({
|
||
'index': image_index,
|
||
'rel_id': rel.rId,
|
||
'filename': image_filename,
|
||
'data': image_data,
|
||
'paragraph_index': paragraph_index,
|
||
'ext': image_ext
|
||
})
|
||
|
||
print(f"提取图片 {image_index}: {image_filename} (大小: {len(image_data) // 1024} KB, 位置未知)")
|
||
image_index += 1
|
||
except Exception as e:
|
||
print(f"提取图片时出错(方法2): {str(e)}")
|
||
|
||
print(f"文档图片提取完成, 共提取 {len(images)} 张图片")
|
||
|
||
except Exception as e:
|
||
print(f"提取文档图片时出错: {str(e)}")
|
||
import traceback
|
||
traceback.print_exc()
|
||
|
||
return images
|
||
|
||
def process_directory(input_dir: str, output_dir: str = None):
|
||
"""
|
||
处理指定目录下的所有文档文件
|
||
|
||
Args:
|
||
input_dir: 输入目录路径
|
||
output_dir: 输出目录路径,如果为None则使用输入目录
|
||
"""
|
||
# 如果未指定输出目录,使用输入目录
|
||
if output_dir is None:
|
||
output_dir = input_dir
|
||
|
||
if not os.path.exists(output_dir):
|
||
os.makedirs(output_dir)
|
||
|
||
cleaner = DocCleaner()
|
||
|
||
for root, _, files in os.walk(input_dir):
|
||
for file in files:
|
||
if file.endswith(('.doc', '.docx')):
|
||
input_path = os.path.join(root, file)
|
||
|
||
try:
|
||
# 清理文档
|
||
main_content, appendix, tables, images = cleaner.clean_doc(input_path)
|
||
|
||
# 创建输出文件名(统一使用docx扩展名)
|
||
base_name = os.path.splitext(file)[0]
|
||
output_path = os.path.join(output_dir, f"{base_name}_cleaned.docx")
|
||
|
||
# 保存为docx格式
|
||
cleaner.save_as_docx(main_content, appendix, tables, images, output_path)
|
||
|
||
except Exception as e:
|
||
print(f"处理文件 {file} 时出错: {str(e)}")
|
||
# 添加更详细的错误信息
|
||
if isinstance(e, subprocess.CalledProcessError):
|
||
print(f"命令执行错误: {e.output}")
|
||
elif isinstance(e, FileNotFoundError):
|
||
print("请确保已安装LibreOffice并将其添加到系统PATH中")
|
||
|
||
def qn(tag: str) -> str:
|
||
"""
|
||
将标签转换为带命名空间的格式
|
||
|
||
Args:
|
||
tag: 原始标签
|
||
|
||
Returns:
|
||
str: 带命名空间的标签
|
||
"""
|
||
prefix = "{http://schemas.openxmlformats.org/wordprocessingml/2006/main}"
|
||
return prefix + tag
|
||
|
||
if __name__ == '__main__':
|
||
import argparse
|
||
|
||
# parser = argparse.ArgumentParser(description='文档清理工具')
|
||
# parser.add_argument('input_dir', help='输入目录路径')
|
||
# parser.add_argument('--output_dir', help='输出目录路径(可选,默认为输入目录)', default=None)
|
||
#
|
||
# args = parser.parse_args()
|
||
|
||
process_directory("D:\\rzData\\poject\\AI项目\\UDI智能体\\测试文档", "D:\\rzData\\poject\\AI项目\\UDI智能体\\测试文档")
|
||
|
||
# 确保目录存在,如果不存在则创建
|
||
# 创建基础目录(使用更安全的方式)
|
||
# base_dir = 'D:\Desktop\DEMO'
|
||
# text_dir = os.path.join(base_dir, "测试")
|
||
#
|
||
# os.makedirs(text_dir, exist_ok=True, mode=0o777)
|
||
#
|
||
# print(f"目录是否存在: {os.path.exists(text_dir)}")
|
||
# print(f"完整路径: {os.path.abspath(text_dir)}") # 或者直接 print(f"完整路径: {text_dir}") |