Python 中的异步异常处理

Asynchronous exception handling in Python

我使用 asyncioaiohttp 发出异步 HTTP 请求的以下代码。

import sys
import asyncio
import aiohttp

@asyncio.coroutine
def get(url):
    try:
        print('GET %s' % url)
        resp = yield from aiohttp.request('GET', url)
    except Exception as e:
        raise Exception("%s has error '%s'" % (url, e))
    else:
        if resp.status >= 400:
            raise Exception("%s has error '%s: %s'" % (url, resp.status, resp.reason))

    return (yield from resp.text())

@asyncio.coroutine
def fill_data(run):
    url = 'http://www.google.com/%s' % run['name']
    run['data'] = yield from get(url)

def get_runs():
    runs = [ {'name': 'one'}, {'name': 'two'} ]
    loop = asyncio.get_event_loop()
    task = asyncio.wait([fill_data(r) for r in runs])
    loop.run_until_complete(task)   
    return runs

try:
    get_runs()
except Exception as e:
    print(repr(e))
    sys.exit(1)

出于某种原因,未捕获 get 函数内部引发的异常:

Future/Task exception was never retrieved
Traceback (most recent call last):
  File "site-packages/asyncio/tasks.py", line 236, in _step
    result = coro.send(value)
  File "mwe.py", line 25, in fill_data
    run['data'] = yield from get(url)
  File "mwe.py", line 17, in get
    raise Exception("%s has error '%s: %s'" % (url, resp.status, resp.reason))
Exception: http://www.google.com/two has error '404: Not Found'

那么,处理协程引发的异常的正确方法是什么?

asyncio.wait 实际上并不消耗传递给它的 Futures,它只是等待它们完成,然后 returns Future 对象:

coroutine asyncio.wait(futures, *, loop=None, timeout=None, return_when=ALL_COMPLETED)

Wait for the Futures and coroutine objects given by the sequence futures to complete. Coroutines will be wrapped in Tasks. Returns two sets of Future: (done, pending).

在您实际 yield from/await done 列表中的项目之前,它们将保持未消耗状态。由于您的程序在没有消耗期货的情况下退出,您会看到“从未检索到异常”消息。

对于您的用例,使用 asyncio.gather 可能更有意义,它实际上会消耗每个 Future,然后 return 一个 Future汇总所有结果(或引发输入列表中未来抛出的第一个 Exception)。

def get_runs():
    runs = [ {'name': 'one'}, {'name': 'two'} ]
    loop = asyncio.get_event_loop()
    tasks = asyncio.gather(*[fill_data(r) for r in runs])
    loop.run_until_complete(tasks)
    return runs

输出:

GET http://www.google.com/two
GET http://www.google.com/one
Exception("http://www.google.com/one has error '404: Not Found'",)

请注意,asyncio.gather 实际上允许您在其中一个期货引发异常时自定义其行为;默认行为是引发它遇到的第一个异常,但它也可以 return 输出列表中的每个异常对象:

asyncio.gather(*coros_or_futures, loop=None, return_exceptions=False)

Return a future aggregating results from the given coroutine objects or futures.

All futures must share the same event loop. If all the tasks are done successfully, the returned future’s result is the list of results (in the order of the original sequence, not necessarily the order of results arrival). If return_exceptions is True, exceptions in the tasks are treated the same as successful results, and gathered in the result list; otherwise, the first raised exception will be immediately propagated to the returned future.

要调试或 "handle" callback 中的异常:

return 一些结果或引发异常的协程:

@asyncio.coroutine
def async_something_entry_point(self):
    try:
        return self.real_stuff_which_throw_exceptions()
    except:
        raise Exception(some_identifier_here + ' ' + traceback.format_exc())

和回调:

def callback(self, future: asyncio.Future):
    exc = future.exception()
    if exc:
        # Handle wonderful empty TimeoutError exception
        if type(exc) == TimeoutError:
            self.logger('<Some id here> callback exception TimeoutError')
        else:
            self.logger("<Some id here> callback exception " + str(exc))

    # store your result where you want
    self.result.append(
        future.result()
    )