RabbitMQ/bunny: 如果在线程中则不调用订阅块

RabbitMQ/bunny: subscribe block not called if within a thread

如果在线程中,我无法执行队列订阅块。

rubybunny/exchanges 中的示例按预期工作。但是,如果在线程中与消费者部分进行适配,则订阅者块似乎不会执行。

我尝试了几种简单的变体,包括设置共享变量标志,但都没有成功。

我错过了什么?

代码
#!/usr/bin/env ruby
require "bunny"

quit = false

consumer = Thread.new do
  puts "consumer start"

  cnx = Bunny.new
  cnx.start
  cn  = cnx.create_channel
  ex = cn.topic("weathr", :auto_delete => true)

  q = cn.queue("", :exclusive => true).bind(ex, :routing_key => "americas.north.#")
  q.subscribe do |delivery_info, properties, payload|
    puts "An update for North America: #{payload}, routing key is #{delivery_info.routing_key}"
  end

  loop {
    sleep 1
    break if quit
  }

  cnx.close
  puts "consumer done"
end

connection = Bunny.new
connection.start
connection  = connection.create_channel
exchange = connection.topic("weathr", :auto_delete => true)
exchange.publish("San Diego update", :routing_key => "americas.north.us.ca.sandiego").
  publish("Berkeley update",         :routing_key => "americas.north.us.ca.berkeley").
  publish("San Francisco update",    :routing_key => "americas.north.us.ca.sanfrancisco").
  publish("New York update",         :routing_key => "americas.north.us.ny.newyork").
  publish("São Paolo update",        :routing_key => "americas.south.brazil.saopaolo").
  publish("Hong Kong update",        :routing_key => "asia.southeast.hk.hongkong").
  publish("Kyoto update",            :routing_key => "asia.southeast.japan.kyoto").
  publish("Shanghai update",         :routing_key => "asia.southeast.prc.shanghai").
  publish("Rome update",             :routing_key => "europe.italy.roma").
  publish("Paris update",            :routing_key => "europe.france.paris")

sleep 5
connection.close

quit = true
consumer.join
实际产量
consumer start
consumer done
预期产出
consumer start
An update for North America: San Diego update, routing key is americas.north.us.ca.sandiego
An update for North America: Berkeley update, routing key is americas.north.us.ca.berkeley
An update for North America: San Francisco update, routing key is americas.north.us.ca.sanfrancisco
An update for North America: New York update, routing key is americas.north.us.ny.newyork
consumer done

线程的订阅块没有执行,因为队列根本没有收到任何消息。详细来说,在这种情况下,队列最终是在消息发布后创建的。

这可以通过将消息切换到 :mandatory => true 并使用 Bunny::Exchange#on_return:

来可视化 代码
#!/usr/bin/env ruby
require "bunny"

quit = false

connection = Bunny.new
connection.start

consumer = Thread.new do
  puts "consumer start"
  cn  = connection.create_channel
  ex = cn.topic("weathr", :auto_delete => true)

  q = cn.queue("", :exclusive => true).bind(ex, :routing_key => "americas.north.#")
  q.subscribe do |delivery_info, properties, payload|
    puts "An update for North America: #{payload}, routing key is #{delivery_info.routing_key}"
  end

  sleep 1 while !quit

  cn.close
  puts "consumer done"
end

channel = connection.create_channel
exchange = channel.topic("weathr", :auto_delete => true)
exchange.on_return do |basic_return, properties, payload|
  puts "#{payload} was returned! reply_code = #{basic_return.reply_code}, reply_text = #{basic_return.reply_text}"
end

exchange.publish("San Diego update", :mandatory => true, :routing_key => "americas.north.us.ca.sandiego").
  publish("Berkeley update",         :mandatory => true, :routing_key => "americas.north.us.ca.berkeley").
  publish("San Francisco update",    :mandatory => true, :routing_key => "americas.north.us.ca.sanfrancisco").
  publish("New York update",         :mandatory => true, :routing_key => "americas.north.us.ny.newyork").
  publish("São Paolo update",        :mandatory => true, :routing_key => "americas.south.brazil.saopaolo").
  publish("Hong Kong update",        :mandatory => true, :routing_key => "asia.southeast.hk.hongkong").
  publish("Kyoto update",            :mandatory => true, :routing_key => "asia.southeast.japan.kyoto").
  publish("Shanghai update",         :mandatory => true, :routing_key => "asia.southeast.prc.shanghai").
  publish("Rome update",             :mandatory => true, :routing_key => "europe.italy.roma").
  publish("Paris update",            :mandatory => true, :routing_key => "europe.france.paris")

