Python3.7 asyncio 启动网络服务器 (FastAPI) 和 aio_pika 消费者
Python3.7 asyncio start webserver (FastAPI) and aio_pika consumer
在我的项目中,我尝试启动 REST API(使用 FastAPI 和 运行 使用 Hypercorn 构建),另外我还想在启动时启动 RabbitMQ Consumer( aio_pika):
Aio Pika 提供可靠的连接,可在失败时自动重新连接。如果我 运行 下面的代码与 hypercorn app:app
消费者和其余接口正确启动,但从 aio_pika 重新连接不再起作用。我如何在两个不同的进程(或线程?)中归档生产稳定的 RabbitMQ Consumer 和 RestAPI。我的 python 版本是 3.7,请注意我实际上是 Java 和 Go 开发人员,以防我的方法不是 Python 方式:-)
@app.on_event("startup")
def startup():
loop = asyncio.new_event_loop()
asyncio.ensure_future(main(loop))
@app.get("/")
def read_root():
return {"Hello": "World"}
async def main(loop):
connection = await aio_pika.connect_robust(
"amqp://guest:guest@127.0.0.1/", loop=loop
)
async with connection:
queue_name = "test_queue"
# Creating channel
channel = await connection.channel() # type: aio_pika.Channel
# Declaring queue
queue = await channel.declare_queue(
queue_name,
auto_delete=True
) # type: aio_pika.Queue
async with queue.iterator() as queue_iter:
# Cancel consuming after __aexit__
async for message in queue_iter:
async with message.process():
print(message.body)
if queue.name in message.body.decode():
break
在@pgjones 的帮助下,我设法将消费开始更改为:
@app.on_event("startup")
def startup():
loop = asyncio.get_event_loop()
asyncio.ensure_future(main(loop))
并用 asyncio.ensure_future
启动 job
并将当前事件循环作为参数传递,这解决了问题。
如果有人有 different/better 方法会很有趣
谢谢!
在我的项目中,我尝试启动 REST API(使用 FastAPI 和 运行 使用 Hypercorn 构建),另外我还想在启动时启动 RabbitMQ Consumer( aio_pika):
Aio Pika 提供可靠的连接,可在失败时自动重新连接。如果我 运行 下面的代码与 hypercorn app:app
消费者和其余接口正确启动,但从 aio_pika 重新连接不再起作用。我如何在两个不同的进程(或线程?)中归档生产稳定的 RabbitMQ Consumer 和 RestAPI。我的 python 版本是 3.7,请注意我实际上是 Java 和 Go 开发人员,以防我的方法不是 Python 方式:-)
@app.on_event("startup")
def startup():
loop = asyncio.new_event_loop()
asyncio.ensure_future(main(loop))
@app.get("/")
def read_root():
return {"Hello": "World"}
async def main(loop):
connection = await aio_pika.connect_robust(
"amqp://guest:guest@127.0.0.1/", loop=loop
)
async with connection:
queue_name = "test_queue"
# Creating channel
channel = await connection.channel() # type: aio_pika.Channel
# Declaring queue
queue = await channel.declare_queue(
queue_name,
auto_delete=True
) # type: aio_pika.Queue
async with queue.iterator() as queue_iter:
# Cancel consuming after __aexit__
async for message in queue_iter:
async with message.process():
print(message.body)
if queue.name in message.body.decode():
break
在@pgjones 的帮助下,我设法将消费开始更改为:
@app.on_event("startup")
def startup():
loop = asyncio.get_event_loop()
asyncio.ensure_future(main(loop))
并用 asyncio.ensure_future
启动 job
并将当前事件循环作为参数传递,这解决了问题。
如果有人有 different/better 方法会很有趣 谢谢!