asyncio自动取消子任务
asyncio automatic cancellation of subtasks
如果异步任务 task_parent 创建了子任务 task_child，随后 task_parent 因创建 task_child 之后抛出的异常而被取消，那么 task_child 是否也会被自动取消（前提是它没有受 asyncio.shield 保护）？
例如下面的代码:
async def f():
    """Question example: two tasks whose results are collected via asyncio.gather().

    Asks whether t2 is cancelled when process_result() raises.  (It is not:
    asyncio tasks have no parent/child relationship.)
    """
    t1 = asyncio.create_task(coroutine1())
    t2 = asyncio.create_task(coroutine2())
    [r1] = await asyncio.gather(t1)
    r3 = await process_result(r1)  # process_result throws an exception
    [r2] = await asyncio.gather(t2)
    final = await process_results(r2, r3)
    return final
如果 process_result(r1) 抛出异常,t2 是否会自动取消(并随后被垃圾收集)?
如果不使用 asyncio.gather,我直接等待任务会怎么样:
async def f():
    """Question example: await the tasks directly instead of via asyncio.gather().

    Awaiting a Task yields the coroutine's return value itself, not the
    one-element list that ``asyncio.gather(t)`` produces.  The results are
    therefore bound directly (``r1 = await t1``) instead of being unpacked
    with ``r1, = ...``, which would fail unless the coroutine happened to
    return a one-element iterable.  As in the gather() version, t2 is not
    cancelled automatically if process_result() raises.
    """
    t1 = asyncio.create_task(coroutine1())
    t2 = asyncio.create_task(coroutine2())
    r1 = await t1
    r3 = await process_result(r1)  # process_result throws an exception
    r2 = await t2
    return await process_results(r2, r3)
如果 process_result(r1) 抛出异常,在这种情况下 t2 是否也会自动取消?
简而言之,您所有问题的答案都是“否”。
在 asyncio 中,没有父任务与子任务这样的概念,也没有任务之间的任何层次关系。只有一个“级别”——所有任务都是等价的。
在一定程度上，您可以通过在 finally 块中显式取消任务来强制建立这种依赖关系，例如：
async def f():
    """Answer example: cancel both tasks in ``finally`` so neither outlives f().

    Only t1 and t2 themselves are cancelled; any tasks they spawned are not.
    """
    t1 = asyncio.create_task(coroutine1())
    t2 = asyncio.create_task(coroutine2())
    try:
        r1 = await t1
        r3 = await process_result(r1)  # process_result throws an exception
        r2 = await t2
        outcome = await process_results(r2, r3)
    finally:
        # Task.cancel() is a no-op on a task that has already finished.
        for task in (t1, t2):
            task.cancel()
    return outcome
但这不会取消由 t1 和 t2 自己创建的任何任务；它只建立了一层依赖关系。
在最近的几个小项目中,我已经成功地使用这个 class 将任务组织成层次结构:
import asyncio
import logging
logger = logging.getLogger(__name__)


class BagContextError(Exception):
    """Raised when a PBag is entered twice or used after it has been closed."""
