参考链接
背景
我一直在想, 虽然异步代码切换很快, 比多线程多进程能创建更多的协程来支持并发. 但是事件循环只能在单进程单线程当中进行并发.
有没有可能在多线程中, 每个线程再开启一个事件循环呢? 研读了董伟明的公众号之后, 发现原来可以😺. 但是多进程不可以. 因为EventLoop对象不能被管道传递, 而python多进程的本质是底层通过管道来发送消息进行通信.
逻辑要点
在主线程当中通过asyncio.new_event_loop
创建三个事件循环. 然后开启三个线程, 分别将对应的事件循环跑起来. 通过asyncio.run_coroutine_threadsafe(coroutine_object, loop_object)
来让协程在其他线程中运行. 运行完之后一定要关闭事件循环, 否则loop.run_forever()
会阻塞线程关闭, 可以通过loop.call_soon_threadsafe(callback)
在callback函数中关闭事件循环.
Code
# -*- coding: utf-8 -*-
__author__ = '陈章'
__date__ = '2019-05-30 16:46'
import asyncio
import time
from functools import partial
from threading import Thread, current_thread
async def a():
# 要执行的任务函数
await asyncio.sleep(1)
return f"return value from {current_thread().name}"
def start_loop(loop):
# 在每一个线程的target函数中启动事件循环
asyncio.set_event_loop(loop)
loop.run_forever()
def stop_loop(loop):
# 在任务完成之后通过call_soon_threadsafe关闭事件循环
loop.stop()
async def main():
start = time.perf_counter()
# 创建三个事件循环, 让每一个线程去开启
new_loop1 = asyncio.new_event_loop()
new_loop2 = asyncio.new_event_loop()
new_loop3 = asyncio.new_event_loop()
t1 = Thread(target=start_loop, args=(new_loop1,))
t2 = Thread(target=start_loop, args=(new_loop2,))
t3 = Thread(target=start_loop, args=(new_loop3,))
t1.start()
t2.start()
t3.start()
# 使用asyncio.run_coroutine_threadsafe让任务在其他线程中执行, 线程安全.
# 在这里每个线程我开启了3个任务
future1, future2, future3 = [asyncio.run_coroutine_threadsafe(a(), new_loop1) for i in range(3)]
future4, future5, future6 = [asyncio.run_coroutine_threadsafe(a(), new_loop2) for i in range(3)]
future7, future8, future9 = [asyncio.run_coroutine_threadsafe(a(), new_loop3) for i in range(3)]
all_futures = [future1, future2, future3, future4, future5, future6, future7, future8, future9]
print(all_futures)
# 获取结果, timeout要大于任务执行时间,否则会报错TimeoutError
print([f.result(timeout=2) for f in all_futures])
# 通过loop.call_soon_threadsafe在任务结束之后关闭事件循环
new_loop1.call_soon_threadsafe(partial(stop_loop, new_loop1))
new_loop2.call_soon_threadsafe(partial(stop_loop, new_loop2))
new_loop3.call_soon_threadsafe(partial(stop_loop, new_loop3))
end = time.perf_counter()
print(end - start)
if __name__ == "__main__":
asyncio.run(main())
"""
Output:
[<Future at 0x10ac58160 state=pending>, <Future at 0x10ac581d0 state=pending>, <Future at 0x10ac58240 state=pending>, <Future at 0x10ac582b0 state=pending>, <Future at 0x10ac58320 state=pending>, <Future at 0x10ac58390 state=pending>, <Future at 0x10ac58400 state=pending>, <Future at 0x10ac58470 state=pending>, <Future at 0x10ac584e0 state=pending>]
['return value from Thread-1', 'return value from Thread-1', 'return value from Thread-1', 'return value from Thread-2', 'return value from Thread-2', 'return value from Thread-2', 'return value from Thread-3', 'return value from Thread-3', 'return value from Thread-3']
1.006172254
"""