Python 3.5 中协程和 future/task 的区别?

Difference between coroutine and future/task in Python 3.5?

假设我们有一个虚拟函数:

async def foo(arg):
    result = await some_remote_call(arg)
    return result.upper()

有什么区别:

import asyncio    

coros = []
for i in range(5):
    coros.append(foo(i))

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(coros))

并且:

import asyncio

futures = []
for i in range(5):
    futures.append(asyncio.ensure_future(foo(i)))

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(futures))

注意:例子return是一个结果,但这不是问题的重点。当 return 值很重要时,请使用 gather() 而不是 wait()

无论 return 值如何,我都在寻找 ensure_future() 的清晰度。 wait(coros)wait(futures) 都是 运行 协程,那么什么时候以及为什么应该将协程包装在 ensure_future 中?

基本上,使用 Python 3.5 的 async 运行 一堆非阻塞操作的正确方法 (tm) 是什么?

为了加分,如果我想批量调用怎么办?例如,我需要调用 some_remote_call(...) 1000 次,但我不想用 1000 个并发连接压垮 Web server/database/etc。这对于线程或进程池是可行的,但是有没有办法使用 asyncio?

2020 更新 (Python 3.7+):不要使用这些片段。而是使用:

import asyncio

async def do_something_async():
    tasks = []
    for i in range(5):
        tasks.append(asyncio.create_task(foo(i)))
    await asyncio.gather(*tasks)

def do_something():
    asyncio.run(do_something_async)

也可以考虑使用 Trio,一种强大的第三方 asyncio 替代方案。

协程是一种生成器函数,既可以产生值,也可以从外部接受值。使用协程的好处是我们可以暂停函数的执行并在稍后恢复它。在网络操作的情况下,在我们等待响应时暂停函数的执行是有意义的。我们可以在使用的时候运行一些其他的功能。

未来就像 Javascript 中的 Promise 个对象。它就像一个未来将要实现的价值的占位符。在 above-mentioned 的情况下,当在网络 I/O 上等待时,一个函数可以给我们一个容器,承诺在操作完成时它将用值填充容器。我们持有 future 对象,当它完成时,我们可以调用它的方法来检索实际结果。

直接回答:不需要结果就可以ensure_future。如果您需要结果或检索发生的异常,它们很好。

额外积分: 我会选择 run_in_executor 并传递一个 Executor 实例来控制最大工人数。

解释和示例代码

在第一个示例中,您使用的是协程。 wait 函数接受一堆协程并将它们组合在一起。所以 wait() 在所有协程都用完时完成(completed/finished return 计算所有值)。

loop = get_event_loop() # 
loop.run_until_complete(wait(coros))

run_until_complete 方法将确保循环在执行完成之前一直处于活动状态。请注意在这种情况下您是如何得不到异步执行的结果的。

在第二个示例中,您使用 ensure_future 函数包装协程和 return 一个 Task 对象,它是一种 Future。当您调用 ensure_future 时,协程计划在主事件循环中执行。 returned future/task 对象还没有值,但随着时间的推移,当网络操作完成时,未来的对象将保存操作的结果。

from asyncio import ensure_future

futures = []
for i in range(5):
    futures.append(ensure_future(foo(i)))

loop = get_event_loop()
loop.run_until_complete(wait(futures))

所以在这个例子中,我们做同样的事情,除了我们使用 futures 而不是仅仅使用协程。

让我们看一个如何使用的例子 asyncio/coroutines/futures:

import asyncio


async def slow_operation():
    await asyncio.sleep(1)
    return 'Future is done!'


def got_result(future):
    print(future.result())

    # We have result, so let's stop
    loop.stop()


loop = asyncio.get_event_loop()
task = loop.create_task(slow_operation())
task.add_done_callback(got_result)

# We run forever
loop.run_forever()

在这里,我们在loop 对象上使用了create_task 方法。 ensure_future 会在主事件循环中安排任务。这种方法使我们能够在我们选择的循环上安排协程。

我们还看到了在任务对象上使用 add_done_callback 方法添加回调的概念。

