Redis 订阅块中的 ActiveJob perform_later
ActiveJob perform_later from inside Redis subscription block
我想用 Ruby 的 Redis 客户端订阅一个频道。订阅成功后,我想使用 perform_later
启动一个 ActiveJob(使用 Sidekiq)作业。然后,此作业将执行一些工作,并把生成的结果发布到名为 channel 的
Redis 频道。
这会产生以下错误:
Processing by Api::LocationController#yyy as */*
Parameters: {"zip_code"=>"503400"}
[ActiveJob] Failed enqueuing XXXJob to Sidekiq(default): NoMethodError (undefined method `call_pipeline' for #<Redis::SubscribedClient:0x00007fd9222cdf78 @client=#<Redis::Client:0x00007fd921e1b890 @options={:url=>"redis://localhost?db=xxx", :scheme=>"redis", :host=>"localhost", :port=>6379, :path=>nil, :read_timeout=>0, :write_timeout=>5.0, :connect_timeout=>5.0, :timeout=>5.0, :username=>nil, :password=>nil, :db=>0, :driver=>Redis::Connection::Ruby, :id=>nil, :tcp_keepalive=>0, :reconnect_attempts=>1, :reconnect_delay=>0.0, :reconnect_delay_max=>0.5, :inherit_socket=>false, :logger=>nil, :sentinels=>nil, :role=>:master, :_parsed=>true}, @reconnect=true, @logger=nil, @connection=#<Redis::Connection::Ruby:0x00007fd921df08c0 @sock=#<Redis::Connection::TCPSocket:fd 26>>, @command_map={}, @pending_reads=-3, @connector=#<Redis::Client::Connector:0x00007fd921e1a2d8 @options={:url=>"redis://localhost?db=xxx", :scheme=>"redis", :host=>"localhost", :port=>6379, :path=>nil, :read_timeout=>5.0, :write_timeout=>5.0, :connect_timeout=>5.0, :timeout=>5.0, :username=>nil, :password=>nil, :db=>0, :driver=>Redis::Connection::Ruby, :id=>nil, :tcp_keepalive=>0, :reconnect_attempts=>1, :reconnect_delay=>0.0, :reconnect_delay_max=>0.5, :inherit_socket=>false, :logger=>nil, :sentinels=>nil, :role=>:master, :_parsed=>true}>, @pid=54943>>)
Completed 500 Internal Server Error in 26ms (Allocations: 2374)
NoMethodError (undefined method `call_pipeline' for #<Redis::SubscribedClient:0x00007fd9222cdf78 @client=#<Redis::Client:0x00007fd921e1b890 @options={:url=>"redis://localhost?db=xxx", :scheme=>"redis", :host=>"localhost", :port=>6379, :path=>nil, :read_timeout=>5.0, :write_timeout=>5.0, :connect_timeout=>5.0, :timeout=>5.0, :username=>nil, :password=>nil, :db=>0, :driver=>Redis::Connection::Ruby, :id=>nil, :tcp_keepalive=>0, :reconnect_attempts=>1, :reconnect_delay=>0.0, :reconnect_delay_max=>0.5, :inherit_socket=>false, :logger=>nil, :sentinels=>nil, :role=>:master, :_parsed=>true}, @reconnect=true, @logger=nil, @connection=#<Redis::Connection::Ruby:0x00007fd921df08c0 @sock=nil>, @command_map={}, @pending_reads=-3, @connector=#<Redis::Client::Connector:0x00007fd921e1a2d8 @options={:url=>"redis://localhost?db=xxx", :scheme=>"redis", :host=>"localhost", :port=>6379, :path=>nil, :read_timeout=>5.0, :write_timeout=>5.0, :connect_timeout=>5.0, :timeout=>5.0, :username=>nil, :password=>nil, :db=>0, :driver=>Redis::Connection::Ruby, :id=>nil, :tcp_keepalive=>0, :reconnect_attempts=>1, :reconnect_delay=>0.0, :reconnect_delay_max=>0.5, :inherit_socket=>false, :logger=>nil, :sentinels=>nil, :role=>:master, :_parsed=>true}>, @pid=54943>>):
app/controllers/api/location_controller.rb:10:in `block (3 levels) in yyy'
app/controllers/api/location_controller.rb:8:in `block in yyy'
app/controllers/api/location_controller.rb:5:in `yyy'
yyy 函数是控制器的一部分,在收到 GET 请求时被调用:
# Controller action: renders the value stored under "key" when it exists;
# otherwise subscribes to "channel", tries to enqueue XXXJob from the
# subscribe callback, and renders the first message received.
def yyy
REDIS_POOL.with { |r|
data = r.get("key")
if data.nil?
# Nothing stored yet: subscribe using the same pooled connection `r`.
r.subscribe("channel") { |on|
# Intended to run once the subscription has succeeded. Per the log above,
# this enqueue is what fails with NoMethodError (`call_pipeline').
on.subscribe {
XXXJob.perform_later(params)
}
on.message { |_, message|
puts message
render json: message
# Unsubscribe after the first message has been rendered.
r.unsubscribe("channel")
}
}
else
render json: data
end
}
end
在我看来,Redis subscribe 块丢失了一些上下文,而 perform_later 函数恰好依赖这些上下文。
如果我将 XXXJob.perform_later(params) 调用移到 r.subscribe("channel") 调用的上方,
从而移出该块,同样的代码就能正常工作。
但是,我想确保作业仅在成功订阅 Redis 频道之后才开始。
有什么办法可以实现吗?
这很难办。这样行不通,因为 Sidekiq 的默认连接池总是重用当前线程的连接,而该连接正在处理 subscribe 调用,并且该调用是不可重入的。你需要手动创建一个单独的连接池,并像下面这样直接使用它:
# Dedicated connection pool, separate from Sidekiq's default pool, so that
# enqueuing does not reuse the connection that is busy with the subscribe call.
ANOTHER_POOL = ConnectionPool.new { Redis.new }
on.subscribe do
  # Route the pushes made inside this block through ANOTHER_POOL.
  # (The constant passed here must be the pool defined above.)
  Sidekiq::Client.via(ANOTHER_POOL) do
    SomeWorker.perform_async(1, 2, 3)
    SomeOtherWorker.perform_async(1, 2, 3)
  end
