asyncio使用中的若干问题

问题

跑代码的时候直接报错:

RuntimeError: This event loop is already running

解决方案

安装库解决

pip install nest_asyncio
import nest_asyncio

nest_asyncio.apply()

问题

如何编写测试asyncio的异步函数的单元测试

解决方案

import inspect


def async_test(f):
    def wrapper(*args, **kwargs):
        if inspect.iscoroutinefunction(f):
            future = f(*args, **kwargs)
        else:
            coroutine = asyncio.coroutine(f)
            future = coroutine(*args, **kwargs)
        asyncio.get_event_loop().run_until_complete(future)

    return wrapper

class TestExample(unittest.TestCase):
    def test_upper(self):
        self.assertEqual('foo'.upper(), 'FOO')
        
        # asyncio test normal function with yield from statements
    @async_test
    def test_tcp1_success(self):
        test = yield from asyncio.open_connection('www.baidu.com', 80)
        print('test_tcp1_success')
        
         # asyncio test coroutine function with yield from statements
    @async_test
    @asyncio.coroutine
    def test_tcp2_success(self):
        test = yield from asyncio.open_connection('www.baidu.com', 80)
        print('test_tcp2_success')
        
        
    # asyncio test with await keywords
    @async_test
    async def test_tcp3_success(self):
        test = await asyncio.open_connection('www.baidu.com', 80)
        print('test_tcp3_success')