A Task 当协程 return 是一个值、引发异常或被取消时 done。有一些方法可以检查这些事件。

我写了一些关于这些主题的博文,可能会有帮助:

当然,你可以在官方手册上找到更多的细节:https://docs.python.org/3/library/asyncio.html

链接到 https://github.com/python/asyncio/blob/master/asyncio/tasks.py#L346 的 Vincent 评论显示 wait() 为您将协程包装在 ensure_future() 中!

也就是说,我们确实需要一个future,协程会默默的转化为它们。

当我找到关于如何批处理的明确解释时,我会更新这个答案 coroutines/futures。

From the BDFL [2013]

任务

  • 这是一个包裹在 Future 中的协程
  • class 任务是 class 未来
  • 的子class
  • 所以它也适用于 await

  • 它与裸协程有何不同?
  • 不用等待也能进步
    • 只要你等待别的东西,即
      • 等待 [something_else]

考虑到这一点,ensure_future 作为创建任务的名称是有意义的,因为无论您是否 await 都会计算 Future 的结果(只要当你等待某事时)。这允许事件循环在您等待其他事情时完成您的任务。请注意,在 Python 3.7 中 create_task 是首选方式 ensure a future.

注意:我将 Guido 幻灯片中的 "yield from" 更改为 "await" 以实现现代性。

简单回答

  • 调用协程函数 (async def) 不会 运行 它。它 returns 一个协程对象,就像生成器函数 returns 生成器对象。
  • await 从协程中检索值,即 "calls" 协程
  • eusure_future/create_task 将协程安排到 运行 下一次迭代的事件循环中(尽管不等待它们完成,就像守护线程一样)。

一些代码示例

让我们先澄清一些术语:

  • 协程函数,你 async defs;
  • 协程对象,当你"call"一个协程函数时你得到了什么;
  • 任务,一个包裹在事件循环中 运行 协程对象的对象。

案例 1,await 在协程上

我们创建了两个协程,await一个,并使用create_task到运行另一个。

import asyncio
import time

# coroutine function
async def p(word):
    print(f'{time.time()} - {word}')


async def main():
    loop = asyncio.get_event_loop()
    coro = p('await')  # coroutine
    task2 = loop.create_task(p('create_task'))  # <- runs in next iteration
    await coro  # <-- run directly
    await task2

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

你会得到结果:

1539486251.7055213 - await
1539486251.7055705 - create_task

解释:

task1直接执行,task2在后面的迭代中执行

情况 2,将控制权交给事件循环

如果我们替换主函数,我们会看到不同的结果:

async def main():
    loop = asyncio.get_event_loop()
    coro = p('await')
    task2 = loop.create_task(p('create_task'))  # scheduled to next iteration
    await asyncio.sleep(1)  # loop got control, and runs task2
    await coro  # run coro
    await task2

你会得到结果:

-> % python coro.py
1539486378.5244057 - create_task
1539486379.5252144 - await  # note the delay

解释:

当调用 asyncio.sleep(1) 时,控制被交还给事件循环,循环检查 运行 的任务,然后 运行 由 [= 创建的任务20=].

请注意,我们首先调用协程函数,而不是 await 它,所以我们只是创建了一个协程,而不是 运行ning。然后,我们再次调用协程函数,并将其包装在 create_task 调用中,creat_task 将实际调度协程到 运行 下一次迭代。因此,在结果中,create taskawait.

之前执行

实际上,这里的重点是将控制权交还给循环,您可以使用 asyncio.sleep(0) 来查看相同的结果。

引擎盖下

loop.create_task实际上调用了asyncio.tasks.Task(),后者会调用loop.call_soon。而loop.call_soon会将任务放在loop._ready中。在循环的每次迭代中,它检查 loop._ready 和 运行 中的每个回调。

asyncio.waitasyncio.ensure_futureasyncio.gather实际上直接或间接调用了loop.create_task

另请注意 docs

Callbacks are called in the order in which they are registered. Each callback will be called exactly once.