在 asyncio/Quart 中安全地等待两个事件源
Safely awaiting two event sources in asyncio/Quart
Quart 是一个 Python 网络框架,它在 Python 的 asyncio
协程系统之上重新实现了 Flask API。在我的特殊情况下,我有一个 Quart websocket 端点,它应该不仅有一个传入事件源,还有 两个 应该继续异步循环的可能事件源。
具有一个事件源的示例:
from quart import Quart, websocket
app = Quart(__name__)
@app.websocket("/echo")
def echo():
while True:
incoming_message = await websocket.receive()
await websocket.send(incoming_message)
取自https://pgjones.gitlab.io/quart/
这个例子有一个来源:传入消息流。但是,如果我有两个可能的来源,那么正确的模式是什么,一个是 await websocket.receive()
而另一个是 await system.get_next_external_notification()
.
如果他们中的任何一个到达,我想发送一个 websocket 消息。
我想我必须使用 asyncio.wait(..., return_when=FIRST_COMPLETED)
,但我如何确保我没有遗漏任何数据(即对于 websocket.receive()
和 system.get_next_external_notification()
几乎完全相同的竞争条件时间) ?在这种情况下,正确的模式是什么?
您可以使用的一个想法是将来自不同来源的事件连接在一起的队列,然后使用异步函数在后台侦听该队列的请求。这样的事情可能会让你开始:
import asyncio
from quart import Quart, websocket
app = Quart(__name__)
@app.before_serving
async def startup():
print(f'starting')
app.q = asyncio.Queue(1)
asyncio.ensure_future(listener(app.q))
async def listener(q):
while True:
returnq, msg = await q.get()
print(msg)
await returnq.put(f'hi: {msg}')
@app.route("/echo/<message>")
async def echo(message):
while True:
returnq = asyncio.Queue(1)
await app.q.put((returnq, message))
response = await returnq.get()
return response
@app.route("/echo2/<message>")
async def echo2(message):
while True:
returnq = asyncio.Queue(1)
await app.q.put((returnq, message))
response = await returnq.get()
return response
Quart 是一个 Python 网络框架,它在 Python 的 asyncio
协程系统之上重新实现了 Flask API。在我的特殊情况下,我有一个 Quart websocket 端点,它应该不仅有一个传入事件源,还有 两个 应该继续异步循环的可能事件源。
具有一个事件源的示例:
from quart import Quart, websocket
app = Quart(__name__)
@app.websocket("/echo")
def echo():
while True:
incoming_message = await websocket.receive()
await websocket.send(incoming_message)
取自https://pgjones.gitlab.io/quart/
这个例子有一个来源:传入消息流。但是,如果我有两个可能的来源,那么正确的模式是什么,一个是 await websocket.receive()
而另一个是 await system.get_next_external_notification()
.
如果他们中的任何一个到达,我想发送一个 websocket 消息。
我想我必须使用 asyncio.wait(..., return_when=FIRST_COMPLETED)
,但我如何确保我没有遗漏任何数据(即对于 websocket.receive()
和 system.get_next_external_notification()
几乎完全相同的竞争条件时间) ?在这种情况下,正确的模式是什么?
您可以使用的一个想法是将来自不同来源的事件连接在一起的队列,然后使用异步函数在后台侦听该队列的请求。这样的事情可能会让你开始:
import asyncio
from quart import Quart, websocket
app = Quart(__name__)
@app.before_serving
async def startup():
print(f'starting')
app.q = asyncio.Queue(1)
asyncio.ensure_future(listener(app.q))
async def listener(q):
while True:
returnq, msg = await q.get()
print(msg)
await returnq.put(f'hi: {msg}')
@app.route("/echo/<message>")
async def echo(message):
while True:
returnq = asyncio.Queue(1)
await app.q.put((returnq, message))
response = await returnq.get()
return response
@app.route("/echo2/<message>")
async def echo2(message):
while True:
returnq = asyncio.Queue(1)
await app.q.put((returnq, message))
response = await returnq.get()
return response