Python 本机协程和 send()
Python native coroutines and send()
基于生成器的协程有一个 send()
方法,它允许调用者和被调用者之间进行双向通信,并从调用者恢复生成的生成器协程。这是将生成器变成协程的功能。
虽然新的原生 async/await
协程提供了对异步 I/O 的卓越支持,但我看不出如何使用它们获得 send()
的等效项。在 async
函数中使用 yield
是明确禁止的,因此本机协程只能 return 使用 return
语句一次。尽管 await
表达式将新值带入协同程序,但这些值来自被调用者,而不是调用者,并且等待的调用每次都从头开始计算,而不是从它停止的地方计算。
有没有办法从停止的地方恢复 returned 协程并可能发送新值?
我如何使用本机协程模拟 David Beazley 的 Curious Course on Coroutines and Concurrency 中的技术?
我想到的一般代码模式是这样的
def myCoroutine():
...
while True:
...
ping = yield(pong)
...
并在来电者中
while True:
...
buzz = myCoroutineGen.send(bizz)
...
编辑
我接受了凯文的回答,但我注意到 PEP says
Coroutines are based on generators internally, thus they share the implementation. Similarly to generator objects, coroutines have throw() , send() and close() methods.
...
throw() , send() methods for coroutines are used to push values and raise errors into Future-like objects.
那么显然原生协程确实有 send()
?没有 yield
表达式如何接收协程内部的值?
Is there a way to resume a returned coroutine from where it left off and potentially send in a new value?
没有
async
和 await
只是 yield from
的语法糖。当一个协程 returns (带有 return
语句)时,就是这样。框架不见了。它不可恢复。这正是发电机一直以来的工作方式。例如:
def foo():
return (yield)
你可以做f = foo(); next(f); f.send(5)
,你会得到5。但是如果你再次尝试f.send()
,它不起作用,因为你已经从框架中返回了。 f
不再是实时生成器。
现在,至于新协程,据我所知,yield 和 sending 似乎是为事件循环和某些基本谓词(例如 asyncio.sleep()
之间的通信保留的。协程产生 asyncio.Future
objects up to the event loop, and the event loop sends those same future objects back into the coroutine once the associated operations have been completed (they are typically scheduled via call_soon()
和其他事件循环方法)。
您可以通过等待它们来产生未来的对象,但它不是像 .send()
那样的通用接口。它专门供事件循环实现使用。如果你没有实现事件循环,你可能不想玩这个。如果您正在实现事件循环,您需要问问自己为什么asyncio
中完美的实现不足以满足您的目的,并解释具体 在我们可以帮助您之前,您正在尝试做。
请注意 yield from
并未弃用。如果您想要完全不依赖于事件循环的协程,只需使用它即可。 async
和 await
是 specifically designed for asynchronous programming with event loops。如果这不是您正在做的,那么 async
和 await
是错误的开始工具。
还有一件事:
The use of yield
in async functions is explicitly forbidden, so native coroutines can return only once using a return
statement.
await
表达式 do 屈服控制。 await something()
完全类似于 yield from something()
。他们只是更改了名称,这样对于不熟悉生成器的人来说会更直观。
对于那些真正有兴趣实现自己的事件循环的人,here's some example code showing a (very minimal) implementation. This event loop is extremely stripped down, because it is designed to run certain specially-written coroutines synchronously as if they were normal functions. It does not provide the full range of support you would expect from a real BaseEventLoop 实现,并且与任意协程一起使用是不安全的。
通常情况下,我会将代码包含在我的答案中,而不是链接到它,但存在版权问题并且它对答案本身并不重要。
在完成了 Beazley 的同一门(我必须说很棒)协程课程后,我问自己同样的问题——如何调整代码以与 Python 中介绍的原生协程一起工作3.5?
事实证明,可以通过对代码进行相对较小的更改来完成。我假设读者熟悉课程 material,并以 pyos4.py 版本为基础 - 第一个支持“系统调用”的 Scheduler
版本。
TIP: A full runnable example can be found in Appendix A at the end.
Objective
目标是转以下协程代码:
def foo():
mytid = yield GetTid() # a "system call"
for i in xrange(3):
print "I'm foo", mytid
yield # a "trap"
... 进入本机协同程序并仍然像以前一样使用:
async def foo():
mytid = await GetTid() # a "system call"
for i in range(3):
print("I'm foo", mytid)
await ??? # a "trap" (will explain the missing bit later)
我们想 运行 它没有 asyncio
,因为我们已经有自己的事件循环来驱动整个过程 - 它是 Scheduler
class。
等待对象
原生协程无法立即运行,以下代码会导致错误:
async def foo():
mytid = await GetTid()
print("I'm foo", mytid)
sched = Scheduler()
sched.new(foo())
sched.mainloop()
Traceback (most recent call last):
...
mytid = await GetTid()
TypeError: object GetTid can't be used in 'await' expression
PEP 492 解释了可以等待什么样的对象。其中一个选项是 “具有 __await__
方法的对象 returning 迭代器”.
就像 yield from
一样,如果您熟悉它,await
充当等待对象和驱动协程的最外层代码(通常是事件循环)之间的隧道。最好用一个例子来证明这一点:
class Awaitable:
def __await__(self):
value = yield 1
print("Awaitable received:", value)
value = yield 2
print("Awaitable received:", value)
value = yield 3
print("Awaitable received:", value)
return 42
async def foo():
print("foo start")
result = await Awaitable()
print("foo received result:", result)
print("foo end")
以交互方式驱动 foo()
协同程序产生以下结果:
>>> f_coro = foo() # calling foo() returns a coroutine object
>>> f_coro
<coroutine object foo at 0x7fa7f74046d0>
>>> f_coro.send(None)
foo start
1
>>> f_coro.send("one")
Awaitable received: one
2
>>> f_coro.send("two")
Awaitable received: two
3
>>> f_coro.send("three")
Awaitable received: three
foo received result: 42
foo end
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
StopIteration
发送到 f_coro
的任何内容都会向下传送到 Awaitable
实例。同样,无论 Awaitable.__await__()
产生什么,都会冒泡到发送值的最顶层代码。
整个过程对f_coro
协程是透明的,不直接参与,看不到上下传递的值。但是,当 Awaitable
的迭代器耗尽时,它的 return 值成为 await
表达式的结果(在我们的例子中是 42),并且这就是 f_coro
终于恢复的地方。
请注意 await
协程中的表达式也可以链接。一个协程可以等待另一个协程,另一个协程等待另一个协程...直到整个链以 yield
在路上的某个地方结束。
将值发送到协程本身
这些知识对我们有什么帮助?好吧,在课程 material 中,协程可以生成 SystemCall
实例。调度程序理解这些并让系统调用处理请求的操作。
为了让协程将 SystemCall
带到调度程序,SystemCall
实例可以简单地 yield 自身 ,它将被引导直到上一节中描述的调度程序。
因此,第一个需要的更改是将此逻辑添加到基础 SystemCall
class:
class SystemCall:
...
def __await__(self):
yield self
随着 SystemCall
个实例变为可等待,以下现在实际上 运行s:
async def foo():
mytid = await GetTid()
print("I'm foo", mytid)
>>> sched = Scheduler()
>>> sched.new(foo())
>>> sched.mainloop()
输出:
I'm foo None
Task 1 terminated
太棒了,它不再崩溃了!
但是协程没有收到任务ID,而是收到了None
。这是因为系统调用的handle()
方法设置的值和Task.run()
方法发送的值:
# in Task.run()
self.target.send(self.sendval)
... 以 SystemCall.__await__()
方法结束。如果我们要将值带入协程,系统调用必须return它,这样它就成为协程中await
表达式的值。
class SystemCall:
...
def __await__(self):
return (yield self)
运行 与修改后的 SystemCall
相同的代码产生所需的输出:
I'm foo 1
Task 1 terminated
运行 协程并发
我们仍然需要一种方法来暂停协程,即有一个系统“陷阱”代码。在课程 material 中,这是在协程中使用普通 yield
完成的,但尝试使用普通 await
实际上是语法错误:
async def foo():
mytid = await GetTid()
for i in range(3):
print("I'm foo", mytid)
await # SyntaxError here
幸运的是,解决方法很简单。由于我们已经有工作系统调用,我们可以添加一个虚拟的无操作系统调用,它的唯一工作是暂停协程并立即重新安排它:
class YieldControl(SystemCall):
def handle(self):
self.task.sendval = None # setting sendval is optional
self.sched.schedule(self.task)
在任务上设置 sendval
是可选的,因为预计此系统调用不会产生任何有意义的值,但我们选择明确这一点。
我们现在已经准备好 运行 多任务操作系统!
async def foo():
mytid = await GetTid()
for i in range(3):
print("I'm foo", mytid)
await YieldControl()
async def bar():
mytid = await GetTid()
for i in range(5):
print("I'm bar", mytid)
await YieldControl()
sched = Scheduler()
sched.new(foo())
sched.new(bar())
sched.mainloop()
输出:
I'm foo 1
I'm bar 2
I'm foo 1
I'm bar 2
I'm foo 1
I'm bar 2
Task 1 terminated
I'm bar 2
I'm bar 2
Task 2 terminated
脚注
Scheduler
代码完全没变。
它。只是。有效。
这显示了原始设计的美妙之处,其中调度程序和其中 运行 的任务彼此不耦合,我们能够在没有 Scheduler
的情况下更改协程实现了解它。甚至包装协程的 Task
class 也不必更改。
不需要蹦床。
在pyos8.py版本的系统中,实现了蹦床的概念。它允许协程在 shceduler 的帮助下将他们的一部分工作委托给另一个协程(调度器代表父协程调用子协程并将前者的结果发送给父协程)。
不需要此机制,因为 await
(及其较早的同伴 yield from
)已经使这种链接成为可能,如开头所述。
附录 A - 完整的 运行 可用示例(需要 Python 3.5+)
example_full.py
from queue import Queue
# ------------------------------------------------------------
# === Tasks ===
# ------------------------------------------------------------
class Task:
taskid = 0
def __init__(self,target):
Task.taskid += 1
self.tid = Task.taskid # Task ID
self.target = target # Target coroutine
self.sendval = None # Value to send
# Run a task until it hits the next yield statement
def run(self):
return self.target.send(self.sendval)
# ------------------------------------------------------------
# === Scheduler ===
# ------------------------------------------------------------
class Scheduler:
def __init__(self):
self.ready = Queue()
self.taskmap = {}
def new(self,target):
newtask = Task(target)
self.taskmap[newtask.tid] = newtask
self.schedule(newtask)
return newtask.tid
def exit(self,task):
print("Task %d terminated" % task.tid)
del self.taskmap[task.tid]
def schedule(self,task):
self.ready.put(task)
def mainloop(self):
while self.taskmap:
task = self.ready.get()
try:
result = task.run()
if isinstance(result,SystemCall):
result.task = task
result.sched = self
result.handle()
continue
except StopIteration:
self.exit(task)
continue
self.schedule(task)
# ------------------------------------------------------------
# === System Calls ===
# ------------------------------------------------------------
class SystemCall:
def handle(self):
pass
def __await__(self):
return (yield self)
# Return a task's ID number
class GetTid(SystemCall):
def handle(self):
self.task.sendval = self.task.tid
self.sched.schedule(self.task)
class YieldControl(SystemCall):
def handle(self):
self.task.sendval = None # setting sendval is optional
self.sched.schedule(self.task)
# ------------------------------------------------------------
# === Example ===
# ------------------------------------------------------------
if __name__ == '__main__':
async def foo():
mytid = await GetTid()
for i in range(3):
print("I'm foo", mytid)
await YieldControl()
async def bar():
mytid = await GetTid()
for i in range(5):
print("I'm bar", mytid)
await YieldControl()
sched = Scheduler()
sched.new(foo())
sched.new(bar())
sched.mainloop()
基于生成器的协程有一个 send()
方法,它允许调用者和被调用者之间进行双向通信,并从调用者恢复生成的生成器协程。这是将生成器变成协程的功能。
虽然新的原生 async/await
协程提供了对异步 I/O 的卓越支持,但我看不出如何使用它们获得 send()
的等效项。在 async
函数中使用 yield
是明确禁止的,因此本机协程只能 return 使用 return
语句一次。尽管 await
表达式将新值带入协同程序,但这些值来自被调用者,而不是调用者,并且等待的调用每次都从头开始计算,而不是从它停止的地方计算。
有没有办法从停止的地方恢复 returned 协程并可能发送新值? 我如何使用本机协程模拟 David Beazley 的 Curious Course on Coroutines and Concurrency 中的技术?
我想到的一般代码模式是这样的
def myCoroutine():
...
while True:
...
ping = yield(pong)
...
并在来电者中
while True:
...
buzz = myCoroutineGen.send(bizz)
...
编辑
我接受了凯文的回答,但我注意到 PEP says
Coroutines are based on generators internally, thus they share the implementation. Similarly to generator objects, coroutines have throw() , send() and close() methods.
...
throw() , send() methods for coroutines are used to push values and raise errors into Future-like objects.
那么显然原生协程确实有 send()
?没有 yield
表达式如何接收协程内部的值?
Is there a way to resume a returned coroutine from where it left off and potentially send in a new value?
没有
async
和 await
只是 yield from
的语法糖。当一个协程 returns (带有 return
语句)时,就是这样。框架不见了。它不可恢复。这正是发电机一直以来的工作方式。例如:
def foo():
return (yield)
你可以做f = foo(); next(f); f.send(5)
,你会得到5。但是如果你再次尝试f.send()
,它不起作用,因为你已经从框架中返回了。 f
不再是实时生成器。
现在,至于新协程,据我所知,yield 和 sending 似乎是为事件循环和某些基本谓词(例如 asyncio.sleep()
之间的通信保留的。协程产生 asyncio.Future
objects up to the event loop, and the event loop sends those same future objects back into the coroutine once the associated operations have been completed (they are typically scheduled via call_soon()
和其他事件循环方法)。
您可以通过等待它们来产生未来的对象,但它不是像 .send()
那样的通用接口。它专门供事件循环实现使用。如果你没有实现事件循环,你可能不想玩这个。如果您正在实现事件循环,您需要问问自己为什么asyncio
中完美的实现不足以满足您的目的,并解释具体 在我们可以帮助您之前,您正在尝试做。
请注意 yield from
并未弃用。如果您想要完全不依赖于事件循环的协程,只需使用它即可。 async
和 await
是 specifically designed for asynchronous programming with event loops。如果这不是您正在做的,那么 async
和 await
是错误的开始工具。
还有一件事:
The use of
yield
in async functions is explicitly forbidden, so native coroutines can return only once using areturn
statement.
await
表达式 do 屈服控制。 await something()
完全类似于 yield from something()
。他们只是更改了名称,这样对于不熟悉生成器的人来说会更直观。
对于那些真正有兴趣实现自己的事件循环的人,here's some example code showing a (very minimal) implementation. This event loop is extremely stripped down, because it is designed to run certain specially-written coroutines synchronously as if they were normal functions. It does not provide the full range of support you would expect from a real BaseEventLoop 实现,并且与任意协程一起使用是不安全的。
通常情况下,我会将代码包含在我的答案中,而不是链接到它,但存在版权问题并且它对答案本身并不重要。
在完成了 Beazley 的同一门(我必须说很棒)协程课程后,我问自己同样的问题——如何调整代码以与 Python 中介绍的原生协程一起工作3.5?
事实证明,可以通过对代码进行相对较小的更改来完成。我假设读者熟悉课程 material,并以 pyos4.py 版本为基础 - 第一个支持“系统调用”的 Scheduler
版本。
TIP: A full runnable example can be found in Appendix A at the end.
Objective
目标是转以下协程代码:
def foo():
mytid = yield GetTid() # a "system call"
for i in xrange(3):
print "I'm foo", mytid
yield # a "trap"
... 进入本机协同程序并仍然像以前一样使用:
async def foo():
mytid = await GetTid() # a "system call"
for i in range(3):
print("I'm foo", mytid)
await ??? # a "trap" (will explain the missing bit later)
我们想 运行 它没有 asyncio
,因为我们已经有自己的事件循环来驱动整个过程 - 它是 Scheduler
class。
等待对象
原生协程无法立即运行,以下代码会导致错误:
async def foo():
mytid = await GetTid()
print("I'm foo", mytid)
sched = Scheduler()
sched.new(foo())
sched.mainloop()
Traceback (most recent call last): ... mytid = await GetTid() TypeError: object GetTid can't be used in 'await' expression
PEP 492 解释了可以等待什么样的对象。其中一个选项是 “具有 __await__
方法的对象 returning 迭代器”.
就像 yield from
一样,如果您熟悉它,await
充当等待对象和驱动协程的最外层代码(通常是事件循环)之间的隧道。最好用一个例子来证明这一点:
class Awaitable:
def __await__(self):
value = yield 1
print("Awaitable received:", value)
value = yield 2
print("Awaitable received:", value)
value = yield 3
print("Awaitable received:", value)
return 42
async def foo():
print("foo start")
result = await Awaitable()
print("foo received result:", result)
print("foo end")
以交互方式驱动 foo()
协同程序产生以下结果:
>>> f_coro = foo() # calling foo() returns a coroutine object
>>> f_coro
<coroutine object foo at 0x7fa7f74046d0>
>>> f_coro.send(None)
foo start
1
>>> f_coro.send("one")
Awaitable received: one
2
>>> f_coro.send("two")
Awaitable received: two
3
>>> f_coro.send("three")
Awaitable received: three
foo received result: 42
foo end
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
StopIteration
发送到 f_coro
的任何内容都会向下传送到 Awaitable
实例。同样,无论 Awaitable.__await__()
产生什么,都会冒泡到发送值的最顶层代码。
整个过程对f_coro
协程是透明的,不直接参与,看不到上下传递的值。但是,当 Awaitable
的迭代器耗尽时,它的 return 值成为 await
表达式的结果(在我们的例子中是 42),并且这就是 f_coro
终于恢复的地方。
请注意 await
协程中的表达式也可以链接。一个协程可以等待另一个协程,另一个协程等待另一个协程...直到整个链以 yield
在路上的某个地方结束。
将值发送到协程本身
这些知识对我们有什么帮助?好吧,在课程 material 中,协程可以生成 SystemCall
实例。调度程序理解这些并让系统调用处理请求的操作。
为了让协程将 SystemCall
带到调度程序,SystemCall
实例可以简单地 yield 自身 ,它将被引导直到上一节中描述的调度程序。
因此,第一个需要的更改是将此逻辑添加到基础 SystemCall
class:
class SystemCall:
...
def __await__(self):
yield self
随着 SystemCall
个实例变为可等待,以下现在实际上 运行s:
async def foo():
mytid = await GetTid()
print("I'm foo", mytid)
>>> sched = Scheduler()
>>> sched.new(foo())
>>> sched.mainloop()
输出:
I'm foo None Task 1 terminated
太棒了,它不再崩溃了!
但是协程没有收到任务ID,而是收到了None
。这是因为系统调用的handle()
方法设置的值和Task.run()
方法发送的值:
# in Task.run()
self.target.send(self.sendval)
... 以 SystemCall.__await__()
方法结束。如果我们要将值带入协程,系统调用必须return它,这样它就成为协程中await
表达式的值。
class SystemCall:
...
def __await__(self):
return (yield self)
运行 与修改后的 SystemCall
相同的代码产生所需的输出:
I'm foo 1 Task 1 terminated
运行 协程并发
我们仍然需要一种方法来暂停协程,即有一个系统“陷阱”代码。在课程 material 中,这是在协程中使用普通 yield
完成的,但尝试使用普通 await
实际上是语法错误:
async def foo():
mytid = await GetTid()
for i in range(3):
print("I'm foo", mytid)
await # SyntaxError here
幸运的是,解决方法很简单。由于我们已经有工作系统调用,我们可以添加一个虚拟的无操作系统调用,它的唯一工作是暂停协程并立即重新安排它:
class YieldControl(SystemCall):
def handle(self):
self.task.sendval = None # setting sendval is optional
self.sched.schedule(self.task)
在任务上设置 sendval
是可选的,因为预计此系统调用不会产生任何有意义的值,但我们选择明确这一点。
我们现在已经准备好 运行 多任务操作系统!
async def foo():
mytid = await GetTid()
for i in range(3):
print("I'm foo", mytid)
await YieldControl()
async def bar():
mytid = await GetTid()
for i in range(5):
print("I'm bar", mytid)
await YieldControl()
sched = Scheduler()
sched.new(foo())
sched.new(bar())
sched.mainloop()
输出:
I'm foo 1 I'm bar 2 I'm foo 1 I'm bar 2 I'm foo 1 I'm bar 2 Task 1 terminated I'm bar 2 I'm bar 2 Task 2 terminated
脚注
Scheduler
代码完全没变。
它。只是。有效。
这显示了原始设计的美妙之处,其中调度程序和其中 运行 的任务彼此不耦合,我们能够在没有 Scheduler
的情况下更改协程实现了解它。甚至包装协程的 Task
class 也不必更改。
不需要蹦床。
在pyos8.py版本的系统中,实现了蹦床的概念。它允许协程在 shceduler 的帮助下将他们的一部分工作委托给另一个协程(调度器代表父协程调用子协程并将前者的结果发送给父协程)。
不需要此机制,因为 await
(及其较早的同伴 yield from
)已经使这种链接成为可能,如开头所述。
附录 A - 完整的 运行 可用示例(需要 Python 3.5+)
example_full.pyfrom queue import Queue
# ------------------------------------------------------------
# === Tasks ===
# ------------------------------------------------------------
class Task:
taskid = 0
def __init__(self,target):
Task.taskid += 1
self.tid = Task.taskid # Task ID
self.target = target # Target coroutine
self.sendval = None # Value to send
# Run a task until it hits the next yield statement
def run(self):
return self.target.send(self.sendval)
# ------------------------------------------------------------
# === Scheduler ===
# ------------------------------------------------------------
class Scheduler:
def __init__(self):
self.ready = Queue()
self.taskmap = {}
def new(self,target):
newtask = Task(target)
self.taskmap[newtask.tid] = newtask
self.schedule(newtask)
return newtask.tid
def exit(self,task):
print("Task %d terminated" % task.tid)
del self.taskmap[task.tid]
def schedule(self,task):
self.ready.put(task)
def mainloop(self):
while self.taskmap:
task = self.ready.get()
try:
result = task.run()
if isinstance(result,SystemCall):
result.task = task
result.sched = self
result.handle()
continue
except StopIteration:
self.exit(task)
continue
self.schedule(task)
# ------------------------------------------------------------
# === System Calls ===
# ------------------------------------------------------------
class SystemCall:
def handle(self):
pass
def __await__(self):
return (yield self)
# Return a task's ID number
class GetTid(SystemCall):
def handle(self):
self.task.sendval = self.task.tid
self.sched.schedule(self.task)
class YieldControl(SystemCall):
def handle(self):
self.task.sendval = None # setting sendval is optional
self.sched.schedule(self.task)
# ------------------------------------------------------------
# === Example ===
# ------------------------------------------------------------
if __name__ == '__main__':
async def foo():
mytid = await GetTid()
for i in range(3):
print("I'm foo", mytid)
await YieldControl()
async def bar():
mytid = await GetTid()
for i in range(5):
print("I'm bar", mytid)
await YieldControl()
sched = Scheduler()
sched.new(foo())
sched.new(bar())
sched.mainloop()