将带有回调的 Python 函数转换为可等待的 asyncio

Converting a Python function with a callback to an asyncio awaitable

我想在异步上下文中使用 PyAudio 库,但该库的主要入口点只有一个基于回调的 API:

import pyaudio

def callback(in_data, frame_count, time_info, status):
    # Do something with data

pa = pyaudio.PyAudio()
self.stream = self.pa.open(
    stream_callback=callback
)

我希望如何使用它是这样的:

pa = SOME_ASYNC_COROUTINE()
async def listen():
    async for block in pa:
        # Do something with block

问题是,我不确定如何将此回调语法转换为在回调触发时完成的未来。在 JavaScript 中我会使用 promise.promisify(),但 Python 似乎没有类似的东西。

您可能想要使用 Future

class asyncio.Future(*, loop=None)¶

A Future represents an eventual result of an asynchronous operation. Not thread-safe.

Future is an awaitable object. Coroutines can await on Future objects until they either have a result or an exception set, or until they are cancelled.

Typically Futures are used to enable low-level callback-based code (e.g. in protocols implemented using asyncio transports) to interoperate with high-level async/await code.

The rule of thumb is to never expose Future objects in user-facing APIs, and the recommended way to create a Future object is to call loop.create_future(). This way alternative event loop implementations can inject their own optimized implementations of a Future object.

一个愚蠢的例子:

def my_func(loop):
    fut = loop.create_future()
    pa.open(
        stream_callback=lambda *a, **kw: fut.set_result([a, kw])
    )
    return fut


async def main(loop):
    result = await my_func(loop)  # returns a list with args and kwargs 

我假设 pa.open 在线程或子进程中运行。如果没有,您可能还需要使用 asyncio.loop.run_in_executor

包装对 open 的调用

promisify 的等价物不适用于此用例,原因有二:

  • PyAudio 的异步 API 不使用异步事件循环 - 文档指定回调是从后台线程调用的。这需要采取预防措施才能与 asyncio 正确通信。
  • 回调不能由单个未来建模,因为它会被多次调用,而未来只能有一个结果。相反,它必须转换为异步迭代器,就像您的示例代码中所示。

这是一种可能的实现方式:

def make_iter():
    loop = asyncio.get_event_loop()
    queue = asyncio.Queue()
    def put(*args):
        loop.call_soon_threadsafe(queue.put_nowait, args)
    async def get():
        while True:
            yield await queue.get()
    return get(), put

make_iter return。 returned 对象持有 属性,调用回调会导致迭代器产生它的下一个值(传递给回调的参数)。可以调用回调以从任意线程调用,因此可以安全地传递给 pyaudio.open,而异步迭代器应该在异步协程中提供给 async for,在等待下一个值:

async def main():
    stream_get, stream_put = make_iter()
    stream = pa.open(stream_callback=stream_put)
    stream.start_stream()
    async for in_data, frame_count, time_info, status in stream_get:
        # ...

asyncio.get_event_loop().run_until_complete(main())

请注意,根据 documentation,回调还必须 return 一个有意义的值、一个帧元组和一个布尔标志。这可以通过更改 fill 函数以从 asyncio 端接收数据来合并到设计中。不包括实现,因为如果不了解领域,它可能没有多大意义。