#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import os
import re
import docx
import numpy as np
import requests
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
from typing import List, Tuple, Dict, Optional
from docx.shared import Pt
from docx.enum.text import WD_PARAGRAPH_ALIGNMENT
from docx.enum.table import WD_TABLE_ALIGNMENT
import subprocess
import tempfile
import json
from docx.table import Table, _Cell
from docx.text.paragraph import Paragraph
from copy import deepcopy
from docx.oxml import parse_xml
from docx.oxml.ns import nsdecls
import logging
import base64
class DocCleaner:
    def __init__(self, ollama_host: str = "http://192.168.1.24:11434"):
        """
        Initialize the document cleaner.
        Args:
            ollama_host: address of the Ollama server
        """
        # Header/footer patterns
        self.header_footer_patterns = [
            r'页码\s*\d+-\d+',  # page numbers such as "页码1-1", "页码2-1"
            r'第\s*\d+\s*页\s*共\s*\d+\s*页',  # Chinese page numbers: "第X页 共Y页"
            r'Page\s*\d+\s*of\s*\d+',  # English page numbers
        ]
        # Special-symbol patterns
        self.special_char_patterns = [
            r'©\s*\d{4}.*?版权所有',  # copyright notices
            r'confidential',  # confidentiality marks
            r'draft|草稿',  # draft marks
            r'watermark',  # watermark marks
        ]
        # Appendix and reference heading patterns
        self.appendix_patterns = [
            r'^附录\s*[A-Za-z]?[\s:：]',  # allow ASCII or full-width colon
            r'^Appendix\s*[A-Za-z]?[\s:]',
            r'^参考文献$',
            r'^References$',
            r'^Bibliography$'
        ]
        # Initialize the TF-IDF vectorizer (used by the fallback deduplication path)
        self.vectorizer = TfidfVectorizer(
            min_df=1,
            stop_words='english'
        )
        self.ollama_host = ollama_host
        self.embedding_model = "bge-m3"  # bge-m3 model for text embeddings
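    # Usage sketch (illustrative; assumes an Ollama server is reachable at the
    # given host and that "report.docx" exists):
    #   cleaner = DocCleaner(ollama_host="http://localhost:11434")
    #   main, appendix, tables = cleaner.clean_doc("report.docx")
    #   cleaner.save_as_docx(main, appendix, tables, "report_cleaned.docx")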
    def _convert_doc_to_docx(self, doc_path: str) -> str:
        """
        Convert a .doc file to .docx format.
        Args:
            doc_path: path to the .doc file
        Returns:
            str: path to the converted .docx file
        """
        print(f"\nStarting DOC conversion: {doc_path}")
        # Create temporary paths. LibreOffice names its output after the input
        # file, so derive the expected path from the input basename.
        temp_dir = tempfile.mkdtemp()
        base_name = os.path.splitext(os.path.basename(doc_path))[0]
        temp_docx = os.path.join(temp_dir, base_name + '.docx')
        print(f"Created temporary directory: {temp_dir}")
        print(f"Target DOCX path: {temp_docx}")
        try:
            # First, kill any soffice processes that may already be running
            try:
                if os.name == 'nt':  # Windows
                    os.system('taskkill /f /im soffice.bin /t')
                    os.system('taskkill /f /im soffice.exe /t')
                else:  # Linux/Unix
                    os.system('pkill -9 soffice.bin')
                    os.system('pkill -9 soffice')
            except Exception as e:
                print(f"Error while cleaning up existing processes (safe to ignore): {str(e)}")
            # Detect the operating system
            if os.name == 'nt':  # Windows
                soffice_cmd = 'soffice'
                print("Windows detected; using the soffice command")
            else:  # Linux/Unix
                # Common LibreOffice executable paths
                possible_paths = [
                    'libreoffice',
                    'soffice',
                    '/usr/bin/libreoffice',
                    '/usr/bin/soffice',
                    '/usr/lib/libreoffice/program/soffice',
                    '/opt/libreoffice*/program/soffice',
                ]
                print("Linux/Unix detected; looking for LibreOffice...")
                soffice_cmd = None
                for path in possible_paths:
                    try:
                        if '*' in path:  # handle wildcard paths
                            import glob
                            matching_paths = glob.glob(path)
                            for match_path in matching_paths:
                                try:
                                    print(f"Trying: {match_path} --version")
                                    subprocess.run([match_path, '--version'], stdout=subprocess.PIPE,
                                                   stderr=subprocess.PIPE, timeout=5)
                                    soffice_cmd = match_path
                                    print(f"Found a working LibreOffice: {soffice_cmd}")
                                    break
                                except Exception as e:
                                    print(f"Path failed {match_path}: {str(e)}")
                        else:
                            print(f"Trying: {path} --version")
                            subprocess.run([path, '--version'], stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                                           timeout=5)
                            soffice_cmd = path
                            print(f"Found a working LibreOffice: {soffice_cmd}")
                            break
                    except Exception as e:
                        print(f"Path failed {path}: {str(e)}")
                        continue
                if soffice_cmd is None:
                    # Fall back to the which command
                    try:
                        print("Trying to locate LibreOffice via which...")
                        which_result = subprocess.run(['which', 'libreoffice'], stdout=subprocess.PIPE,
                                                      stderr=subprocess.PIPE, text=True)
                        if which_result.returncode == 0:
                            soffice_cmd = which_result.stdout.strip()
                            print(f"Found LibreOffice via which: {soffice_cmd}")
                    except Exception as e:
                        print(f"which lookup failed: {str(e)}")
                if soffice_cmd is None:
                    error_msg = """
LibreOffice was not found. Please install it as follows:
1. On Ubuntu/Debian:
   sudo apt-get update
   sudo apt-get install libreoffice libreoffice-writer
2. On CentOS/RHEL:
   sudo yum update
   sudo yum install libreoffice libreoffice-writer
3. Install Chinese font support:
   # Ubuntu/Debian
   sudo apt-get install fonts-wqy-zenhei fonts-wqy-microhei
   # CentOS/RHEL
   sudo yum install wqy-zenhei-fonts wqy-microhei-fonts
4. Verify the installation:
   libreoffice --version
5. If it still fails, make sure that:
   - LibreOffice is installed correctly
   - the executable is on the system PATH
   - the current user has execute permission
   - the temporary directory (/tmp) has sufficient permissions
"""
                    raise Exception(error_msg)
            print(f"\nConverting with command: {soffice_cmd}")
            # Convert with soffice (LibreOffice)
            cmd = [
                soffice_cmd,
                '--headless',
                '--convert-to',
                'docx:MS Word 2007 XML',  # explicit output filter
                '--outdir',
                temp_dir,
                doc_path
            ]
            print(f"Full conversion command: {' '.join(cmd)}")
            # Run the conversion with a generous timeout
            try:
                process = subprocess.run(
                    cmd,
                    capture_output=True,
                    text=True,
                    timeout=300  # 5-minute timeout
                )
                if process.returncode != 0:
                    error_msg = process.stderr or "unknown error"
                    raise Exception(f"Conversion failed: {error_msg}")
                print("File converted successfully")
            except subprocess.TimeoutExpired:
                # Kill the processes on timeout
                if os.name == 'nt':  # Windows
                    os.system('taskkill /f /im soffice.bin /t')
                    os.system('taskkill /f /im soffice.exe /t')
                else:  # Linux/Unix
                    os.system('pkill -9 soffice.bin')
                    os.system('pkill -9 soffice')
                raise Exception("Conversion timed out (300 seconds); the process was terminated. "
                                "Check that LibreOffice is working or try converting the file manually.")
            # Validate the output file
            if not os.path.exists(temp_docx):
                raise Exception("Converted file not found")
            file_size = os.path.getsize(temp_docx)
            if file_size == 0:
                raise Exception("Converted file is empty")
            print(f"Conversion finished; output file size: {file_size} bytes")
            return temp_docx
        except Exception as e:
            print(f"Failed to convert doc file: {str(e)}")
            # Clean up temporary files
            try:
                if os.path.exists(temp_dir):
                    import shutil
                    shutil.rmtree(temp_dir)
            except Exception:
                pass
            # Kill any leftover processes
            try:
                if os.name == 'nt':  # Windows
                    os.system('taskkill /f /im soffice.bin /t')
                    os.system('taskkill /f /im soffice.exe /t')
                else:  # Linux/Unix
                    os.system('pkill -9 soffice.bin')
                    os.system('pkill -9 soffice')
            except Exception:
                pass
            raise Exception(f"Failed to convert doc file: {str(e)}")
    def clean_doc(self, file_path: str) -> Tuple[List[str], List[str], List[Table]]:
        """
        Clean a document and return the processed body text, appendix and tables.
        Args:
            file_path: path to the document file
        Returns:
            Tuple[List[str], List[str], List[Table]]: (cleaned body paragraphs, appendix paragraphs, tables)
        """
        # Detect the file type
        _, file_extension = os.path.splitext(file_path)
        file_extension = file_extension.lower()
        # Reject .doc files outright
        if file_extension == '.doc':
            raise Exception("The .doc format is not supported; convert the file to .docx first")
        doc = docx.Document(file_path)
        # Extract all content (paragraphs and tables)
        content = []
        tables = []
        table_count = 0
        try:
            # Walk every element in the document body
            for element in doc._element.body:
                if element.tag.endswith('p'):
                    try:
                        paragraph = docx.text.paragraph.Paragraph(element, doc)
                        text = paragraph.text.strip()
                        # Keep non-empty paragraphs only
                        if text:
                            # Check whether this is an appendix heading
                            is_appendix = any(re.match(pattern, text, re.IGNORECASE)
                                              for pattern in self.appendix_patterns)
                            content.append({
                                'type': 'paragraph',
                                'content': text,
                                'is_appendix_start': is_appendix
                            })
                    except Exception:
                        continue
                elif element.tag.endswith('tbl'):
                    try:
                        table = docx.table.Table(element, doc)
                        # Check that the table is valid
                        if hasattr(table, 'rows') and hasattr(table, 'columns'):
                            tables.append(table)
                            content.append({
                                'type': 'table',
                                'index': table_count
                            })
                            table_count += 1
                    except Exception:
                        continue
        except Exception as e:
            raise Exception(f"Failed to parse document structure: {str(e)}")
        # Separate body text from the appendix
        main_content = []
        appendix = []
        is_appendix = False
        for item in content:
            if item['type'] == 'paragraph':
                if item['is_appendix_start']:
                    is_appendix = True
                if is_appendix:
                    appendix.append(item['content'])
                else:
                    main_content.append(item['content'])
            elif item['type'] == 'table':
                table_placeholder = f'TABLE_PLACEHOLDER_{item["index"]}'
                if is_appendix:
                    appendix.append(table_placeholder)
                else:
                    main_content.append(table_placeholder)
        # Clean the body text (keeping table markers intact)
        cleaned_content = []
        for item in main_content:
            if item.startswith('TABLE_PLACEHOLDER_'):
                cleaned_content.append(item)
            else:
                # _clean_text may drop the paragraph entirely, so guard
                # against an empty result before indexing.
                cleaned = self._clean_text([item])
                if cleaned and cleaned[0]:
                    cleaned_content.append(cleaned[0])
        return cleaned_content, appendix, tables
    def _clean_text(self, text: List[str]) -> List[str]:
        """
        Clean text content.
        Args:
            text: list of paragraphs to clean
        Returns:
            List[str]: cleaned paragraphs
        """
        cleaned = []
        for paragraph in text:
            # Keep table markers as-is
            if paragraph.startswith('TABLE_PLACEHOLDER_'):
                cleaned.append(paragraph)
                continue
            # Skip empty paragraphs
            if not paragraph.strip():
                continue
            # Check whether this is a table-of-contents entry (a line starting
            # with a numeric section prefix); those are kept uncleaned
            is_toc_item = bool(re.match(r'^\s*(?:\d+\.)*\d+\s+.*', paragraph))
            if not is_toc_item:
                # Remove headers and footers
                for pattern in self.header_footer_patterns:
                    paragraph = re.sub(pattern, '', paragraph, flags=re.IGNORECASE)
                # Remove special symbols
                for pattern in self.special_char_patterns:
                    paragraph = re.sub(pattern, '', paragraph, flags=re.IGNORECASE)
            # Keep the paragraph if anything remains
            if paragraph.strip():
                cleaned.append(paragraph.strip())
        return cleaned
    def _split_content(self, paragraphs: List[str]) -> Tuple[List[str], List[str]]:
        """
        Separate the body text from the appendix/references.
        Args:
            paragraphs: list of document paragraphs
        Returns:
            Tuple[List[str], List[str]]: (body paragraphs, appendix paragraphs)
        """
        main_content = []
        appendix = []
        is_appendix = False
        for p in paragraphs:
            # Check whether the appendix starts here
            if any(re.match(pattern, p, re.IGNORECASE) for pattern in self.appendix_patterns):
                is_appendix = True
            if is_appendix:
                appendix.append(p)
            else:
                main_content.append(p)
        return main_content, appendix
    def _get_embeddings(self, texts: List[str]) -> np.ndarray:
        """
        Get text embeddings from Ollama.
        Args:
            texts: list of texts
        Returns:
            np.ndarray: matrix of embedding vectors
        """
        embeddings = []
        for text in texts:
            try:
                response = requests.post(
                    f"{self.ollama_host}/api/embeddings",
                    json={
                        "model": self.embedding_model,
                        "prompt": text
                    }
                )
                response.raise_for_status()
                embedding = response.json()["embedding"]
                embeddings.append(embedding)
            except Exception as e:
                print(f"Failed to get text embedding: {str(e)}")
                # Fall back to a zero vector on failure
                embeddings.append([0.0] * 1024)  # bge-m3 produces 1024-dimensional embeddings
        return np.array(embeddings)
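    # For reference, the request above uses Ollama's embeddings endpoint (sketch):
    #   POST {ollama_host}/api/embeddings   body: {"model": "bge-m3", "prompt": "<text>"}
    #   response: {"embedding": [0.013, -0.402, ...]}   # one vector per request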
    def _remove_duplicates(self, paragraphs: List[str], similarity_threshold: float = 0.92) -> List[str]:
        """
        Remove duplicate paragraphs while keeping table placeholders in place.
        Args:
            paragraphs: list of paragraphs
            similarity_threshold: similarity threshold; with an embedding model a
                higher threshold can be used than with TF-IDF
        Returns:
            List[str]: deduplicated paragraphs
        """
        if not paragraphs:
            return []
        # Separate table placeholders from ordinary paragraphs
        table_placeholders = {}
        text_paragraphs = []
        for i, p in enumerate(paragraphs):
            if p.startswith('TABLE_PLACEHOLDER_'):
                table_placeholders[i] = p
            else:
                text_paragraphs.append((i, p))
        try:
            # Deduplicate only the non-table paragraphs
            if text_paragraphs:
                # Get text embeddings
                text_only = [p[1] for p in text_paragraphs]
                embeddings = self._get_embeddings(text_only)
                # Compute the cosine-similarity matrix
                similarity_matrix = cosine_similarity(embeddings)
                # Mark the paragraphs to keep
                keep_indices = []
                for i in range(len(text_paragraphs)):
                    # Keep the paragraph unless it is highly similar to one already kept
                    if not any(similarity_matrix[i][j] > similarity_threshold for j in keep_indices):
                        keep_indices.append(i)
                # Non-table paragraphs that survive deduplication
                kept_paragraphs = [(text_paragraphs[i][0], text_only[i]) for i in keep_indices]
            else:
                kept_paragraphs = []
            # Merge placeholders and kept paragraphs, sorted by original position
            all_kept = list(table_placeholders.items()) + kept_paragraphs
            all_kept.sort(key=lambda x: x[0])
            return [p[1] for p in all_kept]
        except Exception as e:
            print(f"Ollama embedding failed; falling back to TF-IDF: {str(e)}")
            # Fall back to the original TF-IDF approach
            return self._remove_duplicates_tfidf(paragraphs)
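    # Dedup sketch: with threshold 0.92 and paragraphs [A, A', B] where
    # sim(A, A') = 0.95 and sim(A, B) = 0.30, A is kept, A' is dropped as a
    # near-duplicate of an already-kept paragraph, and B is kept. Table
    # placeholders never participate and stay at their original positions.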
    def _remove_duplicates_tfidf(self, paragraphs: List[str], similarity_threshold: float = 0.85) -> List[str]:
        """
        Remove duplicate paragraphs with TF-IDF; used as a fallback.
        Args:
            paragraphs: list of paragraphs
            similarity_threshold: similarity threshold
        Returns:
            List[str]: deduplicated paragraphs
        """
        if not paragraphs:
            return []
        # Separate table placeholders from ordinary paragraphs
        table_placeholders = {}
        text_paragraphs = []
        for i, p in enumerate(paragraphs):
            if p.startswith('TABLE_PLACEHOLDER_'):
                table_placeholders[i] = p
            else:
                text_paragraphs.append((i, p))
        if text_paragraphs:
            # Compute the TF-IDF matrix
            text_only = [p[1] for p in text_paragraphs]
            tfidf_matrix = self.vectorizer.fit_transform(text_only)
            # Compute the cosine-similarity matrix
            similarity_matrix = cosine_similarity(tfidf_matrix)
            # Mark the paragraphs to keep
            keep_indices = []
            for i in range(len(text_paragraphs)):
                # Keep the paragraph unless it is highly similar to one already kept
                if not any(similarity_matrix[i][j] > similarity_threshold for j in keep_indices):
                    keep_indices.append(i)
            # Non-table paragraphs that survive deduplication
            kept_paragraphs = [(text_paragraphs[i][0], text_only[i]) for i in keep_indices]
        else:
            kept_paragraphs = []
        # Merge placeholders and kept paragraphs, sorted by original position
        all_kept = list(table_placeholders.items()) + kept_paragraphs
        all_kept.sort(key=lambda x: x[0])
        return [p[1] for p in all_kept]
    def save_as_docx(self, cleaned_content: List[str], appendix: List[str], tables: List[Table], output_path: str):
        """
        Save the cleaned content as both .docx and .txt files.
        Args:
            cleaned_content: cleaned body paragraphs
            appendix: appendix paragraphs
            tables: list of tables
            output_path: output file path
        """
        print(f"\nSaving document: {output_path}")
        print(f"- body elements: {len(cleaned_content)}")
        print(f"- appendix elements: {len(appendix)}")
        print(f"- total tables: {len(tables)}")
        # Create a new document
        doc = docx.Document()
        # Collect the plain-text output
        text_output = []
        # Add body content and tables, preserving their relative order
        print("\nProcessing body content...")
        # Collect all elements to insert
        elements_to_insert = []
        for i, content in enumerate(cleaned_content):
            try:
                # Check for a table placeholder
                table_match = re.match(r'TABLE_PLACEHOLDER_(\d+)', content)
                if table_match:
                    table_index = int(table_match.group(1))
                    print(f"Processing table placeholder: {content} (index: {table_index})")
                    if table_index < len(tables):
                        table = tables[table_index]
                        try:
                            # Convert the table to text
                            table_text = self._convert_table_to_text(table)
                            # Add a table caption ("表格 N:" = "Table N:")
                            title = doc.add_paragraph(f"表格 {table_index + 1}:")
                            title.alignment = WD_PARAGRAPH_ALIGNMENT.LEFT
                            elements_to_insert.append(('paragraph', title._element))
                            # Add the table text in a monospaced font
                            p = doc.add_paragraph()
                            run = p.add_run(table_text)
                            run.font.name = 'Courier New'  # monospaced font
                            run.font.size = Pt(10)  # font size
                            elements_to_insert.append(('paragraph', p._element))
                            # Add a blank line
                            elements_to_insert.append(('paragraph', doc.add_paragraph()._element))
                            # Add to the text output
                            text_output.append(f"表格 {table_index + 1}:")
                            text_output.append(table_text)
                        except Exception as e:
                            print(f"Warning: error while processing a table: {str(e)}")
                            # "【表格处理失败】" = "[table processing failed]"
                            elements_to_insert.append(('paragraph', doc.add_paragraph("【表格处理失败】")._element))
                            text_output.append("【表格处理失败】")
                else:
                    # Add an ordinary paragraph
                    p = doc.add_paragraph(content)
                    p.alignment = WD_PARAGRAPH_ALIGNMENT.JUSTIFY
                    elements_to_insert.append(('paragraph', p._element))
                    # Add to the text output
                    text_output.append(content)
            except Exception as e:
                print(f"Warning: error while processing a paragraph or table: {str(e)}")
                continue
        # Append all elements to the document body in order
        for element_type, element in elements_to_insert:
            doc._body._element.append(element)
        # If there is an appendix, add a separator and the appendix content
        if appendix:
            print("\nProcessing appendix content...")
            try:
                # Add a page break
                doc.add_page_break()
                # Add the appendix heading ("附录" = "Appendix")
                title = doc.add_paragraph("附录")
                title.alignment = WD_PARAGRAPH_ALIGNMENT.CENTER
                # Add to the text output
                text_output.append("附录")
                # Add the appendix content
                appendix_elements = []
                for content in appendix:
                    # Check for a table placeholder
                    table_match = re.match(r'TABLE_PLACEHOLDER_(\d+)', content)
                    if table_match:
                        table_index = int(table_match.group(1))
                        print(f"Processing appendix table placeholder: {content} (index: {table_index})")
                        if table_index < len(tables):
                            table = tables[table_index]
                            try:
                                # Convert the table to text
                                table_text = self._convert_table_to_text(table)
                                # Add a table caption ("附录表格 N:" = "Appendix table N:")
                                title = doc.add_paragraph(f"附录表格 {table_index + 1}:")
                                title.alignment = WD_PARAGRAPH_ALIGNMENT.LEFT
                                appendix_elements.append(('paragraph', title._element))
                                # Add the table text in a monospaced font
                                p = doc.add_paragraph()
                                run = p.add_run(table_text)
                                run.font.name = 'Courier New'  # monospaced font
                                run.font.size = Pt(10)  # font size
                                appendix_elements.append(('paragraph', p._element))
                                # Add to the text output
                                text_output.append(f"附录表格 {table_index + 1}:")
                                text_output.append(table_text)
                            except Exception as e:
                                print(f"Warning: error while processing an appendix table: {str(e)}")
                                appendix_elements.append(('paragraph', doc.add_paragraph("【表格处理失败】")._element))
                                text_output.append("【表格处理失败】")
                    else:
                        p = doc.add_paragraph(content)
                        p.alignment = WD_PARAGRAPH_ALIGNMENT.JUSTIFY
                        appendix_elements.append(('paragraph', p._element))
                        # Add to the text output
                        text_output.append(content)
                # Append the appendix elements in order
                for element_type, element in appendix_elements:
                    doc._body._element.append(element)
            except Exception as e:
                print(f"Warning: error while processing the appendix: {str(e)}")
        # Save the .docx document
        try:
            doc.save(output_path)
            print("\nWord document saved successfully!")
        except Exception as e:
            print(f"Error: failed to save the Word document: {str(e)}")
            raise
        # Save the text file
        try:
            text_file_path = os.path.splitext(output_path)[0] + '.txt'
            # Strip newlines and join everything with spaces
            text_content = ' '.join([t.replace('\n', ' ').strip() for t in text_output if t.strip()])
            with open(text_file_path, 'w', encoding='utf-8') as f:
                f.write(text_content)
            print(f"Text file saved successfully: {text_file_path}")
        except Exception as e:
            print(f"Error: failed to save the text file: {str(e)}")
            raise
    def _copy_table_fallback(self, doc: docx.Document, table: Table):
        """
        Fallback method for copying a table.
        Args:
            doc: target document
            table: source table
        """
        # Get the table dimensions
        rows = len(table.rows)
        cols = len(table.columns)
        # Create the new table
        new_table = doc.add_table(rows=rows, cols=cols)
        # Copy the table style
        if table.style:
            new_table.style = table.style
        # Copy the table properties
        new_table._element.tblPr = deepcopy(table._element.tblPr)
        # Copy the grid definition
        new_table._element.tblGrid = deepcopy(table._element.tblGrid)
        # Map of merged cells
        cell_map = {}
        # First pass: record merged cells
        for i in range(rows):
            for j in range(cols):
                try:
                    src_cell = table.cell(i, j)
                    # Check whether the cell is part of a merge
                    if src_cell._element.tcPr is not None:
                        # Vertical merge
                        vmerge = src_cell._element.tcPr.xpath('.//w:vMerge')
                        if vmerge:
                            val = vmerge[0].get(qn('w:val'), 'continue')
                            if val == 'restart':
                                # This cell starts a vertical merge
                                span = self._get_vertical_span(table, i, j)
                                cell_map[(i, j)] = ('vmerge', span)
                        # Horizontal merge
                        gridspan = src_cell._element.tcPr.xpath('.//w:gridSpan')
                        if gridspan:
                            span = int(gridspan[0].get(qn('w:val')))
                            if span > 1:
                                cell_map[(i, j)] = ('hmerge', span)
                except Exception as e:
                    print(f"Warning: error while handling merged cell [{i},{j}]: {str(e)}")
        # Second pass: copy content and apply merges
        for i in range(rows):
            for j in range(cols):
                try:
                    src_cell = table.cell(i, j)
                    dst_cell = new_table.cell(i, j)
                    # Apply merges if needed
                    if (i, j) in cell_map:
                        merge_type, span = cell_map[(i, j)]
                        if merge_type == 'vmerge':
                            # Vertical merge
                            for k in range(1, span):
                                if i + k < rows:
                                    dst_cell.merge(new_table.cell(i + k, j))
                        elif merge_type == 'hmerge':
                            # Horizontal merge
                            for k in range(1, span):
                                if j + k < cols:
                                    dst_cell.merge(new_table.cell(i, j + k))
                    # Copy the cell properties
                    if src_cell._element.tcPr is not None:
                        dst_cell._element.tcPr = deepcopy(src_cell._element.tcPr)
                    # Copy the cell content
                    dst_cell.text = ""  # clear the default content
                    for src_paragraph in src_cell.paragraphs:
                        dst_paragraph = dst_cell.add_paragraph()
                        # Copy the paragraph properties
                        if src_paragraph._element.pPr is not None:
                            dst_paragraph._element.pPr = deepcopy(src_paragraph._element.pPr)
                        # Copy text and formatting
                        for src_run in src_paragraph.runs:
                            dst_run = dst_paragraph.add_run(src_run.text)
                            # Copy the run properties
                            if src_run._element.rPr is not None:
                                dst_run._element.rPr = deepcopy(src_run._element.rPr)
                except Exception as e:
                    print(f"Warning: error while copying cell [{i},{j}]: {str(e)}")
                    continue
    def _get_vmerge_value(self, cell_element) -> str:
        """
        Get a cell's vertical-merge attribute.
        Args:
            cell_element: the cell element
        Returns:
            str: the vertical-merge value, or None if the cell is not merged
        """
        vmerge = cell_element.xpath('.//w:vMerge')
        if vmerge:
            return vmerge[0].get(qn('w:val'), 'continue')
        return None
    def _get_gridspan_value(self, cell_element) -> int:
        """
        Get the number of columns a cell spans horizontally.
        Args:
            cell_element: the cell element
        Returns:
            int: number of merged columns
        """
        try:
            gridspan = cell_element.xpath('.//w:gridSpan')
            if gridspan and gridspan[0].get(qn('w:val')):
                return int(gridspan[0].get(qn('w:val')))
        except (ValueError, TypeError, AttributeError) as e:
            print(f"Warning: error while reading the gridSpan value: {str(e)}")
        return 1  # default: no horizontal merge
    def _get_vertical_span(self, table: Table, start_row: int, col: int) -> int:
        """
        Count the number of vertically merged rows.
        Args:
            table: the table object
            start_row: starting row
            col: column index
        Returns:
            int: number of vertically merged rows
        """
        span = 1
        for i in range(start_row + 1, len(table.rows)):
            cell = table.cell(i, col)
            if self._get_vmerge_value(cell._element) == 'continue':
                span += 1
            else:
                break
        return span
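    # OOXML merge conventions assumed above: the top cell of a vertical merge
    # carries <w:vMerge w:val="restart"/> and the cells below carry a bare
    # <w:vMerge/> (read as "continue"), so the span is counted by walking down
    # until a cell without "continue" is found. Horizontal merges instead use
    # <w:gridSpan w:val="N"/> on the leading cell.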
    def _convert_table_to_text(self, table: Table) -> str:
        """
        Convert a table to text, handling both simple and complex structures.
        Args:
            table: docx table object
        Returns:
            str: text representation of the table
        """
        try:
            # Get the table dimensions
            rows = len(table.rows)
            cols = len(table.columns)
            if rows == 0 or cols == 0:
                return "【空表格】"  # "[empty table]"
            # Processed table data
            processed_data = []
            # Detect complex tables (merged cells or multi-level headers)
            is_complex_table = False
            max_header_rows = min(3, rows)  # inspect at most the first 3 rows
            # Look for merged cells in the first few rows
            for i in range(max_header_rows):
                for j in range(cols):
                    try:
                        cell = table.cell(i, j)
                        if cell._element.tcPr is not None:
                            # Vertical merge
                            vmerge = cell._element.tcPr.xpath('.//w:vMerge')
                            if vmerge:
                                is_complex_table = True
                                break
                            # Horizontal merge
                            gridspan = cell._element.tcPr.xpath('.//w:gridSpan')
                            if gridspan:
                                is_complex_table = True
                                break
                    except Exception:
                        continue
                if is_complex_table:
                    break
            if is_complex_table:
                # Complex-table handling
                # Step 1: analyze the header structure
                header_structure = []  # hierarchical header per column
                # Analyze the header of each column
                for j in range(cols):
                    column_headers = []
                    last_header = None
                    for i in range(max_header_rows):
                        try:
                            cell = table.cell(i, j)
                            text = cell.text.strip()
                            # Vertical merge: reuse the previous header
                            if cell._element.tcPr is not None:
                                vmerge = cell._element.tcPr.xpath('.//w:vMerge')
                                if vmerge:
                                    val = vmerge[0].get(qn('w:val'), 'continue')
                                    if val == 'continue':
                                        # Use the last non-empty header
                                        if last_header:
                                            text = last_header
                            # Horizontal merge: mark the header as spanning columns
                            if cell._element.tcPr is not None:
                                gridspan = self._get_gridspan_value(cell._element)
                                if gridspan > 1:
                                    # Mark this as a column-spanning header
                                    text = f"SPAN_{gridspan}_{text}"
                            if text:
                                column_headers.append(text)
                                last_header = text
                        except Exception as e:
                            print(f"Warning: error while analyzing header cell [{i},{j}]: {str(e)}")
                            continue
                    header_structure.append(column_headers)
                # Step 2: build the full header identifiers
                full_headers = []
                for j, headers in enumerate(header_structure):
                    if not headers:
                        full_headers.append(f"{j + 1}")
                        continue
                    # Expand column-spanning headers
                    header_text = []
                    current_prefix = ""
                    for h in headers:
                        if h.startswith('SPAN_'):
                            parts = h.split('_', 2)
                            span = int(parts[1])
                            text = parts[2]
                            # Propagate the spanning header to the following columns
                            for k in range(span):
                                if j + k < cols:
                                    if k == 0:
                                        if text != current_prefix:  # avoid duplicate prefixes
                                            header_text.append(text)
                                            current_prefix = text
                                    else:
                                        if text not in header_structure[j + k]:
                                            header_structure[j + k].insert(0, text)
                        else:
                            if h != current_prefix:  # avoid duplicate prefixes
                                header_text.append(h)
                                current_prefix = h
                    # Remove duplicated header parts
                    unique_headers = []
                    seen = set()
                    for h in header_text:
                        if h not in seen:
                            unique_headers.append(h)
                            seen.add(h)
                    full_headers.append('_'.join(unique_headers))
                # Determine the actual number of header rows
                header_row_count = max(len(headers) for headers in header_structure)
                if header_row_count == 0:
                    header_row_count = 1
                # Process the data rows
                for i in range(header_row_count, rows):
                    try:
                        row_data = []
                        j = 0
                        while j < cols:
                            try:
                                cell = table.cell(i, j)
                                text = cell.text.strip()
                                # Handle vertical merges
                                if not text and cell._element.tcPr is not None:
                                    vmerge = cell._element.tcPr.xpath('.//w:vMerge')
                                    if vmerge and vmerge[0].get(qn('w:val')) == 'continue':
                                        # Use the value from the previous row
                                        text = table.cell(i - 1, j).text.strip()
                                # Handle horizontal merges
                                gridspan = self._get_gridspan_value(cell._element)
                                # Copy the value into every merged column
                                for k in range(gridspan):
                                    if j + k < len(full_headers):
                                        row_data.append(f"{full_headers[j + k]}:{text}")
                                j += gridspan
                            except Exception as e:
                                print(f"Warning: error while processing data cell [{i},{j}]: {str(e)}")
                                if j < len(full_headers):
                                    row_data.append(f"{full_headers[j]}:")
                                j += 1
                        # Keep the row only if it has at least one non-empty value
                        # (split on the first colon only, in case values contain colons)
                        if any(data.split(':', 1)[1].strip() for data in row_data):
                            processed_data.append(" ".join(row_data))
                    except Exception as e:
                        print(f"Warning: error while processing data row {i}: {str(e)}")
                        continue
            else:
                # Simple-table handling
                # Read the header row
                headers = []
                for j in range(cols):
                    try:
                        header_text = table.cell(0, j).text.strip()
                        if not header_text:  # fall back to the column number for empty headers
                            header_text = f"{j + 1}"
                        headers.append(header_text)
                    except Exception as e:
                        print(f"Warning: error while processing header cell [0,{j}]: {str(e)}")
                        headers.append(f"{j + 1}")
                # Process the data rows
                for i in range(1, rows):
                    try:
                        row_data = []
                        for j in range(cols):
                            try:
                                text = table.cell(i, j).text.strip()
                                row_data.append(f"{headers[j]}:{text}")
                            except Exception as e:
                                print(f"Warning: error while processing data cell [{i},{j}]: {str(e)}")
                                row_data.append(f"{headers[j]}:")
                        # Keep the row only if it has at least one non-empty value
                        if any(data.split(':', 1)[1].strip() for data in row_data):
                            processed_data.append(" ".join(row_data))
                    except Exception as e:
                        print(f"Warning: error while processing data row {i}: {str(e)}")
                        continue
            # Return the flattened table text
            if processed_data:
                return " ".join(processed_data)
            else:
                return "【表格无有效数据】"  # "[table has no usable data]"
        except Exception as e:
            print(f"Warning: error while processing a table: {str(e)}")
            return "【表格处理失败】"  # "[table processing failed]"
    def _extract_table_text(self, table: Table) -> str:
        """
        Extract the text content of a table; returns the formatted text representation.
        Args:
            table: docx table object
        Returns:
            str: text representation of the table
        """
        return self._convert_table_to_text(table)
def process_file(byte_array: bytes, suffix: str = 'docx') -> Tuple[bytes, str]:
    """
    Process a file given as raw bytes.
    Args:
        byte_array: the file's binary content
        suffix: file extension without the dot, e.g. 'doc' or 'docx'
    Returns:
        Tuple[bytes, str]: (docx file bytes, extracted text content)
    """
    try:
        # Normalize the suffix (ensure a leading dot)
        suffix = suffix.lower().strip()
        if not suffix.startswith('.'):
            suffix = '.' + suffix
        # Create a temporary file
        temp_dir = tempfile.mkdtemp()
        temp_file = os.path.join(temp_dir, f'temp{suffix}')
        # Write the bytes to the temporary file
        with open(temp_file, 'wb') as f:
            f.write(byte_array)
        # Check the file size
        file_size = len(byte_array)
        if file_size > 50 * 1024 * 1024:  # 50 MB
            raise Exception("File exceeds the 50 MB size limit")
        # Check the file extension
        if suffix.lower() not in ['.doc', '.docx']:
            raise Exception("Unsupported file format; only .doc and .docx are supported")
        # Check magic bytes
        file_type = None
        if len(byte_array) >= 8:
            # DOCX signature (ZIP archives start with PK\x03\x04)
            if byte_array.startswith(b'PK\x03\x04'):
                file_type = 'docx'
                logging.info("Detected DOCX file format")
            # DOC signature (Compound File Binary format starts with D0 CF 11 E0)
            elif byte_array.startswith(b'\xD0\xCF\x11\xE0\xA1\xB1\x1A\xE1'):
                file_type = 'doc'
                logging.info("Detected DOC file format")
        # If the header is inconclusive, look for content markers
        if not file_type and len(byte_array) >= 512:
            content_start = byte_array[:512]
            # Look for Word document marker strings
            if (b'Microsoft Word' in content_start or
                    b'word/document.xml' in content_start or
                    b'Word.Document' in content_start):
                file_type = 'unknown_word'
                logging.info("Detected a Word document via content markers")
            else:
                # Try reading the file directly
                try:
                    with open(temp_file, 'rb') as f:
                        # Try opening it as a ZIP archive (the DOCX container)
                        try:
                            import zipfile
                            with zipfile.ZipFile(f) as zf:
                                if any(name.startswith('word/') for name in zf.namelist()):
                                    file_type = 'docx'
                                    logging.info("Detected a DOCX file via its ZIP structure")
                        except zipfile.BadZipFile:
                            # Not a valid ZIP file; no further detection available
                            pass
                except Exception as e:
                    logging.warning(f"Content-based detection failed: {str(e)}")
        if not file_type:
            raise Exception("Unrecognized Word document format")
        # Warn when the suffix does not match the detected format
        if file_type == 'docx' and suffix.lower() != '.docx':
            logging.warning("File is actually DOCX but has suffix %s", suffix)
        elif file_type == 'doc' and suffix.lower() != '.doc':
            logging.warning("File is actually DOC but has suffix %s", suffix)
        cleaner = DocCleaner()
        # Convert .doc input to .docx first
        input_file = temp_file
        if file_type == 'doc' or (file_type == 'unknown_word' and suffix.lower() == '.doc'):
            try:
                input_file = cleaner._convert_doc_to_docx(temp_file)
                logging.info("DOC file successfully converted to DOCX")
            except Exception as e:
                raise Exception(f"Failed to convert doc file: {str(e)}")
        # Clean the document
        main_content, appendix, tables = cleaner.clean_doc(input_file)
        # Temporary file for the processed result
        output_docx = os.path.join(temp_dir, 'output.docx')
        # Save as .docx
        cleaner.save_as_docx(main_content, appendix, tables, output_docx)
        # Read the .docx bytes back
        with open(output_docx, 'rb') as f:
            docx_bytes = f.read()
        # Read the text content
        text_file = os.path.splitext(output_docx)[0] + '.txt'
        with open(text_file, 'r', encoding='utf-8') as f:
            text_content = f.read()
        # Clean up the temporary files
        os.remove(temp_file)
        if input_file != temp_file:
            try:
                os.remove(input_file)
            except Exception:
                pass
        os.remove(output_docx)
        os.remove(text_file)
        os.rmdir(temp_dir)
        return docx_bytes, text_content
    except Exception as e:
        logging.error(f"Failed to process file: {str(e)}")
        raise Exception(f"Failed to process file: {str(e)}")
def process_directory(input_dir: str, output_dir: str = None):
    """
    Process all document files in a directory.
    Args:
        input_dir: input directory path
        output_dir: output directory path; defaults to the input directory
    """
    # Default to the input directory when no output directory is given
    if output_dir is None:
        output_dir = input_dir
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)
    cleaner = DocCleaner()
    for root, _, files in os.walk(input_dir):
        for file in files:
            if file.endswith(('.doc', '.docx')):
                input_path = os.path.join(root, file)
                try:
                    # Clean the document
                    main_content, appendix, tables = cleaner.clean_doc(input_path)
                    # Build the output file name (always with a .docx extension)
                    base_name = os.path.splitext(file)[0]
                    output_path = os.path.join(output_dir, f"{base_name}_cleaned.docx")
                    # Save as .docx
                    cleaner.save_as_docx(main_content, appendix, tables, output_path)
                except Exception as e:
                    print(f"Error while processing file {file}: {str(e)}")
                    # Print more detail where available
                    if isinstance(e, subprocess.CalledProcessError):
                        print(f"Command failed: {e.output}")
                    elif isinstance(e, FileNotFoundError):
                        print("Make sure LibreOffice is installed and on the system PATH")
def qn(tag: str) -> str:
    """
    Convert a 'w:'-prefixed tag into lxml's namespaced ("Clark") notation.
    Args:
        tag: the prefixed tag, e.g. 'w:val'
    Returns:
        str: the namespaced tag, e.g. '{...}val'
    """
    # Strip the 'w:' prefix before prepending the namespace: lxml expects
    # '{namespace}localname', so returning '{namespace}w:val' would never
    # match a real attribute name.
    prefix = "{http://schemas.openxmlformats.org/wordprocessingml/2006/main}"
    return prefix + tag.split(':', 1)[-1]
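# e.g. qn('w:val') -> '{http://schemas.openxmlformats.org/wordprocessingml/2006/main}val',
# the same Clark-notation name that docx.oxml.ns.qn('w:val') produces.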
if __name__ == '__main__':
    import argparse
    import sys
    parser = argparse.ArgumentParser(description='Document cleaning tool')
    group = parser.add_mutually_exclusive_group(required=True)
    group.add_argument('--file', help='input file path')
    group.add_argument('--stdin', action='store_true', help='read Base64-encoded file bytes from stdin')
    group.add_argument('--dir', help='input directory path')
    parser.add_argument('--suffix', help="file extension without the dot, e.g. 'doc' or 'docx'", default='docx')
    parser.add_argument('--output_dir', help='output directory path', required=True)
    args = parser.parse_args()
    try:
        # Make sure the output directory exists
        os.makedirs(args.output_dir, exist_ok=True)
        result = {
            'status': 'success',
            'message': '',
            'docxPath': '',
            'txtPath': ''
        }
        if args.stdin:
            # Read Base64 data from stdin
            try:
                # Read all input
                base64_data = sys.stdin.read().strip()
                # Decode the Base64 payload
                byte_array = base64.b64decode(base64_data)
                # Build the output paths (the processed result is always .docx)
                output_docx = os.path.join(args.output_dir, "output.docx")
                output_txt = os.path.join(args.output_dir, "output.txt")
                # Process the file
                docx_bytes, text_content = process_file(byte_array, args.suffix)
                # Save the results
                with open(output_docx, 'wb') as f:
                    f.write(docx_bytes)
                with open(output_txt, 'w', encoding='utf-8') as f:
                    f.write(text_content)
                result['docxPath'] = output_docx
                result['txtPath'] = output_txt
                result['message'] = 'success'
                logging.info("Binary data processed successfully")
            except Exception as e:
                result['status'] = 'error'
                result['message'] = str(e)
                logging.error(f"Failed to process binary data: {str(e)}")
        elif args.file:
            # Process a single file
            input_path = args.file
            try:
                # Read the file
                with open(input_path, 'rb') as f:
                    byte_array = f.read()
                # Get the file extension
                _, suffix = os.path.splitext(input_path)
                # Build the output paths
                base_name = os.path.splitext(os.path.basename(input_path))[0]
                output_docx = os.path.join(args.output_dir, f"{base_name}_cleaned.docx")
                output_txt = os.path.join(args.output_dir, f"{base_name}_cleaned.txt")
                # Process the file
                docx_bytes, text_content = process_file(byte_array, suffix)
                # Save the results
                with open(output_docx, 'wb') as f:
                    f.write(docx_bytes)
                with open(output_txt, 'w', encoding='utf-8') as f:
                    f.write(text_content)
                result['docxPath'] = output_docx
                result['txtPath'] = output_txt
                result['message'] = 'success'
                logging.info(f"File processed successfully: {input_path}")
            except Exception as e:
                result['status'] = 'error'
                result['message'] = str(e)
                logging.error(f"Failed to process file: {str(e)}")
        else:
            # Process a directory
            try:
                process_directory(args.dir, args.output_dir)
                result['message'] = 'success'
                logging.info(f"Directory processed: {args.dir} -> {args.output_dir}")
            except Exception as e:
                result['status'] = 'error'
                result['message'] = str(e)
                logging.error(f"Failed to process directory: {str(e)}")
        # Emit only the JSON result
        print(json.dumps(result, ensure_ascii=False))
        sys.exit(0 if result['status'] == 'success' else 1)
    except Exception as e:
        error_result = {
            'status': 'error',
            'message': str(e),
            'docxPath': '',
            'txtPath': ''
        }
        logging.error(f"Program error: {str(e)}")
        print(json.dumps(error_result, ensure_ascii=False))
        sys.exit(1)
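# CLI sketch (paths are illustrative):
#   python doc_cleaner_java.py --file report.docx --output_dir ./out
#   python doc_cleaner_java.py --dir ./docs --output_dir ./out
#   base64 report.docx | python doc_cleaner_java.py --stdin --suffix docx --output_dir ./out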