核心原理LangChain所有链都集成Chain基类必须实现的两个核心方法_call同步执行逻辑_acall:异步执行逻辑FastAPI项目必用核心属性input_variables:定义输入参数名output_variables:定义输出结果手写自定义业务链class TestCaseGenerateChain(Chain): 自定义专属链业务需求 → 自动生成标准测试用例 # 定义输入变量 必须传入一个叫 business_require 的参数 input_variables: list[str] [business_require] # 定义输出变量 output_variables: list[str] [test_case_result] property #只读属性 不加括号就能调用 def _chain_type(self) - str: # 链标识名称 給自定义链去了唯一标识名称 return test_case_generate_chain def _call( self, inputs: Dict[str, Any], run_manager: Optional[CallbackManagerForChainRun] None ) - Dict[str, Any]: 同步执行逻辑 # 取出输入参数 require_content inputs[business_require] # 自定义业务提示词 prompt f 你是资深测试工程师根据业务需求生成标准测试用例 格式用例标题 | 前置条件 | 操作步骤 | 预期结果 业务需求{require_content} .strip() # 调用大模型 response llm.invoke(prompt) return {test_case_result: response.content} async def _acall( self, inputs: Dict[str, Any], run_manager: Optional[CallbackManagerForChainRun] None ) - Dict[str, Any]: 异步执行逻辑适配FastAPI异步接口 require_content inputs[business_require] prompt f 你是资深测试工程师根据业务需求生成标准测试用例 格式用例标题 | 前置条件 | 操作步骤 | 预期结果 业务需求{require_content} .strip() response await llm.ainvoke(prompt) return {test_case_result: response.content} #实例化自定义链 case_chain TestCaseGeneratrChain() #同步调用 if __name__ __main__: res case_chain.invoke({ business_reqiore:实现手机号验证码登录功能 }) print(生成测试用例\n, res[test_case_result])实战 2多字段输入复合型自定义链class DocAnalysisChain(Chain): #文档分析链文档内容业务场景 - 摘要核心要点双输出 input_variables [doc_content,scene] output_variables [doc_summary,core_points] property def _chain_type(self): return doc_analysis_chain def _call(self, inputs:Dict,run_managerNone): content inputs[doc_content] scene inputs[scene] prompt1 f基于{scene}场景精简摘要{content} summary llm.invoke(prompt1).content prompt2 f从内容提取核心业务要点{content} points llm.invoke(prompt2).content return { doc_summary: summary, core_points: points, } analyze_chain DocAnalysisChain() result analyze_chain.invoke({ doc_content:登录模块包含账号密码登录、异常拦截校验, scene:接口测试 }) print(摘要,result[doc_summary]) print(要点,result[core_points])进阶用法 自定义链 之前自定义检索器组合#先检索知识库内容 retriever HybridRerankRetriever(collection_namelangchain_rag_kb,all_docs[]) docs retriever._get_relevant_documents(登录异常场景) context_text \n.join([d.page_content for d in docs]) #传入自定义测试用例链 chain TestCaseGenerateChain() final_res chain.invoke({business_require:context_text})核心知识1、所有自定义链必须集成Chain基类2、固定声明input_variables、output_variables规范入参出参3、_call同步、_acall异步双实现后端项目必须写异步4、抛弃固定模板可自由编写提示词、组合业务逻辑5、可自由搭配检索器、工具、记忆组件灵活组装业务流程6、自定义链可直接接入SequentialChain顺序工作流串联执行LangChain 回调函数进阶流式回调 日志追踪 异常监听1、回调核心概念LangChain执行任何组件LLM/Chain/Retriever/Tools都会触发生命周期事件开启执行结束执行llm发起请求逐快返回内容检索开始检索结束任务失败抛出异常BaseCallbackHandler就是用来监听这些事件自定义业务行为。内置常用的回调事件on_chain_start 链开始执行on_chain_end 链执行结束on_chain_error 链执行异常on_llm_start 调用大模型开始on_llm_new_token 大模型逐字返回Token流式核心on_llm_end 大模型调用结束on_retriever_start 检索开始 on_retriever_end 检索结束手写通用自定义回调类class FullChainCallBackHandler(BaseCallbackHandler): #全功能自定义回调日志耗时流式异常 def __init__(self): self.start_time 0 self.stream_content #对外暴露流式字符缓存 self.chunk_queue [] #链的声明周期 def on_chain_start(self, serialized: Dict[str, Any], inputs: Dict[str, Any], **kwargs) - None: self.start_time time.time() logger.info(f【链开始执行】类型{serialized.get(name)}入参{list(inputs.keys())}) def on_chain_end(self,output:Dict[str,Any],** kwargs): cost round(time.time() - self.start_time,2) logger.info(f【链执行完成】耗时{cost}s输出字段{list(outputs.keys())}) def on_chain_error(self,error:BaseException,**kwargs): logger.error(f【链执行异常】错误信息{str(error)}) #llm大模型回调流式核心 def on_llm_start(self,serialized: dict[str, Any],prompts: list[str], **kwargs): logger.info(【发起大模型请求】) self.stream_content self.chunk_queue.clear() def on_llm_new_token(self,token:str, **kwargs): #逐字接受流式token self.stream_content token self.chunk_queue.append(token) #控制台实时打印流式内容 print((token, end, flushTrue) def on_llm_end(self,response:LLMResult,**kwargs): logger.info(【大模型返回完成】完整内容长度{len(self.stream_content)} #检索器回调 def on_retriever_start(self,query:str, **kwargs): logger.info(f知识库开始查询问题{query}) def on_retriever_end(self,documents:List[Document],**kwargs): logging.info(f【检索完成】召回文档数量:{len(documents)})绑定回调到自定义 Chain / RAG 链1. 普通调用绑定回调def on_retriever_end(self,documents:List[Document],**kwargs): logging.info(f【检索完成】召回文档数量:{len(documents)}) from langchain_core.callbacks import CallbackManager #实例化自定义回调 my_callback FullChainCallBackHandler() #回调管理器 用来装载一大堆回调函数的管理员 cb_manager CallbackManager([my_callback]) #实例化自定义链注入回调 case_chain TestCaseGenerateChain(callback_managercb_manager) if __name__ __main__: print(\n 开始执行自定义业务链 ) res case_chain.invoke({ business_require: 用户忘记密码找回功能 }) print(\n最终结果, res[test_case_result])2. 绑定到 RAG 检索问答链llm get_rag_llm() retriever HybridRerankRetriever(collection_namelangchain_rag_kb,all_docs[]) rag_cb FullChainCallBackHandler() rag_manager CallbackManager([rag_cb]) rag_chain RetrievalQA.from_chain_type( llmllm, retrieverretriever, retriever_managerrag_manager, ) rag_chain.invoke({query:登录异常有哪些情况})FastAPI 对接回调实现 SSE 流式把回调里的chunk_queue取出结合生成器推送前端抛弃旧流式写法统一用回调管控def sse_stream_generator(query:str): cb FullChainCallBackHandler() cb_mgr CallbackManager([cb]) rag_chain RetrievalQA.from_chain_type(llmget_rag_llm(),retrieverretriever,callbackscb_mgr) #异步后台执行 import threading threading.Thread(targetrag_chain.invoke, args({query:query},)).start() #循环去出流式片段推送 import time while True: if cb.chunk_queue: yield fdata: {char}\n\n time.sleep(0.02) if not cb.chunk_queue and len(cb.stream_content) 0 : break yield data: {DONE}\n\n优势全链路统一监听流式日志耗时统计一套代码搞定六、企业级实际用途性能监控统计每一条问答、每一条接口 LLM 调用耗时日志审计记录用户提问、模型输出方便回溯问题异常兜底模型超时、接口报错、检索失败统一捕获流式统一管控所有 AI 输出流式全部走回调不用多处写流式逻辑Token 统计扩展可在回调内对接接口统计输入输出 token核算成本今日核心总结1、所有 LangChain 组件都支持注入CallbackManager绑定自定义回调2、on_llm_new_token是实现流式输出最标准、企业最常用方式3、回调可监听链、大模型、检索器、工具全生命周期4、同步 / 异步 Chain、RAG、Agent 全部通用这套回调体系5、线上项目排查问题、性能优化、流式交互优先用回调实现
LangChain 自定义 Chain 手写实现
发布时间:2026/5/19 15:40:41
核心原理LangChain所有链都集成Chain基类必须实现的两个核心方法_call同步执行逻辑_acall:异步执行逻辑FastAPI项目必用核心属性input_variables:定义输入参数名output_variables:定义输出结果手写自定义业务链class TestCaseGenerateChain(Chain): 自定义专属链业务需求 → 自动生成标准测试用例 # 定义输入变量 必须传入一个叫 business_require 的参数 input_variables: list[str] [business_require] # 定义输出变量 output_variables: list[str] [test_case_result] property #只读属性 不加括号就能调用 def _chain_type(self) - str: # 链标识名称 給自定义链去了唯一标识名称 return test_case_generate_chain def _call( self, inputs: Dict[str, Any], run_manager: Optional[CallbackManagerForChainRun] None ) - Dict[str, Any]: 同步执行逻辑 # 取出输入参数 require_content inputs[business_require] # 自定义业务提示词 prompt f 你是资深测试工程师根据业务需求生成标准测试用例 格式用例标题 | 前置条件 | 操作步骤 | 预期结果 业务需求{require_content} .strip() # 调用大模型 response llm.invoke(prompt) return {test_case_result: response.content} async def _acall( self, inputs: Dict[str, Any], run_manager: Optional[CallbackManagerForChainRun] None ) - Dict[str, Any]: 异步执行逻辑适配FastAPI异步接口 require_content inputs[business_require] prompt f 你是资深测试工程师根据业务需求生成标准测试用例 格式用例标题 | 前置条件 | 操作步骤 | 预期结果 业务需求{require_content} .strip() response await llm.ainvoke(prompt) return {test_case_result: response.content} #实例化自定义链 case_chain TestCaseGeneratrChain() #同步调用 if __name__ __main__: res case_chain.invoke({ business_reqiore:实现手机号验证码登录功能 }) print(生成测试用例\n, res[test_case_result])实战 2多字段输入复合型自定义链class DocAnalysisChain(Chain): #文档分析链文档内容业务场景 - 摘要核心要点双输出 input_variables [doc_content,scene] output_variables [doc_summary,core_points] property def _chain_type(self): return doc_analysis_chain def _call(self, inputs:Dict,run_managerNone): content inputs[doc_content] scene inputs[scene] prompt1 f基于{scene}场景精简摘要{content} summary llm.invoke(prompt1).content prompt2 f从内容提取核心业务要点{content} points llm.invoke(prompt2).content return { doc_summary: summary, core_points: points, } analyze_chain DocAnalysisChain() result analyze_chain.invoke({ doc_content:登录模块包含账号密码登录、异常拦截校验, scene:接口测试 }) print(摘要,result[doc_summary]) print(要点,result[core_points])进阶用法 自定义链 之前自定义检索器组合#先检索知识库内容 retriever HybridRerankRetriever(collection_namelangchain_rag_kb,all_docs[]) docs retriever._get_relevant_documents(登录异常场景) context_text \n.join([d.page_content for d in docs]) #传入自定义测试用例链 chain TestCaseGenerateChain() final_res chain.invoke({business_require:context_text})核心知识1、所有自定义链必须集成Chain基类2、固定声明input_variables、output_variables规范入参出参3、_call同步、_acall异步双实现后端项目必须写异步4、抛弃固定模板可自由编写提示词、组合业务逻辑5、可自由搭配检索器、工具、记忆组件灵活组装业务流程6、自定义链可直接接入SequentialChain顺序工作流串联执行LangChain 回调函数进阶流式回调 日志追踪 异常监听1、回调核心概念LangChain执行任何组件LLM/Chain/Retriever/Tools都会触发生命周期事件开启执行结束执行llm发起请求逐快返回内容检索开始检索结束任务失败抛出异常BaseCallbackHandler就是用来监听这些事件自定义业务行为。内置常用的回调事件on_chain_start 链开始执行on_chain_end 链执行结束on_chain_error 链执行异常on_llm_start 调用大模型开始on_llm_new_token 大模型逐字返回Token流式核心on_llm_end 大模型调用结束on_retriever_start 检索开始 on_retriever_end 检索结束手写通用自定义回调类class FullChainCallBackHandler(BaseCallbackHandler): #全功能自定义回调日志耗时流式异常 def __init__(self): self.start_time 0 self.stream_content #对外暴露流式字符缓存 self.chunk_queue [] #链的声明周期 def on_chain_start(self, serialized: Dict[str, Any], inputs: Dict[str, Any], **kwargs) - None: self.start_time time.time() logger.info(f【链开始执行】类型{serialized.get(name)}入参{list(inputs.keys())}) def on_chain_end(self,output:Dict[str,Any],** kwargs): cost round(time.time() - self.start_time,2) logger.info(f【链执行完成】耗时{cost}s输出字段{list(outputs.keys())}) def on_chain_error(self,error:BaseException,**kwargs): logger.error(f【链执行异常】错误信息{str(error)}) #llm大模型回调流式核心 def on_llm_start(self,serialized: dict[str, Any],prompts: list[str], **kwargs): logger.info(【发起大模型请求】) self.stream_content self.chunk_queue.clear() def on_llm_new_token(self,token:str, **kwargs): #逐字接受流式token self.stream_content token self.chunk_queue.append(token) #控制台实时打印流式内容 print((token, end, flushTrue) def on_llm_end(self,response:LLMResult,**kwargs): logger.info(【大模型返回完成】完整内容长度{len(self.stream_content)} #检索器回调 def on_retriever_start(self,query:str, **kwargs): logger.info(f知识库开始查询问题{query}) def on_retriever_end(self,documents:List[Document],**kwargs): logging.info(f【检索完成】召回文档数量:{len(documents)})绑定回调到自定义 Chain / RAG 链1. 普通调用绑定回调def on_retriever_end(self,documents:List[Document],**kwargs): logging.info(f【检索完成】召回文档数量:{len(documents)}) from langchain_core.callbacks import CallbackManager #实例化自定义回调 my_callback FullChainCallBackHandler() #回调管理器 用来装载一大堆回调函数的管理员 cb_manager CallbackManager([my_callback]) #实例化自定义链注入回调 case_chain TestCaseGenerateChain(callback_managercb_manager) if __name__ __main__: print(\n 开始执行自定义业务链 ) res case_chain.invoke({ business_require: 用户忘记密码找回功能 }) print(\n最终结果, res[test_case_result])2. 绑定到 RAG 检索问答链llm get_rag_llm() retriever HybridRerankRetriever(collection_namelangchain_rag_kb,all_docs[]) rag_cb FullChainCallBackHandler() rag_manager CallbackManager([rag_cb]) rag_chain RetrievalQA.from_chain_type( llmllm, retrieverretriever, retriever_managerrag_manager, ) rag_chain.invoke({query:登录异常有哪些情况})FastAPI 对接回调实现 SSE 流式把回调里的chunk_queue取出结合生成器推送前端抛弃旧流式写法统一用回调管控def sse_stream_generator(query:str): cb FullChainCallBackHandler() cb_mgr CallbackManager([cb]) rag_chain RetrievalQA.from_chain_type(llmget_rag_llm(),retrieverretriever,callbackscb_mgr) #异步后台执行 import threading threading.Thread(targetrag_chain.invoke, args({query:query},)).start() #循环去出流式片段推送 import time while True: if cb.chunk_queue: yield fdata: {char}\n\n time.sleep(0.02) if not cb.chunk_queue and len(cb.stream_content) 0 : break yield data: {DONE}\n\n优势全链路统一监听流式日志耗时统计一套代码搞定六、企业级实际用途性能监控统计每一条问答、每一条接口 LLM 调用耗时日志审计记录用户提问、模型输出方便回溯问题异常兜底模型超时、接口报错、检索失败统一捕获流式统一管控所有 AI 输出流式全部走回调不用多处写流式逻辑Token 统计扩展可在回调内对接接口统计输入输出 token核算成本今日核心总结1、所有 LangChain 组件都支持注入CallbackManager绑定自定义回调2、on_llm_new_token是实现流式输出最标准、企业最常用方式3、回调可监听链、大模型、检索器、工具全生命周期4、同步 / 异步 Chain、RAG、Agent 全部通用这套回调体系5、线上项目排查问题、性能优化、流式交互优先用回调实现