python async await 与多线程如何结合使用
python async await 与多线程如何结合使用
在 Python 中,async
/await
(基于 asyncio
)和多线程(threading
模块)是两种不同的并发模型。asyncio
提供了协程式的并发,适用于 I/O 密集型任务,而多线程则适用于需要真正并行处理的场景,如处理 CPU 密集型任务或调用阻塞的 I/O 操作。
有时,我们可能需要将 asyncio
与多线程结合使用,以充分利用两者的优势。下面将详细介绍如何在 Python 中将 async
/await
与多线程结合使用,并提供相应的代码示例。
一、为什么要将 asyncio
与多线程结合
- 处理阻塞的 I/O 操作:在异步代码中,如果需要调用阻塞的 I/O 操作(如文件读写、网络请求等),可以使用多线程将其封装,避免阻塞事件循环。
- 利用多核 CPU:由于全局解释器锁(GIL)的存在,Python 的多线程不能并行执行 CPU 密集型任务。但在 I/O 密集型任务中,多线程仍然有用。
- 与现有代码库集成:在已有的多线程代码中引入异步操作,或者在异步代码中使用需要多线程的库。
二、主要的结合方式
- 在异步代码中使用线程池:使用
asyncio
提供的run_in_executor()
方法,在异步协程中运行阻塞的函数,将其放入线程池执行。 - 在多线程中运行事件循环:在单独的线程中运行
asyncio
的事件循环,以便同时运行异步任务和其他线程任务。 - 跨线程通信:使用线程安全的队列(如
queue.Queue
或asyncio.Queue
)在协程和线程之间传递数据。
三、在异步代码中使用线程池
3.1 使用 run_in_executor
asyncio
提供了 run_in_executor()
方法,可以在协程中运行阻塞的函数,将其委派给线程池或进程池,以避免阻塞事件循环。
示例
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor
def blocking_io():
print(f"开始阻塞的 I/O 操作,线程:{threading.current_thread().name}")
time.sleep(2)
print(f"阻塞的 I/O 操作完成,线程:{threading.current_thread().name}")
async def main():
loop = asyncio.get_running_loop()
print(f"主协程运行在线程:{threading.current_thread().name}")
# 使用默认的线程池执行器
await loop.run_in_executor(None, blocking_io)
print("任务完成")
if __name__ == "__main__":
asyncio.run(main())
输出
主协程运行在线程:MainThread
开始阻塞的 I/O 操作,线程:ThreadPoolExecutor-0_0
阻塞的 I/O 操作完成,线程:ThreadPoolExecutor-0_0
任务完成
解释
blocking_io
:一个阻塞的函数,模拟耗时的 I/O 操作。loop.run_in_executor()
:- 第一个参数为
None
,表示使用默认的线程池执行器。 - 第二个参数是需要执行的阻塞函数。
- 第一个参数为
- 在协程中等待阻塞函数完成:使用
await
,但不会阻塞事件循环。
3.2 使用自定义线程池
您也可以创建自定义的线程池执行器,以控制线程的数量。
executor = ThreadPoolExecutor(max_workers=5)
await loop.run_in_executor(executor, blocking_io)
四、在多线程中运行事件循环
在某些情况下,您可能需要在一个单独的线程中运行 asyncio
事件循环,以便在主线程中执行其他操作。
4.1 示例
import asyncio
import threading
async def async_task():
print(f"异步任务开始,线程:{threading.current_thread().name}")
await asyncio.sleep(2)
print(f"异步任务完成,线程:{threading.current_thread().name}")
def start_event_loop(loop):
# 设置事件循环为当前线程的事件循环
asyncio.set_event_loop(loop)
loop.run_forever()
if __name__ == "__main__":
# 创建新的事件循环
new_loop = asyncio.new_event_loop()
# 在新的线程中启动事件循环
t = threading.Thread(target=start_event_loop, args=(new_loop,))
t.start()
# 在事件循环中添加任务
asyncio.run_coroutine_threadsafe(async_task(), new_loop)
# 主线程继续执行其他操作
print(f"主线程继续运行,线程:{threading.current_thread().name}")
# 等待一段时间后关闭事件循环
time.sleep(3)
new_loop.call_soon_threadsafe(new_loop.stop)
t.join()
输出
主线程继续运行,线程:MainThread
异步任务开始,线程:Thread-1
异步任务完成,线程:Thread-1
解释
async_task
:一个简单的异步任务。start_event_loop
:在新的线程中运行事件循环。asyncio.run_coroutine_threadsafe()
:在线程安全的情况下,将协程提交到事件循环中。new_loop.call_soon_threadsafe(new_loop.stop)
:线程安全地停止事件循环。
4.2 注意事项
- 事件循环与线程绑定:每个线程只能有一个事件循环,事件循环也只能在创建它的线程中运行。
- 线程安全:使用
asyncio.run_coroutine_threadsafe()
和call_soon_threadsafe()
来在线程间通信。
五、跨线程通信
在协程和线程之间传递数据,可以使用线程安全的队列。
5.1 使用 queue.Queue
import asyncio
import threading
import queue
def producer(q):
for i in range(5):
print(f"生产者:放入数据 {i}")
q.put(i)
time.sleep(1)
async def consumer(q):
while True:
data = await loop.run_in_executor(None, q.get)
print(f"消费者:获取数据 {data}")
if data == 4:
break
if __name__ == "__main__":
q = queue.Queue()
loop = asyncio.get_event_loop()
threading.Thread(target=producer, args=(q,)).start()
loop.run_until_complete(consumer(q))
loop.close()
输出
生产者:放入数据 0
消费者:获取数据 0
生产者:放入数据 1
消费者:获取数据 1
生产者:放入数据 2
消费者:获取数据 2
生产者:放入数据 3
消费者:获取数据 3
生产者:放入数据 4
消费者:获取数据 4
5.2 使用 asyncio.Queue
如果所有代码都是异步的,或者可以在线程中使用异步队列,也可以使用 asyncio.Queue
。
六、在协程中运行线程代码
有时,您可能需要在协程中启动新的线程来处理某些任务。
6.1 示例
import asyncio
import threading
def blocking_task():
print(f"阻塞任务开始,线程:{threading.current_thread().name}")
time.sleep(2)
print(f"阻塞任务完成,线程:{threading.current_thread().name}")
async def main():
loop = asyncio.get_running_loop()
# 在协程中启动新的线程
await loop.run_in_executor(None, blocking_task)
print("协程中的线程任务已完成")
asyncio.run(main())
解释
loop.run_in_executor()
:在协程中运行线程任务,避免阻塞事件循环。- 协程等待线程任务完成:使用
await
等待线程中的任务完成。
七、综合示例
下面是一个综合示例,演示如何在异步代码中使用线程池,以及在线程中运行事件循环,并进行跨线程通信。
7.1 示例代码
import asyncio
import threading
import time
import queue
# 线程安全的队列
q = queue.Queue()
# 阻塞的 I/O 操作
def blocking_io_task(task_id):
print(f"线程 {threading.current_thread().name}:开始阻塞 I/O 任务 {task_id}")
time.sleep(2)
result = f"结果 {task_id}"
q.put(result)
print(f"线程 {threading.current_thread().name}:完成阻塞 I/O 任务 {task_id}")
# 异步任务,从队列中获取结果
async def async_task():
while True:
result = await loop.run_in_executor(None, q.get)
print(f"协程接收到:{result}")
if result == "结果 2":
break
# 在单独的线程中运行事件循环
def start_loop(loop):
asyncio.set_event_loop(loop)
loop.run_forever()
if __name__ == "__main__":
# 创建新的事件循环
loop = asyncio.new_event_loop()
t = threading.Thread(target=start_loop, args=(loop,))
t.start()
# 启动异步任务
asyncio.run_coroutine_threadsafe(async_task(), loop)
# 在主线程中使用线程池执行阻塞任务
with ThreadPoolExecutor(max_workers=3) as executor:
for i in range(3):
executor.submit(blocking_io_task, i)
# 等待所有任务完成
time.sleep(5)
# 停止事件循环
loop.call_soon_threadsafe(loop.stop)
t.join()
输出
线程 ThreadPoolExecutor-0_0:开始阻塞 I/O 任务 0
线程 ThreadPoolExecutor-0_1:开始阻塞 I/O 任务 1
线程 ThreadPoolExecutor-0_2:开始阻塞 I/O 任务 2
协程接收到:结果 0
线程 ThreadPoolExecutor-0_0:完成阻塞 I/O 任务 0
协程接收到:结果 1
线程 ThreadPoolExecutor-0_1:完成阻塞 I/O 任务 1
协程接收到:结果 2
线程 ThreadPoolExecutor-0_2:完成阻塞 I/O 任务 2
八、注意事项
- GIL(全局解释器锁):在 Python 中,多线程不能真正并行执行 CPU 密集型任务,因为 GIL 的存在。但对于 I/O 密集型任务,多线程仍然有效。
- 事件循环线程安全:
asyncio
的事件循环和协程不是线程安全的,除非使用线程安全的方法(如run_coroutine_threadsafe
、call_soon_threadsafe
)。 - 避免死锁:在跨线程通信时,需要注意线程间的同步,避免死锁和竞争条件。
- 资源管理:确保在线程和事件循环结束时,正确地关闭和清理资源。
九、总结
asyncio
与多线程结合使用:可以在异步代码中利用线程来处理阻塞的任务,或在多线程环境中运行异步事件循环。- 线程池执行器:使用
run_in_executor()
方法,在协程中运行阻塞的函数,避免阻塞事件循环。 - 在线程中运行事件循环:当需要并行运行多个事件循环或与现有的线程代码集成时,可以在新的线程中运行事件循环。
- 跨线程通信:使用线程安全的队列或其他同步原语,在线程和协程之间传递数据。
通过合理地将 asyncio
与多线程结合使用,您可以充分利用两种并发模型的优势,提高程序的性能和响应能力。
如果您还有其他问题或需要进一步的帮助,请随时提问!在 Python 中,async
/await
(基于 asyncio
)和多线程(threading
模块)是两种不同的并发模型。asyncio
提供了协程式的并发,适用于 I/O 密集型任务,而多线程则适用于需要真正并行处理的场景,如处理 CPU 密集型任务或调用阻塞的 I/O 操作。
有时,我们可能需要将 asyncio
与多线程结合使用,以充分利用两者的优势。下面将详细介绍如何在 Python 中将 async
/await
与多线程结合使用,并提供相应的代码示例。
一、为什么要将 asyncio
与多线程结合
- 处理阻塞的 I/O 操作:在异步代码中,如果需要调用阻塞的 I/O 操作(如文件读写、网络请求等),可以使用多线程将其封装,避免阻塞事件循环。
- 利用多核 CPU:由于全局解释器锁(GIL)的存在,Python 的多线程不能并行执行 CPU 密集型任务。但在 I/O 密集型任务中,多线程仍然有用。
- 与现有代码库集成:在已有的多线程代码中引入异步操作,或者在异步代码中使用需要多线程的库。
二、主要的结合方式
- 在异步代码中使用线程池:使用
asyncio
提供的run_in_executor()
方法,在异步协程中运行阻塞的函数,将其放入线程池执行。 - 在多线程中运行事件循环:在单独的线程中运行
asyncio
的事件循环,以便同时运行异步任务和其他线程任务。 - 跨线程通信:使用线程安全的队列(如
queue.Queue
或asyncio.Queue
)在协程和线程之间传递数据。
三、在异步代码中使用线程池
3.1 使用 run_in_executor
asyncio
提供了 run_in_executor()
方法,可以在协程中运行阻塞的函数,将其委派给线程池或进程池,以避免阻塞事件循环。
示例
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor
def blocking_io():
print(f"开始阻塞的 I/O 操作,线程:{threading.current_thread().name}")
time.sleep(2)
print(f"阻塞的 I/O 操作完成,线程:{threading.current_thread().name}")
async def main():
loop = asyncio.get_running_loop()
print(f"主协程运行在线程:{threading.current_thread().name}")
# 使用默认的线程池执行器
await loop.run_in_executor(None, blocking_io)
print("任务完成")
if __name__ == "__main__":
asyncio.run(main())
输出
主协程运行在线程:MainThread
开始阻塞的 I/O 操作,线程:ThreadPoolExecutor-0_0
阻塞的 I/O 操作完成,线程:ThreadPoolExecutor-0_0
任务完成
解释
blocking_io
:一个阻塞的函数,模拟耗时的 I/O 操作。loop.run_in_executor()
:- 第一个参数为
None
,表示使用默认的线程池执行器。 - 第二个参数是需要执行的阻塞函数。
- 第一个参数为
- 在协程中等待阻塞函数完成:使用
await
,但不会阻塞事件循环。
3.2 使用自定义线程池
您也可以创建自定义的线程池执行器,以控制线程的数量。
executor = ThreadPoolExecutor(max_workers=5)
await loop.run_in_executor(executor, blocking_io)
四、在多线程中运行事件循环
在某些情况下,您可能需要在一个单独的线程中运行 asyncio
事件循环,以便在主线程中执行其他操作。
4.1 示例
import asyncio
import threading
async def async_task():
print(f"异步任务开始,线程:{threading.current_thread().name}")
await asyncio.sleep(2)
print(f"异步任务完成,线程:{threading.current_thread().name}")
def start_event_loop(loop):
# 设置事件循环为当前线程的事件循环
asyncio.set_event_loop(loop)
loop.run_forever()
if __name__ == "__main__":
# 创建新的事件循环
new_loop = asyncio.new_event_loop()
# 在新的线程中启动事件循环
t = threading.Thread(target=start_event_loop, args=(new_loop,))
t.start()
# 在事件循环中添加任务
asyncio.run_coroutine_threadsafe(async_task(), new_loop)
# 主线程继续执行其他操作
print(f"主线程继续运行,线程:{threading.current_thread().name}")
# 等待一段时间后关闭事件循环
time.sleep(3)
new_loop.call_soon_threadsafe(new_loop.stop)
t.join()
输出
主线程继续运行,线程:MainThread
异步任务开始,线程:Thread-1
异步任务完成,线程:Thread-1
解释
async_task
:一个简单的异步任务。start_event_loop
:在新的线程中运行事件循环。asyncio.run_coroutine_threadsafe()
:在线程安全的情况下,将协程提交到事件循环中。new_loop.call_soon_threadsafe(new_loop.stop)
:线程安全地停止事件循环。
4.2 注意事项
- 事件循环与线程绑定:每个线程只能有一个事件循环,事件循环也只能在创建它的线程中运行。
- 线程安全:使用
asyncio.run_coroutine_threadsafe()
和call_soon_threadsafe()
来在线程间通信。
五、跨线程通信
在协程和线程之间传递数据,可以使用线程安全的队列。
5.1 使用 queue.Queue
import asyncio
import threading
import queue
def producer(q):
for i in range(5):
print(f"生产者:放入数据 {i}")
q.put(i)
time.sleep(1)
async def consumer(q):
while True:
data = await loop.run_in_executor(None, q.get)
print(f"消费者:获取数据 {data}")
if data == 4:
break
if __name__ == "__main__":
q = queue.Queue()
loop = asyncio.get_event_loop()
threading.Thread(target=producer, args=(q,)).start()
loop.run_until_complete(consumer(q))
loop.close()
输出
生产者:放入数据 0
消费者:获取数据 0
生产者:放入数据 1
消费者:获取数据 1
生产者:放入数据 2
消费者:获取数据 2
生产者:放入数据 3
消费者:获取数据 3
生产者:放入数据 4
消费者:获取数据 4
5.2 使用 asyncio.Queue
如果所有代码都是异步的,或者可以在线程中使用异步队列,也可以使用 asyncio.Queue
。
六、在协程中运行线程代码
有时,您可能需要在协程中启动新的线程来处理某些任务。
6.1 示例
import asyncio
import threading
def blocking_task():
print(f"阻塞任务开始,线程:{threading.current_thread().name}")
time.sleep(2)
print(f"阻塞任务完成,线程:{threading.current_thread().name}")
async def main():
loop = asyncio.get_running_loop()
# 在协程中启动新的线程
await loop.run_in_executor(None, blocking_task)
print("协程中的线程任务已完成")
asyncio.run(main())
解释
loop.run_in_executor()
:在协程中运行线程任务,避免阻塞事件循环。- 协程等待线程任务完成:使用
await
等待线程中的任务完成。
七、综合示例
下面是一个综合示例,演示如何在异步代码中使用线程池,以及在线程中运行事件循环,并进行跨线程通信。
7.1 示例代码
import asyncio
import threading
import time
import queue
# 线程安全的队列
q = queue.Queue()
# 阻塞的 I/O 操作
def blocking_io_task(task_id):
print(f"线程 {threading.current_thread().name}:开始阻塞 I/O 任务 {task_id}")
time.sleep(2)
result = f"结果 {task_id}"
q.put(result)
print(f"线程 {threading.current_thread().name}:完成阻塞 I/O 任务 {task_id}")
# 异步任务,从队列中获取结果
async def async_task():
while True:
result = await loop.run_in_executor(None, q.get)
print(f"协程接收到:{result}")
if result == "结果 2":
break
# 在单独的线程中运行事件循环
def start_loop(loop):
asyncio.set_event_loop(loop)
loop.run_forever()
if __name__ == "__main__":
# 创建新的事件循环
loop = asyncio.new_event_loop()
t = threading.Thread(target=start_loop, args=(loop,))
t.start()
# 启动异步任务
asyncio.run_coroutine_threadsafe(async_task(), loop)
# 在主线程中使用线程池执行阻塞任务
with ThreadPoolExecutor(max_workers=3) as executor:
for i in range(3):
executor.submit(blocking_io_task, i)
# 等待所有任务完成
time.sleep(5)
# 停止事件循环
loop.call_soon_threadsafe(loop.stop)
t.join()
输出
线程 ThreadPoolExecutor-0_0:开始阻塞 I/O 任务 0
线程 ThreadPoolExecutor-0_1:开始阻塞 I/O 任务 1
线程 ThreadPoolExecutor-0_2:开始阻塞 I/O 任务 2
协程接收到:结果 0
线程 ThreadPoolExecutor-0_0:完成阻塞 I/O 任务 0
协程接收到:结果 1
线程 ThreadPoolExecutor-0_1:完成阻塞 I/O 任务 1
协程接收到:结果 2
线程 ThreadPoolExecutor-0_2:完成阻塞 I/O 任务 2
八、注意事项
- GIL(全局解释器锁):在 Python 中,多线程不能真正并行执行 CPU 密集型任务,因为 GIL 的存在。但对于 I/O 密集型任务,多线程仍然有效。
- 事件循环线程安全:
asyncio
的事件循环和协程不是线程安全的,除非使用线程安全的方法(如run_coroutine_threadsafe
、call_soon_threadsafe
)。 - 避免死锁:在跨线程通信时,需要注意线程间的同步,避免死锁和竞争条件。
- 资源管理:确保在线程和事件循环结束时,正确地关闭和清理资源。
九、总结
asyncio
与多线程结合使用:可以在异步代码中利用线程来处理阻塞的任务,或在多线程环境中运行异步事件循环。- 线程池执行器:使用
run_in_executor()
方法,在协程中运行阻塞的函数,避免阻塞事件循环。 - 在线程中运行事件循环:当需要并行运行多个事件循环或与现有的线程代码集成时,可以在新的线程中运行事件循环。
- 跨线程通信:使用线程安全的队列或其他同步原语,在线程和协程之间传递数据。
通过合理地将 asyncio
与多线程结合使用,您可以充分利用两种并发模型的优势,提高程序的性能和响应能力。
如果您还有其他问题或需要进一步的帮助,请随时提问!