Python异步编程高级模式:asyncio事件循环与并发控制前言Python异步编程是现代高性能应用开发的关键技术。asyncio作为Python的内置异步框架,提供了强大的事件循环和并发控制能力。本文将深入探讨Python异步编程的高级模式,帮助你构建高效的并发应用。1. asyncio核心概念1.1 事件循环机制事件循环是asyncio的核心,负责调度和执行协程:# 事件循环基本原理importasyncioasyncdefexample_coroutine():"""示例协程"""print("协程开始")awaitasyncio.sleep(1)# 模拟IO操作print("协程结束")return"结果"# 获取事件循环loop=asyncio.get_event_loop()# 运行协程result=loop.run_until_complete(example_coroutine())print(f"结果:{result}")# 关闭循环loop.close()1.2 协程与任务协程是asyncio的基本执行单元,任务是对协程的封装:importasyncioasyncdeffetch_data(url:str,delay:float)-str:"""模拟数据获取"""print(f"开始获取{url}")awaitasyncio.sleep(delay)# 模拟网络延迟print(f"完成获取{url}")returnf"来自{url}的数据"asyncdefmain():# 创建多个协程coros=[fetch_data("https://api1.example.com",2),fetch_data("https://api2.example.com",1),fetch_data("https://api3.example.com",3),]# 方法1:顺序执行(不推荐)# for coro in coros:# result = await coro# 方法2:并发执行(推荐)tasks=[asyncio.create_task(coro)forcoroincoros]results=awaitasyncio.gather(*tasks)print(f"所有结果:{results}")# 运行asyncio.run(main())2. 高级并发模式2.1 信号量控制并发数importasynciofromtypingimportList,AnyclassConcurrencyLimiter:"""并发限制器"""def__init__(self,max_concurrent:int):self.semaphore=asyncio.Semaphore(max_concurrent)self.active_count=0self.max_concurrent=max_concurrentasyncdefexecute(self,coro):"""执行受限的协程"""asyncwithself.semaphore:self.active_count+=1try:result=awaitcororeturnresultfinally:self.active_count-=1asyncdefexecute_many(self,coros:List)-List[Any]:"""批量执行受限的协程"""tasks=[self.execute(coro)forcoroincoros]returnawaitasyncio.gather(*tasks)# 使用示例asyncdeflimited_fetch(url:str,limiter:ConcurrencyLimiter)-str:"""受限的数据获取"""print(f"等待获取{url}(当前活跃:{limiter.active_count})")returnawaitlimiter.execute(fetch_data(url,1))asyncdefmain():# 限制最多3个并发limiter=ConcurrencyLimiter(max_concurrent=3)urls=[f"https://api{i}.example.com"foriinrange(10)]coros=[limited_fetch(url,limiter)forurlinurls]results=awaitlimiter.execute_many(coros)print(f"获取完成:{len(results)}个结果")asyncio.run(main())2.2 线程池与进程池集成importasynciofromconcurrent.futuresimportThreadPoolExecutor,ProcessPoolExecutorimporttimeclassHybridExecutor:"""混合执行器:结合线程池和进程池"""def__init__(self,thread_workers:int=4,process_workers:int=2):self.thread_executor=ThreadPoolExecutor(max_workers=thread_workers)self.process_executor=ProcessPoolExecutor(max_workers=process_workers)asyncdefrun_in_thread(self,func,*args):"""在线程池中运行"""loop=asyncio.get_event_loop()returnawaitloop.run_in_executor(self.thread_executor,func,*args)asyncdefrun_in_process(self,func,*args):"""在进程池中运行"""loop=asyncio.get_event_loop()returnawaitloop.run_in_executor(self.process_executor,func,*args)defshutdown(self):"""关闭执行器"""self.thread_executor.shutdown()self.process_executor.shutdown()# CPU密集型任务defcpu_intensive_task(n:int)-int:"""CPU密集型计算"""total=0foriinrange(n):total+=i*ireturntotal# IO密集型任务defio_intensive_task(duration:float)-str:"""IO密集型任务"""time.sleep(duration)returnf"IO任务完成,耗时{duration}秒"asyncdefmain():executor=HybridExecutor(thread_workers=4,process_workers=2)try:# 混合执行不同类型的任务tasks=[]# CPU密集型任务使用进程池foriinrange(3):task=executor.run_in_process(cpu_intensive_task,1000000)tasks.append(task)# IO密集型任务使用线程池foriinrange(5):task=executor.run_in_thread(io_intensive_task,0.5)tasks.append(task)results=awaitasyncio.gather(*tasks)print(f"所有任务完成:{results}")finally:executor.shutdown()asyncio.run(main())2.3 异步上下文管理器importasynciofromtypingimportOptionalclassAsyncDatabaseConnection:"""异步数据库连接"""def__init__(self,connection_string:str):self.connection_string=connection_string self.connection:Optional[object]=Noneself.is_connected=Falseasyncdef__aenter__(self):"""进入异步上下文"""print(f"连接到数据库:{self.connection_string}")awaitasyncio.sleep(0.1)# 模拟连接时间self.connection=f"Connection({self.connection_string})"self.is_connected=Truereturnselfasyncdef__aexit__(self,exc_type,exc_val,exc_tb):"""退出异步上下文"""ifself.connection:print("关闭数据库连接")awaitasyncio.sleep(0.05)# 模拟关闭时间self.connection=Noneself.is_connected=False# 返回False表示不抑制异常returnFalseasyncdefexecute
Python异步编程高级模式:asyncio事件循环与并发控制
发布时间:2026/5/30 12:43:02
Python异步编程高级模式:asyncio事件循环与并发控制前言Python异步编程是现代高性能应用开发的关键技术。asyncio作为Python的内置异步框架,提供了强大的事件循环和并发控制能力。本文将深入探讨Python异步编程的高级模式,帮助你构建高效的并发应用。1. asyncio核心概念1.1 事件循环机制事件循环是asyncio的核心,负责调度和执行协程:# 事件循环基本原理importasyncioasyncdefexample_coroutine():"""示例协程"""print("协程开始")awaitasyncio.sleep(1)# 模拟IO操作print("协程结束")return"结果"# 获取事件循环loop=asyncio.get_event_loop()# 运行协程result=loop.run_until_complete(example_coroutine())print(f"结果:{result}")# 关闭循环loop.close()1.2 协程与任务协程是asyncio的基本执行单元,任务是对协程的封装:importasyncioasyncdeffetch_data(url:str,delay:float)-str:"""模拟数据获取"""print(f"开始获取{url}")awaitasyncio.sleep(delay)# 模拟网络延迟print(f"完成获取{url}")returnf"来自{url}的数据"asyncdefmain():# 创建多个协程coros=[fetch_data("https://api1.example.com",2),fetch_data("https://api2.example.com",1),fetch_data("https://api3.example.com",3),]# 方法1:顺序执行(不推荐)# for coro in coros:# result = await coro# 方法2:并发执行(推荐)tasks=[asyncio.create_task(coro)forcoroincoros]results=awaitasyncio.gather(*tasks)print(f"所有结果:{results}")# 运行asyncio.run(main())2. 高级并发模式2.1 信号量控制并发数importasynciofromtypingimportList,AnyclassConcurrencyLimiter:"""并发限制器"""def__init__(self,max_concurrent:int):self.semaphore=asyncio.Semaphore(max_concurrent)self.active_count=0self.max_concurrent=max_concurrentasyncdefexecute(self,coro):"""执行受限的协程"""asyncwithself.semaphore:self.active_count+=1try:result=awaitcororeturnresultfinally:self.active_count-=1asyncdefexecute_many(self,coros:List)-List[Any]:"""批量执行受限的协程"""tasks=[self.execute(coro)forcoroincoros]returnawaitasyncio.gather(*tasks)# 使用示例asyncdeflimited_fetch(url:str,limiter:ConcurrencyLimiter)-str:"""受限的数据获取"""print(f"等待获取{url}(当前活跃:{limiter.active_count})")returnawaitlimiter.execute(fetch_data(url,1))asyncdefmain():# 限制最多3个并发limiter=ConcurrencyLimiter(max_concurrent=3)urls=[f"https://api{i}.example.com"foriinrange(10)]coros=[limited_fetch(url,limiter)forurlinurls]results=awaitlimiter.execute_many(coros)print(f"获取完成:{len(results)}个结果")asyncio.run(main())2.2 线程池与进程池集成importasynciofromconcurrent.futuresimportThreadPoolExecutor,ProcessPoolExecutorimporttimeclassHybridExecutor:"""混合执行器:结合线程池和进程池"""def__init__(self,thread_workers:int=4,process_workers:int=2):self.thread_executor=ThreadPoolExecutor(max_workers=thread_workers)self.process_executor=ProcessPoolExecutor(max_workers=process_workers)asyncdefrun_in_thread(self,func,*args):"""在线程池中运行"""loop=asyncio.get_event_loop()returnawaitloop.run_in_executor(self.thread_executor,func,*args)asyncdefrun_in_process(self,func,*args):"""在进程池中运行"""loop=asyncio.get_event_loop()returnawaitloop.run_in_executor(self.process_executor,func,*args)defshutdown(self):"""关闭执行器"""self.thread_executor.shutdown()self.process_executor.shutdown()# CPU密集型任务defcpu_intensive_task(n:int)-int:"""CPU密集型计算"""total=0foriinrange(n):total+=i*ireturntotal# IO密集型任务defio_intensive_task(duration:float)-str:"""IO密集型任务"""time.sleep(duration)returnf"IO任务完成,耗时{duration}秒"asyncdefmain():executor=HybridExecutor(thread_workers=4,process_workers=2)try:# 混合执行不同类型的任务tasks=[]# CPU密集型任务使用进程池foriinrange(3):task=executor.run_in_process(cpu_intensive_task,1000000)tasks.append(task)# IO密集型任务使用线程池foriinrange(5):task=executor.run_in_thread(io_intensive_task,0.5)tasks.append(task)results=awaitasyncio.gather(*tasks)print(f"所有任务完成:{results}")finally:executor.shutdown()asyncio.run(main())2.3 异步上下文管理器importasynciofromtypingimportOptionalclassAsyncDatabaseConnection:"""异步数据库连接"""def__init__(self,connection_string:str):self.connection_string=connection_string self.connection:Optional[object]=Noneself.is_connected=Falseasyncdef__aenter__(self):"""进入异步上下文"""print(f"连接到数据库:{self.connection_string}")awaitasyncio.sleep(0.1)# 模拟连接时间self.connection=f"Connection({self.connection_string})"self.is_connected=Truereturnselfasyncdef__aexit__(self,exc_type,exc_val,exc_tb):"""退出异步上下文"""ifself.connection:print("关闭数据库连接")awaitasyncio.sleep(0.05)# 模拟关闭时间self.connection=Noneself.is_connected=False# 返回False表示不抑制异常returnFalseasyncdefexecute