当异常计数超过工作人员计数时,如何使用 return_exceptions=True 获取 httpx.gather() 以完成任务队列?
How to get httpx.gather() with return_exceptions=True to complete the Queue of tasks when the exception count exceeds the worker count?
我第一次将 asyncio 与 httpx.AsyncClient 一起使用,并试图弄清楚当其中一些任务可能失败时如何完成我的任务列表。我正在使用我在几个地方找到的模式,我用协程函数填充异步队列,并有一组工作进程从 asyncio.gather 内部排队。通常,如果执行工作的函数引发异常,您会看到整个脚本在该处理过程中失败,并报告异常以及 RuntimeWarning: coroutine foo was never awaited
,表明您从未完成列表。
我找到了 asyncio.gather 的 return_exceptions
选项,这有帮助,但并不完全。在我收到异常的次数与我调用 gather
的工作人员总数相同的次数后,我的脚本仍然会死掉。以下是演示问题的简单脚本。
from httpx import AsyncClient, Timeout
from asyncio import run, gather, Queue as asyncio_Queue
from random import choice
async def process_url(client, url):
"""
opens the URL and pulls a header attribute
randomly raises an exception to demonstrate my problem
"""
if choice([True, False]):
await client.get(url)
print(f'retrieved url {url}')
else:
raise AssertionError(f'generated error for url {url}')
async def main(worker_count, urls):
"""
orchestrates the workers that call process_url
"""
httpx_timeout = Timeout(10.0, read=20.0)
async with AsyncClient(timeout=httpx_timeout, follow_redirects=True) as client:
tasks = asyncio_Queue(maxsize=0)
for url in urls:
await tasks.put(process_url(client, url))
async def worker():
while not tasks.empty():
await tasks.get_nowait()
results = await gather(*[worker() for _ in range(worker_count)], return_exceptions=True)
return results
if __name__ == '__main__':
urls = ['https://whosebug.com/questions',
'https://whosebug.com/jobs',
'https://whosebug.com/tags',
'https://whosebug.com/users',
'https://www.google.com/',
'https://www.bing.com/',
'https://www.yahoo.com/',
'https://www.foxnews.com/',
'https://www.cnn.com/',
'https://www.npr.org/',
'https://www.opera.com/',
'https://www.mozilla.org/en-US/firefox/',
'https://www.google.com/chrome/',
'https://www.epicbrowser.com/'
]
print(f'processing {len(urls)} urls')
run_results = run(main(4, urls))
print('\n'.join([str(rr) for rr in run_results]))
此脚本的一个 运行 输出:
processing 14 urls
retrieved url https://whosebug.com/tags
retrieved url https://whosebug.com/jobs
retrieved url https://whosebug.com/users
retrieved url https://www.bing.com/
generated error for url https://whosebug.com/questions
generated error for url https://www.foxnews.com/
generated error for url https://www.google.com/
generated error for url https://www.yahoo.com/
sys:1: RuntimeWarning: coroutine 'process_url' was never awaited
Process finished with exit code 0
在这里你看到我们通过了总共 14 个 url 中的 8 个,但是当我们达到 4 个错误时,脚本结束并忽略了其余的 url。
我想要做的是让脚本完成完整的 url 集,但在最后通知我错误。有没有办法在这里做到这一点?可能是我必须将 process_url()
中的所有内容包装在 try/except
块中,并使用类似 aiofile 的东西在最后将它们转储出来?
更新
需要明确的是,这个演示脚本是对我实际工作的简化。我的真实脚本对少数服务器 api 端点进行了数十万次访问。使用 worker 集合的目的是避免使我正在访问的服务器不堪重负[这是一个测试服务器,而不是生产服务器,因此它不打算处理大量请求,尽管数量大于 4 8-)]。我愿意学习替代方案。
您概述的程序设计应该可以正常工作,但您必须防止任务(worker
函数的实例)崩溃。下面的清单显示了一种方法。
您的队列名为“任务”,但您放入其中的项目不是任务 - 它们是 协程。就目前而言,您的程序有五个任务:其中之一是 main
函数,它由 asyncio.run() 变成一个任务。其他四个任务是worker
的实例,由asyncio.gather.
做成任务
当 worker
在协程上等待并且该协程崩溃时,异常会在 await 语句中传播到 worker
中。因为没有处理异常,worker
会依次崩溃。为防止这种情况发生,请执行以下操作:
async def worker():
while not tasks.empty():
try:
await tasks.get_nowait()
except Exception:
pass
# You might want to do something more intelligent here
# (logging, perhaps), rather than simply suppressing the exception
这应该允许您的示例程序 运行 完成。
我第一次将 asyncio 与 httpx.AsyncClient 一起使用,并试图弄清楚当其中一些任务可能失败时如何完成我的任务列表。我正在使用我在几个地方找到的模式,我用协程函数填充异步队列,并有一组工作进程从 asyncio.gather 内部排队。通常,如果执行工作的函数引发异常,您会看到整个脚本在该处理过程中失败,并报告异常以及 RuntimeWarning: coroutine foo was never awaited
,表明您从未完成列表。
我找到了 asyncio.gather 的 return_exceptions
选项,这有帮助,但并不完全。在我收到异常的次数与我调用 gather
的工作人员总数相同的次数后,我的脚本仍然会死掉。以下是演示问题的简单脚本。
from httpx import AsyncClient, Timeout
from asyncio import run, gather, Queue as asyncio_Queue
from random import choice
async def process_url(client, url):
"""
opens the URL and pulls a header attribute
randomly raises an exception to demonstrate my problem
"""
if choice([True, False]):
await client.get(url)
print(f'retrieved url {url}')
else:
raise AssertionError(f'generated error for url {url}')
async def main(worker_count, urls):
"""
orchestrates the workers that call process_url
"""
httpx_timeout = Timeout(10.0, read=20.0)
async with AsyncClient(timeout=httpx_timeout, follow_redirects=True) as client:
tasks = asyncio_Queue(maxsize=0)
for url in urls:
await tasks.put(process_url(client, url))
async def worker():
while not tasks.empty():
await tasks.get_nowait()
results = await gather(*[worker() for _ in range(worker_count)], return_exceptions=True)
return results
if __name__ == '__main__':
urls = ['https://whosebug.com/questions',
'https://whosebug.com/jobs',
'https://whosebug.com/tags',
'https://whosebug.com/users',
'https://www.google.com/',
'https://www.bing.com/',
'https://www.yahoo.com/',
'https://www.foxnews.com/',
'https://www.cnn.com/',
'https://www.npr.org/',
'https://www.opera.com/',
'https://www.mozilla.org/en-US/firefox/',
'https://www.google.com/chrome/',
'https://www.epicbrowser.com/'
]
print(f'processing {len(urls)} urls')
run_results = run(main(4, urls))
print('\n'.join([str(rr) for rr in run_results]))
此脚本的一个 运行 输出:
processing 14 urls
retrieved url https://whosebug.com/tags
retrieved url https://whosebug.com/jobs
retrieved url https://whosebug.com/users
retrieved url https://www.bing.com/
generated error for url https://whosebug.com/questions
generated error for url https://www.foxnews.com/
generated error for url https://www.google.com/
generated error for url https://www.yahoo.com/
sys:1: RuntimeWarning: coroutine 'process_url' was never awaited
Process finished with exit code 0
在这里你看到我们通过了总共 14 个 url 中的 8 个,但是当我们达到 4 个错误时,脚本结束并忽略了其余的 url。
我想要做的是让脚本完成完整的 url 集,但在最后通知我错误。有没有办法在这里做到这一点?可能是我必须将 process_url()
中的所有内容包装在 try/except
块中,并使用类似 aiofile 的东西在最后将它们转储出来?
更新 需要明确的是,这个演示脚本是对我实际工作的简化。我的真实脚本对少数服务器 api 端点进行了数十万次访问。使用 worker 集合的目的是避免使我正在访问的服务器不堪重负[这是一个测试服务器,而不是生产服务器,因此它不打算处理大量请求,尽管数量大于 4 8-)]。我愿意学习替代方案。
您概述的程序设计应该可以正常工作,但您必须防止任务(worker
函数的实例)崩溃。下面的清单显示了一种方法。
您的队列名为“任务”,但您放入其中的项目不是任务 - 它们是 协程。就目前而言,您的程序有五个任务:其中之一是 main
函数,它由 asyncio.run() 变成一个任务。其他四个任务是worker
的实例,由asyncio.gather.
当 worker
在协程上等待并且该协程崩溃时,异常会在 await 语句中传播到 worker
中。因为没有处理异常,worker
会依次崩溃。为防止这种情况发生,请执行以下操作:
async def worker():
while not tasks.empty():
try:
await tasks.get_nowait()
except Exception:
pass
# You might want to do something more intelligent here
# (logging, perhaps), rather than simply suppressing the exception
这应该允许您的示例程序 运行 完成。