使用 Redis pubsub 功能时 FastAPI websockets 不工作

FastAPI websockets not working when using Redis pubsub functionality

目前我正在使用 websockets 传递我从 Redis 队列 (pub/sub) 接收的数据。但是由于某些原因,websocket 在使用这个 redis 队列时不发送消息。

我的代码是什么样的

我的代码如下:

  1. 我接受套接字连接
  2. 我连接到redis队列
  3. 对于我从订阅中收到的每条消息,我都通过套接字发送了一条消息。 (目前只有文字测试)
@check_route.websocket_route("/check")
async def websocket_endpoint(websocket: WebSocket):

    await websocket.accept()

    redis = Redis(host='::1', port=6379, db=1)
    subscribe = redis.pubsub()
    subscribe.subscribe('websocket_queue')

    try:
        for result in subscribe.listen():
            await websocket.send_text('test')
            print('test send')
    except Exception as e:
        await websocket.close()
        raise e

代码问题

当我使用这段代码时,它只是不通过套接字发送消息。但是,当我在 subscribe.listen() 循环中接受 websocket 时,它确实有效,但每次都会重新连接(参见下面的代码)。

@check_route.websocket_route("/check")
async def websocket_endpoint(websocket: WebSocket):

    redis = Redis(host='::1', port=6379, db=1)
    subscribe = redis.pubsub()
    subscribe.subscribe('websocket_queue')

    try:
        for result in subscribe.listen():
            await websocket.accept()
            await websocket.send_text('test')
            print('test send')
    except Exception as e:
        await websocket.close()
        raise e

我认为 subscribe.listen() 会导致一些问题,当 websocket.accept() 在 for 循环之外时,websocket 什么都不做。

我希望有人知道这有什么问题。

我不确定这是否有效,但你可以试试这个:

async def websocket_endpoint(websocket: WebSocket):

    await websocket.accept()

    redis = Redis(host='::1', port=6379, db=1)
    subscribe = redis.pubsub()
    subscribe.subscribe('websocket_queue')

    try:
        results = await subscribe.listen()
        for result in results:
            await websocket.send_text('test')
            print('test send')
    except Exception as e:
        await websocket.close()
        raise e

经过几天的研究,我找到了解决这个问题的办法。我使用 aioredis. This solution is based on the following GitHub Gist.

解决了它
import json
import aioredis

from fastapi import APIRouter, WebSocket

from app.service.config_service import load_config

check_route = APIRouter()


@check_route.websocket("/check")
async def websocket_endpoint(websocket: WebSocket):

    await websocket.accept()

    # ---------------------------- REDIS REQUIREMENTS ---------------------------- #
    config = load_config()
    redis_uri: str = f"redis://{config.redis.host}:{config.redis.port}"
    redis_channel = config.redis.redis_socket_queue.channel
    redis = await aioredis.create_redis_pool(redis_uri)

    # ------------------ SEND SUBSCRIBE RESULT THROUGH WEBSOCKET ----------------- #
    (channel,) = await redis.subscribe(redis_channel)
    assert isinstance(channel, aioredis.Channel)
    try:
        while True:
            response_raw = await channel.get()
            response_str = response_raw.decode("utf-8")
            response = json.loads(response_str)

            if response:
                await websocket.send_json({
                    "event": 'NEW_CHECK_RESULT',
                    "data": response
                })
    except Exception as e:
        raise e