中间件中的线程 运行 使用旧版本的父实例变量
Thread running in Middleware is using old version of parent's instance variable
我已经使用 Heroku tutorial 实现了 websockets。
它适用于 Thin,但不适用于 Unicorn 和 Puma。
还有一个回显消息实现,它响应客户端的消息。它在每台服务器上都能正常工作,因此 websockets 实现没有问题。
Redis 设置也是正确的(它捕获所有消息,并执行 subscribe
块内的代码)。
现在如何运作:
服务器启动时,会初始化一个空的 @clients
数组。然后启动新线程,它正在侦听 Redis,旨在将该消息从@clients 数组发送给相应的用户。
在页面加载时,创建新的 websocket 连接,它存储在@clients 数组中。
如果我们从浏览器收到消息,我们会将其发送回与同一用户连接的所有客户端(该部分在 Thin 和 Puma 上均正常工作)。
如果我们收到来自 Redis 的消息,我们还会查找存储在@clients 数组中的所有用户连接。
这是奇怪的事情发生的地方:
如果运行 Thin,它会在@clients 数组中找到连接并将消息发送给它们。
如果 运行 Puma/Unicorn,@clients 数组始终为空,即使我们按该顺序尝试(没有页面重新加载或任何其他操作):
- 从浏览器发送消息 ->
@clients.length
为 1,消息已送达
- 通过Redis发送消息->
@clients.length
为0,消息丢失
- 从浏览器发送消息 ->
@clients.length
仍然是 1,消息已送达
有人可以澄清我错过了什么吗?
Puma服务器相关配置:
workers 1
threads_count = 1
threads threads_count, threads_count
相关中间件代码:
require 'faye/websocket'
class NotificationsBackend
def initialize(app)
@app = app
@clients = []
Thread.new do
redis_sub = Redis.new
redis_sub.subscribe(CHANNEL) do |on|
on.message do |channel, msg|
# logging @clients.length from here will always return 0
# [..] retrieve user
send_message(user.id, { message: "ECHO: #{event.data}"} )
end
end
end
end
def call(env)
if Faye::WebSocket.websocket?(env)
ws = Faye::WebSocket.new(env, nil, {ping: KEEPALIVE_TIME })
ws.on :open do |event|
# [..] retrieve current user
if user
# add ws connection to @clients array
else
# close ws
end
end
ws.on :message do |event|
# [..] retrieve current user
Redis.current.publish({user_id: user.id, { message: "ECHO: #{event.data}"}} )
end
ws.rack_response
else
@app.call(env)
end
end
def send_message user_id, message
# logging @clients.length here will always return correct result
# cs = all connections which belong to that client
cs.each { |c| c.send(message.to_json) }
end
end
Unicorn(显然还有 puma)都启动了一个 master 进程,然后 fork 了一个或多个 worker。 fork 复制(或至少呈现出复制的错觉 - 实际复制通常只在您写入页面时发生)您的整个进程,但只有调用 fork
的线程存在于新进程中。
很明显,您的应用程序在分叉之前正在初始化 - 通常这样做是为了让工作人员可以快速启动并从写入时复制内存节省中受益。因此,您的 redis 检查线程仅在主进程中 运行 而 @clients
正在子进程中被修改。
您可能可以通过推迟创建您的 redis 线程或禁用应用程序预加载来解决这个问题,但是您应该知道您的设置将阻止您扩展到单个工作进程之外(使用 puma 和线程像 jruby 这样友好的 JVM 将不再是一个约束)
以防万一有人遇到同样的问题,这里有两个我想出的解决方案:
1.禁用应用预加载(这是我想到的第一个解决方案)
只需从 puma.rb 文件中删除 preload_app!
。因此,所有线程都会有自己的 @clients
变量。它们将可以通过其他中间件方法访问(如 call
等)
缺点:您将失去应用预加载的所有好处。如果您只有 1 或 2 个 worker 和几个线程,那没问题,但如果您需要很多线程,那么最好预加载应用程序。所以我继续研究,这是另一个解决方案:
2。将线程初始化移出initialize
方法(我现在就是这么用的)
例如,我将其移至 call
方法,所以这就是中间件 class 代码的样子:
attr_accessor :subscriber
def call(env)
@subscriber ||= Thread.new do # if no subscriber present, init new one
redis_sub = Redis.new(url: ENV['REDISCLOUD_URL'])
redis_sub.subscribe(CHANNEL) do |on|
on.message do |_, msg|
# parsing message code here, retrieve user
send_message(user.id, { message: "ECHO: #{event.data}"} )
end
end
end
# other code from method
end
两种解决方案都解决了相同的问题:Redis 侦听线程将为每个 Puma worker/thread 初始化,而不是为主进程(实际上不为请求服务)初始化。
我已经使用 Heroku tutorial 实现了 websockets。
它适用于 Thin,但不适用于 Unicorn 和 Puma。
还有一个回显消息实现,它响应客户端的消息。它在每台服务器上都能正常工作,因此 websockets 实现没有问题。
Redis 设置也是正确的(它捕获所有消息,并执行 subscribe
块内的代码)。
现在如何运作:
服务器启动时,会初始化一个空的 @clients
数组。然后启动新线程,它正在侦听 Redis,旨在将该消息从@clients 数组发送给相应的用户。
在页面加载时,创建新的 websocket 连接,它存储在@clients 数组中。
如果我们从浏览器收到消息,我们会将其发送回与同一用户连接的所有客户端(该部分在 Thin 和 Puma 上均正常工作)。
如果我们收到来自 Redis 的消息,我们还会查找存储在@clients 数组中的所有用户连接。 这是奇怪的事情发生的地方:
如果运行 Thin,它会在@clients 数组中找到连接并将消息发送给它们。
如果 运行 Puma/Unicorn,@clients 数组始终为空,即使我们按该顺序尝试(没有页面重新加载或任何其他操作):
- 从浏览器发送消息 ->
@clients.length
为 1,消息已送达 - 通过Redis发送消息->
@clients.length
为0,消息丢失 - 从浏览器发送消息 ->
@clients.length
仍然是 1,消息已送达
- 从浏览器发送消息 ->
有人可以澄清我错过了什么吗?
Puma服务器相关配置:
workers 1
threads_count = 1
threads threads_count, threads_count
相关中间件代码:
require 'faye/websocket'
class NotificationsBackend
def initialize(app)
@app = app
@clients = []
Thread.new do
redis_sub = Redis.new
redis_sub.subscribe(CHANNEL) do |on|
on.message do |channel, msg|
# logging @clients.length from here will always return 0
# [..] retrieve user
send_message(user.id, { message: "ECHO: #{event.data}"} )
end
end
end
end
def call(env)
if Faye::WebSocket.websocket?(env)
ws = Faye::WebSocket.new(env, nil, {ping: KEEPALIVE_TIME })
ws.on :open do |event|
# [..] retrieve current user
if user
# add ws connection to @clients array
else
# close ws
end
end
ws.on :message do |event|
# [..] retrieve current user
Redis.current.publish({user_id: user.id, { message: "ECHO: #{event.data}"}} )
end
ws.rack_response
else
@app.call(env)
end
end
def send_message user_id, message
# logging @clients.length here will always return correct result
# cs = all connections which belong to that client
cs.each { |c| c.send(message.to_json) }
end
end
Unicorn(显然还有 puma)都启动了一个 master 进程,然后 fork 了一个或多个 worker。 fork 复制(或至少呈现出复制的错觉 - 实际复制通常只在您写入页面时发生)您的整个进程,但只有调用 fork
的线程存在于新进程中。
很明显,您的应用程序在分叉之前正在初始化 - 通常这样做是为了让工作人员可以快速启动并从写入时复制内存节省中受益。因此,您的 redis 检查线程仅在主进程中 运行 而 @clients
正在子进程中被修改。
您可能可以通过推迟创建您的 redis 线程或禁用应用程序预加载来解决这个问题,但是您应该知道您的设置将阻止您扩展到单个工作进程之外(使用 puma 和线程像 jruby 这样友好的 JVM 将不再是一个约束)
以防万一有人遇到同样的问题,这里有两个我想出的解决方案:
1.禁用应用预加载(这是我想到的第一个解决方案)
只需从 puma.rb 文件中删除 preload_app!
。因此,所有线程都会有自己的 @clients
变量。它们将可以通过其他中间件方法访问(如 call
等)
缺点:您将失去应用预加载的所有好处。如果您只有 1 或 2 个 worker 和几个线程,那没问题,但如果您需要很多线程,那么最好预加载应用程序。所以我继续研究,这是另一个解决方案:
2。将线程初始化移出initialize
方法(我现在就是这么用的)
例如,我将其移至 call
方法,所以这就是中间件 class 代码的样子:
attr_accessor :subscriber
def call(env)
@subscriber ||= Thread.new do # if no subscriber present, init new one
redis_sub = Redis.new(url: ENV['REDISCLOUD_URL'])
redis_sub.subscribe(CHANNEL) do |on|
on.message do |_, msg|
# parsing message code here, retrieve user
send_message(user.id, { message: "ECHO: #{event.data}"} )
end
end
end
# other code from method
end
两种解决方案都解决了相同的问题:Redis 侦听线程将为每个 Puma worker/thread 初始化,而不是为主进程(实际上不为请求服务)初始化。