在 asyncio/Quart 中安全地等待两个事件源

Safely awaiting two event sources in asyncio/Quart

Quart 是一个 Python 网络框架,它在 Python 的 asyncio 协程系统之上重新实现了 Flask API。在我的特殊情况下,我有一个 Quart websocket 端点,它应该不仅有一个传入事件源,还有 两个 应该继续异步循环的可能事件源。

具有一个事件源的示例:

from quart import Quart, websocket
app = Quart(__name__)

@app.websocket("/echo")
def echo():
    while True:
        incoming_message = await websocket.receive()
        await websocket.send(incoming_message)

取自https://pgjones.gitlab.io/quart/

这个例子有一个来源:传入消息流。但是,如果我有两个可能的来源,那么正确的模式是什么,一个是 await websocket.receive() 而另一个是 await system.get_next_external_notification() .

如果他们中的任何一个到达,我想发送一个 websocket 消息。
我想我必须使用 asyncio.wait(..., return_when=FIRST_COMPLETED),但我如何确保我没有遗漏任何数据(即对于 websocket.receive()system.get_next_external_notification() 几乎完全相同的竞争条件时间) ?在这种情况下,正确的模式是什么?

您可以使用的一个想法是将来自不同来源的事件连接在一起的队列,然后使用异步函数在后台侦听该队列的请求。这样的事情可能会让你开始:

import asyncio
from quart import Quart, websocket
app = Quart(__name__)

@app.before_serving
async def startup():
    print(f'starting')
    app.q = asyncio.Queue(1)
    asyncio.ensure_future(listener(app.q))

async def listener(q):
    while True:
        returnq, msg = await q.get()
        print(msg)
        await returnq.put(f'hi: {msg}')

@app.route("/echo/<message>")
async def echo(message):
    while True:
        returnq = asyncio.Queue(1)
        await app.q.put((returnq, message))
        response = await returnq.get()
        return response

@app.route("/echo2/<message>")
async def echo2(message):
    while True:
        returnq = asyncio.Queue(1)
        await app.q.put((returnq, message))
        response = await returnq.get()
        return response