当异常计数超过工作人员计数时,如何使用 return_exceptions=True 获取 httpx.gather() 以完成任务队列?

How to get httpx.gather() with return_exceptions=True to complete the Queue of tasks when the exception count exceeds the worker count?

我第一次将 asyncio 与 httpx.AsyncClient 一起使用,并试图弄清楚当其中一些任务可能失败时如何完成我的任务列表。我正在使用我在几个地方找到的模式,我用协程函数填充异步队列,并有一组工作进程从 asyncio.gather 内部排队。通常,如果执行工作的函数引发异常,您会看到整个脚本在该处理过程中失败,并报告异常以及 RuntimeWarning: coroutine foo was never awaited,表明您从未完成列表。

我找到了 asyncio.gather 的 return_exceptions 选项,这有帮助,但并不完全。在我收到异常的次数与我调用 gather 的工作人员总数相同的次数后,我的脚本仍然会死掉。以下是演示问题的简单脚本。

from httpx import AsyncClient, Timeout
from asyncio import run, gather, Queue as asyncio_Queue
from random import choice


async def process_url(client, url):
    """
    opens the URL and pulls a header attribute
    randomly raises an exception to demonstrate my problem
    """
    if choice([True, False]):
        await client.get(url)
        print(f'retrieved url {url}')
    else:
        raise AssertionError(f'generated error for url {url}')


async def main(worker_count, urls):
    """
    orchestrates the workers that call process_url
    """
    httpx_timeout = Timeout(10.0, read=20.0)
    async with AsyncClient(timeout=httpx_timeout, follow_redirects=True) as client:
        tasks = asyncio_Queue(maxsize=0)
        for url in urls:
            await tasks.put(process_url(client, url))

        async def worker():
            while not tasks.empty():
                await tasks.get_nowait()

        results = await gather(*[worker() for _ in range(worker_count)], return_exceptions=True)
        return results

if __name__ == '__main__':
    urls = ['https://whosebug.com/questions',
            'https://whosebug.com/jobs',
            'https://whosebug.com/tags',
            'https://whosebug.com/users',
            'https://www.google.com/',
            'https://www.bing.com/',
            'https://www.yahoo.com/',
            'https://www.foxnews.com/',
            'https://www.cnn.com/',
            'https://www.npr.org/',
            'https://www.opera.com/',
            'https://www.mozilla.org/en-US/firefox/',
            'https://www.google.com/chrome/',
            'https://www.epicbrowser.com/'
            ]
    print(f'processing {len(urls)} urls')
    run_results = run(main(4, urls))
    print('\n'.join([str(rr) for rr in run_results]))

此脚本的一个 运行 输出:

processing 14 urls
retrieved url https://whosebug.com/tags
retrieved url https://whosebug.com/jobs
retrieved url https://whosebug.com/users
retrieved url https://www.bing.com/
generated error for url https://whosebug.com/questions
generated error for url https://www.foxnews.com/
generated error for url https://www.google.com/
generated error for url https://www.yahoo.com/
sys:1: RuntimeWarning: coroutine 'process_url' was never awaited

Process finished with exit code 0

在这里你看到我们通过了总共 14 个 url 中的 8 个,但是当我们达到 4 个错误时,脚本结束并忽略了其余的 url。

我想要做的是让脚本完成完整的 url 集,但在最后通知我错误。有没有办法在这里做到这一点?可能是我必须将 process_url() 中的所有内容包装在 try/except 块中,并使用类似 aiofile 的东西在最后将它们转储出来?

更新 需要明确的是,这个演示脚本是对我实际工作的简化。我的真实脚本对少数服务器 api 端点进行了数十万次访问。使用 worker 集合的目的是避免使我正在访问的服务器不堪重负[这是一个测试服务器,而不是生产服务器,因此它不打算处理大量请求,尽管数量大于 4 8-)]。我愿意学习替代方案。

您概述的程序设计应该可以正常工作,但您必须防止任务(worker 函数的实例)崩溃。下面的清单显示了一种方法。

您的队列名为“任务”,但您放入其中的项目不是任务 - 它们是 协程。就目前而言,您的程序有五个任务:其中之一是 main 函数,它由 asyncio.run() 变成一个任务。其他四个任务是worker的实例,由asyncio.gather.

做成任务

worker 在协程上等待并且该协程崩溃时,异常会在 await 语句中传播到 worker 中。因为没有处理异常,worker会依次崩溃。为防止这种情况发生,请执行以下操作:

async def worker():
    while not tasks.empty():
        try:
            await tasks.get_nowait()
        except Exception:
            pass
            # You might want to do something more intelligent here
            # (logging, perhaps), rather than simply suppressing the exception

这应该允许您的示例程序 运行 完成。