在另一个 asyncio.gather() 中使用嵌套的 asyncio.gather()

Using nested asyncio.gather() inside another asyncio.gather()

我有一个 class 有多种方法。我有一个 class 类似的方法:

 class MyClass:

    async def master_method(self):
      tasks = [self.sub_method() for _ in range(10)]
      results = await asyncio.gather(*tasks)

    async def sub_method(self):
      subtasks = [self.my_task() for _ in range(10)]
      results = await asyncio.gather(*subtasks)

    async def my_task(self):
      return "task done"  

所以这里的问题是:

  1. advantages/disadvantages 在从另一个 asyncio.gather() 调用的协程中使用 asyncio.gather() 有什么问题吗?有任何性能问题吗?

  2. asyncio 循环是否所有级别的所有任务都以相同的优先级处理?这是否会提供与我用 master_method?

    中的单个 asyncio.gather() 调用所有协同例程相同的性能?

TLDR:使用 gather 而不是返回任务可以简化使用并使代码更易于维护。虽然 gather 有一些开销,但对于任何实际应用来说都可以忽略不计。


为什么 gather

gather 在退出协程之前累积 child 个任务的要点是延迟协程的完成,直到它的 child 个任务完成。这 封装了 实现,并确保协程显示为“做它的事情”的单个实体。
另一种方法是 return child 任务,并期望调用者 运行 它们完成。

为简单起见,让我们看一下单层 – 对应于中间 sub_method – 但有不同的变化。

async def child(i):
    await asyncio.sleep(0.2)  # some non-trivial payload
    print("child", i, "done")

async def encapsulated() -> None:
    await asyncio.sleep(0.1)  # some preparation work
    children = [child() for _ in range(10)]
    await asyncio.gather(*children)

async def task_children() -> 'List[asyncio.Task]':
    await asyncio.sleep(0.1)  # some preparation work
    children = [asyncio.create_task(child()) for _ in range(10)]
    return children

async def coro_children() -> 'List[Awaitable[None]]':
    await asyncio.sleep(0.1)  # some preparation work
    children = [child() for _ in range(10)]
    return children

所有encapsulatedtask_childrencoro_children都以某种方式编码有sub-tasks。这允许调用者以可靠地“完成”实际目标的方式 运行 它们。但是,每个变体的不同之处在于它自己做了多少以及调用者必须做多少:

  • encapsulated 是“最重”的变体:所有 children 在 Tasks 中都是 运行是一个额外的 gather。但是,调用者不会暴露于以下任何情况:
    await encapsulated()
    
    这保证了功能按预期工作,并且可以自由更改其实现。
  • task_children是中间变体:所有children都是运行在Tasks。调用者可以决定是否以及如何等待完成:
    tasks = await task_children()
    await asyncio.gather(*tasks)  # can add other tasks here as well
    
    这保证了 功能 按预期启动。不过,它的完成 依赖于调用者的一些知识。
  • coro_children 是“最轻”的变体:children 实际上没有 运行。调用者负责整个生命周期:
    tasks = await coro_children()
    # children don't actually run yet!
    await asyncio.gather(*tasks)  # can add other tasks here as well
    
    这完全依赖调用者启动并等待 sub-tasks.

使用 encapsulated 模式是一个安全的默认模式——它确保协程“正常工作”。值得注意的是,使用 internal gather 的协程仍然看起来像任何其他协程。

gather速度?

gather 实用程序 a) 确保它的参数是 运行,因为 Tasks and b) provides a Future 一旦任务完成就会触发。由于 gather 通常在将 运行 参数作为 Task 时使用,因此不会产生额外的开销;同样,这些都是常规 Task 并且具有与其他所有内容相同的 performance/priority 特征¹。

唯一的开销来自包装 Future;这负责簿记(确保参数是任务)然后只等待,即什么也不做。在我的机器上,measuring the overhead 显示它平均花费的时间是 运行 宁 no-op Task 的两倍。对于任何 real-world 任务,这本身应该已经可以忽略不计了。

此外,gathering child 任务的模式本质上意味着有 gather 个节点。因此 gather 节点的数量通常远低于任务的数量。例如每[=15=10个任务的情况下,总共只需要11gathers就可以处理总共100个任务

master_method                                                  0

sub_method         0          1          2          3          4          5 ...

my_task       0123456789 0123456789 0123456789 0123456789 0123456789 0123456789 ...

¹也就是说,none。 asyncio 目前没有 Task 优先级的概念。