Text segmentation

chenlin 2025-05-14 14:06:27 +08:00
parent 1ee4ed24d3
commit a73040d739
4 changed files with 201 additions and 3 deletions


@@ -473,8 +473,9 @@ class DocCleaner:
                 elements_to_insert.append(('paragraph', doc.add_paragraph()._element))
                 # Append to the text output
-                text_output.append(f"表格 {table_index + 1}:")
+                text_output.append(f"表格 {table_index + 1} 开始:")
                 text_output.append(table_text)
+                text_output.append(f"表格 {table_index + 1} 结束:")
             except Exception as e:
                 print(f"Warning: error while processing table: {str(e)}")
@@ -1020,4 +1021,14 @@ if __name__ == '__main__':
 #
 #     args = parser.parse_args()
-    process_directory("D:/rzData/poject/AI项目/中烟/后台服务/es数据/数据验证", "D:/rzData/poject/AI项目/中烟/后台服务/es数据/数据验证")
+    process_directory(r"D:\Desktop\DEMO", r"D:\Desktop\DEMO")
+    # Ensure the directory exists; create it if it does not
+    # Create the base directory (using a safer approach)
+    # base_dir = r'D:\Desktop\DEMO'
+    # text_dir = os.path.join(base_dir, "测试")
+    #
+    # os.makedirs(text_dir, exist_ok=True, mode=0o777)
+    #
+    # print(f"Directory exists: {os.path.exists(text_dir)}")
+    # print(f"Full path: {os.path.abspath(text_dir)}")  # or simply print(f"Full path: {text_dir}")


@@ -1,4 +1,7 @@
-python-docx>=0.8.11
+python-docx==0.8.11
+reportlab==4.0.4
+# difflib ships with the Python standard library; no pip entry needed
+python-Levenshtein==0.22.0
 regex>=2023.0.0
 scikit-learn>=1.3.0
 numpy>=1.24.0

File diff suppressed because one or more lines are too long

text_paragraph_splitter.py (new file, 183 lines)

@@ -0,0 +1,183 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import re
import json
import argparse


def split_text_into_paragraphs(text):
    """
    Intelligently split continuous text into paragraphs.

    Strategy:
    1. Detect table markers and treat each table body as a standalone paragraph.
    2. Split ordinary text by semantics and length (roughly 500 characters per paragraph).
    3. Keep semantic units intact when splitting.
    """
    # Collapse any redundant whitespace
    text = re.sub(r'\s+', ' ', text).strip()

    # Tables are delimited by "表格 N 开始" / "表格 N 结束" markers;
    # tolerate the optional colon that the cleaner appends after each marker
    table_pattern = re.compile(r'表格\s*\d+\s*开始[::]?(.*?)表格\s*\d+\s*结束[::]?', re.DOTALL)

    # Split the text on the table markers
    parts = []
    last_end = 0
    for match in table_pattern.finditer(text):
        # Text preceding this table
        if match.start() > last_end:
            parts.append(("text", text[last_end:match.start()]))
        # Table body (markers stripped)
        table_content = match.group(1).strip()
        parts.append(("table", table_content))
        last_end = match.end()
    # Text after the last table
    if last_end < len(text):
        parts.append(("text", text[last_end:]))
    # Nothing collected (empty input): treat the whole text as one text segment
    if not parts:
        parts = [("text", text)]
    # Turn the text segments into paragraphs
    final_paragraphs = []

    # Patterns that mark a paragraph boundary or an important semantic break
    paragraph_markers = [
        r'^第.{1,3}章',                    # chapter headings
        r'^第.{1,3}节',                    # section headings
        r'^[一二三四五六七八九十][、.\s]',  # Chinese-numeral list items
        r'^\d+[、.\s]',                    # Arabic-numeral list items
        r'^[IVX]+[、.\s]',                 # Roman-numeral list items
        r'^附录',                          # appendix
        r'^前言',                          # preface
        r'^目录',                          # table of contents
        r'^摘要',                          # abstract
        r'^引言',                          # introduction
        r'^结论',                          # conclusion
        r'^参考文献'                       # references
    ]
    marker_pattern = re.compile('|'.join(paragraph_markers))
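    # e.g. sentences starting with "第1章", "三、", "2.", "IV、" or "附录"
    # all force the start of a new paragraph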
    # Sentence-ending punctuation (the capture group keeps each terminator)
    sentence_separators = r'([。!?\!\?])'

    # Target paragraph length (characters)
    target_length = 500
    # Minimum paragraph length threshold
    min_length = 100
    # Maximum paragraph length threshold
    max_length = 800
    for part_type, content in parts:
        # Table bodies become standalone paragraphs as-is
        if part_type == "table":
            final_paragraphs.append(content)
            continue

        # Ordinary text: split it into sentences
        sentences = re.split(sentence_separators, content)
        # re.split with a capture group returns [text, sep, text, sep, ..., text],
        # so re-attach each sentence to its terminating punctuation
        sentence_list = []
        for i in range(0, len(sentences) - 1, 2):
            sentence_list.append(sentences[i] + sentences[i + 1])
        # Keep any trailing text that has no sentence terminator
        if len(sentences) % 2 == 1 and sentences[-1]:
            sentence_list.append(sentences[-1])

        # Assemble paragraphs
        current_para = ""
        for sentence in sentence_list:
            # Does this sentence open a new section?
            is_marker = marker_pattern.search(sentence)
            # Start a new paragraph once the current one is long enough,
            # or whenever a section marker appears
            if ((len(current_para) >= target_length and len(current_para) + len(sentence) > max_length) or
                    (is_marker and current_para)):
                if current_para.strip():
                    final_paragraphs.append(current_para.strip())
                current_para = sentence
            else:
                current_para += sentence
        # Flush the last paragraph
        if current_para.strip():
            final_paragraphs.append(current_para.strip())
    # Post-process: merge paragraphs that are too short
    processed_paragraphs = []
    temp_para = ""
    for para in final_paragraphs:
        if len(para) < min_length:
            # Too short: accumulate it in the temporary buffer
            if temp_para:
                temp_para += " " + para
            else:
                temp_para = para
        else:
            # Flush the temporary buffer first
            if temp_para:
                # If the buffer is still short, prepend it to the current paragraph
                if len(temp_para) < min_length:
                    para = temp_para + " " + para
                else:
                    processed_paragraphs.append(temp_para)
                temp_para = ""
            processed_paragraphs.append(para)
    # Handle any text left in the buffer
    if temp_para:
        if processed_paragraphs and len(temp_para) < min_length:
            processed_paragraphs[-1] += " " + temp_para
        else:
            processed_paragraphs.append(temp_para)

    return processed_paragraphs

def save_to_json(paragraphs, output_file):
    """Save the paragraphs in JSON format."""
    data = {
        "total_paragraphs": len(paragraphs),
        "paragraphs": paragraphs
    }
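    # data serializes to: {"total_paragraphs": N, "paragraphs": ["…", "…", ...]}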
    with open(output_file, 'w', encoding='utf-8') as f:
        json.dump(data, f, ensure_ascii=False, indent=2)
    print(f"Split the text into {len(paragraphs)} paragraphs and saved them to {output_file}")

def main():
    parser = argparse.ArgumentParser(description="Intelligently split continuous text into paragraphs and save as JSON")
    parser.add_argument("input_file", help="path to the input text file")
    parser.add_argument("--output", "-o", default="paragraphs.json", help="path to the output JSON file")
    args = parser.parse_args()

    # Read the input file
    try:
        with open(args.input_file, 'r', encoding='utf-8') as f:
            text = f.read()
    except Exception as e:
        print(f"Failed to read the input file: {e}")
        return

    # Split into paragraphs
    paragraphs = split_text_into_paragraphs(text)
    # Save as JSON
    save_to_json(paragraphs, args.output)


if __name__ == "__main__":
    main()
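
A minimal usage sketch (the input file name is assumed for illustration):

    # Programmatic use; "cleaned_doc.txt" is a hypothetical file produced by DocCleaner
    from text_paragraph_splitter import split_text_into_paragraphs, save_to_json

    with open("cleaned_doc.txt", encoding="utf-8") as f:
        paragraphs = split_text_into_paragraphs(f.read())
    save_to_json(paragraphs, "paragraphs.json")
    # Equivalent CLI: python text_paragraph_splitter.py cleaned_doc.txt -o paragraphs.json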