Python asyncio 模块学习总结:从“等着”到“切出去干点别的” Python asyncio 模块学习记录从“等着”到“切出去干点别的”最近在补 Python 的异步编程绕不开asyncio。一开始我对它的理解挺模糊async、await、协程、事件循环、Task这些词看起来都认识但放在一起就有点飘。这篇文章算是一份学习记录。我主要理清asyncio主要的几个问题它解决什么问题核心概念是什么代码应该怎么写以及有哪些容易踩坑的地方。1. asyncio 解决的是什么问题先说结论asyncio主要适合I/O 密集型任务。比如网络请求数据库查询文件读写WebSocket 通信定时任务爬虫聊天服务高并发接口调用这些任务的共同点是程序经常不是在“计算”而是在“等待”。举个很普通的例子请求 3 个接口每个接口都要等 2 秒。同步写法大概是这样importtimedeffetch(name):print(f开始请求{name})time.sleep(2)print(f完成请求{name})fetch(A)fetch(B)fetch(C)总耗时大约 6 秒因为它是一个接一个等。但异步的想法是既然 A 在等网络返回那我为什么不先去处理 BB 也在等那我再去处理 C。也就是说asyncio并不是让 Python 同时做很多计算而是让程序在等待的时候别傻站着。2. 一个简单的异步例子先看代码importasyncioasyncdeffetch(name):print(f开始请求{name})awaitasyncio.sleep(2)print(f完成请求{name})asyncdefmain():awaitasyncio.gather(fetch(A),fetch(B),fetch(C),)asyncio.run(main())这段代码总耗时大约 2 秒而不是 6 秒。注意这里有几个关键词async def定义一个协程函数await等待一个异步操作完成asyncio.sleep()异步版本的 sleepasyncio.gather()并发运行多个协程asyncio.run()启动事件循环运行入口协程这就是asyncio最基础的味道。3. 协程是什么普通函数调用后会立刻执行defhello():print(hello)hello()但协程函数不一样asyncdefhello():print(hello)hello()你会发现直接调用hello()并不会真正执行函数体它只是返回了一个协程对象。真正执行它需要awaithello()或者在最外层asyncio.run(hello())可以先粗略理解成协程是一种可以暂停和恢复的函数。当协程执行到await时它会把控制权交还给事件循环。事件循环发现这个任务暂时在等就会去调度别的任务。4. 事件循环asyncio 的调度中心事件循环可以理解成asyncio的“管家”。它不断检查哪些任务可以运行哪些任务正在等待哪些任务已经完成等待完成后该恢复哪个任务流程大概是这样5. await 到底在等什么await后面通常接的是一个“可等待对象”常见有协程对象TaskFuture一些异步库返回的对象比如awaitasyncio.sleep(1)这里不是让线程阻塞 1 秒而是告诉事件循环当前协程先暂停 1 秒这段时间你可以去执行别的任务。这点很关键。如果在异步函数里写time.sleep(1)那就糟了。它会阻塞整个线程事件循环也动不了其他协程也没法执行。正确写法是awaitasyncio.sleep(1)6. create_task把协程变成任务看一个容易误解的例子asyncdefwork(name):print(f{name}start)awaitasyncio.sleep(1)print(f{name}end)asyncdefmain():awaitwork(A)awaitwork(B)这段代码虽然用了async但仍然是顺序执行。因为它先等 A 完成再等 B 完成。如果想让 A 和 B 同时开始需要创建任务asyncdefmain():task_aasyncio.create_task(work(A))task_basyncio.create_task(work(B))awaittask_aawaittask_b也可以写成asyncdefmain():awaitasyncio.gather(work(A),work(B),)我的理解是协程像“任务说明书”Task 像“已经交给事件循环执行的任务”只定义协程不代表它已经被调度执行。7. gather等一组任务全部完成asyncio.gather()很适合批量并发。importasyncioasyncdefdownload(index):print(f开始下载文件{index})awaitasyncio.sleep(1)print(f下载完成文件{index})returnffile-{index}asyncdefmain():resultsawaitasyncio.gather(download(1),download(2),download(3),)print(results)asyncio.run(main())输出类似开始下载文件 1 开始下载文件 2 开始下载文件 3 下载完成文件 1 下载完成文件 2 下载完成文件 3 [file-1, file-2, file-3]gather()的特点是它会等所有任务完成并按照传入顺序返回结果。8. 一个更贴近实际的例子批量处理请求这里不用真实网络请求先用asyncio.sleep()模拟接口耗时。importasyncioimportrandomasyncdefrequest_api(user_id):delayrandom.uniform(0.5,2)print(f请求用户{user_id}预计耗时{delay:.2f}s)awaitasyncio.sleep(delay)return{user_id:user_id,status:ok,}asyncdefmain():user_ids[101,102,103,104,105]tasks[request_api(user_id)foruser_idinuser_ids]resultsawaitasyncio.gather(*tasks)foriteminresults:print(item)asyncio.run(main())这类场景在真实项目里很常见比如批量查用户信息、批量调用第三方接口、批量拉取远程资源。不过这里也引出一个问题如果一次性创建几千个任务会不会把服务打爆答案是会有风险。这时候就需要限制并发数量避免造成较大的访问量服务器宕机。9. 用 Semaphore 控制并发数量Semaphore可以控制同一时间最多有多少个任务在执行某段逻辑。importasyncioimportrandomasyncdefrequest_api(user_id,sem):asyncwithsem:delayrandom.uniform(0.5,2)print(f开始请求用户{user_id})awaitasyncio.sleep(delay)print(f完成请求用户{user_id})returnuser_idasyncdefmain():semasyncio.Semaphore(3)tasks[request_api(user_id,sem)foruser_idinrange(1,11)]resultsawaitasyncio.gather(*tasks)print(results)asyncio.run(main())这里Semaphore(3)表示最多同时执行 3 个请求。这个写法很实用。写爬虫、批量请求接口、批量处理消息时经常能用上。10 个任务Semaphore 限制同时最多运行 3 个任务完成后释放名额后续任务继续进入10. 超时处理asyncio.wait_for异步任务里超时控制很重要。比如请求第三方接口不可能无限等下去。importasyncioasyncdefslow_request():awaitasyncio.sleep(5)returndoneasyncdefmain():try:resultawaitasyncio.wait_for(slow_request(),timeout2)print(result)exceptasyncio.TimeoutError:print(请求超时)asyncio.run(main())这里slow_request()要 5 秒但wait_for()只等 2 秒所以会抛出TimeoutError。这类代码在实际项目里很常见因为异步程序如果缺少超时可能会堆积大量迟迟不结束的任务。11. 取消任务task.cancel()任务被创建后也可以取消。importasyncioasyncdefworker():try:whileTrue:print(working...)awaitasyncio.sleep(1)exceptasyncio.CancelledError:print(任务被取消开始清理资源)raiseasyncdefmain():taskasyncio.create_task(worker())awaitasyncio.sleep(3)task.cancel()try:awaittaskexceptasyncio.CancelledError:print(main 捕获取消结果)asyncio.run(main())这里要注意CancelledError不只是一个普通错误它通常意味着任务生命周期要结束了。如果捕获它一般清理完资源后要继续raise不要悄悄吞掉。12. Queue生产者和消费者模型asyncio.Queue很适合处理“一个地方生产任务另一个地方消费任务”的场景。importasyncioimportrandomasyncdefproducer(queue):foriinrange(1,6):awaitasyncio.sleep(0.5)itemftask-{i}awaitqueue.put(item)print(f生产{item})asyncdefconsumer(queue,name):whileTrue:itemawaitqueue.get()try:print(f{name}处理{item})awaitasyncio.sleep(random.uniform(0.5,1.5))finally:queue.task_done()asyncdefmain():queueasyncio.Queue()producers[asyncio.create_task(producer(queue))]consumers[asyncio.create_task(consumer(queue,worker-A)),asyncio.create_task(consumer(queue,worker-B)),]awaitasyncio.gather(*producers)awaitqueue.join()forcinconsumers:c.cancel()asyncio.run(main())这个例子很像后台任务系统producer 负责放入任务consumer 负责处理任务queue 负责缓冲任务13. asyncio 适合什么不适合什么适合高并发网络请求Web 服务WebSocket爬虫消息队列消费者定时任务调度I/O 密集型后台任务不太适合大量 CPU 计算图像处理视频编码大规模数学运算需要真正并行计算的场景因为asyncio的并发主要发生在等待 I/O 的时候。如果任务本身一直占着 CPU 算东西它不会主动让出控制权事件循环也没机会调度别的协程。CPU 密集型任务通常更适合multiprocessingconcurrent.futures.ProcessPoolExecutorC 扩展NumPy 这类释放 GIL 的计算库14. 几个常见坑坑 1async 函数调用后没有 awaitasyncdefhello():print(hello)hello()这样不会真正执行。要写awaithello()或者asyncio.run(hello())坑 2在 async 函数里用了 time.sleepasyncdefbad():time.sleep(1)这会阻塞事件循环。应该写asyncdefgood():awaitasyncio.sleep(1)坑 3以为 async 就一定更快如果任务是 CPU 密集型asyncio不一定更快甚至可能更绕。坑 4忘了处理异常taskasyncio.create_task(do_something())如果创建了任务但后面不 await它里面的异常可能不容易被及时发现。更稳妥的方式是保存 task并在合适的地方等待或统一管理。坑 5在已经运行的事件循环里调用 asyncio.runasyncio.run()通常只应该作为程序入口调用一次。在 Jupyter、某些 Web 框架或异步环境里事件循环可能已经存在这时再调用asyncio.run()就容易报错。15. 总结我对asyncio的理解大概是它不是让 Python 魔法般同时执行很多代码而是提供了一套协作式调度机制让程序在等待 I/O 的时候主动让出执行权。也就是说asyncio的核心不是“快”而是“别浪费等待时间”。它的几个关键词可以串起来理解async def创建协程create_task交给事件循环调度遇到 await 暂停执行其他任务I/O 完成后恢复asyncio刚开始看起来概念很多但抓住几个点会清晰很多async def定义协程函数调用协程函数只会得到协程对象不会立即执行await会暂停当前协程把控制权交给事件循环create_task()会把协程包装成任务并调度执行gather()可以等待多个任务完成Semaphore可以限制并发数量Queue适合生产者消费者模型不要在异步代码里写阻塞操作比如time.sleep()目前我觉得学习asyncio最好的方式不是一上来背概念而是多写几个小例子模拟请求、限制并发、处理超时、取消任务、队列消费。写着写着事件循环和协程之间的关系就会慢慢变得具体通过几个例子的编写练习尝试去融会贯通。