使用 FastAPI 与异步循环交互

Use FastAPI to interact with async loop

我是 运行'workers' 的协程,其工作是等待 5 秒,从 asyncio.Queue() 获取值并不断打印出来。

q = asyncio.Queue()

def worker():
    while True:
        await asyncio.sleep(5)
        i = await q.get()
        print(i)
        q.task_done()

async def main(q):
    workers = [asyncio.create_task(worker()) for n in range(10)]
    await asyncio.gather(*workers)


if __name__ == "__main__":
    asyncio.run(main())

我希望能够使用 FastAPI 通过 http 请求与队列进行交互。例如 POST 请求将 'put' 队列中的项目供工作人员打印。

我不确定如何 运行 worker 的协程与 FastAPI 同时实现这种效果。我相信 Uvicorn 有自己的事件循环,我尝试使用 asyncio 方法没有成功。

我认为路由器看起来像这样。

@app.post("/")
async def put_queue(data:str):
    return q.put(data)

我希望有这样的效果:

await asyncio.gather(main(),{FastApi() app run})

一个选项是添加一个任务,将您的主协程包装在 a on startup event

import asyncio
@app.on_event("startup")
async def startup_event():
    asyncio.create_task(main())

这会在应用程序完全启动之前安排您的主协程。

这里重要的是你不要等待创建的任务,因为它基本上会阻塞 startup_event 永远