问题
跑代码的时候直接报错:
RuntimeError: This event loop is already running
解决方案
安装库解决
pip install nest_asyncio
import nest_asyncio
nest_asyncio.apply()
问题
如何编写测试asyncio的异步函数的单元测试
解决方案
import inspect
def async_test(f):
def wrapper(*args, **kwargs):
if inspect.iscoroutinefunction(f):
future = f(*args, **kwargs)
else:
coroutine = asyncio.coroutine(f)
future = coroutine(*args, **kwargs)
asyncio.get_event_loop().run_until_complete(future)
return wrapper
class TestExample(unittest.TestCase):
def test_upper(self):
self.assertEqual('foo'.upper(), 'FOO')
# asyncio test normal function with yield from statements
@async_test
def test_tcp1_success(self):
test = yield from asyncio.open_connection('www.baidu.com', 80)
print('test_tcp1_success')
# asyncio test coroutine function with yield from statements
@async_test
@asyncio.coroutine
def test_tcp2_success(self):
test = yield from asyncio.open_connection('www.baidu.com', 80)
print('test_tcp2_success')
# asyncio test with await keywords
@async_test
async def test_tcp3_success(self):
test = await asyncio.open_connection('www.baidu.com', 80)
print('test_tcp3_success')