难度⭐⭐⭐☆☆ 进阶 阅读约 18 分钟 适用Spring Boot 2.7.x / 3.x Java 17目录[TOC]一、为什么需要流式输出先来感受一个真实的体验差距。非流式调用的时序用户发送请求 → [等待 3~15 秒页面空白] → 一次性收到完整回答**流式调用SSE**的时序用户发送请求 → [0.3秒] 第一个字出现 → 文字持续滚动输出 → 全部输出完毕等待时间一样长但体验天壤之别。ChatGPT、Claude 官网都使用流式输出——打字机效果是现代 AI 产品的标配体验。1.1 技术选型SSE vs WebSocket实现实时推送有两种主流方案先做一个对比维度SSEServer-Sent EventsWebSocket通信方向单向服务端 → 客户端双向协议基于 HTTP/1.1独立的 ws:// 协议实现复杂度⭐ 简单Spring 原生支持⭐⭐⭐ 需要额外配置断线重连浏览器自动重连需手动实现代理/防火墙兼容性好走 HTTP一般需代理支持升级适合场景AI 流式输出✅、消息通知聊天室、实时游戏结论AI 流式输出是典型的单向推送场景SSE 是最合适的选择实现简单、兼容性好无需引入额外依赖。1.2 SSE 协议简介SSE 是 HTML5 标准的一部分服务端响应的 Content-Type 为text/event-stream数据格式如下data: 第一个\n\n data: token\n\n data: [DONE]\n\n每条消息以data:开头两个换行符\n\n结尾。浏览器的EventSourceAPI 可以原生解析这种格式。二、整体方案设计本篇实现分为以下几个部分┌─────────────────────────────────────────────────────────┐ │ 前端浏览器 │ │ EventSource / fetch ReadableStream 接收 SSE 数据流 │ └─────────────────┬───────────────────────────────────────┘ │ GET/POST text/event-stream ┌─────────────────▼───────────────────────────────────────┐ │ ChatStreamController │ │ 返回 SseEmitter 或 FluxServerSentEvent │ └─────────────────┬───────────────────────────────────────┘ │ ┌─────────────────▼───────────────────────────────────────┐ │ ClawStreamService新增接口 │ │ 封装流式调用逻辑回调 token 给 emitter │ └─────────────────┬───────────────────────────────────────┘ │ ┌─────────────────▼───────────────────────────────────────┐ │ OpenClAW ClaudeService.streamChat() │ │ 底层 HTTP 流式请求逐 token 回调 │ └─────────────────┬───────────────────────────────────────┘ │ HTTPS streaming ┌─────────────────▼───────────────────────────────────────┐ │ Anthropic Claude API │ └─────────────────────────────────────────────────────────┘Spring Boot 实现 SSE 有两种方式本文都会介绍方案 ASseEmitterServlet 方式兼容 Spring MVC推荐方案 BFluxServerSentEventReactive 方式需引入 WebFlux三、方案 A基于 SseEmitter推荐SseEmitter是 Spring MVC 的内置类无需额外依赖适合大多数项目。3.1 Service 层流式调用接口在第2篇的基础上新增ClawStreamService接口package com.example.openclaw_demo.service; import com.example.openclaw_demo.dto.ChatRequestDTO; import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; /** * AI 流式对话服务接口 */ public interface ClawStreamService { /** * 创建一个 SSE 流式对话 * * param request 对话请求 DTO与第2篇共用 * return SseEmitter 推送句柄Controller 直接返回给客户端 */ SseEmitter streamChat(ChatRequestDTO request); /** * 快捷方法简单流式对话 * * param message 用户消息 * return SseEmitter */ SseEmitter streamSimpleChat(String message); }3.2 Service 实现类package com.example.openclaw_demo.service.impl; import com.example.openclaw_demo.config.ClawProperties; import com.example.openclaw_demo.dto.ChatRequestDTO; import com.example.openclaw_demo.service.ClawStreamService; import io.openclaw.client.ClaudeService; import io.openclaw.model.request.ChatRequest; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Service; import org.springframework.util.StringUtils; import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; import java.io.IOException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; Slf4j Service RequiredArgsConstructor public class ClawStreamServiceImpl implements ClawStreamService { private final ClaudeService claudeService; private final ClawProperties clawProperties; // 用于异步执行流式请求的线程池 // 注意流式请求会阻塞线程直到完成不能复用 Tomcat 的请求线程 private final ExecutorService streamExecutor Executors.newVirtualThreadPerTaskExecutor(); // Java 21 虚拟线程 // Java 17 可改为Executors.newCachedThreadPool() // ---------------------------------------------------------------- // 简单流式对话 // ---------------------------------------------------------------- Override public SseEmitter streamSimpleChat(String message) { ChatRequestDTO req ChatRequestDTO.builder() .message(message) .build(); return streamChat(req); } // ---------------------------------------------------------------- // 核心实现创建 SseEmitter异步执行流式请求 // ---------------------------------------------------------------- Override public SseEmitter streamChat(ChatRequestDTO request) { // 超时时间3 分钟根据业务调整 SseEmitter emitter new SseEmitter(3 * 60 * 1000L); // 异步执行避免阻塞 Tomcat 请求线程 streamExecutor.execute(() - doStream(emitter, request)); return emitter; } // ---------------------------------------------------------------- // 真正执行流式请求的方法在独立线程中运行 // ---------------------------------------------------------------- private void doStream(SseEmitter emitter, ChatRequestDTO dto) { long startMs System.currentTimeMillis(); log.info([ClAW-Stream] 开始流式请求, model{}, dto.getModel() ! null ? dto.getModel() : clawProperties.getDefaultModel()); try { // 组装 OpenClAW 流式请求 ChatRequest clawReq buildStreamRequest(dto); // 调用 OpenClAW 流式 API // onToken每收到一个 token 片段时回调 // onComplete全部输出完毕时回调 // onError发生错误时回调 claudeService.streamChat( clawReq, // ① token 回调发送 SSE 数据 token - { try { emitter.send( SseEmitter.event() .name(token) // 事件名前端用于区分类型 .data(token) // token 文本片段 ); } catch (IOException e) { // 客户端断开连接如用户关闭页面停止推送 log.warn([ClAW-Stream] 客户端断开连接: {}, e.getMessage()); emitter.completeWithError(e); } }, // ② 完成回调发送 [DONE] 信号 统计信息 (stopReason, usage) - { try { long elapsed System.currentTimeMillis() - startMs; // 发送结束事件携带统计数据 emitter.send( SseEmitter.event() .name(done) .data({\stopReason\:\ stopReason \, \inputTokens\: usage.getInputTokens() , \outputTokens\: usage.getOutputTokens() , \elapsedMs\: elapsed }) ); emitter.complete(); // 关闭 SSE 连接 log.info([ClAW-Stream] 完成, stopReason{}, tokens{}/{}, cost{}ms, stopReason, usage.getInputTokens(), usage.getOutputTokens(), elapsed); } catch (IOException e) { emitter.completeWithError(e); } }, // ③ 错误回调 error - { log.error([ClAW-Stream] 流式请求失败: {}, error.getMessage(), error); try { emitter.send( SseEmitter.event() .name(error) .data({\message\:\ error.getMessage() \}) ); } catch (IOException ignored) {} emitter.completeWithError(error); } ); } catch (Exception e) { log.error([ClAW-Stream] 异常, e); emitter.completeWithError(e); } } // ---------------------------------------------------------------- // 私有构建 OpenClAW 流式请求 // ---------------------------------------------------------------- private ChatRequest buildStreamRequest(ChatRequestDTO dto) { String systemPrompt StringUtils.hasText(dto.getSystemPrompt()) ? dto.getSystemPrompt() : clawProperties.getGlobalSystemPrompt(); return ChatRequest.builder() .model(dto.getModel() ! null ? dto.getModel() : clawProperties.getDefaultModel()) .maxTokens(dto.getMaxTokens() ! null ? dto.getMaxTokens() : clawProperties.getDefaultMaxTokens()) .systemPrompt(systemPrompt) .message(dto.getMessage()) .stream(true) // ← 关键开启流式模式 .build(); } }⚠️关键点SseEmitter的send()方法不是线程安全的必须确保同一时刻只有一个线程调用它。OpenClAW 的回调默认在单一线程中顺序执行因此上面的写法是安全的。如果你使用并行流或多线程回调需要加锁。3.3 Controller 层package com.example.openclaw_demo.controller; import com.example.openclaw_demo.dto.ChatRequestDTO; import com.example.openclaw_demo.service.ClawStreamService; import lombok.RequiredArgsConstructor; import org.springframework.http.MediaType; import org.springframework.validation.annotation.Validated; import org.springframework.web.bind.annotation.*; import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; RestController RequestMapping(/api/v1/stream) RequiredArgsConstructor public class ChatStreamController { private final ClawStreamService clawStreamService; /** * GET /api/v1/stream/chat?q你好 * produces text/event-stream 是关键告诉 Spring 这是 SSE 接口 */ GetMapping(value /chat, produces MediaType.TEXT_EVENT_STREAM_VALUE) public SseEmitter streamChat(RequestParam String q) { return clawStreamService.streamSimpleChat(q); } /** * POST /api/v1/stream/chat * 支持自定义 systemPrompt、model 等参数 */ PostMapping(value /chat, produces MediaType.TEXT_EVENT_STREAM_VALUE) public SseEmitter streamChatPost(RequestBody Validated ChatRequestDTO request) { return clawStreamService.streamChat(request); } }四、方案 B基于 Reactor FluxWebFlux 方式如果你的项目已经引入了 Spring WebFlux可以使用响应式风格代码更简洁。4.1 添加依赖!-- pom.xml -- dependency groupIdorg.springframework.boot/groupId artifactIdspring-boot-starter-webflux/artifactId /dependency⚠️ Spring MVC 和 WebFlux不推荐混用。如果你的项目是纯 Spring MVCstarter-web优先使用方案 A。4.2 WebFlux Controllerpackage com.example.openclaw_demo.controller; import io.openclaw.client.ClaudeService; import io.openclaw.model.request.ChatRequest; import lombok.RequiredArgsConstructor; import org.springframework.http.MediaType; import org.springframework.http.codec.ServerSentEvent; import org.springframework.web.bind.annotation.*; import reactor.core.publisher.Flux; import reactor.core.publisher.Sinks; RestController RequestMapping(/api/v1/flux/stream) RequiredArgsConstructor public class ChatFluxStreamController { private final ClaudeService claudeService; /** * GET /api/v1/flux/stream/chat?q你好 * 返回 FluxServerSentEventStringSpring WebFlux 自动处理推送 */ GetMapping(value /chat, produces MediaType.TEXT_EVENT_STREAM_VALUE) public FluxServerSentEventString streamChat(RequestParam String q) { // Sinks.Many 是 Reactor 的多播 sink用于将回调桥接为 Flux Sinks.ManyServerSentEventString sink Sinks.many().unicast().onBackpressureBuffer(); ChatRequest clawReq ChatRequest.builder() .model(claude-sonnet-4-20250514) .maxTokens(1024) .message(q) .stream(true) .build(); // 异步发起流式请求将每个 token 推入 sink claudeService.streamChat( clawReq, token - sink.tryEmitNext( ServerSentEvent.Stringbuilder() .event(token) .data(token) .build() ), (stopReason, usage) - { sink.tryEmitNext( ServerSentEvent.Stringbuilder() .event(done) .data([DONE]) .build() ); sink.tryEmitComplete(); }, error - sink.tryEmitError(error) ); return sink.asFlux(); } }五、前端对接5.1 方案 A使用原生 EventSourceGET 请求EventSource是浏览器内置 API专门用于接收 SSE只支持 GET 请求!DOCTYPE html html langzh-CN head meta charsetUTF-8 titleAI 流式对话/title style body { font-family: sans-serif; max-width: 800px; margin: 40px auto; padding: 0 20px; } #output { min-height: 200px; padding: 16px; background: #f8f9fa; border-radius: 8px; border: 1px solid #dee2e6; white-space: pre-wrap; word-wrap: break-word; font-size: 15px; line-height: 1.7; } #status { color: #888; font-size: 13px; margin-top: 8px; } button { padding: 8px 20px; background: #e74c3c; color: #fff; border: none; border-radius: 6px; cursor: pointer; font-size: 14px; } button:disabled { background: #ccc; cursor: not-allowed; } /style /head body h2 AI 流式对话 Demo/h2 input idinput typetext placeholder输入你的问题... stylewidth:70%;padding:8px; / button idbtn onclickstartStream()发送/button div idoutput/div div idstatus/div script let eventSource null; function startStream() { const q document.getElementById(input).value.trim(); if (!q) return; // 重置 UI document.getElementById(output).textContent ; document.getElementById(status).textContent 连接中...; document.getElementById(btn).disabled true; // 关闭旧连接 if (eventSource) eventSource.close(); // 创建 SSE 连接仅支持 GET eventSource new EventSource(/api/v1/stream/chat?q${encodeURIComponent(q)}); // 监听 token 事件与服务端 event name 对应 eventSource.addEventListener(token, (e) { document.getElementById(output).textContent e.data; }); // 监听完成事件 eventSource.addEventListener(done, (e) { const info JSON.parse(e.data); document.getElementById(status).textContent ✅ 完成 | 输入 ${info.inputTokens} tokens | 输出 ${info.outputTokens} tokens | 耗时 ${info.elapsedMs}ms; document.getElementById(btn).disabled false; eventSource.close(); }); // 监听错误事件 eventSource.addEventListener(error, (e) { console.error(SSE error:, e); document.getElementById(status).textContent ❌ 请求失败请重试; document.getElementById(btn).disabled false; eventSource.close(); }); } /script /body /html5.2 方案 B使用 fetch ReadableStream支持 POSTEventSource只支持 GET如果需要用 POST 发送复杂请求体改用fetchReadableStreamasync function streamWithPost(message, systemPrompt ) { const output document.getElementById(output); output.textContent ; const response await fetch(/api/v1/stream/chat, { method: POST, headers: { Content-Type: application/json, Accept: text/event-stream, // 告诉服务端我们期望 SSE }, body: JSON.stringify({ message, systemPrompt, maxTokens: 1024, }), }); if (!response.ok) { console.error(请求失败:, response.status); return; } // 获取可读流 reader const reader response.body.getReader(); const decoder new TextDecoder(utf-8); let buffer ; while (true) { const { done, value } await reader.read(); if (done) break; // 解码字节流 buffer decoder.decode(value, { stream: true }); // SSE 格式每条消息以 \n\n 结尾 const lines buffer.split(\n\n); buffer lines.pop(); // 最后一个可能不完整保留到下次 for (const chunk of lines) { // 解析 event: xxx\ndata: yyy 格式 const eventMatch chunk.match(/^event: (.)$/m); const dataMatch chunk.match(/^data: (.)$/m); const eventName eventMatch ? eventMatch[1] : message; const data dataMatch ? dataMatch[1] : ; if (eventName token) { output.textContent data; } else if (eventName done) { const info JSON.parse(data); console.log(完成:, info); } else if (eventName error) { console.error(错误:, data); } } } }Vue / React 项目建议在框架项目中可以将上述 fetch 逻辑封装为useStream()Hook 或 Vue Composable统一管理连接状态和错误处理。5.3 封装 Vue3 Composable示例// composables/useAiStream.js import { ref } from vue; export function useAiStream() { const content ref(); const loading ref(false); const usage ref(null); const error ref(null); async function sendStream(message, systemPrompt ) { content.value ; usage.value null; error.value null; loading.value true; try { const response await fetch(/api/v1/stream/chat, { method: POST, headers: { Content-Type: application/json }, body: JSON.stringify({ message, systemPrompt }), }); const reader response.body.getReader(); const decoder new TextDecoder(); let buffer ; while (true) { const { done, value } await reader.read(); if (done) break; buffer decoder.decode(value, { stream: true }); const parts buffer.split(\n\n); buffer parts.pop(); for (const part of parts) { const eventMatch part.match(/^event: (.)$/m); const dataMatch part.match(/^data: (.)$/m); const event eventMatch?.[1]; const data dataMatch?.[1]; if (event token) content.value data; else if (event done) usage.value JSON.parse(data); else if (event error) error.value JSON.parse(data); } } } catch (e) { error.value { message: e.message }; } finally { loading.value false; } } return { content, loading, usage, error, sendStream }; }!-- 在组件中使用 -- template div textarea v-modelquestion placeholder输入问题... / button clicksendStream(question) :disabledloading {{ loading ? 生成中... : 发送 }} /button div classoutput{{ content }}/div div v-ifusage classstats 输入 {{ usage.inputTokens }} tokens | 输出 {{ usage.outputTokens }} tokens /div /div /template script setup import { ref } from vue; import { useAiStream } from /composables/useAiStream; const question ref(); const { content, loading, usage, sendStream } useAiStream(); /script六、重要的工程细节6.1 超时配置SSE 连接是长连接需要在多个层面配置超时server: # Tomcat 连接超时默认 60 秒流式接口必须调大 connection-timeout: 300000 # 5 分钟毫秒 tomcat: connection-timeout: 300000 # OpenClAW 底层请求超时 openclaw: timeout: 180 # 秒需大于模型最长响应时间如果使用 Nginx 反向代理还需要配置# nginx.conf location /api/v1/stream/ { proxy_pass http://backend; proxy_http_version 1.1; # SSE 关键配置禁用缓冲数据立即透传给客户端 proxy_buffering off; proxy_cache off; # 超时配置需大于最长响应时间 proxy_read_timeout 300s; proxy_send_timeout 300s; # SSE 必须的响应头 proxy_set_header Connection ; chunked_transfer_encoding on; }6.2 客户端断开处理当用户关闭页面或网络中断时服务端需要及时感知并释放资源Override public SseEmitter streamChat(ChatRequestDTO request) { SseEmitter emitter new SseEmitter(3 * 60 * 1000L); // 注册连接超时回调 emitter.onTimeout(() - { log.warn([ClAW-Stream] SSE 连接超时); emitter.complete(); }); // 注册客户端断开连接回调用户关闭页面等 emitter.onCompletion(() - log.info([ClAW-Stream] SSE 连接已关闭) ); // 注册错误回调 emitter.onError(ex - log.error([ClAW-Stream] SSE 错误: {}, ex.getMessage()) ); // ✅ 使用原子标志位感知断开状态 java.util.concurrent.atomic.AtomicBoolean cancelled new java.util.concurrent.atomic.AtomicBoolean(false); emitter.onCompletion(() - cancelled.set(true)); emitter.onTimeout(() - cancelled.set(true)); emitter.onError(ex - cancelled.set(true)); streamExecutor.execute(() - { // 在 token 回调中检查是否已断开 claudeService.streamChat( buildStreamRequest(request), token - { if (cancelled.get()) { // 客户端已断开抛出异常通知 OpenClAW 停止推送 throw new RuntimeException(客户端已断开连接); } try { emitter.send(SseEmitter.event().name(token).data(token)); } catch (IOException e) { cancelled.set(true); emitter.completeWithError(e); } }, // ... 其他回调 ); }); return emitter; }6.3 并发控制防止资源耗尽每个 SSE 连接都会占用一个线程或虚拟线程高并发下需要限制最大并发数Configuration public class StreamConfig { Bean(name streamExecutor) public ExecutorService streamExecutor() { // Java 21使用虚拟线程轻量高效无需手动限制并发 return Executors.newVirtualThreadPerTaskExecutor(); // Java 17使用有界线程池防止无限创建线程 // return new ThreadPoolExecutor( // 10, // 核心线程数 // 50, // 最大线程数 // 60L, TimeUnit.SECONDS, // 空闲线程存活时间 // new LinkedBlockingQueue(100), // 队列容量 // new ThreadPoolExecutor.CallerRunsPolicy() // 拒绝策略调用者线程执行 // ); } }6.4 流式接口的异常响应流式接口一旦建立 SSE 连接并写入了第一个data:HTTP 状态码就固定为 200 了后续无法再修改。因此错误需要通过事件协议传递// ✅ 正确通过 error 事件告知客户端 emitter.send( SseEmitter.event() .name(error) .data({\code\:5001,\message\:\AI 服务暂时不可用\}) ); emitter.complete(); // ❌ 错误连接建立后不能再设置 HTTP 状态码 // response.setStatus(500); // 无效前端收到error事件后应该展示错误提示并停止监听eventSource.addEventListener(error, (e) { const errInfo JSON.parse(e.data); showErrorToast(errInfo.message); eventSource.close(); });七、流式接口测试7.1 curl 测试# curl 测试 SSE-N 禁用缓冲实时看到输出 curl -N http://localhost:8080/api/v1/stream/chat?q用100字介绍Spring Boot # POST 方式 curl -N -X POST http://localhost:8080/api/v1/stream/chat \ -H Content-Type: application/json \ -H Accept: text/event-stream \ -d {message:解释什么是 AOP,systemPrompt:你是 Java 专家回答简洁}正常输出示例event: token data: Spring event: token data: Boot event: token data: 是一个 event: token data: 开源框架 ... 持续输出 event: done data: {stopReason:end_turn,inputTokens:28,outputTokens:156,elapsedMs:3421}7.2 Postman 测试Postman 从 9.x 版本开始支持 SSE新建请求URL 填入http://localhost:8080/api/v1/stream/chat?q你好点击Send旁的下拉箭头选择Send and Download或者在 Headers 中添加Accept: text/event-stream即可看到实时推送的 token八、完整项目结构openclaw-demo/ ├── src/main/java/com/example/openclaw_demo/ │ ├── config/ │ │ ├── ClawProperties.java │ │ └── StreamConfig.java ★ 新增线程池配置 │ │ │ ├── service/ │ │ ├── ClawChatService.java 第2篇 │ │ ├── ClawStreamService.java ★ 新增流式接口 │ │ └── impl/ │ │ ├── ClawChatServiceImpl.java第2篇 │ │ └── ClawStreamServiceImpl.java ★ 新增流式实现 │ │ │ ├── controller/ │ │ ├── ChatController.java 第2篇 │ │ └── ChatStreamController.java ★ 新增SSE Controller │ │ │ └── ...dto/exception/common 同第2篇 │ ├── src/main/resources/ │ └── application.yml timeout 配置更新 │ └── src/main/resources/static/ └── stream-demo.html ★ 新增前端测试页面九、常见问题排查问题现象可能原因解决方法输出一段后突然中断Nginx proxy_read_timeout 太小设置proxy_read_timeout 300s前端收不到数据但 curl 正常Nginx 缓冲了响应添加proxy_buffering offEventSource一直重连服务端未正确关闭连接确保emitter.complete()被调用中文乱码编码问题new TextDecoder(utf-8)第一个 token 延迟很高模型冷启动正常现象首 token 延迟约 200~800msSseEmitter 并发量高时 OOM线程池无界改用有界线程池或 Java 21 虚拟线程CORS 报错跨域配置缺失添加CrossOrigin或全局 CORS 配置跨域配置如有需要Configuration public class CorsConfig implements WebMvcConfigurer { Override public void addCorsMappings(CorsRegistry registry) { registry.addMapping(/api/v1/stream/**) .allowedOrigins(http://localhost:3000, https://yourdomain.com) .allowedMethods(GET, POST) .allowedHeaders(*) // SSE 必须允许以下响应头否则浏览器读取不到 .exposedHeaders(Content-Type, Cache-Control, X-Accel-Buffering, Transfer-Encoding) .maxAge(3600); } }十、本篇总结本篇完整实现了 AI 流式输出功能核心知识点回顾为什么用 SSE单向推送、基于 HTTP、浏览器原生支持、实现简单是 AI 流式输出的最佳选择SseEmitter 核心模式Controller 创建 Emitter → 异步线程执行流式请求 → 回调中调用emitter.send()→ 完成后emitter.complete()三个回调onToken逐 token 推送、onComplete发送结束事件、onError发送错误事件前端对接GET 用EventSourcePOST 用fetch ReadableStream工程细节超时配置Tomcat Nginx OpenClAW 三层、客户端断开感知、并发控制、错误通过事件协议传递测试方法curl -N实时查看 SSE 输出下一篇将在流式输出的基础上实现多轮对话与会话管理用 Redis 持久化对话历史支持多用户隔离构建真正有记忆的 AI 接口。如果本文对你有帮助欢迎点赞 收藏 ⭐ 关注你的支持是我持续创作的动力标签Spring BootSSE流式输出OpenClAWClaude APIServer-Sent EventsJava AI打字机效果