使用 FastAPI 与异步循环交互
Use FastAPI to interact with async loop
我是 运行'workers' 的协程,其工作是等待 5 秒,从 asyncio.Queue() 获取值并不断打印出来。
q = asyncio.Queue()
def worker():
while True:
await asyncio.sleep(5)
i = await q.get()
print(i)
q.task_done()
async def main(q):
workers = [asyncio.create_task(worker()) for n in range(10)]
await asyncio.gather(*workers)
if __name__ == "__main__":
asyncio.run(main())
我希望能够使用 FastAPI 通过 http 请求与队列进行交互。例如 POST 请求将 'put' 队列中的项目供工作人员打印。
我不确定如何 运行 worker 的协程与 FastAPI 同时实现这种效果。我相信 Uvicorn 有自己的事件循环,我尝试使用 asyncio 方法没有成功。
我认为路由器看起来像这样。
@app.post("/")
async def put_queue(data:str):
return q.put(data)
我希望有这样的效果:
await asyncio.gather(main(),{FastApi() app run})
一个选项是添加一个任务,将您的主协程包装在 a on startup event
中
import asyncio
@app.on_event("startup")
async def startup_event():
asyncio.create_task(main())
这会在应用程序完全启动之前安排您的主协程。
这里重要的是你不要等待创建的任务,因为它基本上会阻塞 startup_event 永远
我是 运行'workers' 的协程,其工作是等待 5 秒,从 asyncio.Queue() 获取值并不断打印出来。
q = asyncio.Queue()
def worker():
while True:
await asyncio.sleep(5)
i = await q.get()
print(i)
q.task_done()
async def main(q):
workers = [asyncio.create_task(worker()) for n in range(10)]
await asyncio.gather(*workers)
if __name__ == "__main__":
asyncio.run(main())
我希望能够使用 FastAPI 通过 http 请求与队列进行交互。例如 POST 请求将 'put' 队列中的项目供工作人员打印。
我不确定如何 运行 worker 的协程与 FastAPI 同时实现这种效果。我相信 Uvicorn 有自己的事件循环,我尝试使用 asyncio 方法没有成功。
我认为路由器看起来像这样。
@app.post("/")
async def put_queue(data:str):
return q.put(data)
我希望有这样的效果:
await asyncio.gather(main(),{FastApi() app run})
一个选项是添加一个任务,将您的主协程包装在 a on startup event
中import asyncio
@app.on_event("startup")
async def startup_event():
asyncio.create_task(main())
这会在应用程序完全启动之前安排您的主协程。
这里重要的是你不要等待创建的任务,因为它基本上会阻塞 startup_event 永远