end
我想用 Ruby 的 Redis 客户端订阅一个频道。订阅成功后,我想使用 perform_later
启动一个 ActiveJob(使用 Sidekiq)作业。然后,此作业将执行一些工作,并把生成的结果发布到名为 channel 的
Redis 频道。
这会产生以下错误:
Processing by Api::LocationController#yyy as */*
Parameters: {"zip_code"=>"503400"}
[ActiveJob] Failed enqueuing XXXJob to Sidekiq(default): NoMethodError (undefined method `call_pipeline' for #<Redis::SubscribedClient:0x00007fd9222cdf78 @client=#<Redis::Client:0x00007fd921e1b890 @options={:url=>"redis://localhost?db=xxx", :scheme=>"redis", :host=>"localhost", :port=>6379, :path=>nil, :read_timeout=>0, :write_timeout=>5.0, :connect_timeout=>5.0, :timeout=>5.0, :username=>nil, :password=>nil, :db=>0, :driver=>Redis::Connection::Ruby, :id=>nil, :tcp_keepalive=>0, :reconnect_attempts=>1, :reconnect_delay=>0.0, :reconnect_delay_max=>0.5, :inherit_socket=>false, :logger=>nil, :sentinels=>nil, :role=>:master, :_parsed=>true}, @reconnect=true, @logger=nil, @connection=#<Redis::Connection::Ruby:0x00007fd921df08c0 @sock=#<Redis::Connection::TCPSocket:fd 26>>, @command_map={}, @pending_reads=-3, @connector=#<Redis::Client::Connector:0x00007fd921e1a2d8 @options={:url=>"redis://localhost?db=xxx", :scheme=>"redis", :host=>"localhost", :port=>6379, :path=>nil, :read_timeout=>5.0, :write_timeout=>5.0, :connect_timeout=>5.0, :timeout=>5.0, :username=>nil, :password=>nil, :db=>0, :driver=>Redis::Connection::Ruby, :id=>nil, :tcp_keepalive=>0, :reconnect_attempts=>1, :reconnect_delay=>0.0, :reconnect_delay_max=>0.5, :inherit_socket=>false, :logger=>nil, :sentinels=>nil, :role=>:master, :_parsed=>true}>, @pid=54943>>)
Completed 500 Internal Server Error in 26ms (Allocations: 2374)
NoMethodError (undefined method `call_pipeline' for #<Redis::SubscribedClient:0x00007fd9222cdf78 @client=#<Redis::Client:0x00007fd921e1b890 @options={:url=>"redis://localhost?db=xxx", :scheme=>"redis", :host=>"localhost", :port=>6379, :path=>nil, :read_timeout=>5.0, :write_timeout=>5.0, :connect_timeout=>5.0, :timeout=>5.0, :username=>nil, :password=>nil, :db=>0, :driver=>Redis::Connection::Ruby, :id=>nil, :tcp_keepalive=>0, :reconnect_attempts=>1, :reconnect_delay=>0.0, :reconnect_delay_max=>0.5, :inherit_socket=>false, :logger=>nil, :sentinels=>nil, :role=>:master, :_parsed=>true}, @reconnect=true, @logger=nil, @connection=#<Redis::Connection::Ruby:0x00007fd921df08c0 @sock=nil>, @command_map={}, @pending_reads=-3, @connector=#<Redis::Client::Connector:0x00007fd921e1a2d8 @options={:url=>"redis://localhost?db=xxx", :scheme=>"redis", :host=>"localhost", :port=>6379, :path=>nil, :read_timeout=>5.0, :write_timeout=>5.0, :connect_timeout=>5.0, :timeout=>5.0, :username=>nil, :password=>nil, :db=>0, :driver=>Redis::Connection::Ruby, :id=>nil, :tcp_keepalive=>0, :reconnect_attempts=>1, :reconnect_delay=>0.0, :reconnect_delay_max=>0.5, :inherit_socket=>false, :logger=>nil, :sentinels=>nil, :role=>:master, :_parsed=>true}>, @pid=54943>>):
app/controllers/api/location_controller.rb:10:in `block (3 levels) in yyy'
app/controllers/api/location_controller.rb:8:in `block in yyy'
app/controllers/api/location_controller.rb:5:in `yyy'
yyy 函数是控制器的一部分,在收到 GET 请求时被调用:
# Controller action: renders the value stored under "key" when it exists;
# otherwise subscribes to "channel", tries to enqueue XXXJob from the
# subscribe callback, and renders the first message received.
def yyy
REDIS_POOL.with { |r|
data = r.get("key")
if data.nil?
# Nothing stored yet: subscribe using the same pooled connection `r`.
r.subscribe("channel") { |on|
# Intended to run once the subscription has succeeded. Per the log above,
# this enqueue is what fails with NoMethodError (`call_pipeline').
on.subscribe {
XXXJob.perform_later(params)
}
on.message { |_, message|
puts message
render json: message
# Unsubscribe after the first message has been rendered.
r.unsubscribe("channel")
}
}
else
render json: data
end
}
end
在我看来,Redis subscribe 块丢失了一些上下文,而 perform_later 函数恰好依赖这些上下文。
如果我将 XXXJob.perform_later(params) 调用移到 r.subscribe("channel") 调用的上方,
从而移出该块,同样的代码就能正常工作。
但是,我想确保作业仅在成功订阅 Redis 频道之后才开始。
有什么办法可以实现吗?
这很难办。这样行不通,因为 Sidekiq 的默认连接池总是重用当前线程的连接,而该连接正在处理 subscribe 调用,并且该调用是不可重入的。你需要手动创建一个单独的连接池,并像下面这样直接使用它:
# Dedicated connection pool, separate from Sidekiq's default pool, so that
# enqueuing does not reuse the connection that is busy with the subscribe call.
ANOTHER_POOL = ConnectionPool.new { Redis.new }
on.subscribe do
  # Route the pushes made inside this block through ANOTHER_POOL.
  # (The constant passed here must be the pool defined above.)
  Sidekiq::Client.via(ANOTHER_POOL) do
    SomeWorker.perform_async(1, 2, 3)
    SomeOtherWorker.perform_async(1, 2, 3)
  end
end