"This event loop is already running" 在我自己的应用中集成aiohttp示例时出现的其他问题
"This event loop is already running" and other problems when integrating aiohttp examples in my own application
看了很多aiohttp相关的博客和例子。我想我理解它们,但是当我尝试将这些“知识”集成到我自己的应用程序的结构中时,出现了一些问题。下面是表示此结构的最小(非)工作示例。
我假设我对这样一个程序的结构应该是什么样子有一个基本的误解。
主要问题是 RuntimeError: This event loop is already running。我大致了解它的根源,但不知道如何解决。
此外还有两个次要的警告:
sys:1: RuntimeWarning: coroutine 'wait' was never awaited
sys:1: RuntimeWarning: coroutine 'FetchAsync._fetch' was never awaited
这是 MWE
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import asyncio
import aiohttp
class FetchAsync:
    """Minimal (non-)working example: fetch several URLs with aiohttp.

    This is the version from the question. It deliberately still contains
    the call that triggers ``RuntimeError: This event loop is already
    running`` (see ``_run_async``), because that error is the subject of
    the question.
    """

    def __init__(self):
        pass

    def _get_loop(self):
        """Return an event loop with debug mode switched on.

        Uses the current event loop if ``asyncio.get_event_loop()`` provides
        one; on ``RuntimeError`` a new loop is created and installed as the
        current loop.
        """
        try:
            loop = asyncio.get_event_loop()
        except RuntimeError:
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)
        finally:
            loop.set_debug(True)
        return loop

    async def _receive_via_aiohttp(self, session, url, headers):
        """GET ``url`` through ``session`` and read the whole body.

        Returns a ``(response, content)`` tuple, where ``content`` is what
        ``response.read()`` produced.
        """
        async with session.get(url, headers=headers) as response:
            content = await response.read()
            return response, content

    async def _fetch(self,
                     url,
                     session):
        """Fetch a single URL with a fixed ``User-Agent`` header.

        In this version nothing is returned to the caller.
        """
        headers = {'User-Agent': 'MyAgent'}
        # use aiohttp to get feed/xml content and response object
        response, content = await self._receive_via_aiohttp(session,
                                                            url,
                                                            headers)
        # do a lot more stuff...

    def run(self):
        """Synchronous entry point: drive ``_run_async`` to completion."""
        loop = self._get_loop()
        asyncio.run(self._run_async())
        loop.close()

    async def _run_async(self):
        """Create one ``_fetch`` job per URL and try to run them all."""
        async with aiohttp.ClientSession() as session:
            # in real there are much more URLs
            urls = ['https://cnn.com',
                    'https://fsfe.org']
            # create the "jobs" (coroutine objects)
            futures = [self._fetch(url, session)
                       for url
                       in urls]
            # run the "jobs" asynchronously
            # NOTE: this is the line that raises
            # "RuntimeError: This event loop is already running":
            # run_until_complete() is called from inside a coroutine whose
            # loop is already running. Because it raises, neither
            # asyncio.wait(...) nor the _fetch coroutines are ever awaited,
            # which produces the two RuntimeWarnings.
            self._get_loop().run_until_complete(asyncio.wait(futures))
# Script entry point: build the fetcher and run it once.
if __name__ == '__main__':
    obj = FetchAsync()
    obj.run()
