asyncio自动取消子任务

asyncio automatic cancellation of subtasks

如果异步任务 task_parent 创建了子任务 task_child,随后 task_parent 因为创建 task_child 之后抛出的异常而终止(或被取消),那么 task_child 是否也会被自动取消(假设它没有受到 asyncio.shield 的保护)?

例如下面的代码:

async def f():
    """Question example: spawn two tasks, then fail before collecting the second.

    If ``process_result`` raises, ``f`` exits with that exception while ``t2``
    has already been created but has not been awaited yet.
    """
    t1 = asyncio.create_task(coroutine1())
    t2 = asyncio.create_task(coroutine2())
    r1, = await asyncio.gather(t1)  # gather returns a list; unpack its only element
    r3 = await process_result(r1) # process_result throws an exception
    r2, = await asyncio.gather(t2)
    return await process_results(r2, r3)

如果 process_result(r1) 抛出异常,t2 是否会自动取消(并随后被垃圾收集)?

如果不使用 asyncio.gather,而是直接 await 这些任务,又会怎样:

async def f():
    """Question example without gather: await the two tasks directly.

    Awaiting a task yields the coroutine's return value itself (not a
    one-element list as ``asyncio.gather`` returns), so the results are bound
    directly instead of with ``r, = ...`` tuple unpacking.
    """
    t1 = asyncio.create_task(coroutine1())
    t2 = asyncio.create_task(coroutine2())
    r1 = await t1
    r3 = await process_result(r1)  # process_result throws an exception
    r2 = await t2
    return await process_results(r2, r3)

如果 process_result(r1) 抛出异常,在这种情况下 t2 是否也会自动取消?

简而言之,您所有问题的答案都是“否”。

在 asyncio 中,没有父任务与子任务这样的概念,也没有任务之间的任何层次关系。只有一个“级别”——所有任务都是等价的。因此在上面两个例子中,process_result 抛出异常后 t2 不会被取消,而是继续在事件循环中运行,直到它自己结束为止。

在一定程度上,您可以通过在 finally 块中显式取消任务来手动建立这种依赖关系,例如:

async def f():
    """Await two tasks and cancel both on the way out, however we leave.

    The ``finally`` clause ties the lifetime of ``t1`` and ``t2`` to this
    coroutine. It does not reach any tasks that they themselves spawn.
    """
    t1 = asyncio.create_task(coroutine1())
    t2 = asyncio.create_task(coroutine2())
    children = (t1, t2)
    try:
        first = await t1
        r3 = await process_result(first)  # process_result throws an exception
        second = await t2
        return await process_results(second, r3)
    finally:
        # cancel() is a no-op (returns False) for a task that is already done.
        for child in children:
            child.cancel()

但这不会取消由 t1 和 t2 再创建的任何任务,它只建立了一层依赖关系。另外,cancel() 只是请求取消,并不会等待被取消的任务真正结束。

在最近的几个小项目中,我成功地用下面这个类把任务组织成了层次结构:

import asyncio
import logging

logger = logging.getLogger(__name__)


class BagContextError(Exception):
    """Raised when a PBag is entered twice or used after it has been closed."""