class PBag:
    """A "bag" of asyncio tasks/futures whose lifetime is bound to ``async with``.

    Tasks are created with :meth:`create_task`; existing futures can be
    registered with :meth:`add_future`.  When the context exits (normally or
    because of an exception), every future still in the bag is cancelled and
    awaited.  Exceptions raised by member futures are collected in
    ``exceptions``.  On a clean exit the first one is re-raised and the rest
    are logged.  ``await bag`` waits until the bag holds no pending futures.
    """

    def __init__(self):
        self.futures = set()      # pending futures owned by the bag
        self.exceptions = []      # exceptions raised by finished member futures
        self.done = asyncio.Event()
        self.done.set()           # an empty bag counts as "done"
        self._opened = False
        self._closed = False

    @property
    def is_open(self):
        """True between entering and exiting the ``async with`` block."""
        return self._opened and not self._closed

    def __await__(self):
        # ``await bag`` is equivalent to ``await bag.until_done()``.
        yield from self.done.wait().__await__()

    async def __aenter__(self):
        """Enter the bag's context.

        Raises:
            BagContextError: if the bag has already been entered.
        """
        if self._opened:
            raise BagContextError("Already open")
        self._opened = True
        return self

    async def __aexit__(self, exc_type, _exc_value, _traceback):
        """Close the bag: cancel and await all pending futures, then report errors.

        If the body exited cleanly and some member future failed, the first
        collected exception is re-raised.  All other collected exceptions are
        logged as suppressed.
        """
        logger.debug("Bag exit %s %s %s", self.futures, self.exceptions,
                     exc_type, stack_info=True)
        self._closed = True
        await self.aclose()
        if not self.exceptions:
            return
        # On a clean exit exceptions[0] is re-raised below, so only the rest
        # are suppressed.  If the body itself raised, that exception wins and
        # every collected one is suppressed.
        first_suppressed = 1 if exc_type is None else 0
        for ex in self.exceptions[first_suppressed:]:
            # Log through the module logger (not the root logger).
            # exc_info=ex records the traceback without re-raising ex.
            logger.error("Suppressed exception", exc_info=ex)
        if exc_type is None:
            raise self.exceptions[0]

    def until_done(self):
        """Return an awaitable that completes once the bag holds no pending futures."""
        return self.done.wait()

    def create_task(self, coro):
        """Schedule *coro* as a task owned by this bag and return the task.

        Raises:
            BagContextError: if the bag has already been closed.
        """
        if self._closed:
            raise BagContextError("Bag closed")
        t = asyncio.create_task(coro)
        self.add_future(t)
        return t

    def add_future(self, fut):
        """Register *fut* with the bag so it is tracked and cancelled on close.

        Raises:
            BagContextError: if the bag has already been closed.
        """
        if self._closed:
            raise BagContextError("Bag closed")
        self.futures.add(fut)
        fut.add_done_callback(self._future_done)
        self.done.clear()

    def close(self):
        """Request cancellation of every pending future (does not wait)."""
        # asyncio runs done callbacks via the event loop rather than inside
        # cancel().  Iterating a snapshot keeps this loop safe even if an
        # entry is removed from self.futures while cancelling.
        for w in list(self.futures):
            w.cancel()

    async def aclose(self):
        """Cancel every pending future and wait until all of them have finished."""
        self.close()
        await self.until_done()

    def _future_done(self, fut):
        """Done-callback: drop *fut* from the bag and record its exception, if any."""
        self.futures.discard(fut)
        if not self.futures:
            self.done.set()
        try:
            fut.result()
        except asyncio.CancelledError:
            pass  # cancellation is the bag's normal shutdown path, not an error
        except Exception as ex:
            self.exceptions.append(ex)
这是一个上下文管理器。在其上下文中，应通过 PBag.create_task（而不是 asyncio.create_task）来创建任务。该对象会跟踪这些依赖任务，并在退出上下文（无论是正常退出还是因异常退出）或调用 close() 方法时取消它们。注意：某个子任务抛出的异常并不会立即取消其他任务，而是先被记录下来；上下文正常退出时会重新抛出第一个异常（若上下文本身因异常退出，这些异常只会被记录到日志中）。
如果你用它来构建整个程序,任务将按层次排列,当最外层的任务被取消时,整个结构将(某种程度上)优雅地展开。如果你只在某些地方使用它而不在其他地方使用它(即,如果你在某些地方写 asyncio.create_task),这种展开可能效果不佳。
我对此经验不多,所以当然可能存在未发现的错误。这是一个小演示程序:
async def main():
    """Small demo of PBag.

    task3 starts task2 in the same bag.  task2 raises after 3 s; its error
    is re-raised when the ``async with`` block exits and is printed by the
    ``except Exception`` handler.
    """
    # The demo uses time and traceback, which the snippet never imported.
    import time
    import traceback

    async def task1():
        print("Task1 started", time.ctime())
        await asyncio.sleep(2)
        print("Task1 finished", time.ctime())

    async def task2():
        print("Task2 started", time.ctime())
        await asyncio.sleep(3)
        raise Exception("Task 2 error")

    async def task3(bag):
        bag.create_task(task2())
        print("Task 3 done")

    try:
        async with PBag() as bag:
            bag.create_task(task1())
            bag.create_task(task3(bag))
            await bag.until_done()
            bag.create_task(task1())
            await bag
    except asyncio.CancelledError:
        traceback.print_exc()
    except Exception:
        traceback.print_exc()
    print("Bag closed", time.ctime())
    # Not added to any bag and never awaited.  asyncio.run() cancels tasks
    # that are still pending when main() returns, so this one never reaches
    # "Task1 finished".
    asyncio.create_task(task1())
    print("Program finished", time.ctime())


