FastAPI后台任务完成,如何设计一个全局的、不掉线的SSE通知中心?
FastAPI全局SSE通知中心设计构建高可靠异步任务通信架构当用户点击生成年度报表按钮时页面瞬间响应任务已开始处理而背后的数据聚合运算可能持续20分钟。如何让用户在这段时间自由浏览其他页面并在任务完成时第一时间收到气泡通知这就是我们需要解决的异步任务状态同步核心命题。1. 为什么SSE是异步通知的最佳选择在实时通信技术选型时我们通常面临WebSocket、长轮询和SSE三种主流方案。让我们通过对比表格看清本质差异特性WebSocket长轮询SSE通信方向全双工半双工单向(服务端推)协议开销中等高低浏览器支持需要API封装兼容性好原生支持自动重连需手动实现每次请求新建内置机制适合场景即时聊天兼容性fallback服务端事件推送SSE的独特优势在异步通知场景尤为突出资源消耗低单个HTTP连接可维持数小时断线恢复通过Last-Event-ID自动续传优先级控制浏览器会自动管理SSE连接带宽# FastAPI中的最小SSE实现示例 app.get(/sse-stream) async def event_stream(): async def event_generator(): while True: yield {data: f心跳 {time.time()}} await asyncio.sleep(10) return EventSourceResponse(event_generator())但原生SSE存在三个致命缺陷仅支持GET请求且无法修改请求头缺乏细粒度的消息路由机制连接状态管理完全依赖客户端2. 全局通知中心架构设计2.1 核心组件拓扑[客户端] ←SSE→ [SSE网关] ←Redis Pub/Sub→ [任务处理器] ↑ ↑ │ │ [Vue3 SPA] [FastAPI]关键设计决策连接隔离每个用户建立独立SSE通道消息路由采用user_id:channel的Redis频道模式状态同步通过PostgreSQL维护任务状态机2.2 消息协议规范{ event: task_completed, data: { task_id: uuidv4, result_url: /download/xyz, timestamp: 1689234567 }, retry: 5000 }字段说明event定义消息处理逻辑的路由标识data实际负载内容建议JSON序列化retry客户端断连时的重试间隔(ms)2.3 连接保活策略# 服务端心跳检测 async def health_check(disconnect: asyncio.Event): while not disconnect.is_set(): await asyncio.sleep(15) yield :\n\n # SSE注释帧用于维持连接客户端需要处理三种异常状态网络抖动指数退避重连1s, 2s, 4s...上限30s服务重启通过Last-Event-ID恢复消息流鉴权过期静默刷新token后重建连接3. Vue3中的工程化实现3.1 useNotificationSSE Composables// hooks/useNotificationSSE.ts export default function(userId: Refstring) { const eventSource shallowRefEventSource() const reconnectAttempts ref(0) const startConnection () { eventSource.value new EventSource(/sse?user_id${userId.value}) eventSource.value.onerror () { setTimeout(startConnection, Math.min(1000 * 2 ** reconnectAttempts.value, 30000)) } } const subscribe (eventType: string, callback: (data: any) void) { watch(eventSource, (es) { es?.addEventListener(eventType, (e) callback(JSON.parse(e.data))) }) } return { subscribe } }3.2 消息过滤中间件# SSE消息路由器 app.middleware(http) async def sse_filter(request: Request, call_next): response await call_next(request) if text/event-stream in response.headers.get(content-type, ): original_send response.body_iterator async def filtered_send(): async for chunk in original_send: if should_deliver(chunk, request.user): # 业务过滤逻辑 yield chunk response.body_iterator filtered_send() return response4. 生产环境优化实践4.1 性能压测数据在4核8G的云主机上不同连接数的资源消耗并发连接数CPU占用内存占用网络吞吐1,00012%1.2GB3MB/s5,00038%2.8GB15MB/s10,00081%4.5GB31MB/s优化建议启用HTTP/2多路复用对静态资源启用CDN分流使用Uvicorn的--limit-concurrency参数4.2 安全防护方案必须实现的防护措施CSRF防护校验Origin头 同源策略速率限制令牌桶算法控制连接频率消息验证对敏感事件进行签名验证# JWT验证装饰器 def sse_jwt_required(endpoint): async def wrapper(request: Request): token request.query_params.get(access_token) try: request.state.user decode_jwt(token) return await endpoint(request) except JWTError: return EventSourceResponse(status_code401) return wrapper在Kubernetes环境中部署时需要特别注意配置Pod反亲和性避免单点故障设置合理的livenessProbe检查通过Service Mesh实现金丝雀发布5. 异常场景处理手册5.1 常见故障模式消息积压当客户端离线超过5分钟应切换为Webhook补发版本冲突在消息协议中加入version字段实现兼容内存泄漏定期回收闲置超过1小时的连接5.2 监控指标设计Prometheus需要监控的关键指标sse_active_connections{envprod} sse_message_rate{event_type*} sse_error_count{typetimeout|auth|protocol}Grafana看板应包含连接存活率趋势图消息延迟百分位图错误类型分布饼图实际项目中我们发现当消息延迟超过95%线200ms时需要立即扩容SSE网关节点。曾经有个电商大促案例由于未设置自动扩缩容导致订单完成通知延迟高达15分钟最终通过预先准备的静态降级页面化解了客诉危机。