Python asyncio - 如何等待并行重试
Python asyncio - how to await for retries in parallel
我正在并行执行一堆异步调用,如下所示:
txs = await asyncio.gather(*[fetch_tx_details(s["signature"]) for s in sigs])
这些调用有时会失败,所以我用这样的退避来装饰每个调用:
@retry_with_backoff(10)
async def fetch_tx_details(sig):
# stuff
退避的定义如下:
def retry_with_backoff(retries=5, backoff_in_ms=100):
def wrapper(f):
@functools.wraps(f)
async def wrapped(*args, **kwargs):
x = 0
while True:
try:
return await f(*args, **kwargs)
except Exception as e:
print('Fetch error:', e)
if x == retries:
raise
else:
sleep_ms = (backoff_in_ms * 2 ** x +
random.uniform(0, 1))
time.sleep(sleep_ms / 1000)
x += 1
print(f'Retrying {x + 1}/{retries}')
return wrapped
return wrapper
我遇到的问题是,如果任何调用失败,它们将按顺序而不是并行重试。例如,我正在尝试 1000 次调用,100 次失败 - 我现在执行了 100 次顺序调用(慢),而不是执行 100 次并行调用(快)。
如何更改我的代码以并行重试?
不要使用time.sleep()
。这将完全阻止执行,包括其他协程。在 asyncio 任务中始终使用 asyncio.sleep()
coroutine,因为这会执行其他未被阻止的任务:
sleep()
always suspends the current task, allowing other tasks to run.
你已经在使用一个循环,如果你切换到await asyncio.sleep(...)
然后任务将暂停并让其他人运行直到重试等待时间结束:
def retry_with_backoff(retries=5, backoff_in_ms=100):
def wrapper(f):
@functools.wraps(f)
async def wrapped(*args, **kwargs):
x = 0
while True:
try:
return await f(*args, **kwargs)
except Exception as e:
print('Fetch error:', e)
if x == retries:
raise
else:
sleep_ms = (backoff_in_ms * 2 ** x +
random.uniform(0, 1))
await asyncio.sleep(sleep_ms / 1000)
x += 1
print(f'Retrying {x + 1}/{retries}')
return wrapped
return wrapper
任何地方await
用于协程控制都可以传回事件循环来切换任务。连接的 await
调用是从事件循环到异步任务当前执行点的线程,以及协程可以 co-operate 允许其他线程的方式任务做工作每次都有理由等待某事。就像等待足够的时间过去。
time.sleep()
,另一方面,等待而不将控制权移交给其他任务。相反,整个线程 被阻塞而无法执行任何操作,这意味着 asyncio 事件循环被阻塞,因此其他任务也无法执行。
我正在并行执行一堆异步调用,如下所示:
txs = await asyncio.gather(*[fetch_tx_details(s["signature"]) for s in sigs])
这些调用有时会失败,所以我用这样的退避来装饰每个调用:
@retry_with_backoff(10)
async def fetch_tx_details(sig):
# stuff
退避的定义如下:
def retry_with_backoff(retries=5, backoff_in_ms=100):
def wrapper(f):
@functools.wraps(f)
async def wrapped(*args, **kwargs):
x = 0
while True:
try:
return await f(*args, **kwargs)
except Exception as e:
print('Fetch error:', e)
if x == retries:
raise
else:
sleep_ms = (backoff_in_ms * 2 ** x +
random.uniform(0, 1))
time.sleep(sleep_ms / 1000)
x += 1
print(f'Retrying {x + 1}/{retries}')
return wrapped
return wrapper
我遇到的问题是,如果任何调用失败,它们将按顺序而不是并行重试。例如,我正在尝试 1000 次调用,100 次失败 - 我现在执行了 100 次顺序调用(慢),而不是执行 100 次并行调用(快)。
如何更改我的代码以并行重试?
不要使用time.sleep()
。这将完全阻止执行,包括其他协程。在 asyncio 任务中始终使用 asyncio.sleep()
coroutine,因为这会执行其他未被阻止的任务:
sleep()
always suspends the current task, allowing other tasks to run.
你已经在使用一个循环,如果你切换到await asyncio.sleep(...)
然后任务将暂停并让其他人运行直到重试等待时间结束:
def retry_with_backoff(retries=5, backoff_in_ms=100):
def wrapper(f):
@functools.wraps(f)
async def wrapped(*args, **kwargs):
x = 0
while True:
try:
return await f(*args, **kwargs)
except Exception as e:
print('Fetch error:', e)
if x == retries:
raise
else:
sleep_ms = (backoff_in_ms * 2 ** x +
random.uniform(0, 1))
await asyncio.sleep(sleep_ms / 1000)
x += 1
print(f'Retrying {x + 1}/{retries}')
return wrapped
return wrapper
任何地方await
用于协程控制都可以传回事件循环来切换任务。连接的 await
调用是从事件循环到异步任务当前执行点的线程,以及协程可以 co-operate 允许其他线程的方式任务做工作每次都有理由等待某事。就像等待足够的时间过去。
time.sleep()
,另一方面,等待而不将控制权移交给其他任务。相反,整个线程 被阻塞而无法执行任何操作,这意味着 asyncio 事件循环被阻塞,因此其他任务也无法执行。