langchain高阶语法
1、什么是Retrieverlangchain所有检索器都遵循一个统一抽象接口输入用户问题query输出在该关于的文档列表 list内容的milvus/chroma检索器都集成BaseRetriever把自己写的混合检索、多路召回、Rerank 封装成标准 Retriever三、核心基类继承 BaseRetriever关键三点继承BaseRetriever实现_get_relevant_documents方法内部调用你之前写的向量检索 BM25 Rerank新建langchain_custom/custom_retriever.pyfrom pymilvus import MilvusClient # 连接 Milvus client MilvusClient(urihttp://127.0.0.1:19530) def search_similar(collection_name: str, query_vector: list None, top_k: int 3): 向量检索这里为了兼容自定义检索器query_vector 可以不传内部生成 # 真实项目里你需要把 query 转成 embedding # 这里为了混合检索器能跑先返回模拟数据不影响整体流程 # 你可以 later 替换成真实 embedding 查询 mock_result [ 登录模块支持账号密码、短信验证码两种登录方式, 登录异常包含密码错误、账号锁定、账号不存在场景, 测试用例需要包含前置条件、操作步骤、预期结果 ] return mock_result[:top_k] from rank_bm25 import BM25Okapi import jieba # 构建 BM25 索引 def build_bm25_index(documents: list[str]) - BM25Okapi: tokenized_docs [jieba.lcut(doc) for doc in documents] bm25 BM25Okapi(tokenized_docs) return bm25 # BM25 关键词检索 def bm25_search(bm25: BM25Okapi, query: str, top_k: int 3) - list[str]: query_tokens jieba.lcut(query) top_docs bm25.get_top_n(query_tokens, ntop_k) return top_docs # 合并两路召回 去重 def merge_retrieval_results(vec_docs: list[str], bm25_docs: list[str]) - list[str]: res [] for doc in vec_docs bm25_docs: if doc not in res: res.append(doc) return res from sentence_transformers import CrossEncoder # 加载轻量 Rerank 模型 rerank_model CrossEncoder(BAAI/bge-reranker-base) def rerank_docs(query: str, docs: list[str], top_k: int 3) - list[str]: # 构造 [query, doc] 对 pairs [[query, doc] for doc in docs] scores rerank_model.predict(pairs) # 按分数排序 doc_score list(zip(docs, scores)) doc_score.sort(keylambda x: x[1], reverseTrue) # 取 top_k return [item[0] for item in doc_score[:top_k]] class HybridRerankRetriever(BaseRetriever): #自定义混合检索器向量检索BM25关键词 Rearank重排序 #类变量向量库集合名文档全集 collection_name:str all_docs:str def __init__(self, collection_name:str,all_docs:str): super().__init__() # 调用父类的构造方法 让子类继承父类的所有功能 self.collection_name collection_name self.all_docs all_docs #初始化BM25索引 self.bm25_index build_bm25_index(self.all_docs) def _get_relevant_documents(self, query: str) - List[Document]: #1、向量索引召回 vec_docs search_similar(self.collection_name,query_vector[],top_k4) #2 BM25关键次召回 bm25_docs bm25_search(self.bm25_index,query,top_k4) #3合并去重 merge_docs merge_retrieval_results(vec_docs,bm25_docs) #4 reranl重排序精选Top3 final_docs rerank_docs(query,merge_docs,top_k3) #转成langChain标准Document对象返回 return [Document(page_contenttext) for text in final]_get_relevant_documents是必须实现的抽象方法最后把文本包装成Document符合 LangChain 规范测试demomock_all_docs [ 登录模块支持账号密码、短信验证码两种登录方式, 登录异常包含密码错误、账号锁定、账号不存在场景, 测试用例包含前置条件、操作步骤、预期结果, RAG系统由文档解析、切块、向量化、检索、生成组成 ] #1.初始化自定义检索器 retriever HybridRerankRetriever( collection_namelangchain_rag_kb, all_docsmock_all_docs, ) #2.初始化大模型 llm get_rag_llm() #3.塞入官方的RetrievalQA链 直接使用 #自动帮你做RAG问答的官方成品链 RetrievalQA 检索 拼接 问答 一条龙 rag_qa_chain RetrievalQA.from_chain_type( llmllm, chain_typestuff, ## 文档拼接方式 #stuff把检索到的所有文档直接全部塞进 prompt #map_reduce拆分处理 把每个文档单独问llm 得到小答案最终合并成最终答案 # 文档 1 → 提炼要点文档 2 → 提炼要点 → 合并所有要点 → 生成最终回答 能处理海量文档 # refine 读一个文档 → 生成一次答案 → 再读下一个 → 优化答案 → 不断迭代精调 # 答案 1 → 文档 2 → 优化成答案 2 答案 2 → 文档 3 → 优化成答案 3 最终输出最精准答案 retrieverretriever, return_source_documentsTrue ## 是否返回参考了哪些原文 ) res rag_qa_chain.invoke({query:登录有哪些异常场景}) print(AI回答, res[result]) print(溯源文档: ,[d.page_content for d in res[source_documents]])