FastAPI中间件实战5个真实业务场景下的自定义拦截器开发指南1. 电商订单全链路追踪中间件电商平台的核心痛点之一是订单状态追踪的透明性。我们设计一个中间件自动为每个请求注入唯一追踪ID并记录关键节点耗时。import uuid import time from fastapi import Request from contextvars import ContextVar request_id_var ContextVar(request_id, default) app.middleware(http) async def order_tracing_middleware(request: Request, call_next): 电商订单全链路追踪中间件 # 生成唯一请求ID可替换为实际订单号 request_id str(uuid.uuid4()) request_id_var.set(request_id) # 记录请求开始时间 start_time time.time() request.state.start_time start_time # 注入请求头 request.headers.__dict__[_list].append( (bx-request-id, request_id.encode()) ) try: response await call_next(request) finally: # 计算处理耗时 process_time time.time() - start_time response.headers[X-Process-Time] f{process_time:.4f}s # 记录到日志系统示例使用print实际应接入ELK等 print( f[OrderTrace] ID{request_id} | fPath{request.url.path} | fStatus{response.status_code} | fCost{process_time:.2f}ms ) return response关键功能实现使用ContextVar实现请求级别的变量存储自动注入X-Request-ID到请求和响应头精确计算API处理耗时可识别性能瓶颈生产环境建议将此中间件注册顺序尽量靠前确保能捕获所有后续中间件的处理时间2. 物联网设备心跳检测中间件针对IoT设备定期上报数据的场景我们需要自动检测异常设备并触发告警。from datetime import datetime from collections import defaultdict import pytz device_last_seen defaultdict(datetime.now) app.middleware(http) async def device_heartbeat_middleware(request: Request, call_next): 物联网设备心跳检测中间件 # 仅处理设备上报接口 if request.url.path ! /api/device/report: return await call_next(request) # 从请求头获取设备ID实际项目应从认证信息获取 device_id request.headers.get(X-Device-ID) if not device_id: return JSONResponse( status_code400, content{error: Missing device identification} ) # 更新最后活跃时间 device_last_seen[device_id] datetime.now(pytz.UTC) # 检查是否超过阈值示例为30分钟 inactive_threshold 30 * 60 last_seen device_last_seen.get(device_id) if (datetime.now(pytz.UTC) - last_seen).seconds inactive_threshold: # 触发告警逻辑示例仅打印日志 print(f[Device Alert] {device_id} inactive for 30 minutes) return await call_next(request)设备状态检测策略检测类型阈值设置响应动作心跳超时30分钟触发邮件/SMS告警上报频率异常±20%波动记录异常日志数据格式异常连续3次错误临时冻结设备API访问3. 智能限流与防刷中间件结合业务特征实现动态限流防止API被恶意刷取。from fastapi import HTTPException import redis.asyncio as redis # 连接Redis集群生产环境应使用连接池 redis_conn redis.Redis(hostredis-cluster, decode_responsesTrue) app.middleware(http) async def smart_rate_limiter(request: Request, call_next): 智能业务限流中间件 # 获取客户端指纹综合IP、设备ID、用户Token等 client_fingerprint ( f{request.client.host}: f{request.headers.get(X-Device-ID, unknown)}: f{request.headers.get(Authorization, anonymous)[:6]} ) # 构造Redis键 redis_key frate_limit:{client_fingerprint}:{request.url.path} # 获取当前计数 current_count await redis_conn.incr(redis_key) if current_count 1: await redis_conn.expire(redis_key, 60) # 设置60秒过期 # 动态限流策略 base_limit 100 # 基础限流阈值 if /api/payment in request.url.path: base_limit 10 # 支付接口更严格 if current_count base_limit: raise HTTPException( status_code429, detailfRate limit exceeded: {base_limit}/minute, headers{Retry-After: 60} ) return await call_next(request)多维度限流规则基础防护层IP级别100请求/分钟账号级别30关键操作/小时业务敏感接口支付接口10次/分钟短信发送1次/30秒突发流量处理滑动时间窗口算法令牌桶缓冲机制4. 多租户数据隔离中间件SaaS系统中确保各租户数据严格隔离的解决方案。from typing import Optional def get_tenant_db_connection(tenant_id: str): 获取租户专属数据库连接模拟函数 return fConnectionPool-for-{tenant_id} app.middleware(http) async def tenant_isolation_middleware(request: Request, call_next): 多租户数据隔离中间件 # 从子域名或JWT获取租户ID tenant_id: Optional[str] None # 方案1从子域名提取如acme.saas.com host_parts request.url.hostname.split(.) if len(host_parts) 2: tenant_id host_parts[0] # 方案2从JWT获取生产环境推荐 if not tenant_id and Authorization in request.headers: try: token request.headers[Authorization].split( )[1] # 此处应验证JWT并提取tenant_id示例简化 tenant_id demo_tenant except: pass # 未识别租户的请求拒绝处理 if not tenant_id: return JSONResponse( status_code400, content{error: Tenant identification missing} ) # 将租户连接注入请求上下文 request.state.tenant_db get_tenant_db_connection(tenant_id) return await call_next(request)数据隔离方案对比方案类型实现复杂度隔离级别适用场景数据库隔离高物理隔离金融级SaaSSchema隔离中逻辑隔离中大型SaaS字段标记低应用隔离小型SaaS5. 智能缓存响应中间件根据业务规则自动缓存高频访问的API响应。from fastapi.encoders import jsonable_encoder import hashlib app.middleware(http) async def cache_middleware(request: Request, call_next): 智能缓存响应中间件 # 跳过非GET请求 if request.method ! GET: return await call_next(request) # 生成请求指纹URL查询参数认证头 cache_key hashlib.md5( f{request.url.path}?{request.url.query} f{request.headers.get(Authorization,)}.encode() ).hexdigest() # 检查Redis缓存 cached_response await redis_conn.get(fcache:{cache_key}) if cached_response: print(fCache hit for {request.url.path}) return JSONResponse( status_code200, contentjson.loads(cached_response), headers{X-Cache: HIT} ) # 无缓存时继续处理 response await call_next(request) # 只缓存成功响应 if response.status_code 200: # 根据接口特性设置不同TTL cache_ttl 300 # 默认5分钟 if /api/products in request.url.path: cache_ttl 3600 # 商品信息缓存1小时 # 异步写入缓存不阻塞主流程 await redis_conn.setex( fcache:{cache_key}, cache_ttl, json.dumps(jsonable_encoder(response.body)) ) return response缓存策略配置建议静态资源配置app.get(/static/{path:path}) cache_control(max_age86400) # 强制缓存1天 async def static_resources(): ...动态接口配置app.get(/api/trending) cache_control( vary[Authorization], max_age300, stale_while_revalidate60 ) async def get_trending(): ...敏感数据排除app.get(/api/personal-data) no_cache # 完全禁用缓存 async def get_personal_data(): ...