if __name__ == "__main__":
    asyncio.run(main())
我设法找到了自己问题的答案。这种级联取消可以通过结构化并发实现，而当前版本的 Python（Python 3.10）并不支持结构化并发，不过已有一项基于 PEP 654（异常组）引入 TaskGroup 的提案（该提案后来以 asyncio.TaskGroup 的形式在 Python 3.11 中落地）。
幸运的是，AnyIO 库在 asyncio 之上实现了类似 Trio 的结构化并发。我问题中的示例可以用 AnyIO 重写，使任务可以被取消：
import asyncio
from anyio import create_memory_object_stream, TASK_STATUS_IGNORED, create_task_group
from contextlib import AsyncExitStack
async def coroutine1(send_stream):
    """Send the value 1 on *send_stream*, exiting the stream's context afterwards."""
    value = 1
    async with send_stream:
        await send_stream.send(value)
async def coroutine2(send_stream):
    """Wait one second, then send the value 2 on *send_stream*."""
    delay, value = 1, 2
    async with send_stream:
        await asyncio.sleep(delay)
        await send_stream.send(value)
async def process_result(receive_stream, send_stream):
    """Receive one value and (in principle) forward value + 1.

    The demo deliberately raises before forwarding, so that the failure
    cancels the other tasks in the enclosing task group.  Both stream
    contexts are exited on the way out, send stream first.
    """
    async with receive_stream as rs, send_stream as ss:
        res_rs = await rs.receive()
        raise Exception
        await ss.send(res_rs + 1)  # unreachable while the demo ``raise`` above is in place
async def process_results(receive_stream_2, receive_stream_3, *, task_status=TASK_STATUS_IGNORED):
    """Receive one value from each stream and return their sum.

    ``task_status.started()`` is signalled up front so the function can also
    be launched with ``TaskGroup.start()``.  When awaited directly, the
    default TASK_STATUS_IGNORED makes that call a no-op.
    """
    task_status.started()
    async with receive_stream_2 as rs_2, receive_stream_3 as rs_3:
        first = await rs_2.receive()
        second = await rs_3.receive()
        return first + second
async def f():
    """Rewrite of the question's pipeline using an AnyIO task group.

    coroutine1 -> process_result feeds stream 3; coroutine2 feeds stream 2;
    process_results sums the two.  Because process_result raises, the task
    group cancels its other tasks and this body.  The error then propagates
    out of the ``async with`` (wrapped in an ExceptionGroup on AnyIO 4+),
    so ``print(result)`` is never reached.
    """
    async with create_task_group() as tg:
        send_stream_1, receive_stream_1 = create_memory_object_stream(1)
        tg.start_soon(coroutine1, send_stream_1)
        send_stream_2, receive_stream_2 = create_memory_object_stream(1)
        tg.start_soon(coroutine2, send_stream_2)
        send_stream_3, receive_stream_3 = create_memory_object_stream(1)
        tg.start_soon(process_result, receive_stream_1, send_stream_3)
        # process_result will raise an Exception which will cancel all tasks in tg group
        result = await process_results(receive_stream_2, receive_stream_3)
        print(result)


# Guard the entry point so importing this module does not start the event loop.
if __name__ == "__main__":
    asyncio.run(f())
