Python asyncio - 如何等待并行重试

Python asyncio - how to await for retries in parallel

我正在并行执行一堆异步调用,如下所示:

txs = await asyncio.gather(*[fetch_tx_details(s["signature"]) for s in sigs])

这些调用有时会失败,所以我用这样的退避来装饰每个调用:

@retry_with_backoff(10)
async def fetch_tx_details(sig):
    # stuff

退避的定义如下:

def retry_with_backoff(retries=5, backoff_in_ms=100):
    def wrapper(f):
        @functools.wraps(f)
        async def wrapped(*args, **kwargs):
            x = 0
            while True:
                try:
                    return await f(*args, **kwargs)
                except Exception as e:
                    print('Fetch error:', e)

                    if x == retries:
                        raise
                    else:
                        sleep_ms = (backoff_in_ms * 2 ** x +
                                    random.uniform(0, 1))
                        time.sleep(sleep_ms / 1000)
                        x += 1
                        print(f'Retrying {x + 1}/{retries}')

        return wrapped

    return wrapper

我遇到的问题是,如果任何调用失败,它们将按顺序而不是并行重试。例如,我正在尝试 1000 次调用,100 次失败 - 我现在执行了 100 次顺序调用(慢),而不是执行 100 次并行调用(快)。

如何更改我的代码以并行重试?

不要使用time.sleep()。这将完全阻止执行,包括其他协程。在 asyncio 任务中始终使用 asyncio.sleep() coroutine,因为这会执行其他未被阻止的任务:

sleep() always suspends the current task, allowing other tasks to run.

你已经在使用一个循环,如果你切换到await asyncio.sleep(...)然后任务将暂停并让其他人运行直到重试等待时间结束:

def retry_with_backoff(retries=5, backoff_in_ms=100):
    def wrapper(f):
        @functools.wraps(f)
        async def wrapped(*args, **kwargs):
            x = 0
            while True:
                try:
                    return await f(*args, **kwargs)
                except Exception as e:
                    print('Fetch error:', e)

                    if x == retries:
                        raise
                    else:
                        sleep_ms = (backoff_in_ms * 2 ** x +
                                    random.uniform(0, 1))
                        await asyncio.sleep(sleep_ms / 1000)
                        x += 1
                        print(f'Retrying {x + 1}/{retries}')

        return wrapped

    return wrapper

任何地方await用于协程控制都可以传回事件循环来切换任务。连接的 await 调用是从事件循环到异步任务当前执行点的线程,以及协程可以 co-operate 允许其他线程的方式任务做工作每次都有理由等待某事。就像等待足够的时间过去。

time.sleep(),另一方面,等待而不将控制权移交给其他任务。相反,整个线程 被阻塞而无法执行任何操作,这意味着 asyncio 事件循环被阻塞,因此其他任务也无法执行。