[拆解LangChain执行引擎]以Actor模型的视角来看Pregel 一、Agent、StateGraph和Pregel的关系可能很多天天使用LangChain编写Agent的人却没有接触过Pregel但为什么说Pregel却是整个LangChain整个体系最为核心的和部分呢接下来我们通过如下的演示程序来说明Agent、StateGraph和Pregel三者之间的关系。这是一个由两个Node组成的帮助我们生成笑话的Agent我们在调用Agent的时候指定笑话的主题第一个Node会利用LLM生成一个基于该主题的笑话。接下来会评估生成的这个笑话根据是否可笑返回 “good” 和 “bad” 如果评估失败会调用另一个Node生成一个具有更高质量的笑话。JokeAgentState是承载了贯穿整个 “笑话生产线” 的状态包括待产笑话的主题、第一个生成笑话、评估的结果和生成的第二个笑话。两个生产笑话的Nodegenerate_joke和 regenerate_joke都使用同一个基于gpt-5.2-chat的模型。我们不直接创建Agent而是利用StateGraph以 “Builder模式” 创建它。from langchain_openai import ChatOpenAI from typing import TypedDict, Literal from langgraph.graph import StateGraph, START, END from langgraph.pregel import Pregel class JokeAgentState(TypedDict): topic: str review: Literal[good, bad] init_joke: int improved_joke: str model ChatOpenAI( modelgpt-5.2-chat, base_urlURL for your own deployed model, api_keyyour own api key ) def generate_joke(state: JokeAgentState): 调用LLM生成一个关于指定主题的笑话。 result model.invoke(f写一个关于{state[topic]}的笑话要求在50字以内) return {init_joke: result.content} def regenerate_joke(state: JokeAgentState): 原笑话不好笑重新生成一个笑话。 result model.invoke(f之前生成的笑话没意思请重新一个{state[topic]}的笑话原笑话是{state[init_joke]}) return {improved_joke: result.content} builder ( StateGraph(JokeAgentState) .add_node(generate_joke, generate_joke) .add_node(regenerate_joke, regenerate_joke) ) builder.add_edge(START, generate_joke) builder.add_edge(regenerate_joke, END) builder.add_conditional_edges( generate_joke, lambda _: bad, {good: END, bad: regenerate_joke} ) agent: Pregel builder.compile() result: JokeAgentState agent.invoke({topic: 猫}) print(result[init_joke]) print(result[improved_joke])具体来说我们针对状态 类型JokeAgentState创建出StateGraph对象并通过调用add_node方法将两个函数转换成对应的Node进行注册。顾名思义StateGraph体现为一个由Node和Edge组成的状态图接下来我们调用add_edge添加了两个“确定”的Edge即开始节点到“generate_joke”和“regenerate_joke”到终止节点的Edge。由于执行“generate_joke”后是直接结束还是继续执行“regenerate_joke”依赖于评估的结果所以它们之间涉及一组“Conditional Edge”,我们通过调用add_conditional_edges方法来添加它们。为了走完全程我们是使模拟评估的Lambda表达式直接返回“bad”。最终的StateGraph对应的状态图具有如下的结构实际上下面这张图就是由StateGraph对象直接生成的由此可见基于StateGraph的编程完全可以 “按图施工” 还可以最终生成体现施工成果的图。代码和图具有完美的一致性图即代码代码即图。StateGraph不是Agent仅仅是构建Agent的Builder所以我们调用StateGraph对象的compile方法将Agent “编译” 出来。然后我们以字典形式指定笑话的主题 “猫” 调用Agent对象的invoke方法开启这个笑话生产线得到的结果就是作为全局状态的JokeAgentState对象我们将两次生成的笑话打印出来就是如下所示的结果。我家猫减肥失败因为每次跑步都追着激光笔吃夜宵。 好那我换一个更有梗的 我家猫最近开始冥想 每天对着墙坐半小时。 我以为它在修行 结果它只是在等墙里的“看不见的小虫”出来。从编译StateGraph的那句代码可以看出作为编译结果的Agent就是一个Pregel对象后者代表了Agent这个“执行体”所以我们才说Pregel是整个LangChain的执行引擎。StateGraph是一张图Graph专指图论语境中的“图”但是Pregel却是一个Actor模型。我们采用图的方式来构建Agent以为因为图是更易于 “人脑” 理解的形式这是使的Agent编程变得很简单我们将它编译成基于Actor模型的执行体是因为这是 “电脑” 更易于处理的形式使得执行起来更加快捷可靠。从这个意义上将StateGraph和Pregel都是Agent它们是同一事物在不同视角的表现形式。也可以这样看StateGraph对应的图是Agent的概念视图而Pregel对应的Actor模型是Agent的执行视图。虽然单纯的编程工作确实可以不用过多地涉及Pregel但是如果想对LangChain这一平台具有深刻地认知我觉得彻底地搞清楚Pregel是很有意义的。目前也没有多少系统介绍Pregel的资料我希望这个系列的文章能够弥补这一空白。二、 基于Pub/Sub的驱动执行有的Channel用于存储业务数据包括作为Node的输入和执行结果有的用于存储Node执行的驱动信号有的Channel则兼具这两种职责。在表示Node的PregelNode类中它的channels字段表示提供输入的Channel列表triggers字段则提供当前Node触发器的Channel列表在此列表中的任一Channel具有变更都会触发当前Node的执行。Node绑定的操作体现在其bound字段返回的Runnable[Any, Any]对象这是一个可执行的对象两个泛型参数表示输入和输出。LangChain中的Runnable是一个极其重要的类型因为LangChain的“Chain”就是由一系列Runnable对象按照响应顺序构建的“链”。class PregelNode: channels : str | list[str] triggers : list[str] bound : Runnable[Any, Any] …在如下这个演示实例中我们定义了一个由单一Node和两个Channel构建的Pregel对象。如代码片段所示我们创建了一个PregelNode对象并将其channels和triggers字段设置为“input”意味着以此命名的Channel的变更将会触发节点的执行并为其提供输入数据。我们将它的bound字段指定为一个RunnableLambda对象上它承载的操作体现在指定的Lambda表达式上它没有过的的操作直接返回原始输入。from langgraph.channels import LastValue from langgraph.pregel import Pregel from langgraph.pregel import Pregel from langgraph.channels import LastValue from langgraph.pregel._read import PregelNode from langchain_core.runnables import RunnableLambda from langgraph.pregel._write import ChannelWrite, ChannelWriteEntry node PregelNode( channelsinput, triggers[input], boundRunnableLambda(lambda args: args)) channelWrite:ChannelWrite ChannelWrite(writes[ChannelWriteEntry(channeloutput)]) node.writers.append(channelWrite) app Pregel( nodes{body: node}, channels{input: LastValue(str), output: LastValue(str)}, input_channels[input], output_channels[output], ) result app.invoke(input{input: foobar}) assert result {output: foobar}bound字段绑定的Runnable对象的执行结果需要利用writer字段存储的一个或者多个“写入器”输出到对应的Channel。在这里我们使用的写入器是一个ChannelWrite对象针对目标Channel命名为“output”的写入意图以ChannelWriteEntry对象的形式定义并添加到ChannelWrite的writes列表中。我们创建了一个承载整个执行流程的Pregel对象以字典的形式将Node和Channel分别注册到它的nodes和channels字段中对应的Key作为Node/Channel的名称。针对不同的数据消费以及针对Node的不同触发行为的需求LangGraph.Pregel为我们定义了一系列不同类型的Channel我们将在后续部分对它们作详细介绍。这里我们指定的Channel类型为LastValue作为构造函数参数指定的类型str为Channel存储的数据类型。我们分别将命名为“input”和“output”写入Pregel对象的input_channels和input_channels将其注册为输入Channel和输出Channel。我们直接调用Pregel对象的invoke方法启动执行流程输入参数input以字典的形式初始化每个输入Channel此时只有唯一的命名为“input”的Channel。此Channel的写入将自动触发Node的执行它会读取此通道的值“foobar”执行完成之后将执行结果“foobar”写入命名为“output”的Channel。整个流程执行完毕所有注册的输出Channel的值将被读取出来并以字典的形式作为invoke方法的返回值对应的Key就是输出Channel的名称所以断言揭示了执行的结果为{output: foobar}。三、利用NodeBuilder构建PregelNode我上面的例子直接调用构造函数创建作为Node的PregelNode对象。PregelNode类的包名为“langgraph.pregel._read”为了初始化其writers字段我们还需要从“langgraph.pregel._write”中导入ChannelWrite和ChannelWriteEntry类从包的命名就可以看出这并不是典型的编程方式。在绝大部分情况下我们会利用NoteBuilder采用“Builder模式”来初始化Node。利用NodeBuilder将会是我们的代码变得格外简单from langgraph.channels import LastValue from langgraph.pregel import Pregel, NodeBuilder node (NodeBuilder() .subscribe_only(input) .do(lambda args: args) .write_to(output) .build()) app Pregel( nodes{body: node}, channels{input: LastValue(str), output: LastValue(str)}, input_channels[input], output_channels[output], ) result app.invoke(input{input: foobar}) assert result {output: foobar}这是我们利用NodeBuilder对演示程序进行改写的结果。入代码片段所示在创建了NoteBuilder对象之后链式调用了它的subscribe_only方法将其唯一的输入和触发Channel名称设置为“input”。do方法会为利用指定的Lambda表达式创建对应的RunnableLambda对象并设置为PregelNode的bound字段。write_to方法会为我们创建并注册相应的ChannelWrite来将执行结果写入指定的输出Channel。PregelNode对象通过NodeBuilder的build方法构建而成然后按照前面例子一样的方式利用PregelNode对象构建Pregel对象。实际上这里还可以直接按照如下的方式传入NodeBuilder对象Pregel对象在初始化的时候会自动完成Node构建的任务。node (NodeBuilder() .subscribe_only(input) .do(lambda args: args) .write_to(output)) app Pregel( nodes{body: node}, channels{input: LastValue(str), output: LastValue(str)}, input_channels[input], output_channels[output], )四、多Channel的读写上面演示的Node都是针对单一Channel读写数据多Channel的读写可以通过不同的编程模式来实现。在如下这段演示程序中我们为构建的Pregel提供了四个分别命名为“foo”、“bar”、“baz”和“qux”的Channel前两个作为输入后两个作为输出。Node的处理函数handle具有一个字典类型的参数它携带了从所有输入Channel读取的数据Key为Channel名称。from langgraph.channels import LastValue from langgraph.pregel import Pregel, NodeBuilder from typing import Any def handle(state:dict[str,Any])-dict[str,Any]: foo: str state[foo] bar: str state[bar] return{baz:foo, qux:bar} node (NodeBuilder() .subscribe_to(foo, bar) .do(handle) .write_to(**bazlambda r: r[baz], quxlambda r: r[qux]**)) app Pregel( nodes{body: node}, channels{ foo: LastValue(str), bar: LastValue(str), baz: LastValue(str), qux: LastValue(str)}, input_channels[foo, bar], output_channels[baz, qux], ) result app.invoke(input{foo: abc, bar: xyz}) assert result {baz: abc, qux: xyz}我们希望Node从“foo”和“bar”Channel读取的数据分别写入“baz”和“qux”Channel为此我们让handle函数返回一个将输出Channel名称作为Key的字典。我们直接将handle函数作为do方法的参数它会基于此函数创建一个RunnableCallable对象作为构建PregelNode的bound字段的值。handle函数返回的字典将作为在write_to方法指定的两个Lambda表达式的输入指定的关键字参数名baz和qux将作为输出Channel的名称对应的值就是Lambda表达式的返回值。如果在调用write_to方法时直接指定了多个Channel“foo”和“bar” Pregel会将作为handle函数返回结果的字典作为一个整体同时写入指定的多个目标Channel这相当于在进行针对多Channel的消息广播。from langgraph.channels import LastValue from langgraph.pregel import Pregel, NodeBuilder from typing import Any def handle(state:dict[str,any])-dict[str,Any]: foo: str state[foo] bar: str state[bar] return{baz:foo, qux:bar} node (NodeBuilder() .subscribe_to(foo, bar) .do(handle) .write_to(baz, qux)) app Pregel( nodes{body: node}, channels{ foo: LastValue(str), bar: LastValue(str), baz: LastValue(str), qux: LastValue(str)}, input_channels[foo, bar], output_channels[baz, qux], ) result app.invoke({foo: abc, bar: xyz}) assert result {baz: **{baz: abc,qux:xyz}**, qux: **{baz: abc,qux:xyz}**}多Channel的写入还可以利用ChannelWriteEntry对象来完成实际上面在write_to方法中直接指定的Channel名称都会转换成ChannelWriteEntry对象它帮助我们对Channel的写入形式进行细粒度的控制。在如下的代码片段中我们为ChannelWriteEntry的mapper字段指定了一个Lambda表达式它会帮我们完成原始的执行结果与输出到Channel的真实数据之间的映射。from langgraph.pregel._write import ChannelWriteEntry … node (NodeBuilder() .subscribe_to(foo, bar) .do(handle) .write_to( **ChannelWriteEntry(channelbaz, mapperlambda r: r[baz]), ChannelWriteEntry(channelqux, mapperlambda r: r[qux]),** )) …有时我们并不希望Node的处理函数使用单一字典这种过于“笼统”的参数而是倾向于使用输入Channel的名称来命名其参数此时我们可以按照如下的方式利用“拆包”来解决这个。def handle(foo:str, bar:str)-dict[str,Any]: return{baz:foo, qux:bar} node (NodeBuilder() .subscribe_to(foo, bar) .do(lambda state: handle(******state)) .write_to( ChannelWriteEntry(channelbaz, mapperlambda s: s[baz]), ChannelWriteEntry(channelqux, mapperlambda s: s[qux]), ))五、Node之间的依赖LangChain构建的Agent本质上是一个图模型Graph 模型。我们利用模型和工具节点构建的Agent虽然看起来像是一个有始无终的有环图但是它本质上是一个StateGraph对象后者是由Node和Edge边构成的有始start有终end的图Edge体现了由它连接两个Node之间的依赖关系。StateGraph对象最终通过编译转换成Pregel对象后Edge体现的依赖关系被转换成Node基于Channel的订阅/发布关系。假设A和B是StateGraph的两个NodeA到B的StateGraph表示B必须等到A执行完成后才能执行即B依赖于A。在Pregel中并没有Edge的概念Node之间之间的依赖关系只能利用针对Channel的订阅/发布来实现。具体实现其实很简单让A在执行完成后像B订阅的Channel写入要给触发信号。在如下所示的演示程序中我们定义了一个名为“output”类型为BinaryOperatorAggregate 的Channel。顾名思义这个Channel类型利用指定的二元操作对当前值和新写入的值进行聚合最终确定最新的内容。我们利用它来存储一个字符串来表示按序执行的Node列表所以我们在创建它的时候以Lambda表达式指定了这个二元操作符后者将添加的Node名称作为后缀“{node}”附加到当前内容上。我们为创建的Pregel对象创建了三个命名为“foo”、“bar”和“baz”的Node以及对应的触发Channel也就是三个Node订阅对应的Channel。我们将“foo”Channel作为输入在调用的时候指定所以“foo”Node率先执行。每个Node执行完后这里没有执行任何具体的操作可以任务是一个空操作除了将自身的名称写入“output”Channel之外还需要写入相应的Channel驱动后续Node的执行“foo”执行完后写入“bar”“bar”执行完之后写入“baz”。from langgraph.channels import LastValue,BinaryOperatorAggregate from langgraph.pregel import Pregel, NodeBuilder foo (NodeBuilder() .subscribe_to(foo,readFalse) .write_to(outputfoo, barNone)) bar (NodeBuilder() .subscribe_to(bar,readFalse) .write_to(outputbar, bazNone)) baz (NodeBuilder() .subscribe_to(baz,readFalse) .write_to(outputbaz)) app Pregel( nodes{ foo:foo, bar:bar, baz:baz }, channels{ foo: LastValue(None), bar: LastValue(None), baz: LastValue(None), output: BinaryOperatorAggregate(str, operatorlambda a,b: f{a},{b}) }, input_channels[foo], output_channels[output] ) result app.invoke({foo: None}) assert result {output: ,foo,bar,baz}六、多Node依赖对一个Node来说只要它的triggers列表中任何一个Channel有更新它就会被驱动执行。如果采用上面例子演示的驱动方式当某个Node具有多个作为前置条件的Node任何一个作为前置条件的Node执行完后自己就会被动执行一次。也就说前置Node和它之间是一种 “ANY” 的驱动关系如何实现 “ALL” 的驱动关系呢即所有前置Node都执行完毕才满足后续Node的执行条件。以如下这张图为例开始的时候以两个分支并行执行左边分支先后执行foo和baz右边分支执行bar。我们要求baz和baz都成功执行后qux才能执行。这个问题有两种解决方案一种就是让baz和bar完成执行后利用Channel写入的方式向qux发送一个信号同时将自己已经完成的状态通过相应的Channel保存下来。这样qux会被触发两次但是它可以读取对应的Channel判断前置条件是否满足。在如下的实现中我们依然为四个Node设置了触发它们的同名Channel。由于“foo” 和 “bar” 这两个Node最开始并行执行我们将对应的通道设置为输入通道。我们为Pregel注册了另一个名为 “output” 的输出Channel由于收集成功执行的Channel。此Channel依然为BinaryOperatorAggregate但与之前不同的是其数据类型被设置为列表并利用operator.add将新的通道名称加入列表。from langgraph.channels import LastValue,BinaryOperatorAggregate from langgraph.pregel import Pregel, NodeBuilder import operator from typing import Any foo (NodeBuilder() .subscribe_to(foo,readFalse) .write_to(output[foo], bazNone)) bar (NodeBuilder() .subscribe_to(bar,readFalse) .write_to(output[bar], quxNone)) baz (NodeBuilder() .subscribe_to(baz,readFalse) .write_to(output[baz], quxNone)) def handle(args:dict[str,Any]): output:list[str] args[output] **if bar in output and baz in output: return [qux] return []** qux (NodeBuilder() .subscribe_to(qux,readFalse) .read_from(output) .do(handle) .write_to(output)) app Pregel( nodes{ foo:foo, bar:bar, baz:baz, qux:qux }, channels{ foo: LastValue(None), bar: LastValue(None), baz: LastValue(None), qux: LastValue(None), output: BinaryOperatorAggregate(list, operatoroperator.add) }, input_channels[foo,bar], output_channels[output] ) result app.invoke({foo: None, bar: None}) sequences:list[list[str]] [ [foo,baz,bar,qux], [bar,foo,baz,qux], [foo,baz,bar,qux]] assert result[output] in sequences我们依然按照上一个实例的“套路”每个Node在完成执行后都将自己的名称写入“output”通道。我们在调用Pregel的时候通过对 “foo” 和 “bar” 这两个Channel的写入驱动对应Node的执行。 “foo”、“bar” 和 “baz” 这三个Node在完成各自执行后通过写入对应的通道驱动后续Node的执行所以节点“qux”在“bar”和“baz”在执行后都会收到信号但是它可以读取“output”这个Channel的内容确定两个前置Node是否成功执行。七、更好的解决方案上面的例子采用“多次触发”并且让Node自行判断前置条件是否满足的方式解决了多Node依赖问题虽然实现起来也很简单对性能也没有太大的影响但我个人任务这是一种很Low的实现。在我看来在一个理想的系统中每个成员类型之间应该有明确清晰的职责边界“母鸡不司晨”“狗也不拿耗子”。Pregel中就Node和Channel两中核心角色作为功能组件的Node唯一需要关注的就是业务逻辑执行条件的判断是触发时机的问题应该由Channel负责。为了解决针对Node精准的触发和调度是Pregel设计众多Channel类型的一个重要因素。在众多预定义的Channel中用一个名为NamedBarrierValue的Channel就是为这种应用场景服务的。该Channel预定义一组名称集合其自身会维护一个集合来存储写入的值只有两个集合一致性的时候它才会视为有更新并触发订阅它的Node。为了解决“qux” 针对 “bar” 和 “baz” 的依赖我们只需要按照如下的方式将 “qux” 这个通道定义成NamedBarrierValue类型并将作为名称集合的names设置为它依赖的两个Node{bar”, baz}。 “bar” 和“baz” 在完成执行后只需要将各自的名称写入此Channel即可。from langgraph.channels import LastValue, BinaryOperatorAggregate,NamedBarrierValue from langgraph.pregel import Pregel, NodeBuilder import operator foo (NodeBuilder() .subscribe_to(foo,readFalse) .write_to(output[foo], bazNone)) bar (NodeBuilder() .subscribe_to(bar,readFalse) .write_to(output[bar], quxbar)) baz (NodeBuilder() .subscribe_to(baz,readFalse) .write_to(output[baz], quxbaz)) qux (NodeBuilder() .subscribe_to(qux,readFalse) .write_to(output[qux])) app Pregel( nodes{ foo:foo, bar:bar, baz:baz, qux:qux }, channels{ foo: LastValue(None), bar: LastValue(None), baz: LastValue(None), qux: **NamedBarrierValue(str, names{bar,baz}),** output: BinaryOperatorAggregate(list, operatoroperator.add) }, input_channels[foo,bar], output_channels[output] ) result app.invoke({foo: None, bar: None}) sequences:list[list[str]] [ [foo,baz,bar,qux], [bar,foo,baz,qux], [foo,baz,bar,qux]] assert result[output] in sequences