在 rails 线程中访问变量
Accessing a variable within a rails thread
我正在构建一个用于基于 Web 的幻灯片放映的应用程序,其中一个 'master' 用户可以在幻灯片之间移动,每个人的浏览器都可以跟随。为此,我使用 websockets 和 Redis 作为全球通道来发送消息。每个连接的客户端都将信息存储在数组 @clients
中。
然后我有一个单独的线程用于订阅 Redis 通道,其中定义了一个 'on.message' 块,它应该向 @clients
数组中的每个人发送一条消息,但是该块中的数组是空的(模块中的其他任何地方都不为空)。
几乎遵循这个例子:
https://devcenter.heroku.com/articles/ruby-websockets
相关代码,在自定义中间件中class:
require 'faye/websocket'
require 'redis'
class WsCommunication
KEEPALIVE_TIME = 15 #seconds
CHANNEL = 'vip-deck'
def initialize(app)
@app = app
@clients = []
uri = URI.parse(ENV['REDISCLOUD_URL'])
Thread.new do
redis_sub = Redis.new(host: uri.host, port: uri.port, password: uri.password)
redis_sub.subscribe(CHANNEL) do |on|
on.message do |channel, msg|
puts @clients.count
### prints '0,' no clients receive msg
@clients.each { |ws| ws.send(msg) }
end
end
end
end
def call(env)
if Faye::WebSocket.websocket?(env)
ws = Faye::WebSocket.new(env, nil, {ping: KEEPALIVE_TIME})
ws.on :open do |event|
@clients << ws
puts @clients.count
### prints actual number of clients
end
ws.on :message do |event|
$redis.publish(CHANNEL, event.data)
end
ws.on :close do |event|
@clients.delete(ws)
ws = nil
end
ws.rack_response
else
@app.call(env)
end
end
end
在新线程中访问时 @clients
数组是否为空,因为实例变量不在线程间共享?如果是这样,我如何跨线程共享变量?
我也尝试过使用 $clients(全局变量,应该可以跨线程访问),但没有用。
@client应该是所有线程共享的,你确定client不是从数组中被误删除的吗?尝试将 "client deleted" 放入 ws.on :close 块中并进行测试。
您也可以尝试使用互斥锁,其中以这种方式使用 @client 变量:
http://ruby-doc.org/core-2.2.0/Mutex.html
最后更新编辑:显示工作代码。除调试代码外,主模块未修改。注意:我确实遇到了我已经注意到的关于需要在终止前取消订阅的问题。
代码看起来正确。我想看看你是如何实例化它的。
在 config/application.rb 中,你可能至少有这样的东西:
require 'ws_communication'
config.middleware.use WsCommunication
然后,在您的 JavaScript 客户端中,您应该有这样的东西:
var ws = new WebSocket(uri);
您是否实例化了另一个 WsCommunication 实例?这会将 @clients 设置为一个空数组,并可能表现出您的症状。这样的事情是不正确的:
var ws = new WsCommunication;
如果您能向客户展示这将对我们有所帮助,如果 post 没有帮助,也许 config/application.rb。
顺便说一下,我同意 @clients 应该在任何更新时受到互斥锁保护的评论,如果不读取也是如此。它是一个动态结构,在事件驱动系统中可以随时改变。 redis-mutex 是个不错的选择。 (希望 link 是正确的,因为 Github 目前似乎对所有内容都抛出了 500 个错误。)
您可能还会注意到,$redis.publish returns 是接收消息的客户端数量的整数值。
最后,您可能会发现您需要确保在终止之前取消订阅您的频道。我遇到过这样的情况,我最终会多次发送每条消息,甚至很多次,因为之前订阅的同一个频道没有被清理。由于您是在线程内订阅频道,因此您需要在同一个线程内取消订阅,否则进程将 "hang" 等待正确的线程神奇地出现。我通过设置 "unsubscribe" 标志然后发送消息来处理这种情况。然后,在 on.message 块中,我测试取消订阅标志并在那里发出取消订阅。
您提供的模块,仅进行了少量调试修改:
require 'faye/websocket'
require 'redis'
class WsCommunication
KEEPALIVE_TIME = 15 #seconds
CHANNEL = 'vip-deck'
def initialize(app)
@app = app
@clients = []
uri = URI.parse(ENV['REDISCLOUD_URL'])
$redis = Redis.new(host: uri.host, port: uri.port, password: uri.password)
Thread.new do
redis_sub = Redis.new(host: uri.host, port: uri.port, password: uri.password)
redis_sub.subscribe(CHANNEL) do |on|
on.message do |channel, msg|
puts "Message event. Clients receiving:#{@clients.count};"
@clients.each { |ws| ws.send(msg) }
end
end
end
end
def call(env)
if Faye::WebSocket.websocket?(env)
ws = Faye::WebSocket.new(env, nil, {ping: KEEPALIVE_TIME})
ws.on :open do |event|
@clients << ws
puts "Open event. Clients open:#{@clients.count};"
end
ws.on :message do |event|
receivers = $redis.publish(CHANNEL, event.data)
puts "Message published:#{event.data}; Receivers:#{receivers};"
end
ws.on :close do |event|
@clients.delete(ws)
puts "Close event. Clients open:#{@clients.count};"
ws = nil
end
ws.rack_response
else
@app.call(env)
end
end
end
我提供的测试用户代码:
# encoding: UTF-8
puts "Starting client-subscriber.rb"
$:.unshift File.expand_path '../lib', File.dirname(__FILE__)
require 'rubygems'
require 'eventmachine'
require 'websocket-client-simple'
puts "websocket-client-simple v#{WebSocket::Client::Simple::VERSION}"
url = ARGV.shift || 'ws://localhost:3000'
EM.run do
ws = WebSocket::Client::Simple.connect url
ws.on :message do |msg|
puts msg
end
ws.on :open do
puts "-- Subscriber open (#{ws.url})"
end
ws.on :close do |e|
puts "-- Subscriber close (#{e.inspect})"
exit 1
end
ws.on :error do |e|
puts "-- Subscriber error (#{e.inspect})"
end
end
我提供的测试发布者代码。发布者和订阅者可以很容易地结合起来,因为这些只是测试:
# encoding: UTF-8
puts "Starting client-publisher.rb"
$:.unshift File.expand_path '../lib', File.dirname(__FILE__)
require 'rubygems'
require 'eventmachine'
require 'json'
require 'websocket-client-simple'
puts "websocket-client-simple v#{WebSocket::Client::Simple::VERSION}"
url = ARGV.shift || 'ws://localhost:3000'
EM.run do
count ||= 0
timer = EventMachine.add_periodic_timer(5+rand(5)) do
count += 1
send({"MESSAGE": "COUNT:#{count};"})
end
@ws = WebSocket::Client::Simple.connect url
@ws.on :message do |msg|
puts msg
end
@ws.on :open do
puts "-- Publisher open"
end
@ws.on :close do |e|
puts "-- Publisher close (#{e.inspect})"
exit 1
end
@ws.on :error do |e|
puts "-- Publisher error (#{e.inspect})"
@ws.close
end
def self.send message
payload = message.is_a?(Hash) ? message : {payload: message}
@ws.send(payload.to_json)
end
end
在机架中间件层运行所有这些的示例config.ru:
require './controllers/main'
require './middlewares/ws_communication'
use WsCommunication
run Main.new
这是主要的。我把它从我的 运行 版本中删除了,所以如果你使用它可能需要调整:
%w(rubygems bundler sinatra/base json erb).each { |m| require m }
ENV['RACK_ENV'] ||= 'development'
Bundler.require
$: << File.expand_path('../', __FILE__)
$: << File.expand_path('../lib', __FILE__)
Dir["./lib/*.rb", "./lib/**/*.rb"].each { |file| require file }
env = ENV['OS'] == 'Windows_NT' ? 'development' : ENV['RACK_ENV']
class Main < Sinatra::Base
env = ENV['OS'] == 'Windows_NT' ? 'development' : ENV['RACK_ENV']
get "/" do
erb :"index.html"
end
get "/assets/js/application.js" do
content_type :js
@scheme = env == "production" ? "wss://" : "ws://"
erb :"application.js"
end
end
我正在构建一个用于基于 Web 的幻灯片放映的应用程序,其中一个 'master' 用户可以在幻灯片之间移动,每个人的浏览器都可以跟随。为此,我使用 websockets 和 Redis 作为全球通道来发送消息。每个连接的客户端都将信息存储在数组 @clients
中。
然后我有一个单独的线程用于订阅 Redis 通道,其中定义了一个 'on.message' 块,它应该向 @clients
数组中的每个人发送一条消息,但是该块中的数组是空的(模块中的其他任何地方都不为空)。
几乎遵循这个例子: https://devcenter.heroku.com/articles/ruby-websockets
相关代码,在自定义中间件中class:
require 'faye/websocket'
require 'redis'
class WsCommunication
KEEPALIVE_TIME = 15 #seconds
CHANNEL = 'vip-deck'
def initialize(app)
@app = app
@clients = []
uri = URI.parse(ENV['REDISCLOUD_URL'])
Thread.new do
redis_sub = Redis.new(host: uri.host, port: uri.port, password: uri.password)
redis_sub.subscribe(CHANNEL) do |on|
on.message do |channel, msg|
puts @clients.count
### prints '0,' no clients receive msg
@clients.each { |ws| ws.send(msg) }
end
end
end
end
def call(env)
if Faye::WebSocket.websocket?(env)
ws = Faye::WebSocket.new(env, nil, {ping: KEEPALIVE_TIME})
ws.on :open do |event|
@clients << ws
puts @clients.count
### prints actual number of clients
end
ws.on :message do |event|
$redis.publish(CHANNEL, event.data)
end
ws.on :close do |event|
@clients.delete(ws)
ws = nil
end
ws.rack_response
else
@app.call(env)
end
end
end
在新线程中访问时 @clients
数组是否为空,因为实例变量不在线程间共享?如果是这样,我如何跨线程共享变量?
我也尝试过使用 $clients(全局变量,应该可以跨线程访问),但没有用。
@client应该是所有线程共享的,你确定client不是从数组中被误删除的吗?尝试将 "client deleted" 放入 ws.on :close 块中并进行测试。 您也可以尝试使用互斥锁,其中以这种方式使用 @client 变量: http://ruby-doc.org/core-2.2.0/Mutex.html
最后更新编辑:显示工作代码。除调试代码外,主模块未修改。注意:我确实遇到了我已经注意到的关于需要在终止前取消订阅的问题。
代码看起来正确。我想看看你是如何实例化它的。
在 config/application.rb 中,你可能至少有这样的东西:
require 'ws_communication'
config.middleware.use WsCommunication
然后,在您的 JavaScript 客户端中,您应该有这样的东西:
var ws = new WebSocket(uri);
您是否实例化了另一个 WsCommunication 实例?这会将 @clients 设置为一个空数组,并可能表现出您的症状。这样的事情是不正确的:
var ws = new WsCommunication;
如果您能向客户展示这将对我们有所帮助,如果 post 没有帮助,也许 config/application.rb。
顺便说一下,我同意 @clients 应该在任何更新时受到互斥锁保护的评论,如果不读取也是如此。它是一个动态结构,在事件驱动系统中可以随时改变。 redis-mutex 是个不错的选择。 (希望 link 是正确的,因为 Github 目前似乎对所有内容都抛出了 500 个错误。)
您可能还会注意到,$redis.publish returns 是接收消息的客户端数量的整数值。
最后,您可能会发现您需要确保在终止之前取消订阅您的频道。我遇到过这样的情况,我最终会多次发送每条消息,甚至很多次,因为之前订阅的同一个频道没有被清理。由于您是在线程内订阅频道,因此您需要在同一个线程内取消订阅,否则进程将 "hang" 等待正确的线程神奇地出现。我通过设置 "unsubscribe" 标志然后发送消息来处理这种情况。然后,在 on.message 块中,我测试取消订阅标志并在那里发出取消订阅。
您提供的模块,仅进行了少量调试修改:
require 'faye/websocket'
require 'redis'
class WsCommunication
KEEPALIVE_TIME = 15 #seconds
CHANNEL = 'vip-deck'
def initialize(app)
@app = app
@clients = []
uri = URI.parse(ENV['REDISCLOUD_URL'])
$redis = Redis.new(host: uri.host, port: uri.port, password: uri.password)
Thread.new do
redis_sub = Redis.new(host: uri.host, port: uri.port, password: uri.password)
redis_sub.subscribe(CHANNEL) do |on|
on.message do |channel, msg|
puts "Message event. Clients receiving:#{@clients.count};"
@clients.each { |ws| ws.send(msg) }
end
end
end
end
def call(env)
if Faye::WebSocket.websocket?(env)
ws = Faye::WebSocket.new(env, nil, {ping: KEEPALIVE_TIME})
ws.on :open do |event|
@clients << ws
puts "Open event. Clients open:#{@clients.count};"
end
ws.on :message do |event|
receivers = $redis.publish(CHANNEL, event.data)
puts "Message published:#{event.data}; Receivers:#{receivers};"
end
ws.on :close do |event|
@clients.delete(ws)
puts "Close event. Clients open:#{@clients.count};"
ws = nil
end
ws.rack_response
else
@app.call(env)
end
end
end
我提供的测试用户代码:
# encoding: UTF-8
puts "Starting client-subscriber.rb"
$:.unshift File.expand_path '../lib', File.dirname(__FILE__)
require 'rubygems'
require 'eventmachine'
require 'websocket-client-simple'
puts "websocket-client-simple v#{WebSocket::Client::Simple::VERSION}"
url = ARGV.shift || 'ws://localhost:3000'
EM.run do
ws = WebSocket::Client::Simple.connect url
ws.on :message do |msg|
puts msg
end
ws.on :open do
puts "-- Subscriber open (#{ws.url})"
end
ws.on :close do |e|
puts "-- Subscriber close (#{e.inspect})"
exit 1
end
ws.on :error do |e|
puts "-- Subscriber error (#{e.inspect})"
end
end
我提供的测试发布者代码。发布者和订阅者可以很容易地结合起来,因为这些只是测试:
# encoding: UTF-8
puts "Starting client-publisher.rb"
$:.unshift File.expand_path '../lib', File.dirname(__FILE__)
require 'rubygems'
require 'eventmachine'
require 'json'
require 'websocket-client-simple'
puts "websocket-client-simple v#{WebSocket::Client::Simple::VERSION}"
url = ARGV.shift || 'ws://localhost:3000'
EM.run do
count ||= 0
timer = EventMachine.add_periodic_timer(5+rand(5)) do
count += 1
send({"MESSAGE": "COUNT:#{count};"})
end
@ws = WebSocket::Client::Simple.connect url
@ws.on :message do |msg|
puts msg
end
@ws.on :open do
puts "-- Publisher open"
end
@ws.on :close do |e|
puts "-- Publisher close (#{e.inspect})"
exit 1
end
@ws.on :error do |e|
puts "-- Publisher error (#{e.inspect})"
@ws.close
end
def self.send message
payload = message.is_a?(Hash) ? message : {payload: message}
@ws.send(payload.to_json)
end
end
在机架中间件层运行所有这些的示例config.ru:
require './controllers/main'
require './middlewares/ws_communication'
use WsCommunication
run Main.new
这是主要的。我把它从我的 运行 版本中删除了,所以如果你使用它可能需要调整:
%w(rubygems bundler sinatra/base json erb).each { |m| require m }
ENV['RACK_ENV'] ||= 'development'
Bundler.require
$: << File.expand_path('../', __FILE__)
$: << File.expand_path('../lib', __FILE__)
Dir["./lib/*.rb", "./lib/**/*.rb"].each { |file| require file }
env = ENV['OS'] == 'Windows_NT' ? 'development' : ENV['RACK_ENV']
class Main < Sinatra::Base
env = ENV['OS'] == 'Windows_NT' ? 'development' : ENV['RACK_ENV']
get "/" do
erb :"index.html"
end
get "/assets/js/application.js" do
content_type :js
@scheme = env == "production" ? "wss://" : "ws://"
erb :"application.js"
end
end