Exception when wrapping asyncio.gather

I experimented with asyncio.gather as follows:

import asyncio


async def some_work(work_name, timeout, raise_exception=False):
    """Do some work"""
    print(f"Start {work_name}")
    await asyncio.sleep(timeout)
    if raise_exception:
        raise RuntimeError(f"{work_name} raise an exception")
    print(f"Finish {work_name}")


async def main():
    try:
        await asyncio.gather(
            some_work("work1", 3),
            some_work("work2", 1),
            some_work("work3", 2),
            asyncio.gather(
                some_work("work4", 3),
                some_work("work5", 1, raise_exception=True),
                some_work("work6", 2)
            )
        )

    except RuntimeError as error:
        print(error)


if __name__ == '__main__':
    asyncio.run(main())

At some point, I decided to wrap asyncio.gather like this:

# Yes I know, concurrently really
def in_parallel(*aws, loop=None, return_exceptions=False):
    return asyncio.gather(aws, loop, return_exceptions)

and use it like this:

async def main():
    try:
        await in_parallel(
            some_work("work1", 3),
            some_work("work2", 1),
            some_work("work3", 2),
            in_parallel(
                some_work("work4", 3),
                some_work("work5", 1, raise_exception=True),
                some_work("work6", 2)
            )
        )

    except RuntimeError as error:
        print(error)


if __name__ == '__main__':
    asyncio.run(main())

I got a bunch of errors:

D:/Archive/Projects/PycharmProjects/test/asyncio_gather.py:34: RuntimeWarning: coroutine 'some_work' was never awaited
  in_parallel(
RuntimeWarning: Enable tracemalloc to get the object allocation traceback
Traceback (most recent call last):
  File "D:/Archive/Projects/PycharmProjects/test/asyncio_gather.py", line 46, in <module>
    asyncio.run(main())
  File "C:\Program Files\Python38\lib\asyncio\runners.py", line 43, in run
    return loop.run_until_complete(main)
  File "C:\Program Files\Python38\lib\asyncio\base_events.py", line 612, in run_until_complete
    return future.result()
  File "D:/Archive/Projects/PycharmProjects/test/asyncio_gather.py", line 34, in main
    in_parallel(
  File "D:/Archive/Projects/PycharmProjects/test/asyncio_gather.py", line 14, in in_parallel
    return asyncio.gather(aws, loop, return_exceptions)
  File "C:\Program Files\Python38\lib\asyncio\tasks.py", line 806, in gather
    fut = ensure_future(arg, loop=loop)
  File "C:\Program Files\Python38\lib\asyncio\tasks.py", line 673, in ensure_future
    raise TypeError('An asyncio.Future, a coroutine or an awaitable is '
TypeError: An asyncio.Future, a coroutine or an awaitable is required
sys:1: RuntimeWarning: coroutine 'some_work' was never awaited

Can anyone explain why? It's just a wrapper!

The wrapper has the correct signature, but it doesn't invoke asyncio.gather correctly:

def in_parallel(*aws, loop=None, return_exceptions=False):
    # XXX incorrect invocation of `gather`
    return asyncio.gather(aws, loop, return_exceptions)

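asyncio.gather expects the awaitables to be passed as separate positional arguments, which is how you called it in the first version of your code. When you call it from the wrapper, you always pass it exactly three positional arguments: aws (the tuple of awaitables passed to in_parallel), loop (always None the way you call it) and return_exceptions (a boolean). Since loop and return_exceptions are keyword-only parameters of gather, all three values are treated as awaitables. None of them is an actual awaitable, so gather raises an exception as soon as it tries to do something with the "awaitables" it received, such as converting them to futures. And because the coroutine objects inside the tuple are never scheduled, Python also warns that coroutine 'some_work' was never awaited.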
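The failure can be reproduced without the wrapper by calling gather with the same three positional arguments the wrapper produces. This is a minimal sketch; the simplified some_work and the helper reproduce_wrapper_bug are hypothetical names used only for illustration.

```python
import asyncio


async def some_work(work_name):
    """Hypothetical, simplified stand-in for the question's coroutine."""
    await asyncio.sleep(0)
    return work_name


def reproduce_wrapper_bug():
    """Call gather the way the broken wrapper does and return the TypeError."""
    aws = (some_work("work1"), some_work("work2"))  # what *aws collects
    try:
        # Three positional "awaitables": the tuple itself, None (meant as loop)
        # and False (meant as return_exceptions). gather rejects the tuple
        # before any event loop is involved.
        asyncio.gather(aws, None, False)
    except TypeError as exc:
        return exc
    finally:
        # The coroutines were never scheduled; closing them suppresses the
        # "coroutine 'some_work' was never awaited" RuntimeWarning.
        for coro in aws:
            coro.close()
    return None


if __name__ == "__main__":
    print(repr(reproduce_wrapper_bug()))
```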

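The correct way for in_parallel to call gather is to use the * operator to pass each element of aws as a separate positional argument, and to pass loop and return_exceptions as keyword arguments: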

def in_parallel(*aws, loop=None, return_exceptions=False):
    return asyncio.gather(*aws, loop=loop, return_exceptions=return_exceptions)

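With this change, your code works as expected. Finally, note that the explicit loop argument is deprecated (since Python 3.8) and was removed in Python 3.10, where passing loop= to gather raises a TypeError, so it is best to omit it from the wrapper.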
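A minimal end-to-end sketch of the corrected wrapper, assuming Python 3.7+ (for asyncio.run) and dropping the loop parameter so that it also runs on 3.10+. For illustration only, the timeouts are scaled down from the question's and some_work additionally returns its name.

```python
import asyncio


def in_parallel(*aws, return_exceptions=False):
    """Thin wrapper around asyncio.gather without the deprecated loop parameter."""
    # *aws spreads the awaitables back out as separate positional arguments;
    # return_exceptions must be passed by keyword.
    return asyncio.gather(*aws, return_exceptions=return_exceptions)


async def some_work(work_name, timeout, raise_exception=False):
    """Same coroutine as in the question, except that it returns its name."""
    print(f"Start {work_name}")
    await asyncio.sleep(timeout)
    if raise_exception:
        raise RuntimeError(f"{work_name} raise an exception")
    print(f"Finish {work_name}")
    return work_name


async def main():
    # Timeouts scaled down from the question's 3/1/2 seconds.
    try:
        await in_parallel(
            some_work("work1", 0.03),
            some_work("work2", 0.01),
            some_work("work3", 0.02),
            in_parallel(
                some_work("work4", 0.03),
                some_work("work5", 0.01, raise_exception=True),
                some_work("work6", 0.02),
            ),
        )
    except RuntimeError as error:
        # The RuntimeError from work5 propagates through both gather calls.
        print(error)
        return str(error)
    return None


if __name__ == "__main__":
    asyncio.run(main())
```

With in_parallel(..., return_exceptions=True), gather places exceptions in the results list instead of raising the first one.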