从事件循环返回异步生成器数据可能吗?
Yielding asyncio generator data back from event loop possible?
我想使用 httpx 从协程中同时读取多个 HTTP 流请求,并将数据返回给我的非异步函数 运行 事件循环,而不仅仅是 return计算最终数据。
但是,如果我让我的异步函数 yield 而不是 return,我会收到抱怨,说 asyncio.as_completed()
和 loop.run_until_complete()
需要协程或 Future,而不是异步生成器。
所以我能让它工作的唯一方法是收集每个协程内的所有流数据,return请求完成后收集所有数据。然后收集所有协程结果,最后 return 将其发送到非异步调用函数。
这意味着 我必须将所有内容保存在内存中,并等到最慢的请求完成 才能获取所有数据,这破坏了流式传输 http 请求的全部意义。
有什么方法可以完成这样的事情吗?我当前的愚蠢实现如下所示:
def collect_data(urls):
"""Non-async function wishing it was a non-async generator"""
async def stream(async_client, url, payload):
data = []
async with async_client.stream("GET", url=url) as ar:
ar.raise_for_status()
async for line in ar.aiter_lines():
data.append(line)
# would like to yield each line here
return data
async def execute_tasks(urls):
all_data = []
async with httpx.AsyncClient() as async_client:
tasks = [stream(async_client, url) for url in urls]
for coroutine in asyncio.as_completed(tasks):
all_data += await coroutine
# would like to iterate and yield each line here
return all_events
try:
loop = asyncio.get_event_loop()
data = loop.run_until_complete(execute_tasks(urls=urls))
return data
# would like to iterate and yield the data here as it becomes available
finally:
loop.close()
编辑:我也尝试过使用asyncio.Queue
和trio
内存通道的一些解决方案,但是因为我只能从那些异步范围它并没有让我更接近解决方案
编辑 2:我想在非异步生成器中使用它的原因是我想在使用 Django Rest Framework 流的 Django 应用程序中使用它 API.
通常你应该让 collect_data
异步,并在整个过程中使用异步代码——这就是 asyncio 的设计用途。但如果出于某种原因这不可行,您 可以 通过应用一些粘合代码手动迭代异步迭代器:
def iter_over_async(ait, loop):
ait = ait.__aiter__()
async def get_next():
try:
obj = await ait.__anext__()
return False, obj
except StopAsyncIteration:
return True, None
while True:
done, obj = loop.run_until_complete(get_next())
if done:
break
yield obj
上面的工作方式是提供一个异步闭包,它使用 __anext__
magic method 从异步迭代器中不断检索值,并在对象到达时返回它们。这个异步闭包在普通同步生成器的循环中用 run_until_complete()
调用。 (闭包实际上 returns 一对完成指示器和实际对象,以避免通过 run_until_complete
传播 StopAsyncIteration
,这可能不受支持。)
有了这个,你可以让你的 execute_tasks
成为一个异步生成器(async def
和 yield
)并使用以下方法迭代它:
for chunk in iter_over_async(execute_tasks(urls), loop):
...
请注意,此方法与 asyncio.run
不兼容,可能会在以后引起问题。
我想使用 httpx 从协程中同时读取多个 HTTP 流请求,并将数据返回给我的非异步函数 运行 事件循环,而不仅仅是 return计算最终数据。
但是,如果我让我的异步函数 yield 而不是 return,我会收到抱怨,说 asyncio.as_completed()
和 loop.run_until_complete()
需要协程或 Future,而不是异步生成器。
所以我能让它工作的唯一方法是收集每个协程内的所有流数据,return请求完成后收集所有数据。然后收集所有协程结果,最后 return 将其发送到非异步调用函数。
这意味着 我必须将所有内容保存在内存中,并等到最慢的请求完成 才能获取所有数据,这破坏了流式传输 http 请求的全部意义。
有什么方法可以完成这样的事情吗?我当前的愚蠢实现如下所示:
def collect_data(urls):
"""Non-async function wishing it was a non-async generator"""
async def stream(async_client, url, payload):
data = []
async with async_client.stream("GET", url=url) as ar:
ar.raise_for_status()
async for line in ar.aiter_lines():
data.append(line)
# would like to yield each line here
return data
async def execute_tasks(urls):
all_data = []
async with httpx.AsyncClient() as async_client:
tasks = [stream(async_client, url) for url in urls]
for coroutine in asyncio.as_completed(tasks):
all_data += await coroutine
# would like to iterate and yield each line here
return all_events
try:
loop = asyncio.get_event_loop()
data = loop.run_until_complete(execute_tasks(urls=urls))
return data
# would like to iterate and yield the data here as it becomes available
finally:
loop.close()
编辑:我也尝试过使用asyncio.Queue
和trio
内存通道的一些解决方案,但是因为我只能从那些异步范围它并没有让我更接近解决方案
编辑 2:我想在非异步生成器中使用它的原因是我想在使用 Django Rest Framework 流的 Django 应用程序中使用它 API.
通常你应该让 collect_data
异步,并在整个过程中使用异步代码——这就是 asyncio 的设计用途。但如果出于某种原因这不可行,您 可以 通过应用一些粘合代码手动迭代异步迭代器:
def iter_over_async(ait, loop):
ait = ait.__aiter__()
async def get_next():
try:
obj = await ait.__anext__()
return False, obj
except StopAsyncIteration:
return True, None
while True:
done, obj = loop.run_until_complete(get_next())
if done:
break
yield obj
上面的工作方式是提供一个异步闭包,它使用 __anext__
magic method 从异步迭代器中不断检索值,并在对象到达时返回它们。这个异步闭包在普通同步生成器的循环中用 run_until_complete()
调用。 (闭包实际上 returns 一对完成指示器和实际对象,以避免通过 run_until_complete
传播 StopAsyncIteration
,这可能不受支持。)
有了这个,你可以让你的 execute_tasks
成为一个异步生成器(async def
和 yield
)并使用以下方法迭代它:
for chunk in iter_over_async(execute_tasks(urls), loop):
...
请注意,此方法与 asyncio.run
不兼容,可能会在以后引起问题。