python在多个线程当中,开启多个事件循环

参考链接

深入理解asyncio(三)

我自己写的总结

背景

我一直在想, 虽然异步代码切换很快, 比多线程多进程能创建更多的协程来支持并发. 但是事件循环只能在单进程单线程当中进行并发.

有没有可能在多线程中, 每个线程再开启一个事件循环呢? 研读了董伟明的公众号之后, 发现原来可以😺. 但是多进程不可以. 因为EventLoop对象不能被管道传递, 而python多进程的本质是底层通过管道来发送消息进行通信.

逻辑要点

在主线程当中通过asyncio.new_event_loop创建三个事件循环. 然后开启三个线程, 分别将对应的事件循环跑起来. 通过asyncio.run_coroutine_threadsafe(coroutine_object, loop_object)来让协程在其他线程中运行. 运行完之后一定要关闭事件循环, 否则loop.run_forever()会阻塞线程关闭, 可以通过loop.call_soon_threadsafe(callback)在callback函数中关闭事件循环.

Code

# -*- coding: utf-8 -*-
__author__ = '陈章'
__date__ = '2019-05-30 16:46'

import asyncio
import time
from functools import partial
from threading import Thread, current_thread


async def a():
    # 要执行的任务函数
    await asyncio.sleep(1)
    return f"return value from {current_thread().name}"


def start_loop(loop):
    # 在每一个线程的target函数中启动事件循环
    asyncio.set_event_loop(loop)
    loop.run_forever()


def stop_loop(loop):
    # 在任务完成之后通过call_soon_threadsafe关闭事件循环
    loop.stop()


async def main():
    start = time.perf_counter()
    # 创建三个事件循环, 让每一个线程去开启
    new_loop1 = asyncio.new_event_loop()
    new_loop2 = asyncio.new_event_loop()
    new_loop3 = asyncio.new_event_loop()
    t1 = Thread(target=start_loop, args=(new_loop1,))
    t2 = Thread(target=start_loop, args=(new_loop2,))
    t3 = Thread(target=start_loop, args=(new_loop3,))
    t1.start()
    t2.start()
    t3.start()
    # 使用asyncio.run_coroutine_threadsafe让任务在其他线程中执行, 线程安全.
    # 在这里每个线程我开启了3个任务
    future1, future2, future3 = [asyncio.run_coroutine_threadsafe(a(), new_loop1) for i in range(3)]
    future4, future5, future6 = [asyncio.run_coroutine_threadsafe(a(), new_loop2) for i in range(3)]
    future7, future8, future9 = [asyncio.run_coroutine_threadsafe(a(), new_loop3) for i in range(3)]
    all_futures = [future1, future2, future3, future4, future5, future6, future7, future8, future9]
    print(all_futures)
    # 获取结果, timeout要大于任务执行时间,否则会报错TimeoutError
    print([f.result(timeout=2) for f in all_futures])
    # 通过loop.call_soon_threadsafe在任务结束之后关闭事件循环
    new_loop1.call_soon_threadsafe(partial(stop_loop, new_loop1))
    new_loop2.call_soon_threadsafe(partial(stop_loop, new_loop2))
    new_loop3.call_soon_threadsafe(partial(stop_loop, new_loop3))
    end = time.perf_counter()
    print(end - start)


if __name__ == "__main__":
    asyncio.run(main())
    
    
"""
Output:
[<Future at 0x10ac58160 state=pending>, <Future at 0x10ac581d0 state=pending>, <Future at 0x10ac58240 state=pending>, <Future at 0x10ac582b0 state=pending>, <Future at 0x10ac58320 state=pending>, <Future at 0x10ac58390 state=pending>, <Future at 0x10ac58400 state=pending>, <Future at 0x10ac58470 state=pending>, <Future at 0x10ac584e0 state=pending>]
['return value from Thread-1', 'return value from Thread-1', 'return value from Thread-1', 'return value from Thread-2', 'return value from Thread-2', 'return value from Thread-2', 'return value from Thread-3', 'return value from Thread-3', 'return value from Thread-3']
1.006172254
"""