channel.close
sleep 5

quit = true
consumer.join
connection.close
输出
consumer start
San Diego update was returned! reply_code = 312, reply_text = NO_ROUTE
Berkeley update was returned! reply_code = 312, reply_text = NO_ROUTE
San Francisco update was returned! reply_code = 312, reply_text = NO_ROUTE
New York update was returned! reply_code = 312, reply_text = NO_ROUTE
São Paolo update was returned! reply_code = 312, reply_text = NO_ROUTE
Hong Kong update was returned! reply_code = 312, reply_text = NO_ROUTE
Kyoto update was returned! reply_code = 312, reply_text = NO_ROUTE
Shanghai update was returned! reply_code = 312, reply_text = NO_ROUTE
Rome update was returned! reply_code = 312, reply_text = NO_ROUTE
Paris update was returned! reply_code = 312, reply_text = NO_ROUTE
consumer done

正如我们所见,所有消息最终都被 return 编辑为 NO_ROUTE

在发布消息之前强制队列(和路由)存在的简单解决方案:

#!/usr/bin/env ruby
require "bunny"

quit = false
consumer_queued = false

connection = Bunny.new
connection.start

consumer = Thread.new do
  puts "consumer start"
  cn  = connection.create_channel
  ex = cn.topic("weathr", :auto_delete => true)

  q = cn.queue("", :exclusive => true).bind(ex, :routing_key => "americas.north.#")
  consumer_queued = true
  q.subscribe do |delivery_info, properties, payload|
    puts "An update for North America: #{payload}, routing key is #{delivery_info.routing_key}"
    $stdout.flush
  end

  sleep 1 while !quit

  cn.close
  puts "consumer done"
end

# ensure queue is ready
sleep 0.125  while !consumer_queued

channel = connection.create_channel
exchange = channel.topic("weathr", :auto_delete => true)
exchange.on_return do |basic_return, properties, payload|
  puts "#{payload} was returned! reply_code = #{basic_return.reply_code}, reply_text = #{basic_return.reply_text}"
  $stdout.flush
end

exchange.publish("San Diego update", :mandatory => true, :routing_key => "americas.north.us.ca.sandiego").
  publish("Berkeley update",         :mandatory => true, :routing_key => "americas.north.us.ca.berkeley").
  publish("San Francisco update",    :mandatory => true, :routing_key => "americas.north.us.ca.sanfrancisco").
  publish("New York update",         :mandatory => true, :routing_key => "americas.north.us.ny.newyork").
  publish("São Paolo update",        :mandatory => true, :routing_key => "americas.south.brazil.saopaolo").
  publish("Hong Kong update",        :mandatory => true, :routing_key => "asia.southeast.hk.hongkong").
  publish("Kyoto update",            :mandatory => true, :routing_key => "asia.southeast.japan.kyoto").
  publish("Shanghai update",         :mandatory => true, :routing_key => "asia.southeast.prc.shanghai").
  publish("Rome update",             :mandatory => true, :routing_key => "europe.italy.roma").
  publish("Paris update",            :mandatory => true, :routing_key => "europe.france.paris")

channel.close
sleep 5

quit = true
consumer.join
connection.close
输出(带有 return 通知)
consumer start
An update for North America: San Diego update, routing key is americas.north.us.ca.sandiego
São Paolo update was returned! reply_code = 312, reply_text = NO_ROUTE
An update for North America: Berkeley update, routing key is americas.north.us.ca.berkeley
Hong Kong update was returned! reply_code = 312, reply_text = NO_ROUTE
An update for North America: San Francisco update, routing key is americas.north.us.ca.sanfrancisco
Kyoto update was returned! reply_code = 312, reply_text = NO_ROUTE
An update for North America: New York update, routing key is americas.north.us.ny.newyork
Shanghai update was returned! reply_code = 312, reply_text = NO_ROUTE
Rome update was returned! reply_code = 312, reply_text = NO_ROUTE
Paris update was returned! reply_code = 312, reply_text = NO_ROUTE
consumer done

预期的消息已收到,其余的已returned。