rails 的 AMQP gem 正在重新排队数百条已成功处理的消息

AMQP gem for rails is requeuing hundreds of successfully processed messages

为什么当我的队列中有很多消息 (1200) 时,即使我的代码成功处理了消息并且 "acks" 我的消息仍被重新排队?

我该如何解决这个问题?

..

我有一个应用程序使用 rails amqp gem 来利用 RabbitMQ。我们将消息放在队列中,其中包含有关需要发送的电子邮件的信息,订阅者将其取下并发送。

有时数百条消息会快速连续地放入队列。

我们使用 acknowledgements 来确保消息不会丢失。

它工作得很好,直到最近我发现队列中有 1200 条消息并且没有被消费。

那么为什么我的消费者没有消费它们?

查看日志我发现是的,它已经消耗了它们并且发送了电子邮件。我重新启动了消费者,它重新消费了它们,这意味着我们向用户发送了多封相同的电子邮件。哎呀!但是我通过观察 RabbitMQ UI 注意到的是,当我重新启动消费者时,它立即从队列中取出了所有 1200 条消息。几分钟后,这些消息被重新排队,即使我的消费者仍在浏览它们并发送电子邮件。在我们的代码中,消费者会在每封电子邮件发送后确认消息(消息已处理)。

所以我对正在发生的事情的最佳猜测是,当队列中有很多消息时,消费者将它们全部取出,但不会单独确认每一个,而是等到所有消息都被处理完做一个质量确认。由于这需要很长时间,10 分钟,RabbitMQ 端发生了一些事情,它说,嘿,这花费的时间太长了,让我们重新排队所有这些消息,即使我的消费者仍在成功处理它们。

我环顾四周,发现了一个叫做心跳的东西,但我找不到任何关于它是什么以及如何使用它的明确解释,如果我需要使用它的话。但听起来它可能与队列和消费者之间的通信有关,并且可能是在处理这些消息时不让所有这些消息重新排队的关键。

我尝试的另一件事是使用预取:1. 描述 here 。虽然这似乎不合适,因为我只有一个消费者。但这听起来很有希望,因为它看起来好像可以强制确认gem所有消息。

考虑到我们可以快速连续地将数百条消息放入队列,我是否应该考虑多个消费者?

这是我订阅队列的 rake 任务

task :subscribe_basic => :environment do |task_name|
  begin # make sure any exception is logged
    log = Rails.logger
    routing_key = "send_letter"
    tcp_connection_settings =
        {:host=>"localhost",
         :port=>5672,
         :vhost=>"dev_vhost",
         :user=>"dev_user",
         :pass=>"abc123",
         :timeout=>0.3,
         :ssl=>false,
         :on_tcp_connection_loss=>
             handle_conn_loss,
         :logging=>true}

    begin
      ::AMQP.start(tcp_connection_settings) do |connection|
        channel = ::AMQP::Channel.new(connection, :prefetch => 1)
        binding.pry
        channel.auto_recovery = true
        cons = SendLetterConsumer.new channel, log

        queue = channel.queue(routing_key, exclusive: false, durable: true)

        consumer1 = AMQP::Consumer.new(channel, queue, nil, exclusive = false, no_ack = false)
        consumer1.consume.on_delivery(&cons.method(:handle_message))
        log.info "subscribed to queue #{routing_key}, config_key #{config_key} (#{Process.pid})"

        Signal.trap 'INT' do # kill -s INT <pid> , kill -2 <pid>,  Ctrl+C
          log.info "#{task_name} stopping(#{Process.pid})..."
          channel.close { EventMachine.stop } # otherwise segfault
        end
      end
    rescue StandardError => ex
      # 2015-03-20 02:52:49 UTC MQ raised EventMachine::ConnectionError: unable to resolve server address
      log.error "MQ raised #{ex.class.name}: #{ex.message} Backtrace: #{ex.backtrace}"
    end
  rescue Exception => ex
    log.error "#{ex.class.name}: #{ex.message} -- #{ex.backtrace.inspect}"
    raise ex
  end

end

这是我们用来处理消息的消费者代码(在上面的代码中调用:consumer1.consume.on_delivery(&cons.method(:handle_message))):

def handle_message(metadata, payload)
  logger.info "*** SendLetterConsumer#handle_message start #{Time.now}"
  logger.info payload
  begin
    # {course_app: aCourseApplication, errors:[]}
    # {course_app: aFaultyCourseApplication, errors: ['error1', 'error2']}
    msg = JSON.parse(payload)
    ca = CourseApplication.find(msg['course_application_id'])
    am = AutomatedMessage.find(msg['automated_message_id'])
    user_name = msg['user_name']
    if am.present?
      raise "Cannot send a letter for Automated message with id #{am.id} because it does not have an associated message template" if am.message_template.nil?
      logger.info "attempt to send letter for Automated Message with id #{am.id}"
      result = LetterSender::send_letter a_course_application: ca, a_message_template: am.message_template, user_name: user_name
    elsif msg.message_template_id
      mt = MessageTemplate.find(msg.message_template_id)
      result = LetterSender::send_letter a_course_application: ca, a_message_template: mt, user_name: user_name
    end
    if result
      metadata.ack #'ack'-ing will remove the message from the queue - do this even if we created a faultyCourseApp
    else
      logger.error "Could not ack for #{msg}"
    end
  rescue StandardError => e
    logger.error "#{e.message} #{e.backtrace}"
    # do not 'ack' - must be a programming mistake so leave message on queue - keep connection open to cont processing other messages
    # fix bug and restart the rake task to redeliver the unacknowledged messages
  end
  logger.info "*** SendLetterConsumer#handle_message   end #{Time.now}"
