如何使用 python 的 asyncio 模块正确创建和 运行 并发任务?

How to properly create and run concurrent tasks using python's asyncio module?

我正在尝试正确理解并同时实现两个 运行ning Task objects using Python 3's relatively new asyncio 模块。

简而言之,asyncio 似乎旨在处理异步进程和通过事件循环并发 Task 执行。它提倡使用 await(应用于异步函数)作为一种无需回调的方式来等待和使用结果,而不会阻塞事件循环。 (期货和回调仍然是一个可行的选择。)

它还提供 asyncio.Task() class,Future 的专门子 class,旨在包装协程。最好使用 asyncio.ensure_future() 方法调用。 asyncio 任务的预期用途是允许 运行ning 任务独立地 运行 'concurrently' 与同一事件循环中的其他任务。我的理解是 Tasks 连接到事件循环,然后自动保持在 await 语句之间驱动协程。

我喜欢无需使用 Executor class 之一即可使用并发任务的想法,但我没有找到关于实现的详细说明。

这就是我目前的做法:

import asyncio

print('running async test')

async def say_boo():
    i = 0
    while True:
        await asyncio.sleep(0)
        print('...boo {0}'.format(i))
        i += 1

async def say_baa():
    i = 0
    while True:
        await asyncio.sleep(0)
        print('...baa {0}'.format(i))
        i += 1

# wrap in Task object
# -> automatically attaches to event loop and executes
boo = asyncio.ensure_future(say_boo())
baa = asyncio.ensure_future(say_baa())

loop = asyncio.get_event_loop()
loop.run_forever()

在尝试同时 运行 两个循环任务的情况下,我注意到除非任务具有内部 await 表达式,否则它将卡在 while循环,有效地阻止来自 运行ning 的其他任务(很像正常的 while 循环)。但是,一旦任务必须 (a) 等待,它们似乎 运行 同时没有问题。

因此,await语句似乎为事件循环提供了一个落脚点,可以在任务之间来回切换,从而达到并发的效果。

内部示例输出 await:

running async test
...boo 0
...baa 0
...boo 1
...baa 1
...boo 2
...baa 2

示例输出 没有 内部 await:

...boo 0
...boo 1
...boo 2
...boo 3
...boo 4

问题

此实现是否符合 asyncio 中并发循环任务的 'proper' 示例?

唯一可行的方法是 Task 提供阻塞点(await 表达式)以便事件循环处理多个任务,这是否正确?

是的,在事件循环中 运行ning 的任何协程都会阻止 运行ning 中的其他协程和任务,除非它

  1. 使用 yield fromawait 调用另一个协程(如果使用 Python 3.5+)。
  2. Returns.

这是因为asyncio是单线程的;事件循环到 运行 的唯一方法是没有其他协程主动执行。使用 yield from/await 暂时挂起协程,让事件循环有机会工作。

您的示例代码很好,但在许多情况下,您可能不希望在 I/O 运行ning事件循环开始。在这些情况下,在后台线程或进程中使用 asyncio.loop.run_in_executor 到 运行 代码通常更有意义。如果您的任务是 CPU-bound,ProcessPoolExecutor 将是更好的选择,如果您需要做一些 I/O 而不是 asyncio,则可以使用 ThreadPoolExecutor -友好。

例如,您的两个循环完全 CPU 绑定并且不共享任何状态,因此最好的性能来自使用 ProcessPoolExecutor 到 运行 每个循环并行跨越 CPUs:

import asyncio
from concurrent.futures import ProcessPoolExecutor

print('running async test')

def say_boo():
    i = 0
    while True:
        print('...boo {0}'.format(i))
        i += 1


def say_baa():
    i = 0
    while True:
        print('...baa {0}'.format(i))
        i += 1

if __name__ == "__main__":
    executor = ProcessPoolExecutor(2)
    loop = asyncio.get_event_loop()
    boo = loop.run_in_executor(executor, say_boo)
    baa = loop.run_in_executor(executor, say_baa)

    loop.run_forever()

您不一定需要 yield from x 来控制事件循环。

在您的示例中,我认为 正确的 方法是执行 yield None 或等效的简单 yield,而不是 yield from asyncio.sleep(0.001):

import asyncio

@asyncio.coroutine
def say_boo():
  i = 0
  while True:
    yield None
    print("...boo {0}".format(i))
    i += 1

@asyncio.coroutine
def say_baa():
  i = 0
  while True:
    yield
    print("...baa {0}".format(i))
    i += 1

boo_task = asyncio.async(say_boo())
baa_task = asyncio.async(say_baa())

loop = asyncio.get_event_loop()
loop.run_forever()

协程只是普通的旧 Python 生成器。 在内部,asyncio 事件循环记录了这些生成器,并在永无止境的循环中对每个生成器调用 gen.send()。每当您 yield 时,对 gen.send() 的调用就会完成并且循环可以继续。 (我正在简化它;查看 https://hg.python.org/cpython/file/3.4/Lib/asyncio/tasks.py#l265 实际代码)

也就是说,如果您需要在不共享数据的情况下进行 CPU 密集计算,我仍然会选择 run_in_executor 路线。