将带有回调的 Python 函数转换为可等待的 asyncio
Converting a Python function with a callback to an asyncio awaitable
我想在异步上下文中使用 PyAudio
库,但该库的主要入口点只有一个基于回调的 API:
import pyaudio
def callback(in_data, frame_count, time_info, status):
# Do something with data
pa = pyaudio.PyAudio()
self.stream = self.pa.open(
stream_callback=callback
)
我希望如何使用它是这样的:
pa = SOME_ASYNC_COROUTINE()
async def listen():
async for block in pa:
# Do something with block
问题是,我不确定如何将此回调语法转换为在回调触发时完成的未来。在 JavaScript 中我会使用 promise.promisify()
,但 Python 似乎没有类似的东西。
您可能想要使用 Future
class asyncio.Future(*, loop=None)¶
A Future represents an eventual result of an asynchronous operation. Not thread-safe.
Future is an awaitable object. Coroutines can await on Future objects until they either have a result or an exception set, or until they are cancelled.
Typically Futures are used to enable low-level callback-based code (e.g. in protocols implemented using asyncio transports) to interoperate with high-level async/await code.
The rule of thumb is to never expose Future objects in user-facing APIs, and the recommended way to create a Future object is to call loop.create_future(). This way alternative event loop implementations can inject their own optimized implementations of a Future object.
一个愚蠢的例子:
def my_func(loop):
fut = loop.create_future()
pa.open(
stream_callback=lambda *a, **kw: fut.set_result([a, kw])
)
return fut
async def main(loop):
result = await my_func(loop) # returns a list with args and kwargs
我假设 pa.open
在线程或子进程中运行。如果没有,您可能还需要使用 asyncio.loop.run_in_executor
包装对 open
的调用
promisify
的等价物不适用于此用例,原因有二:
- PyAudio 的异步 API 不使用异步事件循环 - 文档指定回调是从后台线程调用的。这需要采取预防措施才能与 asyncio 正确通信。
- 回调不能由单个未来建模,因为它会被多次调用,而未来只能有一个结果。相反,它必须转换为异步迭代器,就像您的示例代码中所示。
这是一种可能的实现方式:
def make_iter():
loop = asyncio.get_event_loop()
queue = asyncio.Queue()
def put(*args):
loop.call_soon_threadsafe(queue.put_nowait, args)
async def get():
while True:
yield await queue.get()
return get(), put
make_iter
return 的 对 。 returned 对象持有 属性,调用回调会导致迭代器产生它的下一个值(传递给回调的参数)。可以调用回调以从任意线程调用,因此可以安全地传递给 pyaudio.open
,而异步迭代器应该在异步协程中提供给 async for
,在等待下一个值:
async def main():
stream_get, stream_put = make_iter()
stream = pa.open(stream_callback=stream_put)
stream.start_stream()
async for in_data, frame_count, time_info, status in stream_get:
# ...
asyncio.get_event_loop().run_until_complete(main())
请注意,根据 documentation,回调还必须 return 一个有意义的值、一个帧元组和一个布尔标志。这可以通过更改 fill
函数以从 asyncio 端接收数据来合并到设计中。不包括实现,因为如果不了解领域,它可能没有多大意义。
我想在异步上下文中使用 PyAudio
库,但该库的主要入口点只有一个基于回调的 API:
import pyaudio
def callback(in_data, frame_count, time_info, status):
# Do something with data
pa = pyaudio.PyAudio()
self.stream = self.pa.open(
stream_callback=callback
)
我希望如何使用它是这样的:
pa = SOME_ASYNC_COROUTINE()
async def listen():
async for block in pa:
# Do something with block
问题是,我不确定如何将此回调语法转换为在回调触发时完成的未来。在 JavaScript 中我会使用 promise.promisify()
,但 Python 似乎没有类似的东西。
您可能想要使用 Future
class asyncio.Future(*, loop=None)¶
A Future represents an eventual result of an asynchronous operation. Not thread-safe.
Future is an awaitable object. Coroutines can await on Future objects until they either have a result or an exception set, or until they are cancelled.
Typically Futures are used to enable low-level callback-based code (e.g. in protocols implemented using asyncio transports) to interoperate with high-level async/await code.
The rule of thumb is to never expose Future objects in user-facing APIs, and the recommended way to create a Future object is to call loop.create_future(). This way alternative event loop implementations can inject their own optimized implementations of a Future object.
一个愚蠢的例子:
def my_func(loop):
fut = loop.create_future()
pa.open(
stream_callback=lambda *a, **kw: fut.set_result([a, kw])
)
return fut
async def main(loop):
result = await my_func(loop) # returns a list with args and kwargs
我假设 pa.open
在线程或子进程中运行。如果没有,您可能还需要使用 asyncio.loop.run_in_executor
open
的调用
promisify
的等价物不适用于此用例,原因有二:
- PyAudio 的异步 API 不使用异步事件循环 - 文档指定回调是从后台线程调用的。这需要采取预防措施才能与 asyncio 正确通信。
- 回调不能由单个未来建模,因为它会被多次调用,而未来只能有一个结果。相反,它必须转换为异步迭代器,就像您的示例代码中所示。
这是一种可能的实现方式:
def make_iter():
loop = asyncio.get_event_loop()
queue = asyncio.Queue()
def put(*args):
loop.call_soon_threadsafe(queue.put_nowait, args)
async def get():
while True:
yield await queue.get()
return get(), put
make_iter
returnpyaudio.open
,而异步迭代器应该在异步协程中提供给 async for
,在等待下一个值:
async def main():
stream_get, stream_put = make_iter()
stream = pa.open(stream_callback=stream_put)
stream.start_stream()
async for in_data, frame_count, time_info, status in stream_get:
# ...
asyncio.get_event_loop().run_until_complete(main())
请注意,根据 documentation,回调还必须 return 一个有意义的值、一个帧元组和一个布尔标志。这可以通过更改 fill
函数以从 asyncio 端接收数据来合并到设计中。不包括实现,因为如果不了解领域,它可能没有多大意义。