python - 如何将 C 函数实现为可等待的(协程)
python - how to implement a C-function as awaitable (coroutine)
环境:C和micropython虚拟机中的协同RTOS是任务之一。
为了让 VM 不阻塞其他 RTOS 任务,我在 vm.c:DISPATCH()
中插入 RTOS_sleep()
以便在执行每个字节码后,VM 将控制权交给下一个 RTOS 任务。
我创建了一个 uPy 接口来从物理数据总线异步获取数据——可以是 CAN、SPI、以太网——使用生产者-消费者设计模式。
uPy 中的用法:
can_q = CANbus.queue()
message = can_q.get()
C 中的实现是 can_q.get()
不会阻塞 RTOS:它轮询一个 C 队列,如果没有收到消息,它调用 RTOS_sleep()
给另一个任务机会填满队列。事情是同步的,因为 C 队列仅由另一个 RTOS 任务更新,而 RTOS 任务仅在调用 RTOS_sleep()
时切换,即 cooperative
C 实现基本上是:
// gives chance for c-queue to be filled by other RTOS task
while(c_queue_empty() == true) RTOS_sleep();
return c_queue_get_message();
虽然Python语句can_q.get()
不阻塞RTOS,但它确实阻塞了uPy脚本。
我想重写它,以便我可以将它与 async def
一起使用,即 coroutine 并且不会阻止 uPy 脚本。
不确定 the syntax 但像这样:
can_q = CANbus.queue()
message = await can_q.get()
问题
如何编写 C 函数以便可以 await
使用它?
我更喜欢 CPython 和 micropython 的答案,但我会接受仅 CPython 的答案。
注意:这个答案涵盖了 CPython 和 asyncio 框架。然而,这些概念应该适用于其他 Python 实现以及其他异步框架。
How do I write a C-function so I can await
on it?
编写可等待其结果的 C 函数的最简单方法是让其 return 成为一个已创建的可等待对象,例如 asyncio.Future
。在 returning Future
之前,代码必须安排未来的结果由某种异步机制设置。所有这些基于协程的方法都假定您的程序 运行 在某个知道如何安排协程的事件循环下。
但是 return 未来并不总是足够的 - 也许我们想定义一个具有任意数量的悬挂点的对象。返回一个 future 只会暂停一次(如果 returned future 没有完成),一旦 future 完成就恢复,就是这样。等同于包含多个 await
的 async def
的可等待对象不能通过 return 未来实现,它必须实现协程通常实现的协议。这有点像实现自定义 __next__
的迭代器,可以用来代替生成器。
定义自定义可等待对象
要定义我们自己的可等待类型,我们可以求助于 PEP 492,其中 specifies 可以将哪些对象传递给 await
。除了用 async def
定义的 Python 函数外,用户定义的类型可以通过定义 __await__
特殊方法使对象可等待,Python/C 映射到 tp_as_async.am_await
部分PyTypeObject
结构。
这意味着在 Python/C 中,您必须执行以下操作:
- 为扩展类型的
tp_as_async
字段指定一个非 NULL 值。
- 有其
am_await
member point to a C function that accepts an instance of your type and returns an instance of another extension type that implements the iterator protocol, i.e. defines tp_iter
(trivially defined as PyIter_Self
) and tp_iternext
.
- 迭代器的
tp_iternext
必须推进协程的状态机。来自 tp_iternext
的每个非异常 return 对应于一个暂停,最后的 StopIteration
异常表示协程的最终 return。 return 值存储在 StopIteration
的 value
属性 中。
要使协程有用,它还必须能够与驱动它的事件循环通信,以便它可以指定在挂起后何时恢复。 asyncio 定义的大部分协程期望在 asyncio 事件循环下 运行,并在内部使用 asyncio.get_event_loop()
(and/or 接受显式 loop
参数)来获取其服务。
协程示例
为了说明 Python/C 代码需要实现什么,让我们考虑表示为 Python async def
的简单协程,例如 asyncio.sleep()
的等价物:
async def my_sleep(n):
loop = asyncio.get_event_loop()
future = loop.create_future()
loop.call_later(n, future.set_result, None)
await future
# we get back here after the timeout has elapsed, and
# immediately return
my_sleep
创建一个 Future
,安排它在 n 秒内完成(其结果设置),并暂停自己直到将来完成。最后一部分使用 await
,其中 await x
表示 "allow x
to decide whether we will now suspend or keep executing"。一个不完整的 future 总是决定暂停,并且 asyncio Task
协程驱动程序特例产生 futures 无限期地暂停它们并将它们的完成连接到恢复任务。其他事件循环(curio 等)的暂停机制可能在细节上有所不同,但基本思想是相同的:await
是一个可选的暂停执行。
__await__()
return 是一个发电机
要将其转换为 C,我们必须摆脱神奇的 async def
函数定义,以及 await
暂停点。删除 async def
相当简单:等效的普通函数只需要 return 一个实现 __await__
:
的对象
def my_sleep(n):
return _MySleep(n)
class _MySleep:
def __init__(self, n):
self.n = n
def __await__(self):
return _MySleepIter(self.n)
由my_sleep()
编辑的_MySleep
对象return的__await__
方法将被await
运算符自动调用以转换awaitable 对象(传递给 await
的任何东西)到迭代器。该迭代器将用于询问等待的对象是选择暂停还是提供值。这很像 for o in x
语句如何调用 x.__iter__()
将 iterable x
转换为具体的 iterator.
当returned迭代器选择暂停时,它只需要产生一个值。值的含义(如果有的话)将由协程驱动程序解释,通常是事件循环的一部分。当迭代器选择停止执行并从await
return时,需要停止迭代。使用生成器作为方便的迭代器实现,_MySleepIter
看起来像这样:
def _MySleepIter(n):
loop = asyncio.get_event_loop()
future = loop.create_future()
loop.call_later(n, future.set_result, None)
# yield from future.__await__()
for x in future.__await__():
yield x
由于 await x
映射到 yield from x.__await__()
,我们的生成器必须耗尽由 future.__await__()
编辑的迭代器 return。如果未来不完整,由 Future.__await__
编辑的迭代器 return 将产生,否则 return 未来的结果(我们在这里忽略,但 yield from
实际上提供)。
__await__()
return 是一个自定义迭代器
在 C 中实现 my_sleep
的最后一个障碍是使用 _MySleepIter
的生成器。幸运的是,任何生成器都可以转换为有状态迭代器,其 __next__
执行一段代码直到下一个 await 或 return。 __next__
实现生成器代码的状态机版本,其中 yield
通过 returning 值表示,return
通过提高 StopIteration
表示。例如:
class _MySleepIter:
def __init__(self, n):
self.n = n
self.state = 0
def __iter__(self): # an iterator has to define __iter__
return self
def __next__(self):
if self.state == 0:
loop = asyncio.get_event_loop()
self.future = loop.create_future()
loop.call_later(self.n, self.future.set_result, None)
self.state = 1
if self.state == 1:
if not self.future.done():
return next(iter(self.future))
self.state = 2
if self.state == 2:
raise StopIteration
raise AssertionError("invalid state")
翻译成 C
上面的输入相当多,但它有效,并且只使用可以用本机 Python/C 函数定义的结构。
实际上将两个 类 翻译成 C 非常简单,但超出了这个答案的范围。
环境:C和micropython虚拟机中的协同RTOS是任务之一。
为了让 VM 不阻塞其他 RTOS 任务,我在 vm.c:DISPATCH()
中插入 RTOS_sleep()
以便在执行每个字节码后,VM 将控制权交给下一个 RTOS 任务。
我创建了一个 uPy 接口来从物理数据总线异步获取数据——可以是 CAN、SPI、以太网——使用生产者-消费者设计模式。
uPy 中的用法:
can_q = CANbus.queue()
message = can_q.get()
C 中的实现是 can_q.get()
不会阻塞 RTOS:它轮询一个 C 队列,如果没有收到消息,它调用 RTOS_sleep()
给另一个任务机会填满队列。事情是同步的,因为 C 队列仅由另一个 RTOS 任务更新,而 RTOS 任务仅在调用 RTOS_sleep()
时切换,即 cooperative
C 实现基本上是:
// gives chance for c-queue to be filled by other RTOS task
while(c_queue_empty() == true) RTOS_sleep();
return c_queue_get_message();
虽然Python语句can_q.get()
不阻塞RTOS,但它确实阻塞了uPy脚本。
我想重写它,以便我可以将它与 async def
一起使用,即 coroutine 并且不会阻止 uPy 脚本。
不确定 the syntax 但像这样:
can_q = CANbus.queue()
message = await can_q.get()
问题
如何编写 C 函数以便可以 await
使用它?
我更喜欢 CPython 和 micropython 的答案,但我会接受仅 CPython 的答案。
注意:这个答案涵盖了 CPython 和 asyncio 框架。然而,这些概念应该适用于其他 Python 实现以及其他异步框架。
How do I write a C-function so I can
await
on it?
编写可等待其结果的 C 函数的最简单方法是让其 return 成为一个已创建的可等待对象,例如 asyncio.Future
。在 returning Future
之前,代码必须安排未来的结果由某种异步机制设置。所有这些基于协程的方法都假定您的程序 运行 在某个知道如何安排协程的事件循环下。
但是 return 未来并不总是足够的 - 也许我们想定义一个具有任意数量的悬挂点的对象。返回一个 future 只会暂停一次(如果 returned future 没有完成),一旦 future 完成就恢复,就是这样。等同于包含多个 await
的 async def
的可等待对象不能通过 return 未来实现,它必须实现协程通常实现的协议。这有点像实现自定义 __next__
的迭代器,可以用来代替生成器。
定义自定义可等待对象
要定义我们自己的可等待类型,我们可以求助于 PEP 492,其中 specifies 可以将哪些对象传递给 await
。除了用 async def
定义的 Python 函数外,用户定义的类型可以通过定义 __await__
特殊方法使对象可等待,Python/C 映射到 tp_as_async.am_await
部分PyTypeObject
结构。
这意味着在 Python/C 中,您必须执行以下操作:
- 为扩展类型的
tp_as_async
字段指定一个非 NULL 值。 - 有其
am_await
member point to a C function that accepts an instance of your type and returns an instance of another extension type that implements the iterator protocol, i.e. definestp_iter
(trivially defined asPyIter_Self
) andtp_iternext
. - 迭代器的
tp_iternext
必须推进协程的状态机。来自tp_iternext
的每个非异常 return 对应于一个暂停,最后的StopIteration
异常表示协程的最终 return。 return 值存储在StopIteration
的value
属性 中。
要使协程有用,它还必须能够与驱动它的事件循环通信,以便它可以指定在挂起后何时恢复。 asyncio 定义的大部分协程期望在 asyncio 事件循环下 运行,并在内部使用 asyncio.get_event_loop()
(and/or 接受显式 loop
参数)来获取其服务。
协程示例
为了说明 Python/C 代码需要实现什么,让我们考虑表示为 Python async def
的简单协程,例如 asyncio.sleep()
的等价物:
async def my_sleep(n):
loop = asyncio.get_event_loop()
future = loop.create_future()
loop.call_later(n, future.set_result, None)
await future
# we get back here after the timeout has elapsed, and
# immediately return
my_sleep
创建一个 Future
,安排它在 n 秒内完成(其结果设置),并暂停自己直到将来完成。最后一部分使用 await
,其中 await x
表示 "allow x
to decide whether we will now suspend or keep executing"。一个不完整的 future 总是决定暂停,并且 asyncio Task
协程驱动程序特例产生 futures 无限期地暂停它们并将它们的完成连接到恢复任务。其他事件循环(curio 等)的暂停机制可能在细节上有所不同,但基本思想是相同的:await
是一个可选的暂停执行。
__await__()
return 是一个发电机
要将其转换为 C,我们必须摆脱神奇的 async def
函数定义,以及 await
暂停点。删除 async def
相当简单:等效的普通函数只需要 return 一个实现 __await__
:
def my_sleep(n):
return _MySleep(n)
class _MySleep:
def __init__(self, n):
self.n = n
def __await__(self):
return _MySleepIter(self.n)
由my_sleep()
编辑的_MySleep
对象return的__await__
方法将被await
运算符自动调用以转换awaitable 对象(传递给 await
的任何东西)到迭代器。该迭代器将用于询问等待的对象是选择暂停还是提供值。这很像 for o in x
语句如何调用 x.__iter__()
将 iterable x
转换为具体的 iterator.
当returned迭代器选择暂停时,它只需要产生一个值。值的含义(如果有的话)将由协程驱动程序解释,通常是事件循环的一部分。当迭代器选择停止执行并从await
return时,需要停止迭代。使用生成器作为方便的迭代器实现,_MySleepIter
看起来像这样:
def _MySleepIter(n):
loop = asyncio.get_event_loop()
future = loop.create_future()
loop.call_later(n, future.set_result, None)
# yield from future.__await__()
for x in future.__await__():
yield x
由于 await x
映射到 yield from x.__await__()
,我们的生成器必须耗尽由 future.__await__()
编辑的迭代器 return。如果未来不完整,由 Future.__await__
编辑的迭代器 return 将产生,否则 return 未来的结果(我们在这里忽略,但 yield from
实际上提供)。
__await__()
return 是一个自定义迭代器
在 C 中实现 my_sleep
的最后一个障碍是使用 _MySleepIter
的生成器。幸运的是,任何生成器都可以转换为有状态迭代器,其 __next__
执行一段代码直到下一个 await 或 return。 __next__
实现生成器代码的状态机版本,其中 yield
通过 returning 值表示,return
通过提高 StopIteration
表示。例如:
class _MySleepIter:
def __init__(self, n):
self.n = n
self.state = 0
def __iter__(self): # an iterator has to define __iter__
return self
def __next__(self):
if self.state == 0:
loop = asyncio.get_event_loop()
self.future = loop.create_future()
loop.call_later(self.n, self.future.set_result, None)
self.state = 1
if self.state == 1:
if not self.future.done():
return next(iter(self.future))
self.state = 2
if self.state == 2:
raise StopIteration
raise AssertionError("invalid state")
翻译成 C
上面的输入相当多,但它有效,并且只使用可以用本机 Python/C 函数定义的结构。
实际上将两个 类 翻译成 C 非常简单,但超出了这个答案的范围。