Python 本机协程和 send()

Python native coroutines and send()

基于生成器的协程有一个 send() 方法,它允许调用者和被调用者之间进行双向通信,并从调用者恢复生成的生成器协程。这是将生成器变成协程的功能。

虽然新的原生 async/await 协程提供了对异步 I/O 的卓越支持,但我看不出如何使用它们获得 send() 的等效项。在 async 函数中使用 yield 是明确禁止的,因此本机协程只能 return 使用 return 语句一次。尽管 await 表达式将新值带入协同程序,但这些值来自被调用者,而不是调用者,并且等待的调用每次都从头开始计算,而不是从它停止的地方计算。

有没有办法从停止的地方恢复 returned 协程并可能发送新值? 我如何使用本机协程模拟 David Beazley 的 Curious Course on Coroutines and Concurrency 中的技术?

我想到的一般代码模式是这样的

def myCoroutine():
  ...
  while True:
    ...
    ping = yield(pong)
    ...

并在来电者中

while True:
  ...
  buzz = myCoroutineGen.send(bizz)
  ...

编辑

我接受了凯文的回答,但我注意到 PEP says

Coroutines are based on generators internally, thus they share the implementation. Similarly to generator objects, coroutines have throw() , send() and close() methods.

...

throw() , send() methods for coroutines are used to push values and raise errors into Future-like objects.

那么显然原生协程确实有 send()?没有 yield 表达式如何接收协程内部的值?

Is there a way to resume a returned coroutine from where it left off and potentially send in a new value?

没有

asyncawait 只是 yield from 的语法糖。当一个协程 returns (带有 return 语句)时,就是这样。框架不见了。它不可恢复。这正是发电机一直以来的工作方式。例如:

def foo():
    return (yield)

你可以做f = foo(); next(f); f.send(5),你会得到5。但是如果你再次尝试f.send(),它不起作用,因为你已经从框架中返回了。 f 不再是实时生成器。