这是完整的错误输出
Traceback (most recent call last):
File "/home/user/share/work/aiotest/./fetchfeeds.py", line 62, in <module>
obj.run()
File "/home/user/share/work/aiotest/./fetchfeeds.py", line 43, in run
asyncio.run(self._run_async())
File "/usr/lib/python3.9/asyncio/runners.py", line 44, in run
return loop.run_until_complete(main)
File "/usr/lib/python3.9/asyncio/base_events.py", line 642, in run_until_complete
return future.result()
File "/home/user/share/work/aiotest/./fetchfeeds.py", line 58, in _run_async
self._get_loop().run_until_complete(asyncio.wait(futures))
File "/usr/lib/python3.9/asyncio/base_events.py", line 618, in run_until_complete
self._check_running()
File "/usr/lib/python3.9/asyncio/base_events.py", line 578, in _check_running
raise RuntimeError('This event loop is already running')
RuntimeError: This event loop is already running
sys:1: RuntimeWarning: coroutine 'wait' was never awaited
sys:1: RuntimeWarning: coroutine 'FetchAsync._fetch' was never awaited
如果您查看过相关讨论(最后一条评论),您会看到:
Do not use loop.run_until_complete call inside async function. The purpose for that method is to run an async function inside sync context
我已经更新了你的代码。重要的修改在您的 _run_async() 中:我将 run_until_complete() 调用替换为了 await asyncio.gather()。
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import asyncio
import aiohttp
class FetchAsync:
    """Fetch several URLs concurrently with aiohttp.

    This is the corrected version: inside the coroutine the jobs are
    awaited with ``asyncio.gather()`` instead of calling
    ``run_until_complete()`` on the loop that is already running.
    """

    def __init__(self):
        pass

    def _get_loop(self):
        """Return an event loop with debug mode switched on.

        Uses the current event loop if ``asyncio.get_event_loop()`` provides
        one; on ``RuntimeError`` a new loop is created and installed as the
        current loop.
        """
        try:
            loop = asyncio.get_event_loop()
        except RuntimeError:
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)
        # Set debug outside a ``finally`` clause so that an unexpected
        # exception above is not masked by an UnboundLocalError on ``loop``.
        loop.set_debug(True)
        return loop

    async def _receive_via_aiohttp(self, session, url, headers):
        """GET ``url`` through ``session`` and read the whole body.

        Returns a ``(response, content)`` tuple, where ``content`` is what
        ``response.read()`` produced.
        """
        async with session.get(url, headers=headers) as response:
            content = await response.read()
            return response, content

    async def _fetch(self,
                     url,
                     session):
        """Fetch a single URL with a fixed ``User-Agent`` header.

        Returns the ``(response, content)`` tuple obtained from
        ``_receive_via_aiohttp``.
        """
        headers = {'User-Agent': 'MyAgent'}
        # use aiohttp to get feed/xml content and response object
        response, content = await self._receive_via_aiohttp(session,
                                                            url,
                                                            headers)
        # do a lot more stuff...
        return response, content

    def run(self):
        """Synchronous entry point: drive ``_run_async`` to completion.

        The loop is closed even when ``_run_async`` raises.
        """
        loop = self._get_loop()
        try:
            loop.run_until_complete(self._run_async())
        finally:
            loop.close()

    async def _run_async(self):
        """Fetch all URLs concurrently and print the gathered results."""
        async with aiohttp.ClientSession() as session:
            # in real there are much more URLs
            urls = ['https://cnn.com',
                    'https://fsfe.org']
            # create the "jobs" (coroutine objects)
            jobs = [self._fetch(url, session) for url in urls]
            # run the "jobs" concurrently; gather() returns their results
            # in the same order as ``urls``
            print(await asyncio.gather(*jobs))
# Script entry point: build the fetcher and run it once.
if __name__ == '__main__':
    obj = FetchAsync()
    obj.run()
看了很多aiohttp相关的博客和例子。我想我理解它们,但是当我尝试将这些“知识”集成到我自己的应用程序的结构中时,出现了一些问题。下面是表示此结构的最小(非)工作示例。
我假设我对这样一个程序的结构应该是什么样子有一个基本的误解。
主要问题是 RuntimeError: This event loop is already running。我大致了解它的根源,但不知道如何解决。
此外还有两个次要的警告:
sys:1: RuntimeWarning: coroutine 'wait' was never awaited
sys:1: RuntimeWarning: coroutine 'FetchAsync._fetch' was never awaited
这是 MWE
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import asyncio
import aiohttp
class FetchAsync:
    """Minimal (non-)working example: fetch several URLs with aiohttp.

    This is the version from the question. It deliberately still contains
    the call that triggers ``RuntimeError: This event loop is already
    running`` (see ``_run_async``), because that error is the subject of
    the question.
    """

    def __init__(self):
        pass

    def _get_loop(self):
        """Return an event loop with debug mode switched on.

        Uses the current event loop if ``asyncio.get_event_loop()`` provides
        one; on ``RuntimeError`` a new loop is created and installed as the
        current loop.
        """
        try:
            loop = asyncio.get_event_loop()
        except RuntimeError:
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)
        finally:
            loop.set_debug(True)
        return loop

    async def _receive_via_aiohttp(self, session, url, headers):
        """GET ``url`` through ``session`` and read the whole body.

        Returns a ``(response, content)`` tuple, where ``content`` is what
        ``response.read()`` produced.
        """
        async with session.get(url, headers=headers) as response:
            content = await response.read()
            return response, content

    async def _fetch(self,
                     url,
                     session):
        """Fetch a single URL with a fixed ``User-Agent`` header.

        In this version nothing is returned to the caller.
        """
        headers = {'User-Agent': 'MyAgent'}
        # use aiohttp to get feed/xml content and response object
        response, content = await self._receive_via_aiohttp(session,
                                                            url,
                                                            headers)
        # do a lot more stuff...

    def run(self):
        """Synchronous entry point: drive ``_run_async`` to completion."""
        loop = self._get_loop()
        asyncio.run(self._run_async())
        loop.close()

    async def _run_async(self):
        """Create one ``_fetch`` job per URL and try to run them all."""
        async with aiohttp.ClientSession() as session:
            # in real there are much more URLs
            urls = ['https://cnn.com',
                    'https://fsfe.org']
            # create the "jobs" (coroutine objects)
            futures = [self._fetch(url, session)
                       for url
                       in urls]
            # run the "jobs" asynchronously
            # NOTE: this is the line that raises
            # "RuntimeError: This event loop is already running":
            # run_until_complete() is called from inside a coroutine whose
            # loop is already running. Because it raises, neither
            # asyncio.wait(...) nor the _fetch coroutines are ever awaited,
            # which produces the two RuntimeWarnings.
            self._get_loop().run_until_complete(asyncio.wait(futures))
