前言你的AI应用上线了用户在用但你真的知道它在做什么吗传统应用的可观测性体系Metrics/Logs/Traces在LLM应用上遭遇了全新挑战一个LLM调用可能耗时3-30秒成本从0.01美元到1美元不等而正确性这个指标根本无法用日志里的状态码来衡量。本文系统介绍2026年AI应用可观测性工程的完整体系从Tracing到评估从成本监控到质量告警构建真正可运营的LLM监控平台。—## 一、AI可观测性的特殊挑战### 1.1 与传统可观测性的根本差异| 维度 | 传统应用 | LLM应用 ||-----|---------|---------|| 延迟 | ms级P99 1s | 秒级P99 可能30s || 成本 | 固定基础设施成本 | 按Token计费高度可变 || 错误定义 | HTTP状态码/异常 | 语义错误幻觉、格式错误、逻辑谬误 || 质量评估 | 精确的成功/失败 | 模糊的好/差需要人工或LLM评估 || 调试 | 确定性可复现 | 非确定性相同输入可能不同输出 |### 1.2 核心可观测性需求LLM应用的可观测性需要覆盖四个层次text┌─────────────────────────────────────────┐│ 业务层任务成功率、用户满意度 │├─────────────────────────────────────────┤│ 质量层输出正确性、幻觉率、格式合规率 │├─────────────────────────────────────────┤│ 成本层Token消耗、API费用、缓存命中率 │├─────────────────────────────────────────┤│ 基础层延迟、错误率、并发、重试次数 │└─────────────────────────────────────────┘text—## 二、追踪体系从单次调用到完整链路### 2.1 OpenTelemetry for LLMOpenTelemetry已成为LLM追踪的事实标准GenAI语义约定定义了标准属性pythonfrom opentelemetry import tracefrom opentelemetry.sdk.trace import TracerProviderfrom opentelemetry.sdk.trace.export import BatchSpanProcessorfrom opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter# 初始化追踪provider TracerProvider()exporter OTLPSpanExporter(endpointhttp://localhost:4317)provider.add_span_processor(BatchSpanProcessor(exporter))trace.set_tracer_provider(provider)tracer trace.get_tracer(llm-app)def traced_llm_call(prompt: str, model: str gpt-4o): with tracer.start_as_current_span(llm.chat) as span: # 遵循GenAI语义约定 span.set_attribute(gen_ai.system, openai) span.set_attribute(gen_ai.operation.name, chat) span.set_attribute(gen_ai.request.model, model) span.set_attribute(gen_ai.request.temperature, 0.7) # 记录输入注意PII处理 span.set_attribute(gen_ai.prompt, prompt[:1000]) # 截断避免过大 import openai client openai.OpenAI() response client.chat.completions.create( modelmodel, messages[{role: user, content: prompt}] ) # 记录输出和使用统计 usage response.usage span.set_attribute(gen_ai.usage.input_tokens, usage.prompt_tokens) span.set_attribute(gen_ai.usage.output_tokens, usage.completion_tokens) span.set_attribute(gen_ai.response.model, response.model) content response.choices[0].message.content span.set_attribute(gen_ai.completion, content[:1000]) return contenttext### 2.2 LangFuse集成专用LLM追踪平台LangFuse是目前最成熟的开源LLM可观测性平台支持Trace/Span/Score完整体系pythonfrom langfuse import Langfusefrom langfuse.decorators import observe, langfuse_contextlangfuse Langfuse( public_keyyour-public-key, secret_keyyour-secret-key, hosthttp://localhost:3000 # 自部署)observe(namerag-pipeline)def rag_answer(question: str) - str: 完整RAG流程追踪示例 # 标注输入 langfuse_context.update_current_observation( input{question: question}, metadata{pipeline_version: v2.1} ) # 检索阶段 with langfuse_context.observe_span(nameretrieval): chunks retrieve_documents(question) langfuse_context.update_current_observation( output{num_chunks: len(chunks)}, metadata{retriever: densesparse-hybrid} ) # 生成阶段 with langfuse_context.observe_span(namegeneration): prompt build_prompt(question, chunks) answer call_llm(prompt) langfuse_context.update_current_observation( inputprompt, outputanswer ) # 更新顶层Trace输出 langfuse_context.update_current_trace( output{answer: answer}, tags[rag, production] ) return answer# 追加评估分数def add_evaluation_score(trace_id: str, score: float, comment: str): langfuse.score( trace_idtrace_id, namerelevance, valuescore, commentcomment )text### 2.3 多Agent链路追踪对于多Agent系统需要追踪跨Agent的调用链pythonimport uuidfrom contextvars import ContextVarfrom dataclasses import dataclass, field# 追踪上下文传播current_trace_id: ContextVar[str] ContextVar(trace_id, default)current_span_id: ContextVar[str] ContextVar(span_id, default)dataclassclass AgentSpan: trace_id: str span_id: str parent_span_id: str agent_name: str input: dict output: dict field(default_factorydict) start_time: float field(default_factorylambda: __import__(time).time()) end_time: float None def finish(self, output: dict): import time self.end_time time.time() self.output output self.duration_ms (self.end_time - self.start_time) * 1000class AgentTracer: def __init__(self, trace_id: str None): self.trace_id trace_id or str(uuid.uuid4()) self.spans [] def start_span(self, agent_name: str, input_data: dict) - AgentSpan: span AgentSpan( trace_idself.trace_id, span_idstr(uuid.uuid4()), parent_span_idcurrent_span_id.get(), agent_nameagent_name, inputinput_data ) self.spans.append(span) current_span_id.set(span.span_id) return span def export(self) - dict: return { trace_id: self.trace_id, spans: [ { agent: s.agent_name, duration_ms: s.duration_ms if s.end_time else None, input_preview: str(s.input)[:200], output_preview: str(s.output)[:200], } for s in self.spans ] }text—## 三、评估体系衡量输出质量### 3.1 自动化评估框架LLM输出质量评估是最难的部分常用的自动化方案pythonfrom openai import OpenAIfrom pydantic import BaseModelclient OpenAI()class EvaluationResult(BaseModel): score: float # 0-1 reasoning: str issues: list[str]def llm_judge_relevance(question: str, answer: str, context: str None) - EvaluationResult: 用LLM作为Judge评估回答相关性 eval_prompt f你是一个专业的AI输出质量评估专家。 请评估以下AI回答的质量问题{question}回答{answer}{参考上下文 context if context else }评估维度1. 相关性0-1回答是否切题2. 准确性0-1是否存在明显错误或幻觉3. 完整性0-1是否回答了核心问题请给出0-1的综合评分以及具体问题列表。 response client.beta.chat.completions.parse( modelgpt-4o-mini, messages[{role: user, content: eval_prompt}], response_formatEvaluationResult ) return response.choices[0].message.parseddef batch_evaluate(qa_pairs: list[dict], sample_rate: float 0.1) - dict: 批量评估支持采样率控制成本 import random sampled random.sample(qa_pairs, max(1, int(len(qa_pairs) * sample_rate))) results [] for pair in sampled: result llm_judge_relevance( questionpair[question], answerpair[answer], contextpair.get(context) ) results.append(result) avg_score sum(r.score for r in results) / len(results) issues_summary {} for r in results: for issue in r.issues: issues_summary[issue] issues_summary.get(issue, 0) 1 return { sample_size: len(sampled), avg_quality_score: avg_score, top_issues: sorted(issues_summary.items(), keylambda x: x[1], reverseTrue)[:5] }text### 3.2 RAG专项评估RAG系统需要额外评估检索质量pythondef evaluate_rag_faithfulness(question: str, answer: str, retrieved_chunks: list[str]) - dict: 评估RAG答案忠实度答案是否基于检索内容 context \n\n.join(retrieved_chunks) prompt f判断以下AI回答是否完全基于给定的参考文档没有编造信息。参考文档{context[:3000]}AI回答{answer}问题这个回答中的每个声明都能在参考文档中找到支撑吗回答格式- faithful: true/false- unsupported_claims: [列出没有文档支撑的声明]- score: 0-1的忠实度分数 result client.chat.completions.create( modelgpt-4o-mini, messages[{role: user, content: prompt}], temperature0 ) # 解析结果简化版 content result.choices[0].message.content faithful true in content.lower() and false not in content.split(faithful)[1][:20].lower() return { faithful: faithful, details: content, input_tokens: result.usage.prompt_tokens, output_tokens: result.usage.completion_tokens }text—## 四、成本监控让每分钱都透明### 4.1 Token成本追踪pythonfrom dataclasses import dataclassfrom datetime import datetimeimport threading# 各模型定价美元/1M tokens仅示例MODEL_PRICING { gpt-4o: {input: 2.5, output: 10.0}, gpt-4o-mini: {input: 0.15, output: 0.6}, claude-3-5-sonnet: {input: 3.0, output: 15.0}, deepseek-v3: {input: 0.27, output: 1.1},}dataclassclass CostRecord: timestamp: datetime model: str input_tokens: int output_tokens: int cost_usd: float trace_id: str user_id: str feature: str # 哪个功能产生的费用class CostTracker: def __init__(self): self._records [] self._lock threading.Lock() def record(self, model: str, input_tokens: int, output_tokens: int, trace_id: str , user_id: str , feature: str ): pricing MODEL_PRICING.get(model, {input: 0, output: 0}) cost (input_tokens / 1_000_000 * pricing[input] output_tokens / 1_000_000 * pricing[output]) record CostRecord( timestampdatetime.now(), modelmodel, input_tokensinput_tokens, output_tokensoutput_tokens, cost_usdcost, trace_idtrace_id, user_iduser_id, featurefeature ) with self._lock: self._records.append(record) return cost def get_daily_summary(self, date: str None) - dict: from collections import defaultdict records self._records if date: records [r for r in records if r.timestamp.strftime(%Y-%m-%d) date] by_model defaultdict(lambda: {cost: 0, calls: 0, tokens: 0}) by_feature defaultdict(lambda: {cost: 0, calls: 0}) for r in records: by_model[r.model][cost] r.cost_usd by_model[r.model][calls] 1 by_model[r.model][tokens] r.input_tokens r.output_tokens by_feature[r.feature][cost] r.cost_usd by_feature[r.feature][calls] 1 total_cost sum(r.cost_usd for r in records) return { total_cost_usd: round(total_cost, 4), total_calls: len(records), by_model: dict(by_model), by_feature: dict(by_feature), avg_cost_per_call: round(total_cost / max(len(records), 1), 6) }# 全局实例cost_tracker CostTracker()text### 4.2 成本告警pythonclass CostAlert: def __init__(self, daily_budget_usd: float, alert_threshold: float 0.8): self.daily_budget daily_budget_usd self.alert_threshold alert_threshold self._alerted_today False def check_and_alert(self, current_spend: float): ratio current_spend / self.daily_budget if ratio self.alert_threshold and not self._alerted_today: self._alerted_today True self._send_alert( f⚠️ LLM成本告警今日已消费 ${current_spend:.2f} f达到预算 ${self.daily_budget:.2f} 的 {ratio*100:.0f}% ) if ratio 1.0: self._send_alert( f LLM成本超限今日已消费 ${current_spend:.2f} f超过预算 ${self.daily_budget:.2f} ) def _send_alert(self, message: str): # 发送到监控系统/企业微信/Slack等 print(f[ALERT] {message}) # 实际应替换为你的告警通道text—## 五、Prompt版本管理与回归测试### 5.1 Prompt版本控制pythonimport hashlibimport jsonfrom pathlib import Pathclass PromptRegistry: Prompt版本注册表 def __init__(self, store_path: str ./prompts): self.store Path(store_path) self.store.mkdir(exist_okTrue) def register(self, name: str, template: str, metadata: dict None) - str: 注册新版本Prompt返回版本hash version hashlib.md5(template.encode()).hexdigest()[:8] prompt_data { name: name, version: version, template: template, metadata: metadata or {}, created_at: datetime.now().isoformat() } version_file self.store / f{name}_{version}.json version_file.write_text(json.dumps(prompt_data, ensure_asciiFalse, indent2)) # 更新latest链接 latest_file self.store / f{name}_latest.json latest_file.write_text(json.dumps(prompt_data, ensure_asciiFalse, indent2)) return version def get(self, name: str, version: str latest) - str: if version latest: prompt_file self.store / f{name}_latest.json else: prompt_file self.store / f{name}_{version}.json if not prompt_file.exists(): raise ValueError(fPrompt {name}{version} not found) data json.loads(prompt_file.read_text()) return data[template] def list_versions(self, name: str) - list[str]: files self.store.glob(f{name}_*.json) return [f.stem.split(_)[-1] for f in files if latest not in f.stem]text### 5.2 自动化回归测试pythonclass PromptRegressionTest: Prompt变更回归测试 def __init__(self, test_cases_path: str): with open(test_cases_path) as f: self.test_cases json.load(f) def run(self, new_prompt_fn, baseline_prompt_fn, threshold: float 0.9) - dict: 比较新旧Prompt版本质量 new_scores [] baseline_scores [] regressions [] for case in self.test_cases: new_answer new_prompt_fn(case[input]) baseline_answer baseline_prompt_fn(case[input]) new_eval llm_judge_relevance(case[input], new_answer) baseline_eval llm_judge_relevance(case[input], baseline_answer) new_scores.append(new_eval.score) baseline_scores.append(baseline_eval.score) if new_eval.score baseline_eval.score - 0.1: # 下降超过10分 regressions.append({ case: case[input][:100], baseline_score: baseline_eval.score, new_score: new_eval.score, delta: new_eval.score - baseline_eval.score }) avg_new sum(new_scores) / len(new_scores) avg_baseline sum(baseline_scores) / len(baseline_scores) passed avg_new avg_baseline * threshold return { passed: passed, new_avg_score: round(avg_new, 3), baseline_avg_score: round(avg_baseline, 3), delta: round(avg_new - avg_baseline, 3), regression_count: len(regressions), regressions: regressions }text—## 六、推荐工具栈| 层次 | 推荐工具 | 替代选项 ||-----|---------|---------|| 链路追踪 | LangFuse开源自部署 | Langsmith、Arize || 基础监控 | Prometheus Grafana | Datadog、NewRelic || 日志 | Loki Grafana | ELK Stack || 告警 | AlertManager | PagerDuty || 评估 | 自建LLM-Judge | Ragas、DeepEval || Prompt管理 | Git 自建Registry | PromptLayer || 成本追踪 | 自建如上代码 | OpenMeter |—## 总结AI应用可观测性不是锦上添花而是生产运营的基础设施。没有可观测性你对自己应用的了解程度不会超过用户的投诉单。核心原则先追踪再评估最后优化。追踪是数据基础评估是质量感知成本监控是商业约束。三者结合才能让LLM应用真正进入工程化运营阶段。从今天开始为每一个LLM调用加上Trace这是迈向可运营AI应用的第一步。