Redis Pub/Sub 导致 web socket 连接挂起
Redis Pub/Sub causes web socket connection to hang
我正在构建一个通过网络套接字连接到服务器的网络应用程序。服务器组件是基于 sinatra
、redis
和 faye-websocket
的小型 Ruby 应用程序。服务器在 Phusion Passenger 上 运行。一个单独的更新程序守护进程不断地从各种来源提取更新并将它们发布到 redis(使用 redis
gem 和 Redis::publish
)。
为了将更新推送到客户端,我在我的 Sinatra 应用程序中尝试了以下操作:
get '/' do
if Faye::WebSocket.websocket?(request.env)
store = Redis.new
ws = Faye::WebSocket.new(request.env)
ws.on(:open) do |event|
store.incr('connection_count')
puts 'Client connected (connection count: %s)' % store.get('connection_count')
end
ws.on(:close) do |event|
store.decr('connection_count')
puts 'Client disconnected (connection count: %s)' % store.get('connection_count')
end
ws.rack_response
store.subscribe(:updates) do |on|
on.message do |ch, payload|
puts "Got update"
ws.send(payload) if payload
end
end
end
end
这只能部分起作用。客户端可以成功连接并接收更新,但 store.incr
和 store.decr
调用不起作用。此外,连接似乎没有正确关闭——当我启动多个客户端时,我注意到连接堆积起来,Passenger 服务器最终停止工作。
日志输出:
devserver_1 | App 614 stdout: Got update
devserver_1 | App 614 stdout: Got update
devserver_1 | App 614 stdout: Got update
当我注释掉以下块时,跟踪连接突然起作用了:
store.subscribe(:updates) do |on|
on.message do |ch, payload|
puts "Got update"
ws.send(payload) if payload
end
end
日志输出:
devserver_1 | App 1028 stdout: Client connected (connection count: 1)
devserver_1 | App 1039 stdout: Client connected (connection count: 2)
devserver_1 | App 1039 stdout: Client disconnected (connection count: 1)
devserver_1 | App 1028 stdout: Client disconnected (connection count: 0)
所以使用 Redis::subscribe
似乎会以某种方式干扰网络套接字连接。
我该如何解决这个问题?
- Phusion Passenger 版本 4.0.58
- ruby 2.2.0p0(2014-12-25 修订版 49005)[x86_64-linux-gnu]
- 西纳特拉 (1.4.6)
- faye-websocket (0.9.2)
我认为这里的问题是 Faye 使用 EventMachine,这意味着你的线程上有一个反应器正在处理事件,并调用你的回调 ws.on(:open)
和 ws.on(:close)
.
现在当你点击
store.subscribe(:updates) do |on|
on.message do |ch, payload|
puts "Got update"
ws.send(payload) if payload
end
end
这是一个阻塞操作 - 它完全阻塞了当前线程。如果您当前的线程被阻塞,反应器将无法监听事件然后调用您的回调。
一个解决方案是 运行 你的 store.subscribe
在不同的线程上,所以它是否阻塞那个线程并不重要。
但我认为更好的解决方案是使用 non-blocking version of the Redis library:
来自文档:
redis = EM::Hiredis.connect
pubsub = redis.pubsub
pubsub.subscribe(:updates).callback do
puts "Got update"
ws.send(payload) if payload
end
这两个(Redis + Faye)都应该在 EventMachine 反应器循环中注册,以便它向两者发送事件。
我正在构建一个通过网络套接字连接到服务器的网络应用程序。服务器组件是基于 sinatra
、redis
和 faye-websocket
的小型 Ruby 应用程序。服务器在 Phusion Passenger 上 运行。一个单独的更新程序守护进程不断地从各种来源提取更新并将它们发布到 redis(使用 redis
gem 和 Redis::publish
)。
为了将更新推送到客户端,我在我的 Sinatra 应用程序中尝试了以下操作:
get '/' do
if Faye::WebSocket.websocket?(request.env)
store = Redis.new
ws = Faye::WebSocket.new(request.env)
ws.on(:open) do |event|
store.incr('connection_count')
puts 'Client connected (connection count: %s)' % store.get('connection_count')
end
ws.on(:close) do |event|
store.decr('connection_count')
puts 'Client disconnected (connection count: %s)' % store.get('connection_count')
end
ws.rack_response
store.subscribe(:updates) do |on|
on.message do |ch, payload|
puts "Got update"
ws.send(payload) if payload
end
end
end
end
这只能部分起作用。客户端可以成功连接并接收更新,但 store.incr
和 store.decr
调用不起作用。此外,连接似乎没有正确关闭——当我启动多个客户端时,我注意到连接堆积起来,Passenger 服务器最终停止工作。
日志输出:
devserver_1 | App 614 stdout: Got update
devserver_1 | App 614 stdout: Got update
devserver_1 | App 614 stdout: Got update
当我注释掉以下块时,跟踪连接突然起作用了:
store.subscribe(:updates) do |on|
on.message do |ch, payload|
puts "Got update"
ws.send(payload) if payload
end
end
日志输出:
devserver_1 | App 1028 stdout: Client connected (connection count: 1)
devserver_1 | App 1039 stdout: Client connected (connection count: 2)
devserver_1 | App 1039 stdout: Client disconnected (connection count: 1)
devserver_1 | App 1028 stdout: Client disconnected (connection count: 0)
所以使用 Redis::subscribe
似乎会以某种方式干扰网络套接字连接。
我该如何解决这个问题?
- Phusion Passenger 版本 4.0.58
- ruby 2.2.0p0(2014-12-25 修订版 49005)[x86_64-linux-gnu]
- 西纳特拉 (1.4.6)
- faye-websocket (0.9.2)
我认为这里的问题是 Faye 使用 EventMachine,这意味着你的线程上有一个反应器正在处理事件,并调用你的回调 ws.on(:open)
和 ws.on(:close)
.
现在当你点击
store.subscribe(:updates) do |on|
on.message do |ch, payload|
puts "Got update"
ws.send(payload) if payload
end
end
这是一个阻塞操作 - 它完全阻塞了当前线程。如果您当前的线程被阻塞,反应器将无法监听事件然后调用您的回调。
一个解决方案是 运行 你的 store.subscribe
在不同的线程上,所以它是否阻塞那个线程并不重要。
但我认为更好的解决方案是使用 non-blocking version of the Redis library:
来自文档:
redis = EM::Hiredis.connect
pubsub = redis.pubsub
pubsub.subscribe(:updates).callback do
puts "Got update"
ws.send(payload) if payload
end
这两个(Redis + Faye)都应该在 EventMachine 反应器循环中注册,以便它向两者发送事件。