如果异步任务 task_parent 创建了子任务 task_child，随后 task_parent 因创建 task_child 之后抛出的异常而被取消，那么 task_child 是否也会被自动取消（前提是它没有受 asyncio.shield 保护）？
例如下面的代码:
async def f():
    """Question example: two tasks whose results are collected via asyncio.gather().

    Asks whether t2 is cancelled when process_result() raises.  (It is not:
    asyncio tasks have no parent/child relationship.)
    """
    t1 = asyncio.create_task(coroutine1())
    t2 = asyncio.create_task(coroutine2())
    [r1] = await asyncio.gather(t1)
    r3 = await process_result(r1)  # process_result throws an exception
    [r2] = await asyncio.gather(t2)
    final = await process_results(r2, r3)
    return final
如果 process_result(r1) 抛出异常,t2 是否会自动取消(并随后被垃圾收集)?
如果不使用 asyncio.gather,我直接等待任务会怎么样:
async def f():
    """Question example: await the tasks directly instead of via asyncio.gather().

    Awaiting a Task yields the coroutine's return value itself, not the
    one-element list that ``asyncio.gather(t)`` produces.  The results are
    therefore bound directly (``r1 = await t1``) instead of being unpacked
    with ``r1, = ...``, which would fail unless the coroutine happened to
    return a one-element iterable.  As in the gather() version, t2 is not
    cancelled automatically if process_result() raises.
    """
    t1 = asyncio.create_task(coroutine1())
    t2 = asyncio.create_task(coroutine2())
    r1 = await t1
    r3 = await process_result(r1)  # process_result throws an exception
    r2 = await t2
    return await process_results(r2, r3)
如果 process_result(r1) 抛出异常,在这种情况下 t2 是否也会自动取消?
简而言之,您所有问题的答案都是“否”。
在 asyncio 中,没有父任务与子任务这样的概念,也没有任务之间的任何层次关系。只有一个“级别”——所有任务都是等价的。
在一定程度上，您可以通过在 finally 块中显式取消任务来强制建立这种依赖关系，例如：
async def f():
    """Answer example: cancel both tasks in ``finally`` so neither outlives f().

    Only t1 and t2 themselves are cancelled; any tasks they spawned are not.
    """
    t1 = asyncio.create_task(coroutine1())
    t2 = asyncio.create_task(coroutine2())
    try:
        r1 = await t1
        r3 = await process_result(r1)  # process_result throws an exception
        r2 = await t2
        outcome = await process_results(r2, r3)
    finally:
        # Task.cancel() is a no-op on a task that has already finished.
        for task in (t1, t2):
            task.cancel()
    return outcome
但这不会取消由 t1 和 t2 自己创建的任何任务；它只建立了一层依赖关系。
在最近的几个小项目中,我已经成功地使用这个 class 将任务组织成层次结构:
import asyncio
import logging
logger = logging.getLogger(__name__)


class BagContextError(Exception):
    """Raised when a PBag is entered twice or used after it has been closed."""
