使用 Redis pubsub 功能时 FastAPI websockets 不工作
FastAPI websockets not working when using Redis pubsub functionality
目前我正在使用 websockets 传递我从 Redis 队列 (pub/sub) 接收的数据。但是由于某些原因,websocket 在使用这个 redis 队列时不发送消息。
我的代码是什么样的
我的代码如下:
- 我接受套接字连接
- 我连接到redis队列
- 对于我从订阅中收到的每条消息,我都通过套接字发送了一条消息。 (目前只有文字测试)
@check_route.websocket_route("/check")
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept()
redis = Redis(host='::1', port=6379, db=1)
subscribe = redis.pubsub()
subscribe.subscribe('websocket_queue')
try:
for result in subscribe.listen():
await websocket.send_text('test')
print('test send')
except Exception as e:
await websocket.close()
raise e
代码问题
当我使用这段代码时,它只是不通过套接字发送消息。但是,当我在 subscribe.listen()
循环中接受 websocket 时,它确实有效,但每次都会重新连接(参见下面的代码)。
@check_route.websocket_route("/check")
async def websocket_endpoint(websocket: WebSocket):
redis = Redis(host='::1', port=6379, db=1)
subscribe = redis.pubsub()
subscribe.subscribe('websocket_queue')
try:
for result in subscribe.listen():
await websocket.accept()
await websocket.send_text('test')
print('test send')
except Exception as e:
await websocket.close()
raise e
我认为 subscribe.listen()
会导致一些问题,当 websocket.accept()
在 for 循环之外时,websocket 什么都不做。
我希望有人知道这有什么问题。
我不确定这是否有效,但你可以试试这个:
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept()
redis = Redis(host='::1', port=6379, db=1)
subscribe = redis.pubsub()
subscribe.subscribe('websocket_queue')
try:
results = await subscribe.listen()
for result in results:
await websocket.send_text('test')
print('test send')
except Exception as e:
await websocket.close()
raise e
经过几天的研究,我找到了解决这个问题的办法。我使用 aioredis. This solution is based on the following GitHub Gist.
解决了它
import json
import aioredis
from fastapi import APIRouter, WebSocket
from app.service.config_service import load_config
check_route = APIRouter()
@check_route.websocket("/check")
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept()
# ---------------------------- REDIS REQUIREMENTS ---------------------------- #
config = load_config()
redis_uri: str = f"redis://{config.redis.host}:{config.redis.port}"
redis_channel = config.redis.redis_socket_queue.channel
redis = await aioredis.create_redis_pool(redis_uri)
# ------------------ SEND SUBSCRIBE RESULT THROUGH WEBSOCKET ----------------- #
(channel,) = await redis.subscribe(redis_channel)
assert isinstance(channel, aioredis.Channel)
try:
while True:
response_raw = await channel.get()
response_str = response_raw.decode("utf-8")
response = json.loads(response_str)
if response:
await websocket.send_json({
"event": 'NEW_CHECK_RESULT',
"data": response
})
except Exception as e:
raise e
目前我正在使用 websockets 传递我从 Redis 队列 (pub/sub) 接收的数据。但是由于某些原因,websocket 在使用这个 redis 队列时不发送消息。
我的代码是什么样的
我的代码如下:
- 我接受套接字连接
- 我连接到redis队列
- 对于我从订阅中收到的每条消息,我都通过套接字发送了一条消息。 (目前只有文字测试)
@check_route.websocket_route("/check")
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept()
redis = Redis(host='::1', port=6379, db=1)
subscribe = redis.pubsub()
subscribe.subscribe('websocket_queue')
try:
for result in subscribe.listen():
await websocket.send_text('test')
print('test send')
except Exception as e:
await websocket.close()
raise e
代码问题
当我使用这段代码时,它只是不通过套接字发送消息。但是,当我在 subscribe.listen()
循环中接受 websocket 时,它确实有效,但每次都会重新连接(参见下面的代码)。
@check_route.websocket_route("/check")
async def websocket_endpoint(websocket: WebSocket):
redis = Redis(host='::1', port=6379, db=1)
subscribe = redis.pubsub()
subscribe.subscribe('websocket_queue')
try:
for result in subscribe.listen():
await websocket.accept()
await websocket.send_text('test')
print('test send')
except Exception as e:
await websocket.close()
raise e
我认为 subscribe.listen()
会导致一些问题,当 websocket.accept()
在 for 循环之外时,websocket 什么都不做。
我希望有人知道这有什么问题。
我不确定这是否有效,但你可以试试这个:
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept()
redis = Redis(host='::1', port=6379, db=1)
subscribe = redis.pubsub()
subscribe.subscribe('websocket_queue')
try:
results = await subscribe.listen()
for result in results:
await websocket.send_text('test')
print('test send')
except Exception as e:
await websocket.close()
raise e
经过几天的研究,我找到了解决这个问题的办法。我使用 aioredis. This solution is based on the following GitHub Gist.
解决了它import json
import aioredis
from fastapi import APIRouter, WebSocket
from app.service.config_service import load_config
check_route = APIRouter()
@check_route.websocket("/check")
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept()
# ---------------------------- REDIS REQUIREMENTS ---------------------------- #
config = load_config()
redis_uri: str = f"redis://{config.redis.host}:{config.redis.port}"
redis_channel = config.redis.redis_socket_queue.channel
redis = await aioredis.create_redis_pool(redis_uri)
# ------------------ SEND SUBSCRIBE RESULT THROUGH WEBSOCKET ----------------- #
(channel,) = await redis.subscribe(redis_channel)
assert isinstance(channel, aioredis.Channel)
try:
while True:
response_raw = await channel.get()
response_str = response_raw.decode("utf-8")
response = json.loads(response_str)
if response:
await websocket.send_json({
"event": 'NEW_CHECK_RESULT',
"data": response
})
except Exception as e:
raise e