RabbitMQ/bunny: 如果在线程中则不调用订阅块
RabbitMQ/bunny: subscribe block not called if within a thread
如果在线程中,我无法执行队列订阅块。
rubybunny/exchanges 中的示例按预期工作。但是,如果在线程中与消费者部分进行适配,则订阅者块似乎不会执行。
我尝试了几种简单的变体,包括设置共享变量标志,但都没有成功。
我错过了什么?
代码
#!/usr/bin/env ruby
require "bunny"
quit = false
consumer = Thread.new do
puts "consumer start"
cnx = Bunny.new
cnx.start
cn = cnx.create_channel
ex = cn.topic("weathr", :auto_delete => true)
q = cn.queue("", :exclusive => true).bind(ex, :routing_key => "americas.north.#")
q.subscribe do |delivery_info, properties, payload|
puts "An update for North America: #{payload}, routing key is #{delivery_info.routing_key}"
end
loop {
sleep 1
break if quit
}
cnx.close
puts "consumer done"
end
connection = Bunny.new
connection.start
connection = connection.create_channel
exchange = connection.topic("weathr", :auto_delete => true)
exchange.publish("San Diego update", :routing_key => "americas.north.us.ca.sandiego").
publish("Berkeley update", :routing_key => "americas.north.us.ca.berkeley").
publish("San Francisco update", :routing_key => "americas.north.us.ca.sanfrancisco").
publish("New York update", :routing_key => "americas.north.us.ny.newyork").
publish("São Paolo update", :routing_key => "americas.south.brazil.saopaolo").
publish("Hong Kong update", :routing_key => "asia.southeast.hk.hongkong").
publish("Kyoto update", :routing_key => "asia.southeast.japan.kyoto").
publish("Shanghai update", :routing_key => "asia.southeast.prc.shanghai").
publish("Rome update", :routing_key => "europe.italy.roma").
publish("Paris update", :routing_key => "europe.france.paris")
sleep 5
connection.close
quit = true
consumer.join
实际产量
consumer start
consumer done
预期产出
consumer start
An update for North America: San Diego update, routing key is americas.north.us.ca.sandiego
An update for North America: Berkeley update, routing key is americas.north.us.ca.berkeley
An update for North America: San Francisco update, routing key is americas.north.us.ca.sanfrancisco
An update for North America: New York update, routing key is americas.north.us.ny.newyork
consumer done
线程的订阅块没有执行,因为队列根本没有收到任何消息。详细来说,在这种情况下,队列最终是在消息发布后创建的。
这可以通过将消息切换到 :mandatory => true
并使用 Bunny::Exchange#on_return
:
来可视化
代码
#!/usr/bin/env ruby
require "bunny"
quit = false
connection = Bunny.new
connection.start
consumer = Thread.new do
puts "consumer start"
cn = connection.create_channel
ex = cn.topic("weathr", :auto_delete => true)
q = cn.queue("", :exclusive => true).bind(ex, :routing_key => "americas.north.#")
q.subscribe do |delivery_info, properties, payload|
puts "An update for North America: #{payload}, routing key is #{delivery_info.routing_key}"
end
sleep 1 while !quit
cn.close
puts "consumer done"
end
channel = connection.create_channel
exchange = channel.topic("weathr", :auto_delete => true)
exchange.on_return do |basic_return, properties, payload|
puts "#{payload} was returned! reply_code = #{basic_return.reply_code}, reply_text = #{basic_return.reply_text}"
end
exchange.publish("San Diego update", :mandatory => true, :routing_key => "americas.north.us.ca.sandiego").
publish("Berkeley update", :mandatory => true, :routing_key => "americas.north.us.ca.berkeley").
publish("San Francisco update", :mandatory => true, :routing_key => "americas.north.us.ca.sanfrancisco").
publish("New York update", :mandatory => true, :routing_key => "americas.north.us.ny.newyork").
publish("São Paolo update", :mandatory => true, :routing_key => "americas.south.brazil.saopaolo").
publish("Hong Kong update", :mandatory => true, :routing_key => "asia.southeast.hk.hongkong").
publish("Kyoto update", :mandatory => true, :routing_key => "asia.southeast.japan.kyoto").
publish("Shanghai update", :mandatory => true, :routing_key => "asia.southeast.prc.shanghai").
publish("Rome update", :mandatory => true, :routing_key => "europe.italy.roma").
publish("Paris update", :mandatory => true, :routing_key => "europe.france.paris")
channel.close
sleep 5
quit = true
consumer.join
connection.close
输出
consumer start
San Diego update was returned! reply_code = 312, reply_text = NO_ROUTE
Berkeley update was returned! reply_code = 312, reply_text = NO_ROUTE
San Francisco update was returned! reply_code = 312, reply_text = NO_ROUTE
New York update was returned! reply_code = 312, reply_text = NO_ROUTE
São Paolo update was returned! reply_code = 312, reply_text = NO_ROUTE
Hong Kong update was returned! reply_code = 312, reply_text = NO_ROUTE
Kyoto update was returned! reply_code = 312, reply_text = NO_ROUTE
Shanghai update was returned! reply_code = 312, reply_text = NO_ROUTE
Rome update was returned! reply_code = 312, reply_text = NO_ROUTE
Paris update was returned! reply_code = 312, reply_text = NO_ROUTE
consumer done
正如我们所见,所有消息最终都被 return 编辑为 NO_ROUTE
。
在发布消息之前强制队列(和路由)存在的简单解决方案:
#!/usr/bin/env ruby
require "bunny"
quit = false
consumer_queued = false
connection = Bunny.new
connection.start
consumer = Thread.new do
puts "consumer start"
cn = connection.create_channel
ex = cn.topic("weathr", :auto_delete => true)
q = cn.queue("", :exclusive => true).bind(ex, :routing_key => "americas.north.#")
consumer_queued = true
q.subscribe do |delivery_info, properties, payload|
puts "An update for North America: #{payload}, routing key is #{delivery_info.routing_key}"
$stdout.flush
end
sleep 1 while !quit
cn.close
puts "consumer done"
end
# ensure queue is ready
sleep 0.125 while !consumer_queued
channel = connection.create_channel
exchange = channel.topic("weathr", :auto_delete => true)
exchange.on_return do |basic_return, properties, payload|
puts "#{payload} was returned! reply_code = #{basic_return.reply_code}, reply_text = #{basic_return.reply_text}"
$stdout.flush
end
exchange.publish("San Diego update", :mandatory => true, :routing_key => "americas.north.us.ca.sandiego").
publish("Berkeley update", :mandatory => true, :routing_key => "americas.north.us.ca.berkeley").
publish("San Francisco update", :mandatory => true, :routing_key => "americas.north.us.ca.sanfrancisco").
publish("New York update", :mandatory => true, :routing_key => "americas.north.us.ny.newyork").
publish("São Paolo update", :mandatory => true, :routing_key => "americas.south.brazil.saopaolo").
publish("Hong Kong update", :mandatory => true, :routing_key => "asia.southeast.hk.hongkong").
publish("Kyoto update", :mandatory => true, :routing_key => "asia.southeast.japan.kyoto").
publish("Shanghai update", :mandatory => true, :routing_key => "asia.southeast.prc.shanghai").
publish("Rome update", :mandatory => true, :routing_key => "europe.italy.roma").
publish("Paris update", :mandatory => true, :routing_key => "europe.france.paris")
channel.close
sleep 5
quit = true
consumer.join
connection.close
输出(带有 return 通知)
consumer start
An update for North America: San Diego update, routing key is americas.north.us.ca.sandiego
São Paolo update was returned! reply_code = 312, reply_text = NO_ROUTE
An update for North America: Berkeley update, routing key is americas.north.us.ca.berkeley
Hong Kong update was returned! reply_code = 312, reply_text = NO_ROUTE
An update for North America: San Francisco update, routing key is americas.north.us.ca.sanfrancisco
Kyoto update was returned! reply_code = 312, reply_text = NO_ROUTE
An update for North America: New York update, routing key is americas.north.us.ny.newyork
Shanghai update was returned! reply_code = 312, reply_text = NO_ROUTE
Rome update was returned! reply_code = 312, reply_text = NO_ROUTE
Paris update was returned! reply_code = 312, reply_text = NO_ROUTE
consumer done
预期的消息已收到,其余的已returned。
如果在线程中,我无法执行队列订阅块。
rubybunny/exchanges 中的示例按预期工作。但是,如果在线程中与消费者部分进行适配,则订阅者块似乎不会执行。
我尝试了几种简单的变体,包括设置共享变量标志,但都没有成功。
我错过了什么?
代码#!/usr/bin/env ruby
require "bunny"
quit = false
consumer = Thread.new do
puts "consumer start"
cnx = Bunny.new
cnx.start
cn = cnx.create_channel
ex = cn.topic("weathr", :auto_delete => true)
q = cn.queue("", :exclusive => true).bind(ex, :routing_key => "americas.north.#")
q.subscribe do |delivery_info, properties, payload|
puts "An update for North America: #{payload}, routing key is #{delivery_info.routing_key}"
end
loop {
sleep 1
break if quit
}
cnx.close
puts "consumer done"
end
connection = Bunny.new
connection.start
connection = connection.create_channel
exchange = connection.topic("weathr", :auto_delete => true)
exchange.publish("San Diego update", :routing_key => "americas.north.us.ca.sandiego").
publish("Berkeley update", :routing_key => "americas.north.us.ca.berkeley").
publish("San Francisco update", :routing_key => "americas.north.us.ca.sanfrancisco").
publish("New York update", :routing_key => "americas.north.us.ny.newyork").
publish("São Paolo update", :routing_key => "americas.south.brazil.saopaolo").
publish("Hong Kong update", :routing_key => "asia.southeast.hk.hongkong").
publish("Kyoto update", :routing_key => "asia.southeast.japan.kyoto").
publish("Shanghai update", :routing_key => "asia.southeast.prc.shanghai").
publish("Rome update", :routing_key => "europe.italy.roma").
publish("Paris update", :routing_key => "europe.france.paris")
sleep 5
connection.close
quit = true
consumer.join
实际产量
consumer start
consumer done
预期产出
consumer start
An update for North America: San Diego update, routing key is americas.north.us.ca.sandiego
An update for North America: Berkeley update, routing key is americas.north.us.ca.berkeley
An update for North America: San Francisco update, routing key is americas.north.us.ca.sanfrancisco
An update for North America: New York update, routing key is americas.north.us.ny.newyork
consumer done
线程的订阅块没有执行,因为队列根本没有收到任何消息。详细来说,在这种情况下,队列最终是在消息发布后创建的。
这可以通过将消息切换到 :mandatory => true
并使用 Bunny::Exchange#on_return
:
#!/usr/bin/env ruby
require "bunny"
quit = false
connection = Bunny.new
connection.start
consumer = Thread.new do
puts "consumer start"
cn = connection.create_channel
ex = cn.topic("weathr", :auto_delete => true)
q = cn.queue("", :exclusive => true).bind(ex, :routing_key => "americas.north.#")
q.subscribe do |delivery_info, properties, payload|
puts "An update for North America: #{payload}, routing key is #{delivery_info.routing_key}"
end
sleep 1 while !quit
cn.close
puts "consumer done"
end
channel = connection.create_channel
exchange = channel.topic("weathr", :auto_delete => true)
exchange.on_return do |basic_return, properties, payload|
puts "#{payload} was returned! reply_code = #{basic_return.reply_code}, reply_text = #{basic_return.reply_text}"
end
exchange.publish("San Diego update", :mandatory => true, :routing_key => "americas.north.us.ca.sandiego").
publish("Berkeley update", :mandatory => true, :routing_key => "americas.north.us.ca.berkeley").
publish("San Francisco update", :mandatory => true, :routing_key => "americas.north.us.ca.sanfrancisco").
publish("New York update", :mandatory => true, :routing_key => "americas.north.us.ny.newyork").
publish("São Paolo update", :mandatory => true, :routing_key => "americas.south.brazil.saopaolo").
publish("Hong Kong update", :mandatory => true, :routing_key => "asia.southeast.hk.hongkong").
publish("Kyoto update", :mandatory => true, :routing_key => "asia.southeast.japan.kyoto").
publish("Shanghai update", :mandatory => true, :routing_key => "asia.southeast.prc.shanghai").
publish("Rome update", :mandatory => true, :routing_key => "europe.italy.roma").
publish("Paris update", :mandatory => true, :routing_key => "europe.france.paris")
channel.close
sleep 5
quit = true
consumer.join
connection.close
输出
consumer start
San Diego update was returned! reply_code = 312, reply_text = NO_ROUTE
Berkeley update was returned! reply_code = 312, reply_text = NO_ROUTE
San Francisco update was returned! reply_code = 312, reply_text = NO_ROUTE
New York update was returned! reply_code = 312, reply_text = NO_ROUTE
São Paolo update was returned! reply_code = 312, reply_text = NO_ROUTE
Hong Kong update was returned! reply_code = 312, reply_text = NO_ROUTE
Kyoto update was returned! reply_code = 312, reply_text = NO_ROUTE
Shanghai update was returned! reply_code = 312, reply_text = NO_ROUTE
Rome update was returned! reply_code = 312, reply_text = NO_ROUTE
Paris update was returned! reply_code = 312, reply_text = NO_ROUTE
consumer done
正如我们所见,所有消息最终都被 return 编辑为 NO_ROUTE
。
在发布消息之前强制队列(和路由)存在的简单解决方案:
#!/usr/bin/env ruby
require "bunny"
quit = false
consumer_queued = false
connection = Bunny.new
connection.start
consumer = Thread.new do
puts "consumer start"
cn = connection.create_channel
ex = cn.topic("weathr", :auto_delete => true)
q = cn.queue("", :exclusive => true).bind(ex, :routing_key => "americas.north.#")
consumer_queued = true
q.subscribe do |delivery_info, properties, payload|
puts "An update for North America: #{payload}, routing key is #{delivery_info.routing_key}"
$stdout.flush
end
sleep 1 while !quit
cn.close
puts "consumer done"
end
# ensure queue is ready
sleep 0.125 while !consumer_queued
channel = connection.create_channel
exchange = channel.topic("weathr", :auto_delete => true)
exchange.on_return do |basic_return, properties, payload|
puts "#{payload} was returned! reply_code = #{basic_return.reply_code}, reply_text = #{basic_return.reply_text}"
$stdout.flush
end
exchange.publish("San Diego update", :mandatory => true, :routing_key => "americas.north.us.ca.sandiego").
publish("Berkeley update", :mandatory => true, :routing_key => "americas.north.us.ca.berkeley").
publish("San Francisco update", :mandatory => true, :routing_key => "americas.north.us.ca.sanfrancisco").
publish("New York update", :mandatory => true, :routing_key => "americas.north.us.ny.newyork").
publish("São Paolo update", :mandatory => true, :routing_key => "americas.south.brazil.saopaolo").
publish("Hong Kong update", :mandatory => true, :routing_key => "asia.southeast.hk.hongkong").
publish("Kyoto update", :mandatory => true, :routing_key => "asia.southeast.japan.kyoto").
publish("Shanghai update", :mandatory => true, :routing_key => "asia.southeast.prc.shanghai").
publish("Rome update", :mandatory => true, :routing_key => "europe.italy.roma").
publish("Paris update", :mandatory => true, :routing_key => "europe.france.paris")
channel.close
sleep 5
quit = true
consumer.join
connection.close
输出(带有 return 通知)
consumer start
An update for North America: San Diego update, routing key is americas.north.us.ca.sandiego
São Paolo update was returned! reply_code = 312, reply_text = NO_ROUTE
An update for North America: Berkeley update, routing key is americas.north.us.ca.berkeley
Hong Kong update was returned! reply_code = 312, reply_text = NO_ROUTE
An update for North America: San Francisco update, routing key is americas.north.us.ca.sanfrancisco
Kyoto update was returned! reply_code = 312, reply_text = NO_ROUTE
An update for North America: New York update, routing key is americas.north.us.ny.newyork
Shanghai update was returned! reply_code = 312, reply_text = NO_ROUTE
Rome update was returned! reply_code = 312, reply_text = NO_ROUTE
Paris update was returned! reply_code = 312, reply_text = NO_ROUTE
consumer done
预期的消息已收到,其余的已returned。