发送给 EventMachine pubsub 订阅者的重复数据
Duplicated data sent to EventMachine pubsub subscribers
堆栈:Ruby 2.3.1,机架,薄
简单的 websocket 服务器:
require 'redis'
require 'em-hiredis'
require 'faye/websocket'
require 'json'
ws_channel = {}
App = lambda do |env|
$redis ||= EM::Hiredis.connect('redis://127.0.0.1:6379')
if Faye::WebSocket.websocket?(env)
ws = Faye::WebSocket.new(env, nil,
headers: {'Access-Control-Allow-Origin' => '*'},
ping: 15
)
ws.on :open do |event|
puts 'client connected'
query_string = event.current_target.env['REQUEST_PATH'].gsub(/[^a-z0-9\-_\/]/, '')
ws_channel[query_string] ||= EM::Channel.new
pubsub = $redis.pubsub
puts "subscribing to ws channel: ws:#{query_string}"
sid = ws_channel[query_string].subscribe do |msg|
puts "WS -> ws:#{query_string}/ #{sid} #{ws_channel[query_string]}"
ws.send msg
end
puts "subscribing to redis: #{query_string}"
pubsub.subscribe(query_string) do |msg|
puts "REDIS -> ws:#{query_string}/"
$redis.setex(query_string, 60, msg)
ws_channel[query_string].push msg
end
EventMachine.add_periodic_timer(5) do
ws.send ({ :ts => Time.now.to_i}.to_json) if ws
end
ws.on :close do |event|
puts "client ##{query_string} disconnected"
pubsub.unsubscribe(query_string) if pubsub
ws_channel[query_string].unsubscribe(sid) if ws_channel[query_string]
ws = nil
pubsub = nil
end
end
ws.rack_response
end
end
config.ru:
require 'rubygems'
require 'bundler/setup'
require 'logger'
require File.expand_path('../app', __FILE__)
Faye::WebSocket.load_adapter('thin')
run App
启动服务器:
bundle exec thin -p 9292 -R config.ru start
发行条件:
- 从同一个 IP 建立到同一个 WS 通道的多个连接(多个浏览器选项卡在同一台计算机上打开了同一个游戏)。
- 来自 WS 服务器的单个数据推送导致数据到达每个订阅者的次数与订阅者的次数一样多。
- 如果刷新其中一个选项卡(与 WS 服务器的连接关闭并重新打开),后续数据推送不会导致数据重复。
- 建立新连接后,#2 中的场景再次出现。
我对此的修复是 unsubscribe/resubscribe 在连接打开时。所以:
pubsub = $redis.pubsub
pubsub.unsubscribe(query_string) if pubsub
pubsub = $redis.pubsub
但这引入了另一个问题:当一个选项卡关闭时,数据停止到达其他选项卡大约 30 秒。 WS 连接永远不会关闭我可以在 JS 控制台中看到 5 秒的 ping。
redis-cli $> PUBSUB NUMSUB <channel>
- 无论有多少订阅者订阅了该频道,这只显示了一个频道订阅
目标功能:
- 多个客户端(订阅者)从同一个 IP 连接到同一个频道。
- 每个订阅者收到一份WS服务器推送的数据
- 客户端disconnection/new客户端连接不会导致其他客户端的任何服务中断。
为每个 WS 连接创建一个唯一的 EM 频道并在 ws.close 上以特定价格退订并且似乎完成了工作:
require 'redis'
require 'em-hiredis'
require 'faye/websocket'
require 'json'
App = lambda do |env|
$redis ||= EM::Hiredis.connect('redis://127.0.0.1:6379')
$pubsub ||= $redis.pubsub
if Faye::WebSocket.websocket?(env)
ws = Faye::WebSocket.new(env, nil,
headers: {'Access-Control-Allow-Origin' => '*'},
ping: 15
)
ws.on :open do |event|
puts 'client connected'
query_string = event.current_target.env['REQUEST_PATH'].gsub(/[^a-z0-9\-_\/]/, '')
channel = EM::Channel.new
puts "subscribing to ws channel: ws:#{query_string}"
sid = channel.subscribe do |msg|
puts "WS -> ws:#{query_string}/ #{sid} #{channel}"
ws.send msg
end
puts "subscribing to redis: #{query_string}"
subs = {}; r_callback = rand(Time.now.to_i)
subs[r_callback] = Proc.new { |msg|
puts "REDIS -> ws:#{query_string}/"
$redis.setex(query_string, 60, msg)
channel.push msg
}
$pubsub.subscribe(query_string, subs[r_callback])
#puts $pubsub.inspect
ws.on :close do |event|
puts "client ##{query_string} disconnected"
$pubsub.unsubscribe_proc(query_string, subs[r_callback]) if $pubsub
puts "Unsubscribed proc: #{subs[r_callback]}"
channel.unsubscribe(sid) if channel
ws = nil
end
end
ws.rack_response
end
end
编辑
我认为 em-hiredis
实际上为 Redis 订阅回收相同的连接和线程...我不确定是不是这种情况,但如果是这样,那么下面的答案可能有点矫枉过正。
我仍然建议遵循建议的设计,因为它仍会节省大量资源。
原:
我不确定我是否理解目标功能,但我相信代码中反映了设计问题。我也相信解决这个问题会产生正确的行为。
虽然这被 EM
层抽象掉了,但是每个 Redis 订阅客户端需要 1 个新线程和 1 个新的 TCP/IP 连接(到 Redis 服务器)。自从我阅读代码库以来,事情可能发生了变化,但不知何故我对此表示怀疑...
...新(主要是睡眠)线程不像新进程那么昂贵(我认为它们每个线程的成本略高于 1Mib,主要用于堆栈分配),TCP/IP 连接是有限的。
即使这不是问题(我假设 Redis 连接器的更新版本会解决这个问题),从 TCP/IP 连接多个(? ) 应用程序已拥有的数据副本。
更好的设计应该是:
为应用程序中的所有事件创建一个全局 Redis 通道。此全局通道可用于跨进程发布应用程序范围的广播。
为每个应用程序进程创建一个私有 Redis 通道。此通道可用于特定 Websocket 客户端之间的直接通信(使用它们的主机进程)。
为每个 Websocket 客户端分配一个唯一的进程 ID(可以是一个简单的分子)。
与进程特定的 UUID 通道一起,此本地 ID 将为每个 Websocket 连接提供唯一的全局标识符(进程 UUID 是 "owns" 连接的进程的通道)。
创建一个线程(每个应用程序进程)侦听两个通道(全局和专用通道)和 "dispatches" 消息到进程中的最终目的地。
"dispatcher" 可能应该忽略来自它自己的进程的任何消息,以防止双重处理(这允许单进程应用程序避免使用 Redis)。
此设计是我在 the Plezi framework 中实现的设计,它的资源效率使 Plezi 可以同时为大量客户端提供服务。
这是一个使用 Plezi 的简单示例,因为我对 Faye/EM API 不是很好。使用 iodine
(或 rackup
)命令将此示例保存为 config.ru
文件和 shell 中的 运行:
require 'plezi'
require 'redis'
# uncomment and update URL
# ENV['PL_REDIS_URL'] = "redis://my.host:6389/0"
class TimeAndEcho
def index
"return the client page using `:render` or as a simple string".freeze
end
def on_open
# `id` is inherited from Plezi using a Controller module mixin
puts "New connection with UUID (process+client): #{id}"
end
def on_message data
# The data is sent to everyone EXCEPT self:
broadcast :handle_message, data
write "sent"
end
# broadcasting invokes a non-inherited method.. we will simply write the info
def handle_message data
# write is "inherited" using a module mixin when the class is used as a Websocket controller.
write data
end
end
Plezi.route '/', TimeAndEcho
# Idione's timer is in milliseconds.
Iodine.run_every(5000) do
TimeAndEcho. broadcast(:handle_message, { :ts => Time.now.to_i}.to_json)
end
# Set the Rack application
run Plezi.app
在这个例子中,上面详述的调度逻辑是由 Plezi 框架使用 MessageDispatch
module that sends and received messages using Redis and a pub/sub thread.
实现的
请注意,运行此示例需要 POSIX 系统(Linux / macOS / BSD)和 iodine
服务器。
iodine
实现了提议的 Websocket-Rack specification draft 并且 Plezi 出于性能原因使用此原生 Websocket 设计。
堆栈:Ruby 2.3.1,机架,薄
简单的 websocket 服务器:
require 'redis'
require 'em-hiredis'
require 'faye/websocket'
require 'json'
ws_channel = {}
App = lambda do |env|
$redis ||= EM::Hiredis.connect('redis://127.0.0.1:6379')
if Faye::WebSocket.websocket?(env)
ws = Faye::WebSocket.new(env, nil,
headers: {'Access-Control-Allow-Origin' => '*'},
ping: 15
)
ws.on :open do |event|
puts 'client connected'
query_string = event.current_target.env['REQUEST_PATH'].gsub(/[^a-z0-9\-_\/]/, '')
ws_channel[query_string] ||= EM::Channel.new
pubsub = $redis.pubsub
puts "subscribing to ws channel: ws:#{query_string}"
sid = ws_channel[query_string].subscribe do |msg|
puts "WS -> ws:#{query_string}/ #{sid} #{ws_channel[query_string]}"
ws.send msg
end
puts "subscribing to redis: #{query_string}"
pubsub.subscribe(query_string) do |msg|
puts "REDIS -> ws:#{query_string}/"
$redis.setex(query_string, 60, msg)
ws_channel[query_string].push msg
end
EventMachine.add_periodic_timer(5) do
ws.send ({ :ts => Time.now.to_i}.to_json) if ws
end
ws.on :close do |event|
puts "client ##{query_string} disconnected"
pubsub.unsubscribe(query_string) if pubsub
ws_channel[query_string].unsubscribe(sid) if ws_channel[query_string]
ws = nil
pubsub = nil
end
end
ws.rack_response
end
end
config.ru:
require 'rubygems'
require 'bundler/setup'
require 'logger'
require File.expand_path('../app', __FILE__)
Faye::WebSocket.load_adapter('thin')
run App
启动服务器:
bundle exec thin -p 9292 -R config.ru start
发行条件:
- 从同一个 IP 建立到同一个 WS 通道的多个连接(多个浏览器选项卡在同一台计算机上打开了同一个游戏)。
- 来自 WS 服务器的单个数据推送导致数据到达每个订阅者的次数与订阅者的次数一样多。
- 如果刷新其中一个选项卡(与 WS 服务器的连接关闭并重新打开),后续数据推送不会导致数据重复。
- 建立新连接后,#2 中的场景再次出现。
我对此的修复是 unsubscribe/resubscribe 在连接打开时。所以:
pubsub = $redis.pubsub
pubsub.unsubscribe(query_string) if pubsub
pubsub = $redis.pubsub
但这引入了另一个问题:当一个选项卡关闭时,数据停止到达其他选项卡大约 30 秒。 WS 连接永远不会关闭我可以在 JS 控制台中看到 5 秒的 ping。
redis-cli $> PUBSUB NUMSUB <channel>
- 无论有多少订阅者订阅了该频道,这只显示了一个频道订阅
目标功能:
- 多个客户端(订阅者)从同一个 IP 连接到同一个频道。
- 每个订阅者收到一份WS服务器推送的数据
- 客户端disconnection/new客户端连接不会导致其他客户端的任何服务中断。
为每个 WS 连接创建一个唯一的 EM 频道并在 ws.close 上以特定价格退订并且似乎完成了工作:
require 'redis'
require 'em-hiredis'
require 'faye/websocket'
require 'json'
App = lambda do |env|
$redis ||= EM::Hiredis.connect('redis://127.0.0.1:6379')
$pubsub ||= $redis.pubsub
if Faye::WebSocket.websocket?(env)
ws = Faye::WebSocket.new(env, nil,
headers: {'Access-Control-Allow-Origin' => '*'},
ping: 15
)
ws.on :open do |event|
puts 'client connected'
query_string = event.current_target.env['REQUEST_PATH'].gsub(/[^a-z0-9\-_\/]/, '')
channel = EM::Channel.new
puts "subscribing to ws channel: ws:#{query_string}"
sid = channel.subscribe do |msg|
puts "WS -> ws:#{query_string}/ #{sid} #{channel}"
ws.send msg
end
puts "subscribing to redis: #{query_string}"
subs = {}; r_callback = rand(Time.now.to_i)
subs[r_callback] = Proc.new { |msg|
puts "REDIS -> ws:#{query_string}/"
$redis.setex(query_string, 60, msg)
channel.push msg
}
$pubsub.subscribe(query_string, subs[r_callback])
#puts $pubsub.inspect
ws.on :close do |event|
puts "client ##{query_string} disconnected"
$pubsub.unsubscribe_proc(query_string, subs[r_callback]) if $pubsub
puts "Unsubscribed proc: #{subs[r_callback]}"
channel.unsubscribe(sid) if channel
ws = nil
end
end
ws.rack_response
end
end
编辑
我认为 em-hiredis
实际上为 Redis 订阅回收相同的连接和线程...我不确定是不是这种情况,但如果是这样,那么下面的答案可能有点矫枉过正。
我仍然建议遵循建议的设计,因为它仍会节省大量资源。
原:
我不确定我是否理解目标功能,但我相信代码中反映了设计问题。我也相信解决这个问题会产生正确的行为。
虽然这被 EM
层抽象掉了,但是每个 Redis 订阅客户端需要 1 个新线程和 1 个新的 TCP/IP 连接(到 Redis 服务器)。自从我阅读代码库以来,事情可能发生了变化,但不知何故我对此表示怀疑...
...新(主要是睡眠)线程不像新进程那么昂贵(我认为它们每个线程的成本略高于 1Mib,主要用于堆栈分配),TCP/IP 连接是有限的。
即使这不是问题(我假设 Redis 连接器的更新版本会解决这个问题),从 TCP/IP 连接多个(? ) 应用程序已拥有的数据副本。
更好的设计应该是:
为应用程序中的所有事件创建一个全局 Redis 通道。此全局通道可用于跨进程发布应用程序范围的广播。
为每个应用程序进程创建一个私有 Redis 通道。此通道可用于特定 Websocket 客户端之间的直接通信(使用它们的主机进程)。
为每个 Websocket 客户端分配一个唯一的进程 ID(可以是一个简单的分子)。
与进程特定的 UUID 通道一起,此本地 ID 将为每个 Websocket 连接提供唯一的全局标识符(进程 UUID 是 "owns" 连接的进程的通道)。
创建一个线程(每个应用程序进程)侦听两个通道(全局和专用通道)和 "dispatches" 消息到进程中的最终目的地。
"dispatcher" 可能应该忽略来自它自己的进程的任何消息,以防止双重处理(这允许单进程应用程序避免使用 Redis)。
此设计是我在 the Plezi framework 中实现的设计,它的资源效率使 Plezi 可以同时为大量客户端提供服务。
这是一个使用 Plezi 的简单示例,因为我对 Faye/EM API 不是很好。使用 iodine
(或 rackup
)命令将此示例保存为 config.ru
文件和 shell 中的 运行:
require 'plezi'
require 'redis'
# uncomment and update URL
# ENV['PL_REDIS_URL'] = "redis://my.host:6389/0"
class TimeAndEcho
def index
"return the client page using `:render` or as a simple string".freeze
end
def on_open
# `id` is inherited from Plezi using a Controller module mixin
puts "New connection with UUID (process+client): #{id}"
end
def on_message data
# The data is sent to everyone EXCEPT self:
broadcast :handle_message, data
write "sent"
end
# broadcasting invokes a non-inherited method.. we will simply write the info
def handle_message data
# write is "inherited" using a module mixin when the class is used as a Websocket controller.
write data
end
end
Plezi.route '/', TimeAndEcho
# Idione's timer is in milliseconds.
Iodine.run_every(5000) do
TimeAndEcho. broadcast(:handle_message, { :ts => Time.now.to_i}.to_json)
end
# Set the Rack application
run Plezi.app
在这个例子中,上面详述的调度逻辑是由 Plezi 框架使用 MessageDispatch
module that sends and received messages using Redis and a pub/sub thread.
请注意,运行此示例需要 POSIX 系统(Linux / macOS / BSD)和 iodine
服务器。
iodine
实现了提议的 Websocket-Rack specification draft 并且 Plezi 出于性能原因使用此原生 Websocket 设计。