WebSocket 和 Redis 导致来自 pubsub and/or brpop 的挂起连接

WebSocket and Redis results in hanging connection from pubsub and/or brpop

我正在 WebSocket (WS) 中发布 Redis 订阅。当我收到 WS 打开时,我将请求线程化,然后实例化 Redis 客户端。在公开范围内,我为 Redis 线程并发布订阅。

一切正常,直到我收到意外的 WS 关闭。那时,Redis 订阅的线程运行 消失了。如果我发出取消订阅,我就会挂起。如果我不退订,我就会留下一个幻影订阅,这会给我下一次带来麻烦。

有没有什么方法可以在发布订阅的线程终止后删除订阅?我注意到 Redis 实例有一个用于该终止线程的 mon 变量。示例 Ruby 代码为:

class Backend
  include MInit

  def initialize(app)
    setup
    @app = app
  end

  def run!(env)
    if Faye::WebSocket.websocket?(env)
      ws = Faye::WebSocket.new(env, [], ping: KEEPALIVE_TIME)
      ws_thread = Thread.fork(env) do
        credis = Redis.new(host: @redis_uri.host, port: @redis_uri.port, password: @redis_uri.password)

        ws.on :open do |event|
          channel = URI.parse(event.target.url).path[1..URI.parse(event.target.url).path.length]
          redis_thread = Thread.fork do
            credis.subscribe(channel) do |on|
              on.message do |message_channel, message|
                sent = ws.send(message)
              end
              on.unsubscribe do |message_channel|
                puts "Unsubscribe on channel:#{channel};"
              end
            end
          end
        end

        ws.on :message do |event|
          handoff(ws: ws, event: event)
        end

        ws.on :close do |event|
          # Hang occurs here
          unsubscribed = credis.unsubscribe(channel)
        end

        ws.on :error do |event|
          ws.close
        end

        # Return async Rack response
        ws.rack_response

      end
    end
  else
    @app.call(env)
  end

  private
  def handoff(ws: nil, event: nil, source: nil, message: nil)
    # processing
  end
end

一旦我真正理解了这个问题,修复就相当简单了。 Redis线程其实还是存在的。但是,Redis 无论如何都会挂起,因为它正在等待线程获得控制权。为此,WS.close 代码需要通过在 WS.close 中使用 EM.next_tick 来放弃控制权,如下所示:

ws.on :close do |event|
  EM.next_tick do
    # Hang occurs here
    unsubscribed = credis.unsubscribe(channel)
  end
end

这是一个提供解决方法而非解决方案的长评论。

如果是我的申请,我会重新考虑设计。

为每个 websocket 客户端打开一个新的 Redis 连接和一个新线程是一个相当大的资源承诺。

澄清一下,每个与 Redis 的连接都需要一个 TCP/IP 套接字(这是一种有限的资源)和内存。对于保留的堆栈内存,线程每个线程的成本应该约为 2Mib...因此 1K Redis 连接和线程会产生大约 2Gib 的内存成本。

此外,Redis 服务器本身通常可以接受的连接数量有限(尽管这通常是价格问题而不是硬限制,因为它们旨在扩展).

重新调整设计应该非常简单,因此单个线程和连接为所有 websocket 客户端提供服务,这也将允许更轻松的 subscribe/unsubscribe 管理。

这可以使用内部进程广播系统(例如 plezi.io 实现)或使用 Redis subscribe/punsubscribe 命令来执行。