"This event loop is already running" 在我自己的应用中集成aiohttp示例时出现的其他问题

"This event loop is already running" and other problems when integrating aiohttp examples in my own application

看了很多aiohttp相关的博客和例子。我想我理解它们,但是当我尝试将这些“知识”集成到我自己的应用程序的结构中时,出现了一些问题。下面是表示此结构的最小(非)工作示例。

我假设我对这样一个程序的结构应该是什么样子有一个基本的误解。

主要问题是RuntimeError: This event loop is already running。我有点了解它的根源,但不知道如何解决它。

其次,还有两个警告。

sys:1: RuntimeWarning: coroutine 'wait' was never awaited
sys:1: RuntimeWarning: coroutine 'FetchAsync._fetch' was never awaited

这是 MWE

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import asyncio
import aiohttp


class FetchAsync:
    """Minimal reproduction: fetch a fixed list of URLs with aiohttp.

    As written, ``run()`` fails with
    ``RuntimeError: This event loop is already running`` because
    ``_run_async()`` calls ``run_until_complete()`` while it is itself being
    executed by an event loop.
    """

    def __init__(self):
        """Create the fetcher; no instance state is set up."""
        pass

    def _get_loop(self):
        """Return an event loop with debug mode enabled.

        Uses ``asyncio.get_event_loop()``; if that raises ``RuntimeError`` a
        new loop is created and installed as the current loop.
        """
        try:
            loop = asyncio.get_event_loop()
        except RuntimeError:
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)
        finally:
            # NOTE: a ``return`` inside ``finally`` discards any exception
            # that is still propagating out of the ``try``/``except`` above.
            loop.set_debug(True)
            return loop

    async def _receive_via_aiohttp(self, session, url, headers):
        """GET ``url`` through ``session`` and read the whole body.

        Returns a ``(response, content)`` tuple where ``content`` is the
        result of ``await response.read()``.
        """
        async with session.get(url, headers=headers) as response:
            content = await response.read()
            return response, content

    async def _fetch(self,
                     url,
                     session):
        """Download a single URL with a fixed ``User-Agent`` header.

        There is no ``return`` statement, so the coroutine yields ``None``.
        """
        headers = {'User-Agent': 'MyAgent'}

        # use aiohttp to get feed/xml content and response object
        response, content = await self._receive_via_aiohttp(session,
                                                            url,
                                                            headers)

        # do a lot more stuff...

    def run(self):
        """Synchronous entry point: execute ``_run_async()`` to completion."""
        loop = self._get_loop()
        # asyncio.run() drives the coroutine through its own
        # loop.run_until_complete() call; ``loop`` obtained above is not the
        # object used to start the coroutine here, it is only closed below.
        asyncio.run(self._run_async())
        loop.close()

    async def _run_async(self):
        """Open one aiohttp session and start a ``_fetch`` job per URL."""
        async with aiohttp.ClientSession() as session:
            # in reality there are many more URLs
            urls = ['https://cnn.com',
                    'https://fsfe.org']

            # create the "jobs"; each element is an un-awaited coroutine
            # object returned by calling the async method _fetch()
            futures = [self._fetch(url, session)
                       for url
                       in urls]

            # run the "jobs" asynchronously
            # NOTE: this coroutine is already being executed by a running
            # event loop, so run_until_complete() raises
            # "RuntimeError: This event loop is already running". Since the
            # call fails, the asyncio.wait(...) and _fetch(...) coroutine
            # objects are never awaited, which produces the two
            # RuntimeWarnings. Inside a coroutine the jobs must be awaited
            # instead, e.g. ``await asyncio.gather(*futures)``.
            self._get_loop().run_until_complete(asyncio.wait(futures))


# Script entry point: create the fetcher and run it when executed directly.
if __name__ == '__main__':
    obj = FetchAsync()
    obj.run()

这是完整的错误输出

