asyncio 中的同步生成器
Synchronous generator in asyncio
我有以下场景:
- 我有一个阻塞式同步发电机
- 我有一个非阻塞的异步函数
我想 运行 阻塞生成器(在 ThreadPool
中执行)和事件循环中的 async
函数。我该如何实现?
以下函数只是打印生成器的输出,而不是 sleep
函数的输出。
谢谢!
from concurrent.futures import ThreadPoolExecutor
import numpy as np
import asyncio
import time
def f():
while True:
r = np.random.randint(0, 3)
time.sleep(r)
yield r
async def gen():
loop = asyncio.get_event_loop()
executor = ThreadPoolExecutor()
gen = await loop.run_in_executor(executor, f)
for item in gen:
print(item)
print('Inside generator')
async def sleep():
while True:
await asyncio.sleep(1)
print('Inside async sleep')
async def combine():
await asyncio.gather(sleep(), gen())
def main():
loop = asyncio.get_event_loop()
loop.run_until_complete(combine())
if __name__ == '__main__':
main()
run_in_executor
不适用于生成器,因为它是为阻塞函数而设计的。虽然生成器是一个有效的函数,但它在被调用时立即 returns,提供一个对象,调用者应该通过重复调用 next
来耗尽该对象。 (这就是 Python 的 for
循环在幕后所做的事情。)要从异步代码中使用阻塞生成器,您有两种选择:
- 将迭代的每个步骤(对
next
的每个单独调用)包装在对run_in_executor
或 的单独调用中
- 在单独的线程中启动
for
循环并使用队列将对象传输到异步使用者。
任何一种方法都可以抽象为一个接受迭代器的函数和 returns 一个等效的异步迭代器。这是第二种方法的实现:
import asyncio, threading
def async_wrap_iter(it):
"""Wrap blocking iterator into an asynchronous one"""
loop = asyncio.get_event_loop()
q = asyncio.Queue(1)
exception = None
_END = object()
async def yield_queue_items():
while True:
next_item = await q.get()
if next_item is _END:
break
yield next_item
if exception is not None:
# the iterator has raised, propagate the exception
raise exception
def iter_to_queue():
nonlocal exception
try:
for item in it:
# This runs outside the event loop thread, so we
# must use thread-safe API to talk to the queue.
asyncio.run_coroutine_threadsafe(q.put(item), loop).result()
except Exception as e:
exception = e
finally:
asyncio.run_coroutine_threadsafe(q.put(_END), loop).result()
threading.Thread(target=iter_to_queue).start()
return yield_queue_items()
可以用一个使用time.time()
阻塞的普通同步迭代器和一个异步心跳函数来测试,证明事件循环是运行:
# async_wrap_iter definition as above
import time
def test_iter():
for i in range(5):
yield i
time.sleep(1)
async def test():
ait = async_wrap_iter(test_iter())
async for i in ait:
print(i)
async def heartbeat():
while True:
print('alive')
await asyncio.sleep(.1)
async def main():
asyncio.create_task(heartbeat())
await test()
asyncio.run(main())
我有以下场景:
- 我有一个阻塞式同步发电机
- 我有一个非阻塞的异步函数
我想 运行 阻塞生成器(在 ThreadPool
中执行)和事件循环中的 async
函数。我该如何实现?
以下函数只是打印生成器的输出,而不是 sleep
函数的输出。
谢谢!
from concurrent.futures import ThreadPoolExecutor
import numpy as np
import asyncio
import time
def f():
while True:
r = np.random.randint(0, 3)
time.sleep(r)
yield r
async def gen():
loop = asyncio.get_event_loop()
executor = ThreadPoolExecutor()
gen = await loop.run_in_executor(executor, f)
for item in gen:
print(item)
print('Inside generator')
async def sleep():
while True:
await asyncio.sleep(1)
print('Inside async sleep')
async def combine():
await asyncio.gather(sleep(), gen())
def main():
loop = asyncio.get_event_loop()
loop.run_until_complete(combine())
if __name__ == '__main__':
main()
run_in_executor
不适用于生成器,因为它是为阻塞函数而设计的。虽然生成器是一个有效的函数,但它在被调用时立即 returns,提供一个对象,调用者应该通过重复调用 next
来耗尽该对象。 (这就是 Python 的 for
循环在幕后所做的事情。)要从异步代码中使用阻塞生成器,您有两种选择:
- 将迭代的每个步骤(对
next
的每个单独调用)包装在对run_in_executor
或 的单独调用中
- 在单独的线程中启动
for
循环并使用队列将对象传输到异步使用者。
任何一种方法都可以抽象为一个接受迭代器的函数和 returns 一个等效的异步迭代器。这是第二种方法的实现:
import asyncio, threading
def async_wrap_iter(it):
"""Wrap blocking iterator into an asynchronous one"""
loop = asyncio.get_event_loop()
q = asyncio.Queue(1)
exception = None
_END = object()
async def yield_queue_items():
while True:
next_item = await q.get()
if next_item is _END:
break
yield next_item
if exception is not None:
# the iterator has raised, propagate the exception
raise exception
def iter_to_queue():
nonlocal exception
try:
for item in it:
# This runs outside the event loop thread, so we
# must use thread-safe API to talk to the queue.
asyncio.run_coroutine_threadsafe(q.put(item), loop).result()
except Exception as e:
exception = e
finally:
asyncio.run_coroutine_threadsafe(q.put(_END), loop).result()
threading.Thread(target=iter_to_queue).start()
return yield_queue_items()
可以用一个使用time.time()
阻塞的普通同步迭代器和一个异步心跳函数来测试,证明事件循环是运行:
# async_wrap_iter definition as above
import time
def test_iter():
for i in range(5):
yield i
time.sleep(1)
async def test():
ait = async_wrap_iter(test_iter())
async for i in ait:
print(i)
async def heartbeat():
while True:
print('alive')
await asyncio.sleep(.1)
async def main():
asyncio.create_task(heartbeat())
await test()
asyncio.run(main())