快速理解python的agent框架中异步async语法 零、前言本人仅有java经验但最近在接触agent学习python的时候看到了频繁出现的async语法。在此把我的理解记录一下。一、协程和事件循环async修饰的函数在运行的时候是作为一个“协程”去运行的多个协程是以“时分复用”的方式运行在同一个线程中的这点和JavaScript一致但我也不太了解js。具体来说python就是会启动一个像while一样的事件循环event loop。这个事件循环只是运行在一个线程上但是我们却可以提交多个async修饰的协程上去。如果发现正在运行的协程主动挂起了那么事件循环就会把其他已经注册上去 但是还没运行的协程运行起来。我们可以使用以下这些方法来同时运行多个协程results await asyncio.gather(*tasks) 或者 task1 asyncio.create_task(fetch(url1)) task2 asyncio.create_task(fetch(url2)) 或者 for finished_task in asyncio.as_completed(tasks): result await finished_task # 这里我看的时候有个疑问为什么as_completed返回的是已经就绪的了下面还要再await一下呢 # 事实上异步函数必须使用await接收才可以。二、async def 和 async with1. async def我们将使用asyncio.gather这是 Python 运行多个异步函数最常用的方式。import asyncio import time async def fetch_db_data(db_name, delay): print(f [开始] 正在请求 {db_name}...) # 模拟网络 IO 等待 await asyncio.sleep(delay) print(f [完成] {db_name} 返回了结果 (耗时 {delay}s)) return f{db_name}_data async def main(): start_time time.perf_counter() print( 程序启动) # 同时启动两个异步任务 # 注意这里只是创建了任务并没有立即阻塞 task1 fetch_db_data(数据库A, 2) task2 fetch_db_data(数据库B, 1) print( 已向两个数据库发送请求准备进行 await 挂起...) # 使用 gather 并发运行 results await asyncio.gather(task1, task2) end_time time.perf_counter() print(f 最终结果: {results}) print(f 总共耗时: {end_time - start_time:.2f} 秒) # 启动事件循环 asyncio.run(main())我们把**事件循环Event Loop**想象成一个“超级调度员”把main和fetch_db_data想象成“工人”。启动阶段asyncio.run(main())启动了调度员。main工人开始执行打印“程序启动”。任务创建task1和task2被创建。此时调度员知道有两个活要干但还没正式开始“等”。遇到await asyncio.gather(...)交出控制权main工人执行到这一行发现需要等待task1和task2完成。于是main主动挂起把控制权还给调度员。调度员介入调度员看到task1说它要await asyncio.sleep(2)。谁在等调度员通知操作系统OS内核“开启两个计时器一个 2 秒一个 1 秒。你内核帮我盯着好了叫我。”不卡住此时 Python 进程其实是“闲”的。如果有第三个任务比如用户点击了取消按钮调度员现在就能去处理它。第 1 秒内核信号操作系统内核发现 1 秒的计时器到期了给调度员发了个信号。任务恢复调度员唤醒fetch_db_data(数据库B)它打印出“[完成] 数据库B...”。第 2 秒同样的流程数据库A也完成了。最后合并当gather里的所有任务都完成了调度员踢了main工人一脚“喂你要的东西都齐了继续吧”main从await那一行苏醒拿到results列表打印出总耗时。为什么总耗时是 2 秒而不是 3 秒因为在第 0 秒到第 1 秒这段时间里数据库A和数据库B同时在被内核计时模拟网络传输。同步代码必须等 A 的 2 秒走完才开始走 B 的 1 秒。异步代码它们的等待时间是重叠的。2. async with在 Python 中async with的核心逻辑是“我要异步地申请打开一个资源并在用完后异步地把它关掉。”假设我们有两个 MCP 服务器工具一个叫Database一个叫FileStorage。我们需要“同时”打开它们并读取数据。import asyncio class AsyncSafe: 模拟一个异步资源如数据库连接或文件句柄 def __init__(self, name, open_delay): self.name name self.open_delay open_delay async def __aenter__(self): # 步骤 1异步申请打开资源 print(f [申请] 正在尝试打开 {self.name}...) await asyncio.sleep(self.open_delay) print(f [开启] {self.name} 已安全打开 (耗时 {self.open_delay}s)) return f{self.name} 的内容 async def __aexit__(self, exc_type, exc, tb): # 步骤 3异步清理/关闭资源 print(f [关闭] 正在异步锁上 {self.name}...) await asyncio.sleep(0.5) print(f [完毕] {self.name} 已彻底关闭。) async def access_resource(name, delay): # 使用 async with 确保资源被正确管理 async with AsyncSafe(name, delay) as data: # 步骤 2使用资源 print(f [使用] 正在处理 {data}) await asyncio.sleep(1) # 模拟处理数据 return f处理完的{data} async def main(): print( 任务开始准备并发访问两个保险箱) # 同时启动两个访问任务 task1 access_resource(保险箱A, 2) task2 access_resource(保险箱B, 1) results await asyncio.gather(task1, task2) print(f 最终汇总结果: {results}) asyncio.run(main())当程序运行到async with时发生的“接力”比普通异步函数多了一步第一步进入阶段 (__aenter__)程序运行到async with AsyncSafe(...)。它不会卡死等待保险箱打开而是调用await __aenter__()。交出控制权“保险箱A”的任务告诉调度员“我要等 2 秒才能开锁你先去忙别的。”调度员转身立即去处理“保险箱B”的任务。第二步代码块执行由于“保险箱B”只需要 1 秒就能打开它会先苏醒。调度员把控制权还给“保险箱B”的任务进入as data后面的缩进代码块。这时“保险箱A”还在后台由内核盯着计时。第三步退出阶段 (__aexit__)当“保险箱B”的代码块执行完毕程序会自动触发await __aexit__()。再次交出控制权关闭保险箱也需要时间比如断开网络、写入日志它再次挂起让调度员去看看“保险箱A”开好了没。为什么一定要用async with作为程序员你可能会问我直接用await open()和await close()不行吗原因在于“异常处理”如果你的代码在[使用]阶段步骤 2崩溃了比如数据解析报错普通的await close()可能永远不会被执行导致数据库连接泄露。 而async with保证了即便内部报错调度员也会确保__aexit__被执行优雅地帮你把门关上。总结交接棒模型main把棒子交给gather。gather同时把两根棒子交给A和B。A运行到async with的__aenter__发现要等把棒子还给调度员。B同理。内核发现 B 的时间到了把棒子还给B继续跑完代码块和__aexit__。最后所有人都跑完了棒子回到main手里。拓展知识与 FastAPI 的依赖注入结合在开发 Web API 或 MCP 服务器时你经常会看到这种模式。比如在 FastAPI 中app.get(/) async def read_items(): async with httpx.AsyncClient() as client: # 这个 client 在这行请求发出去时会交出控制权 # 从而让服务器在等待响应时还能处理其他用户的请求 resp await client.get(https://example.com) return resp.json()这种写法已经成为现代 Python 高性能开发的标准模板。三、线程池作为java程序员在遇到耗时操作的时候使用线程池去进行异步操作才是合理的理解。python的异步当中也可以加入线程池例如async def execute_tool_async(self, tool_name: str, input_data: str) - str: 异步执行单个工具 loop asyncio.get_event_loop() def _execute(): return self.registry.execute_tool(tool_name, input_data) result await loop.run_in_executor(self.executor, _execute) return result可以认为 _execute 是一个耗时的动作self.executor是一个线程池loop是执行当前这个异步操作的事件循环。我们把这个耗时动作交给线程池执行同时把整个操作包装为一个异步的协程操作等待result返回。四、理解按我理解java当中使用异步是把耗时操作委托给别的线程完成而python当中的异步更像是让多个有可能需要等待一会儿才能返回、但又不会自己完全阻塞住当前资源让其空转的一些小任务在同一个线程上一块执行。这属于是两种异步的思路吧抢占式的线程级别运行情况由os决定受时间片轮转的影响。例如即使有一个线程一直while true也不会影响其他同样正在运行的线程。因此java的并发属于是真多核的并发。协作式的协程级别运行情况由用户控制遇到await的时候才会挂起自己。如果一个协程一直while true那么其他任务没办法运行。因此python的这种更像是时分复用。五、其他与之相关但有待进一步学习的知识python的GIL机制、Java的虚拟线程机制。