WebSocket 和 Redis 导致来自 pubsub and/or brpop 的挂起连接
WebSocket and Redis results in hanging connection from pubsub and/or brpop
我正在 WebSocket (WS) 中发布 Redis 订阅。当我收到 WS 打开时,我将请求线程化,然后实例化 Redis 客户端。在公开范围内,我为 Redis 线程并发布订阅。
一切正常,直到我收到意外的 WS 关闭。那时,Redis 订阅的线程运行 消失了。如果我发出取消订阅,我就会挂起。如果我不退订,我就会留下一个幻影订阅,这会给我下一次带来麻烦。
有没有什么方法可以在发布订阅的线程终止后删除订阅?我注意到 Redis 实例有一个用于该终止线程的 mon 变量。示例 Ruby 代码为:
class Backend
include MInit
def initialize(app)
setup
@app = app
end
def run!(env)
if Faye::WebSocket.websocket?(env)
ws = Faye::WebSocket.new(env, [], ping: KEEPALIVE_TIME)
ws_thread = Thread.fork(env) do
credis = Redis.new(host: @redis_uri.host, port: @redis_uri.port, password: @redis_uri.password)
ws.on :open do |event|
channel = URI.parse(event.target.url).path[1..URI.parse(event.target.url).path.length]
redis_thread = Thread.fork do
credis.subscribe(channel) do |on|
on.message do |message_channel, message|
sent = ws.send(message)
end
on.unsubscribe do |message_channel|
puts "Unsubscribe on channel:#{channel};"
end
end
end
end
ws.on :message do |event|
handoff(ws: ws, event: event)
end
ws.on :close do |event|
# Hang occurs here
unsubscribed = credis.unsubscribe(channel)
end
ws.on :error do |event|
ws.close
end
# Return async Rack response
ws.rack_response
end
end
else
@app.call(env)
end
private
def handoff(ws: nil, event: nil, source: nil, message: nil)
# processing
end
end
一旦我真正理解了这个问题,修复就相当简单了。 Redis线程其实还是存在的。但是,Redis 无论如何都会挂起,因为它正在等待线程获得控制权。为此,WS.close 代码需要通过在 WS.close 中使用 EM.next_tick 来放弃控制权,如下所示:
ws.on :close do |event|
EM.next_tick do
# Hang occurs here
unsubscribed = credis.unsubscribe(channel)
end
end
这是一个提供解决方法而非解决方案的长评论。
如果是我的申请,我会重新考虑设计。
为每个 websocket 客户端打开一个新的 Redis 连接和一个新线程是一个相当大的资源承诺。
澄清一下,每个与 Redis 的连接都需要一个 TCP/IP 套接字(这是一种有限的资源)和内存。对于保留的堆栈内存,线程每个线程的成本应该约为 2Mib...因此 1K Redis 连接和线程会产生大约 2Gib 的内存成本。
此外,Redis 服务器本身通常可以接受的连接数量有限(尽管这通常是价格问题而不是硬限制,因为它们旨在扩展).
重新调整设计应该非常简单,因此单个线程和连接为所有 websocket 客户端提供服务,这也将允许更轻松的 subscribe
/unsubscribe
管理。
这可以使用内部进程广播系统(例如 plezi.io 实现)或使用 Redis subscribe
/punsubscribe
命令来执行。
我正在 WebSocket (WS) 中发布 Redis 订阅。当我收到 WS 打开时,我将请求线程化,然后实例化 Redis 客户端。在公开范围内,我为 Redis 线程并发布订阅。
一切正常,直到我收到意外的 WS 关闭。那时,Redis 订阅的线程运行 消失了。如果我发出取消订阅,我就会挂起。如果我不退订,我就会留下一个幻影订阅,这会给我下一次带来麻烦。
有没有什么方法可以在发布订阅的线程终止后删除订阅?我注意到 Redis 实例有一个用于该终止线程的 mon 变量。示例 Ruby 代码为:
class Backend
include MInit
def initialize(app)
setup
@app = app
end
def run!(env)
if Faye::WebSocket.websocket?(env)
ws = Faye::WebSocket.new(env, [], ping: KEEPALIVE_TIME)
ws_thread = Thread.fork(env) do
credis = Redis.new(host: @redis_uri.host, port: @redis_uri.port, password: @redis_uri.password)
ws.on :open do |event|
channel = URI.parse(event.target.url).path[1..URI.parse(event.target.url).path.length]
redis_thread = Thread.fork do
credis.subscribe(channel) do |on|
on.message do |message_channel, message|
sent = ws.send(message)
end
on.unsubscribe do |message_channel|
puts "Unsubscribe on channel:#{channel};"
end
end
end
end
ws.on :message do |event|
handoff(ws: ws, event: event)
end
ws.on :close do |event|
# Hang occurs here
unsubscribed = credis.unsubscribe(channel)
end
ws.on :error do |event|
ws.close
end
# Return async Rack response
ws.rack_response
end
end
else
@app.call(env)
end
private
def handoff(ws: nil, event: nil, source: nil, message: nil)
# processing
end
end
一旦我真正理解了这个问题,修复就相当简单了。 Redis线程其实还是存在的。但是,Redis 无论如何都会挂起,因为它正在等待线程获得控制权。为此,WS.close 代码需要通过在 WS.close 中使用 EM.next_tick 来放弃控制权,如下所示:
ws.on :close do |event|
EM.next_tick do
# Hang occurs here
unsubscribed = credis.unsubscribe(channel)
end
end
这是一个提供解决方法而非解决方案的长评论。
如果是我的申请,我会重新考虑设计。
为每个 websocket 客户端打开一个新的 Redis 连接和一个新线程是一个相当大的资源承诺。
澄清一下,每个与 Redis 的连接都需要一个 TCP/IP 套接字(这是一种有限的资源)和内存。对于保留的堆栈内存,线程每个线程的成本应该约为 2Mib...因此 1K Redis 连接和线程会产生大约 2Gib 的内存成本。
此外,Redis 服务器本身通常可以接受的连接数量有限(尽管这通常是价格问题而不是硬限制,因为它们旨在扩展).
重新调整设计应该非常简单,因此单个线程和连接为所有 websocket 客户端提供服务,这也将允许更轻松的 subscribe
/unsubscribe
管理。
这可以使用内部进程广播系统(例如 plezi.io 实现)或使用 Redis subscribe
/punsubscribe
命令来执行。