使用 trio nursery 作为带有 FastAPI 的服务器发送事件的生成器?
Use trio nursery as a generator for Sever Sent Events with FastAPI?
我正在尝试使用 FastAPI 构建一个服务器发送事件端点,但我不确定我想要完成的事情是否可行,或者我将如何去做。
问题介绍
基本上假设我有一个 run_task(limit, task)
异步函数,它发送异步请求、进行交易或类似的事情。假设对于每个任务 run_task
可以 return 一些 JSON 数据。
我想 运行 异步执行多个任务(多个 run_task(limit, task)
),为此我正在使用 trio 和 nurseries,如下所示:
async with trio.open_nursery() as nursery:
limit = trio.CapacityLimiter(10)
for task in tasks:
nursery.start_soon(run_task, limit, task)
最后,我想 return 通过 FastAPI 端点
每个任务的结果
起初,我简单地创建了一个包含列表的对象,并将该对象(通过引用)传递给每个 run_task
,当任务完成时,我会将 JSON 数据推送为一个字典,一旦所有任务完成,return 通过端点的整个对象。
这行得通,但我发现它效率低下,发送请求的客户端需要等待所有任务完成才能显示数据,但是,有些任务可能会很慢,这意味着从其他任务中获取数据任务最终停滞不前。
我想完成的事情
每当任务完成时,我希望 API 直接 return 所述任务的数据(我之前会添加到对象中),以便客户端可以实时显示所述数据。
那时我发现了服务器发送的事件和 Web 套接字是什么。服务器发送的事件似乎是解决我问题的合适方法,因为我不需要双向通信。
因为 FastAPI 是建立在 Starlette 上的,我决定使用 sse-Starlette 来构建一个带有服务器发送事件的端点,为此我需要像这样构建一个端点
@router.get('/stream')
async def runTasks(
param1: str,
request: Request
):
event_generator = status_event_generator(request, param1)
return EventSourceResponse(event_generator)
实际问题
正如名称 status_event_generator
所暗示的那样,sse-starlette 需要 return 一个事件生成器,这就是我有点卡住的地方。我希望生成器在完成时生成任务的数据(以便客户端可以实时接收每个任务的数据),但是,任务在异步三重奏托儿所内,所以我不确定如何继续
根据 ,似乎(如果我理解正确的话)我不能只在 run_task(limit, task)
中输入收益并期望它起作用
使用 websockets 的解决方案
我决定最终使用 websockets 而不是 SSE,因为我意识到我需要将对象作为数据传递到我的端点,虽然 SEE 可以接受查询参数,但将对象作为查询参数处理实在是太麻烦了麻烦。
带有 FastAPI 的 websockets 是基于 starlette 的,并且非常容易使用,实现它们可以像这样完成上面的问题:
@router.websocket('/stream')
async def runTasks(
websocket: WebSocket
):
# Initialise websocket
await websocket.accept()
# Receive data
tasks = await websocket.receive_json()
async with trio.open_nursery() as nursery:
limit = trio.CapacityLimiter(10)
for task in tasks:
nursery.start_soon(run_task, limit, task, websocket)
到 return 数据,然后我们可以简单地在 run_task
中使用 await websocket.send_json()
(这是一个简化的示例,您最好希望使用您的托儿所)
SSE 解决方案
要回答最初的问题,感谢@user3840170 和 https://discuss.python.org/t/preventing-yield-inside-certain-context-managers/1091,我们应该能够通过在更广泛范围内的某处开设一个包含遍历生成器的循环的托儿所来解决问题,并在生成器本身中使用该苗圃来生成后台任务。
我正在尝试使用 FastAPI 构建一个服务器发送事件端点,但我不确定我想要完成的事情是否可行,或者我将如何去做。
问题介绍
基本上假设我有一个 run_task(limit, task)
异步函数,它发送异步请求、进行交易或类似的事情。假设对于每个任务 run_task
可以 return 一些 JSON 数据。
我想 运行 异步执行多个任务(多个 run_task(limit, task)
),为此我正在使用 trio 和 nurseries,如下所示:
async with trio.open_nursery() as nursery:
limit = trio.CapacityLimiter(10)
for task in tasks:
nursery.start_soon(run_task, limit, task)
最后,我想 return 通过 FastAPI 端点
每个任务的结果起初,我简单地创建了一个包含列表的对象,并将该对象(通过引用)传递给每个 run_task
,当任务完成时,我会将 JSON 数据推送为一个字典,一旦所有任务完成,return 通过端点的整个对象。
这行得通,但我发现它效率低下,发送请求的客户端需要等待所有任务完成才能显示数据,但是,有些任务可能会很慢,这意味着从其他任务中获取数据任务最终停滞不前。
我想完成的事情
每当任务完成时,我希望 API 直接 return 所述任务的数据(我之前会添加到对象中),以便客户端可以实时显示所述数据。
那时我发现了服务器发送的事件和 Web 套接字是什么。服务器发送的事件似乎是解决我问题的合适方法,因为我不需要双向通信。
因为 FastAPI 是建立在 Starlette 上的,我决定使用 sse-Starlette 来构建一个带有服务器发送事件的端点,为此我需要像这样构建一个端点
@router.get('/stream')
async def runTasks(
param1: str,
request: Request
):
event_generator = status_event_generator(request, param1)
return EventSourceResponse(event_generator)
实际问题
正如名称 status_event_generator
所暗示的那样,sse-starlette 需要 return 一个事件生成器,这就是我有点卡住的地方。我希望生成器在完成时生成任务的数据(以便客户端可以实时接收每个任务的数据),但是,任务在异步三重奏托儿所内,所以我不确定如何继续
根据 run_task(limit, task)
中输入收益并期望它起作用
使用 websockets 的解决方案
我决定最终使用 websockets 而不是 SSE,因为我意识到我需要将对象作为数据传递到我的端点,虽然 SEE 可以接受查询参数,但将对象作为查询参数处理实在是太麻烦了麻烦。
带有 FastAPI 的 websockets 是基于 starlette 的,并且非常容易使用,实现它们可以像这样完成上面的问题:
@router.websocket('/stream')
async def runTasks(
websocket: WebSocket
):
# Initialise websocket
await websocket.accept()
# Receive data
tasks = await websocket.receive_json()
async with trio.open_nursery() as nursery:
limit = trio.CapacityLimiter(10)
for task in tasks:
nursery.start_soon(run_task, limit, task, websocket)
到 return 数据,然后我们可以简单地在 run_task
中使用 await websocket.send_json()
(这是一个简化的示例,您最好希望使用您的托儿所)
SSE 解决方案
要回答最初的问题,感谢@user3840170 和 https://discuss.python.org/t/preventing-yield-inside-certain-context-managers/1091,我们应该能够通过在更广泛范围内的某处开设一个包含遍历生成器的循环的托儿所来解决问题,并在生成器本身中使用该苗圃来生成后台任务。