How to handle exceptions when using asyncio.wait with FIRST_COMPLETED?

Is there any way to handle exceptions when using asyncio.wait(coro_obj, return_when=FIRST_COMPLETED)?

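I am trying to get my IP from several external resources, but I cannot handle the exception raised when one of the provided URLs is broken. Loop execution should stop as soon as any request returns an IP.
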
import asyncio
import aiohttp
from concurrent.futures import FIRST_COMPLETED


SERVICES = [
    ('ip-api1', 'http://ip-api.com/json', 'query'),
    ('broken_api', 'http://broken', 'query'),
]


async def get_ip(session, service, url, ip_attr):
    print('fetching ip from: {}'.format(service))
    async with session.get(url) as resp:
        json_resp = await resp.json()
        ip = json_resp[ip_attr]
        return service, ip


async def get_ip_from_services():
    async with aiohttp.ClientSession() as session:
        coro_obj = [get_ip(session, service[0], service[1], service[2])
                    for service in SERVICES]
        try:
            done, pending = await asyncio.wait(coro_obj, return_when=FIRST_COMPLETED)
            result = done.pop().result()
            print(result)
            for future in pending:
                future.cancel()
        except:
            print('catched')


loop = asyncio.get_event_loop()
loop.run_until_complete(get_ip_from_services())
loop.close()

I am able to catch the exception when return_when=ALL_COMPLETED, but not with FIRST_COMPLETED.

I get the following error:

fetching ip from: ip-api1
fetching ip from: broken_api
catched
exception calling callback for <Future at 0x103839240 state=finished returned list>
Traceback (most recent call last):
  File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/concurrent/futures/_base.py", line 297, in _invoke_callbacks
    callback(self)
  File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/asyncio/futures.py", line 419, in _call_set_state
    dest_loop.call_soon_threadsafe(_set_state, destination, source)
  File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/asyncio/base_events.py", line 620, in call_soon_threadsafe
    self._check_closed()
  File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/asyncio/base_events.py", line 357, in _check_closed
    raise RuntimeError('Event loop is closed')
RuntimeError: Event loop is closed
Task was destroyed but it is pending!
task: <Task pending coro=<ThreadedResolver.resolve() running at /Volumes/external/venv/lib/python3.6/site-packages/aiohttp/resolver.py:31> wait_for=<Future pending cb=[_chain_future.<locals>._call_check_cancel() at /Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/asyncio/futures.py:408, <TaskWakeupMethWrapper object at 0x103807948>()]> cb=[shield.<locals>._done_callback() at /Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/asyncio/tasks.py:679]>
Task was destroyed but it is pending!
task: <Task pending coro=<get_ip() done, defined at ./run_7.py:15> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x1038073d8>()]>>

or this warning:

fetching ip from: ip-api1
fetching ip from: broken_api
catched
Task was destroyed but it is pending!
task: <Task pending coro=<ThreadedResolver.resolve() running at /Volumes/external/venv/lib/python3.6/site-packages/aiohttp/resolver.py:31> wait_for=<Future pending cb=[_chain_future.<locals>._call_check_cancel() at /Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/asyncio/futures.py:408, <TaskWakeupMethWrapper object at 0x1038e2948>()]> cb=[shield.<locals>._done_callback() at /Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/asyncio/tasks.py:679]>
Task was destroyed but it is pending!
task: <Task pending coro=<get_ip() done, defined at ./run_7.py:15> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x1038e23d8>()]>>

I think this is a bug in aiohttp, and I have reported it there.

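The exception you are getting does not seem critical, so you can leave things as they are until it is fixed in aiohttp. Alternatively, if you don't want to see it right now, just don't cancel the requests.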
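Independently of the aiohttp issue, note that `done.pop().result()` re-raises whatever the first finished task raised, so a broken URL that fails quickly ends the whole lookup. A minimal sketch of one way around that is to keep waiting until some task finishes without an exception. `fake_service` and `first_successful` are hypothetical names used only for illustration, the sleeping coroutine stands in for `get_ip()`, `203.0.113.7` is a documentation address, and `asyncio.run` assumes Python 3.7 or later:

```python
import asyncio


async def fake_service(name, delay, fail=False):
    # Hypothetical stand-in for get_ip(): sleep, then fail or return a value.
    await asyncio.sleep(delay)
    if fail:
        raise ConnectionError('{} is unreachable'.format(name))
    return name, '203.0.113.7'


async def first_successful(coros):
    # Wrap the coroutines in tasks explicitly before passing them to wait().
    pending = {asyncio.ensure_future(coro) for coro in coros}
    errors = []
    try:
        while pending:
            done, pending = await asyncio.wait(
                pending, return_when=asyncio.FIRST_COMPLETED)
            results = []
            for task in done:
                exc = task.exception()  # retrieve it so it is not left unhandled
                if exc is None:
                    results.append(task.result())
                else:
                    errors.append(exc)
            if results:
                return results[0], errors
        raise RuntimeError('all services failed: {!r}'.format(errors))
    finally:
        # Cancel whatever is still running and let the cancellations finish.
        for task in pending:
            task.cancel()
        if pending:
            await asyncio.gather(*pending, return_exceptions=True)


async def main():
    result, errors = await first_successful([
        fake_service('broken_api', 0.01, fail=True),
        fake_service('ip-api1', 0.05),
        fake_service('slow_api', 5),
    ])
    print(result, errors)


if __name__ == '__main__':
    asyncio.run(main())
```

Here the failure of `broken_api` is collected in `errors` rather than aborting the lookup, and the function only raises if every service fails.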

A couple of notes on your code that are unrelated to the issue:

1) Don't use concurrent.futures together with asyncio: they are different modules. Use asyncio.FIRST_COMPLETED instead.

2) You should not only cancel a task, but also await its cancellation (awaiting it will raise CancelledError). This can be done with the following code:

from contextlib import suppress

# Inside a coroutine, after asyncio.wait() has returned:
for task in pending:
    task.cancel()
    with suppress(asyncio.CancelledError):
        await task

Read more about tasks and cancelling in the asyncio documentation.
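Putting both notes together, here is a small self-contained sketch that uses asyncio.FIRST_COMPLETED from the asyncio module and awaits each cancelled task so its cleanup code actually runs. `cancel_and_wait`, `worker` and `demo` are hypothetical names for illustration, and `asyncio.run` assumes Python 3.7 or later:

```python
import asyncio
from contextlib import suppress


async def cancel_and_wait(tasks):
    # Request cancellation first, then await every task so it can unwind.
    for task in tasks:
        task.cancel()
    for task in tasks:
        with suppress(asyncio.CancelledError):
            await task


async def worker(delay, log):
    # Hypothetical job that records when its cancellation handler runs.
    try:
        await asyncio.sleep(delay)
        return delay
    except asyncio.CancelledError:
        log.append('cleanup after {}s worker'.format(delay))
        raise


async def demo():
    log = []
    tasks = [asyncio.ensure_future(worker(d, log)) for d in (0.01, 5, 10)]
    done, pending = await asyncio.wait(
        tasks, return_when=asyncio.FIRST_COMPLETED)
    await cancel_and_wait(pending)
    finished = [task.result() for task in done]
    return finished, log, all(task.cancelled() for task in pending)


if __name__ == '__main__':
    print(asyncio.run(demo()))
```

One caveat with this approach: suppressing CancelledError around `await task` also swallows a cancellation aimed at the calling coroutine itself, so treat it as a sketch rather than a general-purpose helper.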