现在,至于新协程,据我所知,yield 和 sending 似乎是为事件循环和某些基本谓词(例如 asyncio.sleep() 之间的通信保留的。协程产生 asyncio.Future objects up to the event loop, and the event loop sends those same future objects back into the coroutine once the associated operations have been completed (they are typically scheduled via call_soon() 和其他事件循环方法)。

您可以通过等待它们来产生未来的对象,但它不是像 .send() 那样的通用接口。它专门供事件循环实现使用。如果你没有实现事件循环,你可能不想玩这个。如果您正在实现事件循环,您需要问问自己为什么asyncio中完美的实现不足以满足您的目的,并解释具体 在我们可以帮助您之前,您正在尝试做。

请注意 yield from 并未弃用。如果您想要完全不依赖于事件循环的协程,只需使用它即可。 asyncawaitspecifically designed for asynchronous programming with event loops。如果这不是您正在做的,那么 asyncawait 是错误的开始工具。

还有一件事:

The use of yield in async functions is explicitly forbidden, so native coroutines can return only once using a return statement.

await 表达式 do 屈服控制。 await something() 完全类似于 yield from something()。他们只是更改了名称,这样对于不熟悉生成器的人来说会更直观。


对于那些真正有兴趣实现自己的事件循环的人,here's some example code showing a (very minimal) implementation. This event loop is extremely stripped down, because it is designed to run certain specially-written coroutines synchronously as if they were normal functions. It does not provide the full range of support you would expect from a real BaseEventLoop 实现,并且与任意协程一起使用是不安全的。

通常情况下,我会将代码包含在我的答案中,而不是链接到它,但存在版权问题并且它对答案本身并不重要。

在完成了 Beazley 的同一门(我必须说很棒)协程课程后,我问自己同样的问题——如何调整代码以与 Python 中介绍的原生协程一起工作3.5?

事实证明,可以通过对代码进行相对较小的更改来完成。我假设读者熟悉课程 material,并以 pyos4.py 版本为基础 - 第一个支持“系统调用”的 Scheduler 版本。

TIP: A full runnable example can be found in Appendix A at the end.

Objective

目标是转以下协程代码:

def foo():
    mytid = yield GetTid()  # a "system call"
    for i in xrange(3):
        print "I'm foo", mytid
        yield  # a "trap"

... 进入本机协同程序并仍然像以前一样使用:

async def foo():
    mytid = await GetTid()  # a "system call"
    for i in range(3):
        print("I'm foo", mytid)
        await ???  # a "trap" (will explain the missing bit later)

我们想 运行 它没有 asyncio,因为我们已经有自己的事件循环来驱动整个过程 - 它是 Scheduler class。

等待对象

原生协程无法立即运行,以下代码会导致错误:

async def foo():
    mytid = await GetTid()
    print("I'm foo", mytid)

sched = Scheduler()
sched.new(foo())
sched.mainloop()
Traceback (most recent call last):
    ...
    mytid = await GetTid()
TypeError: object GetTid can't be used in 'await' expression

PEP 492 解释了可以等待什么样的对象。其中一个选项是 “具有 __await__ 方法的对象 returning 迭代器”.

就像 yield from 一样,如果您熟悉它,await 充当等待对象和驱动协程的最外层代码(通常是事件循环)之间的隧道。最好用一个例子来证明这一点:

class Awaitable:
    def __await__(self):
        value = yield 1
        print("Awaitable received:", value)
        value = yield 2
        print("Awaitable received:", value)
        value = yield 3
        print("Awaitable received:", value)
        return 42


async def foo():
    print("foo start")
    result = await Awaitable()
    print("foo received result:", result)
    print("foo end")

以交互方式驱动 foo() 协同程序产生以下结果:

>>> f_coro = foo()  # calling foo() returns a coroutine object
>>> f_coro
<coroutine object foo at 0x7fa7f74046d0>
>>> f_coro.send(None)
foo start
1
>>> f_coro.send("one")
Awaitable received: one
2
>>> f_coro.send("two")
Awaitable received: two
3
>>> f_coro.send("three")
Awaitable received: three
foo received result: 42
foo end
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
StopIteration

发送到 f_coro 的任何内容都会向下传送到 Awaitable 实例。同样,无论 Awaitable.__await__() 产生什么,都会冒泡到发送值的最顶层代码。

整个过程对f_coro协程是透明的,不直接参与,看不到上下传递的值。但是,当 Awaitable 的迭代器耗尽时,它的 return 值成为 await 表达式的结果(在我们的例子中是 42),并且这就是 f_coro 终于恢复的地方。

请注意 await 协程中的表达式也可以链接。一个协程可以等待另一个协程,另一个协程等待另一个协程...直到整个链以 yield 在路上的某个地方结束。

将值发送到协程本身

这些知识对我们有什么帮助?好吧,在课程 material 中,协程可以生成 SystemCall 实例。调度程序理解这些并让系统调用处理请求的操作。

为了让协程将 SystemCall 带到调度程序,SystemCall 实例可以简单地 yield 自身 ,它将被引导直到上一节中描述的调度程序。

因此,第一个需要的更改是将此逻辑添加到基础 SystemCall class:

class SystemCall:
    ...
    def __await__(self):
        yield self

随着 SystemCall 个实例变为可等待,以下现在实际上 运行s:

async def foo():
    mytid = await GetTid()
    print("I'm foo", mytid)

>>> sched = Scheduler()
>>> sched.new(foo())
>>> sched.mainloop()

输出:

I'm foo None
Task 1 terminated

太棒了,它不再崩溃了!

但是协程没有收到任务ID,而是收到了None。这是因为系统调用的handle()方法设置的值和Task.run()方法发送的值:

# in Task.run()
self.target.send(self.sendval)

... 以 SystemCall.__await__() 方法结束。如果我们要将值带入协程,系统调用必须return它,这样它就成为协程中await表达式的值。

class SystemCall:
    ...
    def __await__(self):
        return (yield self)

运行 与修改后的 SystemCall 相同的代码产生所需的输出:

I'm foo 1
Task 1 terminated

运行 协程并发

我们仍然需要一种方法来暂停协程,即有一个系统“陷阱”代码。在课程 material 中,这是在协程中使用普通 yield 完成的,但尝试使用普通 await 实际上是语法错误:

async def foo():
    mytid = await GetTid()
    for i in range(3):
        print("I'm foo", mytid)
        await  # SyntaxError here

幸运的是,解决方法很简单。由于我们已经有工作系统调用,我们可以添加一个虚拟的无操作系统调用,它的唯一工作是暂停协程并立即重新安排它:

class YieldControl(SystemCall):
    def handle(self):
        self.task.sendval = None   # setting sendval is optional
        self.sched.schedule(self.task)

在任务上设置 sendval 是可选的,因为预计此系统调用不会产生任何有意义的值,但我们选择明确这一点。

我们现在已经准备好 运行 多任务操作系统!

async def foo():
    mytid = await GetTid()
    for i in range(3):
        print("I'm foo", mytid)
        await YieldControl()


async def bar():
    mytid = await GetTid()
    for i in range(5):
        print("I'm bar", mytid)
        await YieldControl()


sched = Scheduler()
sched.new(foo())
sched.new(bar())
sched.mainloop()

输出:

I'm foo 1
I'm bar 2
I'm foo 1
I'm bar 2
I'm foo 1
I'm bar 2
Task 1 terminated
I'm bar 2
I'm bar 2
Task 2 terminated

脚注

Scheduler代码完全没变。

它。只是。有效。

这显示了原始设计的美妙之处,其中调度程序和其中 运行 的任务彼此不耦合,我们能够在没有 Scheduler 的情况下更改协程实现了解它。甚至包装协程的 Task class 也不必更改。

不需要蹦床。

pyos8.py版本的系统中,实现了蹦床的概念。它允许协程在 shceduler 的帮助下将他们的一部分工作委托给另一个协程(调度器代表父协程调用子协程并将前者的结果发送给父协程)。

不需要此机制,因为 await(及其较早的同伴 yield from)已经使这种链接成为可能,如开头所述。

附录 A - 完整的 运行 可用示例(需要 Python 3.5+)

example_full.py
from queue import Queue


# ------------------------------------------------------------
#                       === Tasks ===
# ------------------------------------------------------------
class Task:
    taskid = 0
    def __init__(self,target):
        Task.taskid += 1
        self.tid = Task.taskid   # Task ID
        self.target = target        # Target coroutine
        self.sendval = None          # Value to send

    # Run a task until it hits the next yield statement
    def run(self):
        return self.target.send(self.sendval)


# ------------------------------------------------------------
#                      === Scheduler ===
# ------------------------------------------------------------
class Scheduler:
    def __init__(self):
        self.ready = Queue()   
        self.taskmap = {}        

    def new(self,target):
        newtask = Task(target)
        self.taskmap[newtask.tid] = newtask
        self.schedule(newtask)
        return newtask.tid

    def exit(self,task):
        print("Task %d terminated" % task.tid)
        del self.taskmap[task.tid]

    def schedule(self,task):
        self.ready.put(task)

    def mainloop(self):
         while self.taskmap:
            task = self.ready.get()
            try:
                result = task.run()
                if isinstance(result,SystemCall):
                    result.task  = task
                    result.sched = self
                    result.handle()
                    continue
            except StopIteration:
                self.exit(task)
                continue
            self.schedule(task)


# ------------------------------------------------------------
#                   === System Calls ===
# ------------------------------------------------------------
class SystemCall:
    def handle(self):
        pass

    def __await__(self):
        return (yield self)


# Return a task's ID number
class GetTid(SystemCall):
    def handle(self):
        self.task.sendval = self.task.tid
        self.sched.schedule(self.task)


class YieldControl(SystemCall):
    def handle(self):
        self.task.sendval = None   # setting sendval is optional
        self.sched.schedule(self.task)


# ------------------------------------------------------------
#                      === Example ===
# ------------------------------------------------------------
if __name__ == '__main__':
    async def foo():
        mytid = await GetTid()
        for i in range(3):
            print("I'm foo", mytid)
            await YieldControl()


    async def bar():
        mytid = await GetTid()
        for i in range(5):
            print("I'm bar", mytid)
            await YieldControl()

    sched = Scheduler()
    sched.new(foo())
    sched.new(bar())
    sched.mainloop()