python中 async 在执行耗时任务时会通过await让出控制权, 那么耗时任务是在哪个线程中执行的? 我不太理解。
2024年10月13日大约 7 分钟
python中 async 在执行耗时任务时会通过await让出控制权, 那么耗时任务是在哪个线程中执行的? 我不太理解。
在 Python 的 asyncio
异步编程模型中,async
和 await
关键字用于编写异步协程,以实现高效的单线程并发。当一个协程在执行过程中遇到 await
时,会将控制权让回给事件循环,以便其他协程能够运行。
您的问题是:在协程通过 await
让出控制权时,耗时任务是在哪个线程中执行的?
关键概念
1. 事件循环和单线程
- 事件循环(Event Loop):
asyncio
的核心,用于调度和执行协程。 - 单线程执行:默认情况下,
asyncio
事件循环在单线程中运行,所有协程也在同一线程中执行。
2. 协程和 await
- 协程(Coroutine):由
async def
定义的函数,可以在执行过程中暂停和恢复。 await
关键字:用于等待一个可等待对象(如另一个协程、Future
、Task
等)完成。当协程遇到await
时,会将控制权让回给事件循环。
3. 非阻塞 I/O 操作
- 非阻塞操作:如网络 I/O、文件 I/O,在等待数据的过程中不阻塞线程。
- 协程的优势:在 I/O 等待期间,让出控制权,使事件循环可以调度其他协程执行。
耗时任务的执行位置
1. 非阻塞的异步任务
- 执行线程:仍然在事件循环所在的主线程中执行。
- 控制权让出:当协程遇到
await
一个非阻塞的异步操作时,它会暂停执行,让出控制权给事件循环。 - 事件循环管理:事件循环会在后台管理这些异步操作,一旦操作完成,协程会被恢复执行。
示例:
import asyncio
async def async_task():
print("异步任务开始")
await asyncio.sleep(2) # 非阻塞地等待 2 秒
print("异步任务完成")
async def main():
await async_task()
asyncio.run(main())
- 解释:
asyncio.sleep(2)
是一个异步非阻塞操作,在等待期间,事件循环可以调度其他协程。
2. 阻塞的同步任务
- 问题:如果协程中包含耗时的同步阻塞操作(如复杂的计算、
time.sleep()
等),会阻塞事件循环,无法让出控制权。 - 执行线程:阻塞操作在事件循环所在的主线程中执行,导致整个事件循环被阻塞。
- 后果:其他协程无法被调度,程序的并发性受损。
示例:
import asyncio
import time
async def blocking_task():
print("阻塞任务开始")
time.sleep(5) # 阻塞操作
print("阻塞任务完成")
async def main():
await blocking_task()
asyncio.run(main())
- 问题:
time.sleep(5)
是阻塞的同步操作,会阻塞事件循环 5 秒。
3. 解决方案:将阻塞任务移至线程或进程
- 使用线程池或进程池:将耗时的阻塞操作放在其他线程或进程中执行,避免阻塞事件循环。
run_in_executor
方法:asyncio
提供的机制,可以将阻塞操作移至线程池或进程池。
示例:
import asyncio
import time
def blocking_task():
print(f"阻塞任务开始,线程:{threading.current_thread().name}")
time.sleep(5)
print(f"阻塞任务完成,线程:{threading.current_thread().name}")
async def main():
loop = asyncio.get_running_loop()
await loop.run_in_executor(None, blocking_task)
print("协程任务完成")
asyncio.run(main())
- 解释:
loop.run_in_executor(None, blocking_task)
:将blocking_task
放入默认的线程池中执行。- 执行线程:
blocking_task
在新的线程中执行,不会阻塞事件循环。 - 协程让出控制权:在等待
blocking_task
完成的过程中,协程让出了控制权。
详细解析
1. 当协程遇到 await
时发生了什么?
- 暂停执行:协程在
await
处暂停,等待被等待的对象完成。 - 控制权让出:事件循环可以调度其他可运行的协程。
- 恢复执行:一旦被等待的对象完成,协程会从暂停的地方继续执行。
2. 耗时任务的执行
- 非阻塞异步任务:
- 例子:
await asyncio.sleep(2)
、网络请求(使用支持异步的库,如aiohttp
)。 - 执行位置:事件循环管理的主线程中,但不会阻塞,因为操作是非阻塞的。
- 例子:
- 阻塞同步任务:
- 例子:复杂计算、
time.sleep()
、不支持异步的文件 I/O。 - 问题:在协程中直接执行会阻塞事件循环。
- 例子:复杂计算、
3. 如何避免阻塞事件循环?
将阻塞操作移至线程或进程:
- 线程池执行器:适用于 I/O 密集型的阻塞操作。
- 进程池执行器:适用于 CPU 密集型的阻塞操作。
代码示例:
import asyncio import concurrent.futures def cpu_bound_task(): # 模拟耗时的 CPU 密集型任务 result = sum(i * i for i in range(10 ** 7)) return result async def main(): loop = asyncio.get_running_loop() with concurrent.futures.ProcessPoolExecutor() as pool: result = await loop.run_in_executor(pool, cpu_bound_task) print(f"计算结果:{result}") asyncio.run(main())
- 解释:
ProcessPoolExecutor
:使用进程池来执行 CPU 密集型任务。run_in_executor
:协程等待任务完成,但不会阻塞事件循环。
- 解释:
总结
- 默认情况下,协程中的代码(包括耗时任务)是在事件循环所在的主线程中执行的。
- 当协程遇到
await
时,如果等待的是一个异步非阻塞操作,协程会暂停执行,事件循环可以调度其他协程。 - 如果协程中包含阻塞的同步操作,会阻塞事件循环,阻碍其他协程的执行。
- 为避免阻塞事件循环,需要将耗时的阻塞操作移至线程池或进程池中执行。
完整示例
假设我们有一个需要读取大量数据的函数,但它是同步阻塞的:
def read_large_file():
with open('large_file.txt', 'r') as f:
return f.read()
如果直接在协程中调用会阻塞事件循环:
async def main():
data = read_large_file() # 阻塞事件循环
process_data(data)
asyncio.run(main())
正确的做法是使用线程池:
import asyncio
def read_large_file():
with open('large_file.txt', 'r') as f:
return f.read()
async def main():
loop = asyncio.get_running_loop()
data = await loop.run_in_executor(None, read_large_file)
process_data(data)
asyncio.run(main())
- 解释:
loop.run_in_executor(None, read_large_file)
:将read_large_file
函数放入默认的线程池中执行。- 线程池中的执行:
read_large_file
在新的线程中执行,不会阻塞事件循环。 - 协程等待结果:
await
用于等待函数的返回结果,但在等待期间,事件循环可以调度其他协程。
回答您的疑问
耗时任务在哪个线程中执行?
- 非阻塞的异步任务:在事件循环的主线程中执行,不会阻塞,因为它们在等待 I/O 操作完成时让出了控制权。
- 阻塞的同步任务:如果直接在协程中调用,会在主线程中执行,阻塞事件循环。为了避免阻塞,需要将其移至线程池或进程池中执行。
为什么要让出控制权?
- 提高并发性:通过在
await
处让出控制权,其他协程可以被调度执行,提高程序的并发性能。 - 避免阻塞事件循环:让出控制权确保事件循环可以持续运行,处理 I/O 事件和调度协程。
- 提高并发性:通过在
如何决定任务的执行位置?
- 非阻塞的异步操作:可以直接在协程中
await
,由事件循环管理。 - 阻塞的同步操作:应使用
run_in_executor
将其移至线程池或进程池中执行。
- 非阻塞的异步操作:可以直接在协程中
额外提示
- 使用异步库:在可能的情况下,使用支持异步的库(如
aiohttp
)来执行 I/O 操作,避免阻塞事件循环。 - 避免在协程中使用阻塞操作:如
time.sleep()
、阻塞的文件 I/O 等,应替换为异步版本(如asyncio.sleep()
)。 - 线程池 vs. 进程池:
- 线程池(ThreadPoolExecutor):适用于 I/O 密集型的阻塞任务,因为线程在等待 I/O 时不会占用 GIL。
- 进程池(ProcessPoolExecutor):适用于 CPU 密集型的阻塞任务,因为进程有独立的 Python 解释器,不受 GIL 的限制。
总结
- 事件循环在单线程中运行,默认情况下,协程也是在同一线程中执行的。
- 当协程遇到
await
时,会将控制权让回给事件循环,以便调度其他协程。 - 耗时的非阻塞异步任务(如网络 I/O)在主线程中执行,但不会阻塞,因为它们在等待时让出了控制权。
- 耗时的阻塞同步任务会阻塞事件循环,需要使用
run_in_executor
将其移至线程池或进程池中执行。
希望这能帮助您理解 Python 中 async
和 await
的执行机制,以及耗时任务的执行位置。如果您还有其他疑问,请随时提问!