class PBag:
    """Own a set of child futures and cancel them when the bag is closed.

    Use as an ``async with`` context manager. Start children through
    :meth:`create_task`, or register existing futures with :meth:`add_future`.
    Leaving the context cancels every child that is still pending and waits
    for all of them to finish.

    Exceptions raised by children are collected in ``exceptions``. If the
    ``async with`` body completed normally, the first collected exception is
    re-raised on exit and the rest are logged. If the body raised, its
    exception propagates and every child exception is logged.

    ``await bag`` waits until no tracked children remain.
    """

    def __init__(self):
        # Children that have not finished yet.
        self.futures = set()
        # Exceptions from finished children, in the order their done-callbacks ran.
        self.exceptions = []
        # Set whenever ``futures`` is empty; cleared when a child is added.
        self.done = asyncio.Event()
        self.done.set()
        self._opened = False
        self._closed = False

    @property
    def is_open(self):
        """True between ``__aenter__`` and the start of ``__aexit__``."""
        return self._opened and not self._closed

    def __await__(self):
        # Awaiting the bag is equivalent to ``await bag.until_done()``.
        yield from self.done.wait().__await__()

    async def __aenter__(self):
        """Open the bag. A bag can be entered only once.

        Raises:
            BagContextError: if the bag has already been opened.
        """
        if self._opened:
            raise BagContextError("Already open")
        self._opened = True
        return self

    async def __aexit__(self, exc_type, _exc_value, _traceback):
        """Close the bag: cancel and await all children, then report their errors.

        If the body exited normally, the first child exception is raised and
        any further ones are logged. If the body raised, that exception
        propagates unchanged and every child exception is logged.
        """
        logger.debug("Bag exit %s %s %s", self.futures, self.exceptions,
                     exc_type, stack_info=True)
        self._closed = True
        await self.aclose()
        if not self.exceptions:
            return
        # When the body succeeded, the first child error is re-raised below,
        # so only the remaining ones need to be logged.
        skip = 1 if exc_type is None else 0
        for ex in self.exceptions[skip:]:
            # Log through the module logger and attach the exception directly;
            # there is no need to re-raise it just to obtain exc_info.
            logger.error("Suppressed exception", exc_info=ex)
        if exc_type is None:
            raise self.exceptions[0]

    def until_done(self):
        """Return an awaitable that completes once no children are tracked."""
        return self.done.wait()

    def create_task(self, coro):
        """Schedule *coro* as a task owned by this bag and return the task.

        Raises:
            BagContextError: if the bag has already been closed.
        """
        if self._closed:
            raise BagContextError("Bag closed")
        t = asyncio.create_task(coro)
        self.add_future(t)
        return t

    def add_future(self, fut):
        """Start tracking *fut* so that it is cancelled and awaited on close.

        Raises:
            BagContextError: if the bag has already been closed.
        """
        if self._closed:
            raise BagContextError("Bag closed")
        self.futures.add(fut)
        fut.add_done_callback(self._future_done)
        self.done.clear()

    def close(self):
        """Request cancellation of every child that is still tracked."""
        # Iterate over a snapshot: _future_done removes entries from
        # ``self.futures``. A future type that runs its done-callbacks
        # synchronously on cancel() would otherwise mutate the set mid-loop.
        for fut in list(self.futures):
            fut.cancel()

    async def aclose(self):
        """Cancel all children and wait until every one of them has finished."""
        self.close()
        await self.until_done()

    def _future_done(self, fut):
        """Done-callback: stop tracking *fut* and record its exception, if any."""
        self.futures.discard(fut)
        if not self.futures:
            self.done.set()
        try:
            fut.result()
        except asyncio.CancelledError:
            pass  # cancellation is how children normally end when the bag closes
        except Exception as ex:
            self.exceptions.append(ex)

这是一个异步上下文管理器。在其上下文中,应使用 PBag.create_task 而不是 asyncio.create_task 来创建任务。该对象会跟踪这些子任务,并在上下文主体抛出异常、退出上下文或调用 close() 方法时取消它们。注意,子任务自身抛出的异常不会立即取消其兄弟任务;这些异常会被收集起来,在退出上下文时重新抛出(或记录到日志)。

如果你用它来构建整个程序,任务将按层次排列,当最外层的任务被取消时,整个结构将(某种程度上)优雅地展开。如果你只在某些地方使用它而不在其他地方使用它(即,如果你在某些地方写 asyncio.create_task),这种展开可能效果不佳。

我对此经验不多,所以当然可能存在未发现的错误。这是一个小演示程序:

