How to retry async requests upon ClientOSError: [Errno 104] Connection reset by peer?
How to retry async requests upon ClientOSError: [Errno 104] Connection reset by peer?
我在 Google Cloud 中有一个函数可以接受多个参数。我使用 aiohttp:
生成具有不同参数值组合的 ~2k 异步请求
# url = 'https://...'
# headers = {'X-Header': 'value'}
timeout = aiohttp.ClientTimeout(total=72000000)
async def submit_bt(session, url, payload):
async with session.post(url, json=payload) as resp:
result = await resp.text()
async def main():
async with aiohttp.ClientSession(headers=headers, timeout=timeout) as session:
tasks = []
gen = payload_generator() # a class that generates dictionaries
for payload in gen.param_grid():
tasks.append(asyncio.ensure_future(submit_bt(session, url, payload)))
bt_results = await asyncio.gather(*tasks)
for result in bt_results:
pass
asyncio.run(main())
一个函数需要 3 到 6 分钟才能完成 运行,函数超时设置为 9 分钟,最大实例数设置为 3000,但我从未见过超过 150-200 个实例被启动,即使提交的请求总数在 1.5k 到 2.5k 之间。在某些情况下,所有请求都会在 20 到 30 分钟内得到处理,但有时我会在客户端收到错误消息:
ClientOSError: [Errno 104] Connection reset by peer
这与服务器端的任何错误都不对应。我想我应该能够将其作为 aiohttp.client_exceptions.ClientOSError
异常捕获,但我不确定如何在异步设置中处理它,以便重新提交失败的请求并避免过早终止。非常感谢任何提示。
您可以 这是一个类似的问题。基于此线程的回答:
云函数是无状态的,但可以重新使用之前调用的全局状态。 tips and these docs.
中对此进行了解释
使用带重试的全局状态应该会给你一个更强大的功能:
您可以导入以下库,并可以在@retry 方法中使用您的云函数。
from tenacity import retry, stop_after_attempt, wait_random
@retry(stop=stop_after_attempt(3), wait=wait_random(min=1, max=2))
def function():
@vaizki 在评论中建议的解决方案对我来说似乎很有效。仔细查看回溯后发现异常是在 submit_bt
协程中引发的,所以我添加了 try-except 子句:
async def submit_bt(session, url, payload):
try:
async with session.post(url, json=payload) as resp:
result = await resp.text()
except aiohttp.client_exceptions.ClientOSError as e:
await asyncio.sleep(3 + random.randint(0, 9))
async with session.post(url, json=payload) as resp:
result = await resp.text()
except Exception as e:
result = str(e)
return result
重复的行看起来不是很优雅,但这对我来说仍在进行中,代码结构在这个阶段还没有正式化。反正我想达到的目的一目了然:
- post 函数的有效载荷 URL,
- 捕获异常,但仅在
ClientOSError
的情况下重复 post,并且仅重复一次。
我不想使用 while True
那种循环来避免在出现某些严重问题时无限执行。我按原样尝试了这段代码几次,我知道它通过几次连接重置工作,直到任务列表结束,因为我得到了函数生成的所有结果,所以即使在这种形式下它也足够健壮我的情况。
我在 Google Cloud 中有一个函数可以接受多个参数。我使用 aiohttp:
生成具有不同参数值组合的 ~2k 异步请求# url = 'https://...'
# headers = {'X-Header': 'value'}
timeout = aiohttp.ClientTimeout(total=72000000)
async def submit_bt(session, url, payload):
async with session.post(url, json=payload) as resp:
result = await resp.text()
async def main():
async with aiohttp.ClientSession(headers=headers, timeout=timeout) as session:
tasks = []
gen = payload_generator() # a class that generates dictionaries
for payload in gen.param_grid():
tasks.append(asyncio.ensure_future(submit_bt(session, url, payload)))
bt_results = await asyncio.gather(*tasks)
for result in bt_results:
pass
asyncio.run(main())
一个函数需要 3 到 6 分钟才能完成 运行,函数超时设置为 9 分钟,最大实例数设置为 3000,但我从未见过超过 150-200 个实例被启动,即使提交的请求总数在 1.5k 到 2.5k 之间。在某些情况下,所有请求都会在 20 到 30 分钟内得到处理,但有时我会在客户端收到错误消息:
ClientOSError: [Errno 104] Connection reset by peer
这与服务器端的任何错误都不对应。我想我应该能够将其作为 aiohttp.client_exceptions.ClientOSError
异常捕获,但我不确定如何在异步设置中处理它,以便重新提交失败的请求并避免过早终止。非常感谢任何提示。
您可以
云函数是无状态的,但可以重新使用之前调用的全局状态。 tips and these docs.
中对此进行了解释使用带重试的全局状态应该会给你一个更强大的功能:
您可以导入以下库,并可以在@retry 方法中使用您的云函数。
from tenacity import retry, stop_after_attempt, wait_random
@retry(stop=stop_after_attempt(3), wait=wait_random(min=1, max=2))
def function():
@vaizki 在评论中建议的解决方案对我来说似乎很有效。仔细查看回溯后发现异常是在 submit_bt
协程中引发的,所以我添加了 try-except 子句:
async def submit_bt(session, url, payload):
try:
async with session.post(url, json=payload) as resp:
result = await resp.text()
except aiohttp.client_exceptions.ClientOSError as e:
await asyncio.sleep(3 + random.randint(0, 9))
async with session.post(url, json=payload) as resp:
result = await resp.text()
except Exception as e:
result = str(e)
return result
重复的行看起来不是很优雅,但这对我来说仍在进行中,代码结构在这个阶段还没有正式化。反正我想达到的目的一目了然:
- post 函数的有效载荷 URL,
- 捕获异常,但仅在
ClientOSError
的情况下重复 post,并且仅重复一次。
我不想使用 while True
那种循环来避免在出现某些严重问题时无限执行。我按原样尝试了这段代码几次,我知道它通过几次连接重置工作,直到任务列表结束,因为我得到了函数生成的所有结果,所以即使在这种形式下它也足够健壮我的情况。