我第一次听到两个工程师争论 RAG 和 MCP 的时候还真以为他们在讨论宝可梦的进化。其中一个不停地坚持说“它们本质上是一样的。”另一个则自信地点头好像刚解决了分布式系统的问题。与此同时他们开发的生产Agent却像个糊涂蛋一样呆在角落里它觉得自己能背诵公司里所有的政策文件却查不出五分钟前哪个流水线出了问题。真是经典。这时我才恍然大悟大多数开发者把 RAG 和 MCP 当作可以互换的流行语而实际上它们解决的是完全不同的问题。RAG 是你的智能体的记忆它记住了你组织曾经编写过的所有内容。MCP 是它的双手它负责在现实世界中实际操作。如果把它们混在一起你最终会得到一个书本知识丰富但在实际系统中完全无用的 AI。这就像雇佣了一个能背诵你整个维基百科页面的人但当你让他打开浏览器标签页时他却会立刻卡壳。本文旨在解决这个问题。读完本文后你不仅会熟悉相关概念还会掌握相应的思维模型、理论知识以及可运行的代码示例从而真正理解并运用到生产环境中。简单的区分RAG 阅读 → 从你的LLM培训中未涉及的文档中提取知识MCP 执行 → 对实时系统和实时数据执行操作记住这一点。其他一切都由此而来。第一部分RAG——赋予你的LLM长期记忆什么是 RAG检索增强生成 (RAG) 是一种模式其中您可以使用在查询时从您控制的知识库中检索的相关内容来增强 LLM 的提示。LLM的学习进度停滞在培训截止日期。他们对以下方面一无所知您的内部文档上个季度的架构决策团队的运行手册或服务级别协议任何在截止日期之后写的东西RAG算法解决了这个问题。它预先将文档索引到向量存储库中当用户提出问题时系统会提取语义最相关的文本块并将其注入到LLM的上下文窗口中。这样模型无需重新训练即可获得“即时知识”。实际上这意味着如果有人问“我们的管道重试服务级别协议 (SLA) 是什么”模型不会猜测而是会检索准确的策略文本并用基于事实的答案来回答。RAG 是 AI 代理的记忆层。它告诉模型你的组织知道什么而不是它应该做什么。RAG 流程何时使用 RAG当你的知识是静态的或半静态的时RAG 是完美的——例如文档、操作手册、合同、白皮书、入职指南。第二部分MCP——运用您的LLM技能什么是MCP模型上下文协议 (MCP) 是一个开放标准由 Anthropic 引入它定义了 LLM调用外部工具、API 和实时系统的结构化方式。把它想象成带有形式化规范的函数调用。它不会像法学硕士那样凭空臆想出关于实时数据的答案而是会说我需要查一下。让我调用一下合适的工具。MCP 工具可以查询实时数据库触发 Databricks 工作流从Kafka主题中阅读调用 REST API将记录写入记录系统换句话说MCP 将你的 LLM 从只会空谈的模型转变为会行动的模型。它使模型能够通过类型化的模式、防护措施和可预测的行为对真实系统进行受控访问。例如如果用户问“昨晚的管道运行失败了吗”模型不会猜测而是调用你的get_pipeline_status工具获取实时结果并返回真实数据。MCP是AI代理的行动层。它告诉模型系统当前正在做什么而不是文档中描述的是什么。MCP流程以下是扩展后的“何时使用 MCP”表格以可视化决策图的形式呈现第三部分真正的力量——两者结合使用真实用户的问题并不简单。他们常常把知识和操作混杂在一句话里“我们针对管道故障的服务水平协议 (SLA) 是什么昨晚的销售数据导入作业是否违反了该协议”这个问题包含两个部分“我们的服务级别协议 (SLA) 是什么” → 从静态运行手册中检索 →RAG“昨晚是否发生违规” → 查询实时作业监控 API →MCP纯粹的 RAG 系统无法回答第二个问题。它无法访问实时系统。纯粹的 MCP 系统除非您将其硬编码到系统中否则无法了解您的 SLA。换句话说纯粹的 MCP 系统无法回答第一个问题除非您将每项策略都硬编码到工具中否则它没有长期记忆功能。它们结合起来既能赋予您的代理记忆功能又能赋予其自主权。→ RAG 告诉模型你的组织了解哪些信息。→ MCP 告诉模型你的系统当前正在执行哪些操作。→ 现代 AI 代理需要这两者才能在实际工作流程中智能地运行。第四部分代码——自己动手构建项目数据管道支持代理一个模块化代理可以回答有关管道策略RAG的问题并检查实时作业运行状态MCP 风格的工具调用。无需 Databricks 或 Airflow——所有实时数据均为模拟数据。项目结构rag_vs_mcp_demo/ ├── agent/ │ ├── llm_router.py # 基于关键词的 RAG/MCP/混合分类器 │ ├── openai_agent.py # 主代理RAG OpenAI 工具调用 │ ├── openai_client.py # OpenAI 客户端 工具模式 │ └── orchestrator.py # 编排器无需 LLM ├── rag/ │ ├── embedder.py # HuggingFace 嵌入封装 │ ├── retriever.py # 数据块 → 嵌入 → FAISS → 检索 │ └── vector_store.py # FAISS 封装 ├── tools/ │ ├── fake_api.py # 模拟延迟 故障 │ ├── pipeline_status.py # 模拟实时管道状态数据 │ ├── router.py # 工具调度器 │ └── row_count.py # 模拟行计数工具 ├── data/ │ ├── runbook_sla.md │ ├── dataquality_standards.md │ └── oncall_responsibilities.md ├── examples/ │ └── demo_query.py ├── .env.example └── requirements.txt设置# 1. 创建并激活虚拟环境 python -m venv venv source venv/bin/activate # macOS/Linux # venv\Scripts\activate # Windows # 2. 安装依赖项 pip install -r requirements.txt # 3. 设置您的 OpenAI API 密钥 cp .env.example .env # 打开 .env 文件并粘贴您的密钥OPENAI_API_KEYsk-... 首次运行会下载all-MiniLM-L6-v2嵌入模型约80MB。之后会进行缓存。 其中requirements.txt包括 requirements.txt: openai 1.30.0 anthropic 0.25.0 langchain 0.2.0 langchain-community 0.2.0 faiss-cpu 1.7.4 sentence-transformers 2.7.0 python-dotenv 1.0.0步骤 1 — 将您的政策文件存储为单独的文件该存储库不会将策略文本硬编码到脚本.md中而是将知识存储在专用文件中data/。这样就可以轻松添加新文档而无需修改代码——只需将新.md文件放入文件夹它就会在启动时自动加载。数据/├── runbook_sla.md # 管道 SLA 阈值和重试次数上限 ├── dataquality_standards.md # 空值率、重复值和模式漂移规则 └── oncall_responsibilities.md # 确认窗口和 RCA 要求例子 - data/runbook_sla.md# 管道服务级别协议 (SLA) 策略所有关键数据摄取管道必须在4小时内完成。如果管道运行时间超过4小时则必须自动触发事件。失败率阈值总记录数的2%。重试次数上限为3次超过 3次将触发事件升级。步骤 2 — 构建 RAG 流水线模块化RAG 逻辑分为三个文件embedder.py封装 HuggingFace 模型、vector_store.py封装 FAISS 并将retriever.py它们连接在一起。rag/retriever.py— 端到端的 RAG 流水线rag/retriever.py— 端到端的 RAG 流水线 from langchain.text_splitter import RecursiveCharacterTextSplitter from .embedder import Embedder from .vector_store import VectorStore class RAGRetriever : 端到端 RAG 管道 - 分割文档 - 嵌入块 - 存储在 FAISS 中 - 检索前 k 个块 def __init__ ( self, chunk_size 300 , chunk_overlap 30 ): self.splitter RecursiveCharacterTextSplitter( chunk_sizechunk_size, chunk_overlapchunk_overlap ) self.embedder Embedder() self.vectorstore VectorStore(self.embedder.model) def index ( self, raw_text: str ): 分割 嵌入 存储。 chunks self.splitter.create_documents([raw_text]) self.vectorstore.build(chunks) def retrieve ( self, query: str ) - str : 返回前 k 个内容块以单个字符串的形式返回。 docs self.vectorstore.search(query, k 3 ) return \n\n .join([doc.page_content for doc in docs])步骤 3 — 定义 MCP 工具实时作业监控器LIVE_JOB_RUNS { sales_ingestion : { run_id : run_20240407_001 , status : FAILED , duration_minutes : 312 , failure_reason : 超时耗时5小时12分钟 , records_processed : 1_840_000 , failure_rate_pct : 0.8 , retries : 3 , last_run : 2024-04-07 02:15:00 }, inventory_sync : { run_id : run_20240407_002 , status : SUCCESS , duration_minutes : 87 , failure_rate_pct : 0.1 , records_processed : 540_000 , retries : 0 , last_run : 2024-04-07 03:42:00 }, customer_360 : { run_id : run_20240407_003 , status : RUNNING , duration_minutes : 220 , failure_rate_pct : None , records_processed : 920_000 , retries : 1 , last_run : 2024-04-07 01:00:00 } } def get_pipeline_run_status ( pipeline_name: str ) - dict : 伪 MCP 工具返回实时管道状态。 在生产环境中请替换为 Databricks / Airflow / REST API。 key pipeline_name.lower().replace( , _ ) return LIVE_JOB_RUNS.get(key, { error : f未找到管道{pipeline_name} })步骤 4 — LLM 路由器agent/llm_router.py— 一个轻量级的关键词分类器用于判断查询需要 RAG、MCP 还是两者都需要。这样就避免了为了路由意图而额外调用 LLM。class LLMRouter : 决定查询是否需要 - RAG知识检索 - MCP实时工具调用 - HYBRID两者兼有 def classify ( self, query: str ) - str : q query.lower() mcp_keywords [ status , failed , running , breach , row count , records , live , today , last night , trigger , restart , job ] rag_keywords [ what is , explain , policy , runbook , sla , documentation , standard , definition ] needs_mcp any (k in q for k in mcp_keywords) needs_rag any (k in q for k in mcp_keywords)在rag_keywords 中 if needs_mcp and needs_rag return “hybrid” if needs_mcp return “mcp” if needs_rag return “rag” return “unknown”步骤 5 — 构建代理RAG MCP 结合使用import json import os import glob from agent.openai_client import client, mcp_tools from tools.pipeline_status import get_pipeline_run_status from rag import RAGRetriever # ── 在模块加载时初始化 RAG 一次 ────────────────────────── def _load_data_docs () - str : data_dir os.path.join(os.path.dirname(__file__), . , data ) md_files glob.glob(os.path.join(data_dir, *.md )) if not md_files: raise FileNotFoundError( f在{data_dir}中未找到 .md 文件 ) combined [] for path in sorted (md_files): with open (path, r ) as f: combined.append(f.read()) print ( f[RAG] 已加载{ len (md_files)} doc(s) from data/ ) return \n\n .join(combined) _rag RAGRetriever() _rag.index(_load_data_docs()) # ── Agent ────────────────────────────────────────────────────────── def pipeline_support_agent ( user_query: str ): print ( f\n { * 60 } ) print ( fUSER QUERY: {user_query} ) print ( f { * 60 } \n ) # Step 1: RAG — retrieve relevant policy context policy_context _rag.retrieve(user_query) print ( f[RAG] Retrieved policy context:\n {policy_context} \n ) print ( - * 40 ) system_prompt You作为一名数据工程支持人员 您需要具备以下两项能力1. 了解内部管道策略具体内容见下文。2 . 能够调用实时监控工具来查看实际的管道运行数据。 规则 - 如果问题需要策略/SLA 方面的知识请使用提供的上下文。 - 如果需要实时管道状态调用 get_pipeline_run_status 工具。- 如果需要两者兼有同时执行这两项操作然后综合得出清晰的答案。- 务必具体说明。如果违反了服务级别协议 (SLA)请用数字明确说明。 messages [ { role : system , content : system_prompt}, { role : user , content : ( f策略和运行手册上下文来自内部文档\n f---\n {policy_context} \n---\n\n f用户问题{user_query} \n\n f回答此问题。如果您需要实时管道数据请使用此工具。 ) } ] # 步骤 2首次 LLM 调用 — 可能会触发工具调用 response client.chat.completions.create( model gpt-4.1 , messagesmessages, toolsmcp_tools, tool_choice auto ) message response.choices[ 0 ].message # 步骤 3处理触发的工具调用 if message.tool_calls: tool_call message.tool_calls[ 0 ] args json.loads(tool_call.function.arguments) pipeline_name args[ pipeline_name ] print ( f[MCP] 工具调用{tool_call.function.name} ) print ( f[MCP] 管道{pipeline_name} ) tool_result get_pipeline_run_status(pipeline_name) print ( f[MCP] 实时结果{json.dumps(tool_result, indent 2 )} \n ) print ( - * 40 ) # 将工具结果反馈到最终合成 messages.append(message) messages.append({ role : tool , tool_call_id : tool_call. id , content : json.dumps(tool_result) }) final client.chat.completions.create( model “gpt-4.1” messagesmessages ) print ( f[AGENT ANSWER]\n {final.choices[ 0 ].message.content} )返回# 纯 RAG 答案 — 无需工具 print ( f[代理答案]\n {message.content} )第六步——运行它始终从项目根目录运行不要从项目内部运行examples/cd rag_vs_mcp_demo python -m examples.demo_query examples/demo_query.py from agent.openai_agent import pipeline_support_agent # 查询 1纯 RAG — 仅策略问题 pipeline_support_agent( 我们的管道故障 SLA 是多少 ) # 查询 2纯 MCP — 仅实时数据 pipeline_support_agent( 昨晚 sales_ingestion 作业失败了吗 ) # 查询 3混合 — RAG MCP 结合 pipeline_support_agent( 我们的 SLA 是多少sales_ingestion 是否违反了 SLA ) # 查询 4MCP 行数统计工具 pipeline_support_agent( 请提供 customers 表的行数。 )示例输出混合查询用户查询我们的服务级别协议 (SLA) 是什么销售数据摄取是否违反了该协议 [RAG] 从 data/ 加载了 3 个文档[RAG] 检索到策略上下文所有关键数据摄取管道必须在 4 小时内完成。如果管道运行时间超过 4 小时则必须自动触发事件。管道重试次数上限为 3 次超过 3 次将触发事件升级。[MCP] 工具调用get_pipeline_run_status [MCP] 管道sales_ingestion [MCP] 实时结果{ status: FAILED, duration_minutes: 312, failure_reason: 超时5 小时 12 分钟, retries: 3 } [客服回复]是的——sales_ingestion 管道昨晚违反了您的 SLA。服务级别协议 (SLA) 违规- 策略要求在 4 小时240 分钟内完成- 昨晚的运行耗时 312 分钟5 小时 12 分钟- 这违反了 72 分钟的 SLA足以触发自动事件。重试升级- 管道已达到最大重试次数上限3 次- 根据您的策略这将触发升级 - 不应再次自动重试。所需操作根据您的运行手册1. 如果尚未创建事件单请立即创建2. 值班工程师必须在 15 分钟内确认3. 必须在问题解决后 24 小时内提交根本原因分析。第五部分扩展存储库要添加实际的管道请将LIVE_JOB_RUNS其替换tools/pipeline_status.py为对 Databricks、Airflow 或您自己的监控系统的实时 API 调用。要添加更多知识文档只需将任何.md文件拖放到指定位置即可data/——它们会在启动时自动加载无需任何代码更改。第六部分心智模型——牢记于心快速区别结语RAG 让你的智能体掌握信息MCP 让你的智能体具备能力。但两者单独来看都不足以满足生产级 AI 系统的需求。在企业的数据工程、运营、客户支持等领域真正重要的代理是那些能够同时回忆起企业知识并操作实时系统的人。学习资源推荐如果你想更深入地学习大模型以下是一些非常有价值的学习资源这些资源将帮助你从不同角度学习大模型提升你的实践能力。一、全套AGI大模型学习路线AI大模型时代的学习之旅从基础到前沿掌握人工智能的核心技能​因篇幅有限仅展示部分资料需要点击文章最下方名片即可前往获取二、640套AI大模型报告合集这套包含640份报告的合集涵盖了AI大模型的理论研究、技术实现、行业应用等多个方面。无论您是科研人员、工程师还是对AI大模型感兴趣的爱好者这套报告合集都将为您提供宝贵的信息和启示​因篇幅有限仅展示部分资料需要点击文章最下方名片即可前往获取三、AI大模型经典PDF籍随着人工智能技术的飞速发展AI大模型已经成为了当今科技领域的一大热点。这些大型预训练模型如GPT-3、BERT、XLNet等以其强大的语言理解和生成能力正在改变我们对人工智能的认识。 那以下这些PDF籍就是非常不错的学习资源。因篇幅有限仅展示部分资料需要点击文章最下方名片即可前往获取四、AI大模型商业化落地方案作为普通人入局大模型时代需要持续学习和实践不断提高自己的技能和认知水平同时也需要有责任感和伦理意识为人工智能的健康发展贡献力量。