重试某些状态代码的异步 aiohttp 请求

Retry async aiohttp requests for certain status codes

使用 aiohttp 为异步 API 调用实现重试的最佳方法是什么?我希望在出现标准套接字错误、超时错误等情况,以及返回某些状态码(500、501)时重试请求。我尝试过使用 aiohttp_retry,但无法让它正常工作:

import asyncio
from aiohttp import ClientSession
from aiohttp_retry import RetryClient

# Async single retry fetch
async def async_retry_fetch(url, retry_client):
    """Issue one GET for ``url`` via ``retry_client`` and return its JSON body.

    The request is made with ``retry_attempts=3`` and
    ``retry_for_status=[500, 501]``.  If decoding the body fails for any
    reason, a plain ``Exception("Could not convert json")`` is raised instead.
    """
    request = retry_client.get(
        url,
        retry_attempts=3,
        retry_for_status=[500, 501],
    )
    async with request as resp:
        try:
            payload = await resp.json()
        except Exception:
            raise Exception("Could not convert json")
    return payload

async def main():
    """Build one fetch coroutine per URL, run them together, and print the results."""
    urls = [
        "https://httpstat.us/200",
        "https://httpstat.us/500"
    ]
    api_calls = []
    async with ClientSession() as session:
        # The session is handed to RetryClient as its first positional argument.
        retry_client = RetryClient(session)
        for url in urls:
            # Calling the coroutine function only creates the coroutine object;
            # nothing is awaited inside this loop.
            api_calls.append(async_retry_fetch(url, retry_client))
    # NOTE(review): gather() sits outside the `async with` block, so the
    # coroutines are first awaited after the ClientSession context has exited.
    # return_exceptions=True puts raised exceptions into the result list
    # instead of propagating them.
    res = await asyncio.gather(*api_calls, return_exceptions=True)
    print("RESULT", res)

# Script entry point: asyncio.run() executes main() on an event loop until it completes.
asyncio.run(main())

输出:

RESULT [AttributeError("'ClientSession' object has no attribute 'debug'"), AttributeError("'ClientSession' object has no attribute 'debug'")]

看起来你安装的是 aiohttp_retry 2.x 版本,
但你使用的是 1.2 版本的参数,因此出现了 AttributeError。


旧版本可以使用

get(..., retry_attempts=3)

但新版本需要

get(..., retry_options=ExponentialRetry(attempts=3))

RetryClient(..., retry_options=ExponentialRetry(attempts=3))

但是 RetryClient 中的 attempts 有默认值 3 所以你可以跳过它。


另一个问题是 RetryClient() 不能接收 session 作为参数。
它也不需要 session,因为它会在 __init__ 中创建自己的 ClientSession()。

source code


这对我有用:

import asyncio
from aiohttp_retry import RetryClient, ExponentialRetry

class MyLogger:
    """Minimal logger stand-in that writes debug messages to stdout."""

    def debug(self, *args, **kwargs):
        """Print the given arguments after a ``[debug]:`` tag.

        Positional and keyword arguments are forwarded to ``print`` untouched.
        """
        tag = '[debug]:'
        print(tag, *args, **kwargs)
    
async def async_retry_fetch(url, retry_client):
    """Fetch ``url`` with ``retry_client`` and return the decoded JSON body.

    Args:
        url: Address to request with HTTP GET.
        retry_client: Client whose ``get()`` accepts ``retry_options`` and
            ``raise_for_status`` keyword arguments and returns an async
            context manager yielding the response.

    Returns:
        The value produced by awaiting ``response.json()``.

    Raises:
        Exception: With the message "Could not convert json" when decoding
            the body fails; the underlying error is chained as the cause.
            Errors raised while entering or leaving the request context are
            not intercepted and propagate unchanged.
    """
    # Build the retry options once and pass them per request.  Alternatively
    # they can be configured on the client itself, in which case the call
    # below reduces to: retry_client.get(url)
    retry_options = ExponentialRetry(attempts=3)

    async with retry_client.get(url, retry_options=retry_options, raise_for_status=[500, 501]) as response:
        try:
            data = await response.json()
        except Exception as e:
            # Keep the caller-visible message, but chain the real decoding
            # error so it is not lost when debugging.
            raise Exception("Could not convert json") from e

    return data

async def main():
    """Request every sample URL concurrently and print each outcome.

    Each outcome is either the decoded JSON or the exception raised for that
    URL, followed by a ``---`` separator line.
    """
    targets = [
        "https://httpstat.us/200",
        "https://httpstat.us/500",
        "https://httpstat.us/501",
        "https://httpbin.org/status/500",
        "https://httpbin.org/status/501",
        "https://httpbin.org/json",
    ]

    # Alternative: configure retries once on the client instead of per request:
    #   RetryClient(logger=MyLogger(), retry_options=ExponentialRetry(attempts=3), raise_for_status=[500, 501])
    async with RetryClient(logger=MyLogger()) as retry_client:
        pending = [async_retry_fetch(target, retry_client) for target in targets]

        # return_exceptions=True: failures come back as list entries instead
        # of aborting the whole batch.
        outcomes = await asyncio.gather(*pending, return_exceptions=True)

        for outcome in outcomes:
            print(outcome, '---', sep='\n')

# --- start ---

# Script entry point: asyncio.run() executes main() on an event loop until it completes.
asyncio.run(main())

结果:

[debug]: Attempt 0 out of 3
[debug]: Attempt 0 out of 3
[debug]: Attempt 0 out of 3
[debug]: Attempt 0 out of 3
[debug]: Attempt 0 out of 3
[debug]: Attempt 0 out of 3
[debug]: Attempt 1 out of 3
[debug]: Attempt 1 out of 3
[debug]: Attempt 1 out of 3
[debug]: Attempt 1 out of 3
[debug]: Attempt 2 out of 3
[debug]: Attempt 2 out of 3
[debug]: Attempt 2 out of 3
[debug]: Attempt 2 out of 3
Could not convert json
---
500, message='Internal Server Error', url=URL('https://httpstat.us/500')
---
501, message='Not Implemented', url=URL('https://httpstat.us/501')
---
500, message='INTERNAL SERVER ERROR', url=URL('https://httpbin.org/status/500')
---
501, message='NOT IMPLEMENTED', url=URL('https://httpbin.org/status/501')
---
{'slideshow': {'author': 'Yours Truly', 'date': 'date of publication', 'slides': [{'title': 'Wake up to WonderWidgets!', 'type': 'all'}, {'items': ['Why <em>WonderWidgets</em> are great', 'Who <em>buys</em> WonderWidgets'], 'title': 'Overview', 'type': 'all'}], 'title': 'Sample Slide Show'}}