async def main():
    """Demo: run nested tasks in a PBag and watch a child error surface on exit.

    task3 adds task2 to the same bag, and task2 fails after 3 s. Once all
    children have finished, leaving the ``async with`` re-raises task2's
    exception, which is caught and printed here.
    """
    # The imports shown with PBag cover only asyncio and logging.
    import time
    import traceback

    async def task1():
        print("Task1 started", time.ctime())
        await asyncio.sleep(2)
        print("Task1 finished", time.ctime())

    async def task2():
        print("Task2 started", time.ctime())
        await asyncio.sleep(3)
        raise Exception("Task 2 error")

    async def task3(bag):
        bag.create_task(task2())
        print("Task 3 done")

    try:
        async with PBag() as bag:
            bag.create_task(task1())
            bag.create_task(task3(bag))
            await bag.until_done()
            bag.create_task(task1())
            await bag
    except (asyncio.CancelledError, Exception):
        # CancelledError is a BaseException on Python 3.8+, so list it explicitly.
        traceback.print_exc()
    print("Bag closed", time.ctime())
    # Not tracked by any bag and never awaited: asyncio.run() cancels it
    # once main() returns, so it does not get to finish.
    asyncio.create_task(task1())
    print("Program finished", time.ctime())


if __name__ == "__main__":
    asyncio.run(main())

我自己找到了问题的答案。要让任务被自动取消,需要结构化并发,而当前版本的 Python(Python 3.10)还不支持这一点,不过已有一个在 PEP 654 之后引入 TaskGroup 的提案。(补注:该提案已经落地,Python 3.11 起标准库提供了 asyncio.TaskGroup。)

幸运的是,AnyIO 库在 asyncio 之上实现了类似 Trio 的结构化并发。我问题中的示例可以用 AnyIO 改写,使任务能够被自动取消:

import asyncio
from anyio import create_memory_object_stream, TASK_STATUS_IGNORED, create_task_group
from contextlib import AsyncExitStack

async def coroutine1(send_stream):
    """Send the single value 1 on *send_stream*, closing it afterwards."""
    value = 1
    async with send_stream:
        await send_stream.send(value)

async def coroutine2(send_stream):
    """Wait one second, then send the value 2 and close *send_stream*."""
    delay, value = 1, 2
    async with send_stream:
        await asyncio.sleep(delay)
        await send_stream.send(value)

async def process_result(receive_stream, send_stream, *, fail=True):
    """Receive one value, then (by default) fail instead of forwarding it.

    With ``fail=True`` (the default, as used by the demo) a bare ``Exception``
    is raised after the value arrives. In the demo's task group, this is what
    triggers cancellation of the other tasks. With ``fail=False`` the value
    plus one is sent on *send_stream* instead. Both streams are closed on
    exit either way.
    """
    async with receive_stream, send_stream:
        value = await receive_stream.receive()
        if fail:
            raise Exception
        await send_stream.send(value + 1)

async def process_results(receive_stream_2, receive_stream_3, *, task_status=TASK_STATUS_IGNORED):
    """Receive one value from each stream and return their sum.

    ``task_status.started()`` is signalled first, which lets the coroutine also
    be launched via a task group's ``start()``. Both streams are closed on exit.
    """
    task_status.started()
    async with receive_stream_2 as first, receive_stream_3 as second:
        # Left operand is awaited first, so stream 2 is read before stream 3.
        return await first.receive() + await second.receive()
    

async def f():
    """Wire the demo pipeline inside one AnyIO task group and print the sum.

    coroutine1 -> stream 1 -> process_result -> stream 3 -+
    coroutine2 -> stream 2 -------------------------------+-> process_results

    All tasks share one task group, so the exception raised by process_result
    cancels the rest of the group, including the pending process_results call.
    """
    async with create_task_group() as tg:
        tx1, rx1 = create_memory_object_stream(1)
        tx2, rx2 = create_memory_object_stream(1)
        tx3, rx3 = create_memory_object_stream(1)
        tg.start_soon(coroutine1, tx1)
        tg.start_soon(coroutine2, tx2)
        tg.start_soon(process_result, rx1, tx3)
        # process_result will raise an Exception which will cancel all tasks in tg group
        print(await process_results(rx2, rx3))


asyncio.run(f())