Pythonasyncio子进程管理 Python asyncio 子进程管理异步执行外部命令asyncio 提供了 create_subprocess_exec 和 create_subprocess_shell来异步管理子进程避免阻塞事件循环。本文详解完整用法。一、create_subprocess_exec执行外部程序import asyncioimport sysasync def run_cmd_simple():异步执行一个外部命令等待其完成。create_subprocess_exec 直接执行程序不经过 shell。print(开始执行 ls -la ...)# 创建子进程第一个参数是可执行文件路径后续是参数process await asyncio.create_subprocess_exec(ls if sys.platform ! win32 else dir, # Linux/macOS 用 ls-la,# 默认继承父进程的标准流stdoutasyncio.subprocess.PIPE, # 捕获标准输出stderrasyncio.subprocess.PIPE, # 捕获标准错误)# communicate() 等待进程结束并返回 (stdout, stderr)stdout, stderr await process.communicate()print(f返回码: {process.returncode})if stdout:print(f标准输出 ({len(stdout)} 字节):)print(stdout.decode(errorsreplace)[:500])if stderr:print(f标准错误: {stderr.decode(errorsreplace)})二、create_subprocess_shell通过 Shell 执行async def run_cmd_shell():通过系统 shell 执行命令。注意存在 shell 注入风险不要用于不可信的输入。process await asyncio.create_subprocess_shell(echo Hello from asyncio sleep 1 echo Done,stdoutasyncio.subprocess.PIPE,stderrasyncio.subprocess.PIPE,)stdout, stderr await process.communicate()print(fShell 输出: {stdout.decode()})# create_subprocess_exec 与 create_subprocess_shell 的区别# - exec: 直接执行程序更安全不需要 shell 转义# - shell: 通过 /bin/sh (Windows: cmd.exe) 执行支持管道、重定向# - exec 推荐用于所有可以避免使用 shell 的场景三、Process 类详解async def process_class_demo():展示 asyncio.subprocess.Process 的完整方法。Process 对象通过 create_subprocess_* 返回。process await asyncio.create_subprocess_exec(python3, --version,stdoutasyncio.subprocess.PIPE,stderrasyncio.subprocess.PIPE,# 工作目录cwd/tmp,# 环境变量env{PYTHONUNBUFFERED: 1},)# 进程标识print(fPID: {process.pid})# 等待进程结束获取返回码returncode await process.wait()print(f返回码: {returncode})# 检查进程状态if process.returncode 0:print(执行成功)else:print(f执行失败返回码: {process.returncode})# 发送信号仅 Unix# process.send_signal(signal.SIGTERM)# process.terminate()# process.kill()四、读取子进程输出逐行读取async def stream_subprocess_output():逐行读取子进程输出适用于长时间运行的进程。使用 stdout.readline() 而不是 communicate()后者会等待进程结束才返回所有数据。process await asyncio.create_subprocess_exec(ping, -c, 5, 127.0.0.1,stdoutasyncio.subprocess.PIPE,stderrasyncio.subprocess.PIPE,)print(开始逐行读取 ping 输出)# 逐行读取标准输出实时读取while True:line await process.stdout.readline()if not line:break # EOF进程已结束# 解码并去除换行符text line.decode(utf-8, errorsreplace).rstrip()print(f[ping] {text})# 确保进程已完全结束await process.wait()print(f进程结束返回码: {process.returncode})五、超时控制async def subprocess_with_timeout():为子进程设置超时。使用 asyncio.wait_for 包装 communicate()。process await asyncio.create_subprocess_exec(sleep, 10, # 持续 10 秒stdoutasyncio.subprocess.PIPE,)try:# 设置 3 秒超时stdout, stderr await asyncio.wait_for(process.communicate(), timeout3.0)except asyncio.TimeoutError:print(子进程执行超时正在终止...)process.terminate() # 发送 SIGTERM# 等待进程真正结束await process.wait()print(f进程已终止返回码: {process.returncode})except Exception as e:print(f其他错误: {e})process.kill()await process.wait()六、并发执行多个子进程async def run_multiple_subprocesses():使用 asyncio.gather 同时执行多个子进程。这比串行执行快得多。async def run_one(cmd: str, delay: int):运行一个子进程并返回结果print(f开始执行: {cmd})process await asyncio.create_subprocess_shell(fecho Starting {cmd} sleep {delay} echo {cmd} done,stdoutasyncio.subprocess.PIPE,shellTrue,)stdout, _ await process.communicate()return stdout.decode().strip()# 并发执行 3 个任务results await asyncio.gather(run_one(Task-1, 2),run_one(Task-2, 1),run_one(Task-3, 3),)for result in results:print(f结果: {result})七、与子进程交互写入 stdinasync def interact_with_subprocess():向子进程的标准输入写入数据并读取输出。适用于交互式程序。process await asyncio.create_subprocess_exec(python3, -c,import sysfor line in sys.stdin:sys.stdout.write(f处理: {line.strip().upper()}\\n)sys.stdout.flush(),stdinasyncio.subprocess.PIPE,stdoutasyncio.subprocess.PIPE,stderrasyncio.subprocess.PIPE,)# 向 stdin 写入数据process.stdin.write(bhello\nworld\nasyncio\n)# 务必调用 drain() 刷新缓冲区await process.stdin.drain()# 关闭 stdin 表示输入结束process.stdin.close()# 读取所有输出stdout, stderr await process.communicate()print(子进程输出:)print(stdout.decode())八、总结# 1. create_subprocess_exec 直接执行程序更安全# 2. create_subprocess_shell 经过 shell有注入风险# 3. communicate() 同时读写 stdin/stdout/stderr 并等待结束# 4. wait() 仅等待进程结束获取返回码# 5. 逐行读取适用于实时流式输出场景# 6. asyncio.wait_for 可为子进程设置超时# 7. asyncio.gather 可并发管理多个子进程# 8. 通过 stdin.write() 和 drain() 向子进程发送数据if __name__ __main__:asyncio.run(stream_subprocess_output())