#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import os
import re
import docx
import numpy as np
import requests
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
from typing import List, Tuple, Dict, Optional
from docx.shared import Pt
from docx.enum.text import WD_PARAGRAPH_ALIGNMENT
from docx.enum.table import WD_TABLE_ALIGNMENT
import subprocess
import tempfile
import json
from docx.table import Table, _Cell
from docx.text.paragraph import Paragraph
from copy import deepcopy
from docx.oxml import parse_xml
from docx.oxml.ns import nsdecls
class DocCleaner:
    def __init__(self, ollama_host: str = "http://192.168.1.18:11434"):
        """
        Initialize the document cleaner.
        Args:
            ollama_host: address of the Ollama server
        """
        # Header/footer patterns
        self.header_footer_patterns = [
            r'页码\s*\d+-\d+',  # page markers such as "页码1-1", "页码2-1"
            r'第?\s*\d+\s*页\s*共\s*\d+\s*页',  # Chinese page numbers: "第X页 共Y页"
            r'Page\s*\d+\s*of\s*\d+',  # English page numbers
        ]
        # Special-marker patterns
        self.special_char_patterns = [
            r'©\s*\d{4}.*?版权所有',  # copyright notices
            r'confidential',  # confidentiality markers
            r'draft|草稿',  # draft markers
            r'watermark',  # watermark markers
        ]
        # Appendix / bibliography heading patterns
        self.appendix_patterns = [
            r'^附录\s*[A-Za-z]?[\s:]',
            r'^Appendix\s*[A-Za-z]?[\s:]',
            r'^参考文献$',
            r'^References$',
            r'^Bibliography$'
        ]
        # TF-IDF vectorizer (fallback deduplication method)
        self.vectorizer = TfidfVectorizer(
            min_df=1,
            stop_words='english'
        )
        self.ollama_host = ollama_host
        self.embedding_model = "bge-m3:latest"  # embedding model served by Ollama
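    # Illustrative strings the patterns above act on (hypothetical examples):
    #   "页码1-1"           -> removed as a "页码N-N" page marker
    #   "第 3 页 共 12 页"  -> removed as a Chinese "page X of Y" marker
    #   "Page 3 of 12"      -> removed as an English page marker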
    def _convert_doc_to_docx(self, doc_path: str) -> str:
        """
        Convert a .doc file to .docx format.
        Args:
            doc_path: path to the .doc file
        Returns:
            str: path to the converted .docx file
        """
        # Create a temporary output directory
        temp_dir = tempfile.mkdtemp()
        try:
            # Convert with soffice (LibreOffice) in headless mode
            cmd = ['soffice', '--headless', '--convert-to', 'docx', '--outdir', temp_dir, doc_path]
            subprocess.run(cmd, check=True, capture_output=True)
            # soffice names the output after the input file, not "temp.docx",
            # so build the converted path from the input basename
            base_name = os.path.splitext(os.path.basename(doc_path))[0]
            return os.path.join(temp_dir, base_name + '.docx')
        except subprocess.CalledProcessError as e:
            raise Exception(f"Failed to convert .doc file: {str(e)}")
    def clean_doc(self, file_path: str) -> Tuple[List[str], List[str], List[Table]]:
        """
        Clean a document and return the processed body text, appendix, and tables.
        Args:
            file_path: path to the document file
        Returns:
            Tuple[List[str], List[str], List[Table]]: (cleaned body paragraphs, appendix paragraphs, tables)
        """
        print(f"\nProcessing document: {file_path}")
        # Detect the file type
        _, file_extension = os.path.splitext(file_path)
        file_extension = file_extension.lower()
        # Convert .doc to .docx first if necessary
        if file_extension == '.doc':
            temp_docx = self._convert_doc_to_docx(file_path)
            doc = docx.Document(temp_docx)
            # Remove the temporary files
            os.remove(temp_docx)
            os.rmdir(os.path.dirname(temp_docx))
        else:
            doc = docx.Document(file_path)
        # Extract all content (paragraphs and tables)
        content = []
        tables = []
        table_count = 0
        try:
            print("\nParsing document structure...")
            # Iterate over every element in the document body
            for element in doc._element.body:
                if element.tag.endswith('p'):
                    try:
                        paragraph = docx.text.paragraph.Paragraph(element, doc)
                        text = paragraph.text.strip()
                        # Keep non-empty paragraphs only
                        if text:
                            # Check whether this is an appendix heading
                            is_appendix = any(re.match(pattern, text, re.IGNORECASE)
                                              for pattern in self.appendix_patterns)
                            content.append({
                                'type': 'paragraph',
                                'content': text,
                                'is_appendix_start': is_appendix
                            })
                            if is_appendix:
                                print(f"Found appendix heading: {text}")
                    except Exception as e:
                        print(f"Warning: error while processing a paragraph: {str(e)}")
                        continue
                elif element.tag.endswith('tbl'):
                    try:
                        table = docx.table.Table(element, doc)
                        # Verify the table is valid
                        if hasattr(table, 'rows') and hasattr(table, 'columns'):
                            tables.append(table)
                            content.append({
                                'type': 'table',
                                'index': table_count
                            })
                            print(f"Found table {table_count}: {len(table.rows)} rows x {len(table.columns)} cols")
                            table_count += 1
                    except Exception as e:
                        print(f"Warning: error while processing a table: {str(e)}")
                        continue
        except Exception as e:
            print(f"Warning: error while iterating document content: {str(e)}")
        print(f"\nDocument structure parsed:")
        print(f"- Total elements: {len(content)}")
        print(f"- Tables: {len(tables)}")
        # Separate body text from the appendix
        main_content = []
        appendix = []
        is_appendix = False
        print("\nSeparating body text and appendix...")
        for item in content:
            if item['type'] == 'paragraph':
                if item['is_appendix_start']:
                    is_appendix = True
                    print("Entering appendix section")
                if is_appendix:
                    appendix.append(item['content'])
                else:
                    main_content.append(item['content'])
            elif item['type'] == 'table':
                table_placeholder = f'TABLE_PLACEHOLDER_{item["index"]}'
                if is_appendix:
                    appendix.append(table_placeholder)
                    print(f"Added table to appendix: {table_placeholder}")
                else:
                    main_content.append(table_placeholder)
                    print(f"Added table to body: {table_placeholder}")
        print(f"\nSeparation finished:")
        print(f"- Body elements: {len(main_content)}")
        print(f"- Appendix elements: {len(appendix)}")
        # Clean the body text (table markers are preserved)
        cleaned_content = []
        print("\nCleaning body text...")
        for item in main_content:
            if item.startswith('TABLE_PLACEHOLDER_'):
                cleaned_content.append(item)
                print(f"Kept table marker: {item}")
            else:
                # _clean_text may drop the paragraph entirely, so guard the index
                cleaned = self._clean_text([item])
                if cleaned and cleaned[0]:
                    cleaned_content.append(cleaned[0])
        print(f"\nCleaning finished:")
        print(f"- Elements after cleaning: {len(cleaned_content)}")
        print("- Table marker positions:")
        for i, item in enumerate(cleaned_content):
            if item.startswith('TABLE_PLACEHOLDER_'):
                print(f"  Position {i}: {item}")
        return cleaned_content, appendix, tables
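    # Shape of clean_doc's return value (illustrative): the cleaned body is a
    # list such as ["1 Overview ...", "TABLE_PLACEHOLDER_0", ...]; placeholders
    # index into the returned list of python-docx Table objects.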
    def _clean_text(self, text: List[str]) -> List[str]:
        """
        Clean text content.
        Args:
            text: list of paragraphs to clean
        Returns:
            List[str]: cleaned paragraphs
        """
        cleaned = []
        for paragraph in text:
            # Keep table markers untouched
            if paragraph.startswith('TABLE_PLACEHOLDER_'):
                cleaned.append(paragraph)
                continue
            # Skip empty paragraphs
            if not paragraph.strip():
                continue
            # Check for table-of-contents entries (lines led by a numeric outline, e.g. "1.2.3 Overview")
            is_toc_item = bool(re.match(r'^\s*(?:\d+\.)*\d+\s+.*', paragraph))
            if not is_toc_item:
                # Strip headers and footers
                for pattern in self.header_footer_patterns:
                    paragraph = re.sub(pattern, '', paragraph, flags=re.IGNORECASE)
                # Strip special markers
                for pattern in self.special_char_patterns:
                    paragraph = re.sub(pattern, '', paragraph, flags=re.IGNORECASE)
            # Keep the paragraph if anything is left
            if paragraph.strip():
                cleaned.append(paragraph.strip())
        return cleaned
    def _split_content(self, paragraphs: List[str]) -> Tuple[List[str], List[str]]:
        """
        Separate body text from the appendix/bibliography.
        Args:
            paragraphs: list of document paragraphs
        Returns:
            Tuple[List[str], List[str]]: (body paragraphs, appendix paragraphs)
        """
        main_content = []
        appendix = []
        is_appendix = False
        for p in paragraphs:
            # Check whether the appendix starts here
            if any(re.match(pattern, p, re.IGNORECASE) for pattern in self.appendix_patterns):
                is_appendix = True
            if is_appendix:
                appendix.append(p)
            else:
                main_content.append(p)
        return main_content, appendix
    def _get_embeddings(self, texts: List[str]) -> np.ndarray:
        """
        Get text embedding vectors from Ollama.
        Args:
            texts: list of texts
        Returns:
            np.ndarray: matrix of embedding vectors
        """
        embeddings = []
        for text in texts:
            try:
                response = requests.post(
                    f"{self.ollama_host}/api/embeddings",
                    json={
                        "model": self.embedding_model,
                        "prompt": text
                    }
                )
                response.raise_for_status()
                embedding = response.json()["embedding"]
                embeddings.append(embedding)
            except Exception as e:
                print(f"Failed to get text embedding: {str(e)}")
                # Fall back to a zero vector; the dimension must match the
                # embedding model (bge-m3 produces 1024-dimensional vectors)
                embeddings.append([0.0] * 1024)
        return np.array(embeddings)
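    # A minimal sketch of the Ollama exchange _get_embeddings relies on
    # (endpoint and field names as used above; values are illustrative):
    #
    #   POST {ollama_host}/api/embeddings
    #   {"model": "bge-m3:latest", "prompt": "some paragraph"}
    #   -> 200 {"embedding": [0.013, -0.072, ...]}  # one float per model dimension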
    def _remove_duplicates(self, paragraphs: List[str], similarity_threshold: float = 0.92) -> List[str]:
        """
        Remove duplicate paragraphs while keeping table placeholders in place.
        Args:
            paragraphs: list of paragraphs
            similarity_threshold: similarity threshold; the embedding model supports a higher value than TF-IDF
        Returns:
            List[str]: deduplicated paragraphs
        """
        if not paragraphs:
            return []
        # Separate table placeholders from ordinary paragraphs
        table_placeholders = {}
        text_paragraphs = []
        for i, p in enumerate(paragraphs):
            if p.startswith('TABLE_PLACEHOLDER_'):
                table_placeholders[i] = p
            else:
                text_paragraphs.append((i, p))
        try:
            # Deduplicate the non-table paragraphs only
            if text_paragraphs:
                # Get text embeddings
                text_only = [p[1] for p in text_paragraphs]
                embeddings = self._get_embeddings(text_only)
                # Compute the cosine-similarity matrix
                similarity_matrix = cosine_similarity(embeddings)
                # Mark the paragraphs to keep
                keep_indices = []
                for i in range(len(text_paragraphs)):
                    # Keep the paragraph unless it is highly similar to an earlier kept one
                    if not any(similarity_matrix[i][j] > similarity_threshold for j in keep_indices):
                        keep_indices.append(i)
                # The non-table paragraphs to keep
                kept_paragraphs = [(text_paragraphs[i][0], text_only[i]) for i in keep_indices]
            else:
                kept_paragraphs = []
            # Merge placeholders and kept paragraphs, sorted by original position
            all_kept = list(table_placeholders.items()) + kept_paragraphs
            all_kept.sort(key=lambda x: x[0])
            return [p[1] for p in all_kept]
        except Exception as e:
            print(f"Ollama embedding model failed, falling back to TF-IDF: {str(e)}")
            # Fall back to the original TF-IDF method if Ollama fails
            return self._remove_duplicates_tfidf(paragraphs)
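    # Illustrative behaviour (hypothetical values): at threshold 0.92, two
    # paragraphs with cosine similarity 0.95 collapse to the earlier one, while
    # a pair at 0.80 are both kept; table placeholders never enter the
    # comparison and stay at their original positions.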
    def _remove_duplicates_tfidf(self, paragraphs: List[str], similarity_threshold: float = 0.85) -> List[str]:
        """
        Remove duplicate paragraphs with TF-IDF, as a fallback method.
        Args:
            paragraphs: list of paragraphs
            similarity_threshold: similarity threshold
        Returns:
            List[str]: deduplicated paragraphs
        """
        if not paragraphs:
            return []
        # Separate table placeholders from ordinary paragraphs
        table_placeholders = {}
        text_paragraphs = []
        for i, p in enumerate(paragraphs):
            if p.startswith('TABLE_PLACEHOLDER_'):
                table_placeholders[i] = p
            else:
                text_paragraphs.append((i, p))
        if text_paragraphs:
            # Compute the TF-IDF matrix
            text_only = [p[1] for p in text_paragraphs]
            tfidf_matrix = self.vectorizer.fit_transform(text_only)
            # Compute the cosine-similarity matrix
            similarity_matrix = cosine_similarity(tfidf_matrix)
            # Mark the paragraphs to keep
            keep_indices = []
            for i in range(len(text_paragraphs)):
                # Keep the paragraph unless it is highly similar to an earlier kept one
                if not any(similarity_matrix[i][j] > similarity_threshold for j in keep_indices):
                    keep_indices.append(i)
            # The non-table paragraphs to keep
            kept_paragraphs = [(text_paragraphs[i][0], text_only[i]) for i in keep_indices]
        else:
            kept_paragraphs = []
        # Merge placeholders and kept paragraphs, sorted by original position
        all_kept = list(table_placeholders.items()) + kept_paragraphs
        all_kept.sort(key=lambda x: x[0])
        return [p[1] for p in all_kept]
    def save_as_docx(self, cleaned_content: List[str], appendix: List[str], tables: List[Table], output_path: str):
        """
        Save the cleaned content as both .docx and .txt files.
        Args:
            cleaned_content: cleaned body paragraphs
            appendix: appendix paragraphs
            tables: list of tables
            output_path: output file path
        """
        print(f"\nSaving document: {output_path}")
        print(f"- Body elements: {len(cleaned_content)}")
        print(f"- Appendix elements: {len(appendix)}")
        print(f"- Total tables: {len(tables)}")
        # Create a new document
        doc = docx.Document()
        # Collect the plain-text output
        text_output = []
        # Add body content and tables, preserving their relative order
        print("\nProcessing body content...")
        # Collect all the elements to insert
        elements_to_insert = []
        for i, content in enumerate(cleaned_content):
            try:
                # Check for a table placeholder
                table_match = re.match(r'TABLE_PLACEHOLDER_(\d+)', content)
                if table_match:
                    table_index = int(table_match.group(1))
                    print(f"Processing table placeholder: {content} (index: {table_index})")
                    if table_index < len(tables):
                        table = tables[table_index]
                        try:
                            # Convert the table to its text representation
                            table_text = self._convert_table_to_text(table)
                            # Add a table caption
                            title = doc.add_paragraph(f"Table {table_index + 1}:")
                            title.alignment = WD_PARAGRAPH_ALIGNMENT.LEFT
                            elements_to_insert.append(('paragraph', title._element))
                            # Add the table text in a monospaced font
                            p = doc.add_paragraph()
                            run = p.add_run(table_text)
                            run.font.name = 'Courier New'  # monospaced font
                            run.font.size = Pt(10)  # font size
                            elements_to_insert.append(('paragraph', p._element))
                            # Add a blank line
                            elements_to_insert.append(('paragraph', doc.add_paragraph()._element))
                            # Add to the plain-text output
                            text_output.append(f"Table {table_index + 1} begin:")
                            text_output.append(table_text)
                            text_output.append(f"Table {table_index + 1} end:")
                        except Exception as e:
                            print(f"Warning: error while processing a table: {str(e)}")
                            elements_to_insert.append(('paragraph', doc.add_paragraph("[Table processing failed]")._element))
                            text_output.append("[Table processing failed]")
                else:
                    # Add an ordinary paragraph
                    p = doc.add_paragraph(content)
                    p.alignment = WD_PARAGRAPH_ALIGNMENT.JUSTIFY
                    elements_to_insert.append(('paragraph', p._element))
                    # Add to the plain-text output
                    text_output.append(content)
            except Exception as e:
                print(f"Warning: error while processing a paragraph or table: {str(e)}")
                continue
        # Append all the elements to the document in order
        for element_type, element in elements_to_insert:
            doc._body._element.append(element)
        # If there is an appendix, add a separator and the appendix content
        if appendix:
            print("\nProcessing appendix content...")
            try:
                # Add a page break
                doc.add_page_break()
                # Add the appendix heading
                title = doc.add_paragraph("Appendix")
                title.alignment = WD_PARAGRAPH_ALIGNMENT.CENTER
                # Add to the plain-text output
                text_output.append("Appendix")
                # Add the appendix content
                appendix_elements = []
                for content in appendix:
                    # Check for a table placeholder
                    table_match = re.match(r'TABLE_PLACEHOLDER_(\d+)', content)
                    if table_match:
                        table_index = int(table_match.group(1))
                        print(f"Processing appendix table placeholder: {content} (index: {table_index})")
                        if table_index < len(tables):
                            table = tables[table_index]
                            try:
                                # Convert the table to its text representation
                                table_text = self._convert_table_to_text(table)
                                # Add a table caption
                                title = doc.add_paragraph(f"Appendix table {table_index + 1}:")
                                title.alignment = WD_PARAGRAPH_ALIGNMENT.LEFT
                                appendix_elements.append(('paragraph', title._element))
                                # Add the table text in a monospaced font
                                p = doc.add_paragraph()
                                run = p.add_run(table_text)
                                run.font.name = 'Courier New'  # monospaced font
                                run.font.size = Pt(10)  # font size
                                appendix_elements.append(('paragraph', p._element))
                                # Add to the plain-text output
                                text_output.append(f"Appendix table {table_index + 1}:")
                                text_output.append(table_text)
                            except Exception as e:
                                print(f"Warning: error while processing an appendix table: {str(e)}")
                                appendix_elements.append(('paragraph', doc.add_paragraph("[Table processing failed]")._element))
                                text_output.append("[Table processing failed]")
                    else:
                        p = doc.add_paragraph(content)
                        p.alignment = WD_PARAGRAPH_ALIGNMENT.JUSTIFY
                        appendix_elements.append(('paragraph', p._element))
                        # Add to the plain-text output
                        text_output.append(content)
                # Append the appendix elements to the document in order
                for element_type, element in appendix_elements:
                    doc._body._element.append(element)
            except Exception as e:
                print(f"Warning: error while processing the appendix: {str(e)}")
        # Save the .docx document
        try:
            doc.save(output_path)
            print("\nWord document saved successfully!")
        except Exception as e:
            print(f"Error: failed to save Word document: {str(e)}")
            raise
        # Save the plain-text file
        try:
            text_file_path = os.path.splitext(output_path)[0] + '.txt'
            # Remove all newlines and join everything with spaces
            text_content = ' '.join([t.replace('\n', ' ').strip() for t in text_output if t.strip()])
            with open(text_file_path, 'w', encoding='utf-8') as f:
                f.write(text_content)
            print(f"Text file saved successfully: {text_file_path}")
        except Exception as e:
            print(f"Error: failed to save text file: {str(e)}")
            raise
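    # Sketch of the .txt flattening above (hypothetical content): the list
    # ["Overview\nof scope", "Table 1 begin:", "Name:Alice Age:30"] is written
    # out as the single line "Overview of scope Table 1 begin: Name:Alice Age:30".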
    def _copy_table_fallback(self, doc: docx.Document, table: Table):
        """
        Fallback method for copying a table.
        Args:
            doc: target document
            table: source table
        """
        # Get the table dimensions
        rows = len(table.rows)
        cols = len(table.columns)
        # Create the new table
        new_table = doc.add_table(rows=rows, cols=cols)
        # Copy the table style
        if table.style:
            new_table.style = table.style
        # Copy the table properties
        new_table._element.tblPr = deepcopy(table._element.tblPr)
        # Copy the grid definition
        new_table._element.tblGrid = deepcopy(table._element.tblGrid)
        # Map of merged cells
        cell_map = {}
        # First pass: mark merged cells
        for i in range(rows):
            for j in range(cols):
                try:
                    src_cell = table.cell(i, j)
                    # Check whether this cell is part of a merge
                    if src_cell._element.tcPr is not None:
                        # Check vertical merges
                        vmerge = src_cell._element.tcPr.xpath('.//w:vMerge')
                        if vmerge:
                            val = vmerge[0].get(qn('w:val'), 'continue')
                            if val == 'restart':
                                # This is the first cell of a vertical merge
                                span = self._get_vertical_span(table, i, j)
                                cell_map[(i, j)] = ('vmerge', span)
                        # Check horizontal merges
                        gridspan = src_cell._element.tcPr.xpath('.//w:gridSpan')
                        if gridspan:
                            span = int(gridspan[0].get(qn('w:val')))
                            if span > 1:
                                cell_map[(i, j)] = ('hmerge', span)
                except Exception as e:
                    print(f"Warning: error while handling merged cell [{i},{j}]: {str(e)}")
        # Second pass: copy content and apply merges
        for i in range(rows):
            for j in range(cols):
                try:
                    src_cell = table.cell(i, j)
                    dst_cell = new_table.cell(i, j)
                    # Apply merges if needed
                    if (i, j) in cell_map:
                        merge_type, span = cell_map[(i, j)]
                        if merge_type == 'vmerge':
                            # Vertical merge
                            for k in range(1, span):
                                if i + k < rows:
                                    dst_cell.merge(new_table.cell(i + k, j))
                        elif merge_type == 'hmerge':
                            # Horizontal merge
                            for k in range(1, span):
                                if j + k < cols:
                                    dst_cell.merge(new_table.cell(i, j + k))
                    # Copy the cell properties
                    if src_cell._element.tcPr is not None:
                        dst_cell._element.tcPr = deepcopy(src_cell._element.tcPr)
                    # Copy the cell content
                    dst_cell.text = ""  # clear the default content
                    for src_paragraph in src_cell.paragraphs:
                        dst_paragraph = dst_cell.add_paragraph()
                        # Copy the paragraph properties
                        if src_paragraph._element.pPr is not None:
                            dst_paragraph._element.pPr = deepcopy(src_paragraph._element.pPr)
                        # Copy text and formatting
                        for src_run in src_paragraph.runs:
                            dst_run = dst_paragraph.add_run(src_run.text)
                            # Copy the run properties
                            if src_run._element.rPr is not None:
                                dst_run._element.rPr = deepcopy(src_run._element.rPr)
                except Exception as e:
                    print(f"Warning: error while copying cell [{i},{j}]: {str(e)}")
                    continue
    def _get_vmerge_value(self, cell_element) -> Optional[str]:
        """
        Get a cell's vertical-merge attribute.
        Args:
            cell_element: the cell element
        Returns:
            Optional[str]: the vertical-merge value, or None if the cell is not merged
        """
        vmerge = cell_element.xpath('.//w:vMerge')
        if vmerge:
            return vmerge[0].get(qn('w:val'), 'continue')
        return None

    def _get_gridspan_value(self, cell_element) -> int:
        """
        Get the number of columns a cell spans horizontally.
        Args:
            cell_element: the cell element
        Returns:
            int: the number of horizontally merged columns
        """
        try:
            gridspan = cell_element.xpath('.//w:gridSpan')
            if gridspan and gridspan[0].get(qn('w:val')):
                return int(gridspan[0].get(qn('w:val')))
        except (ValueError, TypeError, AttributeError) as e:
            print(f"Warning: error while reading the gridSpan value: {str(e)}")
        return 1  # default: no horizontal merge
    def _get_vertical_span(self, table: Table, start_row: int, col: int) -> int:
        """
        Count the number of vertically merged rows.
        Args:
            table: the table object
            start_row: the starting row
            col: the column index
        Returns:
            int: the number of vertically merged rows
        """
        span = 1
        for i in range(start_row + 1, len(table.rows)):
            cell = table.cell(i, col)
            if self._get_vmerge_value(cell._element) == 'continue':
                span += 1
            else:
                break
        return span
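    # Sketch of the WordprocessingML merge markup these helpers read (per the
    # OOXML spec; the fragment is illustrative, not from a real document):
    #
    #   <w:tc><w:tcPr><w:vMerge w:val="restart"/></w:tcPr>...</w:tc>  merge starts
    #   <w:tc><w:tcPr><w:vMerge/></w:tcPr>...</w:tc>                  "continue" row below
    #   <w:tc><w:tcPr><w:gridSpan w:val="3"/></w:tcPr>...</w:tc>      spans 3 columns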
    def _convert_table_to_text(self, table: Table) -> str:
        """
        Convert a table to text, handling both simple and complex structures.
        Args:
            table: docx table object
        Returns:
            str: text representation of the table
        """
        try:
            # Get the table dimensions
            rows = len(table.rows)
            cols = len(table.columns)
            if rows == 0 or cols == 0:
                return "[Empty table]"
            # Processed table data
            processed_data = []
            # Check whether this is a complex table (merged cells or a multi-level header)
            is_complex_table = False
            max_header_rows = min(3, rows)  # inspect at most the first 3 rows
            # Look for merged cells in the first few rows
            for i in range(max_header_rows):
                for j in range(cols):
                    try:
                        cell = table.cell(i, j)
                        if cell._element.tcPr is not None:
                            # Check vertical merges
                            vmerge = cell._element.tcPr.xpath('.//w:vMerge')
                            if vmerge:
                                is_complex_table = True
                                break
                            # Check horizontal merges
                            gridspan = cell._element.tcPr.xpath('.//w:gridSpan')
                            if gridspan:
                                is_complex_table = True
                                break
                    except Exception:
                        continue
                if is_complex_table:
                    break
            if is_complex_table:
                # Complex-table handling
                # Step 1: analyse the header structure
                header_structure = []  # hierarchical header structure per column
                # Analyse the header of each column
                for j in range(cols):
                    column_headers = []
                    last_header = None
                    for i in range(max_header_rows):
                        try:
                            cell = table.cell(i, j)
                            text = cell.text.strip()
                            # Handle vertical merges
                            if cell._element.tcPr is not None:
                                vmerge = cell._element.tcPr.xpath('.//w:vMerge')
                                if vmerge:
                                    val = vmerge[0].get(qn('w:val'), 'continue')
                                    if val == 'continue':
                                        # Reuse the previous non-empty header
                                        if last_header:
                                            text = last_header
                            # Handle horizontal merges
                            if cell._element.tcPr is not None:
                                gridspan = self._get_gridspan_value(cell._element)
                                if gridspan > 1:
                                    # Mark this header as spanning multiple columns
                                    text = f"SPAN_{gridspan}_{text}"
                            if text:
                                column_headers.append(text)
                                last_header = text
                        except Exception as e:
                            print(f"Warning: error while analysing header cell [{i},{j}]: {str(e)}")
                            continue
                    header_structure.append(column_headers)
                # Step 2: build the full header identifiers
                full_headers = []
                for j, headers in enumerate(header_structure):
                    if not headers:
                        full_headers.append(f"{j+1}")
                        continue
                    # Handle headers that span several columns
                    header_text = []
                    current_prefix = ""
                    for h in headers:
                        if h.startswith('SPAN_'):
                            parts = h.split('_', 2)
                            span = int(parts[1])
                            text = parts[2]
                            # Propagate the spanning header to the covered columns
                            for k in range(span):
                                if j + k < cols:
                                    if k == 0:
                                        if text != current_prefix:  # avoid duplicate prefixes
                                            header_text.append(text)
                                        current_prefix = text
                                    else:
                                        if text not in header_structure[j + k]:
                                            header_structure[j + k].insert(0, text)
                        else:
                            if h != current_prefix:  # avoid duplicate prefixes
                                header_text.append(h)
                            current_prefix = h
                    # Drop duplicated header parts
                    unique_headers = []
                    seen = set()
                    for h in header_text:
                        if h not in seen:
                            unique_headers.append(h)
                            seen.add(h)
                    full_headers.append('_'.join(unique_headers))
                # Determine the actual number of header rows
                header_row_count = max(len(headers) for headers in header_structure)
                if header_row_count == 0:
                    header_row_count = 1
                # Process the data rows
                for i in range(header_row_count, rows):
                    try:
                        row_data = []
                        j = 0
                        while j < cols:
                            try:
                                cell = table.cell(i, j)
                                text = cell.text.strip()
                                # Handle vertical merges
                                if not text and cell._element.tcPr is not None:
                                    vmerge = cell._element.tcPr.xpath('.//w:vMerge')
                                    if vmerge and vmerge[0].get(qn('w:val')) == 'continue':
                                        # Reuse the value from the row above
                                        text = table.cell(i-1, j).text.strip()
                                # Handle horizontal merges
                                gridspan = self._get_gridspan_value(cell._element)
                                # Copy the value into every merged column
                                for k in range(gridspan):
                                    if j + k < len(full_headers):
                                        row_data.append(f"{full_headers[j+k]}:{text}")
                                j += gridspan
                            except Exception as e:
                                print(f"Warning: error while processing data cell [{i},{j}]: {str(e)}")
                                if j < len(full_headers):
                                    row_data.append(f"{full_headers[j]}:")
                                j += 1
                        # Keep the row only if it has at least one non-empty value
                        # (split on the first colon only, so values may contain colons)
                        if any(data.split(':', 1)[1].strip() for data in row_data):
                            processed_data.append(" ".join(row_data))
                    except Exception as e:
                        print(f"Warning: error while processing data row {i}: {str(e)}")
                        continue
            else:
                # Simple-table handling
                # Read the header row
                headers = []
                for j in range(cols):
                    try:
                        header_text = table.cell(0, j).text.strip()
                        if not header_text:  # fall back to the column number if the header is empty
                            header_text = f"{j+1}"
                        headers.append(header_text)
                    except Exception as e:
                        print(f"Warning: error while processing header cell [0,{j}]: {str(e)}")
                        headers.append(f"{j+1}")
                # Process the data rows
                for i in range(1, rows):
                    try:
                        row_data = []
                        for j in range(cols):
                            try:
                                text = table.cell(i, j).text.strip()
                                row_data.append(f"{headers[j]}:{text}")
                            except Exception as e:
                                print(f"Warning: error while processing data cell [{i},{j}]: {str(e)}")
                                row_data.append(f"{headers[j]}:")
                        # Keep the row only if it has at least one non-empty value
                        if any(data.split(':', 1)[1].strip() for data in row_data):
                            processed_data.append(" ".join(row_data))
                    except Exception as e:
                        print(f"Warning: error while processing data row {i}: {str(e)}")
                        continue
            # Return the processed table text
            if processed_data:
                return " ".join(processed_data)
            else:
                return "[Table has no valid data]"
        except Exception as e:
            print(f"Warning: error while processing a table: {str(e)}")
            return "[Table processing failed]"
    def _extract_table_text(self, table: Table) -> str:
        """
        Extract a table's text content; returns the formatted text representation.
        Args:
            table: docx table object
        Returns:
            str: text representation of the table content
        """
        return self._convert_table_to_text(table)
def process_directory(input_dir: str, output_dir: Optional[str] = None):
    """
    Process every document file in the given directory.
    Args:
        input_dir: input directory path
        output_dir: output directory path; if None, the input directory is used
    """
    # Default to the input directory if no output directory was given
    if output_dir is None:
        output_dir = input_dir
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)
    cleaner = DocCleaner()
    for root, _, files in os.walk(input_dir):
        for file in files:
            if file.endswith(('.doc', '.docx')):
                input_path = os.path.join(root, file)
                try:
                    # Clean the document
                    main_content, appendix, tables = cleaner.clean_doc(input_path)
                    # Build the output file name, always with a .docx extension
                    base_name = os.path.splitext(file)[0]
                    output_path = os.path.join(output_dir, f"{base_name}_cleaned.docx")
                    # Save as .docx
                    cleaner.save_as_docx(main_content, appendix, tables, output_path)
                except Exception as e:
                    print(f"Error while processing file {file}: {str(e)}")
                    # Print more detailed error information
                    if isinstance(e, subprocess.CalledProcessError):
                        print(f"Command failed: {e.output}")
                    elif isinstance(e, FileNotFoundError):
                        print("Make sure LibreOffice is installed and on the system PATH")
def qn(tag: str) -> str:
    """
    Convert a prefixed tag such as 'w:val' to its namespace-qualified form.
    Args:
        tag: the original tag, with or without a 'w:' prefix
    Returns:
        str: the namespace-qualified tag
    """
    prefix = "{http://schemas.openxmlformats.org/wordprocessingml/2006/main}"
    # lxml expects Clark notation '{namespace}localname', so the 'w:' prefix
    # must be stripped: 'w:val' becomes '{...}val', not '{...}w:val'
    return prefix + tag.split(':')[-1]
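# Example:
#   qn('w:val') -> '{http://schemas.openxmlformats.org/wordprocessingml/2006/main}val'
# This Clark-notation form is what lxml's Element.get() expects.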
if __name__ == '__main__':
    import argparse
    # parser = argparse.ArgumentParser(description='Document cleaning tool')
    # parser.add_argument('input_dir', help='input directory path')
    # parser.add_argument('--output_dir', help='output directory path (optional, defaults to the input directory)', default=None)
    #
    # args = parser.parse_args()
    # Use raw strings so backslashes in Windows paths are not treated as escapes
    process_directory(r"D:\Desktop\DEMO", r"D:\Desktop\DEMO")
    # Make sure the directory exists, creating it if necessary
    # Create the base directory (in a safer way)
    # base_dir = r'D:\Desktop\DEMO'
    # text_dir = os.path.join(base_dir, "测试")
    #
    # os.makedirs(text_dir, exist_ok=True, mode=0o777)
    #
    # print(f"Directory exists: {os.path.exists(text_dir)}")
    # print(f"Full path: {os.path.abspath(text_dir)}")  # or simply print(f"Full path: {text_dir}")