Redis Pub/Sub 导致 web socket 连接挂起

Redis Pub/Sub causes web socket connection to hang

我正在构建一个通过网络套接字连接到服务器的网络应用程序。服务器组件是基于 sinatraredisfaye-websocket 的小型 Ruby 应用程序。服务器在 Phusion Passenger 上 运行。一个单独的更新程序守护进程不断地从各种来源提取更新并将它们发布到 redis(使用 redis gem 和 Redis::publish)。

为了将更新推送到客户端,我在我的 Sinatra 应用程序中尝试了以下操作:

get '/' do
  if Faye::WebSocket.websocket?(request.env)
    store = Redis.new
    ws = Faye::WebSocket.new(request.env)

    ws.on(:open) do |event|
      store.incr('connection_count')
      puts 'Client connected (connection count: %s)' % store.get('connection_count')
    end

    ws.on(:close) do |event|
      store.decr('connection_count')
      puts 'Client disconnected (connection count: %s)' % store.get('connection_count')
    end

    ws.rack_response

    store.subscribe(:updates) do |on|
      on.message do |ch, payload|
        puts "Got update"
        ws.send(payload) if payload
      end
    end
  end
end

这只能部分起作用。客户端可以成功连接并接收更新,但 store.incrstore.decr 调用不起作用。此外,连接似乎没有正确关闭——当我启动多个客户端时,我注意到连接堆积起来,Passenger 服务器最终停止工作。

日志输出:

devserver_1 | App 614 stdout: Got update
devserver_1 | App 614 stdout: Got update
devserver_1 | App 614 stdout: Got update

当我注释掉以下块时,跟踪连接突然起作用了:

store.subscribe(:updates) do |on|
  on.message do |ch, payload|
    puts "Got update"
    ws.send(payload) if payload
  end
end

日志输出:

devserver_1 | App 1028 stdout: Client connected (connection count: 1)
devserver_1 | App 1039 stdout: Client connected (connection count: 2)
devserver_1 | App 1039 stdout: Client disconnected (connection count: 1)
devserver_1 | App 1028 stdout: Client disconnected (connection count: 0)

所以使用 Redis::subscribe 似乎会以某种方式干扰网络套接字连接。

我该如何解决这个问题?

我认为这里的问题是 Faye 使用 EventMachine,这意味着你的线程上有一个反应器正在处理事件,并调用你的回调 ws.on(:open)ws.on(:close).

现在当你点击

store.subscribe(:updates) do |on|
  on.message do |ch, payload|
    puts "Got update"
    ws.send(payload) if payload
  end
end

这是一个阻塞操作 - 它完全阻塞了当前线程。如果您当前的线程被阻塞,反应器将无法监听事件然后调用您的回调。

一个解决方案是 运行 你的 store.subscribe 在不同的线程上,所以它是否阻塞那个线程并不重要。

但我认为更好的解决方案是使用 non-blocking version of the Redis library:

来自文档:

redis = EM::Hiredis.connect
pubsub = redis.pubsub

pubsub.subscribe(:updates).callback do
    puts "Got update"
    ws.send(payload) if payload
end

这两个(Redis + Faye)都应该在 EventMachine 反应器循环中注册,以便它向两者发送事件。