FastAPI 中用于音频流的 Websockets 桥
Websockets bridge for audio stream in FastAPI
Objective
我的objective 是消费音频流。从逻辑上讲,这是我的 objective:
- 音频流来自 WebSocket A(
FastAPI
端点)
- 音频流桥接到不同的 WebSocket B,它将 return 一个 JSON(Rev-ai 的 WebSocket)
- Json 结果通过 WebSocket A 实时发回。因此,当音频流仍在传入时。
可能的解决方案
为了解决这个问题,我有很多想法,但最终我一直在尝试将 WebSocket A
连接到 WebSocket B
。到目前为止,我的尝试涉及一个 ConnectionManager
class,其中包含一个 Queue.queue
。音频流的块被添加到这个队列中,这样我们就不会直接从 WebSocket A
.
消费
ConnectionManager
还包含一个生成器方法,用于生成队列中的所有值。
我的 FastAPI 实现像这样消耗 websocket A
:
@app.websocket("/ws")
async def predict_feature(websocket: WebSocket):
await manager.connect(websocket)
try:
while True:
chunk = await websocket.receive_bytes()
manager.add_to_buffer(chunk)
except KeyboardInterrupt:
manager.disconnect()
在此摄取的同时,我想要一个任务,将我们的音频流桥接到 WebSocket B
,并将获得的值发送到 WebSocket A
。可以通过上述 generator
方法使用音频流。
根据 Rev-ai 的 examples:
,WebSocket B 消费消息的方式需要生成器方法
streamclient = RevAiStreamingClient(access_token, config)
response_generator = streamclient.start(MEDIA_GENERATOR)
for response in response_generator:
# return through websocket A this value
print(response)
这是最大的挑战之一,因为我们需要将数据消耗到生成器中并实时获得结果。
最近的尝试
我一直在尝试 asyncio
的运气;根据我的理解,一种可能性是创建一个在后台 运行 的协程。我一直没有成功,但听起来很有希望。
我考虑过通过 FastAPI
启动事件触发它,但我无法实现并发。我尝试使用 event_loops
,但它给了我一个 nested event loop
相关的错误。
警告
FastAPI 可以是可选的,如果您的洞察力认为如此,WebSocket A 在某种程度上也是如此。归根结底,最终的 objective 是通过以下方式接收音频流我们自己的 API 端点,运行 它通过 Rev.ai 的 WebSocket,做一些额外的处理,然后将结果发回。
websocket 的桥梁 <-> websocket
下面是一个简单的webscoket代理示例,其中websocketA
和websocketB
都是FastAPI应用程序中的端点,但是websocketB
可以位于其他地方,只需更改其地址 ws_b_uri
。对于 websocket 客户端,使用 websockets 库。
为了进行数据转发,A
端点的代码启动了两个任务forward
和reverse
,并通过asyncio.gather()
等待它们完成。双向数据传输以并行方式进行。
import asyncio
from fastapi import FastAPI
from fastapi import WebSocket
import websockets
app = FastAPI()
ws_b_uri = "ws://localhost:8001/ws_b"
async def forward(ws_a: WebSocket, ws_b: websockets.WebSocketClientProtocol):
while True:
data = await ws_a.receive_bytes()
print("websocket A received:", data)
await ws_b.send(data)
async def reverse(ws_a: WebSocket, ws_b: websockets.WebSocketClientProtocol):
while True:
data = await ws_b.recv()
await ws_a.send_text(data)
print("websocket A sent:", data)
@app.websocket("/ws_a")
async def websocket_a(ws_a: WebSocket):
await ws_a.accept()
async with websockets.connect(ws_b_uri) as ws_b_client:
fwd_task = asyncio.create_task(forward(ws_a, ws_b_client))
rev_task = asyncio.create_task(reverse(ws_a, ws_b_client))
await asyncio.gather(fwd_task, rev_task)
@app.websocket("/ws_b")
async def websocket_b(ws_b_server: WebSocket):
await ws_b_server.accept()
while True:
data = await ws_b_server.receive_bytes()
print("websocket B server recieved: ", data)
await ws_b_server.send_text('{"response": "value from B server"}')
更新(Bridge websocket <-> 同步生成器)
考虑到问题的最后更新,问题是 WebSocket B
隐藏在同步生成器后面(实际上有两个,一个用于输入,另一个用于输出)和事实上,任务变成了如何在 WebSocket 和同步生成器之间架起一座桥梁。因为我从来没有使用过 rev-ai
库,所以我为 streamclient.start
创建了一个存根函数 stream_client_start
,它需要一个生成器(原始的 MEDIA_GENERATOR
)和 returns生成器(原 response_generator
)。
在这种情况下,我通过 run_in_executor
在单独的线程中开始处理生成器,为了不重新发明轮子,为了通信我使用 janus
库中的队列,它允许您通过队列绑定同步和异步代码。相应地,也有两个队列,一个用于A -> B
,一个用于B -> A
。
import asyncio
import time
from typing import Generator
from fastapi import FastAPI
from fastapi import WebSocket
import janus
import queue
app = FastAPI()
# Stub generator function (using websocket B in internal)
def stream_client_start(input_gen: Generator) -> Generator:
for chunk in input_gen:
time.sleep(1)
yield f"Get {chunk}"
# queue to generator auxiliary adapter
def queue_to_generator(sync_queue: queue.Queue) -> Generator:
while True:
yield sync_queue.get()
async def forward(ws_a: WebSocket, queue_b):
while True:
data = await ws_a.receive_bytes()
print("websocket A received:", data)
await queue_b.put(data)
async def reverse(ws_a: WebSocket, queue_b):
while True:
data = await queue_b.get()
await ws_a.send_text(data)
print("websocket A sent:", data)
def process_b_client(fwd_queue, rev_queue):
response_generator = stream_client_start(queue_to_generator(fwd_queue))
for r in response_generator:
rev_queue.put(r)
@app.websocket("/ws_a")
async def websocket_a(ws_a: WebSocket):
loop = asyncio.get_event_loop()
fwd_queue = janus.Queue()
rev_queue = janus.Queue()
await ws_a.accept()
process_client_task = loop.run_in_executor(None, process_b_client, fwd_queue.sync_q, rev_queue.sync_q)
fwd_task = asyncio.create_task(forward(ws_a, fwd_queue.async_q))
rev_task = asyncio.create_task(reverse(ws_a, rev_queue.async_q))
await asyncio.gather(process_client_task, fwd_task, rev_task)
Objective
我的objective 是消费音频流。从逻辑上讲,这是我的 objective:
- 音频流来自 WebSocket A(
FastAPI
端点) - 音频流桥接到不同的 WebSocket B,它将 return 一个 JSON(Rev-ai 的 WebSocket)
- Json 结果通过 WebSocket A 实时发回。因此,当音频流仍在传入时。
可能的解决方案
为了解决这个问题,我有很多想法,但最终我一直在尝试将 WebSocket A
连接到 WebSocket B
。到目前为止,我的尝试涉及一个 ConnectionManager
class,其中包含一个 Queue.queue
。音频流的块被添加到这个队列中,这样我们就不会直接从 WebSocket A
.
ConnectionManager
还包含一个生成器方法,用于生成队列中的所有值。
我的 FastAPI 实现像这样消耗 websocket A
:
@app.websocket("/ws")
async def predict_feature(websocket: WebSocket):
await manager.connect(websocket)
try:
while True:
chunk = await websocket.receive_bytes()
manager.add_to_buffer(chunk)
except KeyboardInterrupt:
manager.disconnect()
在此摄取的同时,我想要一个任务,将我们的音频流桥接到 WebSocket B
,并将获得的值发送到 WebSocket A
。可以通过上述 generator
方法使用音频流。
根据 Rev-ai 的 examples:
,WebSocket B 消费消息的方式需要生成器方法streamclient = RevAiStreamingClient(access_token, config)
response_generator = streamclient.start(MEDIA_GENERATOR)
for response in response_generator:
# return through websocket A this value
print(response)
这是最大的挑战之一,因为我们需要将数据消耗到生成器中并实时获得结果。
最近的尝试
我一直在尝试 asyncio
的运气;根据我的理解,一种可能性是创建一个在后台 运行 的协程。我一直没有成功,但听起来很有希望。
我考虑过通过 FastAPI
启动事件触发它,但我无法实现并发。我尝试使用 event_loops
,但它给了我一个 nested event loop
相关的错误。
警告
FastAPI 可以是可选的,如果您的洞察力认为如此,WebSocket A 在某种程度上也是如此。归根结底,最终的 objective 是通过以下方式接收音频流我们自己的 API 端点,运行 它通过 Rev.ai 的 WebSocket,做一些额外的处理,然后将结果发回。
websocket 的桥梁 <-> websocket
下面是一个简单的webscoket代理示例,其中websocketA
和websocketB
都是FastAPI应用程序中的端点,但是websocketB
可以位于其他地方,只需更改其地址 ws_b_uri
。对于 websocket 客户端,使用 websockets 库。
为了进行数据转发,A
端点的代码启动了两个任务forward
和reverse
,并通过asyncio.gather()
等待它们完成。双向数据传输以并行方式进行。
import asyncio
from fastapi import FastAPI
from fastapi import WebSocket
import websockets
app = FastAPI()
ws_b_uri = "ws://localhost:8001/ws_b"
async def forward(ws_a: WebSocket, ws_b: websockets.WebSocketClientProtocol):
while True:
data = await ws_a.receive_bytes()
print("websocket A received:", data)
await ws_b.send(data)
async def reverse(ws_a: WebSocket, ws_b: websockets.WebSocketClientProtocol):
while True:
data = await ws_b.recv()
await ws_a.send_text(data)
print("websocket A sent:", data)
@app.websocket("/ws_a")
async def websocket_a(ws_a: WebSocket):
await ws_a.accept()
async with websockets.connect(ws_b_uri) as ws_b_client:
fwd_task = asyncio.create_task(forward(ws_a, ws_b_client))
rev_task = asyncio.create_task(reverse(ws_a, ws_b_client))
await asyncio.gather(fwd_task, rev_task)
@app.websocket("/ws_b")
async def websocket_b(ws_b_server: WebSocket):
await ws_b_server.accept()
while True:
data = await ws_b_server.receive_bytes()
print("websocket B server recieved: ", data)
await ws_b_server.send_text('{"response": "value from B server"}')
更新(Bridge websocket <-> 同步生成器)
考虑到问题的最后更新,问题是 WebSocket B
隐藏在同步生成器后面(实际上有两个,一个用于输入,另一个用于输出)和事实上,任务变成了如何在 WebSocket 和同步生成器之间架起一座桥梁。因为我从来没有使用过 rev-ai
库,所以我为 streamclient.start
创建了一个存根函数 stream_client_start
,它需要一个生成器(原始的 MEDIA_GENERATOR
)和 returns生成器(原 response_generator
)。
在这种情况下,我通过 run_in_executor
在单独的线程中开始处理生成器,为了不重新发明轮子,为了通信我使用 janus
库中的队列,它允许您通过队列绑定同步和异步代码。相应地,也有两个队列,一个用于A -> B
,一个用于B -> A
。
import asyncio
import time
from typing import Generator
from fastapi import FastAPI
from fastapi import WebSocket
import janus
import queue
app = FastAPI()
# Stub generator function (using websocket B in internal)
def stream_client_start(input_gen: Generator) -> Generator:
for chunk in input_gen:
time.sleep(1)
yield f"Get {chunk}"
# queue to generator auxiliary adapter
def queue_to_generator(sync_queue: queue.Queue) -> Generator:
while True:
yield sync_queue.get()
async def forward(ws_a: WebSocket, queue_b):
while True:
data = await ws_a.receive_bytes()
print("websocket A received:", data)
await queue_b.put(data)
async def reverse(ws_a: WebSocket, queue_b):
while True:
data = await queue_b.get()
await ws_a.send_text(data)
print("websocket A sent:", data)
def process_b_client(fwd_queue, rev_queue):
response_generator = stream_client_start(queue_to_generator(fwd_queue))
for r in response_generator:
rev_queue.put(r)
@app.websocket("/ws_a")
async def websocket_a(ws_a: WebSocket):
loop = asyncio.get_event_loop()
fwd_queue = janus.Queue()
rev_queue = janus.Queue()
await ws_a.accept()
process_client_task = loop.run_in_executor(None, process_b_client, fwd_queue.sync_q, rev_queue.sync_q)
fwd_task = asyncio.create_task(forward(ws_a, fwd_queue.async_q))
rev_task = asyncio.create_task(reverse(ws_a, rev_queue.async_q))
await asyncio.gather(process_client_task, fwd_task, rev_task)