class PBag:
    """A "bag" of asyncio tasks/futures whose lifetime is bound to ``async with``.

    Tasks are created with :meth:`create_task`; existing futures can be
    registered with :meth:`add_future`.  When the context exits (normally or
    because of an exception), every future still in the bag is cancelled and
    awaited.  Exceptions raised by member futures are collected in
    ``exceptions``.  On a clean exit the first one is re-raised and the rest
    are logged.  ``await bag`` waits until the bag holds no pending futures.
    """

    def __init__(self):
        self.futures = set()      # pending futures owned by the bag
        self.exceptions = []      # exceptions raised by finished member futures
        self.done = asyncio.Event()
        self.done.set()           # an empty bag counts as "done"
        self._opened = False
        self._closed = False

    @property
    def is_open(self):
        """True between entering and exiting the ``async with`` block."""
        return self._opened and not self._closed

    def __await__(self):
        # ``await bag`` is equivalent to ``await bag.until_done()``.
        yield from self.done.wait().__await__()

    async def __aenter__(self):
        """Enter the bag's context.

        Raises:
            BagContextError: if the bag has already been entered.
        """
        if self._opened:
            raise BagContextError("Already open")
        self._opened = True
        return self

    async def __aexit__(self, exc_type, _exc_value, _traceback):
        """Close the bag: cancel and await all pending futures, then report errors.

        If the body exited cleanly and some member future failed, the first
        collected exception is re-raised.  All other collected exceptions are
        logged as suppressed.
        """
        logger.debug("Bag exit %s %s %s", self.futures, self.exceptions,
                     exc_type, stack_info=True)
        self._closed = True
        await self.aclose()
        if not self.exceptions:
            return
        # On a clean exit exceptions[0] is re-raised below, so only the rest
        # are suppressed.  If the body itself raised, that exception wins and
        # every collected one is suppressed.
        first_suppressed = 1 if exc_type is None else 0
        for ex in self.exceptions[first_suppressed:]:
            # Log through the module logger (not the root logger).
            # exc_info=ex records the traceback without re-raising ex.
            logger.error("Suppressed exception", exc_info=ex)
        if exc_type is None:
            raise self.exceptions[0]

    def until_done(self):
        """Return an awaitable that completes once the bag holds no pending futures."""
        return self.done.wait()

    def create_task(self, coro):
        """Schedule *coro* as a task owned by this bag and return the task.

        Raises:
            BagContextError: if the bag has already been closed.
        """
        if self._closed:
            raise BagContextError("Bag closed")
        t = asyncio.create_task(coro)
        self.add_future(t)
        return t

    def add_future(self, fut):
        """Register *fut* with the bag so it is tracked and cancelled on close.

        Raises:
            BagContextError: if the bag has already been closed.
        """
        if self._closed:
            raise BagContextError("Bag closed")
        self.futures.add(fut)
        fut.add_done_callback(self._future_done)
        self.done.clear()

    def close(self):
        """Request cancellation of every pending future (does not wait)."""
        # asyncio runs done callbacks via the event loop rather than inside
        # cancel().  Iterating a snapshot keeps this loop safe even if an
        # entry is removed from self.futures while cancelling.
        for w in list(self.futures):
            w.cancel()

    async def aclose(self):
        """Cancel every pending future and wait until all of them have finished."""
        self.close()
        await self.until_done()

    def _future_done(self, fut):
        """Done-callback: drop *fut* from the bag and record its exception, if any."""
        self.futures.discard(fut)
        if not self.futures:
            self.done.set()
        try:
            fut.result()
        except asyncio.CancelledError:
            pass  # cancellation is the bag's normal shutdown path, not an error
        except Exception as ex:
            self.exceptions.append(ex)
这是一个上下文管理器。在其上下文中，应通过 PBag.create_task（而不是 asyncio.create_task）来创建任务。该对象会跟踪这些依赖任务，并在退出上下文（无论是正常退出还是因异常退出）或调用 close() 方法时取消它们。注意：某个子任务抛出的异常并不会立即取消其他任务，而是先被记录下来；上下文正常退出时会重新抛出第一个异常（若上下文本身因异常退出，这些异常只会被记录到日志中）。
如果你用它来构建整个程序,任务将按层次排列,当最外层的任务被取消时,整个结构将(某种程度上)优雅地展开。如果你只在某些地方使用它而不在其他地方使用它(即,如果你在某些地方写 asyncio.create_task),这种展开可能效果不佳。
我对此经验不多,所以当然可能存在未发现的错误。这是一个小演示程序:
async def main():
    """Small demo of PBag.

    task3 starts task2 in the same bag.  task2 raises after 3 s; its error
    is re-raised when the ``async with`` block exits and is printed by the
    ``except Exception`` handler.
    """
    # The demo uses time and traceback, which the snippet never imported.
    import time
    import traceback

    async def task1():
        print("Task1 started", time.ctime())
        await asyncio.sleep(2)
        print("Task1 finished", time.ctime())

    async def task2():
        print("Task2 started", time.ctime())
        await asyncio.sleep(3)
        raise Exception("Task 2 error")

    async def task3(bag):
        bag.create_task(task2())
        print("Task 3 done")

    try:
        async with PBag() as bag:
            bag.create_task(task1())
            bag.create_task(task3(bag))
            await bag.until_done()
            bag.create_task(task1())
            await bag
    except asyncio.CancelledError:
        traceback.print_exc()
    except Exception:
        traceback.print_exc()
    print("Bag closed", time.ctime())
    # Not added to any bag and never awaited.  asyncio.run() cancels tasks
    # that are still pending when main() returns, so this one never reaches
    # "Task1 finished".
    asyncio.create_task(task1())
    print("Program finished", time.ctime())


