【LangChain】流式传输 个人主页秦jh__https://blog.csdn.net/qinjh_?spm1010.2135.3001.5343 系列专栏https://blog.csdn.net/qinjh_/category_13137010.html​​​目录流式传输stream() 同步传输astream() 异步传输异步相关概念使用使用 StrOutputParser 解析模型的输出自定义流式输出解析器深度探索流式传输SSE 协议介绍LangChain 流式传输流程分析通过源码分析流程LangChain 请求 OpenAI 使用什么协议LangChain 如何支持流式传输OpenAI 返回的块是什么格式如何转换成 AIMessageChunk 使用 LangSmith 跟踪 LLM 应用前言 hello! 各位铁子们大家好哇。今日更新了LangChain相关内容 欢迎大家关注点赞收藏⭐️留言流式传输流式处理对于使基于 LLM 的应用程序能够响应最终用户至关重要。其通过逐步显示输出甚至在完整 的响应准备就绪之前流式传输可以显着改善用户体验。我们之前直接使用 invoke 的调用方式属于非流式传输看到的现象是聊天模型直接返回全量内容若 模型思考时间较长则我们等待的时间就越长。stream() 同步传输在 LangChain 聊天模型中可以使用其 .stream() 方法来同步生成流式响应的效果。 聊天模型的 .stream() 方法返回一个迭代器该迭代器在生成输出时同步产生输出 消息块 。可以 使用 for 循环实时处理每个块。代码如下model ChatOpenAI(modelgpt-4o-mini) # 返回一个迭代器产生的消息块 chunks [] for chunk in model.stream(写一段关于春天的作文1000字): chunks.append(chunk) # chunk: AIMessageChunk print(chunk.content, end|, flushTrue)通过调试让我们来看下 chunk 是什么见下图我们得到了一个叫做 AIMessageChunk 的东西它代表 AIMessage 的一部分也就是消息块。 消息块还可以直接相加来看效果print(chunks[0] chunks[1] chunks[2] chunks[3] chunks[4])结果如下 AIMessageChunk content有一天兔 additional_kwargs{} response_metadata{} idrun-- e619cbc3-9ee9-4ae7-a73e-32edb166d401astream() 异步传输对于流式传输通常我们可以选择异步调用。先来了解下异步相关知识。异步相关概念想象一个场景你需要煮一壶水同时还要给朋友发一条短信。我们分别用同步传统和异步两种 方式来完成以此对比并引入 协程 和 事件循环 的概念。同步阻塞方式这就像是一个“死心眼”的人做事必须一件一件来import time def boil_water(): print(开始煮水...) time.sleep(5) # 模拟阻塞等待5秒 print(水开了) def send_message(): print(开始发短信...) time.sleep(2) # 模拟阻塞等待2秒 print(短信发送成功) # 主程序 def main(): boil_water() # 先花5秒煮水期间什么也不能做 send_message() # 水开后再花2秒发短信 main()总耗时7秒问题 在 boil_water 函数等待的5秒里CPU 完全空闲但却不能去做 send_message 任务 效率低下。异步方式我们请出 asyncio 、 协程 和 事件循环 。什么是协程多进程通常利用的是多核 CPU 的优势同时执行多个计算任务。每个进程有自己独立的内存管 理所以不同进程之间要进行数据通信比较麻烦。多线程是在一个 cpu 上创建多个子任务当某一个子任务休息的时候其他任务接着执行。多线程 的控制是由 python 自己控制的。线程存在数据同步问题所以要有锁机制。协程的实现是在一个线程内实现的相当于流水线作业。由于线程切换的消耗比较大所以对于 并发编程可以优先使用协程。协程作为一种轻量级的并发编程模型可以被视为用户态的“轻量级线程”。 与传统线程相比协 程的核心优势在于其调度完全由用户空间掌控避免了操作系统内核的频繁介入从而显著降低了上 下文切换的开销。 在诸如网络数据刷新、资源加载、用户界面更新、以及 I/O 读写等场景下如果并 发任务的计算量相对较小、对系统资源占用较低则不必动用操作系统级别的线程。协程的切换则由程序员和编程语言控制程序员决定在何时暂停或恢复协程。协程是一个特殊的函 数它可以在执行过程中暂停并在稍后恢复执行。它用 async def 定义并在需要暂停的地方使 用 await 。在我们的例子里 boil_water_async 和 send_message_async 就是两个协程。# 异步 IO import asyncio # 协程 async def boil_water_async(): print(开始烧水...) await asyncio.sleep(5) # 关键 await表示等待这个操作完成但期间可以做别的事 print(烧水完成...) # 协程 async def send_message_async(): print(开始发消息...) await asyncio.sleep(2) # 模拟烧水2s print(发消息完成...)什么是事件循环事件循环是 asyncioPython 标准库中的模块用于编写异步 I/O 操作的代码的核心你可以把它 想象成一个总调度员或一个高效的待办事项 (To-Do List) 管理员。它的工作流程非常简单它维护着一个任务列表比如煮水、发短信。它不断地循环检查每个任务如果任务处于 “等待I/O” 状态比如等水开、等网络响应就暂停它立即去执行下一个 已经 “就绪” 的任务。如果任务的等待时间到了或者 I/O 操作完成了事件循环就恢复执行这个任务。如何运行# 协程调度 # 事件循环 async def main(): # 1、烧水任务 task1 asyncio.create_task(boil_water_async()) # 2、发消息任务 task2 asyncio.create_task(send_message_async()) await task1 await task2 # 5s # run 会创建一个事件循环 asyncio.run(main())输出结果开始煮水... # 任务1开始 开始发短信... # 任务1遇到await立即让出控制权事件循环马上启动任务2 此时两个任务都在后台“等待” 等待约2秒后... 短信发送成功 # 任务2的等待时间先到任务2完成 继续等待约3秒后... 水开了 # 任务1的等待时间也到了任务1完成总耗时5秒 (因为两个任务的等待时间是并发的)通过使用 asyncio 我们可以在单线程中同时处理多个任务。一个在单线程内调度和管理所有协程 的核心机制就是事件循环。它不停地检查哪些协程可以执行哪些在等待。总结一下协程是 asyncio 的核心概念之一。它是一个特殊的函数可以在执行过程中暂停并在稍后恢 复执行。协程通过 async def 关键字定义并通过 await 关键字暂停执行等待异步操作完 成。要运行一个协程可以使用 asyncio.run() 函数。它会创建一个事件循环并运行指定的协 程。事件循环是 asyncio 的核心组件负责调度和执行协程。它不断地检查是否有任务需要执 行并在任务完成后调用相应的回调函数。使用可以使用 .astream() 方法来异步生成流式响应的效果这专为非阻塞工作流程而设计。可以在 异步代码中使用它来实现相同的实时流式处理行为。代码如下# 异步流式输出 async def async_stream(): print(异步调用) async for chunk in model.astream(写一段关于春天的作文1000字): print(chunk.content, end|, flushTrue) asyncio.run(async_stream())使用 StrOutputParser 解析模型的输出还记得最早我们讲过 Runnable 接口Runnable 接口聊天模型、输出解析器等组件都实现了 LangChain 的 Runnable 接口他们都是 Runnable 接 口的实例。Runnable 定义了一个标准接口允许 Runnable 组件Invoked调用: 单个输入转换为输出。Batched批处理: 多个输入被有效地转换为输出。Streamed流式传输: 输出在生成时进行流式传输。Inspected检查: 可以访问有关 Runnable 的输入、输出和配置的原理图信息。Composed组合: 可以组合多个 Runnable以使用 LCEL 协同工作以创建复杂的管道。......可以看到流式传输实际上并不算是聊天模型定义的能力而是只要实现了Runnable 接口的实例都具 备的能力但要注意并非所有组件都必须实现流式处理在某些情况下流式处理要么是不必要的要么很困 难要么根本没有意义。例如以后我们会讲解的 Retrievers检索器 就不提供任何流式处理。那么再得出一个关于流式传输的结论 .stream() 和 .astream() 方法产生的块类型取决于正在 流式传输的组件。例如我们当前正在使用聊天模型的流式传输返回的每个块都将是一个 AIMessageChunk 。但是对于其他组件块类型可能不同。接下来让我们使用 LCEL 构建一个简单的链该链结合了 模型 和 解析器 并验证流是否有效。不要 忘了使用 LCEL 创建的链也实现了 Runnable 接口。我们将使用 StrOutputParser 来解析模型的输出它从 AIMessageChunk 中提取内容字段为 我们提供模型返回的令牌 。代码如下from langchain_openai import ChatOpenAI from langchain_core.output_parsers import StrOutputParser # 定义大模型 model ChatOpenAI(modelgpt-4o-mini) # 定义输出解析器 parser StrOutputParser() # 定义链 chain model | parser for chunk in chain.stream(写一段关于爱情的歌词需要5句话): print(chunk, end|, flushTrue)结果:|在|星|空|下|许|下|心|愿|| |你的|笑|容|如|晨|光|温|暖|| |手|握|手|走|过|每|段|光|阴|| |无|论|风|雨|依|然|不|离|不|弃|| |爱|是|永|恒||心|与|心|相|连|。| ||自定义流式输出解析器上面我们演示了如何让聊天模型进行流式输出。若此时我们希望修改上一步的输出样式一个字或两 个字的输出将输出改为一句话一句话的输出同时保留流式处理功能。那么我们需要在链中使用 生成器函数即可完成自定义流式输出的能力。还记得之前说过聊天模型的 .stream() 方法返回的是一个迭代器该迭代器在生成输出时同步产 生输出 消息块 。那么我们的将实现的这些生成器的签名应该是 Iterator[Input] - Iterator[Output] 。或者对于异步生成器 AsyncIterator[Input] - AsyncIterator[Output] 。下面是句号分隔列表的自定义输出解析器的示例# 组件1聊天模型 model ChatOpenAI(modelgpt-4o-mini) # 组件2输出解析器str parser StrOutputParser() # 自定义生成器 def split_into_list(input: Iterator[str]) - Iterator[List[str]]: buffer for chunk in input: buffer chunk # 遇到 。 需要刷新 while 。 in buffer: # 找到 。的位置 stop_index buffer.index(。) # yield 用于创造生成器。将句号之前的内容去除首尾空格作为一个句子放入列表中并产出 yield [buffer[:stop_index].strip()] # 更新缓冲区保留句号之后的内容 buffer buffer[stop_index 1 :] # 处理buffer最后几个字 yield [buffer.strip()] # 定义链 chain model | parser | split_into_list # 返回一个迭代器产生的消息块 for chunk in chain.stream(写一段关于爱情的歌词需要5句话每句话用中文句号隔开。): print(chunk, end|, flushTrue)深度探索流式传输SSE 协议介绍HTTP 协议本身设计为无状态的请求-响应模式严格来说是无法做到服务器主动推送消息到客户 端但通过 Server-Sent Events 服务器发送事件简称 SSE技术可实现流式传输允许服 务器主动向浏览器推送数据流。也就是说服务器向客户端声明接下来要发送的是流消息(streaming)这时客户端不会关闭连接 会一直等待服务器发送过来新的数据流。SSEServer-Sent Events是一种基于 HTTP 的轻量级实时通信协议浏览器可以通过内置的 EventSource API 接收并处理这些实时事件。核心特点基于 HTTP 协议复用标准 HTTP/HTTPS 协议无需额外端口或协议兼容性好且易于部署。单向通信机制SSE 仅支持服务器向客户端的单向数据推送客户端通过普通 HTTP 请求建立连接后服务器可持续 发送数据流但客户端无法通过同一连接向服务器发送数据。自动重连机制支持断线重连连接中断时浏览器会自动尝试重新连接支持 retry 字段指定重连间隔自定义消息类型客户端发起请求后服务器保持连接开放响应头设置 Content-Type: text/eventstream 标识为事件流格式持续推送事件流。数据格式服务端向浏览器发送 SSE 数据需要设置必要的 HTTP 头信息Content-Type: text/event-stream;charsetutf-8Connection: keep-alive每一次发送的消息由若干个 message 组成每个 message 之间由 \n\n 分隔每个 message 内 部由若干行组成每一行都是如下格式[field]: value\nField 可以取值为:data[必需]数据内容event[非必需]表示自定义的事件类型默认是message事件id[非必需]数据标识符相当于每一条数据的编号retry[非必需]指定浏览器重新发起连接的时间间隔除此之外还可以有冒号 : 开头的行表示注释。LangChain 流式传输流程分析LangChain 本身并不“创造”或“规定”一个底层的网络传输协议而是依赖于其底层的大模型供应 商如 OpenAI和我们自身服务应用所使用的 Web 框架如 FastAPI的协议。因此对于 LangChain 的流式传输能力本身是因为大模型供应商提供了流式传输能力由 LangChain 进行调用后接收并处理成一个个的 AIMessageChunk 。通过源码分析流程接下来我们将会通过分析相关源码探索整个传输流程。整个过程我们以 OpenAI 举例其他大模型方 式类似可自行探索。当我们向 OpenAI 发起流式请求LangChain 实际上会通过 BaseChatOpenAI 类中的 _stream() 方法发起调用。下面来看下 _stream() 方法的关键流程性源码完整源码见class langchain_openai.chat_models.base.BaseChatOpenAI从上述流程看来这就是流式逐块产生 AIMessageChunk 聊天消息的核心方法。那么接下来看三个 问题发起调用时底层使用什么协议如何支持流式传输返回的块是什么格式如何转换成 AIMessageChunk LangChain 请求 OpenAI 使用什么协议回答这个问题需要看 LangChain 关于 OpenAI 的客户端是怎么定义的。让我们找到 class langchain_openai.chat_models._client_utils._SyncHttpxClientWrapper 如下 所示从上面的代码看来LangChain 使用了 OpenAI 的官方的 OpenAI SDK for Python 接入方式继承了 openai._base_client 定义了一个 HTTP 客户端。因此在调用时发起的是 HTTP 调用。LangChain 如何支持流式传输开始我们就说了LangChain 本身并不“创造”或“规定”一个底层的网络传输协议而是依赖于其 底层的大模型供应商如 OpenAI的协议。因此当我们发起请求时会在请求中设置 streamTrue _stream() 源码中的第一步表示 OpenAI 服务器将在生成 Response 时向客户端发出数据server-sent eventsSSE。此时 API 会保持 HTTP 连接打开并以特定格式发送数据。例如我们向原生的 GPT 模型发起一次设置了 streamTrue 的 HTTP 请求 你好我是张 三。 。此时我们会收到来自 OpenAI 的事件块简化后的有效负载序列看了上述示例我们应该可以回答第二个问题。那就是在请求中设置 streamTrue 开启 OpenAI 服 务端返回数据块 LangChain 通过 _stream() 方法步骤1、2完成这件事。OpenAI 返回的块是什么格式如何转换成 AIMessageChunk OpenAI 返回的数据块格式我们已经看到了将其转换为 LangChain 自定义的 AIMessageChunk 则 是通过 _convert_chunk_to_generation_chunk() 方法完成的。关键代码如下到此我们就知道了LangChain流式传输的完整流程与底层协议。总结一下langchain-openai 包通过集成 OpenAI Python SDK提供了一个 HTTP 客户端。因此支持 LangChain 向 OpenAI 的 API 发起调用请求。若希望发起流式传输请求则需在请求中加入 streamTrue 向 OpenAI 说明以 SSE 协议进行 流式返回。LangChain 接收 OpenAI 的 SSE 格式的响应并将其转换为 LangChain 自封装的消息格式如 AIMessageChunk 消息。这样就可以以统一的方式处理来自不同模型提供商OpenAI, Anthropic等的流式响应。使用 LangSmith 跟踪 LLM 应用使用 LangChain 构建的许多应用程序可能会包含多个步骤和多次的 LLM 调用。随着这些应用程序变 得越来越复杂作为开发者我们能够检查链或代理内部到底发生了什么变得至关重要。最好的方法 是使用 LangSmith。LangSmith 与框架无关它可以与 langchain 和 langgraph 一起使用也可以不使用。LangSmith 是 一个用于帮助我们构建生产级 LLM 应用程序的平台它将密切监控和评估我们的应用。LangSmith 平台地址https://smith.langchain.com/ 新用户需要注册要想让 LangSmith 跟踪 LLM 应用第一步申请 LangSmith API Key点击 Settings就会跳转 到API Keys设置页面若没有跳转可以在左侧 tab 栏中找到进入。创建完成后保存好你的 API Key。 接下来配置两个环境变量执行任意代码查看 LangSmith 平台这将在 LangSmith 的默认跟踪项目中生成调用的跟踪。点击最新一次的调用 追踪跟踪会以瀑布流形式展示调用的完整步骤以及每个步骤的详细信息和耗时。让我们能够检查内部到 底发生了什么解释RunnableSequence 可运行序列就是我们之前讲过的 链 即我们将 model_with_search.invoke() 的结果构造成 ToolMessage 当作入参传递给 structured_search_model.invoke()。说明本次调用是RunnableSequence。但不是每次都展示RunnableSequence根据实际情 况而定。ChatOpenAI 实际处理的第一步内容调用聊天模型生成结果。RunnableLambda 实际处理的第二步内容表示将 python 可调用对象转换为 Runnable 其实就是将 AI 生成的结果转换成为结构化对象。可以看到我们在使用 LangSmith 时没有代码介入只需要配置下环境就可以直接监控我们的应用。