中间件中的线程 运行 使用旧版本的父实例变量

Thread running in Middleware is using old version of parent's instance variable

我已经使用 Heroku tutorial 实现了 websockets。

它适用于 Thin,但不适用于 Unicorn 和 Puma。

还有一个回显消息实现,它响应客户端的消息。它在每台服务器上都能正常工作,因此 websockets 实现没有问题。

Redis 设置也是正确的(它捕获所有消息,并执行 subscribe 块内的代码)。

现在如何运作:

服务器启动时,会初始化一个空的 @clients 数组。然后启动新线程,它正在侦听 Redis,旨在将该消息从@clients 数组发送给相应的用户。

在页面加载时,创建新的 websocket 连接,它存储在@clients 数组中。

如果我们从浏览器收到消息,我们会将其发送回与同一用户连接的所有客户端(该部分在 Thin 和 Puma 上均正常工作)。

如果我们收到来自 Redis 的消息,我们还会查找存储在@clients 数组中的所有用户连接。 这是奇怪的事情发生的地方:

有人可以澄清我错过了什么吗?

Puma服务器相关配置:

workers 1
threads_count = 1
threads threads_count, threads_count

相关中间件代码:

require 'faye/websocket'

class NotificationsBackend

  def initialize(app)
    @app     = app
    @clients = []
    Thread.new do
      redis_sub = Redis.new
      redis_sub.subscribe(CHANNEL) do |on|
        on.message do |channel, msg|
          # logging @clients.length from here will always return 0
          # [..] retrieve user
          send_message(user.id, { message: "ECHO: #{event.data}"} )
        end
      end
    end
  end

  def call(env)
    if Faye::WebSocket.websocket?(env)
      ws = Faye::WebSocket.new(env, nil, {ping: KEEPALIVE_TIME })
      ws.on :open do |event|
        # [..] retrieve current user
        if user
          # add ws connection to @clients array
        else
          # close ws
        end
      end

      ws.on :message do |event|
        # [..] retrieve current user
        Redis.current.publish({user_id: user.id, { message: "ECHO: #{event.data}"}} )
      end

      ws.rack_response
    else
      @app.call(env)
    end
  end
  def send_message user_id, message
    # logging @clients.length here will always return correct result
    # cs = all connections which belong to that client
    cs.each { |c| c.send(message.to_json) }
  end
end

Unicorn(显然还有 puma)都启动了一个 master 进程,然后 fork 了一个或多个 worker。 fork 复制(或至少呈现出复制的错觉 - 实际复制通常只在您写入页面时发生)您的整个进程,但只有调用 fork 的线程存在于新进程中。

很明显,您的应用程序在分叉之前正在初始化 - 通常这样做是为了让工作人员可以快速启动并从写入时复制内存节省中受益。因此,您的 redis 检查线程仅在主进程中 运行 而 @clients 正在子进程中被修改。

您可能可以通过推迟创建您的 redis 线程或禁用应用程序预加载来解决这个问题,但是您应该知道您的设置将阻止您扩展到单个工作进程之外(使用 puma 和线程像 jruby 这样友好的 JVM 将不再是一个约束)

以防万一有人遇到同样的问题,这里有两个我想出的解决方案:

1.禁用应用预加载(这是我想到的第一个解决方案)

只需从 puma.rb 文件中删除 preload_app!。因此,所有线程都会有自己的 @clients 变量。它们将可以通过其他中间件方法访问(如 call 等)

缺点:您将失去应用预加载的所有好处。如果您只有 1 或 2 个 worker 和几个线程,那没问题,但如果您需要很多线程,那么最好预加载应用程序。所以我继续研究,这是另一个解决方案:

2。将线程初始化移出initialize方法(我现在就是这么用的)

例如,我将其移至 call 方法,所以这就是中间件 class 代码的样子:

attr_accessor :subscriber

def call(env)
  @subscriber ||= Thread.new do # if no subscriber present, init new one
    redis_sub = Redis.new(url: ENV['REDISCLOUD_URL'])
    redis_sub.subscribe(CHANNEL) do |on|
      on.message do |_, msg|
        # parsing message code here, retrieve user
        send_message(user.id, { message: "ECHO: #{event.data}"} )
      end
    end
  end
  # other code from method
end

两种解决方案都解决了相同的问题:Redis 侦听线程将为每个 Puma worker/thread 初始化,而不是为主进程(实际上不为请求服务)初始化。