AI 开发进阶第2篇AI 系统可观测性——让 Agent 的运行过程可见、可追、可调试适合读者已读完基础9篇 第①篇评估体系想让 Agent 系统可运维、可监控预计阅读时间35分钟作者AI小渔村前言Agent 系统需要仪表盘第①篇我们讨论了如何评估 Agent 的质量、安全和成本。但评估是事后的分析生产环境需要的是实时监控当前有多少请求在处理每个请求走到哪一步了哪个 Agent 正在调用什么工具成本有没有异常飙升用户反馈怎么样没有可观测性ObservabilityAgent 系统就是一个黑盒。这篇讲的是如何给 Agent 系统装上仪表盘和黑匣子让运行过程可见、可追、可调试。一、可观测性的三大支柱在 Agent 系统中经典的可观测性三支柱同样适用┌─────────────────────────────────────────────────┐ │ Agent 可观测性体系 │ ├─────────────────────────────────────────────────┤ │ 1. _logs日志 │ │ → 记录系统运行中的关键事件 │ │ → 问题发生时能看到发生了什么 │ │ │ │ 2. _metrics指标 │ │ → 量化系统的运行状态 │ │ → 实时了解系统好不好 │ │ │ │ 3. _traces链路追踪 │ │ → 记录请求的完整调用路径 │ │ → 问题发生时能看到问题出在哪一步 │ └─────────────────────────────────────────────────┘在 Agent 系统中还需要额外关注Token 使用量成本监控工具调用链路Agent 特有模型推理质量输出监控二、日志体系设计2.1 日志记录的四个级别import logging from enum import Enum from datetime import datetime from typing import Optional, Dict, Any import json class AgentLogLevel(Enum): DEBUG 10 # 详细调试信息 INFO 20 # 一般信息 WARNING 30 # 警告 ERROR 40 # 错误 class AgentLogger: Agent 系统专用日志器 def __init__(self, service_name: str): self.logger logging.getLogger(fagent.{service_name}) self.logger.setLevel(AgentLogLevel.INFO) # 控制台输出 handler logging.StreamHandler() handler.setFormatter(logging.Formatter( %(asctime)s | %(levelname)s | %(name)s | %(message)s )) self.logger.addHandler(handler) def log_request_start(self, request_id: str, user_id: str, task: str): 记录请求开始 self.logger.info(json.dumps({ event: request_start, request_id: request_id, user_id: user_id, task: task[:200], # 截断避免日志过长 timestamp: datetime.utcnow().isoformat() })) def log_agent_thinking(self, request_id: str, thought: str, step: int): 记录 Agent 思考过程 self.logger.debug(json.dumps({ event: agent_thinking, request_id: request_id, step: step, thought: thought[:500], timestamp: datetime.utcnow().isoformat() })) def log_tool_call(self, request_id: str, tool_name: str, params: Dict[str, Any], result: Any, duration_ms: float): 记录工具调用 # 敏感参数脱敏 safe_params self._sanitize_params(params) self.logger.info(json.dumps({ event: tool_call, request_id: request_id, tool_name: tool_name, params: safe_params, success: not isinstance(result, Exception), duration_ms: duration_ms, timestamp: datetime.utcnow().isoformat() })) def log_llm_call(self, request_id: str, model: str, prompt_tokens: int, completion_tokens: int, latency_ms: float, cost_usd: float): 记录 LLM 调用 self.logger.info(json.dumps({ event: llm_call, request_id: request_id, model: model, prompt_tokens: prompt_tokens, completion_tokens: completion_tokens, total_tokens: prompt_tokens completion_tokens, latency_ms: latency_ms, cost_usd: cost_usd, timestamp: datetime.utcnow().isoformat() })) def log_request_end(self, request_id: str, success: bool, error: Optional[str] None, total_cost_usd: float 0): 记录请求结束 self.logger.info(json.dumps({ event: request_end, request_id: request_id, success: success, error: error, total_cost_usd: total_cost_usd, timestamp: datetime.utcnow().isoformat() })) def _sanitize_params(self, params: Dict) - Dict: 参数脱敏 sensitive_keys {api_key, token, password, secret, credential} return { k: ***REDACTED*** if any(s in k.lower() for s in sensitive_keys) else v for k, v in params.items() }2.2 日志最佳实践每个请求必须有 request_id——串联所有日志结构化日志——用 JSON方便后续分析敏感信息必须脱敏——api_key、token 等日志级别要合理——INFO 记录关键事件DEBUG 记录详细过程三、指标监控体系3.1 核心指标定义from dataclasses import dataclass, field from typing import List from collections import defaultdict import time dataclass class AgentMetrics: Agent 系统核心指标 # 请求级指标 request_count: int 0 request_success: int 0 request_failed: int 0 # 延迟指标 latencies: List[float] field(default_factorylist) # 成本指标 total_tokens: int 0 total_cost_usd: float 0 # 工具调用指标 tool_calls: int 0 tool_errors: int 0 # LLM 调用指标 llm_calls: int 0 llm_errors: int 0 def add_request(self, success: bool, latency_ms: float, tokens: int, cost_usd: float): 记录一个请求 self.request_count 1 if success: self.request_success 1 else: self.request_failed 1 self.latencies.append(latency_ms) self.total_tokens tokens self.total_cost_usd cost_usd def add_tool_call(self, success: bool): 记录一次工具调用 self.tool_calls 1 if not success: self.tool_errors 1 def add_llm_call(self, success: bool): 记录一次 LLM 调用 self.llm_calls 1 if not success: self.llm_errors 1 def get_summary(self) - dict: 获取指标汇总 avg_latency sum(self.latencies) / len(self.latencies) if self.latencies else 0 # P50、P95、P99 延迟 sorted_latencies sorted(self.latencies) p50 sorted_latencies[len(sorted_latencies) // 2] if sorted_latencies else 0 p95 sorted_latencies[int(len(sorted_latencies) * 0.95)] if sorted_latencies else 0 p99 sorted_latencies[int(len(sorted_latencies) * 0.99)] if sorted_latencies else 0 return { request: { total: self.request_count, success: self.request_success, failed: self.request_failed, success_rate: self.request_success / max(self.request_count, 1) }, latency_ms: { avg: round(avg_latency, 2), p50: round(p50, 2), p95: round(p95, 2), p99: round(p99, 2) }, cost: { total_usd: round(self.total_cost_usd, 4), avg_per_request: round(self.total_cost_usd / max(self.request_count, 1), 4), total_tokens: self.total_tokens }, tools: { calls: self.tool_calls, errors: self.tool_errors, error_rate: self.tool_errors / max(self.tool_calls, 1) }, llm: { calls: self.llm_calls, errors: self.llm_errors, error_rate: self.llm_errors / max(self.llm_calls, 1) } } class MetricsCollector: 指标收集器支持 Prometheus 格式 def __init__(self): self.metrics AgentMetrics() self.by_model: Dict[str, AgentMetrics] defaultdict(AgentMetrics) self.by_tool: Dict[str, AgentMetrics] defaultdict(AgentMetrics) def record_request(self, model: str, success: bool, latency_ms: float, tokens: int, cost_usd: float): 记录请求指标 self.metrics.add_request(success, latency_ms, tokens, cost_usd) self.by_model[model].add_request(success, latency_ms, tokens, cost_usd) def record_tool_call(self, tool_name: str, success: bool): 记录工具调用指标 self.metrics.add_tool_call(success) self.by_tool[tool_name].add_tool_call(success) def export_prometheus(self) - str: 导出 Prometheus 格式 lines [] # 请求指标 lines.append(fagent_requests_total {self.metrics.request_count}) lines.append(fagent_requests_success_total {self.metrics.request_success}) lines.append(fagent_requests_failed_total {self.metrics.request_failed}) # 成本指标 lines.append(fagent_cost_total_usd {self.metrics.total_cost_usd}) lines.append(fagent_tokens_total {self.metrics.total_tokens}) # 工具指标 lines.append(fagent_tool_calls_total {self.metrics.tool_calls}) lines.append(fagent_tool_errors_total {self.metrics.tool_errors}) return \n.join(lines)3.2 告警规则设计ALERT_RULES [ { name: error_rate_high, condition: request_failed / request_total 0.05, severity: warning, message: 错误率超过 5%, cooldown_minutes: 5 }, { name: latency_p99_high, condition: latency_p99 10000, # 10秒 severity: warning, message: P99 延迟超过 10 秒, cooldown_minutes: 5 }, { name: cost_spike, condition: cost_last_hour cost_avg_hour * 3, severity: critical, message: 成本 hour over hour 增长 3 倍, cooldown_minutes: 15 }, { name: tool_error_rate_high, condition: tool_errors / tool_calls 0.1, severity: warning, message: 工具错误率超过 10%, cooldown_minutes: 5 } ] def check_alerts(metrics: AgentMetrics, previous_metrics: AgentMetrics) - List[dict]: 检查告警 alerts [] # 错误率告警 error_rate metrics.request_failed / max(metrics.request_count, 1) if error_rate 0.05: alerts.append({ rule: error_rate_high, severity: warning, message: f错误率 {error_rate:.1%} 超过 5% }) # 延迟告警 if metrics.latencies: p99 sorted(metrics.latencies)[int(len(metrics.latencies) * 0.99)] if p99 10000: alerts.append({ rule: latency_p99_high, severity: warning, message: fP99 延迟 {p99/1000:.1f}s 超过 10s }) return alerts四、链路追踪实战4.1 为什么需要链路追踪在 Agent 系统中一个请求可能经历用户请求 ↓ [Agent] 思考 → 决定调用工具 ↓ [Tool: 搜索] → 返回结果 ↓ [Agent] 思考 → 决定再调用工具 ↓ [Tool: 数据库] → 返回结果 ↓ [Agent] 生成最终回答这个链条中哪一步慢了哪一步出错了没有链路追踪你只能看到最终失败不知道问题出在哪一步。4.2 用 OpenTelemetry 实现链路追踪from opentelemetry import trace from opentelemetry.sdk.trace import TracerProvider from opentelemetry.sdk.trace.export import BatchSpanProcessor from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter from opentelemetry.sdk.resources import Resource # 初始化追踪 trace.set_tracer_provider(TracerProvider( resourceResource.create({ service.name: agent-service, service.version: 2.1.0 }) )) # 导出到 Jaeger / Tempo / 其他 OTLP 兼容后端 trace.get_tracer_provider().add_span_processor( BatchSpanProcessor(OTLPSpanExporter(endpointlocalhost:4317)) ) tracer trace.get_tracer(__name__) class TracingAgent: 带链路追踪的 Agent def __init__(self, agent, logger: AgentLogger): self.agent agent self.logger logger async def run(self, task: str, request_id: str) - str: 执行请求带完整链路追踪 with tracer.start_as_current_span(request_id) as span: # 设置基础属性 span.set_attribute(request.id, request_id) span.set_attribute(task.preview, task[:100]) try: # Step 1: Agent 思考 with tracer.start_as_current_span(agent_think) as think_span: think_span.set_attribute(step, 1) thought await self.agent.think(task) span.add_event(thought_completed, {step: 1}) # Step 2: 工具调用循环 tool_calls [] max_tools 10 for i in range(max_tools): # 决定是否需要调用工具 should_call, tool_name, params await self.agent.decide_tool(thought) if not should_call: break # 追踪工具调用 with tracer.start_as_current_span(ftool.{tool_name}) as tool_span: tool_span.set_attribute(tool.name, tool_name) tool_span.set_attribute(tool.step, i 1) start time.time() try: result await self.agent.call_tool(tool_name, params) tool_span.set_attribute(tool.success, True) tool_calls.append({tool: tool_name, success: True}) except Exception as e: tool_span.set_attribute(tool.success, False) tool_span.set_attribute(tool.error, str(e)) tool_calls.append({tool: tool_name, success: False, error: str(e)}) tool_span.set_attribute(tool.duration_ms, (time.time() - start) * 1000) # 记录到日志 self.logger.log_tool_call(request_id, tool_name, params, result, (time.time() - start) * 1000) # 更新思考上下文 thought f\n[工具返回]: {result} # Step 3: 生成回答 with tracer.start_as_current_span(generate_answer) as gen_span: answer await self.agent.generate(thought) # 标记成功 span.set_attribute(request.success, True) span.set_attribute(tool.call_count, len(tool_calls)) return answer except Exception as e: # 标记失败 span.set_attribute(request.success, False) span.set_attribute(request.error, str(e)) span.record_exception(e) raise4.3 链路可视化链路追踪的数据可以导入Jaeger、Tempo、Zipkin等工具进行可视化每个请求的完整调用链每一步的耗时毫秒级哪里有错误红色高亮工具调用的输入输出五、实战LangSmith / Helicone 使用5.1 LangSmithOpenAI 官方from langsmith import Client # 初始化 client Client() # 创建数据集 dataset client.create_dataset( dataset_nameagent-eval-dataset, descriptionAgent 评估测试集 ) # 添加示例 client.create_examples( dataset_iddataset.id, inputs[ {task: 帮我查一下北京明天天气}, {task: 总结一下这个文档的主要内容} ], outputs[ {expected: 北京明天晴转多云15-25度}, {expected: 文档主要讨论了...} ] ) # 追踪运行 client.traceable def my_agent(task: str): # 你的 Agent 逻辑 ... # 运行评估 experiment client.run_on_dataset( dataset_nameagent-eval-dataset, llm_or_chain_factorymy_agent, evaluation_metrics[correctness, helpfulness] )5.2 Helicone更轻量import openai from helicone import Helicone # 初始化 Helicone Helicone.configure(api_keyyour-helicone-key) # OpenAI 请求自动被追踪 openai.api_base https://oai.helicone.ai/v1 openai.api_key your-openai-key # 请求自动记录到 Helicone 控制台 response openai.ChatCompletion.create( modelgpt-4, messages[{role: user, content: Hello}] )六、总结可观测性建设路线图阶段1日志输出现在 ↓ 用结构化 JSON 日志记录关键事件 阶段2指标收集下周 ↓ 统计请求量、延迟、成本、错误率 阶段3链路追踪下个月 ↓ 用 OpenTelemetry 追踪完整调用链 阶段4告警监控3个月后 ↓ 设置告警规则及时发现问题 阶段5自助分析半年后 → 用日志/指标/追踪数据做根因分析核心思想可观测性是 Agent 系统从 Demo 走向生产的必备能力。没有可观测性Agent 就是一个黑盒——出了问题只能猜。有了可观测性你才能真正看到Agent 在做什么。踩坑经验汇总每个请求必须有 request_id——串联日志、指标、追踪的基石日志要结构化——用 JSON方便后续查询和分析敏感信息必须脱敏——api_key、token 等绝对不能明文记录指标要算 P50/P95/P99——平均值会掩盖问题告警要有冷却时间——避免一条错误刷屏本篇代码https://github.com/dazhuang-zs/run_little_donkey/blob/master/docs/articles/ai-dev-advanced-02-observability.md篇③预告推理加速与成本控制——从 API 到自部署讲 vLLM 部署、量化、KV Cache 复用、模型路由。