if __name__ == "__main__":
    asyncio.run(main())
我设法找到了自己问题的答案。这种级联取消可以通过结构化并发实现，而当前版本的 Python（Python 3.10）并不支持结构化并发，不过已有一项基于 PEP 654（异常组）引入 TaskGroup 的提案（该提案后来以 asyncio.TaskGroup 的形式在 Python 3.11 中落地）。
幸运的是，AnyIO 库在 asyncio 之上实现了类似 Trio 的结构化并发。我问题中的示例可以用 AnyIO 重写，使任务可以被取消：
import asyncio
from anyio import create_memory_object_stream, TASK_STATUS_IGNORED, create_task_group
from contextlib import AsyncExitStack
async def coroutine1(send_stream):
    """Send the value 1 on *send_stream*, exiting the stream's context afterwards."""
    value = 1
    async with send_stream:
        await send_stream.send(value)
async def coroutine2(send_stream):
    """Wait one second, then send the value 2 on *send_stream*."""
    delay, value = 1, 2
    async with send_stream:
        await asyncio.sleep(delay)
        await send_stream.send(value)
async def process_result(receive_stream, send_stream):
    """Receive one value and (in principle) forward value + 1.

    The demo deliberately raises before forwarding, so that the failure
    cancels the other tasks in the enclosing task group.  Both stream
    contexts are exited on the way out, send stream first.
    """
    async with receive_stream as rs, send_stream as ss:
        res_rs = await rs.receive()
        raise Exception
        await ss.send(res_rs + 1)  # unreachable while the demo ``raise`` above is in place
async def process_results(receive_stream_2, receive_stream_3, *, task_status=TASK_STATUS_IGNORED):
    """Receive one value from each stream and return their sum.

    ``task_status.started()`` is signalled up front so the function can also
    be launched with ``TaskGroup.start()``.  When awaited directly, the
    default TASK_STATUS_IGNORED makes that call a no-op.
    """
    task_status.started()
    async with receive_stream_2 as rs_2, receive_stream_3 as rs_3:
        first = await rs_2.receive()
        second = await rs_3.receive()
        return first + second
async def f():
    """Rewrite of the question's pipeline using an AnyIO task group.

    coroutine1 -> process_result feeds stream 3; coroutine2 feeds stream 2;
    process_results sums the two.  Because process_result raises, the task
    group cancels its other tasks and this body.  The error then propagates
    out of the ``async with`` (wrapped in an ExceptionGroup on AnyIO 4+),
    so ``print(result)`` is never reached.
    """
    async with create_task_group() as tg:
        send_stream_1, receive_stream_1 = create_memory_object_stream(1)
        tg.start_soon(coroutine1, send_stream_1)
        send_stream_2, receive_stream_2 = create_memory_object_stream(1)
        tg.start_soon(coroutine2, send_stream_2)
        send_stream_3, receive_stream_3 = create_memory_object_stream(1)
        tg.start_soon(process_result, receive_stream_1, send_stream_3)
        # process_result will raise an Exception which will cancel all tasks in tg group
        result = await process_results(receive_stream_2, receive_stream_3)
        print(result)


# Guard the entry point so importing this module does not start the event loop.
if __name__ == "__main__":
    asyncio.run(f())