asyncio.wait 任务异常后重试任务
Retry task after task exception with asyncio.wait
我有多个协同程序应该同时运行,其中一些可能会抛出异常。在这些情况下,协程应该再次 运行 。我该如何做到这一点?我正在尝试做的最小演示:
import asyncio
import time
t = time.time()
async def c1():
print("finished c1 {}".format(time.time() - t))
async def c2():
await asyncio.sleep(3)
print("finished c2 {}".format(time.time() - t))
called = False
async def c3():
global called
# raises an exception the first time it's called
if not called:
called = True
raise RuntimeError("c3 called the first time")
print("finished c3 {}".format(time.time() - t))
async def run():
pending = {c1(), c2(), c3()}
num_times_called = 0
while pending:
num_times_called += 1
print("{} times called with {} pending tasks: {}".format(num_times_called, len(pending), pending))
finished, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_EXCEPTION)
for task in finished:
if task.exception():
print("{} got an exception {}, retrying".format(task, task.exception()))
pending.add(task)
print("finished {}".format(finished))
print("finished all {}".format(time.time() - t))
asyncio.get_event_loop().run_until_complete(run())
c3()
表示部分协程会失败,需要重新运行。演示的问题是已完成的任务已完成并设置了异常,因此当我将其放回待定集时,下一个 运行 循环会立即退出而不会重新 运行ning c3()
因为它已经完成了。
有没有办法清除任务,使其再次运行 c3()
?我知道不能再次等待附加到任务的协程实例,否则我得到
RuntimeError('cannot reuse already awaited coroutine',)
这意味着我必须手动管理从协程实例到生成它的协程的映射,然后使用 task._coro
检索失败的协程实例 - 这样对吗?
编辑:任务本身可以是地图中的关键,这样更干净。
async def run():
tasks = {asyncio.ensure_future(c()): c for c in (c1, c2, c3)}
pending = set(tasks.keys())
num_times_called = 0
while pending:
num_times_called += 1
print("{} times called with {} pending tasks: {}".format(num_times_called, len(pending), pending))
finished, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_EXCEPTION)
for task in finished:
if task.exception():
print("{} got an exception {}, retrying".format(task, task.exception()))
coro = tasks[task]
new_task = asyncio.ensure_future(coro())
tasks[new_task] = coro
pending.add(new_task)
print("finished {}".format(finished))
print("finished all {}".format(time.time() - t))
我有多个协同程序应该同时运行,其中一些可能会抛出异常。在这些情况下,协程应该再次 运行 。我该如何做到这一点?我正在尝试做的最小演示:
import asyncio
import time
t = time.time()
async def c1():
print("finished c1 {}".format(time.time() - t))
async def c2():
await asyncio.sleep(3)
print("finished c2 {}".format(time.time() - t))
called = False
async def c3():
global called
# raises an exception the first time it's called
if not called:
called = True
raise RuntimeError("c3 called the first time")
print("finished c3 {}".format(time.time() - t))
async def run():
pending = {c1(), c2(), c3()}
num_times_called = 0
while pending:
num_times_called += 1
print("{} times called with {} pending tasks: {}".format(num_times_called, len(pending), pending))
finished, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_EXCEPTION)
for task in finished:
if task.exception():
print("{} got an exception {}, retrying".format(task, task.exception()))
pending.add(task)
print("finished {}".format(finished))
print("finished all {}".format(time.time() - t))
asyncio.get_event_loop().run_until_complete(run())
c3()
表示部分协程会失败,需要重新运行。演示的问题是已完成的任务已完成并设置了异常,因此当我将其放回待定集时,下一个 运行 循环会立即退出而不会重新 运行ning c3()
因为它已经完成了。
有没有办法清除任务,使其再次运行 c3()
?我知道不能再次等待附加到任务的协程实例,否则我得到
RuntimeError('cannot reuse already awaited coroutine',)
这意味着我必须手动管理从协程实例到生成它的协程的映射,然后使用 task._coro
检索失败的协程实例 - 这样对吗?
编辑:任务本身可以是地图中的关键,这样更干净。
async def run():
tasks = {asyncio.ensure_future(c()): c for c in (c1, c2, c3)}
pending = set(tasks.keys())
num_times_called = 0
while pending:
num_times_called += 1
print("{} times called with {} pending tasks: {}".format(num_times_called, len(pending), pending))
finished, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_EXCEPTION)
for task in finished:
if task.exception():
print("{} got an exception {}, retrying".format(task, task.exception()))
coro = tasks[task]
new_task = asyncio.ensure_future(coro())
tasks[new_task] = coro
pending.add(new_task)
print("finished {}".format(finished))
print("finished all {}".format(time.time() - t))