如何在调用 `async for in` 后获取异步生成器的下一次迭代

How to get next itereration of async generator after calling `async for in`

使用 FastAPI 我正在尝试检测 StreamingResponse 是否已完全被客户端使用或是否已被取消。

我有以下示例应用程序:

import asyncio

import uvicorn
from fastapi import FastAPI
from fastapi.responses import StreamingResponse

app = FastAPI()


async def ainfinite_generator():
    while True:
        yield b"some fake data "
        await asyncio.sleep(.001)


async def astreamer(generator):
    try:
        async for data in generator:
            yield data
    except Exception as e:
        # this isn't triggered by a cancelled request
        print(e)
    finally:
        # this always throws a StopAsyncIteration exception
        # no matter whether the generator was consumed or not
        leftover = await generator.__anext__()
        if leftover:
            print("we didn't finish")
        else:
            print("we finished")


@app.get("/")
async def infinite_stream():
    return StreamingResponse(astreamer(ainfinite_generator()))


if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000)

astreamer 中的第一个 async for in generator 似乎“消耗”了异步生成器。在该循环之后,进一步尝试获得下一次迭代失败并出现 StopAsyncIteration 异常,即使生成器是如上定义的“无限”。

我浏览了PEP-525 and the only thing I am seeing is that if an exception is thrown into the generator it will cause any further attempts to read from the generator to throw that StopAsyncIteration exception, but I don't see where that would be happening. At least, I'm not seeing that in Starlette's StreamingResponse class(它似乎与“内容”没有太大关系)。生成器在执行 async for in gen 后不会“释放”吗?

下面的代码展示了如何在协同程序(在我的例子中是异步生成器)上观察取消。如评论中所述,如果异步生成器被取消,它会向生成器注入异常,从那时起,任何尝试获取生成器中的下一个项目都将引发 StopAsyncIteration 异常。参见 PEP 525。要确定异步生成器是否被取消,只需在 asyncio.CancelledError 异常(源自 BaseException)上 try/except。

这里还有代码展示了如何处理普通的生成器,它们更宽容一些。如果您保持相同的 try/except 流程,则在它们被取消时会引发 GeneratorExit 异常。

棘手的部分是这些异常中的大多数都源自 BaseException class,这与我预期的 StopIteration 异常不同,后者源自 Exception class.

而且,顺便说一句,实际取消发生在 starlette

import asyncio
import time

import uvicorn
from fastapi import FastAPI
from fastapi.responses import StreamingResponse

app = FastAPI()


def infinite_generator():
    # not blocking, so doesn't need to be async
    # but if it was blocking, you could make this async and await it
    while True:
        yield b"some fake data "


def finite_generator():
    # not blocking, so doesn't need to be async
    # but if it was blocking, you could make this async and await it
    x = 0
    while x < 10000:
        yield f"{x}"
        x += 1


async def astreamer(generator):
    try:
        # if it was an async generator we'd do:
        # "async for data in generator:"
        # (there is no yield from async_generator)
        for i in generator:
            yield i
            await asyncio.sleep(.001)

    except asyncio.CancelledError as e:
        print('cancelled')


def streamer(generator):
    try:
        # note: normally we would do "yield from generator"
        # but that won't work with next(generator) in the finally statement
        for i in generator:
            yield i
            time.sleep(.001)

    except GeneratorExit:
        print("cancelled")
    finally:
        # showing that we can check here to see if all data was consumed
        # the except statement above effectively does the same thing
        try:
            next(generator)
            print("we didn't finish")
            return
        except StopIteration:
            print("we finished")


@app.get("/infinite")
async def infinite_stream():
    return StreamingResponse(streamer(infinite_generator()))


@app.get("/finite")
async def finite_stream():
    return StreamingResponse(streamer(finite_generator()))


@app.get("/ainfinite")
async def infinite_stream():
    return StreamingResponse(astreamer(infinite_generator()))


@app.get("/afinite")
async def finite_stream():
    return StreamingResponse(astreamer(finite_generator()))


if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000)