# Script entry point: build the fetcher and run it once.
if __name__ == '__main__':
    obj = FetchAsync()
    obj.run()
这是完整的错误输出
Traceback (most recent call last):
File "/home/user/share/work/aiotest/./fetchfeeds.py", line 62, in <module>
obj.run()
File "/home/user/share/work/aiotest/./fetchfeeds.py", line 43, in run
asyncio.run(self._run_async())
File "/usr/lib/python3.9/asyncio/runners.py", line 44, in run
return loop.run_until_complete(main)
File "/usr/lib/python3.9/asyncio/base_events.py", line 642, in run_until_complete
return future.result()
File "/home/user/share/work/aiotest/./fetchfeeds.py", line 58, in _run_async
self._get_loop().run_until_complete(asyncio.wait(futures))
File "/usr/lib/python3.9/asyncio/base_events.py", line 618, in run_until_complete
self._check_running()
File "/usr/lib/python3.9/asyncio/base_events.py", line 578, in _check_running
raise RuntimeError('This event loop is already running')
RuntimeError: This event loop is already running
sys:1: RuntimeWarning: coroutine 'wait' was never awaited
sys:1: RuntimeWarning: coroutine 'FetchAsync._fetch' was never awaited
如果您查看过相关讨论,您会看到:
Do not use loop.run_until_complete call inside async function. The purpose for that method is to run an async function inside sync context
我已经更新了你的代码。重要的修改在您的 _run_async() 中:我将 run_until_complete() 调用替换为了 await asyncio.gather()。
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import asyncio
import aiohttp
class FetchAsync:
    """Fetch several URLs concurrently with aiohttp.

    This is the corrected version: inside the coroutine the jobs are
    awaited with ``asyncio.gather()`` instead of calling
    ``run_until_complete()`` on the loop that is already running.
    """

    def __init__(self):
        pass

    def _get_loop(self):
        """Return an event loop with debug mode switched on.

        Uses the current event loop if ``asyncio.get_event_loop()`` provides
        one; on ``RuntimeError`` a new loop is created and installed as the
        current loop.
        """
        try:
            loop = asyncio.get_event_loop()
        except RuntimeError:
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)
        # Set debug outside a ``finally`` clause so that an unexpected
        # exception above is not masked by an UnboundLocalError on ``loop``.
        loop.set_debug(True)
        return loop

    async def _receive_via_aiohttp(self, session, url, headers):
        """GET ``url`` through ``session`` and read the whole body.

        Returns a ``(response, content)`` tuple, where ``content`` is what
        ``response.read()`` produced.
        """
        async with session.get(url, headers=headers) as response:
            content = await response.read()
            return response, content

    async def _fetch(self,
                     url,
                     session):
        """Fetch a single URL with a fixed ``User-Agent`` header.

        Returns the ``(response, content)`` tuple obtained from
        ``_receive_via_aiohttp``.
        """
        headers = {'User-Agent': 'MyAgent'}
        # use aiohttp to get feed/xml content and response object
        response, content = await self._receive_via_aiohttp(session,
                                                            url,
                                                            headers)
        # do a lot more stuff...
        return response, content

    def run(self):
        """Synchronous entry point: drive ``_run_async`` to completion.

        The loop is closed even when ``_run_async`` raises.
        """
        loop = self._get_loop()
        try:
            loop.run_until_complete(self._run_async())
        finally:
            loop.close()

    async def _run_async(self):
        """Fetch all URLs concurrently and print the gathered results."""
        async with aiohttp.ClientSession() as session:
            # in real there are much more URLs
            urls = ['https://cnn.com',
                    'https://fsfe.org']
            # create the "jobs" (coroutine objects)
            jobs = [self._fetch(url, session) for url in urls]
            # run the "jobs" concurrently; gather() returns their results
            # in the same order as ``urls``
            print(await asyncio.gather(*jobs))
# Script entry point: build the fetcher and run it once.
if __name__ == '__main__':
    obj = FetchAsync()
    obj.run()