#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import os
import re
import docx
import numpy as np
import requests
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
from typing import List, Tuple, Dict, Optional
from docx.shared import Pt
from docx.enum.text import WD_PARAGRAPH_ALIGNMENT
from docx.enum.table import WD_TABLE_ALIGNMENT
import subprocess
import tempfile
import json
from docx.table import Table, _Cell
from docx.text.paragraph import Paragraph
from copy import deepcopy
from docx.oxml import parse_xml
from docx.oxml.ns import nsdecls
import logging
import base64


class DocCleaner:
    def __init__(self, ollama_host: str = "http://192.168.1.24:11434"):
        """
        Initialize the document cleaner.

        Args:
            ollama_host: Address of the Ollama server
        """
        # Header/footer patterns
        self.header_footer_patterns = [
            r'页码\s*\d+-\d+',  # Page-number labels such as 页码1-1, 页码2-1
            r'第\s*\d+\s*页\s*共\s*\d+\s*页',  # Chinese page numbers ("page X of Y")
            r'Page\s*\d+\s*of\s*\d+',  # English page numbers
        ]

        # Special-marker patterns
        self.special_char_patterns = [
            r'©\s*\d{4}.*?版权所有',  # Copyright notices
            r'confidential',  # Confidentiality markers
            r'draft|草稿',  # Draft markers
            r'watermark',  # Watermark markers
        ]

        # Appendix and reference heading patterns
        self.appendix_patterns = [
            r'^附录\s*[A-Za-z]?[\s::]',
            r'^Appendix\s*[A-Za-z]?[\s::]',
            r'^参考文献$',
            r'^References$',
            r'^Bibliography$'
        ]

        # Initialize the TF-IDF vectorizer
        self.vectorizer = TfidfVectorizer(
            min_df=1,
            stop_words='english'
        )

        self.ollama_host = ollama_host
        self.embedding_model = "bge-m3"  # embedding model served by Ollama

    def _convert_doc_to_docx(self, doc_path: str) -> str:
        """
        Convert a .doc file to .docx format.

        Args:
            doc_path: Path to the .doc file

        Returns:
            str: Path to the converted .docx file
        """
        print(f"\nStarting DOC conversion: {doc_path}")

        # Create the temporary output path. LibreOffice names the output after the
        # input file's base name, so derive the expected path from doc_path.
        temp_dir = tempfile.mkdtemp()
        base_name = os.path.splitext(os.path.basename(doc_path))[0]
        temp_docx = os.path.join(temp_dir, f'{base_name}.docx')
        print(f"Created temporary directory: {temp_dir}")
        print(f"Target DOCX path: {temp_docx}")

        try:
            # First clean up any soffice processes that may already be running
            try:
                if os.name == 'nt':  # Windows
                    os.system('taskkill /f /im soffice.bin /t')
                    os.system('taskkill /f /im soffice.exe /t')
                else:  # Linux/Unix
                    os.system('pkill -9 soffice.bin')
                    os.system('pkill -9 soffice')
            except Exception as e:
                print(f"Error while cleaning up existing processes (safe to ignore): {str(e)}")

            # Detect the operating system
            if os.name == 'nt':  # Windows
                soffice_cmd = 'soffice'
                print("Windows detected, using the soffice command")
            else:  # Linux/Unix
                # Common LibreOffice executable locations
                possible_paths = [
                    'libreoffice',
                    'soffice',
                    '/usr/bin/libreoffice',
                    '/usr/bin/soffice',
                    '/usr/lib/libreoffice/program/soffice',
                    '/opt/libreoffice*/program/soffice',
                ]

                print("Linux/Unix detected, searching for LibreOffice...")
                soffice_cmd = None
                for path in possible_paths:
                    try:
                        if '*' in path:  # Handle wildcard paths
                            import glob
                            matching_paths = glob.glob(path)
                            for match_path in matching_paths:
                                try:
                                    print(f"Trying: {match_path} --version")
                                    subprocess.run([match_path, '--version'], stdout=subprocess.PIPE,
                                                   stderr=subprocess.PIPE, timeout=5)
                                    soffice_cmd = match_path
                                    print(f"Found a usable LibreOffice: {soffice_cmd}")
                                    break
                                except Exception as e:
                                    print(f"Path failed {match_path}: {str(e)}")
                        else:
                            print(f"Trying: {path} --version")
                            subprocess.run([path, '--version'], stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                                           timeout=5)
                            soffice_cmd = path
                            print(f"Found a usable LibreOffice: {soffice_cmd}")
                            break
                    except Exception as e:
                        print(f"Path failed {path}: {str(e)}")
                        continue

                if soffice_cmd is None:
                    # Fall back to looking it up with `which`
                    try:
                        print("Looking for LibreOffice with `which`...")
                        which_result = subprocess.run(['which', 'libreoffice'], stdout=subprocess.PIPE,
                                                      stderr=subprocess.PIPE, text=True)
                        if which_result.returncode == 0:
                            soffice_cmd = which_result.stdout.strip()
                            print(f"Found LibreOffice via `which`: {soffice_cmd}")
                    except Exception as e:
                        print(f"`which` lookup failed: {str(e)}")

                if soffice_cmd is None:
                    error_msg = """
LibreOffice was not found. Please install it as follows:

1. On Ubuntu/Debian:
   sudo apt-get update
   sudo apt-get install libreoffice libreoffice-writer

2. On CentOS/RHEL:
   sudo yum update
   sudo yum install libreoffice libreoffice-writer

3. Install Chinese font support:
   # Ubuntu/Debian:
   sudo apt-get install fonts-wqy-zenhei fonts-wqy-microhei

   # CentOS/RHEL:
   sudo yum install wqy-zenhei-fonts wqy-microhei-fonts

4. Verify the installation:
   libreoffice --version

5. If it still fails, make sure that:
   - LibreOffice is installed correctly
   - The executable is on the system PATH
   - The current user has execute permission
   - The temporary directory (/tmp) has sufficient permissions
"""
                    raise Exception(error_msg)

            print(f"\nConverting file with: {soffice_cmd}")
            # Convert with soffice (LibreOffice)
            cmd = [
                soffice_cmd,
                '--headless',
                '--convert-to',
                'docx:MS Word 2007 XML',  # Explicit output format
                '--outdir',
                temp_dir,
                doc_path
            ]
            print(f"Full conversion command: {' '.join(cmd)}")

            # Run the conversion with a generous timeout
            try:
                process = subprocess.run(
                    cmd,
                    capture_output=True,
                    text=True,
                    timeout=300  # 5-minute timeout
                )

                if process.returncode != 0:
                    error_msg = process.stderr or "unknown error"
                    raise Exception(f"Conversion failed: {error_msg}")
                print("File converted successfully")

            except subprocess.TimeoutExpired:
                # Kill the processes on timeout
                if os.name == 'nt':  # Windows
                    os.system('taskkill /f /im soffice.bin /t')
                    os.system('taskkill /f /im soffice.exe /t')
                else:  # Linux/Unix
                    os.system('pkill -9 soffice.bin')
                    os.system('pkill -9 soffice')
                raise Exception("Conversion timed out (300 seconds) and the process was terminated. "
                                "Check that LibreOffice is working, or try converting the file manually.")

            # Validate the output file
            if not os.path.exists(temp_docx):
                raise Exception("Converted file not found")

            file_size = os.path.getsize(temp_docx)
            if file_size == 0:
                raise Exception("Converted file is empty")

            print(f"Conversion finished, output file size: {file_size} bytes")
            return temp_docx

        except Exception as e:
            print(f"Failed to convert doc file: {str(e)}")
            # Clean up temporary files
            try:
                if os.path.exists(temp_dir):
                    import shutil
                    shutil.rmtree(temp_dir)
            except:
                pass
            # Clean up any leftover processes
            try:
                if os.name == 'nt':  # Windows
                    os.system('taskkill /f /im soffice.bin /t')
                    os.system('taskkill /f /im soffice.exe /t')
                else:  # Linux/Unix
                    os.system('pkill -9 soffice.bin')
                    os.system('pkill -9 soffice')
            except:
                pass
            raise Exception(f"Failed to convert doc file: {str(e)}")

    def clean_doc(self, file_path: str) -> Tuple[List[str], List[str], List[Table]]:
        """
        Clean a document and return the processed body text, appendix and tables.

        Args:
            file_path: Path to the document file

        Returns:
            Tuple[List[str], List[str], List[Table]]: (cleaned body paragraphs, appendix paragraphs, tables)
        """
        # Detect the file type
        _, file_extension = os.path.splitext(file_path)
        file_extension = file_extension.lower()

        # Reject .doc files outright
        if file_extension == '.doc':
            raise Exception("The .doc format is not supported; convert the file to .docx before processing")

        doc = docx.Document(file_path)

        # Extract all content (paragraphs and tables)
        content = []
        tables = []
        table_count = 0

        try:
            # Walk every element in the document body
            for element in doc._element.body:
                if element.tag.endswith('p'):
                    try:
                        paragraph = docx.text.paragraph.Paragraph(element, doc)
                        text = paragraph.text.strip()

                        # Keep only non-empty paragraphs
                        if text:
                            # Check whether this is an appendix heading
                            is_appendix = any(re.match(pattern, text, re.IGNORECASE)
                                              for pattern in self.appendix_patterns)
                            content.append({
                                'type': 'paragraph',
                                'content': text,
                                'is_appendix_start': is_appendix
                            })
                    except Exception as e:
                        continue

                elif element.tag.endswith('tbl'):
                    try:
                        table = docx.table.Table(element, doc)
                        # Check that the table is valid
                        if hasattr(table, 'rows') and hasattr(table, 'columns'):
                            tables.append(table)
                            content.append({
                                'type': 'table',
                                'index': table_count
                            })
                            table_count += 1
                    except Exception as e:
                        continue

        except Exception as e:
            raise Exception(f"Failed to parse document structure: {str(e)}")

        # Split the body text from the appendix
        main_content = []
        appendix = []
        is_appendix = False

        for item in content:
            if item['type'] == 'paragraph':
                if item['is_appendix_start']:
                    is_appendix = True

                if is_appendix:
                    appendix.append(item['content'])
                else:
                    main_content.append(item['content'])

            elif item['type'] == 'table':
                table_placeholder = f'TABLE_PLACEHOLDER_{item["index"]}'
                if is_appendix:
                    appendix.append(table_placeholder)
                else:
                    main_content.append(table_placeholder)

        # Clean the body text (table placeholders are kept as-is)
        cleaned_content = []
        for item in main_content:
            if item.startswith('TABLE_PLACEHOLDER_'):
                cleaned_content.append(item)
            else:
                # _clean_text may drop the paragraph entirely, so guard against
                # an empty result instead of indexing blindly.
                cleaned_items = self._clean_text([item])
                if cleaned_items and cleaned_items[0]:
                    cleaned_content.append(cleaned_items[0])

        return cleaned_content, appendix, tables

    def _clean_text(self, text: List[str]) -> List[str]:
        """
        Clean text content.

        Args:
            text: List of paragraphs to clean

        Returns:
            List[str]: Cleaned paragraphs
        """
        cleaned = []
        for paragraph in text:
            # Keep table placeholders untouched
            if paragraph.startswith('TABLE_PLACEHOLDER_'):
                cleaned.append(paragraph)
                continue

            # Skip empty paragraphs
            if not paragraph.strip():
                continue

            # Check whether this is a table-of-contents entry (a line starting with a numeric outline label)
            is_toc_item = bool(re.match(r'^\s*(?:\d+\.)*\d+\s+.*', paragraph))

            if not is_toc_item:
                # Remove headers and footers
                for pattern in self.header_footer_patterns:
                    paragraph = re.sub(pattern, '', paragraph, flags=re.IGNORECASE)

                # Remove special markers
                for pattern in self.special_char_patterns:
                    paragraph = re.sub(pattern, '', paragraph, flags=re.IGNORECASE)

            # Keep the paragraph if anything is left
            if paragraph.strip():
                cleaned.append(paragraph.strip())

        return cleaned

    def _split_content(self, paragraphs: List[str]) -> Tuple[List[str], List[str]]:
        """
        Split the body text from the appendix/references.

        Args:
            paragraphs: List of document paragraphs

        Returns:
            Tuple[List[str], List[str]]: (body paragraphs, appendix paragraphs)
        """
        main_content = []
        appendix = []
        is_appendix = False

        for p in paragraphs:
            # Check whether the appendix starts here
            if any(re.match(pattern, p, re.IGNORECASE) for pattern in self.appendix_patterns):
                is_appendix = True

            if is_appendix:
                appendix.append(p)
            else:
                main_content.append(p)

        return main_content, appendix

    def _get_embeddings(self, texts: List[str]) -> np.ndarray:
        """
        Get text embeddings from Ollama.

        Args:
            texts: List of texts

        Returns:
            np.ndarray: Matrix of embedding vectors
        """
        embeddings = []

        for text in texts:
            try:
                response = requests.post(
                    f"{self.ollama_host}/api/embeddings",
                    json={
                        "model": self.embedding_model,
                        "prompt": text
                    }
                )
                response.raise_for_status()
                embedding = response.json()["embedding"]
                embeddings.append(embedding)
            except Exception as e:
                print(f"Failed to get text embedding: {str(e)}")
                # Fall back to a zero vector when the request fails; bge-m3 returns
                # 1024-dimensional vectors, so the placeholder must match that size.
                embeddings.append([0.0] * 1024)

        return np.array(embeddings)

    def _remove_duplicates(self, paragraphs: List[str], similarity_threshold: float = 0.92) -> List[str]:
        """
        Remove duplicate paragraphs while keeping table placeholders in place.

        Args:
            paragraphs: List of paragraphs
            similarity_threshold: Similarity threshold; a higher value works well with the embedding model

        Returns:
            List[str]: Deduplicated paragraphs
        """
        if not paragraphs:
            return []

        # Separate table placeholders from ordinary paragraphs
        table_placeholders = {}
        text_paragraphs = []
        for i, p in enumerate(paragraphs):
            if p.startswith('TABLE_PLACEHOLDER_'):
                table_placeholders[i] = p
            else:
                text_paragraphs.append((i, p))

        try:
            # Deduplicate only the non-table paragraphs
            if text_paragraphs:
                # Get text embeddings
                text_only = [p[1] for p in text_paragraphs]
                embeddings = self._get_embeddings(text_only)

                # Compute the cosine-similarity matrix
                similarity_matrix = cosine_similarity(embeddings)

                # Mark the paragraphs to keep
                keep_indices = []
                for i in range(len(text_paragraphs)):
                    # Keep the paragraph if it is not highly similar to any already-kept paragraph
                    if not any(similarity_matrix[i][j] > similarity_threshold for j in keep_indices):
                        keep_indices.append(i)

                # Non-table paragraphs that survive deduplication
                kept_paragraphs = [(text_paragraphs[i][0], text_only[i]) for i in keep_indices]
            else:
                kept_paragraphs = []

            # Merge table placeholders with the kept paragraphs, sorted by original position
            all_kept = list(table_placeholders.items()) + kept_paragraphs
            all_kept.sort(key=lambda x: x[0])

            return [p[1] for p in all_kept]

        except Exception as e:
            print(f"Ollama embedding model failed, falling back to TF-IDF: {str(e)}")
            # Fall back to the original TF-IDF approach when Ollama is unavailable
            return self._remove_duplicates_tfidf(paragraphs)

    def _remove_duplicates_tfidf(self, paragraphs: List[str], similarity_threshold: float = 0.85) -> List[str]:
        """
        Remove duplicate paragraphs with TF-IDF (fallback method).

        Args:
            paragraphs: List of paragraphs
            similarity_threshold: Similarity threshold

        Returns:
            List[str]: Deduplicated paragraphs
        """
        if not paragraphs:
            return []

        # Separate table placeholders from ordinary paragraphs
        table_placeholders = {}
        text_paragraphs = []
        for i, p in enumerate(paragraphs):
            if p.startswith('TABLE_PLACEHOLDER_'):
                table_placeholders[i] = p
            else:
                text_paragraphs.append((i, p))

        if text_paragraphs:
            # Compute the TF-IDF matrix
            text_only = [p[1] for p in text_paragraphs]
            tfidf_matrix = self.vectorizer.fit_transform(text_only)

            # Compute the cosine-similarity matrix
            similarity_matrix = cosine_similarity(tfidf_matrix)

            # Mark the paragraphs to keep
            keep_indices = []
            for i in range(len(text_paragraphs)):
                # Keep the paragraph if it is not highly similar to any already-kept paragraph
                if not any(similarity_matrix[i][j] > similarity_threshold for j in keep_indices):
                    keep_indices.append(i)

            # Non-table paragraphs that survive deduplication
            kept_paragraphs = [(text_paragraphs[i][0], text_only[i]) for i in keep_indices]
        else:
            kept_paragraphs = []

        # Merge table placeholders with the kept paragraphs, sorted by original position
        all_kept = list(table_placeholders.items()) + kept_paragraphs
        all_kept.sort(key=lambda x: x[0])

        return [p[1] for p in all_kept]

    def save_as_docx(self, cleaned_content: List[str], appendix: List[str], tables: List[Table], output_path: str):
        """
        Save the cleaned content in both .docx and .txt formats.

        Args:
            cleaned_content: Cleaned body paragraphs
            appendix: Appendix paragraphs
            tables: List of tables
            output_path: Output file path
        """
        print(f"\nSaving document: {output_path}")
        print(f"- Body elements: {len(cleaned_content)}")
        print(f"- Appendix elements: {len(appendix)}")
        print(f"- Total tables: {len(tables)}")

        # Create a new document
        doc = docx.Document()

        # List that collects the plain-text output
        text_output = []

        # Add the body content and tables, preserving their relative order
        print("\nProcessing body content...")

        # List of all elements to insert into the document
        elements_to_insert = []

        for i, content in enumerate(cleaned_content):
            try:
                # Check whether this is a table placeholder
                table_match = re.match(r'TABLE_PLACEHOLDER_(\d+)', content)
                if table_match:
                    table_index = int(table_match.group(1))
                    print(f"Processing table placeholder: {content} (index: {table_index})")
                    if table_index < len(tables):
                        table = tables[table_index]
                        try:
                            # Convert the table to its text representation
                            table_text = self._convert_table_to_text(table)

                            # Add the table caption
                            title = doc.add_paragraph(f"Table {table_index + 1}:")
                            title.alignment = WD_PARAGRAPH_ALIGNMENT.LEFT
                            elements_to_insert.append(('paragraph', title._element))

                            # Add the table text in a monospaced font
                            p = doc.add_paragraph()
                            run = p.add_run(table_text)
                            run.font.name = 'Courier New'  # Monospaced font
                            run.font.size = Pt(10)  # Font size
                            elements_to_insert.append(('paragraph', p._element))

                            # Add a blank line
                            elements_to_insert.append(('paragraph', doc.add_paragraph()._element))

                            # Add to the text output
                            text_output.append(f"Table {table_index + 1}:")
                            text_output.append(table_text)

                        except Exception as e:
                            print(f"Warning: error while processing table: {str(e)}")
                            elements_to_insert.append(('paragraph', doc.add_paragraph("[Table processing failed]")._element))
                            text_output.append("[Table processing failed]")
                else:
                    # Add an ordinary paragraph
                    p = doc.add_paragraph(content)
                    p.alignment = WD_PARAGRAPH_ALIGNMENT.JUSTIFY
                    elements_to_insert.append(('paragraph', p._element))
                    # Add to the text output
                    text_output.append(content)
            except Exception as e:
                print(f"Warning: error while processing paragraph or table: {str(e)}")
                continue

        # Insert all elements into the document in order
        for element_type, element in elements_to_insert:
            doc._body._element.append(element)

        # If there is an appendix, add a separator and the appendix content
        if appendix:
            print("\nProcessing appendix content...")
            try:
                # Add a page break
                doc.add_page_break()

                # Add the appendix heading
                title = doc.add_paragraph("Appendix")
                title.alignment = WD_PARAGRAPH_ALIGNMENT.CENTER

                # Add to the text output
                text_output.append("Appendix")

                # Add the appendix content
                appendix_elements = []
                for content in appendix:
                    # Check whether this is a table placeholder
                    table_match = re.match(r'TABLE_PLACEHOLDER_(\d+)', content)
                    if table_match:
                        table_index = int(table_match.group(1))
                        print(f"Processing appendix table placeholder: {content} (index: {table_index})")
                        if table_index < len(tables):
                            table = tables[table_index]
                            try:
                                # Convert the table to its text representation
                                table_text = self._convert_table_to_text(table)

                                # Add the table caption
                                title = doc.add_paragraph(f"Appendix table {table_index + 1}:")
                                title.alignment = WD_PARAGRAPH_ALIGNMENT.LEFT
                                appendix_elements.append(('paragraph', title._element))

                                # Add the table text in a monospaced font
                                p = doc.add_paragraph()
                                run = p.add_run(table_text)
                                run.font.name = 'Courier New'  # Monospaced font
                                run.font.size = Pt(10)  # Font size
                                appendix_elements.append(('paragraph', p._element))

                                # Add to the text output
                                text_output.append(f"Appendix table {table_index + 1}:")
                                text_output.append(table_text)

                            except Exception as e:
                                print(f"Warning: error while processing appendix table: {str(e)}")
                                appendix_elements.append(('paragraph', doc.add_paragraph("[Table processing failed]")._element))
                                text_output.append("[Table processing failed]")
                    else:
                        p = doc.add_paragraph(content)
                        p.alignment = WD_PARAGRAPH_ALIGNMENT.JUSTIFY
                        appendix_elements.append(('paragraph', p._element))
                        # Add to the text output
                        text_output.append(content)

                # Insert the appendix elements into the document in order
                for element_type, element in appendix_elements:
                    doc._body._element.append(element)

            except Exception as e:
                print(f"Warning: error while processing the appendix: {str(e)}")

        # Save the docx document
        try:
            doc.save(output_path)
            print("\nWord document saved successfully!")
        except Exception as e:
            print(f"Error: failed to save the Word document: {str(e)}")
            raise

        # Save the text file
        try:
            text_file_path = os.path.splitext(output_path)[0] + '.txt'
            # Strip newlines and join everything with spaces
            text_content = ' '.join([t.replace('\n', ' ').strip() for t in text_output if t.strip()])
            with open(text_file_path, 'w', encoding='utf-8') as f:
                f.write(text_content)
            print(f"Text file saved successfully: {text_file_path}")
        except Exception as e:
            print(f"Error: failed to save the text file: {str(e)}")
            raise

    def _copy_table_fallback(self, doc: docx.Document, table: Table):
        """
        Fallback method for copying a table.

        Args:
            doc: Target document
            table: Source table
        """
        # Get the table's row and column counts
        rows = len(table.rows)
        cols = len(table.columns)

        # Create the new table
        new_table = doc.add_table(rows=rows, cols=cols)

        # Copy the table style
        if table.style:
            new_table.style = table.style

        # Copy the table properties
        new_table._element.tblPr = deepcopy(table._element.tblPr)

        # Copy the grid definition
        new_table._element.tblGrid = deepcopy(table._element.tblGrid)

        # Cell map used to track merges
        cell_map = {}

        # First pass: mark merged cells
        for i in range(rows):
            for j in range(cols):
                try:
                    src_cell = table.cell(i, j)
                    # Check whether this cell is part of a merge
                    if src_cell._element.tcPr is not None:
                        # Check vertical merges
                        vmerge = src_cell._element.tcPr.xpath('.//w:vMerge')
                        if vmerge:
                            val = vmerge[0].get(qn('w:val'), 'continue')
                            if val == 'restart':
                                # This is the starting cell of a vertical merge
                                span = self._get_vertical_span(table, i, j)
                                cell_map[(i, j)] = ('vmerge', span)

                        # Check horizontal merges
                        gridspan = src_cell._element.tcPr.xpath('.//w:gridSpan')
                        if gridspan:
                            span = int(gridspan[0].get(qn('w:val')))
                            if span > 1:
                                cell_map[(i, j)] = ('hmerge', span)
                except Exception as e:
                    print(f"Warning: error while handling merged cell [{i},{j}]: {str(e)}")

        # Second pass: copy content and apply merges
        for i in range(rows):
            for j in range(cols):
                try:
                    src_cell = table.cell(i, j)
                    dst_cell = new_table.cell(i, j)

                    # Apply merges where needed
                    if (i, j) in cell_map:
                        merge_type, span = cell_map[(i, j)]
                        if merge_type == 'vmerge':
                            # Vertical merge
                            for k in range(1, span):
                                if i + k < rows:
                                    dst_cell.merge(new_table.cell(i + k, j))
                        elif merge_type == 'hmerge':
                            # Horizontal merge
                            for k in range(1, span):
                                if j + k < cols:
                                    dst_cell.merge(new_table.cell(i, j + k))

                    # Copy cell properties
                    if src_cell._element.tcPr is not None:
                        dst_cell._element.tcPr = deepcopy(src_cell._element.tcPr)

                    # Copy cell content
                    dst_cell.text = ""  # Clear the default content
                    for src_paragraph in src_cell.paragraphs:
                        dst_paragraph = dst_cell.add_paragraph()
                        # Copy paragraph properties
                        if src_paragraph._element.pPr is not None:
                            dst_paragraph._element.pPr = deepcopy(src_paragraph._element.pPr)

                        # Copy text and formatting
                        for src_run in src_paragraph.runs:
                            dst_run = dst_paragraph.add_run(src_run.text)
                            # Copy run properties
                            if src_run._element.rPr is not None:
                                dst_run._element.rPr = deepcopy(src_run._element.rPr)

                except Exception as e:
                    print(f"Warning: error while copying cell [{i},{j}]: {str(e)}")
                    continue

    def _get_vmerge_value(self, cell_element) -> str:
        """
        Get a cell's vertical-merge attribute.

        Args:
            cell_element: Cell element

        Returns:
            str: Vertical-merge value, or None if the cell is not merged
        """
        vmerge = cell_element.xpath('.//w:vMerge')
        if vmerge:
            return vmerge[0].get(qn('w:val'), 'continue')
        return None

    def _get_gridspan_value(self, cell_element) -> int:
        """
        Get the number of columns a cell spans horizontally.

        Args:
            cell_element: Cell element

        Returns:
            int: Number of horizontally merged columns
        """
        try:
            gridspan = cell_element.xpath('.//w:gridSpan')
            if gridspan and gridspan[0].get(qn('w:val')):
                return int(gridspan[0].get(qn('w:val')))
        except (ValueError, TypeError, AttributeError) as e:
            print(f"Warning: error while reading the gridSpan value: {str(e)}")
        return 1  # Default to 1, i.e. no merge

    def _get_vertical_span(self, table: Table, start_row: int, col: int) -> int:
        """
        Count the number of rows in a vertical merge.

        Args:
            table: Table object
            start_row: Starting row
            col: Column index

        Returns:
            int: Number of vertically merged rows
        """
        span = 1
        for i in range(start_row + 1, len(table.rows)):
            cell = table.cell(i, col)
            if self._get_vmerge_value(cell._element) == 'continue':
                span += 1
            else:
                break
        return span

    def _convert_table_to_text(self, table: Table) -> str:
        """
        Convert a table to text, handling both simple and complex table structures.

        Args:
            table: docx table object

        Returns:
            str: Text representation of the table
        """
        try:
            # Get the table's row and column counts
            rows = len(table.rows)
            cols = len(table.columns)

            if rows == 0 or cols == 0:
                return "[Empty table]"

            # Processed table data
            processed_data = []

            # Check whether this is a complex table (merged cells or a multi-level header)
            is_complex_table = False
            max_header_rows = min(3, rows)  # Inspect at most the first 3 rows

            # Look for merged cells in the first few rows
            for i in range(max_header_rows):
                for j in range(cols):
                    try:
                        cell = table.cell(i, j)
                        if cell._element.tcPr is not None:
                            # Check vertical merges
                            vmerge = cell._element.tcPr.xpath('.//w:vMerge')
                            if vmerge:
                                is_complex_table = True
                                break
                            # Check horizontal merges
                            gridspan = cell._element.tcPr.xpath('.//w:gridSpan')
                            if gridspan:
                                is_complex_table = True
                                break
                    except Exception:
                        continue
                if is_complex_table:
                    break

            if is_complex_table:
                # Complex-table handling
                # Step 1: analyse the header structure
                header_structure = []  # Hierarchical header structure

                # Analyse the header of every column
                for j in range(cols):
                    column_headers = []
                    last_header = None
                    for i in range(max_header_rows):
                        try:
                            cell = table.cell(i, j)
                            text = cell.text.strip()

                            # Check vertical merges
                            if cell._element.tcPr is not None:
                                vmerge = cell._element.tcPr.xpath('.//w:vMerge')
                                if vmerge:
                                    val = vmerge[0].get(qn('w:val'), 'continue')
                                    if val == 'continue':
                                        # Reuse the previous non-empty header
                                        if last_header:
                                            text = last_header

                            # Check horizontal merges
                            if cell._element.tcPr is not None:
                                gridspan = self._get_gridspan_value(cell._element)
                                if gridspan > 1:
                                    # Mark this as a header spanning several columns
                                    text = f"SPAN_{gridspan}_{text}"

                            if text:
                                column_headers.append(text)
                                last_header = text

                        except Exception as e:
                            print(f"Warning: error while analysing header cell [{i},{j}]: {str(e)}")
                            continue

                    header_structure.append(column_headers)

                # Step 2: build the full header identifiers
                full_headers = []
                for j, headers in enumerate(header_structure):
                    if not headers:
                        full_headers.append(f"Column {j + 1}")
                        continue

                    # Handle headers that span several columns
                    header_text = []
                    current_prefix = ""
                    for h in headers:
                        if h.startswith('SPAN_'):
                            parts = h.split('_', 2)
                            span = int(parts[1])
                            text = parts[2]
                            # Propagate the spanning header to the following columns
                            for k in range(span):
                                if j + k < cols:
                                    if k == 0:
                                        if text != current_prefix:  # Avoid duplicating the prefix
                                            header_text.append(text)
                                            current_prefix = text
                                    else:
                                        if text not in header_structure[j + k]:
                                            header_structure[j + k].insert(0, text)
                        else:
                            if h != current_prefix:  # Avoid duplicating the prefix
                                header_text.append(h)
                                current_prefix = h

                    # Remove duplicated header parts
                    unique_headers = []
                    seen = set()
                    for h in header_text:
                        if h not in seen:
                            unique_headers.append(h)
                            seen.add(h)

                    full_headers.append('_'.join(unique_headers))

                # Determine the actual number of header rows
                header_row_count = max(len(headers) for headers in header_structure)
                if header_row_count == 0:
                    header_row_count = 1

                # Process the data rows
                for i in range(header_row_count, rows):
                    try:
                        row_data = []
                        j = 0
                        while j < cols:
                            try:
                                cell = table.cell(i, j)
                                text = cell.text.strip()

                                # Handle vertical merges
                                if not text and cell._element.tcPr is not None:
                                    vmerge = cell._element.tcPr.xpath('.//w:vMerge')
                                    if vmerge and vmerge[0].get(qn('w:val')) == 'continue':
                                        # Reuse the value from the previous row
                                        text = table.cell(i - 1, j).text.strip()

                                # Handle horizontal merges
                                gridspan = self._get_gridspan_value(cell._element)

                                # Copy the value into every merged column
                                for k in range(gridspan):
                                    if j + k < len(full_headers):
                                        row_data.append(f"{full_headers[j + k]}:{text}")

                                j += gridspan

                            except Exception as e:
                                print(f"Warning: error while processing data cell [{i},{j}]: {str(e)}")
                                if j < len(full_headers):
                                    row_data.append(f"{full_headers[j]}:")
                                j += 1

                        # Keep the row only if it has at least one non-empty value
                        if any(data.split(':')[1].strip() for data in row_data):
                            processed_data.append(" ".join(row_data))

                    except Exception as e:
                        print(f"Warning: error while processing data row {i}: {str(e)}")
                        continue

            else:
                # Simple-table handling
                # Get the header
                headers = []
                for j in range(cols):
                    try:
                        header_text = table.cell(0, j).text.strip()
                        if not header_text:  # Use a default name if the header is empty
                            header_text = f"Column {j + 1}"
                        headers.append(header_text)
                    except Exception as e:
                        print(f"Warning: error while processing header cell [0,{j}]: {str(e)}")
                        headers.append(f"Column {j + 1}")

                # Process the data rows
                for i in range(1, rows):
                    try:
                        row_data = []
                        for j in range(cols):
                            try:
                                text = table.cell(i, j).text.strip()
                                row_data.append(f"{headers[j]}:{text}")
                            except Exception as e:
                                print(f"Warning: error while processing data cell [{i},{j}]: {str(e)}")
                                row_data.append(f"{headers[j]}:")

                        # Keep the row only if it has at least one non-empty value
                        if any(data.split(':')[1].strip() for data in row_data):
                            processed_data.append(" ".join(row_data))

                    except Exception as e:
                        print(f"Warning: error while processing data row {i}: {str(e)}")
                        continue

            # Return the processed table text
            if processed_data:
                return " ".join(processed_data)
            else:
                return "[Table has no valid data]"

        except Exception as e:
            print(f"Warning: error while processing table: {str(e)}")
            return "[Table processing failed]"

    def _extract_table_text(self, table: Table) -> str:
        """
        Extract a table's content as a formatted text representation.

        Args:
            table: docx table object

        Returns:
            str: Text representation of the table
        """
        return self._convert_table_to_text(table)


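# Illustrative sketch (not part of the original pipeline): minimal end-to-end use
# of DocCleaner on a local .docx file. The paths are placeholders; the real entry
# points below (process_file / process_directory) follow the same steps.
def _example_clean_single_docx(input_path: str = 'sample.docx',
                               output_path: str = 'sample_cleaned.docx') -> None:
    """Usage sketch only: clean one .docx file and save the results."""
    cleaner = DocCleaner()
    main_content, appendix, tables = cleaner.clean_doc(input_path)
    # Optional: drop near-duplicate paragraphs before saving. This calls the
    # Ollama embedding endpoint configured in __init__ and falls back to TF-IDF.
    main_content = cleaner._remove_duplicates(main_content)
    cleaner.save_as_docx(main_content, appendix, tables, output_path)

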
def process_file(byte_array: bytes, suffix: str = 'docx') -> Tuple[bytes, str]:
    """
    Process a file given as binary data.

    Args:
        byte_array: Binary content of the file
        suffix: File extension without the dot, e.g. 'doc' or 'docx'

    Returns:
        Tuple[bytes, str]: (docx file bytes, text content)
    """
    try:
        # Normalise the suffix (add the leading dot)
        suffix = suffix.lower().strip()
        if not suffix.startswith('.'):
            suffix = '.' + suffix

        # Create a temporary file
        temp_dir = tempfile.mkdtemp()
        temp_file = os.path.join(temp_dir, f'temp{suffix}')

        # Write the binary data to the temporary file
        with open(temp_file, 'wb') as f:
            f.write(byte_array)

        # Check the file size
        file_size = len(byte_array)
        if file_size > 50 * 1024 * 1024:  # 50 MB
            raise Exception("File exceeds the 50 MB size limit")

        # Check the file extension
        if suffix.lower() not in ['.doc', '.docx']:
            raise Exception("Unsupported file format; only .doc and .docx are supported")

        # Check the file's magic bytes
        file_type = None
        if len(byte_array) >= 8:
            # DOCX signature (ZIP format, starts with PK\x03\x04)
            if byte_array.startswith(b'PK\x03\x04'):
                file_type = 'docx'
                logging.info("Detected DOCX file format")
            # DOC signature (Compound File Binary format, starts with D0 CF 11 E0)
            elif byte_array.startswith(b'\xD0\xCF\x11\xE0\xA1\xB1\x1A\xE1'):
                file_type = 'doc'
                logging.info("Detected DOC file format")

        # If the magic bytes are inconclusive, fall back to content heuristics
        if not file_type and len(byte_array) >= 512:
            content_start = byte_array[:512]
            # Look for characteristic Word document strings
            if (b'Microsoft Word' in content_start or
                    b'word/document.xml' in content_start or
                    b'Word.Document' in content_start):
                file_type = 'unknown_word'
                logging.info("Detected a Word document via content heuristics")
            else:
                # Try to inspect the file content directly
                try:
                    with open(temp_file, 'rb') as f:
                        # Try opening it as a ZIP archive (DOCX format)
                        try:
                            import zipfile
                            with zipfile.ZipFile(f) as zf:
                                if any(name.startswith('word/') for name in zf.namelist()):
                                    file_type = 'docx'
                                    logging.info("Detected a DOCX file via its ZIP structure")
                        except zipfile.BadZipFile:
                            # Not a valid ZIP file; other checks would be needed
                            pass
                except Exception as e:
                    logging.warning(f"File content detection failed: {str(e)}")

        if not file_type:
            raise Exception("Unrecognised Word document format")

        # Check whether the extension matches the detected format
        if file_type == 'docx' and suffix.lower() != '.docx':
            logging.warning("File is actually DOCX but the suffix is %s", suffix)
        elif file_type == 'doc' and suffix.lower() != '.doc':
            logging.warning("File is actually DOC but the suffix is %s", suffix)

        # If the file is a .doc, convert it to .docx first
        input_file = temp_file
        if file_type == 'doc' or (file_type == 'unknown_word' and suffix.lower() == '.doc'):
            try:
                input_file = DocCleaner()._convert_doc_to_docx(temp_file)
                logging.info("DOC file successfully converted to DOCX")
            except Exception as e:
                raise Exception(f"Failed to convert doc file: {str(e)}")

        cleaner = DocCleaner()

        # Clean the document
        main_content, appendix, tables = cleaner.clean_doc(input_file)

        # Temporary file for the processed result
        output_docx = os.path.join(temp_dir, 'output.docx')

        # Save as docx
        cleaner.save_as_docx(main_content, appendix, tables, output_docx)

        # Read back the docx file content
        with open(output_docx, 'rb') as f:
            docx_bytes = f.read()

        # Read back the text content
        text_file = os.path.splitext(output_docx)[0] + '.txt'
        with open(text_file, 'r', encoding='utf-8') as f:
            text_content = f.read()

        # Clean up temporary files
        os.remove(temp_file)
        if input_file != temp_file:
            try:
                os.remove(input_file)
            except:
                pass
        os.remove(output_docx)
        os.remove(text_file)
        os.rmdir(temp_dir)

        return docx_bytes, text_content

    except Exception as e:
        logging.error(f"Failed to process file: {str(e)}")
        raise Exception(f"Failed to process file: {str(e)}")


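# Illustrative sketch only: calling process_file() directly with in-memory bytes,
# mirroring what the --stdin/--file branches of the CLI entry point below do.
# 'input.docx' and the output names are placeholder paths.
def _example_process_bytes(path: str = 'input.docx') -> None:
    """Usage sketch only: run the cleaning pipeline on one file's bytes."""
    with open(path, 'rb') as f:
        raw = f.read()
    suffix = os.path.splitext(path)[1].lstrip('.') or 'docx'
    docx_bytes, text_content = process_file(raw, suffix)
    with open('cleaned.docx', 'wb') as f:
        f.write(docx_bytes)
    with open('cleaned.txt', 'w', encoding='utf-8') as f:
        f.write(text_content)

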
def process_directory(input_dir: str, output_dir: str = None):
    """
    Process every document file under the given directory.

    Args:
        input_dir: Input directory path
        output_dir: Output directory path; if None, the input directory is used
    """
    # Use the input directory if no output directory is given
    if output_dir is None:
        output_dir = input_dir

    if not os.path.exists(output_dir):
        os.makedirs(output_dir)

    cleaner = DocCleaner()

    for root, _, files in os.walk(input_dir):
        for file in files:
            if file.endswith(('.doc', '.docx')):
                input_path = os.path.join(root, file)

                try:
                    # Clean the document
                    main_content, appendix, tables = cleaner.clean_doc(input_path)

                    # Build the output file name (always with a .docx extension)
                    base_name = os.path.splitext(file)[0]
                    output_path = os.path.join(output_dir, f"{base_name}_cleaned.docx")

                    # Save as docx
                    cleaner.save_as_docx(main_content, appendix, tables, output_path)

                except Exception as e:
                    print(f"Error while processing file {file}: {str(e)}")
                    # Print more detailed error information
                    if isinstance(e, subprocess.CalledProcessError):
                        print(f"Command execution error: {e.output}")
                    elif isinstance(e, FileNotFoundError):
                        print("Make sure LibreOffice is installed and on the system PATH")


def qn(tag: str) -> str:
    """
    Convert a tag into its namespace-qualified (Clark notation) form.

    Args:
        tag: Original tag, e.g. 'val' or 'w:val'

    Returns:
        str: Namespace-qualified tag
    """
    prefix = "{http://schemas.openxmlformats.org/wordprocessingml/2006/main}"
    # Call sites above pass names like 'w:val'; drop the 'w:' prefix so the
    # result is a valid Clark-notation name such as '{...}val', which is what
    # lxml's element.get() expects.
    if ':' in tag:
        tag = tag.split(':', 1)[1]
    return prefix + tag


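# Illustrative check only: the merge-handling code above passes names like
# 'w:val' to qn(); this shows the expanded attribute name it produces.
def _example_qn() -> None:
    """Usage sketch for qn(); asserts the expanded Clark-notation name."""
    expected = '{http://schemas.openxmlformats.org/wordprocessingml/2006/main}val'
    assert qn('w:val') == expected
    assert qn('val') == expected

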
if __name__ == '__main__':
    import argparse
    import sys
    import json
    import base64

    parser = argparse.ArgumentParser(description='Document cleaning tool')
    group = parser.add_mutually_exclusive_group(required=True)
    group.add_argument('--file', help='Input file path')
    group.add_argument('--stdin', action='store_true', help='Read Base64-encoded file data from standard input')
    group.add_argument('--dir', help='Input directory path')
    parser.add_argument('--suffix', help='File extension without the dot, e.g. doc or docx', default='docx')
    parser.add_argument('--output_dir', help='Output directory path', required=True)

    args = parser.parse_args()

    try:
        # Make sure the output directory exists
        os.makedirs(args.output_dir, exist_ok=True)

        result = {
            'status': 'success',
            'message': '',
            'docxPath': '',
            'txtPath': ''
        }

        if args.stdin:
            # Read Base64 data from standard input
            try:
                # Read all input data
                base64_data = sys.stdin.read().strip()

                # Decode the Base64 data
                byte_array = base64.b64decode(base64_data)

                # Build the output file paths (add the dot the suffix argument lacks)
                output_docx = os.path.join(args.output_dir, f"output.{args.suffix.lstrip('.')}")
                output_txt = os.path.join(args.output_dir, "output.txt")

                # Process the file
                docx_bytes, text_content = process_file(byte_array, args.suffix)

                # Save the outputs
                with open(output_docx, 'wb') as f:
                    f.write(docx_bytes)
                with open(output_txt, 'w', encoding='utf-8') as f:
                    f.write(text_content)

                result['docxPath'] = output_docx
                result['txtPath'] = output_txt
                result['message'] = 'success'
                logging.info("Binary data processed successfully")

            except Exception as e:
                result['status'] = 'error'
                result['message'] = str(e)
                logging.error(f"Failed to process binary data: {str(e)}")

        elif args.file:
            # Process a single file
            input_path = args.file

            try:
                # Read the file content
                with open(input_path, 'rb') as f:
                    byte_array = f.read()

                # Get the file extension
                _, suffix = os.path.splitext(input_path)

                # Build the output file paths
                base_name = os.path.splitext(os.path.basename(input_path))[0]
                output_docx = os.path.join(args.output_dir, f"{base_name}_cleaned.docx")
                output_txt = os.path.join(args.output_dir, f"{base_name}_cleaned.txt")

                # Process the file
                docx_bytes, text_content = process_file(byte_array, suffix)

                # Save the outputs
                with open(output_docx, 'wb') as f:
                    f.write(docx_bytes)
                with open(output_txt, 'w', encoding='utf-8') as f:
                    f.write(text_content)

                result['docxPath'] = output_docx
                result['txtPath'] = output_txt
                result['message'] = 'success'
                logging.info(f"File processed successfully: {input_path}")

            except Exception as e:
                result['status'] = 'error'
                result['message'] = str(e)
                logging.error(f"Failed to process file: {str(e)}")

        else:
            # Process a directory
            try:
                process_directory(args.dir, args.output_dir)
                result['message'] = 'success'
                logging.info(f"Directory processed: {args.dir} -> {args.output_dir}")
            except Exception as e:
                result['status'] = 'error'
                result['message'] = str(e)
                logging.error(f"Failed to process directory: {str(e)}")

        # Print only the JSON result
        print(json.dumps(result, ensure_ascii=False))
        sys.exit(0 if result['status'] == 'success' else 1)

    except Exception as e:
        error_result = {
            'status': 'error',
            'message': str(e),
            'docxPath': '',
            'txtPath': ''
        }
        logging.error(f"Program execution error: {str(e)}")
        print(json.dumps(error_result, ensure_ascii=False))
        sys.exit(1)