end    

prefetch 确实是答案,但 doc I linked to above regarding this 说要使用以下配置:

channel = AMQP::Channel.new(connection, :prefetch => 1)

但这根本不起作用。

我不得不这样做

channel    = AMQP::Channel.new(connection)
channel.prefetch(1)

现在它可以工作了,只发送一条消息并在发送下一条消息之前等待它被确认。

此解决方案在 rabbitmq 教程中描述 here,而不是 amqp gem。

那么,如果我只有一个具有预取功能的消费者,并且它无法确认消息,会发生什么情况。消息会开始堆积吗?

所以有 2 个消费者可能很好,但是这两个消费者可能都无法确认。

为了解决这个问题,我正在尝试拒绝并重新排队。因此,在我的消费者中,如果我没有点击确认消息的代码部分,我将使用 metadata.reject(:requeue=>true) 并将消息放回队列的前面。是的,没错,队列中的 "front" - 真可惜。这意味着消息仍然会堆积起来,因为相同的失败消息会不断地发送给一个消费者。

如上文前link所说"When there is only one consumer on a queue, make sure you do not create infinite message delivery loops by rejecting and requeueing a message from the same consumer over and over again."

为什么requeue不把它放在队列的末尾?那不是更好吗?您仍然会收到循环消息,但至少新消息会得到处理而不是堆积起来。

所以我尝试将预取设置为不止一个...两个。但同样的问题。一旦 2 条消息被拒绝并重新排队,我可怜的老消费者就会不断收到相同的消息,而不是收到它没有拒绝的消息,以便有机会处理积压的消息。

多个消费者怎么样?同样的问题。如果出现问题,我有 2 个消费者预取 x 消息和 metadata.reject(requeue:true) 它们。现在,如果前面的 2x 消息导致我的消费者出错,那么我会遇到同样的无限循环消息和备份消息的问题。如果在队列前端有少于 2x 的消息始终无法被确认,那么消费者将逐渐解决消息积压问题。

好像没有满意的解决办法

理想情况下,我希望我的预取消费者(由于最初的问题需要预取)能够不确认他们未能正确使用的消息,但也可以移动到队列中的下一条消息。换句话说,将坏消息留在未确认的消息集合中,而不是将它们放回队列中。问题是,对于预取,我必须拒绝它们,否则一切都会停止,我必须重新排队,否则我会丢失它们。

一种方法可能是:在我的消费者中,当重新传递的消息未能在代码中正确使用时,我将拒绝它但不会使用 metadata.reject() 重新排队,并以某种方式将此消息报告给开发人员,或将其保存在数据库中的失败消息 table 中,以便我们随后进行处理。 (重新传递标志 metadata.redelivered 参见 "At The Consumer" 部分中的 here

如果 rabbitmq 提供一个重新交付计数,那就太好了——这样我就可以将不重新排队的截止日期设置得更高,但它似乎并没有这样做,它只提供了一个重新交付的标志。

我的另一个回答说预取可以解决问题,但引入了一个新问题,即预取必须拒绝并重新排队失败的消息,这会导致循环,因为 reject(requeue:true) 将其放在队列的前面,仅供再次使用。多个消费者会有所帮助,但您仍然会陷入循环。

因此,为了使用预取但将失败的消息放在队列的后面,我发现使用死信交换设置是可行的。参见 this article about it, though it is for C#, but you can see the general idea. Also see RabbitMQ doc about Dead Letter Exchanges

一开始我没有理解它,所以这是我对在这种情况下使用死信交换的简短解释:

RabbitMq 不做延迟消息,所以想法是使用一个重试队列,并将消费者中失败的消息发布到这个重试队列上。反过来,这个重试队列会在一定时间后杀死它们,导致它们被放在主队列的末尾。

  1. 消费者尝试消费消息。

  2. 出了点问题,或者你发现了一个错误,所以你不确认 (metadata.ack) 消息而是 metadata.reject(requeue:false) 并发布到重试队列。

使用此重试队列的死信交换配置会发生以下情况:

  1. 消息在重试队列中停留 x 时间(在参数 "x-message-ttl" 中创建重试队列时设置,见下文)然后 RabbitMq 将其杀死。

  2. 由于使用参数 "x-dead-letter-exchange" 和 "x-dead-letter-routing-key" 在重试队列上配置了死信交换设置(见下文),此消息会自动返回到主线程的背面队列。

这样做的一个好处是重试队列甚至不需要任何消费者。

这是我放入消费者中用于发布到重试队列的一些代码

def publish_to_retry_queue(msg:, metadata:)
  @channel.queue("send_letter.retry", exclusive: false, persistent: true, durable: true,
                 arguments:{"x-dead-letter-exchange" => "dead_letter_exchange",
                            "x-dead-letter-routing-key" => "send_letter",
                            "x-message-ttl" => 25000})
  metadata.reject(requeue: false)
  res = @channel.default_exchange.publish(msg, routing_key: "send_letter.retry", headers: metadata.headers)
  @logger.info "result from publishing to retry queue is"
  @logger.info  res
  res
end

其中@channel 是主队列中的消费者正在使用的通道。 请注意,这要求您已经在 rabbitmq 上设置了名为 dead_letter_exchange 的交换,并从它添加了一个绑定到主队列,在本例中它是 send_letter 队列。