python - 如何将 C 函数实现为可等待的(协程)

python - how to implement a C-function as awaitable (coroutine)

环境:C和micropython虚拟机中的协同RTOS是任务之一。

为了让 VM 不阻塞其他 RTOS 任务,我在 vm.c:DISPATCH() 中插入 RTOS_sleep() 以便在执行每个字节码后,VM 将控制权交给下一个 RTOS 任务。

我创建了一个 uPy 接口来从物理数据总线异步获取数据——可以是 CAN、SPI、以太网——使用生产者-消费者设计模式。

uPy 中的用法:

can_q = CANbus.queue()
message = can_q.get()

C 中的实现是 can_q.get() 不会阻塞 RTOS:它轮询一个 C 队列,如果没有收到消息,它调用 RTOS_sleep() 给另一个任务机会填满队列。事情是同步的,因为 C 队列仅由另一个 RTOS 任务更新,而 RTOS 任务仅在调用 RTOS_sleep() 时切换,即 cooperative

C 实现基本上是:

// gives chance for c-queue to be filled by other RTOS task
while(c_queue_empty() == true) RTOS_sleep(); 
return c_queue_get_message();

虽然Python语句can_q.get()不阻塞RTOS,但它确实阻塞了uPy脚本。 我想重写它,以便我可以将它与 async def 一起使用,即 coroutine 并且不会阻止 uPy 脚本。

不确定 the syntax 但像这样:

can_q = CANbus.queue()
message = await can_q.get()

问题

如何编写 C 函数以便可以 await 使用它?

我更喜欢 CPython 和 micropython 的答案,但我会接受仅 CPython 的答案。

注意:这个答案涵盖了 CPython 和 asyncio 框架。然而,这些概念应该适用于其他 Python 实现以及其他异步框架。

How do I write a C-function so I can await on it?

编写可等待其结果的 C 函数的最简单方法是让其 return 成为一个已创建的可等待对象,例如 asyncio.Future。在 returning Future 之前,代码必须安排未来的结果由某种异步机制设置。所有这些基于协程的方法都假定您的程序 运行 在某个知道如何安排协程的事件循环下。

但是 return 未来并不总是足够的 - 也许我们想定义一个具有任意数量的悬挂点的对象。返回一个 future 只会暂停一次(如果 returned future 没有完成),一旦 future 完成就恢复,就是这样。等同于包含多个 awaitasync def 的可等待对象不能通过 return 未来实现,它必须实现协程通常实现的协议。这有点像实现自定义 __next__ 的迭代器,可以用来代替生成器。

定义自定义可等待对象

要定义我们自己的可等待类型,我们可以求助于 PEP 492,其中 specifies 可以将哪些对象传递给 await。除了用 async def 定义的 Python 函数外,用户定义的类型可以通过定义 __await__ 特殊方法使对象可等待,Python/C 映射到 tp_as_async.am_await 部分PyTypeObject 结构。

这意味着在 Python/C 中,您必须执行以下操作:

  • 为扩展类型的 tp_as_async 字段指定一个非 NULL 值。
  • 有其am_await member point to a C function that accepts an instance of your type and returns an instance of another extension type that implements the iterator protocol, i.e. defines tp_iter (trivially defined as PyIter_Self) and tp_iternext.
  • 迭代器的 tp_iternext 必须推进协程的状态机。来自 tp_iternext 的每个非异常 return 对应于一个暂停,最后的 StopIteration 异常表示协程的最终 return。 return 值存储在 StopIterationvalue 属性 中。

要使协程有用,它还必须能够与驱动它的事件循环通信,以便它可以指定在挂起后何时恢复。 asyncio 定义的大部分协程期望在 asyncio 事件循环下 运行,并在内部使用 asyncio.get_event_loop()(and/or 接受显式 loop 参数)来获取其服务。

协程示例

为了说明 Python/C 代码需要实现什么,让我们考虑表示为 Python async def 的简单协程,例如 asyncio.sleep() 的等价物:

async def my_sleep(n):
    loop = asyncio.get_event_loop()
    future = loop.create_future()
    loop.call_later(n, future.set_result, None)
    await future
    # we get back here after the timeout has elapsed, and
    # immediately return

my_sleep 创建一个 Future,安排它在 n 秒内完成(其结果设置),并暂停自己直到将来完成。最后一部分使用 await,其中 await x 表示 "allow x to decide whether we will now suspend or keep executing"。一个不完整的 future 总是决定暂停,并且 asyncio Task 协程驱动程序特例产生 futures 无限期地暂停它们并将它们的完成连接到恢复任务。其他事件循环(curio 等)的暂停机制可能在细节上有所不同,但基本思想是相同的:await 是一个可选的暂停执行。

__await__() return 是一个发电机

要将其转换为 C,我们必须摆脱神奇的 async def 函数定义,以及 await 暂停点。删除 async def 相当简单:等效的普通函数只需要 return 一个实现 __await__:

的对象
def my_sleep(n):
    return _MySleep(n)

class _MySleep:
    def __init__(self, n):
        self.n = n

    def __await__(self):
        return _MySleepIter(self.n)

my_sleep()编辑的_MySleep对象return的__await__方法将被await运算符自动调用以转换awaitable 对象(传递给 await 的任何东西)到迭代器。该迭代器将用于询问等待的对象是选择暂停还是提供值。这很像 for o in x 语句如何调用 x.__iter__()iterable x 转换为具体的 iterator.

当returned迭代器选择暂停时,它只需要产生一个值。值的含义(如果有的话)将由协程驱动程序解释,通常是事件循环的一部分。当迭代器选择停止执行并从awaitreturn时,需要停止迭代。使用生成器作为方便的迭代器实现,_MySleepIter 看起来像这样:

def _MySleepIter(n):
    loop = asyncio.get_event_loop()
    future = loop.create_future()
    loop.call_later(n, future.set_result, None)
    # yield from future.__await__()
    for x in future.__await__():
        yield x

由于 await x 映射到 yield from x.__await__(),我们的生成器必须耗尽由 future.__await__() 编辑的迭代器 return。如果未来不完整,由 Future.__await__ 编辑的迭代器 return 将产生,否则 return 未来的结果(我们在这里忽略,但 yield from 实际上提供)。

__await__() return 是一个自定义迭代器

在 C 中实现 my_sleep 的最后一个障碍是使用 _MySleepIter 的生成器。幸运的是,任何生成器都可以转换为有状态迭代器,其 __next__ 执行一段代码直到下一个 await 或 return。 __next__ 实现生成器代码的状态机版本,其中 yield 通过 returning 值表示,return 通过提高 StopIteration 表示。例如:

class _MySleepIter:
    def __init__(self, n):
        self.n = n
        self.state = 0

    def __iter__(self):  # an iterator has to define __iter__
        return self

    def __next__(self):
        if self.state == 0:
            loop = asyncio.get_event_loop()
            self.future = loop.create_future()
            loop.call_later(self.n, self.future.set_result, None)
            self.state = 1
        if self.state == 1:
            if not self.future.done():
                return next(iter(self.future))
            self.state = 2
        if self.state == 2:
            raise StopIteration
        raise AssertionError("invalid state")

翻译成 C

上面的输入相当多,但它有效,并且只使用可以用本机 Python/C 函数定义的结构。

实际上将两个 类 翻译成 C 非常简单,但超出了这个答案的范围。