如何在调用 `async for in` 后获取异步生成器的下一次迭代
How to get next itereration of async generator after calling `async for in`
使用 FastAPI 我正在尝试检测 StreamingResponse 是否已完全被客户端使用或是否已被取消。
我有以下示例应用程序:
import asyncio
import uvicorn
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
app = FastAPI()
async def ainfinite_generator():
while True:
yield b"some fake data "
await asyncio.sleep(.001)
async def astreamer(generator):
try:
async for data in generator:
yield data
except Exception as e:
# this isn't triggered by a cancelled request
print(e)
finally:
# this always throws a StopAsyncIteration exception
# no matter whether the generator was consumed or not
leftover = await generator.__anext__()
if leftover:
print("we didn't finish")
else:
print("we finished")
@app.get("/")
async def infinite_stream():
return StreamingResponse(astreamer(ainfinite_generator()))
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=8000)
astreamer
中的第一个 async for in generator
似乎“消耗”了异步生成器。在该循环之后,进一步尝试获得下一次迭代失败并出现 StopAsyncIteration
异常,即使生成器是如上定义的“无限”。
我浏览了PEP-525 and the only thing I am seeing is that if an exception is thrown into the generator it will cause any further attempts to read from the generator to throw that StopAsyncIteration exception, but I don't see where that would be happening. At least, I'm not seeing that in Starlette's StreamingResponse class(它似乎与“内容”没有太大关系)。生成器在执行 async for in gen
后不会“释放”吗?
下面的代码展示了如何在协同程序(在我的例子中是异步生成器)上观察取消。如评论中所述,如果异步生成器被取消,它会向生成器注入异常,从那时起,任何尝试获取生成器中的下一个项目都将引发 StopAsyncIteration
异常。参见 PEP 525。要确定异步生成器是否被取消,只需在 asyncio.CancelledError
异常(源自 BaseException
)上 try/except。
这里还有代码展示了如何处理普通的生成器,它们更宽容一些。如果您保持相同的 try/except 流程,则在它们被取消时会引发 GeneratorExit
异常。
棘手的部分是这些异常中的大多数都源自 BaseException
class,这与我预期的 StopIteration
异常不同,后者源自 Exception
class.
而且,顺便说一句,实际取消发生在 starlette。
import asyncio
import time
import uvicorn
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
app = FastAPI()
def infinite_generator():
# not blocking, so doesn't need to be async
# but if it was blocking, you could make this async and await it
while True:
yield b"some fake data "
def finite_generator():
# not blocking, so doesn't need to be async
# but if it was blocking, you could make this async and await it
x = 0
while x < 10000:
yield f"{x}"
x += 1
async def astreamer(generator):
try:
# if it was an async generator we'd do:
# "async for data in generator:"
# (there is no yield from async_generator)
for i in generator:
yield i
await asyncio.sleep(.001)
except asyncio.CancelledError as e:
print('cancelled')
def streamer(generator):
try:
# note: normally we would do "yield from generator"
# but that won't work with next(generator) in the finally statement
for i in generator:
yield i
time.sleep(.001)
except GeneratorExit:
print("cancelled")
finally:
# showing that we can check here to see if all data was consumed
# the except statement above effectively does the same thing
try:
next(generator)
print("we didn't finish")
return
except StopIteration:
print("we finished")
@app.get("/infinite")
async def infinite_stream():
return StreamingResponse(streamer(infinite_generator()))
@app.get("/finite")
async def finite_stream():
return StreamingResponse(streamer(finite_generator()))
@app.get("/ainfinite")
async def infinite_stream():
return StreamingResponse(astreamer(infinite_generator()))
@app.get("/afinite")
async def finite_stream():
return StreamingResponse(astreamer(finite_generator()))
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=8000)
使用 FastAPI 我正在尝试检测 StreamingResponse 是否已完全被客户端使用或是否已被取消。
我有以下示例应用程序:
import asyncio
import uvicorn
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
app = FastAPI()
async def ainfinite_generator():
while True:
yield b"some fake data "
await asyncio.sleep(.001)
async def astreamer(generator):
try:
async for data in generator:
yield data
except Exception as e:
# this isn't triggered by a cancelled request
print(e)
finally:
# this always throws a StopAsyncIteration exception
# no matter whether the generator was consumed or not
leftover = await generator.__anext__()
if leftover:
print("we didn't finish")
else:
print("we finished")
@app.get("/")
async def infinite_stream():
return StreamingResponse(astreamer(ainfinite_generator()))
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=8000)
astreamer
中的第一个 async for in generator
似乎“消耗”了异步生成器。在该循环之后,进一步尝试获得下一次迭代失败并出现 StopAsyncIteration
异常,即使生成器是如上定义的“无限”。
我浏览了PEP-525 and the only thing I am seeing is that if an exception is thrown into the generator it will cause any further attempts to read from the generator to throw that StopAsyncIteration exception, but I don't see where that would be happening. At least, I'm not seeing that in Starlette's StreamingResponse class(它似乎与“内容”没有太大关系)。生成器在执行 async for in gen
后不会“释放”吗?
下面的代码展示了如何在协同程序(在我的例子中是异步生成器)上观察取消。如评论中所述,如果异步生成器被取消,它会向生成器注入异常,从那时起,任何尝试获取生成器中的下一个项目都将引发 StopAsyncIteration
异常。参见 PEP 525。要确定异步生成器是否被取消,只需在 asyncio.CancelledError
异常(源自 BaseException
)上 try/except。
这里还有代码展示了如何处理普通的生成器,它们更宽容一些。如果您保持相同的 try/except 流程,则在它们被取消时会引发 GeneratorExit
异常。
棘手的部分是这些异常中的大多数都源自 BaseException
class,这与我预期的 StopIteration
异常不同,后者源自 Exception
class.
而且,顺便说一句,实际取消发生在 starlette。
import asyncio
import time
import uvicorn
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
app = FastAPI()
def infinite_generator():
# not blocking, so doesn't need to be async
# but if it was blocking, you could make this async and await it
while True:
yield b"some fake data "
def finite_generator():
# not blocking, so doesn't need to be async
# but if it was blocking, you could make this async and await it
x = 0
while x < 10000:
yield f"{x}"
x += 1
async def astreamer(generator):
try:
# if it was an async generator we'd do:
# "async for data in generator:"
# (there is no yield from async_generator)
for i in generator:
yield i
await asyncio.sleep(.001)
except asyncio.CancelledError as e:
print('cancelled')
def streamer(generator):
try:
# note: normally we would do "yield from generator"
# but that won't work with next(generator) in the finally statement
for i in generator:
yield i
time.sleep(.001)
except GeneratorExit:
print("cancelled")
finally:
# showing that we can check here to see if all data was consumed
# the except statement above effectively does the same thing
try:
next(generator)
print("we didn't finish")
return
except StopIteration:
print("we finished")
@app.get("/infinite")
async def infinite_stream():
return StreamingResponse(streamer(infinite_generator()))
@app.get("/finite")
async def finite_stream():
return StreamingResponse(streamer(finite_generator()))
@app.get("/ainfinite")
async def infinite_stream():
return StreamingResponse(astreamer(infinite_generator()))
@app.get("/afinite")
async def finite_stream():
return StreamingResponse(astreamer(finite_generator()))
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=8000)