Traceback (most recent call last):
  File "/home/user/share/work/aiotest/./fetchfeeds.py", line 62, in <module>
    obj.run()
  File "/home/user/share/work/aiotest/./fetchfeeds.py", line 43, in run
    asyncio.run(self._run_async())
  File "/usr/lib/python3.9/asyncio/runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "/usr/lib/python3.9/asyncio/base_events.py", line 642, in run_until_complete
    return future.result()
  File "/home/user/share/work/aiotest/./fetchfeeds.py", line 58, in _run_async
    self._get_loop().run_until_complete(asyncio.wait(futures))
  File "/usr/lib/python3.9/asyncio/base_events.py", line 618, in run_until_complete
    self._check_running()
  File "/usr/lib/python3.9/asyncio/base_events.py", line 578, in _check_running
    raise RuntimeError('This event loop is already running')
RuntimeError: This event loop is already running
sys:1: RuntimeWarning: coroutine 'wait' was never awaited
sys:1: RuntimeWarning: coroutine 'FetchAsync._fetch' was never awaited

如果您查看相关讨论(最后一条评论),您会看到:

Do not use loop.run_until_complete call inside async function. The purpose for that method is to run an async function inside sync context

我已经更新了你的代码。重要的修改是在您的 _run_async() 中完成的:我将 run_until_complete(asyncio.wait(futures)) 替换为 await asyncio.gather(*futures)。

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import asyncio
import aiohttp

class FetchAsync:
    """Download a fixed list of URLs concurrently using aiohttp.

    ``run()`` is the synchronous entry point. It drives the ``_run_async()``
    coroutine on an event loop obtained from ``_get_loop()``. Inside the
    coroutine the individual downloads are awaited with ``asyncio.gather()``;
    ``loop.run_until_complete()`` is only ever called from synchronous code.
    """

    def __init__(self):
        """Create a fetcher; no state is kept on the instance."""

    def _get_loop(self):
        """Return a usable event loop with debug mode switched on.

        The current loop is reused when there is one and it is still open.
        Otherwise a new loop is created and installed as the current loop,
        for example when ``asyncio.get_event_loop()`` raises ``RuntimeError``
        or when an earlier ``run()`` has already closed the loop.
        """
        try:
            loop = asyncio.get_event_loop()
        except RuntimeError:
            loop = None

        if loop is None or loop.is_closed():
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)

        # Kept out of a ``finally`` clause on purpose: returning from
        # ``finally`` would silently swallow unrelated exceptions.
        loop.set_debug(True)
        return loop

    async def _receive_via_aiohttp(self, session, url, headers):
        """GET ``url`` through ``session`` and read the whole body.

        Returns:
            A ``(response, content)`` tuple where ``content`` is the result
            of ``await response.read()``.
        """
        async with session.get(url, headers=headers) as response:
            content = await response.read()
            return response, content

    async def _fetch(self,
                     url,
                     session):
        """Download a single URL with a fixed ``User-Agent`` header.

        Returns:
            The ``(response, content)`` tuple produced by
            ``_receive_via_aiohttp()``.
        """
        headers = {'User-Agent': 'MyAgent'}

        # use aiohttp to get feed/xml content and response object
        response, content = await self._receive_via_aiohttp(session,
                                                            url,
                                                            headers)
        # do a lot more stuff...
        return response, content

    def run(self):
        """Run all downloads to completion from synchronous code."""
        loop = self._get_loop()
        try:
            loop.run_until_complete(self._run_async())
        finally:
            # Close the loop even when a download raised.
            loop.close()

    async def _run_async(self):
        """Open one aiohttp session, fetch every URL and print the results."""
        async with aiohttp.ClientSession() as session:
            # in reality there are many more URLs
            urls = ['https://cnn.com',
                    'https://fsfe.org']

            # create the "jobs" (one coroutine object per URL)
            futures = [self._fetch(url, session)
                       for url
                       in urls]

            # run the "jobs" concurrently; we are already inside a running
            # event loop here, so they are awaited rather than handed to
            # loop.run_until_complete()
            print(await asyncio.gather(*futures))
         # run the "jobs" asynchrone


if __name__ == '__main__':
    # Executed as a script: build the fetcher and start the download run.
    fetcher = FetchAsync()
    fetcher.run()