1. stream() 同步传输fromlangchain.chat_modelsimportinit_chat_model# 流式传输modelinit_chat_model(modeldeepseek-chat,model_providerdeepseek)# stream方法返回的是一个迭代器产生的是消息块print(model.invoke(写一个关于春天的作文1000字))stream方法返回的是一个迭代器产生的是消息块那么就可以遍历拿到所有的消息块。fromlangchain.chat_modelsimportinit_chat_model# 流式传输modelinit_chat_model(modeldeepseek-chat,model_providerdeepseek)# stream方法返回的是一个迭代器产生的是消息块# 那么就可以遍历拿到所有的消息块forchunkinmodel.stream(写一个关于春天的作文1000字):print(chunk.content,end|,flushTrue)# end|每个块结尾用|进行分割flushTrue强制立即刷新输出缓冲区输出结果|##| |春|分||春|分|日||正|午|时分||我|站在|学校|操场|中央|。|地理|老师说||这时|太阳|直|射|赤|道||南北|半球|昼夜|平分|。|可|我怎么|也|感受|不到|这种|平衡|——|脚下的|影子|几乎|要|缩|成一个|点||仿佛|随时|会|消失|。|阳光|很|烈||晒|得|人|头皮|发|麻||空气|里|飘|着|柳|絮||像|极了|冬天|残留|的|雪花|。|我|抬头|望向|那|棵|老|槐|树||树|梢|上|刚|冒|出的|嫩|芽|在|强|光|下|显得|格外|刺|眼|。|这|让我|想起|爷爷|家的|春天|。|爷爷|住在|太行|山|深处||那里|没有|高大的|教学楼||只有|连绵|起伏|的山|峦|。|三|月的|山|风|依然|凛|冽||带着|冬天|最后的|倔|强|。|阳光|穿过|稀疏|的|树枝||在地上|留下|斑|驳|的影子|。|去年|这时候||我|蹲|在|爷爷|的|菜|园|里||看他|种|土豆|。|他|弯|着|腰||用|锄|头|在地上|挖|出一条|浅浅|的|沟||然后把|发芽|的|土豆|块|放|进去||再|用手|轻轻|盖上|土|。|“|这样|就好了|”|我问|。|“|好了|。”|爷爷|直|起|腰||拍拍|手上的|泥土|“|等着|吧||再过|些|日子||它们|就|发芽|了|。”|我|蹲|下来||盯着|那片|平整|的土地||却|看不见|任何|变化|。|阳光|照|在|褐|色的|泥土|上||反射|出|淡淡|的光|。|远处的|山|还是|枯|黄的||只有|近|处的|几|棵|柳|树|冒|出了|鹅|黄的|芽|。|爷爷|说的|“|些|日子|”|到底|要|多久|呢||现在||当我|站在|城市的|校园|里||望着|被|高楼|切割|成|方|块的|天空||突然|理解了|爷爷|的话|。|他|说的|“|些|日子|”|不是|日历|上|翻|过去|的一|页||而是|土地|里|种子|一点点|积蓄|力量|的过程|。|就像|学校的|铃声||定点|响起||准时|下课||但|春天的|生长|从来不|按|铃声|来|。|前|些|天||科学|课|上学|了|植物的|光合|作用||老师说|植物|通过|光合|作用|把|阳光|变成|能量||于是就|开|出了|花||结|出了|果|。|可|我觉得||事情|哪有|这么|简单|。|去年|春天|在|爷爷|家||我看到|一颗|蒲公英|种子|落在|石|缝|里||没有|阳光||没有|土壤||只有|一点|雨水|和|灰尘|。|但它|还是|发芽|了||还|开|出了一|朵|小小的|黄花|。|那一刻||我才|明白||春天|不是|偶然|降临|的|礼物|。|这|大概|就是|春天的|秘密|吧|。|它|不需要|大声|宣告|自己的|到来||也不需要|人们|时刻|惦记|。|就像|爷爷|种|下的|土豆||在|看不见|的地方|悄悄|生长|。|而我们|呢||我们|总|在|等待|一个|隆|重的|开始||一个|完美的|春天||却|忘了||春天|就在|每一次|呼吸|之间||在|每一个|隐秘|的|角落里|静静|等待|。|||我们调试可以看到整个chunk是一个AIMessageChunk也就是AIMessage其中的一个块。2. astream() 异步传输2.1 同步(阻塞)方式# 同步IOimporttimedefboil_water():print(开始烧水...)time.sleep(5)# 模拟烧水5s, CPU 完全空闲print(烧水完成...)defsend_message():print(开始发消息...)time.sleep(2)# 模拟烧水2sprint(发消息完成...)defmain():# 1、烧水boil_water()# 2、发消息send_message()# 共耗时7smain()问题 在boil_water 函数等待的5秒里CPU 完全空闲但却不能去做send_message 任务效率低下。2.2 协程多进程通常利用的是多核 CPU 的优势同时执行多个计算任务。每个进程有自己独立的内存管理所以不同进程之间要进行数据通信比较麻烦。多线程是在一个 cpu 上创建多个子任务当某一个子任务休息的时候其他任务接着执行。多线程的控制是由 python 自己控制的。线程存在数据同步问题所以要有锁机制。协程的实现是在一个线程内实现的相当于流水线作业。由于线程切换的消耗比较大所以对于并发编程可以优先使用协程。协程作为一种轻量级的并发编程模型可以被视为用户态的“轻量级线程”。与传统线程相比协程的核心优势在于其调度完全由用户空间掌控避免了操作系统内核的频繁介入从而显著降低了上下文切换的开销。2.3 异步方式# 异步 IOimportasyncio# 协程1asyncdefboil_water_async():print(开始烧水...)awaitasyncio.sleep(5)# 关键 await表示等待这个操作完成但期间可以做别的事print(烧水完成...)# 协程2asyncdefsend_message_async():print(开始发消息...)awaitasyncio.sleep(2)# 模拟烧水2sprint(发消息完成...)我们将这两个方法定义成了异步的实际上boil_water_async和send_message_async就是两个协程。我们让main函数来管理这两个协程的流程# 异步 IOimportasyncio# 协程1asyncdefboil_water_async():print(开始烧水...)awaitasyncio.sleep(5)# 关键 await表示等待这个操作完成但期间可以做别的事print(烧水完成...)# 协程2asyncdefsend_message_async():print(开始发消息...)awaitasyncio.sleep(2)# 模拟烧水2sprint(发消息完成...)# 协程调度# 事件循环asyncdefmain():# 1、烧水任务,使用asyncio.create_task将其创建成任务task1asyncio.create_task(boil_water_async())# 2、发消息任务task2asyncio.create_task(send_message_async())# 等待任务1和任务2都完成awaittask1awaittask2# 5s# run函数 会创建一个事件循环并运行指定的协程。asyncio.run(main())输出结果开始烧水...开始发消息...发消息完成...烧水完成...什么是事件循环事件循环是 asyncioPython 标准库中的模块用于编写异步 I/O操作的代码的核心你可以把它想象成一个总调度员或一个高效的待办事项 (To-Do List) 管理员。 它的工作流程非常简单它维护着一个任务列表比如煮水、发短信。它不断地循环检查每个任务 a. 如果任务处于“等待I/O” 状态比如等水开、等网络响应就暂停它立即去执行下一个已经“就绪” 的任务。 b. 如果任务的等待时间到了或者 I/O 操作完成了事件循环就恢复执行这个任务。通过使用 asyncio 我们可以在单线程中同时处理多个任务。一个在单线程内调度和管理所有协程的核心机制就是事件循环。它不停地检查哪些协程可以执行哪些在等待。总结一下协程是 asyncio 的核心概念之一。它是一个特殊的函数可以在执行过程中暂停并在稍后恢复执行。协程通过 async def 关键字定义并通过 await 关键字暂停执行等待异步操作完成。要运行一个协程可以使用 asyncio.run() 函数。它会创建一个事件循环并运行指定的协程。事件循环是 asyncio 的核心组件负责调度和执行协程。它不断地检查是否有任务需要执行并在任务完成后调用相应的回调函数。2.4 异步流式输出fromlangchain.chat_modelsimportinit_chat_modelimportasyncio modelinit_chat_model(modeldeepseek-chat,model_providerdeepseek)# 异步流式输出asyncdefasync_stream():print(异步调用)asyncforchunkinmodel.astream(写一段关于春天的作文100字):print(chunk.content,end|,flushTrue)# 创建事件循环执行协程asyncio.run(async_stream())输出结果异步调用|#| |春||窗外||梧桐|抽|出了|嫩|芽||茸|茸|的||像|刚|醒|来的|眼|。|阳光|软|软|地|洒|下来||暖|融融|的||把|冬|日的|寒气|都|融|化了|。|风|里|带着|些|泥土|的|味儿||还有|花的|香|。|远|山|青|了||水|也|绿|了|。|燕子|衔|着|春|泥||在|屋檐|下|筑|巢||叽|叽|喳|喳|的||好|不|热闹|。|春天|就这样|悄悄地|来了||把|一切都|唤|醒了|。|||3. 自定义输出格式fromtypingimportIterator,Listfromlangchain.chat_modelsimportinit_chat_modelfromlangchain_core.output_parsersimportStrOutputParser# 组件一聊天模型modelinit_chat_model(modeldeepseek-chat,model_providerdeepseek)# 组件二定义输出解析器parserStrOutputParser()# 自定义生成器defsplit_into_list(input:Iterator[str])-Iterator[List[str]]:bufferforchunkininput:bufferchunk# 遇到句号刷新while。inbuffer:# 找到句号的位置stop_indexbuffer.index(。)yield[buffer[:stop_index].strip()]bufferbuffer[stop_index1:]# 处理buffer最后几个字yield[buffer[:].strip()]# 定义链chainmodel|parser|split_into_listforchunkinchain.stream(写一个关于春天的作文5句话每句话用中文句号隔开):# print(chunk.content, end|, flushTrue)# 这里使用的是输出解析器结果就是一个str不用再chunk.contentprint(chunk,end|,flushTrue)yield 是 Python 中用来创建生成器的关键字。包含 yield的函数不会一次性执行完并返回一个值而是返回一个迭代器生成器对象。每次调用 next()或循环迭代时函数会从上一次暂停的地方继续执行直到遇到下一个 yield然后把 yield 后面的值“吐”出来再次暂停。输出结果[春天总是悄然而至像一位羞涩的画家用嫩绿的草芽和粉白的杏花悄悄地涂抹着大地]|[清晨的露珠挂在刚刚舒展的柳叶上折射出细碎的光芒仿佛每一滴都藏着一个苏醒的梦]|[风也变得温柔了不再像冬天那样尖锐而是一阵一阵地送来泥土和青草的香气]|[人们脱去厚重的冬衣脚步不自觉地轻快起来连说话的声音里都带着笑意]|[春天不像夏天那样热烈也不像秋天那样浓艳它只是轻轻地、坚定地告诉我们一切都可以重新开始]|[]|4. 流式传输原理在流式传输中客户端向服务端发送消息之后服务端需要向客户端持续的推送消息WebSocket可以支持双向连接但是我们需要维护状态。4.1 SSE协议HTTP 协议本身设计为无状态的请求-响应模式严格来说是无法做到服务器主动推送消息到客户端但通过Server-Sent Events 服务器发送事件简称 SSE技术可实现流式传输允许服务器主动向浏览器推送数据流。也就是说服务器向客户端声明接下来要发送的是流消息(streaming)这时客户端不会关闭连接会一直等待服务器发送过来新的数据流。SSEServer-Sent Events是一种基于 HTTP 的轻量级实时通信协议浏览器可以通过内置的EventSource API 接收并处理这些实时事件。SSE核心特点基于 HTTP 协议复用标准 HTTP/HTTPS 协议无需额外端口或协议兼容性好且易于部署。单向通信机制SSE 仅支持服务器向客户端的单向数据推送客户端通过普通 HTTP 请求建立连接后服务器可持续发送数据流但客户端无法通过同一连接向服务器发送数据。自动重连机制支持断线重连连接中断时浏览器会自动尝试重新连接支持 retry 字段指定重连间隔。自定义消息类型客户端发起请求后服务器保持连接开放响应头设置Content-Type: text/eventstream标识为事件流格式持续推送事件流。数据格式服务端向浏览器发送 SSE 数据需要设置必要的 HTTP 头信息# 告诉浏览器返回的数据类型是“事件流”浏览器会据此保持连接并等待后续数据。Content-Type:text/event-stream;charsetutf-8# 要求 HTTP 连接保持打开状态而不是每次发送完数据就关闭。这样服务器可以持续推送消息。Connection:keep-aliveSSE 的消息由若干行组成每条消息之间用 两个换行符 \n\n 分隔。每一行的基本格式是[field]:value\n4.2 LangChain实现方式上面的代码中我们的model/chain调用的stream方法都是LangChain里面的组件的流式传输能力。LangChain 本身并不“创造”或“规定”一个底层的网络传输协议而是依赖于其底层的大模型供应商如 OpenAI和我们自身服务应用所使用的 Web 框架如 FastAPI的协议。因此对于 LangChain 的流式传输能力本身是因为大模型供应商提供了流式传输能力由 LangChain进行调用后接收并处理成一个个的AIMessageChunk 。