取消 fastAPI websocket 中剩余的三重托儿任务的正确方法?
Proper way to cancel remaining trio nursery tasks inside fastAPI websocket?
我对 websockets 还是很陌生,遇到了一个我很难解决的问题。
我需要使用 FastAPI 构建一个 websocket 端点,其中一组任务是 运行 异步的(为此我使用了 trio),每个任务通过 websocket 返回一个 json 值实时。
我已经设法满足这些要求,我的代码如下所示:
@router.websocket('/stream')
async def runTasks(
websocket: WebSocket
):
# Initialise websocket
await websocket.accept()
while True:
# Receive data
tasks = await websocket.receive_json()
# Run tasks asynchronously (limiting to 10 tasks at a time)
async with trio.open_nursery() as nursery:
limit = trio.CapacityLimiter(10)
for task in tasks:
nursery.start_soon(run_task, limit, task, websocket)
run_task
看起来像这样:
async def run_task(limit, task, websocket):
async with limit:
# Complete task / transaction
await websocket.send_json({"placeholder":"data"})
但是现在,在两种情况下,我应该 cancel/skip 当前剩余的托儿任务,但我有点不知道如何实现它。
我给出的两种情况如下:
场景 1: 假设当用户按下按钮时调用端点,如果用户在某些任务仍在进行时再次按下按钮 运行宁他们应该被取消或跳过,这个过程应该重新开始
场景2:如果websocket被关闭,用户刷新页面,或者在托儿任务完成前退出,剩余任务应取消或跳过
我正在尝试阅读 的更多内容,但我仍然对如何在进入新的幼儿园之前使用取消范围取消之前的幼儿园感到困惑。我应该创建一个额外的任务来监视变量或其他东西并在它发生变化后取消吗?但是一旦所有其他任务完成,我就必须停止该任务
对于场景 1:
- 在全局命名空间中创建用于存储取消范围和事件的字典(键:
UUID
,值:Tuple[trio.CancelScope, trio.Event]
- 为每个客户端分配唯一的 UUID(任何对客户端唯一的)
- 让客户端在连接开始时发送 UUID
- 检查字典是否以该 UUID 作为键。如果存在,则取消范围并等待事件设置。
- 现在进行实际传输
对于场景 2:
如果客户端没有明确关闭 websocket,Websocket 不知道客户端是否断开连接。因此,我能想到的最好的办法是强制超时并等待客户端对每次传输的响应。 (这使得这种方法效率有点低)。
下面是上述思路的演示代码
客户代码:
由于我不知道客户端代码是什么样的,所以我只是做了一些客户端来测试你的问题。
这个有点bug,我没学过js - 请不要太认真地判断客户端代码!
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Websocket test</title>
</head>
<body>
<button id="start">Start connection</button>
<button id="close" disabled>Close connection</button>
<input type="text" id="input_" value="INPUT_YOUR_UUID">
<div id="state">Status: Waiting for connection</div>
<script>
let state = document.getElementById("state")
let start_btn = document.getElementById("start")
let close_btn = document.getElementById("close")
let input_ = document.getElementById("input_")
function sleep(sec) {
state.textContent = `Status: sleeping ${sec} seconds`
return new Promise((func) => setTimeout(func, sec * 1000))
}
function websocket_test() {
return new Promise((resolve, reject) => {
let socket = new WebSocket("ws://127.0.0.1:8000/stream")
socket.onopen = function () {
state.textContent = "Status: Sending UUID - " + input_.value
socket.send(input_.value)
close_btn.disabled = false
close_btn.onclick = function () {socket.close()}
}
socket.onmessage = function (msg) {
state.textContent = "Status: Message Received - " + msg.data
socket.send("Received")
}
socket.onerror = function (error) {
reject(error)
state.textContent = "Status: Error encountered"
}
socket.onclose = function () {
state.textContent = "Status: Connection Stopped"
close_btn.disabled = true
}
})
}
start_btn.onclick = websocket_test
</script>
</body>
</html>
服务器代码:
在之前的测试中,我看到服务器抛出超时,但无法重现 - 如果对行为有信心,您可能不需要 trio.fail_after
和 except trio.TooSlowError
部分。
"""
Nursery cancellation demo
"""
import itertools
import trio
import fastapi
import hypercorn
from hypercorn.trio import serve
NURSERY = trio.open_nursery()
GLOBAL_NURSERY_STORAGE = {}
TIMEOUT = 5
router = fastapi.APIRouter()
@router.websocket('/stream')
async def run_task(websocket: fastapi.WebSocket):
# accept and receive UUID
# Replace UUID with anything client-specific
await websocket.accept()
uuid_ = await websocket.receive_text()
print(f"[{uuid_}] CONNECTED")
# check if nursery exist in session, if exists, cancel it and wait it to end.
cancel_scope: trio.CancelScope
event: trio.Event
try:
cancel_scope, event = GLOBAL_NURSERY_STORAGE[uuid_]
except KeyError:
pass
else:
print(f"[{uuid_}] STOPPING NURSERY")
cancel_scope.cancel()
await event.wait()
# create new event, and start new nursery.
cancel_done_event = trio.Event()
async with trio.open_nursery() as nursery:
# save ref
GLOBAL_NURSERY_STORAGE[uuid_] = nursery.cancel_scope, cancel_done_event
try:
for n in itertools.count(0, 1):
nursery.start_soon(task, n, uuid_, websocket)
await trio.sleep(1)
# wait for client response
with trio.fail_after(TIMEOUT):
recv = await websocket.receive_text()
print(f"[{uuid_}] RECEIVED {recv}")
except trio.TooSlowError:
# client possibly left without proper disconnection.
print(f"[{uuid_}] CLIENT TIMEOUT")
except fastapi.websockets.WebSocketDisconnect:
print(f"[{uuid_}] CLIENT DISCONNECTED")
# fire event, and pop reference if any.
print(f"[{uuid_}] NURSERY STOPPED & REFERENCE DROPPED")
cancel_done_event.set()
GLOBAL_NURSERY_STORAGE.pop(uuid_, None)
async def task(text, uuid_, websocket: fastapi.WebSocket):
await websocket.send_text(str(text))
print(f"[{uuid_}] SENT {text}")
if __name__ == '__main__':
cornfig = hypercorn.Config()
# cornfig.bind = "ws://127.0.0.1:8000"
trio.run(serve, router, cornfig)
示例 运行 输出:
客户
服务器
[2022-01-31 21:23:12 +0900] [17204] [INFO] Running on http://127.0.0.1:8000 (CTRL + C to quit)
[2] CONNECTED < start connection on tab 2
[2] SENT 0
[2] RECEIVED Received
[2] SENT 1
[2] RECEIVED Received
[2] SENT 2
[2] RECEIVED Received
[2] SENT 3
[2] RECEIVED Received
[2] SENT 4
[1] CONNECTED < start connection on tab 1
[1] SENT 0
[2] RECEIVED Received
[2] SENT 5
[1] RECEIVED Received
[1] SENT 1
...
[2] SENT 18
[1] RECEIVED Received
[1] SENT 14
[2] RECEIVED Received
[2] SENT 19
[1] CLIENT DISCONNECTED < closed connection on tab 1
[1] NURSERY STOPPED & REFERENCE DROPPED < tab 1 nursery terminated
[2] RECEIVED Received
[2] SENT 20
[2] RECEIVED Received
[2] SENT 21
[1] CONNECTED < start connection on tab 1
[1] SENT 0
[2] RECEIVED Received
[2] SENT 22
[1] RECEIVED Received
...
[2] SENT 26
[1] RECEIVED Received
[1] SENT 5
[2] CLIENT DISCONNECTED < tab 2 closed
[2] NURSERY STOPPED & REFERENCE DROPPED < tab 2 nursery terminated
[1] RECEIVED Received
[1] SENT 6
[1] RECEIVED Received
[1] SENT 7
[1] RECEIVED Received
[1] SENT 8
[1] CONNECTED < start another connection on tab 1 without closing
[1] STOPPING NURSERY < previous connection on tab 1 terminating
[1] NURSERY STOPPED & REFERENCE DROPPED < previous connection on tab 1 terminated
[1] SENT 0
[1] RECEIVED Received
[1] SENT 1
...
[1] RECEIVED Received
[1] SENT 8
[1] CLIENT DISCONNECTED < Refreshed tab 1
[1] NURSERY STOPPED & REFERENCE DROPPED < tab 1 nursery terminated
...
我对 websockets 还是很陌生,遇到了一个我很难解决的问题。
我需要使用 FastAPI 构建一个 websocket 端点,其中一组任务是 运行 异步的(为此我使用了 trio),每个任务通过 websocket 返回一个 json 值实时。
我已经设法满足这些要求,我的代码如下所示:
@router.websocket('/stream')
async def runTasks(
websocket: WebSocket
):
# Initialise websocket
await websocket.accept()
while True:
# Receive data
tasks = await websocket.receive_json()
# Run tasks asynchronously (limiting to 10 tasks at a time)
async with trio.open_nursery() as nursery:
limit = trio.CapacityLimiter(10)
for task in tasks:
nursery.start_soon(run_task, limit, task, websocket)
run_task
看起来像这样:
async def run_task(limit, task, websocket):
async with limit:
# Complete task / transaction
await websocket.send_json({"placeholder":"data"})
但是现在,在两种情况下,我应该 cancel/skip 当前剩余的托儿任务,但我有点不知道如何实现它。
我给出的两种情况如下:
场景 1: 假设当用户按下按钮时调用端点,如果用户在某些任务仍在进行时再次按下按钮 运行宁他们应该被取消或跳过,这个过程应该重新开始
场景2:如果websocket被关闭,用户刷新页面,或者在托儿任务完成前退出,剩余任务应取消或跳过
我正在尝试阅读
对于场景 1:
- 在全局命名空间中创建用于存储取消范围和事件的字典(键:
UUID
,值:Tuple[trio.CancelScope, trio.Event]
- 为每个客户端分配唯一的 UUID(任何对客户端唯一的)
- 让客户端在连接开始时发送 UUID
- 检查字典是否以该 UUID 作为键。如果存在,则取消范围并等待事件设置。
- 现在进行实际传输
对于场景 2:
如果客户端没有明确关闭 websocket,Websocket 不知道客户端是否断开连接。因此,我能想到的最好的办法是强制超时并等待客户端对每次传输的响应。 (这使得这种方法效率有点低)。
下面是上述思路的演示代码
客户代码:
由于我不知道客户端代码是什么样的,所以我只是做了一些客户端来测试你的问题。
这个有点bug,我没学过js - 请不要太认真地判断客户端代码!
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Websocket test</title>
</head>
<body>
<button id="start">Start connection</button>
<button id="close" disabled>Close connection</button>
<input type="text" id="input_" value="INPUT_YOUR_UUID">
<div id="state">Status: Waiting for connection</div>
<script>
let state = document.getElementById("state")
let start_btn = document.getElementById("start")
let close_btn = document.getElementById("close")
let input_ = document.getElementById("input_")
function sleep(sec) {
state.textContent = `Status: sleeping ${sec} seconds`
return new Promise((func) => setTimeout(func, sec * 1000))
}
function websocket_test() {
return new Promise((resolve, reject) => {
let socket = new WebSocket("ws://127.0.0.1:8000/stream")
socket.onopen = function () {
state.textContent = "Status: Sending UUID - " + input_.value
socket.send(input_.value)
close_btn.disabled = false
close_btn.onclick = function () {socket.close()}
}
socket.onmessage = function (msg) {
state.textContent = "Status: Message Received - " + msg.data
socket.send("Received")
}
socket.onerror = function (error) {
reject(error)
state.textContent = "Status: Error encountered"
}
socket.onclose = function () {
state.textContent = "Status: Connection Stopped"
close_btn.disabled = true
}
})
}
start_btn.onclick = websocket_test
</script>
</body>
</html>
服务器代码:
在之前的测试中,我看到服务器抛出超时,但无法重现 - 如果对行为有信心,您可能不需要 trio.fail_after
和 except trio.TooSlowError
部分。
"""
Nursery cancellation demo
"""
import itertools
import trio
import fastapi
import hypercorn
from hypercorn.trio import serve
NURSERY = trio.open_nursery()
GLOBAL_NURSERY_STORAGE = {}
TIMEOUT = 5
router = fastapi.APIRouter()
@router.websocket('/stream')
async def run_task(websocket: fastapi.WebSocket):
# accept and receive UUID
# Replace UUID with anything client-specific
await websocket.accept()
uuid_ = await websocket.receive_text()
print(f"[{uuid_}] CONNECTED")
# check if nursery exist in session, if exists, cancel it and wait it to end.
cancel_scope: trio.CancelScope
event: trio.Event
try:
cancel_scope, event = GLOBAL_NURSERY_STORAGE[uuid_]
except KeyError:
pass
else:
print(f"[{uuid_}] STOPPING NURSERY")
cancel_scope.cancel()
await event.wait()
# create new event, and start new nursery.
cancel_done_event = trio.Event()
async with trio.open_nursery() as nursery:
# save ref
GLOBAL_NURSERY_STORAGE[uuid_] = nursery.cancel_scope, cancel_done_event
try:
for n in itertools.count(0, 1):
nursery.start_soon(task, n, uuid_, websocket)
await trio.sleep(1)
# wait for client response
with trio.fail_after(TIMEOUT):
recv = await websocket.receive_text()
print(f"[{uuid_}] RECEIVED {recv}")
except trio.TooSlowError:
# client possibly left without proper disconnection.
print(f"[{uuid_}] CLIENT TIMEOUT")
except fastapi.websockets.WebSocketDisconnect:
print(f"[{uuid_}] CLIENT DISCONNECTED")
# fire event, and pop reference if any.
print(f"[{uuid_}] NURSERY STOPPED & REFERENCE DROPPED")
cancel_done_event.set()
GLOBAL_NURSERY_STORAGE.pop(uuid_, None)
async def task(text, uuid_, websocket: fastapi.WebSocket):
await websocket.send_text(str(text))
print(f"[{uuid_}] SENT {text}")
if __name__ == '__main__':
cornfig = hypercorn.Config()
# cornfig.bind = "ws://127.0.0.1:8000"
trio.run(serve, router, cornfig)
示例 运行 输出:
客户
服务器
[2022-01-31 21:23:12 +0900] [17204] [INFO] Running on http://127.0.0.1:8000 (CTRL + C to quit)
[2] CONNECTED < start connection on tab 2
[2] SENT 0
[2] RECEIVED Received
[2] SENT 1
[2] RECEIVED Received
[2] SENT 2
[2] RECEIVED Received
[2] SENT 3
[2] RECEIVED Received
[2] SENT 4
[1] CONNECTED < start connection on tab 1
[1] SENT 0
[2] RECEIVED Received
[2] SENT 5
[1] RECEIVED Received
[1] SENT 1
...
[2] SENT 18
[1] RECEIVED Received
[1] SENT 14
[2] RECEIVED Received
[2] SENT 19
[1] CLIENT DISCONNECTED < closed connection on tab 1
[1] NURSERY STOPPED & REFERENCE DROPPED < tab 1 nursery terminated
[2] RECEIVED Received
[2] SENT 20
[2] RECEIVED Received
[2] SENT 21
[1] CONNECTED < start connection on tab 1
[1] SENT 0
[2] RECEIVED Received
[2] SENT 22
[1] RECEIVED Received
...
[2] SENT 26
[1] RECEIVED Received
[1] SENT 5
[2] CLIENT DISCONNECTED < tab 2 closed
[2] NURSERY STOPPED & REFERENCE DROPPED < tab 2 nursery terminated
[1] RECEIVED Received
[1] SENT 6
[1] RECEIVED Received
[1] SENT 7
[1] RECEIVED Received
[1] SENT 8
[1] CONNECTED < start another connection on tab 1 without closing
[1] STOPPING NURSERY < previous connection on tab 1 terminating
[1] NURSERY STOPPED & REFERENCE DROPPED < previous connection on tab 1 terminated
[1] SENT 0
[1] RECEIVED Received
[1] SENT 1
...
[1] RECEIVED Received
[1] SENT 8
[1] CLIENT DISCONNECTED < Refreshed tab 1
[1] NURSERY STOPPED & REFERENCE DROPPED < tab 1 nursery terminated
...