Compare commits

...

2 Commits

Author SHA1 Message Date
cxs
44050b2391 文档清洗系统脚本修改 2025-05-20 13:47:56 +08:00
cxs
cc14fcd1ed 文档清洗系统脚本修改 2025-05-20 13:47:17 +08:00
16 changed files with 3549 additions and 4512 deletions

379
README.md
View File

@@ -48,6 +48,108 @@ pip install -r requirements.txt
## 最近更新
### 2024年6月2日
- 改进Markdown表格转换功能
- 修复了合并单元格内容重复显示的问题
- 修复了空单元格自动填充相邻值的问题
- 优化表格处理逻辑,正确处理多级表头和复杂表格结构
- 确保合并单元格的内容仅在原始单元格位置显示,被合并的单元格保持为空
### 2024年6月1日
- **修复服务端文件处理过程中的类型错误**
- 解决文件处理后删除临时文件时出现的`'dict' object has no attribute 'strip'`错误
- 增强了`main.py`中处理文本内容时的类型安全性,确保只对字符串类型调用字符串方法
- 添加了对处理文件时`all_content`合并时的严格类型检查,实现对混合内容类型(字符串、字典等)的安全处理
- 优化了临时文件和目录清理过程,增加类型检查以避免对非字符串类型执行路径操作
- 改进临时文件删除的安全性,使用`safe_delete_file`函数确保字符串类型参数
- 提高了文件处理后清理逻辑的健壮性,避免因类型不匹配导致的处理中断
- 完善错误处理机制,提供更详细的日志信息,便于快速定位问题
### 2024年5月31日
- **修复表格文本提取过程中的类型错误**
- 解决了在无效表格转换为文本时出现的`'dict' object has no attribute 'strip'`错误
- 增强了`save_as_docx`方法中对`_extract_table_text`返回值的类型安全检查
- 确保只有在返回值是字符串类型时才调用`strip()`方法
- 优化了表格文本提取的错误处理流程,提高系统处理复杂表格的稳健性
- 完善了不同表格类型的处理逻辑,确保各种特殊结构表格都能被正确处理
- 修复了在表格无效时可能触发的类型错误,保证表格处理流程不会因类型不匹配而中断
### 2024年5月30日
- **修复文本清理过程中的类型错误**
- 解决了在文本清理过程中出现的`'dict' object has no attribute 'strip'`错误
- 增强了`_clean_text`方法中的类型检查,确保只对字符串类型对象调用字符串方法
- 改进了文本文件生成逻辑,添加针对不同数据类型的处理策略
- 优化了元数据字典的内容提取方式,确保其正文内容能够被包含在文本输出中
- 提高了代码的健壮性,可以正确处理混合类型的内容列表
- 完善了错误处理机制,避免因类型不匹配导致的处理中断
### 2024年5月29日
- **修复文档去重处理异常**
- 解决了处理带元数据的文档时出现的`'dict' object has no attribute 'startswith'`错误
- 增强了_remove_duplicates方法使其能够正确处理字典类型的元数据结构
- 改进了内部类型检查,区分处理字符串、字典、元组和列表等不同数据类型
- 优化了去重机制,保证包含元数据的文档正常处理不会中断
- 更新了相关方法的类型提示,明确方法可以处理任何类型的元素
- 完善对特殊数据类型的保留逻辑,确保元数据不会在去重过程中丢失
### 2024年5月28日
- **增强复杂表格结构处理能力**
- 实现多级表头表格的自动识别与处理支持最多4级嵌套表头
- 完善对复杂表头结构的分析,正确处理跨行跨列的表头单元格
- 针对多级表头表格优化Markdown输出格式保持原始表格的层次结构
- 改进表格预处理流程,添加表格结构类型自动检测功能
- 优化合并单元格处理逻辑,确保复杂表格数据的完整性和准确性
- 引入表格元数据记录机制,保存表格结构特征以便后续处理
- 完善复杂表格的调试日志,便于排查处理过程中的问题
### 2024年5月27日
- **修复表格处理异常错误**
- 解决了处理特殊表格时出现的`'dict' object has no attribute 'strip'`类型错误
- 增强了内容过滤函数的类型安全检查,避免对非字符串类型应用字符串方法
- 改进了对多种数据类型(字典、元组、列表等)的兼容性处理
- 确保在处理表格时不会因类型错误而导致文档处理中断
- 优化了错误处理机制,提供更详细的错误信息以便于调试
- 提高了代码的健壮性,能更好地处理各种格式和结构的输入数据
### 2024年5月26日
- **增强特殊结构表格处理能力**
- 优化对特殊格式表格的识别能力,特别是键值对形式的表格(如账号、订单号等信息表)
- 新增键值对表格专用渲染模式,确保所有键值对数据都被正确提取
- 改进单元格内容提取逻辑,避免遗漏特殊格式的单元格内容
- 引入表格结构智能分析,自动识别不同类型的表格并应用最合适的处理方法
- 完善对"PO-Number"、"Account No."等业务类信息字段的特殊处理
- 添加了多种关键字模式匹配,提高对各类业务表格的兼容性
- 优化文本提取过程,确保每个文本片段都能被完整保留
### 2024年5月25日
- **优化Markdown表格格式输出**
- 严格按照原文档中的表格格式输出Markdown表格保留空单元格
- 去除过度智能填充功能,避免自动填充空单元格导致的格式不一致
- 保留完整表格结构,包括所有的空行
- 只处理显式标记的合并单元格,不进行推断性的内容填充
- 确保表格的每个单元格精确反映原始文档中的内容
- 简化表格处理逻辑,提高处理速度和准确性
### 2024年5月24日
- **优化Markdown输出格式**
- 移除Markdown输出中的文档元数据信息如创建时间、作者、备注等
- 保留DOCX和TXT输出中的文档元信息不影响其完整性
- 重构元数据处理逻辑,使用类型标记区分不同内容类型
- 提高代码的可维护性和扩展性
- 优化内部数据结构,便于后续自定义输出格式
- 改进文本处理流程,统一处理文本和结构化内容
### 2024年5月23日
- **修复PDF转Word时图片丢失的问题**
- 增加了多种图片提取方法确保即使在PDF转DOCX过程中丢失图片也能正确保留
- 直接从PDF源文件提取图片不再完全依赖转换后的DOCX文档
- 改进图片处理流程确保OCR结果被正确保存到最终输出
- 增强Markdown图片处理逻辑优化图片路径管理和文件复制过程
- 添加详细日志和错误诊断信息,便于问题排查
- 优化图片文件存储结构,避免图片重复处理
- 增加图片处理失败时的备选方案,提高系统鲁棒性
- 解决Markdown文档中图片无法显示的问题
### 2024年5月21日
- **增强复杂表格处理的安全性和稳定性**
- 全面优化索引安全处理,解决表格解析中的索引越界问题
@@ -219,6 +321,16 @@ pip install -r requirements.txt
- 优化文件处理逻辑
- 改进错误处理
### 2024年6月3日
- 优化表格数据处理逻辑:
- 重构format_group_to_text函数采用更通用的处理方式
- 移除对特定字段名的依赖,提高代码灵活性
- 自动清理和格式化字段名中的特殊字符
- 统一的文本格式化逻辑,适应不同数据结构
- 改进键值对处理方式,支持更多数据格式
- 自动清理"表格无有效数据"等无效提示信息
- 优化文本拼接逻辑,确保输出格式的一致性
## 安装说明
1. 克隆项目代码
@@ -358,4 +470,269 @@ python fix_tesseract_path.py
如果您希望永久解决这个问题,可以:
1. 将Tesseract安装目录(通常是`C:\Program Files\Tesseract-OCR`)添加到系统PATH环境变量
2. 设置环境变量`TESSERACT_CMD`为Tesseract可执行文件的完整路径
2. 设置环境变量`TESSERACT_CMD`为Tesseract可执行文件的完整路径
# 文档表格处理工具
本工具提供了强大的表格数据处理功能,可以处理文档中的各种类型表格。
## 主要功能
### 1. 表格数据处理
- 表格数据验证和清理
- 表格结构规范化
- 表格数据增强
- 表格布局优化
### 2. 数据转换和分析
- 表格转换为字典格式
- 表格内容分析
- 基本统计信息
- 列数据分析
- 数据模式检测
- 结构特征分析
- 内容质量评估
- 重复值检测
- 值分布分析
- 列相关性分析
- 数据一致性检查
- 潜在问题检测
### 3. 格式转换
- 转换为CSV格式
- 转换为Excel格式
- 转换为Markdown格式
- 转换为HTML格式
### 4. 高级功能
- 多级表头处理
- 合并单元格处理
- 键值对表格处理
- 数据类型自动识别
- 表格结构优化
## 使用示例
```python
from cxs.cxs_table_processor import TableProcessor
# 创建处理器实例
processor = TableProcessor()
# 处理表格数据
table_data = {
'rows': [...],
'header_rows': 1
}
# 数据转换
dict_data = processor._convert_table_to_dict(table_data)
# 内容分析
analysis = processor._analyze_table_content(table_data)
# 检测数据模式
patterns = processor._detect_table_patterns(table_data)
# 导出为不同格式
csv_data = processor._convert_table_to_csv(table_data)
excel_data = processor._convert_table_to_excel(table_data)
```
## 数据转换能力
支持以下数据转换:
1. 表格 -> 字典
- 支持键值对表格
- 支持普通表格结构
- 支持多级表头
2. 表格 -> CSV
- 自动处理表头
- 处理合并单元格
- 支持自定义分隔符
3. 表格 -> Excel
- 保留表格结构
- 处理合并单元格
- 自动调整列宽
- 设置表头样式
4. 表格 -> Markdown/HTML
- 格式化输出
- 保持表格结构
- 支持样式设置
## 开发要求
- Python 3.7+
- 依赖包:
- openpyxl
- pandas
- numpy
## 安装说明
```bash
pip install -r requirements.txt
```
## 更新日志
### v1.0.0
- 初始版本发布
- 基本的表格处理功能
### v1.1.0
- 添加数据验证和清理功能
- 增加表格结构规范化
### v1.2.0
- 添加数据转换功能
- 支持多种输出格式
### v1.3.0
- 添加内容分析功能
- 增加数据模式检测
- 优化表格处理性能
### 2024-03-xx
- 优化了表格处理器的稳定性和错误处理
- 增加了严格的索引检查和边界处理
- 改进了单元格和行级别的错误处理机制
- 增强了对大型表格和复杂表头的支持
- 优化了表格规范化处理
- 提高了处理不规则表格的兼容性
- 改进了内存使用效率
## 注意事项
1. 大文件处理
- 建议分批处理大型表格
- 注意内存使用
2. 数据验证
- 建议在处理前进行数据验证
- 检查表格结构完整性
3. 错误处理
- 所有方法都包含错误处理
- 详细的错误信息输出
## 贡献指南
1. Fork 项目
2. 创建特性分支
3. 提交变更
4. 发起 Pull Request
## 许可证
MIT License
## 更新日志
### 2024-03-21
- 修复了文本分段工具的命令行参数问题
- 现在支持使用相对路径处理文件
- 新增txt格式输出支持默认输出格式改为txt
- 优化了段落分隔显示,使用空行分隔各段落
- 全新的智能语义分段功能:
- 支持基于章节标题的主要分段
- 支持基于语义转折词的次要分段
- 智能识别特殊段落(如摘要、引言等)
- 自动合并过短段落,保持语义完整性
- 优化的中文标点符号处理
- 添加token长度控制
- 确保每个段落不超过512个token
- 智能估算中英文混合文本的token数量
- 对超长句子进行智能拆分
- 在保持语义完整性的同时控制token数量
- 优化表格内容处理:
- 自动移除表格标记(如"表格1开始"、"表格1结束"
- 智能分割长表格内容确保每段不超过token限制
- 保持表格行的语义完整性
- 使用逗号和分号作为表格内容的分割点
- 优化表格段落的可读性
## 使用方法
### 文本分段工具
基本用法输出txt格式
```bash
python text_paragraph_splitter.py sample_continuous_text.txt
```
指定输出文件:
```bash
python text_paragraph_splitter.py sample_continuous_text.txt -o output.txt
```
输出JSON格式
```bash
python text_paragraph_splitter.py sample_continuous_text.txt -f json -o output.json
```
参数说明:
- input_file输入文件路径例如sample_continuous_text.txt
- --output/-o输出文件路径默认为当前目录下的 paragraphs.txt
- --format/-f输出格式支持txt和json默认为txt
分段规则说明:
1. Token长度控制
- 每个段落严格控制在512个token以内
- 中文字符按1.5个token计算
- 英文单词按1个token计算
- 标点符号按1个token计算
- 超长句子会按逗号智能拆分
2. 表格处理:
- 自动识别并移除表格标记
- 按行处理表格内容
- 智能合并短行不超过token限制
- 对超长行进行分割处理
- 保持表格内容的语义连贯性
3. 主要分段标记:
- 章节标题(如"第一章"、"第1节"等)
- 序号标记(如"一、"、"1."、"1"等)
- 罗马数字标记(如"I."、"II."等)
4. 次要分段标记:
- 语义转折词(如"然而"、"但是"、"因此"等)
- 总结性词语(如"总的来说"、"综上所述"等)
- 举例词语(如"例如"、"比如"等)
5. 特殊段落:
- 自动识别摘要、引言、结论等特殊段落
- 保持这些段落的独立性
6. 智能合并:
- 合并过短的段落小于50字
- 确保合并后不超过token限制
- 保持标题等特殊标记的独立性
- 确保段落语义的完整性
### 2024-03-xx
- 重写了表格处理逻辑,增强了对复杂表格的处理能力
- 使用@dataclass重新设计了表格数据结构Cell、Row、Table类
- 支持多级表头和合并单元格的识别
- 自动识别表格类型(标准表格、键值对、矩阵等)
- 支持特殊格式(货币、百分比、日期等)的识别和转换
- 增加了表格处理的错误处理和容错机制
- 优化了表格输出格式支持Markdown、HTML和字典格式
- 添加了表格处理的调试信息输出
- 更新了导入路径使用新的table_processor模块
### 2024年3月
- 优化表格处理逻辑:
- 修复复杂表头和矩阵类型表格的索引越界问题
- 增强表格行和单元格的错误处理机制
- 改进表头结构分析,支持多级表头识别
- 优化矩阵类型表格的检测算法
- 增加数字单元格识别功能
- 完善表格转文本的格式化处理
- 增加边界检查和异常处理机制
- 优化内存使用和处理效率

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

1034
cxs/doc_cleaner.py Normal file

File diff suppressed because it is too large Load Diff

View File

@@ -464,7 +464,12 @@ async def process_single_file(file_path: str, cleaner: DocCleaner) -> tuple[str,
# 合并正文和附录内容用于返回
all_content = main_content + ["附录"] + appendix if appendix else main_content
text_content = " ".join([t.replace("\n", " ").strip() for t in all_content if t.strip()])
# 增加类型检查确保只对字符串类型调用strip()方法
text_content = " ".join([
t.replace("\n", " ").strip() if isinstance(t, str) else str(t)
for t in all_content
if (isinstance(t, str) and t.strip()) or not isinstance(t, str)
])
# 验证所有文件是否成功创建
if not output_file.exists():
@@ -487,10 +492,11 @@ async def process_single_file(file_path: str, cleaner: DocCleaner) -> tuple[str,
print(f"清理图片目录时出错: {str(cleanup_error)}")
try:
if temp_docx and os.path.exists(temp_docx):
# 添加类型检查确保temp_docx是字符串类型
if temp_docx and isinstance(temp_docx, (str, Path)) and os.path.exists(str(temp_docx)):
print(f"清理临时DOCX文件: {temp_docx}")
safe_delete_file(temp_docx) # 使用安全删除函
temp_dir = os.path.dirname(temp_docx)
safe_delete_file(str(temp_docx)) # 确保传递字符串参
temp_dir = os.path.dirname(str(temp_docx))
if os.path.exists(temp_dir):
try:
os.rmdir(temp_dir)

705
cxs/table_processor.py Normal file
View File

@@ -0,0 +1,705 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from typing import List, Dict, Any, Optional, Tuple, Union
from dataclasses import dataclass, field
from datetime import datetime
import re
import json
from copy import deepcopy
@dataclass
class Cell:
"""单元格数据结构"""
text: str = "" # 单元格文本内容
row_span: int = 1 # 垂直合并行数
col_span: int = 1 # 水平合并列数
is_header: bool = False # 是否是表头单元格
data_type: str = "text" # 数据类型text, number, date, currency等
original_value: Any = None # 原始值
formatted_value: str = "" # 格式化后的值
position: Dict[str, int] = field(default_factory=lambda: {"row": 0, "col": 0}) # 单元格位置
metadata: Dict[str, Any] = field(default_factory=dict) # 元数据
@dataclass
class Row:
"""行数据结构"""
cells: List[Cell] = field(default_factory=list) # 单元格列表
is_header: bool = False # 是否是表头行
row_index: int = 0 # 行索引
metadata: Dict[str, Any] = field(default_factory=dict) # 元数据
@dataclass
class Table:
"""表格数据结构"""
rows: List[Row] = field(default_factory=list) # 行列表
header_rows: int = 0 # 表头行数
total_rows: int = 0 # 总行数
total_cols: int = 0 # 总列数
has_complex_header: bool = False # 是否有复杂表头
table_type: str = "normal" # 表格类型normal, key_value, matrix等
metadata: Dict[str, Any] = field(default_factory=dict) # 元数据
@dataclass
class TableData:
"""表格数据结构"""
rows: List[List[Dict[str, Any]]] = field(default_factory=list) # 存储表格行数据
style: Optional[str] = None # 表格样式
columns: List[Dict[str, Any]] = field(default_factory=list) # 列属性
has_multi_level_header: bool = False # 是否有多级表头
has_key_value_pairs: bool = False # 是否包含键值对结构
header_rows: int = 1 # 表头行数默认为1
table_type: str = "normal" # 表格类型normal, key_value, matrix等
def add_row(self, row_data: List[Dict[str, Any]]):
"""添加一行数据到表格"""
self.rows.append(row_data)
def get_row_count(self) -> int:
"""获取表格行数"""
return len(self.rows)
def get_column_count(self) -> int:
"""获取表格列数"""
return len(self.columns) if self.columns else (
max((len(row) for row in self.rows), default=0)
)
def is_empty(self) -> bool:
"""检查表格是否为空"""
return len(self.rows) == 0
def get_cell_text(self, row_idx: int, col_idx: int) -> str:
"""获取单元格文本内容"""
try:
if 0 <= row_idx < len(self.rows) and 0 <= col_idx < len(self.rows[row_idx]):
cell = self.rows[row_idx][col_idx]
return cell.get('text', '').strip()
except Exception as e:
print(f"获取单元格文本时出错 [{row_idx},{col_idx}]: {str(e)}")
return ''
def set_cell_text(self, row_idx: int, col_idx: int, text: str):
"""设置单元格文本内容"""
try:
if row_idx < len(self.rows) and col_idx < len(self.rows[row_idx]):
self.rows[row_idx][col_idx]['text'] = text
except Exception as e:
print(f"设置单元格文本时出错 [{row_idx},{col_idx}]: {str(e)}")
def get_cell_merge_info(self, row_idx: int, col_idx: int) -> Dict[str, Any]:
"""获取单元格合并信息"""
try:
if row_idx < len(self.rows) and col_idx < len(self.rows[row_idx]):
cell = self.rows[row_idx][col_idx]
return {
'gridspan': cell.get('gridspan', 1),
'vmerge': cell.get('vmerge', None)
}
except Exception as e:
print(f"获取单元格合并信息时出错 [{row_idx},{col_idx}]: {str(e)}")
return {'gridspan': 1, 'vmerge': None}
def set_cell_merge_info(self, row_idx: int, col_idx: int, gridspan: int = 1, vmerge: Optional[str] = None):
"""设置单元格合并信息"""
try:
if row_idx < len(self.rows) and col_idx < len(self.rows[row_idx]):
cell = self.rows[row_idx][col_idx]
cell['gridspan'] = gridspan
if vmerge is not None:
cell['vmerge'] = vmerge
except Exception as e:
print(f"设置单元格合并信息时出错 [{row_idx},{col_idx}]: {str(e)}")
def is_merged_cell(self, row_idx: int, col_idx: int) -> bool:
"""检查单元格是否是合并单元格"""
try:
if row_idx < len(self.rows) and col_idx < len(self.rows[row_idx]):
cell = self.rows[row_idx][col_idx]
return cell.get('gridspan', 1) > 1 or cell.get('vmerge') is not None
except Exception as e:
print(f"检查单元格合并状态时出错 [{row_idx},{col_idx}]: {str(e)}")
return False
def get_header_rows(self) -> List[List[Dict[str, Any]]]:
"""获取表头行数据"""
return self.rows[:self.header_rows]
def get_data_rows(self) -> List[List[Dict[str, Any]]]:
"""获取数据行数据"""
return self.rows[self.header_rows:]
class TableProcessor:
"""增强的表格处理器"""
def __init__(self):
# 数据类型识别模式
self.patterns = {
'currency': r'^\s*¥?\s*\d+(\.\d{2})?\s*$', # 货币金额
'percentage': r'^\s*\d+(\.\d+)?%\s*$', # 百分比
'date': r'^\d{4}[-/年]\d{1,2}[-/月]\d{1,2}日?$', # 日期
'number': r'^\s*\d+(\.\d+)?\s*$', # 数字
'time': r'^\d{1,2}:\d{2}(:\d{2})?$' # 时间
}
# 表头关键词
self.header_keywords = [
'序号', '编号', '项目', '名称', '类型', '说明', '备注',
'金额', '时间', '日期', '地区', '部门', '人员'
]
def process_table(self, raw_table: Any) -> Table:
"""处理表格,返回标准化的表格对象"""
try:
# 1. 初始化表格对象
table = Table()
# 2. 分析表格结构
self._analyze_table_structure(raw_table, table)
# 3. 处理表头
self._process_headers(raw_table, table)
# 4. 处理数据行
self._process_data_rows(raw_table, table)
# 5. 规范化表格
self._normalize_table(table)
# 6. 识别表格类型
self._identify_table_type(table)
return table
except Exception as e:
print(f"处理表格时出错: {str(e)}")
return Table()
def _analyze_table_structure(self, raw_table: Any, table: Table):
"""分析表格结构,包括行数、列数、合并单元格等"""
try:
# 获取基本维度信息
rows = raw_table.rows
table.total_rows = len(rows)
table.total_cols = len(raw_table.columns)
# 分析表头结构
header_info = self._analyze_header_structure(raw_table)
table.header_rows = header_info['header_rows']
table.has_complex_header = header_info['is_complex']
# 记录结构信息到元数据
table.metadata['structure_info'] = {
'total_rows': table.total_rows,
'total_cols': table.total_cols,
'header_rows': table.header_rows,
'has_complex_header': table.has_complex_header,
'analyzed_at': datetime.now().isoformat()
}
except Exception as e:
print(f"分析表格结构时出错: {str(e)}")
def _analyze_header_structure(self, raw_table: Any) -> Dict[str, Any]:
"""分析表头结构,返回表头信息"""
header_info = {
'header_rows': 1,
'is_complex': False
}
try:
# 检查前三行
for i in range(min(3, len(raw_table.rows))):
row = raw_table.rows[i]
# 检查是否有合并单元格
has_merged_cells = any(
cell._element.find('.//{*}vMerge') is not None or
cell._element.find('.//{*}gridSpan') is not None
for cell in row.cells
)
# 检查是否包含表头关键词
has_header_keywords = any(
any(keyword in cell.text for keyword in self.header_keywords)
for cell in row.cells
)
if has_merged_cells or has_header_keywords:
header_info['header_rows'] = max(header_info['header_rows'], i + 1)
if has_merged_cells:
header_info['is_complex'] = True
# 检查单元格格式是否符合表头特征
cell_formats = [self._analyze_cell_format(cell) for cell in row.cells]
if any(fmt == 'header' for fmt in cell_formats):
header_info['header_rows'] = max(header_info['header_rows'], i + 1)
except Exception as e:
print(f"分析表头结构时出错: {str(e)}")
return header_info
def _analyze_cell_format(self, cell: Any) -> str:
"""分析单元格格式特征"""
try:
# 获取单元格文本
text = cell.text.strip()
# 检查是否是表头格式
if text and any(char.isupper() for char in text): # 包含大写字母
return 'header'
if text and any(keyword in text for keyword in self.header_keywords):
return 'header'
# 检查数据类型
for data_type, pattern in self.patterns.items():
if re.match(pattern, text):
return data_type
return 'text'
except Exception as e:
print(f"分析单元格格式时出错: {str(e)}")
return 'text'
def _process_headers(self, raw_table: Any, table: Table):
"""处理表头,包括多级表头的处理"""
try:
for i in range(min(table.header_rows, len(raw_table.rows))):
try:
row = raw_table.rows[i]
header_row = Row(is_header=True, row_index=i)
# 处理每个表头单元格
col_index = 0
max_cols = len(row.cells) # 获取实际的列数
for cell_idx in range(max_cols):
try:
cell = row.cells[cell_idx]
header_cell = self._process_header_cell(cell, i, col_index)
header_row.cells.append(header_cell)
col_index += header_cell.col_span
except Exception as cell_error:
print(f"处理表头单元格时出错 [行={i}, 列={cell_idx}]: {str(cell_error)}")
# 添加一个空单元格
header_cell = Cell(text="", is_header=True, position={'row': i, 'col': col_index})
header_row.cells.append(header_cell)
col_index += 1
# 如果单元格数量不足,补充空单元格
while len(header_row.cells) < table.total_cols:
header_cell = Cell(text="", is_header=True, position={'row': i, 'col': col_index})
header_row.cells.append(header_cell)
col_index += 1
table.rows.append(header_row)
except Exception as row_error:
print(f"处理表头行时出错 [行={i}]: {str(row_error)}")
# 创建一个空行
empty_row = Row(is_header=True, row_index=i)
for col in range(table.total_cols):
empty_row.cells.append(Cell(text="", is_header=True, position={'row': i, 'col': col}))
table.rows.append(empty_row)
except Exception as e:
print(f"处理表头时出错: {str(e)}")
def _process_header_cell(self, cell: Any, row_index: int, col_index: int) -> Cell:
"""处理表头单元格"""
try:
# 创建表头单元格
header_cell = Cell(
text=cell.text.strip(),
is_header=True,
position={'row': row_index, 'col': col_index}
)
# 处理合并单元格
vmerge = cell._element.find('.//{*}vMerge')
gridspan = cell._element.find('.//{*}gridSpan')
if vmerge is not None:
val = vmerge.get('{http://schemas.openxmlformats.org/wordprocessingml/2006/main}val', 'continue')
header_cell.row_span = 2 if val == 'restart' else 1
if gridspan is not None:
try:
header_cell.col_span = int(gridspan.get('{http://schemas.openxmlformats.org/wordprocessingml/2006/main}val', '1'))
except ValueError:
header_cell.col_span = 1
return header_cell
except Exception as e:
print(f"处理表头单元格时出错: {str(e)}")
return Cell(text="", is_header=True, position={'row': row_index, 'col': col_index})
def _process_data_rows(self, raw_table: Any, table: Table):
"""处理数据行"""
try:
for i in range(table.header_rows, table.total_rows):
try:
row = raw_table.rows[i]
data_row = Row(is_header=False, row_index=i)
# 处理每个数据单元格
col_index = 0
max_cols = len(row.cells) # 获取实际的列数
for cell_idx in range(max_cols):
try:
cell = row.cells[cell_idx]
data_cell = self._process_data_cell(cell, i, col_index)
data_row.cells.append(data_cell)
col_index += data_cell.col_span
except Exception as cell_error:
print(f"处理单元格时出错 [行={i}, 列={cell_idx}]: {str(cell_error)}")
# 添加一个空单元格
data_cell = Cell(text="", position={'row': i, 'col': col_index})
data_row.cells.append(data_cell)
col_index += 1
# 如果单元格数量不足,补充空单元格
while len(data_row.cells) < table.total_cols:
data_cell = Cell(text="", position={'row': i, 'col': col_index})
data_row.cells.append(data_cell)
col_index += 1
table.rows.append(data_row)
except Exception as row_error:
print(f"处理数据行时出错 [行={i}]: {str(row_error)}")
# 创建一个空行
empty_row = Row(is_header=False, row_index=i)
for col in range(table.total_cols):
empty_row.cells.append(Cell(text="", position={'row': i, 'col': col}))
table.rows.append(empty_row)
except Exception as e:
print(f"处理数据行时出错: {str(e)}")
def _process_data_cell(self, cell: Any, row_index: int, col_index: int) -> Cell:
"""处理数据单元格"""
try:
# 获取单元格文本
text = cell.text.strip()
# 创建数据单元格
data_cell = Cell(
text=text,
position={'row': row_index, 'col': col_index}
)
# 识别数据类型
data_type = 'text'
original_value = text
formatted_value = text
# 尝试识别数据类型和格式化值
for type_name, pattern in self.patterns.items():
if re.match(pattern, text):
data_type = type_name
if type_name == 'currency':
# 处理货币金额
try:
value = float(re.sub(r'[¥,\s]', '', text))
original_value = value
formatted_value = f"¥{value:.2f}"
except ValueError:
pass
elif type_name == 'percentage':
# 处理百分比
try:
value = float(text.rstrip('%')) / 100
original_value = value
formatted_value = f"{value:.2%}"
except ValueError:
pass
elif type_name == 'date':
# 处理日期
try:
# 统一日期格式
date_text = re.sub(r'[年月日]', '-', text).rstrip('-')
date_obj = datetime.strptime(date_text, '%Y-%m-%d')
original_value = date_obj
formatted_value = date_obj.strftime('%Y-%m-%d')
except ValueError:
pass
break
data_cell.data_type = data_type
data_cell.original_value = original_value
data_cell.formatted_value = formatted_value
# 处理合并单元格
vmerge = cell._element.find('.//{*}vMerge')
gridspan = cell._element.find('.//{*}gridSpan')
if vmerge is not None:
val = vmerge.get('{http://schemas.openxmlformats.org/wordprocessingml/2006/main}val', 'continue')
data_cell.row_span = 2 if val == 'restart' else 1
if gridspan is not None:
try:
data_cell.col_span = int(gridspan.get('{http://schemas.openxmlformats.org/wordprocessingml/2006/main}val', '1'))
except ValueError:
data_cell.col_span = 1
return data_cell
except Exception as e:
print(f"处理数据单元格时出错: {str(e)}")
return Cell(text="", position={'row': row_index, 'col': col_index})
def _normalize_table(self, table: Table):
"""规范化表格,确保所有行都有相同的列数"""
try:
max_cols = table.total_cols
# 确保每行都有正确的列数
for row in table.rows:
current_cols = len(row.cells)
if current_cols < max_cols:
# 添加空单元格
for col in range(current_cols, max_cols):
empty_cell = Cell(
text="",
is_header=row.is_header,
position={'row': row.row_index, 'col': col}
)
row.cells.append(empty_cell)
elif current_cols > max_cols:
# 移除多余的单元格
row.cells = row.cells[:max_cols]
# 更新表格的总列数
table.total_cols = max_cols
except Exception as e:
print(f"规范化表格时出错: {str(e)}")
def _identify_table_type(self, table: Table):
"""识别表格类型"""
try:
# 检查是否是键值对表格
if table.total_cols == 2:
key_col_pattern = all(
cell.text.strip() != "" for row in table.rows
if not row.is_header for cell in row.cells[:1]
)
if key_col_pattern:
table.table_type = "key_value"
return
# 检查是否是矩阵表格
if table.has_complex_header and table.total_cols > 2:
table.table_type = "matrix"
return
# 默认为普通表格
table.table_type = "normal"
except Exception as e:
print(f"识别表格类型时出错: {str(e)}")
table.table_type = "normal"
def convert_to_markdown(self, table: Table) -> str:
"""将表格转换为Markdown格式"""
try:
markdown_lines = []
# 处理表头
for i in range(table.header_rows):
row = table.rows[i]
header_cells = [cell.text for cell in row.cells]
markdown_lines.append("| " + " | ".join(header_cells) + " |")
# 添加分隔行
if i == table.header_rows - 1:
separator = "|" + "|".join(["---" for _ in range(table.total_cols)]) + "|"
markdown_lines.append(separator)
# 处理数据行
for row in table.rows[table.header_rows:]:
data_cells = [
cell.formatted_value if cell.formatted_value
else cell.text for cell in row.cells
]
markdown_lines.append("| " + " | ".join(data_cells) + " |")
return "\n".join(markdown_lines)
except Exception as e:
print(f"转换为Markdown格式时出错: {str(e)}")
return ""
def convert_to_html(self, table: Table) -> str:
"""将表格转换为HTML格式"""
try:
html_lines = ['<table border="1">']
# 处理表头
if table.header_rows > 0:
html_lines.append("<thead>")
for i in range(table.header_rows):
row = table.rows[i]
html_lines.append("<tr>")
for cell in row.cells:
span_attrs = []
if cell.row_span > 1:
span_attrs.append(f'rowspan="{cell.row_span}"')
if cell.col_span > 1:
span_attrs.append(f'colspan="{cell.col_span}"')
attrs = " ".join(span_attrs)
html_lines.append(f"<th {attrs}>{cell.text}</th>")
html_lines.append("</tr>")
html_lines.append("</thead>")
# 处理数据行
html_lines.append("<tbody>")
for row in table.rows[table.header_rows:]:
html_lines.append("<tr>")
for cell in row.cells:
span_attrs = []
if cell.row_span > 1:
span_attrs.append(f'rowspan="{cell.row_span}"')
if cell.col_span > 1:
span_attrs.append(f'colspan="{cell.col_span}"')
attrs = " ".join(span_attrs)
# 使用格式化值或原始文本
display_value = cell.formatted_value if cell.formatted_value else cell.text
html_lines.append(f"<td {attrs}>{display_value}</td>")
html_lines.append("</tr>")
html_lines.append("</tbody>")
html_lines.append("</table>")
return "\n".join(html_lines)
except Exception as e:
print(f"转换为HTML格式时出错: {str(e)}")
return ""
def convert_to_dict(self, table: Table) -> Dict[str, Any]:
"""将表格转换为字典格式"""
try:
result = {
'metadata': table.metadata,
'structure': {
'total_rows': table.total_rows,
'total_cols': table.total_cols,
'header_rows': table.header_rows,
'has_complex_header': table.has_complex_header,
'table_type': table.table_type
},
'headers': [],
'data': []
}
# 处理表头
for i in range(table.header_rows):
header_row = []
for cell in table.rows[i].cells:
header_row.append({
'text': cell.text,
'row_span': cell.row_span,
'col_span': cell.col_span,
'position': cell.position
})
result['headers'].append(header_row)
# 处理数据行
for row in table.rows[table.header_rows:]:
data_row = []
for cell in row.cells:
data_row.append({
'text': cell.text,
'data_type': cell.data_type,
'original_value': cell.original_value,
'formatted_value': cell.formatted_value,
'position': cell.position
})
result['data'].append(data_row)
return result
except Exception as e:
print(f"转换为字典格式时出错: {str(e)}")
return {}
def convert_to_text(self, table: Table) -> str:
"""
将表格转换为文本格式,以"标题:内容"的形式显示,多级表头用下划线连接
Args:
table: Table对象
Returns:
str: 表格的文本表示
"""
if not table or not table.rows:
return "【空表格】"
try:
# 存储处理后的文本行
text_parts = []
# 存储处理后的表头文本
header_texts = {}
# 处理表头
if table.header_rows > 0:
# 对于多级表头,需要合并处理
for row_idx in range(table.header_rows):
row = table.rows[row_idx]
for col_idx, cell in enumerate(row.cells):
# 获取当前列的已有表头文本
current_header = header_texts.get(col_idx, [])
if cell.text.strip():
current_header.append(cell.text.strip())
header_texts[col_idx] = current_header
# 合并多级表头
final_headers = {}
for col_idx, headers in header_texts.items():
final_headers[col_idx] = "_".join(headers) if headers else ""
# 处理数据行
data_rows = []
for row in table.rows[table.header_rows:]:
row_data = {}
for col_idx, cell in enumerate(row.cells):
if cell.text.strip():
row_data[col_idx] = cell.text.strip()
if row_data:
data_rows.append(row_data)
# 生成"标题:内容"格式输出
for row_idx, row_data in enumerate(data_rows):
row_parts = []
for col_idx, content in row_data.items():
if col_idx in final_headers and final_headers[col_idx]:
row_parts.append(f"{final_headers[col_idx]}:{content}")
if row_parts:
text_parts.append("".join(row_parts))
return "\n".join(text_parts)
except Exception as e:
print(f"转换表格为文本时出错: {str(e)}")
return "【表格处理失败】"
def _convert_table_to_text(self, table: Table) -> str:
"""
转换表格为文本格式(兼容方法)
Args:
table: Table对象
Returns:
str: 表格的文本表示
"""
return self.convert_to_text(table)

Binary file not shown.

Before

Width:  |  Height:  |  Size: 70 KiB

File diff suppressed because one or more lines are too long

View File

@@ -1,14 +1,3 @@
# 文档内容
【文档信息】
作者: Lenovo
创建时间: 2025-05-15 08:30:10
修改时间: 2025-05-15 08:30:25
1111
【图片识别文本】
@@ -32,38 +21,4 @@
清 理 所 有 临 时 文 件 , 不 会 影 响 系 统 功 能 。
以 上 优 化 星 觞 失 了 临 时 文 伟 删 除 问 题 , 又 保 持 了 系 统 的
稳 定 性 , 让 您 能 雪 顺 畅 地 处 理 Bxcel 文 件 。
## 图片内容
### 图片 1
![图片 1](images/image_1.png)
**OCR文本内容:**
“ 完 善 了 异 常 处 理 , 防 止 惑 时 目 录 券 除 失 败 导 致 程 序 崖
4. 更 新 README.md
* 在 暨 近 更 新 部 分 记 录 了 临 时 文 件 处 理 机 制 的 改 进
。 添 加 了 Excel 文 件 句 柄 管 理
使 用 说 明
这 东 改 进 不 需 要 您 做 任 何 额 外 操 作 , 系 统 会 自 动 -
1. 在 处 理 Excel 文 件 时 正 球 关 闭 文 件 句 柄
2 当 尝 试 删 除 文 件 通 刨 “ 文 件 被 占 用 “ 错 误 时 , 自 动 等 待
并 重 试
3 即 使 无 法 券 除 临 时 文 件 , 也 不 影 响 处 #
如 果 仍 然 通 到 惧 时 文 件 问 题 , 系 统 会 在 下 次 启 动 时 自 动
清 理 所 有 临 时 文 件 , 不 会 影 响 系 统 功 能 。
以 上 优 化 星 觞 失 了 临 时 文 伟 删 除 问 题 , 又 保 持 了 系 统 的
稳 定 性 , 让 您 能 雪 顺 畅 地 处 理 Bxcel 文 件 。
稳 定 性 , 让 您 能 雪 顺 畅 地 处 理 Bxcel 文 件 。

View File

@@ -1 +1 @@
【文档信息】 作者: Lenovo 创建时间: 2025-05-15 08:30:10 修改时间: 2025-05-15 08:30:25 1111 【图片识别文本】 “ 完 善 了 异 常 处 理 , 防 止 惑 时 目 录 券 除 失 败 导 致 程 序 崖 澎 澎 4. 更 新 README.md * 在 暨 近 更 新 部 分 记 录 了 临 时 文 件 处 理 机 制 的 改 进 。 添 加 了 Excel 文 件 句 柄 管 理 使 用 说 明 这 东 改 进 不 需 要 您 做 任 何 额 外 操 作 , 系 统 会 自 动 - 1. 在 处 理 Excel 文 件 时 正 球 关 闭 文 件 句 柄 2 当 尝 试 删 除 文 件 通 刨 “ 文 件 被 占 用 “ 错 误 时 , 自 动 等 待 并 重 试 3 即 使 无 法 券 除 临 时 文 件 , 也 不 影 响 处 # 如 果 仍 然 通 到 惧 时 文 件 问 题 , 系 统 会 在 下 次 启 动 时 自 动 清 理 所 有 临 时 文 件 , 不 会 影 响 系 统 功 能 。 以 上 优 化 星 觞 失 了 临 时 文 伟 删 除 问 题 , 又 保 持 了 系 统 的 稳 定 性 , 让 您 能 雪 顺 畅 地 处 理 Bxcel 文 件 。
1111 【图片识别文本】 “ 完 善 了 异 常 处 理 , 防 止 惑 时 目 录 券 除 失 败 导 致 程 序 崖 澎 澎 4. 更 新 README.md * 在 暨 近 更 新 部 分 记 录 了 临 时 文 件 处 理 机 制 的 改 进 。 添 加 了 Excel 文 件 句 柄 管 理 使 用 说 明 这 东 改 进 不 需 要 您 做 任 何 额 外 操 作 , 系 统 会 自 动 - 1. 在 处 理 Excel 文 件 时 正 球 关 闭 文 件 句 柄 2 当 尝 试 删 除 文 件 通 刨 “ 文 件 被 占 用 “ 错 误 时 , 自 动 等 待 并 重 试 3 即 使 无 法 券 除 临 时 文 件 , 也 不 影 响 处 # 如 果 仍 然 通 到 惧 时 文 件 问 题 , 系 统 会 在 下 次 启 动 时 自 动 清 理 所 有 临 时 文 件 , 不 会 影 响 系 统 功 能 。 以 上 优 化 星 觞 失 了 临 时 文 伟 删 除 问 题 , 又 保 持 了 系 统 的 稳 定 性 , 让 您 能 雪 顺 畅 地 处 理 Bxcel 文 件 。

Binary file not shown.

Before

Width:  |  Height:  |  Size: 70 KiB

View File

@@ -1,6 +1,6 @@
fastapi>=0.100.0
python-multipart>=0.0.6
uvicorn>=0.23.0
fastapi>=0.68.0
python-multipart>=0.0.5
uvicorn>=0.15.0
python-docx>=0.8.11
numpy>=1.24.0
scikit-learn>=1.0.2
@@ -17,4 +17,5 @@ html2text>=2020.1.16
pandas>=2.0.0
aiofiles>=23.1.0
openpyxl>=3.1.2
uuid>=1.30
uuid>=1.30
pydantic>=1.8.2

View File

@@ -5,22 +5,198 @@ import re
import json
import argparse
def count_chinese_tokens(text):
"""
估算中文文本的token数量
1个汉字约等于1.5个token
1个英文单词约等于1个token
1个标点符号约等于1个token
"""
# 匹配中文字符
chinese_chars = len(re.findall(r'[\u4e00-\u9fff]', text))
# 匹配英文单词
english_words = len(re.findall(r'[a-zA-Z]+', text))
# 匹配标点符号
punctuations = len(re.findall(r'[^\w\s]', text))
# 计算总token数粗略估算
total_tokens = chinese_chars * 1.5 + english_words + punctuations
return int(total_tokens)
def process_table_content(table_content):
"""
处理表格内容,移除表格标记并进行智能分段
处理策略:
1. 清理无效内容
2. 智能分段
3. 保持语义完整性
4. 控制token长度
"""
# 移除表格标记和多余空白
content = re.sub(r'表格\s*\d+\s*(?:开始|结束)', '', table_content)
content = re.sub(r'\s+', ' ', content).strip()
# 分段处理
paragraphs = []
current_para = []
# 按句子分割
sentences = re.split(r'([。!?\n])', content)
for i in range(0, len(sentences), 2):
sentence = sentences[i].strip()
if not sentence:
continue
# 添加标点符号(如果存在)
if i + 1 < len(sentences):
sentence += sentences[i + 1]
# 检查是否是新段落的开始
if (re.match(r'^[的]', sentence) or # 以"的"开头
re.match(r'^[在]', sentence) or # 以"在"开头
re.match(r'^[\w()]+[:]', sentence)): # 以键值对形式开头
# 保存当前段落
if current_para:
full_para = ''.join(current_para).strip()
if full_para:
# 控制token长度
if count_chinese_tokens(full_para) > 512:
split_paras = split_long_paragraph(full_para)
paragraphs.extend(split_paras)
else:
paragraphs.append(full_para)
current_para = []
current_para.append(sentence)
# 处理最后一个段落
if current_para:
full_para = ''.join(current_para).strip()
if full_para:
if count_chinese_tokens(full_para) > 512:
split_paras = split_long_paragraph(full_para)
paragraphs.extend(split_paras)
else:
paragraphs.append(full_para)
return paragraphs
def split_long_paragraph(paragraph):
"""智能分割长段落,保持语义完整性"""
result = []
# 首先尝试按逗号分割
parts = re.split(r'([,。!?])', paragraph)
current_part = ""
current_tokens = 0
for i in range(0, len(parts), 2):
part = parts[i].strip()
if not part:
continue
# 添加标点符号(如果存在)
if i + 1 < len(parts):
part += parts[i + 1]
part_tokens = count_chinese_tokens(part)
if current_tokens + part_tokens > 512:
if current_part:
result.append(current_part)
current_part = part
current_tokens = part_tokens
else:
current_part += part
current_tokens += part_tokens
if current_part:
result.append(current_part)
return result
def format_group_to_text(group):
"""将分组数据格式化为易读的文本,采用通用的处理方式"""
if not group:
return ""
parts = []
# 通用处理:遍历所有键值对,构建文本
for key, value in group.items():
# 跳过空值
if not value:
continue
# 清理和格式化键名
clean_key = re.sub(r'[_\(\)]', ' ', key).strip()
# 清理值中的"表格无有效数据"字眼
if isinstance(value, str):
value = re.sub(r'[【\[]*表格无[有效]*数据[】\]]*', '', value)
if not value.strip(): # 如果清理后为空,则跳过
continue
# 构建文本片段
text = f"{clean_key}{value}"
parts.append(text)
# 使用逗号连接所有部分,并确保结果中没有"表格无有效数据"字眼
result = "".join(parts)
result = re.sub(r'[【\[]*表格无[有效]*数据[】\]]*', '', result)
return result.strip("") + "" if result.strip("") else ""
def split_long_text(text):
"""将长文本按token限制分割"""
if count_chinese_tokens(text) <= 512:
return [text]
result = []
parts = re.split(r'([。])', text)
current_part = ""
current_tokens = 0
for i in range(0, len(parts), 2):
sentence = parts[i]
if i + 1 < len(parts):
sentence += parts[i + 1] # 添加句号
sentence_tokens = count_chinese_tokens(sentence)
if current_tokens + sentence_tokens > 512:
if current_part:
result.append(current_part)
current_part = sentence
current_tokens = sentence_tokens
else:
current_part += sentence
current_tokens += sentence_tokens
if current_part:
result.append(current_part)
return result
def split_text_into_paragraphs(text):
"""
将连续文本智能分段
策略:
1. 识别表格标记,将表格内容作为独立段落处理
2. 对普通文本按照语义和长度适当分段约500字/段)
3. 确保分段不破坏语义完整性
1. 基于标题和章节标记进行主要分段
2. 基于段落语义标记进行次要分段
3. 基于句子关联度进行内容分段
4. 基于token长度进行辅助分段确保每段不超过512个token
5. 保持段落的语义完整性
6. 智能处理表格内容
"""
# 清理文本中可能存在的多余空格
# 清理文本中可能存在的多余空格和换行
text = re.sub(r'\s+', ' ', text).strip()
# 识别表格范围,表格以"表格 N 开始"和"表格 N 结束"标记
table_pattern = re.compile(r'表格\s*\d+\s*开始(.*?)表格\s*\d+\s*结束', re.DOTALL)
# 使用表格标记分割文本
# 首先处理表格内容
table_pattern = re.compile(r'(表格\s*\d+\s*开始.*?表格\s*\d+\s*结束)', re.DOTALL)
parts = []
last_end = 0
@@ -29,9 +205,14 @@ def split_text_into_paragraphs(text):
if match.start() > last_end:
parts.append(("text", text[last_end:match.start()]))
# 获取表格内容(去掉表格标记)
table_content = match.group(1).strip()
parts.append(("table", table_content))
# 处理表格内容
table_content = match.group(1)
table_paragraphs = process_table_content(table_content)
for para in table_paragraphs:
# 确保表格段落没有冒号开头
para = re.sub(r'^[:]+\s*', '', para.strip())
if para: # 只添加非空段落
parts.append(("table", para))
last_end = match.end()
@@ -39,112 +220,132 @@ def split_text_into_paragraphs(text):
if last_end < len(text):
parts.append(("text", text[last_end:]))
# 如果没有找到表格,整个文本作为一个文本片段
# 如果没有找到表格,整个文本作为一个文本部分
if not parts:
parts = [("text", text)]
# 对文本段落进行处理
final_paragraphs = []
# 主要分段标记(标题、章节等)
major_markers = [
r'^第[一二三四五六七八九十百千]+[章节篇]', # 中文数字章节
r'^第\d+[章节篇]', # 阿拉伯数字章节
r'^[一二三四五六七八九十][、.]', # 中文数字序号
r'^\d+[、.]', # 阿拉伯数字序号
r'^[(][一二三四五六七八九十][)]', # 带括号的中文数字
r'^[(]\d+[)]', # 带括号的阿拉伯数字
r'^[IVX]+[、.]', # 罗马数字序号
]
# 可能表示段落边界或重要语义分割点的标记
paragraph_markers = [
r'^第.{1,3}章',
r'^第.{1,3}节',
r'^[一二三四五六七八九十][、.\s]',
r'^\d+[、.\s]',
r'^[IVX]+[、.\s]',
r'^附录',
r'^前言',
r'^目录',
# 次要分段标记(语义转折等)
minor_markers = [
r'然而[,]',
r'但是[,]',
r'不过[,]',
r'相反[,]',
r'因此[,]',
r'所以[,]',
r'总的来说',
r'综上所述',
r'总而言之',
r'例如[,]',
r'比如[,]',
r'首先[,]',
r'其次[,]',
r'最后[,]',
r'另外[,]',
]
# 特殊段落标记
special_markers = [
r'^摘要',
r'^引言',
r'^前言',
r'^结论',
r'^参考文献'
r'^致谢',
r'^参考文献',
r'^注释',
r'^附录',
]
marker_pattern = re.compile('|'.join(paragraph_markers))
# 合并所有标记模式
all_markers = major_markers + special_markers
marker_pattern = '|'.join(all_markers)
minor_marker_pattern = '|'.join(minor_markers)
# 按句子分割的分隔符
sentence_separators = r'([。!?\!\?])'
# 目标段落长度(字符数)
target_length = 500
# 最小段落长度阈值
min_length = 100
# 最大段落长度阈值
max_length = 800
# 分段处理
paragraphs = []
for part_type, content in parts:
# 如果是表格内容,直接添加为独立段落
if part_type == "table":
final_paragraphs.append(content)
# 表格内容已经过处理,直接添加
paragraphs.append(content)
continue
# 处理普通文本
# 按句子分割文本
sentences = re.split(sentence_separators, content)
# 将分割后的句子和标点符号重新组合
sentence_list = []
for i in range(0, len(sentences)-1, 2):
if i+1 < len(sentences):
sentence_list.append(sentences[i] + sentences[i+1])
else:
sentence_list.append(sentences[i])
# 如果最后一个元素不是句子结束符,添加它
if len(sentences) % 2 == 1:
if sentences[-1]:
sentence_list.append(sentences[-1])
# 构建段落
current_para = ""
for sentence in sentence_list:
# 检查是否是段落标记的开始
is_marker = marker_pattern.search(sentence)
# 如果当前段落已经足够长,或者遇到段落标记,则开始新段落
if ((len(current_para) >= target_length and len(current_para) + len(sentence) > max_length) or
(is_marker and current_para)):
if current_para.strip():
final_paragraphs.append(current_para.strip())
current_para = sentence
else:
current_para += sentence
current_tokens = 0
# 添加最后一个段落
if current_para.strip():
final_paragraphs.append(current_para.strip())
# 按主要标记分段
text_parts = re.split(f'({marker_pattern})', content)
for i, part in enumerate(text_parts):
if not part.strip(): # 跳过空部分
continue
# 去除冒号开头
part = re.sub(r'^[:]+\s*', '', part.strip())
if not part: # 跳过清理后为空的部分
continue
if i % 2 == 1: # 是标记
if current_para:
paragraphs.append(current_para)
current_para = part
current_tokens = count_chinese_tokens(part)
else: # 是内容
sentences = re.split(sentence_separators, part)
for j, sentence in enumerate(sentences):
if not sentence.strip():
continue
# 去除句子开头的冒号
sentence = re.sub(r'^[:]+\s*', '', sentence.strip())
if not sentence:
continue
sentence_tokens = count_chinese_tokens(sentence)
# 检查是否有次要分段标记
has_minor_marker = bool(re.search(minor_marker_pattern, sentence))
if has_minor_marker and current_para:
paragraphs.append(current_para)
current_para = sentence
current_tokens = sentence_tokens
elif current_tokens + sentence_tokens > 512:
if current_para:
paragraphs.append(current_para)
current_para = sentence
current_tokens = sentence_tokens
else:
if current_para:
current_para += sentence
else:
current_para = sentence
current_tokens += sentence_tokens
if current_para:
paragraphs.append(current_para)
# 对段落进行后处理,合并过短的段落
processed_paragraphs = []
temp_para = ""
# 最后一次清理所有段落,确保没有冒号开头
cleaned_paragraphs = []
for para in paragraphs:
para = re.sub(r'^[:]+\s*', '', para.strip())
if para: # 只添加非空段落
cleaned_paragraphs.append(para)
for para in final_paragraphs:
if len(para) < min_length:
# 如果段落太短,尝试与临时段落合并
if temp_para:
temp_para += " " + para
else:
temp_para = para
else:
# 如果有临时段落,先处理它
if temp_para:
# 如果临时段落也很短,合并到当前段落
if len(temp_para) < min_length:
para = temp_para + " " + para
else:
processed_paragraphs.append(temp_para)
temp_para = ""
processed_paragraphs.append(para)
# 处理最后可能剩余的临时段落
if temp_para:
if processed_paragraphs and len(temp_para) < min_length:
processed_paragraphs[-1] += " " + temp_para
else:
processed_paragraphs.append(temp_para)
return processed_paragraphs
return cleaned_paragraphs
def save_to_json(paragraphs, output_file):
"""将段落保存为JSON格式"""
@@ -158,10 +359,19 @@ def save_to_json(paragraphs, output_file):
print(f"成功将文本分成 {len(paragraphs)} 个段落并保存到 {output_file}")
def save_to_txt(paragraphs, output_file):
"""将段落保存为TXT格式每段用换行符分隔"""
with open(output_file, 'w', encoding='utf-8') as f:
for paragraph in paragraphs:
f.write(paragraph + '\n\n') # 使用两个换行符使段落分隔更清晰
print(f"成功将文本分成 {len(paragraphs)} 个段落并保存到 {output_file}")
def main():
parser = argparse.ArgumentParser(description="将连续文本智能分段并保存为JSON")
parser.add_argument("input_file", help="输入文本文件路径")
parser.add_argument("--output", "-o", default="paragraphs.json", help="输出JSON文件路径")
parser = argparse.ArgumentParser(description="将连续文本智能分段并保存为TXT或JSON")
parser.add_argument("input_file", help="输入文件路径例如sample_continuous_text.txt")
parser.add_argument("--output", "-o", default="paragraphs.txt", help="输出文件路径,默认为当前目录下的 paragraphs.txt")
parser.add_argument("--format", "-f", choices=['txt', 'json'], default='txt', help="输出文件格式支持txt和json默认为txt")
args = parser.parse_args()
@@ -176,8 +386,11 @@ def main():
# 分段
paragraphs = split_text_into_paragraphs(text)
# 保存为JSON
save_to_json(paragraphs, args.output)
# 根据指定格式保存
if args.format == 'json':
save_to_json(paragraphs, args.output)
else:
save_to_txt(paragraphs, args.output)
if __name__ == "__main__":
main()