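# Building Observability for AI Applications: From Logs to Distributed Tracing

## Preface

When I worked at a large tech company, we had a dedicated observability platform covering logs, monitoring, and distributed tracing, and I assumed that kind of tooling was a privilege of big companies. After I moved to a startup, the first time we hit a production incident all we had were printed logs, and it took us three hours to locate the problem.

After that, I realized that observability is not a luxury but a necessity. A system without observability is like an airplane without an instrument panel: when something goes wrong, you cannot even tell where.

In this post I share how we built an observability stack for an AI application from scratch.

## 1. The Three Pillars of Observability

### 1.1 Observability vs. Monitoring

| Dimension | Traditional monitoring | Observability |
| --- | --- | --- |
| Mindset | Targeted checks for anticipated problems | Free exploration of unknown problems |
| Focus | Metrics and alerts | Correlated analysis across data |
| Debugging flow | Alert → investigate | Data → explore → root cause |
| Data requirements | Structured metrics | Multi-dimensional raw data |

### 1.2 The Three Pillars

Observability = Logs + Metrics + Traces

| Type | Description | Tools |
| --- | --- | --- |
| Logs | Discrete event records | ELK, Loki |
| Metrics | Aggregated numeric data | Prometheus, InfluxDB |
| Traces | The complete path of a request | Jaeger, Zipkin |

## 2. Building the Logging System

### 2.1 Logging Conventions

```python
import json
import logging
from datetime import datetime, timezone
from enum import Enum


class LogLevel(Enum):
    DEBUG = "DEBUG"
    INFO = "INFO"
    WARNING = "WARNING"
    ERROR = "ERROR"
    CRITICAL = "CRITICAL"


class JsonFormatter(logging.Formatter):
    """Render each log record as a single JSON object."""

    def __init__(self, service_name: str):
        super().__init__()
        self.service_name = service_name

    def format(self, record: logging.LogRecord) -> str:
        log_entry = {
            "timestamp": datetime.fromtimestamp(record.created, tz=timezone.utc).isoformat(),
            "level": record.levelname,
            "service": self.service_name,
            "logger": record.name,
            "message": record.getMessage(),
            "module": record.module,
            "function": record.funcName,
            "line": record.lineno,
        }

        # Attach the formatted traceback, if there is one
        if record.exc_info:
            log_entry["exception"] = self.formatException(record.exc_info)

        # Merge the extra fields passed through StructuredLogger.log()
        if hasattr(record, "extra"):
            log_entry.update(record.extra)

        return json.dumps(log_entry, ensure_ascii=False, default=str)


class StructuredLogger:
    def __init__(self, service_name: str):
        self.service_name = service_name
        self.logger = logging.getLogger(service_name)
        self.logger.setLevel(logging.INFO)

        # Attach the JSON handler only once, even if this class is
        # instantiated several times for the same service name
        if not self.logger.handlers:
            handler = logging.StreamHandler()
            handler.setFormatter(JsonFormatter(service_name))
            self.logger.addHandler(handler)

    def log(self, level: LogLevel, message: str, **kwargs):
        """Log a message; keyword arguments become extra JSON fields."""
        extra = {"extra": kwargs} if kwargs else {}
        getattr(self.logger, level.value.lower())(message, extra=extra)

    def info(self, message: str, **kwargs):
        self.log(LogLevel.INFO, message, **kwargs)

    def error(self, message: str, **kwargs):
        self.log(LogLevel.ERROR, message, **kwargs)

    def warning(self, message: str, **kwargs):
        self.log(LogLevel.WARNING, message, **kwargs)
```

### 2.2 Logging Best Practices for AI Applications

```python
class AILogger:
    def __init__(self, logger: StructuredLogger):
        self.logger = logger

    def log_model_request(self, request_id: str, model: str,
                          prompt_length: int):
        """Log the start of a model request."""
        self.logger.info(
            "Model request started",
            request_id=request_id,
            model=model,
            prompt_length=prompt_length,
            event_type="model_request_start",
        )

    def log_model_response(self, request_id: str, response_length: int,
                           latency_ms: float, tokens_used: int):
        """Log a completed model response."""
        self.logger.info(
            "Model request completed",
            request_id=request_id,
            response_length=response_length,
            latency_ms=latency_ms,
            tokens_used=tokens_used,
            event_type="model_response_complete",
        )

    def log_model_error(self, request_id: str, error: str,
                        error_type: str):
        """Log a failed model request."""
        self.logger.error(
            "Model request failed",
            request_id=request_id,
            error=error,
            error_type=error_type,
            event_type="model_error",
        )
```

## 3. Building the Metrics System

### 3.1 Collecting Metrics

```python
from typing import Optional

from prometheus_client import Counter, Histogram, Gauge, CollectorRegistry


class AIMetrics:
    def __init__(self, registry: Optional[CollectorRegistry] = None):
        self.registry = registry or CollectorRegistry()

        # Request counter
        self.request_total = Counter(
            "ai_request_total",
            "Total number of AI requests",
            ["model", "status"],
            registry=self.registry,
        )

        # Request latency
        self.request_duration = Histogram(
            "ai_request_duration_seconds",
            "AI request duration in seconds",
            ["model", "operation"],
            buckets=[0.1, 0.5, 1, 2, 5, 10, 30],
            registry=self.registry,
        )

        # Token usage
        self.tokens_used = Counter(
            "ai_tokens_used_total",
            "Total tokens used",
            ["model", "type"],  # type: prompt/completion
            registry=self.registry,
        )

        # Requests currently in flight
        self.active_requests = Gauge(
            "ai_active_requests",
            "Number of active requests",
            ["model"],
            registry=self.registry,
        )

    def record_request(self, model: str, status: str, duration: float,
                       tokens: int, token_type: str = "prompt"):
        """Record the metrics for one request."""
        self.request_total.labels(model=model, status=status).inc()
        self.request_duration.labels(model=model, operation="inference").observe(duration)
        # token_type should be "prompt" or "completion" to match the label
        self.tokens_used.labels(model=model, type=token_type).inc(tokens)

    def increment_active(self, model: str):
        """Increase the number of active requests."""
        self.active_requests.labels(model=model).inc()

    def decrement_active(self, model: str):
        """Decrease the number of active requests."""
        self.active_requests.labels(model=model).dec()
```

### 3.2 Custom Metrics

```python
class BusinessMetrics:
    def __init__(self, registry: CollectorRegistry):
        # User metrics
        self.active_users = Gauge(
            "app_active_users",
            "Number of active users",
            ["period"],  # hourly, daily
            registry=registry,
        )

        # Feature usage metrics
        self.feature_usage = Counter(
            "feature_usage_total",
            "Total feature usage",
            ["feature_name"],
            registry=registry,
        )

        # Business conversion metrics
        self.conversion_rate = Gauge(
            "conversion_rate",
            "Business conversion rate",
            ["stage"],  # trial_to_paid, etc.
            registry=registry,
        )
```

## 4. Distributed Tracing

### 4.1 Distributed Tracing Basics

```python
from typing import Optional

from opentelemetry import trace
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
# Note: recent OpenTelemetry Python releases deprecate this exporter in favor
# of OTLP; check which exporter your installed version supports.
from opentelemetry.exporter.jaeger.thrift import JaegerExporter


class TracingSetup:
    def __init__(self, service_name: str):
        self.service_name = service_name
        self.setup_tracing()

    def setup_tracing(self):
        """Configure distributed tracing."""
        # Create the tracer provider; the resource makes spans carry the
        # service name instead of the SDK's default placeholder
        provider = TracerProvider(
            resource=Resource.create({"service.name": self.service_name})
        )

        # Create the Jaeger exporter
        jaeger_exporter = JaegerExporter(
            agent_host_name="localhost",
            agent_port=6831,
        )

        # Export spans in batches
        provider.add_span_processor(
            BatchSpanProcessor(jaeger_exporter)
        )

        # Register the global tracer provider
        trace.set_tracer_provider(provider)

        # Get a tracer
        self.tracer = trace.get_tracer(self.service_name)

    def create_span(self, name: str, attributes: Optional[dict] = None):
        """Create a span; use the return value as a context manager."""
        return self.tracer.start_as_current_span(
            name,
            attributes=attributes or {},
        )
```

### 4.2 Tracing an AI Application

```python
import time


class AIDistributedTracing:
    def __init__(self, tracing: TracingSetup):
        self.tracing = tracing
        self.tracer = tracing.tracer

    def trace_ai_request(self, user_id: str, prompt: str, model: str):
        """Trace one AI request."""
        with self.tracer.start_as_current_span(
            "ai.request",
            attributes={
                "user_id": user_id,
                "model": model,
                "prompt_length": len(prompt),
            },
        ) as span:
            try:
                # Simulated AI call
                response = self._call_model(prompt, model)

                # Record response details
                span.set_attribute("response_length", len(response))
                span.set_attribute("status", "success")

                return response

            except Exception as e:
                span.set_attribute("status", "error")
                span.set_attribute("error.message", str(e))
                raise

    def _call_model(self, prompt: str, model: str) -> str:
        """Call the model (replace with a real call in production)."""
        time.sleep(0.1)  # Simulated call latency
        return f"Response to: {prompt[:50]}..."
```

## 5. Integrating the Pillars

### 5.1 A Unified Logging Context

```python
from contextvars import ContextVar

# Context variables
request_id: ContextVar[str] = ContextVar("request_id", default="")
user_id: ContextVar[str] = ContextVar("user_id", default="")


class UnifiedLogger:
    def __init__(self):
        self.logger = StructuredLogger("app")

    def _get_context(self) -> dict:
        """Collect the current context values."""
        return {
            "request_id": request_id.get(),
            "user_id": user_id.get(),
        }

    def info(self, message: str, **kwargs):
        # Explicit keyword arguments override the context values
        context = self._get_context()
        context.update(kwargs)
        self.logger.info(message, **context)
```

### 5.2 Alerting Configuration

```yaml
# Prometheus alerting rules (loaded through rule_files in prometheus.yml)
groups:
  - name: ai_alerts
    rules:
      - alert: HighErrorRate
        # Share of failed requests over the last 5 minutes
        expr: |
          sum(rate(ai_request_total{status="error"}[5m]))
            / sum(rate(ai_request_total[5m])) > 0.1
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: "AI request error rate is too high"
          description: "Current error rate: {{ $value | humanizePercentage }}"

      - alert: HighLatency
        # histogram_quantile needs the per-bucket rates, grouped by le
        expr: |
          histogram_quantile(0.95,
            sum by (le) (rate(ai_request_duration_seconds_bucket[5m]))) > 10
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "AI request latency is too high"
          description: "P95 latency: {{ $value }}s"

      - alert: APIKeyNearlyExhausted
        # api_usage_percentage is not defined in the code above;
        # it has to be exported separately
        expr: api_usage_percentage > 90
        for: 1h
        labels:
          severity: warning
        annotations:
          summary: "API quota is nearly exhausted"
```

## 6. Case Study: Troubleshooting an Incident

### 6.1 The Scenario

Users reported that the AI customer service assistant was responding slowly and sometimes failing outright.

### 6.2 Investigation Steps

```bash
# 1. Check the error metrics
# Prometheus query: rate(ai_request_total{status="error"}[5m])

# 2. Check the latency distribution
# Prometheus query: histogram_quantile(0.95, sum by (le) (rate(ai_request_duration_seconds_bucket[5m])))

# 3. Look at the specific error logs
# Loki query: {service="ai-service"} |= "error" | json

# 4. Look at the traces
# Jaeger query: service=ai-service operation=/api/chat
```

### 6.3 Root Cause Analysis

The analysis showed that:

- The errors were concentrated in a specific time window.
- The latency spike correlated with the model API's response time.
- The traces showed that the model API calls were timing out.

## 7. Best Practices

### 7.1 Logging

- ✅ Structured logs: use JSON so logs are easy to query and analyze.
- ✅ A uniform log format: every service uses the same schema.
- ✅ Add context: include request_id, user_id, and similar fields.
- ✅ Sensible log levels: do not overuse ERROR.

### 7.2 Metrics

- ✅ Use the RED method: Rate, Errors, Duration.
- ✅ Use the USE method: Utilization, Saturation, Errors.
- ✅ Choose the right aggregation: pick percentiles that fit the business.

### 7.3 Tracing

- ✅ Add key attributes: user_id, model, operation.
- ✅ Size spans sensibly: neither too fine-grained nor too coarse.
- ✅ Use a sampling strategy: under high traffic, sample instead of collecting everything.

## 8. Summary

Observability is the foundation for running AI applications reliably. The key points are:

- Combine the three pillars: logs, metrics, and traces are all indispensable.
- Use structured data: it is easy to query and analyze.
- Carry context throughout: it lets the data be correlated.
- Keep improving: refine the setup based on the problems you hit.

Remember: without observability, there is no reliability.

Let's build observable AI applications together.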
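To make the "carry context throughout" point from sections 5.1 and 8 concrete, here is a minimal standard-library sketch of how a request ID stored in a `ContextVar` can end up on every log line without being passed around by hand. All names in it (`request_id_var`, `ContextFilter`, `JsonLineFormatter`, `build_logger`, `handle_request`, `demo`) are hypothetical and chosen for illustration; it is not the article's `UnifiedLogger`, just one possible way to get the same effect with a logging filter.

```python
import io
import json
import logging
from contextvars import ContextVar

# Hypothetical context variable for this sketch
request_id_var: ContextVar[str] = ContextVar("request_id", default="")


class ContextFilter(logging.Filter):
    """Copy the current request ID from the context onto every record."""

    def filter(self, record: logging.LogRecord) -> bool:
        record.request_id = request_id_var.get()
        return True  # never drop the record, only enrich it


class JsonLineFormatter(logging.Formatter):
    """Render a record as one JSON object per line."""

    def format(self, record: logging.LogRecord) -> str:
        return json.dumps({
            "level": record.levelname,
            "message": record.getMessage(),
            "request_id": getattr(record, "request_id", ""),
        })


def build_logger(stream) -> logging.Logger:
    """Create a logger that writes enriched JSON lines to the given stream."""
    logger = logging.getLogger("context_demo")
    logger.setLevel(logging.INFO)
    logger.handlers.clear()
    logger.propagate = False
    handler = logging.StreamHandler(stream)
    handler.setFormatter(JsonLineFormatter())
    handler.addFilter(ContextFilter())
    logger.addHandler(handler)
    return logger


def handle_request(logger: logging.Logger, rid: str) -> None:
    """Set the request ID for the duration of one request, then restore it."""
    token = request_id_var.set(rid)
    try:
        logger.info("model request started")
        logger.info("model request finished")
    finally:
        request_id_var.reset(token)


def demo() -> list:
    """Run two requests and return the parsed log entries."""
    buffer = io.StringIO()
    logger = build_logger(buffer)
    handle_request(logger, "req-1")
    handle_request(logger, "req-2")
    logger.info("outside any request")
    return [json.loads(line) for line in buffer.getvalue().splitlines()]
```

Calling `demo()` returns the five parsed entries, so you can check which request ID each line carries. Resetting the variable with the token in `finally` keeps a request's ID from leaking into log lines emitted after the request ends.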
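The sampling advice in section 7.3 can also be sketched in a few lines. The example below assumes a simple head-based strategy: hash the trace ID, map it to a position in [0, 1), and keep the trace if that position falls below the sampling ratio. The function names `should_sample` and `sampled_fraction` are made up for this sketch. In a real OpenTelemetry setup you would more likely use the SDK's built-in `TraceIdRatioBased` sampler than hand-rolled code like this.

```python
import hashlib


def should_sample(trace_id: str, ratio: float) -> bool:
    """Decide deterministically whether to keep a trace.

    The same trace_id always yields the same decision, so every service
    that sees the trace agrees on whether to record it.
    """
    if ratio <= 0.0:
        return False
    if ratio >= 1.0:
        return True
    digest = hashlib.sha256(trace_id.encode("utf-8")).digest()
    # Map the first 8 bytes of the hash to a position in [0, 1)
    position = int.from_bytes(digest[:8], "big") / 2**64
    return position < ratio


def sampled_fraction(ratio: float, total: int = 10_000) -> float:
    """Measure what share of synthetic trace IDs would be kept."""
    kept = sum(should_sample(f"trace-{i}", ratio) for i in range(total))
    return kept / total
```

Because the decision depends only on the trace ID, raising the ratio later keeps every trace that was already being sampled and adds new ones, which makes it safe to tune the ratio as traffic grows.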