Context Engineering实战2026:让AI Agent不失忆、不越界、不走偏
什么是Context Engineering2026年“Context Engineering”上下文工程已经成为AI工程师的核心技能之一与Prompt Engineering并列。如果说Prompt Engineering是如何向AI提问那么Context Engineering是如何管理AI知道什么。一个AI Agent在执行任务时它的行为完全由上下文窗口的内容决定。上下文就是AI的工作记忆它决定了AI能做什么、知道什么、记得什么。管理不好上下文AI Agent就会- 忘记之前说过的话失忆- 在不应该的地方使用某些信息越界- 随着对话深入逐渐偏离原始目标走偏本文将从工程角度系统介绍Context Engineering的核心技术以及如何在生产Agent中实践。—## 上下文的五种类型在设计AI Agent时上下文可以分为五类每类都需要不同的管理策略1. 系统上下文System Context - 角色定义、行为规则、能力边界 - 永久存在于每次调用中 2. 任务上下文Task Context - 当前任务的目标、约束、背景信息 - 任务开始时注入任务结束时清除 3. 对话上下文Conversation Context - 历史对话记录 - 需要主动管理压缩、摘要、选择性遗忘 4. 知识上下文Knowledge Context - 从知识库检索的相关信息RAG - 动态注入只在需要时检索 5. 状态上下文State Context - Agent的工作状态已完成的步骤、中间结果 - 需要持久化支持断点续传—## 核心技术1上下文压缩与滚动窗口pythonfrom anthropic import Anthropicfrom dataclasses import dataclass, fieldfrom typing import Optionalimport jsondataclassclass Message: role: str content: str token_estimate: int 0 importance: float 1.0 # 0-1越高越重要压缩时保留 def __post_init__(self): if self.token_estimate 0: self.token_estimate len(self.content.split()) * 1.3dataclassclass ContextWindow: 智能上下文窗口管理器 max_tokens: int 100000 # 上下文窗口大小 compression_threshold: float 0.8 # 超过80%时触发压缩 min_messages_to_keep: int 4 # 至少保留最近N条消息 system_prompt: str messages: list[Message] field(default_factorylist) _client: Anthropic field(default_factoryAnthropic) property def current_tokens(self) - int: 估算当前使用的token数 system_tokens len(self.system_prompt.split()) * 1.3 message_tokens sum(m.token_estimate for m in self.messages) return int(system_tokens message_tokens) property def is_near_limit(self) - bool: return self.current_tokens self.max_tokens * self.compression_threshold def add_message(self, role: str, content: str, importance: float 1.0): 添加消息如果快满了自动触发压缩 msg Message(rolerole, contentcontent, importanceimportance) self.messages.append(msg) if self.is_near_limit: self._compress() def _compress(self): 上下文压缩策略 1. 保留所有高重要性消息 2. 将中间的低重要性消息替换为摘要 3. 始终保留最近N条消息 if len(self.messages) self.min_messages_to_keep: return # 分区要压缩的历史 要保留的最近消息 recent_count self.min_messages_to_keep to_compress self.messages[:-recent_count] recent self.messages[-recent_count:] if not to_compress: return # 生成压缩摘要 conversation_text \n.join([ f{m.role}: {m.content} for m in to_compress ]) response self._client.messages.create( modelclaude-3-haiku-20240307, max_tokens500, messages[{ role: user, content: f请用简洁的摘要概括以下对话中的关键信息、决策和结论{conversation_text}要求1. 保留所有重要的事实、决策和承诺2. 忽略礼貌用语和重复内容3. 摘要长度不超过原文的30%4. 以客观叙述方式写作 }] ) summary response.content[0].text summary_msg Message( rolesystem, contentf[早期对话摘要]\n{summary}, importance1.0, # 摘要本身重要性高 ) # 替换用摘要 最近消息 self.messages [summary_msg] recent print(f [Context] 压缩完成{len(to_compress)recent_count}条 → {len(self.messages)}条) def get_messages_for_api(self) - list[dict]: 转换为API所需的消息格式 return [ {role: m.role, content: m.content} for m in self.messages if m.role in (user, assistant) # 过滤掉system类型的摘要 ]—## 核心技术2分层记忆架构pythonimport timefrom pathlib import Pathdataclassclass Memory: content: str created_at: float field(default_factorytime.time) last_accessed: float field(default_factorytime.time) access_count: int 0 importance: float 0.5 tags: list[str] field(default_factorylist) memory_type: str episodic # episodic, semantic, proceduralclass HierarchicalMemory: 三层记忆架构 L1 - 工作记忆当前对话上下文最快最小 L2 - 情节记忆近期交互历史中速中等大小 L3 - 语义记忆长期知识和事实最慢最大 def __init__(self, user_id: str, storage_path: str ./memory_store): self.user_id user_id self.storage_path Path(storage_path) / user_id self.storage_path.mkdir(parentsTrue, exist_okTrue) # L1: 工作记忆纯内存 self.working_memory: list[Memory] [] self.max_working_memories 10 # L2: 情节记忆内存磁盘持久化 self.episodic_memories: list[Memory] [] self.max_episodic_memories 100 # L3: 语义记忆向量数据库懒加载 self._semantic_store None # 加载持久化记忆 self._load_episodic() def remember(self, content: str, importance: float 0.5, tags: list[str] None): 记录新的记忆根据重要性决定存储层级 memory Memory( contentcontent, importanceimportance, tagstags or [], ) if importance 0.8: # 高重要性 → 直接写入情节记忆并持久化 self.episodic_memories.append(memory) self._save_episodic() else: # 普通重要性 → 先放工作记忆 self.working_memory.append(memory) # 工作记忆满了将重要的转移到情节记忆 if len(self.working_memory) self.max_working_memories: self._consolidate_working_memory() def recall(self, query: str, k: int 5, include_working: bool True) - list[Memory]: 从各层记忆中检索相关记忆 策略先搜情节记忆再搜语义记忆最后合并工作记忆 results [] # 工作记忆全部包含最近的放前面 if include_working: results.extend(reversed(self.working_memory[-5:])) # 情节记忆关键词匹配 query_words set(query.lower().split()) for memory in self.episodic_memories: memory_words set(memory.content.lower().split()) overlap len(query_words memory_words) if overlap 0: memory.last_accessed time.time() memory.access_count 1 results.append(memory) # 按相关性和重要性排序 results.sort(keylambda m: m.importance * (1 m.access_count * 0.1), reverseTrue) return results[:k] def build_memory_context(self, current_task: str) - str: 为当前任务构建记忆上下文 根据任务相关性动态选择注入哪些记忆 relevant_memories self.recall(current_task, k5) if not relevant_memories: return memory_texts [] for m in relevant_memories: age_hours (time.time() - m.created_at) / 3600 age_str f{age_hours:.0f}小时前 if age_hours 24 else f{age_hours/24:.0f}天前 memory_texts.append(f[{age_str}] {m.content}) return 【相关历史记忆】\n \n.join(memory_texts) def _consolidate_working_memory(self): 将工作记忆中重要的转移到情节记忆 important [m for m in self.working_memory if m.importance 0.6] self.episodic_memories.extend(important) # 工作记忆只保留最近的 self.working_memory self.working_memory[-self.max_working_memories//2:] # 情节记忆超限删除旧的低重要性记忆 if len(self.episodic_memories) self.max_episodic_memories: self.episodic_memories.sort(keylambda m: m.importance * (1 m.access_count * 0.1)) self.episodic_memories self.episodic_memories[20:] # 删除最低分的20个 self._save_episodic() def _save_episodic(self): data [ { content: m.content, created_at: m.created_at, importance: m.importance, tags: m.tags, access_count: m.access_count, } for m in self.episodic_memories ] with open(self.storage_path / episodic.json, w, encodingutf-8) as f: json.dump(data, f, ensure_asciiFalse, indent2) def _load_episodic(self): path self.storage_path / episodic.json if path.exists(): with open(path, encodingutf-8) as f: data json.load(f) self.episodic_memories [ Memory(**item) for item in data ]—## 核心技术3防止上下文污染pythonclass ContextIsolator: 防止上下文越界确保不同任务的上下文不相互污染 特别适用于多任务并发的Agent场景 def __init__(self): self._contexts: dict[str, list] {} self._active_context_id: str | None None def create_context(self, context_id: str, system_prompt: str) - None: 创建隔离的上下文空间 self._contexts[context_id] { system_prompt: system_prompt, messages: [], created_at: time.time(), } def switch_context(self, context_id: str) - None: 切换到指定上下文 if context_id not in self._contexts: raise ValueError(f上下文不存在: {context_id}) self._active_context_id context_id def add_to_active_context(self, role: str, content: str) - None: 向当前活跃上下文添加消息 if not self._active_context_id: raise RuntimeError(没有活跃的上下文) self._contexts[self._active_context_id][messages].append( {role: role, content: content} ) def get_active_context_messages(self) - tuple[str, list]: 获取当前上下文的系统提示和消息 if not self._active_context_id: raise RuntimeError(没有活跃的上下文) ctx self._contexts[self._active_context_id] return ctx[system_prompt], ctx[messages] def merge_contexts(self, target_id: str, source_id: str, summary_only: bool True): 合并两个上下文例如子任务完成后将结果注入主任务 summary_onlyTrue: 只注入摘要不暴露完整历史防止信息越界 source self._contexts.get(source_id) target self._contexts.get(target_id) if not source or not target: raise ValueError(上下文不存在) if summary_only: # 只注入摘要 messages_text \n.join([ f{m[role]}: {m[content]} for m in source[messages] ]) summary f[子任务 {source_id} 完成摘要{messages_text[:300]}...] target[messages].append({role: system, content: summary}) else: # 完整注入 target[messages].extend(source[messages])—## 综合实践Context-Aware Agentpythonclass ContextAwareAgent: 集成所有上下文管理技术的完整Agent def __init__(self, user_id: str, system_prompt: str): self.client Anthropic() self.user_id user_id # 上下文窗口管理 self.context_window ContextWindow( system_promptsystem_prompt, max_tokens120000, ) # 分层记忆 self.memory HierarchicalMemory(user_id) def chat(self, user_message: str) - str: 处理用户消息集成记忆检索 # 1. 检索相关记忆 memory_context self.memory.build_memory_context(user_message) # 2. 构建增强消息注入记忆上下文 enhanced_message user_message if memory_context: enhanced_message f{memory_context}\n\n用户消息{user_message} # 3. 添加到上下文窗口自动管理压缩 self.context_window.add_message(user, enhanced_message) # 4. 调用LLM response self.client.messages.create( modelclaude-3-5-sonnet-20241022, max_tokens2048, systemself.context_window.system_prompt, messagesself.context_window.get_messages_for_api(), ) assistant_message response.content[0].text # 5. 记录回复到上下文 self.context_window.add_message(assistant, assistant_message) # 6. 提取并存储重要信息到记忆 self._extract_and_remember(user_message, assistant_message) return assistant_message def _extract_and_remember(self, user_msg: str, assistant_msg: str): 从对话中提取值得记住的信息 # 检测关键信息用户偏好、决策、事实等 key_indicators [我喜欢, 我不想, 请记住, 我的, 决定了, 选择] has_important_info any(kw in user_msg for kw in key_indicators) if has_important_info: self.memory.remember( contentf用户表达{user_msg[:200]}, importance0.8, tags[用户偏好], )—## 总结Context Engineering的黄金法则1.最小化原则注入的上下文越少越好只放AI真正需要的信息2.层次化存储工作记忆 → 情节记忆 → 语义记忆不同时效的信息放不同层3.主动压缩不要让上下文被动地满溢要主动管理和压缩4.隔离原则不同任务的上下文要隔离防止信息越界和污染5.可溯源性任何注入的信息都要能追溯来源方便调试Context Engineering是让AI Agent在长期运行中保持稳定、可预期的核心基础设施。投资在上下文管理上的工程努力会在Agent的可靠性和用户体验上得到数倍的回报。