目录1. 提示链模式 (Prompt Chaining)1.1 概念1.2 模式实践2. 并行化模式 (Parallelization)2.1 概念2.2 模式实践3. 路由模式 (Routing)3.1 概念3.2 模式实践4. 协调者-工作者模式 (Orchestrator-Workers)4.1 概念4.2 模式实践5. 评估器-优化器模式 (Evaluator-optimizer)5.1 概念5.2 模式实践工作流模式就是预先定义好的执行路径像工厂流水线一样每个步骤有明确的输入输出和顺序。根据不同场景可以定制出不同的工作流用法。1. 提示链模式 (Prompt Chaining)1.1 概念提示链就像流水线前一个步骤的输出作为下一个步骤的输入。比如写文章时需要 大纲 → 初稿 → 润色 → 最终稿每个步骤的输出传给下一个步骤内容质量才能逐步提升。START → 输入主题 → 大纲生成 → 大纲 → 初稿生成 → 初稿 → 润色文章 → 润色版 → 最终稿生成 → 终稿 → END1.2 模式实践我们创建一个内容创作场景的工作流节点设计为generate_outline负责大纲生成generate_draft负责初稿写作polish_content负责内容润色finalize_content负责最终整合from langchain.chat_models import init_chat_model from langchain_core.messages import HumanMessage from typing_extensions import TypedDict from langgraph.graph import StateGraph, START, END model init_chat_model(gpt-4o-mini) # 输入模式 - 只包含用户输入的主题 class InputState(TypedDict): topic: str # 输出模式 - 只包含最终生成的内容 class OutputState(TypedDict): final_content: str # 完整状态模式内部使用继承输入输出加上中间步骤的字段 class OverallState(InputState, OutputState): outline: str # 第一步生成的大纲 draft: str # 第二步生成的初稿 polished_draft: str # 第三步润色后的稿件 # 第一步生成大纲 PROMPT_1 ( 根据主题生成文章大纲。\n 主题{topic}\n 要求\n 1.只需两个最核心标题\n 2.不用进行说明只返回最终大纲 ) def generate_outline(state: InputState): 根据主题生成内容大纲 print(内容大纲生成中...) prompt PROMPT_1.format(topicstate[topic]) outline model.invoke([HumanMessage(contentprompt)]).content print(f大纲已生成\n{outline}) return {outline: outline, topic: state[topic]} # 第二步基于大纲生成初稿 PROMPT_2 ( 根据以下内容生成文章完整初稿。\n 主题{topic}\n 大纲{outline}\n 要求\n 1.每个标题下最多使用三句话的内容即可\n 2.不用进行说明只返回最终结果 ) def generate_draft(state: OverallState): 根据大纲生成完整初稿 print(生成初稿中...) prompt PROMPT_2.format(topicstate[topic], outlinestate[outline]) draft model.invoke([HumanMessage(contentprompt)]).content print(f初稿已生成\n{draft}) return {draft: draft} # 第三步润色稿件 PROMPT_3 ( 根据文章初稿进行润色。\n 主题{topic}\n 初稿{draft}\n 要求\n 1.润色后文章不能太长 ) def polish_content(state: OverallState): 对初稿进行润色优化 print(文章润色中...) prompt PROMPT_3.format(topicstate[topic], draftstate[draft]) polished model.invoke([HumanMessage(contentprompt)]).content print(f润色完成内容如下\n{polished}) return {polished_draft: polished} # 第四步生成最终稿 PROMPT_4 ( 根据润色版文章生成文章终稿。\n 主题{topic}\n 大纲{outline}\n 润色版文章{polished_draft}\n ) def finalize_content(state: OverallState): 生成最终版本的内容 prompt PROMPT_4.format(topicstate[topic], outlinestate[outline], polished_draftstate[polished_draft]) final_content model.invoke([HumanMessage(contentprompt)]).content return {final_content: final_content} # 构建工作流 builder StateGraph(OverallState, input_schemaInputState, output_schemaOutputState) # 添加节点 builder.add_node(generate_outline) # 生成大纲 builder.add_node(generate_draft) # 生成初稿 builder.add_node(polish_content) # 润色稿件 builder.add_node(finalize_content) # 生成最终稿 # 连接节点直线流程 builder.add_edge(START, generate_outline) # 开始 → 生成大纲 builder.add_edge(generate_outline, generate_draft) # 大纲 → 生成初稿 builder.add_edge(generate_draft, polish_content) # 初稿 → 润色 builder.add_edge(polish_content, finalize_content) # 润色 → 最终稿 builder.add_edge(finalize_content, END) # 最终稿 → 结束 # 编译工作流 chain builder.compile() # 使用工作流 result chain.invoke({topic: 人工智能的未来发展}) print(最终创作结果:) print(result[final_content])2. 并行化模式 (Parallelization)2.1 概念并行化就是多个任务同时进行提高效率最终汇总结果。多角度处理同一问题时常用。比如要研发一款智能电动自行车需要做多维度分析市场分析用户关注续航、重量、防盗对骑行社交有兴趣竞品分析传统品牌智能化不足互联网品牌续航和售后差技术分析轻量化电池、GPS防盗、社交App集成最终汇总分析结果并行不仅省时还能提升决策质量。2.2 模式实践from typing import TypedDict from langgraph.constants import START, END from langgraph.graph import StateGraph class AnalysisState(TypedDict): concept: str # 产品概念 market: str # 市场分析结果 competitor: str # 竞品分析结果 tech: str # 技术分析结果 report: str # 汇总报告 # 三个并行分析任务 def market_task(state: AnalysisState): 市场分析节点 return {market: 用户关注续航、重量、防盗对骑行社交有兴趣...} def competitor_task(state: AnalysisState): 竞品分析节点 return {competitor: 传统品牌智能化不足互联网品牌续航和售后差...} def tech_task(state: AnalysisState): 技术分析节点 return {tech: 轻量化电池车身、GPS防盗、社交App集成...} # 汇总结果节点 def combine_results(state: AnalysisState): 把三个分析结果合并成一份报告 report 产品分析报告\n\n report f市场分析\n{state[market]}\n\n report f竞品分析\n{state[competitor]}\n\n report f技术分析\n{state[tech]}\n\n report 建议聚焦续航、防盗、社交功能的平衡发展 return {report: report} # 构建工作流 builder StateGraph(AnalysisState) builder.add_node(market, market_task) builder.add_node(competitor, competitor_task) builder.add_node(tech, tech_task) builder.add_node(combine, combine_results) # 并行执行三个分析从START同时连到三个节点 builder.add_edge(START, market) builder.add_edge(START, competitor) builder.add_edge(START, tech) # 三个分析都完成后汇总 builder.add_edge(market, combine) builder.add_edge(competitor, combine) builder.add_edge(tech, combine) builder.add_edge(combine, END) workflow builder.compile() # 使用 result workflow.invoke({concept: 城市通勤智能电动自行车}) print(result[report])3. 路由模式 (Routing)3.1 概念路由模式也叫智能分流根据输入内容决定走哪个分支。最典型的就是智能客服系统自动把用户问题分类处理。3.2 模式实践实现一个智能客服系统根据用户问题自动分类达到精准匹配处理能力。核心设计在于条件路由机 制• 动态路径选择可以基于 LLM 分析结果动态决定执行路径结构化返回• 分支隔离不同类型的问题由专用处理器处理核心设计在于条件路由机制动态路径选择 分支隔离。节点设计为• model_call_router 节点: 路由决策节点根据用户问题由 LLM 通过结构化返回进行智能决 策。• pre_sale_handler 节点: 处理售前咨询• after_sale_handler 节点: 处理售后问题• technical_handler 节点: 处理技术问题from langchain.chat_models import init_chat_model from langgraph.constants import START, END from langgraph.graph import StateGraph from typing_extensions import Literal, TypedDict from pydantic import BaseModel, Field class State(TypedDict): input: str # 用户输入的问题 decision: str # 路由决策结果 output: str # 最终输出 # 用 Pydantic 定义路由决策的数据结构LLM 会按这个结构返回 class Route(BaseModel): step: Literal[pre_sale, after_sale, technical] Field( description根据用户问题类型决定路由到售前、售后还是技术处理 ) # 路由决策节点 - 调用LLM分析用户输入决定问题类型 def model_call_router(state: State): 分析用户输入决定问题类型 model init_chat_model(gpt-4o-mini) decision model.with_structured_output(Route).invoke(state[input]) return {decision: decision.step} # 三个不同的处理节点 def pre_sale_handler(state: State): 处理售前咨询 return {output: 售前咨询已处理处理内容......} def after_sale_handler(state: State): 处理售后问题 return {output: 售后问题已处理处理内容......} def technical_handler(state: State): 处理技术问题 return {output: 技术问题已处理处理内容......} # 路由函数 - 根据决策结果返回下一个要执行的节点名 def route_decision(state: State): if state[decision] pre_sale: return pre_sale_handler elif state[decision] after_sale: return after_sale_handler elif state[decision] technical: return technical_handler # 构建路由工作流 router_builder StateGraph(State) # 添加所有节点 router_builder.add_node(pre_sale_handler) router_builder.add_node(after_sale_handler) router_builder.add_node(technical_handler) router_builder.add_node(model_call_router) # 先经过路由决策 router_builder.add_edge(START, model_call_router) # 条件边根据路由结果选择不同的分支 router_builder.add_conditional_edges( model_call_router, route_decision, [pre_sale_handler, after_sale_handler, technical_handler] ) # 所有分支最终都结束 router_builder.add_edge(pre_sale_handler, END) router_builder.add_edge(after_sale_handler, END) router_builder.add_edge(technical_handler, END) router_workflow router_builder.compile() # 测试用例 test_cases [ 我想了解一下你们产品的价格和功能, # 售前咨询 我购买的产品有质量问题需要退货, # 售后问题 这个软件安装后无法正常运行报错代码0x80070005, # 技术问题 请问你们的售后服务政策是什么, # 售前咨询 我的订单已经发货但还没收到, # 售后问题 如何配置数据库连接参数 # 技术问题 ] for test_case in test_cases: print(- * 50) result router_workflow.invoke({input: test_case}) print(f用户问题{test_case}\n{result[output]})4. 协调者-工作者模式 (Orchestrator-Workers)4.1 概念协调者-工作者模式就是一个大脑协调者分配任务多个工人工作者执行最后合成最终结果。比如处理几百页技术文档协调者拆分文档安排多个工作者并行处理不同章节最终汇总。和并行化模式的核心区别在于任务分配方式并行化任务在设计时就确定所有任务同时开始协调者-工作者任务在运行时由协调者动态分配举个例子文档翻译并行化是已知要翻译3个固定章节同时翻译协调者-工作者是先分析文档结构发现需要翻译5个章节再动态分配数据分析并行化是同时计算平均值、最大值、最小值固定指标协调者-工作者是先分析数据特征决定需要计算哪些统计指标4.2 模式实践我们要实现协调者根据topic生成报告大纲再把子任务指派给工作者工作者生成大纲对应的内容协调者生成3个标题就需要3个工作者10个就需要10个合成器汇总所有工作者的成果关键是任务指派协调者在运行时动态分配工作者边的数量在运行时才能确定。LangGraph 支持从条件边返回Send对象来实现这个。Send有两个参数第一个是节点名称第二个是传递给该节点的状态。from langchain.chat_models import init_chat_model from langgraph.constants import START, END from langgraph.graph import StateGraph from langgraph.types import Send # 用于动态分配任务给工作者 from typing import Annotated, TypedDict, List import operator from pydantic import BaseModel class State(TypedDict): topic: str sections: list # 协调者生成的计划 completed_sections: Annotated[list, operator.add] # 工作者完成的结果用 operator.add 自动合并 final_report: str # 定义数据结构 - 结构化输出 class Section(BaseModel): name: str description: str class Sections(BaseModel): sections: List[Section] # 创建规划器 - 用结构化输出让LLM返回固定格式 model init_chat_model(gpt-4o-mini) planner model.with_structured_output(Sections) # 协调者节点 - 分析任务并制定执行计划 def orchestrator(state: State): 协调者制定报告大纲 report_sections planner.invoke( f为主题{state[topic]}制定报告大纲包含3个章节 ) return {sections: report_sections.sections} # 工作者节点 - 根据分配的任务生成具体内容 def llm_call(state: State): 工作者编写具体的报告章节 section state[section] # 从协调者接收的任务 result model.invoke( f编写报告章节{section.name}内容要求{section.description} ) return {completed_sections: [result.content]} # 返回列表operator.add 会自动合并 # 汇总节点 - 把所有工作者的成果拼接起来 def synthesizer(state: State): 汇总所有工作者的成果 completed_sections state[completed_sections] final_report \n\n---\n\n.join(completed_sections) return {final_report: final_report} # 任务分配函数 - 这是关键部分 # 返回一个 Send 列表每个 Send 会动态创建一个工作者任务 def assign_workers(state: State): 为每个章节创建一个工作者任务 worker_tasks [] for section in state[sections]: worker_tasks.append( Send(llm_call, {section: section}) # Send(节点名, 传给节点的状态) ) return worker_tasks # 构建工作流 builder StateGraph(State) builder.add_node(orchestrator, orchestrator) builder.add_node(llm_call, llm_call) builder.add_node(synthesizer, synthesizer) builder.add_edge(START, orchestrator) # 关键协调者后用条件边动态创建多个工作者 builder.add_conditional_edges( orchestrator, assign_workers, [llm_call] # 所有动态创建的工作者都指向 llm_call 节点 ) # 所有工作者完成后汇总 builder.add_edge(llm_call, synthesizer) builder.add_edge(synthesizer, END) worker builder.compile() response worker.invoke({topic: 中国近代史}) print(response)运行结果{ topic: 中国近代史, sections: [ Section(name第一章近代史概述, description介绍中国近代史的背景与重要性涵盖社会、经济、文化等多个方面的变化。), Section(name第二章鸦片战争与列强侵华, description探讨鸦片战争对中国的影响以及随后的列强侵华与民族觉醒进程。), Section(name第三章辛亥革命与新文化运动, description分析辛亥革命及其后果以及新文化运动对中国现代化的推动作用。) ], completed_sections: [# 第一章..., # 第二章..., # 第三章...], final_report: # 第一章...# 第二章...# 第三章... }5. 评估器-优化器模式 (Evaluator-optimizer)5.1 概念评估器-优化器模式的流程是先生成内容 → 再评估质量 → 如果需要改进就重新生成不断循环直到满足质量标准。经典场景内容质量优化、代码生成和改进。5.2 模式实践核心在于质量检测。这个模式在前面案例3智能文档问答系统中已经实现过了这里简单回顾下核心逻辑from langchain.chat_models import init_chat_model from langchain_core.messages import HumanMessage, AIMessage from typing_extensions import TypedDict, Annotated from langgraph.graph import StateGraph, START, END from langgraph.graph.message import add_messages from langchain_tavily import TavilySearch # 定义消息状态 class MessagesState(TypedDict): messages: Annotated[list, add_messages] # 准备工具和模型 search TavilySearch(max_results4) tools [search] model init_chat_model(gpt-4o-mini, temperature0) model_with_tools model.bind_tools(tools) # 评估检索结果质量的节点 def grade_documents(state: MessagesState): 评估检索到的文档质量决定是直接生成答案还是重写问题 messages state[messages] last_message messages[-1] # 这里简化了评估逻辑实际应该用LLM来评估文档相关性 if hasattr(last_message, content) and len(last_message.content) 10: score yes # 文档质量ok else: score no # 文档质量不够需要重写问题 if score yes: return generate_answer # 质量达标去生成答案 else: return rewrite_question # 质量不够去优化问题重新检索值得注意的是除了可以让LLM当作评估器LangGraph还支持人机交互式的评估即将质量评估由人工来完成。
十、LangGraph能力详解:工作流的常见模式
发布时间:2026/6/8 2:43:59
目录1. 提示链模式 (Prompt Chaining)1.1 概念1.2 模式实践2. 并行化模式 (Parallelization)2.1 概念2.2 模式实践3. 路由模式 (Routing)3.1 概念3.2 模式实践4. 协调者-工作者模式 (Orchestrator-Workers)4.1 概念4.2 模式实践5. 评估器-优化器模式 (Evaluator-optimizer)5.1 概念5.2 模式实践工作流模式就是预先定义好的执行路径像工厂流水线一样每个步骤有明确的输入输出和顺序。根据不同场景可以定制出不同的工作流用法。1. 提示链模式 (Prompt Chaining)1.1 概念提示链就像流水线前一个步骤的输出作为下一个步骤的输入。比如写文章时需要 大纲 → 初稿 → 润色 → 最终稿每个步骤的输出传给下一个步骤内容质量才能逐步提升。START → 输入主题 → 大纲生成 → 大纲 → 初稿生成 → 初稿 → 润色文章 → 润色版 → 最终稿生成 → 终稿 → END1.2 模式实践我们创建一个内容创作场景的工作流节点设计为generate_outline负责大纲生成generate_draft负责初稿写作polish_content负责内容润色finalize_content负责最终整合from langchain.chat_models import init_chat_model from langchain_core.messages import HumanMessage from typing_extensions import TypedDict from langgraph.graph import StateGraph, START, END model init_chat_model(gpt-4o-mini) # 输入模式 - 只包含用户输入的主题 class InputState(TypedDict): topic: str # 输出模式 - 只包含最终生成的内容 class OutputState(TypedDict): final_content: str # 完整状态模式内部使用继承输入输出加上中间步骤的字段 class OverallState(InputState, OutputState): outline: str # 第一步生成的大纲 draft: str # 第二步生成的初稿 polished_draft: str # 第三步润色后的稿件 # 第一步生成大纲 PROMPT_1 ( 根据主题生成文章大纲。\n 主题{topic}\n 要求\n 1.只需两个最核心标题\n 2.不用进行说明只返回最终大纲 ) def generate_outline(state: InputState): 根据主题生成内容大纲 print(内容大纲生成中...) prompt PROMPT_1.format(topicstate[topic]) outline model.invoke([HumanMessage(contentprompt)]).content print(f大纲已生成\n{outline}) return {outline: outline, topic: state[topic]} # 第二步基于大纲生成初稿 PROMPT_2 ( 根据以下内容生成文章完整初稿。\n 主题{topic}\n 大纲{outline}\n 要求\n 1.每个标题下最多使用三句话的内容即可\n 2.不用进行说明只返回最终结果 ) def generate_draft(state: OverallState): 根据大纲生成完整初稿 print(生成初稿中...) prompt PROMPT_2.format(topicstate[topic], outlinestate[outline]) draft model.invoke([HumanMessage(contentprompt)]).content print(f初稿已生成\n{draft}) return {draft: draft} # 第三步润色稿件 PROMPT_3 ( 根据文章初稿进行润色。\n 主题{topic}\n 初稿{draft}\n 要求\n 1.润色后文章不能太长 ) def polish_content(state: OverallState): 对初稿进行润色优化 print(文章润色中...) prompt PROMPT_3.format(topicstate[topic], draftstate[draft]) polished model.invoke([HumanMessage(contentprompt)]).content print(f润色完成内容如下\n{polished}) return {polished_draft: polished} # 第四步生成最终稿 PROMPT_4 ( 根据润色版文章生成文章终稿。\n 主题{topic}\n 大纲{outline}\n 润色版文章{polished_draft}\n ) def finalize_content(state: OverallState): 生成最终版本的内容 prompt PROMPT_4.format(topicstate[topic], outlinestate[outline], polished_draftstate[polished_draft]) final_content model.invoke([HumanMessage(contentprompt)]).content return {final_content: final_content} # 构建工作流 builder StateGraph(OverallState, input_schemaInputState, output_schemaOutputState) # 添加节点 builder.add_node(generate_outline) # 生成大纲 builder.add_node(generate_draft) # 生成初稿 builder.add_node(polish_content) # 润色稿件 builder.add_node(finalize_content) # 生成最终稿 # 连接节点直线流程 builder.add_edge(START, generate_outline) # 开始 → 生成大纲 builder.add_edge(generate_outline, generate_draft) # 大纲 → 生成初稿 builder.add_edge(generate_draft, polish_content) # 初稿 → 润色 builder.add_edge(polish_content, finalize_content) # 润色 → 最终稿 builder.add_edge(finalize_content, END) # 最终稿 → 结束 # 编译工作流 chain builder.compile() # 使用工作流 result chain.invoke({topic: 人工智能的未来发展}) print(最终创作结果:) print(result[final_content])2. 并行化模式 (Parallelization)2.1 概念并行化就是多个任务同时进行提高效率最终汇总结果。多角度处理同一问题时常用。比如要研发一款智能电动自行车需要做多维度分析市场分析用户关注续航、重量、防盗对骑行社交有兴趣竞品分析传统品牌智能化不足互联网品牌续航和售后差技术分析轻量化电池、GPS防盗、社交App集成最终汇总分析结果并行不仅省时还能提升决策质量。2.2 模式实践from typing import TypedDict from langgraph.constants import START, END from langgraph.graph import StateGraph class AnalysisState(TypedDict): concept: str # 产品概念 market: str # 市场分析结果 competitor: str # 竞品分析结果 tech: str # 技术分析结果 report: str # 汇总报告 # 三个并行分析任务 def market_task(state: AnalysisState): 市场分析节点 return {market: 用户关注续航、重量、防盗对骑行社交有兴趣...} def competitor_task(state: AnalysisState): 竞品分析节点 return {competitor: 传统品牌智能化不足互联网品牌续航和售后差...} def tech_task(state: AnalysisState): 技术分析节点 return {tech: 轻量化电池车身、GPS防盗、社交App集成...} # 汇总结果节点 def combine_results(state: AnalysisState): 把三个分析结果合并成一份报告 report 产品分析报告\n\n report f市场分析\n{state[market]}\n\n report f竞品分析\n{state[competitor]}\n\n report f技术分析\n{state[tech]}\n\n report 建议聚焦续航、防盗、社交功能的平衡发展 return {report: report} # 构建工作流 builder StateGraph(AnalysisState) builder.add_node(market, market_task) builder.add_node(competitor, competitor_task) builder.add_node(tech, tech_task) builder.add_node(combine, combine_results) # 并行执行三个分析从START同时连到三个节点 builder.add_edge(START, market) builder.add_edge(START, competitor) builder.add_edge(START, tech) # 三个分析都完成后汇总 builder.add_edge(market, combine) builder.add_edge(competitor, combine) builder.add_edge(tech, combine) builder.add_edge(combine, END) workflow builder.compile() # 使用 result workflow.invoke({concept: 城市通勤智能电动自行车}) print(result[report])3. 路由模式 (Routing)3.1 概念路由模式也叫智能分流根据输入内容决定走哪个分支。最典型的就是智能客服系统自动把用户问题分类处理。3.2 模式实践实现一个智能客服系统根据用户问题自动分类达到精准匹配处理能力。核心设计在于条件路由机 制• 动态路径选择可以基于 LLM 分析结果动态决定执行路径结构化返回• 分支隔离不同类型的问题由专用处理器处理核心设计在于条件路由机制动态路径选择 分支隔离。节点设计为• model_call_router 节点: 路由决策节点根据用户问题由 LLM 通过结构化返回进行智能决 策。• pre_sale_handler 节点: 处理售前咨询• after_sale_handler 节点: 处理售后问题• technical_handler 节点: 处理技术问题from langchain.chat_models import init_chat_model from langgraph.constants import START, END from langgraph.graph import StateGraph from typing_extensions import Literal, TypedDict from pydantic import BaseModel, Field class State(TypedDict): input: str # 用户输入的问题 decision: str # 路由决策结果 output: str # 最终输出 # 用 Pydantic 定义路由决策的数据结构LLM 会按这个结构返回 class Route(BaseModel): step: Literal[pre_sale, after_sale, technical] Field( description根据用户问题类型决定路由到售前、售后还是技术处理 ) # 路由决策节点 - 调用LLM分析用户输入决定问题类型 def model_call_router(state: State): 分析用户输入决定问题类型 model init_chat_model(gpt-4o-mini) decision model.with_structured_output(Route).invoke(state[input]) return {decision: decision.step} # 三个不同的处理节点 def pre_sale_handler(state: State): 处理售前咨询 return {output: 售前咨询已处理处理内容......} def after_sale_handler(state: State): 处理售后问题 return {output: 售后问题已处理处理内容......} def technical_handler(state: State): 处理技术问题 return {output: 技术问题已处理处理内容......} # 路由函数 - 根据决策结果返回下一个要执行的节点名 def route_decision(state: State): if state[decision] pre_sale: return pre_sale_handler elif state[decision] after_sale: return after_sale_handler elif state[decision] technical: return technical_handler # 构建路由工作流 router_builder StateGraph(State) # 添加所有节点 router_builder.add_node(pre_sale_handler) router_builder.add_node(after_sale_handler) router_builder.add_node(technical_handler) router_builder.add_node(model_call_router) # 先经过路由决策 router_builder.add_edge(START, model_call_router) # 条件边根据路由结果选择不同的分支 router_builder.add_conditional_edges( model_call_router, route_decision, [pre_sale_handler, after_sale_handler, technical_handler] ) # 所有分支最终都结束 router_builder.add_edge(pre_sale_handler, END) router_builder.add_edge(after_sale_handler, END) router_builder.add_edge(technical_handler, END) router_workflow router_builder.compile() # 测试用例 test_cases [ 我想了解一下你们产品的价格和功能, # 售前咨询 我购买的产品有质量问题需要退货, # 售后问题 这个软件安装后无法正常运行报错代码0x80070005, # 技术问题 请问你们的售后服务政策是什么, # 售前咨询 我的订单已经发货但还没收到, # 售后问题 如何配置数据库连接参数 # 技术问题 ] for test_case in test_cases: print(- * 50) result router_workflow.invoke({input: test_case}) print(f用户问题{test_case}\n{result[output]})4. 协调者-工作者模式 (Orchestrator-Workers)4.1 概念协调者-工作者模式就是一个大脑协调者分配任务多个工人工作者执行最后合成最终结果。比如处理几百页技术文档协调者拆分文档安排多个工作者并行处理不同章节最终汇总。和并行化模式的核心区别在于任务分配方式并行化任务在设计时就确定所有任务同时开始协调者-工作者任务在运行时由协调者动态分配举个例子文档翻译并行化是已知要翻译3个固定章节同时翻译协调者-工作者是先分析文档结构发现需要翻译5个章节再动态分配数据分析并行化是同时计算平均值、最大值、最小值固定指标协调者-工作者是先分析数据特征决定需要计算哪些统计指标4.2 模式实践我们要实现协调者根据topic生成报告大纲再把子任务指派给工作者工作者生成大纲对应的内容协调者生成3个标题就需要3个工作者10个就需要10个合成器汇总所有工作者的成果关键是任务指派协调者在运行时动态分配工作者边的数量在运行时才能确定。LangGraph 支持从条件边返回Send对象来实现这个。Send有两个参数第一个是节点名称第二个是传递给该节点的状态。from langchain.chat_models import init_chat_model from langgraph.constants import START, END from langgraph.graph import StateGraph from langgraph.types import Send # 用于动态分配任务给工作者 from typing import Annotated, TypedDict, List import operator from pydantic import BaseModel class State(TypedDict): topic: str sections: list # 协调者生成的计划 completed_sections: Annotated[list, operator.add] # 工作者完成的结果用 operator.add 自动合并 final_report: str # 定义数据结构 - 结构化输出 class Section(BaseModel): name: str description: str class Sections(BaseModel): sections: List[Section] # 创建规划器 - 用结构化输出让LLM返回固定格式 model init_chat_model(gpt-4o-mini) planner model.with_structured_output(Sections) # 协调者节点 - 分析任务并制定执行计划 def orchestrator(state: State): 协调者制定报告大纲 report_sections planner.invoke( f为主题{state[topic]}制定报告大纲包含3个章节 ) return {sections: report_sections.sections} # 工作者节点 - 根据分配的任务生成具体内容 def llm_call(state: State): 工作者编写具体的报告章节 section state[section] # 从协调者接收的任务 result model.invoke( f编写报告章节{section.name}内容要求{section.description} ) return {completed_sections: [result.content]} # 返回列表operator.add 会自动合并 # 汇总节点 - 把所有工作者的成果拼接起来 def synthesizer(state: State): 汇总所有工作者的成果 completed_sections state[completed_sections] final_report \n\n---\n\n.join(completed_sections) return {final_report: final_report} # 任务分配函数 - 这是关键部分 # 返回一个 Send 列表每个 Send 会动态创建一个工作者任务 def assign_workers(state: State): 为每个章节创建一个工作者任务 worker_tasks [] for section in state[sections]: worker_tasks.append( Send(llm_call, {section: section}) # Send(节点名, 传给节点的状态) ) return worker_tasks # 构建工作流 builder StateGraph(State) builder.add_node(orchestrator, orchestrator) builder.add_node(llm_call, llm_call) builder.add_node(synthesizer, synthesizer) builder.add_edge(START, orchestrator) # 关键协调者后用条件边动态创建多个工作者 builder.add_conditional_edges( orchestrator, assign_workers, [llm_call] # 所有动态创建的工作者都指向 llm_call 节点 ) # 所有工作者完成后汇总 builder.add_edge(llm_call, synthesizer) builder.add_edge(synthesizer, END) worker builder.compile() response worker.invoke({topic: 中国近代史}) print(response)运行结果{ topic: 中国近代史, sections: [ Section(name第一章近代史概述, description介绍中国近代史的背景与重要性涵盖社会、经济、文化等多个方面的变化。), Section(name第二章鸦片战争与列强侵华, description探讨鸦片战争对中国的影响以及随后的列强侵华与民族觉醒进程。), Section(name第三章辛亥革命与新文化运动, description分析辛亥革命及其后果以及新文化运动对中国现代化的推动作用。) ], completed_sections: [# 第一章..., # 第二章..., # 第三章...], final_report: # 第一章...# 第二章...# 第三章... }5. 评估器-优化器模式 (Evaluator-optimizer)5.1 概念评估器-优化器模式的流程是先生成内容 → 再评估质量 → 如果需要改进就重新生成不断循环直到满足质量标准。经典场景内容质量优化、代码生成和改进。5.2 模式实践核心在于质量检测。这个模式在前面案例3智能文档问答系统中已经实现过了这里简单回顾下核心逻辑from langchain.chat_models import init_chat_model from langchain_core.messages import HumanMessage, AIMessage from typing_extensions import TypedDict, Annotated from langgraph.graph import StateGraph, START, END from langgraph.graph.message import add_messages from langchain_tavily import TavilySearch # 定义消息状态 class MessagesState(TypedDict): messages: Annotated[list, add_messages] # 准备工具和模型 search TavilySearch(max_results4) tools [search] model init_chat_model(gpt-4o-mini, temperature0) model_with_tools model.bind_tools(tools) # 评估检索结果质量的节点 def grade_documents(state: MessagesState): 评估检索到的文档质量决定是直接生成答案还是重写问题 messages state[messages] last_message messages[-1] # 这里简化了评估逻辑实际应该用LLM来评估文档相关性 if hasattr(last_message, content) and len(last_message.content) 10: score yes # 文档质量ok else: score no # 文档质量不够需要重写问题 if score yes: return generate_answer # 质量达标去生成答案 else: return rewrite_question # 质量不够去优化问题重新检索值得注意的是除了可以让LLM当作评估器LangGraph还支持人机交互式的评估即将质量评估由人工来完成。