How to retry async requests upon ClientOSError: [Errno 104] Connection reset by peer?

How to retry async requests upon ClientOSError: [Errno 104] Connection reset by peer?

我在 Google Cloud 中有一个函数可以接受多个参数。我使用 aiohttp:

生成具有不同参数值组合的 ~2k 异步请求
# url = 'https://...'
# headers = {'X-Header': 'value'}

timeout = aiohttp.ClientTimeout(total=72000000)

async def submit_bt(session, url, payload):
        async with session.post(url, json=payload) as resp:
                result = await resp.text()

async def main():
        async with aiohttp.ClientSession(headers=headers, timeout=timeout) as session:
                tasks = []
                gen = payload_generator() # a class that generates dictionaries
                for payload in gen.param_grid():
                        tasks.append(asyncio.ensure_future(submit_bt(session, url, payload)))

                bt_results = await asyncio.gather(*tasks)
                for result in bt_results:
                        pass

asyncio.run(main())

一个函数需要 3 到 6 分钟才能完成 运行,函数超时设置为 9 分钟,最大实例数设置为 3000,但我从未见过超过 150-200 个实例被启动,即使提交的请求总数在 1.5k 到 2.5k 之间。在某些情况下,所有请求都会在 20 到 30 分钟内得到处理,但有时我会在客户端收到错误消息:

ClientOSError: [Errno 104] Connection reset by peer

这与服务器端的任何错误都不对应。我想我应该能够将其作为 aiohttp.client_exceptions.ClientOSError 异常捕获,但我不确定如何在异步设置中处理它,以便重新提交失败的请求并避免过早终止。非常感谢任何提示。

您可以 这是一个类似的问题。基于此线程的回答:

云函数是无状态的,但可以重新使用之前调用的全局状态。 tips and these docs.

中对此进行了解释

使用带重试的全局状态应该会给你一个更强大的功能:

您可以导入以下库,并可以在@retry 方法中使用您的云函数。

from tenacity import retry, stop_after_attempt, wait_random
@retry(stop=stop_after_attempt(3), wait=wait_random(min=1, max=2))
def function():

@vaizki 在评论中建议的解决方案对我来说似乎很有效。仔细查看回溯后发现异常是在 submit_bt 协程中引发的,所以我添加了 try-except 子句:

async def submit_bt(session, url, payload):
        try:
                async with session.post(url, json=payload) as resp:
                        result = await resp.text()
        except aiohttp.client_exceptions.ClientOSError as e:
                await asyncio.sleep(3 + random.randint(0, 9))
                async with session.post(url, json=payload) as resp:
                        result = await resp.text()
        except Exception as e:
                result = str(e)
        return result

重复的行看起来不是很优雅,但这对我来说仍在进行中,代码结构在这个阶段还没有正式化。反正我想达到的目的一目了然:

  • post 函数的有效载荷 URL,
  • 捕获异常,但仅在 ClientOSError 的情况下重复 post,并且仅重复一次。

我不想使用 while True 那种循环来避免在出现某些严重问题时无限执行。我按原样尝试了这段代码几次,我知道它通过几次连接重置工作,直到任务列表结束,因为我得到了函数生成的所有结果,所以即使在这种形式下它也足够健壮我的情况。