一、文档处理是 RAG 系统的基石1.1 为什么文档处理决定了 RAG 系统的上限RAG 系统的核心逻辑是 **检索相关文档片段 → 喂给大模型生成回答**整个流程的质量完全依赖于文档处理环节如果文档解析失败再好的检索和生成模型也无法获取有效信息如果分块不合理要么检索到的片段上下文不完整要么包含太多无关信息工业级统计显示文档处理环节的优化能带来 30%-50% 的 RAG 回答准确率提升是投入产出比最高的优化点1.2 文档处理的完整流水线一个工业级文档处理流水线包含以下 5 个步骤原始文档 → 文档加载 → 文本清洗 → 文本分块 → 元数据增强 → 分块存储文档加载将不同格式的文件PDF、Word、Markdown 等转换为纯文本文本清洗去除乱码、多余空格、页眉页脚、重复内容等噪声文本分块将长文本切割成适合大模型上下文窗口和检索的小块元数据增强为每个分块添加来源、页码、章节、作者等元信息分块存储将分块和元数据保存到文件或向量数据库1.3 文档加载的核心挑战不同格式的文档有不同的解析难点没有万能的解析工具文档格式主要挑战常见问题PDF扫描件识别、表格提取、页眉页脚去除、跨页断句乱码、表格变成纯文本乱序、图片无法提取Word格式解析、嵌入式对象、修订痕迹格式丢失、批注被当成正文Markdown标题层级解析、代码块处理、链接提取标题层级混乱、代码块被拆分Excel/CSV表格结构解析、多 sheet 处理表格行被拆分、数值格式丢失PPT幻灯片顺序、备注提取、图片文字备注丢失、文本框顺序混乱1.4 文本分块的核心原则分块不是简单的按长度切割必须遵循以下三大原则原则 1语义完整性一个分块应该包含一个完整的语义单元如一个段落、一个知识点避免将一句话或一个概念拆分到两个分块中。错误分块分块1RAG技术的核心思想是将外部知识库检索与大模型生成相结合它可以有效解决大模型的 分块2知识过时和幻觉问题。正确分块分块1RAG技术的核心思想是将外部知识库检索与大模型生成相结合它可以有效解决大模型的知识过时和幻觉问题。原则 2上下文相关性分块大小要与检索粒度和大模型上下文窗口匹配太小丢失上下文信息检索精度下降太大包含太多无关信息稀释核心内容增加大模型处理成本工业级推荐分块大小场景推荐分块大小重叠大小通用问答512-1024 字符50-100 字符技术文档1024-2048 字符100-200 字符法律合同2048-4096 字符200-400 字符小说 / 长文本4096-8192 字符400-800 字符原则 3可追溯性每个分块必须保留完整的元数据能够追溯到原始文档的具体位置如页码、章节、行号这是实现引用标注和答案溯源的基础。1.5 主流分块策略对比分块策略原理优点缺点适用场景固定长度分块按字符数或 Token 数切割实现简单、速度快容易破坏语义完整性快速原型、简单文本递归字符分块按分隔符优先级递归切割段落→句子→单词尽量保留语义完整性仍可能破坏长句子通用场景最常用语义分块基于嵌入向量的相似度切割完美保留语义完整性速度慢、依赖嵌入模型高质量 RAG 系统句子窗口分块以句子为单位分块检索时扩展上下文检索精度高、上下文完整分块数量多、检索慢精准问答场景父子分块小分块用于检索大分块用于生成兼顾检索精度和上下文完整性实现复杂工业级 RAG 系统二、LangChain 文档处理核心 API 详解2.1 文档对象DocumentLangChain 中所有文档都表示为Document对象包含两个核心字段page_content文档的文本内容metadata字典类型存储文档的元数据来源、页码、作者等from langchain_core.documents import Document # 创建一个文档对象 doc Document( page_content这是文档的内容, metadata{ source: test.pdf, page: 1, author: 张三 } ) print(doc.page_content) print(doc.metadata)2.2 文档加载器DocumentLoaderLangChain 提供了 100 种文档加载器支持几乎所有常见的文档格式。所有加载器都实现了统一的接口load()同步加载所有文档返回list[Document]lazy_load()惰性加载逐个返回文档适合大文件2.2.1 纯文本加载器TextLoaderfrom langchain_community.document_loaders import TextLoader # 加载纯文本文件 loader TextLoader( file_pathdata/test.txt, encodingutf-8, autodetect_encodingTrue # 自动检测编码 ) documents loader.load() print(f加载了{len(documents)}个文档) print(documents[0].page_content[:100])2.2.2 Markdown 加载器MarkdownLoader支持解析 Markdown 的标题层级、代码块、列表等结构from langchain_community.document_loaders import MarkdownLoader # 加载Markdown文件 loader MarkdownLoader( file_pathdata/test.md, modeelements # 按元素解析返回标题、段落、代码块等不同类型的文档 ) documents loader.load() for doc in documents: print(f类型{doc.metadata[type]}内容{doc.page_content[:50]})2.2.3 PDF 加载器PyPDFLoader最常用的 PDF 加载器基于 PyPDF2 实现支持页码提取from langchain_community.document_loaders import PyPDFLoader # 加载PDF文件 loader PyPDFLoader(file_pathdata/test.pdf) # 每页返回一个Document对象 documents loader.load() for doc in documents: print(f页码{doc.metadata[page]}内容{doc.page_content[:100]})2.2.5 通用加载器UnstructuredLoader支持所有常见格式的万能加载器基于 Unstructured.IO 库是工业级首选from langchain_community.document_loaders import UnstructuredLoader # 自动识别文件格式 loader UnstructuredLoader( file_pathdata/test.pdf, strategyfast, # 解析策略fast快速/ hi_res高精度支持OCR include_page_breaksTrue, # 包含分页符 extract_images_in_pdfFalse # 是否提取PDF中的图片 ) documents loader.load() print(f加载了{len(documents)}个文档)2.3 文本分块器TextSplitterLangChain 提供了多种文本分块器所有分块器都实现了统一的接口split_documents(documents: list[Document])将文档列表切割成分块列表split_text(text: str)将纯文本切割成字符串列表2.3.1 递归字符分块器RecursiveCharacterTextSplitter工业级最常用的分块器按以下分隔符优先级递归切割[\n\n, \n, , ]优先按段落切割段落太长按行切割行太长按空格切割最后按字符切割。from langchain_text_splitters import RecursiveCharacterTextSplitter # 创建分块器 text_splitter RecursiveCharacterTextSplitter( chunk_size1000, # 分块大小字符数 chunk_overlap100, # 分块重叠大小 separators[\n\n, \n, 。, , , , ], # 中文优化的分隔符 length_functionlen, # 长度计算函数 is_separator_regexFalse # 是否使用正则表达式作为分隔符 ) # 切割文档 documents loader.load() chunks text_splitter.split_documents(documents) print(f原始文档数{len(documents)}) print(f分块数{len(chunks)}) print(f第一个分块{chunks[0].page_content}) print(f第一个分块元数据{chunks[0].metadata})2.3.2 语义分块器SemanticChunker基于嵌入向量的相似度将语义相关的内容分到同一个块中from langchain_text_splitters import SemanticChunker from langchain_openai import OpenAIEmbeddings # 创建语义分块器 semantic_splitter SemanticChunker( embeddingsOpenAIEmbeddings(), # 嵌入模型 breakpoint_threshold_typepercentile, # 断点阈值类型 breakpoint_threshold_amount95, # 阈值百分比 chunk_size1000 # 目标分块大小 ) # 切割文档 chunks semantic_splitter.split_documents(documents)2.3.3 句子分块器SentenceSplitter按句子切割文本适合句子窗口检索from langchain_text_splitters import SentenceSplitter sentence_splitter SentenceSplitter( chunk_size100, chunk_overlap20, languagezh # 中文支持 ) chunks sentence_splitter.split_documents(documents)三、工业级文档处理最佳实践3.1 文档预处理清洗与标准化原始文档中存在大量噪声必须进行预处理才能用于 RAG 系统import re from langchain_core.documents import Document def clean_text(text: str) - str: 文本清洗函数 # 去除多余的空行和空格 text re.sub(r\n{3,}, \n\n, text) text re.sub(r , , text) text re.sub(r\t, , text) # 去除页眉页脚示例匹配第X页 共Y页格式 text re.sub(r第\d页\s*共\d页, , text) # 去除特殊字符 text re.sub(r[\x00-\x1f\x7f-\x9f], , text) # 去除首尾空白 text text.strip() return text def clean_documents(documents: list[Document]) - list[Document]: 清洗文档列表 cleaned_docs [] for doc in documents: cleaned_content clean_text(doc.page_content) # 跳过空文档 if len(cleaned_content) 10: cleaned_doc Document( page_contentcleaned_content, metadatadoc.metadata ) cleaned_docs.append(cleaned_doc) return cleaned_docs3.2 元数据增强为分块添加丰富的元数据提升检索精度和可追溯性def enhance_metadata(documents: list[Document], source: str) - list[Document]: 增强文档元数据 enhanced_docs [] for i, doc in enumerate(documents): metadata { **doc.metadata, source: source, chunk_id: f{source}_{i}, chunk_index: i, total_chunks: len(documents), upload_time: time.time() } enhanced_doc Document( page_contentdoc.page_content, metadatametadata ) enhanced_docs.append(enhanced_doc) return enhanced_docs3.3 批量处理与异常容错工业级系统需要支持批量处理大量文档并能处理单个文档解析失败的情况from pathlib import Path from utils.logger import logger from utils.exceptions import DocumentParseError def batch_load_documents(dir_path: str | Path) - list[Document]: 批量加载目录下的所有文档 dir_path Path(dir_path) all_documents [] # 支持的文件格式 supported_formats [.txt, .md, .pdf, .docx, .doc] for file_path in dir_path.iterdir(): if file_path.is_file() and file_path.suffix.lower() in supported_formats: try: logger.info(f正在加载文档{file_path.name}) documents load_single_document(file_path) all_documents.extend(documents) logger.info(f✅ 文档加载成功{file_path.name}共{len(documents)}页) except Exception as e: logger.error(f❌ 文档加载失败{file_path.name}错误{str(e)}) # 跳过失败的文档继续处理其他文档 continue logger.info(f批量加载完成共加载{len(all_documents)}个文档) return all_documents3.4 分块参数调优指南分块参数没有万能值需要根据你的文档类型和业务场景进行调优先从默认值开始chunk_size1000chunk_overlap100测试不同的分块大小512、1024、2048对比检索准确率调整重叠大小通常为分块大小的 10%-20%优化分隔符针对中文添加。、、等句子结束符使用语义分块如果对质量要求高且能接受较慢的速度四、项目整合实现工业级文档处理模块现在我们将今天所学的内容整合到前两天搭建的 LangChain 2026 框架中实现一个完整的文档处理流水线。4.1 第一步新增依赖在requirements.txt中添加文档处理相关依赖# 文档处理依赖langchain-communitypypdfpython-docxunstructuredmarkdownpython-magic-bin; sys_platform win324.2 第二步新增自定义异常在utils/exceptions.py中添加文档处理相关异常# 在现有异常类后面添加 class DocumentParseError(FrameworkBaseException): 文档解析失败异常 def __init__(self, file_path: str, details: str ): message f文档解析失败{file_path} if details: message f详细信息{details} super().__init__(message, error_code1003) class ChunkError(FrameworkBaseException): 文本分块失败异常 def __init__(self, details: str ): message 文本分块失败 if details: message f详细信息{details} super().__init__(message, error_code1004)4.3 第三步实现文档处理核心模块在core/目录下创建document_processor.pyimport time import re from pathlib import Path from typing import List from langchain_core.documents import Document from langchain_community.document_loaders import ( TextLoader, PyPDFLoader, Docx2txtLoader, UnstructuredFileLoader, UnstructuredExcelLoader ) # ✅ 只导入稳定版存在的分块器 from langchain_text_splitters import ( RecursiveCharacterTextSplitter, CharacterTextSplitter ) from config.settings import settings from utils.logger import logger from utils.exceptions import DocumentParseError, ChunkError class DocumentProcessor: 工业级文档处理器最终稳定版 所有实验性功能均做自动降级处理确保无依赖也能正常运行 def __init__( self, chunk_size: int 1000, chunk_overlap: int 100, chunking_strategy: str recursive, use_semantic_chunking: bool False ): self.chunk_size chunk_size self.chunk_overlap chunk_overlap self.chunking_strategy chunking_strategy self.use_semantic_chunking use_semantic_chunking # 初始化分块器自动处理实验性功能依赖 self._init_text_splitter() logger.info( f✅ 文档处理器初始化完成 | f分块大小{chunk_size} | f重叠大小{chunk_overlap} | f分块策略{self.chunking_strategy} ) def _init_text_splitter(self): 初始化文本分块器自动降级版 if self.use_semantic_chunking: # ✅ 优先使用本地 BGE 模型 try: from langchain_experimental.text_splitter import SemanticChunker from langchain_huggingface import HuggingFaceEmbeddings logger.info(正在初始化语义分块器使用本地BGE模型...) logger.info(f正在加载本地模型{settings.embedding_model_path}) # 使用本地模型路径 embeddings HuggingFaceEmbeddings( model_namesettings.embedding_model_path, model_kwargs{device: cpu}, encode_kwargs{normalize_embeddings: True} ) self.text_splitter SemanticChunker( embeddingsembeddings, breakpoint_threshold_typepercentile, breakpoint_threshold_amount95, min_chunk_size100 ) self.chunking_strategy semantic logger.info(✅ 语义分块器初始化成功使用本地BGE模型) except ImportError as e: logger.warning(f⚠️ 缺少依赖{str(e)}自动降级为递归字符分块器) logger.warning(提示如需使用语义分块请运行pip install langchain-experimental langchain-huggingface) self.use_semantic_chunking False self.chunking_strategy recursive self._init_recursive_splitter() except Exception as e: logger.warning(f⚠️ 语义分块器初始化失败{str(e)}自动降级为递归字符分块器) self.use_semantic_chunking False self.chunking_strategy recursive self._init_recursive_splitter() else: self._init_recursive_splitter() def _init_recursive_splitter(self): 初始化工业级递归字符分块器中文深度优化 self.text_splitter RecursiveCharacterTextSplitter( chunk_sizeself.chunk_size, chunk_overlapself.chunk_overlap, separators[ \n\n## , \n\n### , \n\n#### , \n\n##### , # Markdown标题优先级最高 \n\n, \n, # 段落和行 。, , , , , # 中文句子结束符 , 、, , # 中文标点和空格 , # 最后按单个字符分割 ], length_functionlen, is_separator_regexFalse, keep_separatorTrue, # 保留分隔符保证语义完整 strip_whitespaceTrue # 自动去除首尾空白 ) self.chunking_strategy recursive def _clean_text(self, text: str) - str: 工业级文本清洗函数 if not text: return # 1. 统一换行符 text text.replace(\r\n, \n).replace(\r, \n) # 2. 去除多余空白 text re.sub(r[ \t], , text) text re.sub(r\n{3,}, \n\n, text) # 3. 去除常见噪声 text re.sub(r第\d页\s*共\d页, , text) text re.sub(r版权所有.*?保留所有权利, , text, flagsre.DOTALL) text re.sub(rhttps?://\S, , text) text re.sub(r\b[A-Za-z0-9._%-][A-Za-z0-9.-]\.[A-Z|a-z]{2,}\b, , text) text re.sub(r1[3-9]\d{9}, , text) # 4. 去除控制字符 text re.sub(r[\x00-\x1f\x7f-\x9f], , text) return text.strip() def _load_single_document(self, file_path: Path) - List[Document]: 加载单个文档国内网络优化版 移除所有需要自动下载模型的加载器全部使用原生无依赖实现 try: suffix file_path.suffix.lower() logger.debug(f加载文件{file_path.name}) # ✅ 所有加载器均使用无外部依赖的原生实现 if suffix .txt or suffix .md: # Markdown直接用TextLoader加载效果完全满足RAG需求 loader TextLoader( file_path, encodingutf-8, autodetect_encodingTrue ) elif suffix .pdf: loader PyPDFLoader(file_path) elif suffix in [.docx, .doc]: loader Docx2txtLoader(file_path) elif suffix in [.xlsx, .xls]: # Excel使用原生加载器需要openpyxl依赖 try: from langchain_community.document_loaders import OpenpyxlLoader loader OpenpyxlLoader(file_path, read_onlyTrue, data_onlyTrue) except ImportError: logger.warning(⚠️ 未安装openpyxl跳过Excel文件) return [] else: # 未知格式尝试用TextLoader加载 logger.warning(f⚠️ 不支持的格式{suffix}尝试用文本方式加载) loader TextLoader(file_path, encodingutf-8, autodetect_encodingTrue) documents loader.load() # 清洗和过滤 cleaned_docs [] for doc in documents: cleaned_content self._clean_text(doc.page_content) if len(cleaned_content) 20: # 为Markdown添加特殊元数据 if suffix .md: doc.metadata[file_type] markdown cleaned_doc Document( page_contentcleaned_content, metadatadoc.metadata ) cleaned_docs.append(cleaned_doc) return cleaned_docs except Exception as e: raise DocumentParseError(str(file_path), str(e)) from e def _enhance_metadata(self, chunks: List[Document], file_name: str) - List[Document]: 增强分块元数据 enhanced_chunks [] for i, chunk in enumerate(chunks): metadata { **chunk.metadata, source: file_name, chunk_id: f{file_name.replace(., _)}_{i}, chunk_index: i, total_chunks: len(chunks), chunk_length: len(chunk.page_content), process_time: int(time.time()), chunking_strategy: self.chunking_strategy } enhanced_chunk Document( page_contentchunk.page_content, metadatametadata ) enhanced_chunks.append(enhanced_chunk) return enhanced_chunks def process_file( self, file_path: str | Path, enhance_metadata: bool True ) - List[Document]: 处理单个文件 file_path Path(file_path) if not file_path.exists(): raise FileNotFoundError(f文件不存在{file_path}) logger.info(f开始处理{file_path.name}) # ✅ 检查 text_splitter 是否已初始化 if not hasattr(self, text_splitter) or self.text_splitter is None: logger.error(❌ text_splitter 未初始化使用默认递归分块器) self.chunking_strategy recursive self._init_recursive_splitter() try: documents self._load_single_document(file_path) if not documents: logger.warning(f文档{file_path.name}无有效内容) return [] chunks self.text_splitter.split_documents(documents) logger.info(f分块完成{len(chunks)}个分块) if enhance_metadata: chunks self._enhance_metadata(chunks, file_path.name) logger.info(f✅ 处理完成{file_path.name}) return chunks except Exception as e: logger.error(f❌ 处理失败{file_path.name}错误{str(e)}, exc_infoTrue) raise ChunkError(f处理文档{file_path.name}失败) from e def process_directory( self, dir_path: str | Path, recursive: bool False, enhance_metadata: bool True ) - List[Document]: 批量处理目录 dir_path Path(dir_path) if not dir_path.exists(): raise FileNotFoundError(f目录不存在{dir_path}) logger.info(f开始批量处理目录{dir_path}) all_chunks [] supported_formats [.txt, .md, .pdf, .docx, .doc, .xlsx, .xls] glob_pattern **/* if recursive else * for file_path in dir_path.glob(glob_pattern): if file_path.is_file() and file_path.suffix.lower() in supported_formats: try: chunks self.process_file(file_path, enhance_metadata) all_chunks.extend(chunks) except Exception: continue logger.info(f✅ 批量处理完成共生成{len(all_chunks)}个分块) return all_chunks def save_chunks_to_file(self, chunks: List[Document], output_path: str | Path): 保存分块到JSONL文件 import json output_path Path(output_path) output_path.parent.mkdir(exist_okTrue, parentsTrue) with open(output_path, w, encodingutf-8) as f: for chunk in chunks: chunk_data { page_content: chunk.page_content, metadata: chunk.metadata } f.write(json.dumps(chunk_data, ensure_asciiFalse) \n) logger.info(f分块已保存到{output_path.resolve()}) def load_chunks_from_file(self, input_path: str | Path) - List[Document]: 从JSONL文件加载分块 import json input_path Path(input_path) if not input_path.exists(): raise FileNotFoundError(f分块文件不存在{input_path}) chunks [] with open(input_path, r, encodingutf-8) as f: for line_num, line in enumerate(f, 1): try: chunk_data json.loads(line.strip()) chunk Document( page_contentchunk_data[page_content], metadatachunk_data[metadata] ) chunks.append(chunk) except json.JSONDecodeError as e: logger.warning(f第{line_num}行解析失败跳过{str(e)}) logger.info(f从{input_path}加载了{len(chunks)}个分块) return chunks