用LangGraph和FastAPI手把手搭建一个电商智能客服:从Multi-Agent架构到API部署
用LangGraph和FastAPI构建电商智能客服从Multi-Agent架构到API部署实战电商平台的智能客服系统需要处理海量用户咨询从商品查询到订单跟踪再到售后支持每个环节都需要精准响应。传统规则引擎难以应对复杂多变的用户需求而单一的大模型又容易产生幻觉或偏离业务场景。本文将带你从零开始基于开源项目AssistGen的核心设计用LangGraph构建Multi-Agent系统并通过FastAPI封装成生产级服务。1. 环境准备与项目初始化在开始前确保你的开发环境满足以下要求Python 3.10已安装Poetry或pipenv等依赖管理工具至少16GB内存用于本地模型运行Neo4j 5.x 图数据库服务推荐使用conda创建独立环境conda create -n assistgen python3.10 conda activate assistgen安装核心依赖包pip install langgraph langchain fastapi uvicorn neo4j pygraphviz项目目录结构建议如下assistgen/ ├── agents/ # Agent定义 ├── graphs/ # 图结构定义 ├── tools/ # 自定义工具 ├── schemas/ # Pydantic模型 ├── api/ # FastAPI路由 ├── config.py # 全局配置 └── main.py # 应用入口2. 理解Multi-Agent架构设计AssistGen采用分层Agent架构核心思想是将复杂问题分解为多个专业化Agent协同处理。下图展示了主要组件关系用户请求 → 路由Agent → 专用Agent集群 → 结果聚合 → 响应输出 ↑ ↓ 意图识别 工具调用(知识库/数据库)2.1 关键Agent角色定义每个Agent都是一个独立的处理单元通过LangGraph的节点(Node)实现Agent类型职责实现方式路由Agent分析请求并分发给专业AgentLangGraph条件边商品查询Agent处理产品规格、价格等查询GraphRAG Neo4j查询订单Agent处理订单状态、物流跟踪业务API集成售后Agent处理退换货政策咨询结构化知识库检索安全Agent检查响应合规性规则引擎LLM校验2.2 图状态(State)设计全局状态是Agent间共享的数据容器定义示例from typing import TypedDict, List, Optional from langgraph.graph.message import AnyMessage class AgentState(TypedDict): messages: List[AnyMessage] # 对话历史 current_agent: str # 当前活跃Agent query_type: Optional[str] # 查询类型分类 tool_outputs: dict # 工具调用结果3. 构建核心图结构LangGraph的核心是通过定义节点和边来构建处理流程图。以下是商品查询子图的实现示例3.1 定义节点函数from langgraph.prebuilt import ToolNode from langchain_core.tools import tool tool def query_product_specs(product_name: str): 查询商品详细规格 # 实际实现会连接Neo4j或商品数据库 return f{product_name}的规格参数... product_query_node ToolNode([query_product_specs])3.2 构建条件路由from langgraph.graph import Graph from langgraph.prebuilt import ConditionalEdge def route_query(state: AgentState): if 价格 in state[messages][-1].content: return price_query elif 规格 in state[messages][-1].content: return spec_query return general_query workflow Graph() workflow.add_node(product_query, product_query_node) workflow.add_conditional_edges( route_query, route_query, { price_query: price_agent, spec_query: spec_agent, general_query: general_agent } )3.3 完整图编译workflow.set_entry_point(route_query) graph workflow.compile()4. 集成业务工具链电商客服需要连接多种业务系统以下是典型工具集成方案4.1 Neo4j商品知识图谱配置Neo4j连接并封装查询工具from langchain_community.graphs import Neo4jGraph neo4j Neo4jGraph( urlbolt://localhost:7687, usernameneo4j, passwordpassword ) tool def query_neo4j(cypher: str): 执行Neo4j查询 return neo4j.query(cypher)4.2 GraphRAG混合检索结合向量检索与图谱查询from langchain_community.vectorstores import Chroma from langchain_community.embeddings import OllamaEmbeddings vectorstore Chroma( embedding_functionOllamaEmbeddings(modelnomic-embed-text), persist_directory./chroma_db ) tool def hybrid_search(query: str): 混合检索向量图谱 # 向量相似度检索 vector_results vectorstore.similarity_search(query) # 生成Cypher查询 cypher llm.generate_cypher(query) graph_results neo4j.query(cypher) return {vector: vector_results, graph: graph_results}5. FastAPI服务封装将编译好的图结构暴露为REST API5.1 定义API模型from pydantic import BaseModel class ChatRequest(BaseModel): message: str session_id: str None class ChatResponse(BaseModel): response: str session_id: str sources: list []5.2 核心路由实现from fastapi import FastAPI from fastapi.middleware.cors import CORSMiddleware app FastAPI(titleAssistGen API) app.add_middleware( CORSMiddleware, allow_origins[*], allow_methods[*], ) app.post(/chat) async def chat_endpoint(request: ChatRequest): state { messages: [HumanMessage(contentrequest.message)], session_id: request.session_id } result await graph.ainvoke(state) return ChatResponse( responseresult[messages][-1].content, session_idrequest.session_id )5.3 启动服务配置if __name__ __main__: import uvicorn uvicorn.run( main:app, host0.0.0.0, port8000, reloadTrue, workers4 )6. 生产环境优化策略6.1 性能调优参数关键配置示例config.pyclass Settings: MAX_CONCURRENT 100 TIMEOUT 30.0 LLM_CACHE_TTL 3600 NEO4J_POOL_SIZE 206.2 监控与日志集成Prometheus监控from prometheus_fastapi_instrumentator import Instrumentator Instrumentator().instrument(app).expose(app)日志配置示例import logging from fastapi.logger import logger logging.basicConfig( format%(asctime)s - %(name)s - %(levelname)s - %(message)s, levellogging.INFO ) logger logging.getLogger(__name__)6.3 安全防护措施JWT认证中间件请求速率限制输出内容过滤from fastapi import Depends, HTTPException from fastapi.security import HTTPBearer security HTTPBearer() async def auth_required(token: str Depends(security)): if not validate_token(token.credentials): raise HTTPException(status_code403)7. 典型问题排查指南7.1 常见错误代码错误码原因解决方案502Agent执行超时检查LLM响应时间/增加超时设置503图数据库连接失败验证Neo4j服务状态504工具调用异常检查工具函数输入输出类型7.2 调试技巧可视化图结构from langgraph.graph import graph_to_dot dot graph_to_dot(graph) dot.render(assistgen_flow)中间状态检查async def debug_node(state): print(fCurrent state: {state}) return state workflow.add_node(debug, debug_node)日志记录配置import logging logging.basicConfig(levellogging.DEBUG)在实际部署中我们发现Neo4j的索引优化能显著提升商品查询性能。通过为高频查询字段创建复合索引平均响应时间从1.2秒降低到400毫秒。另一个关键点是合理设置LangGraph的检查点间隔太频繁会影响性能太少则可能丢失重要中间状态。