rails 的 AMQP gem 正在重新排队数百条已成功处理的消息
AMQP gem for rails is requeuing hundreds of successfully processed messages
为什么当我的队列中有很多消息 (1200) 时,即使我的代码成功处理了消息并且 "acks" 我的消息仍被重新排队?
和
我该如何解决这个问题?
..
我有一个应用程序使用 rails amqp gem 来利用 RabbitMQ。我们将消息放在队列中,其中包含有关需要发送的电子邮件的信息,订阅者将其取下并发送。
有时数百条消息会快速连续地放入队列。
我们使用 acknowledgements 来确保消息不会丢失。
它工作得很好,直到最近我发现队列中有 1200 条消息并且没有被消费。
那么为什么我的消费者没有消费它们?
查看日志我发现是的,它已经消耗了它们并且发送了电子邮件。我重新启动了消费者,它重新消费了它们,这意味着我们向用户发送了多封相同的电子邮件。哎呀!但是我通过观察 RabbitMQ UI 注意到的是,当我重新启动消费者时,它立即从队列中取出了所有 1200 条消息。几分钟后,这些消息被重新排队,即使我的消费者仍在浏览它们并发送电子邮件。在我们的代码中,消费者会在每封电子邮件发送后确认消息(消息已处理)。
所以我对正在发生的事情的最佳猜测是,当队列中有很多消息时,消费者将它们全部取出,但不会单独确认每一个,而是等到所有消息都被处理完做一个质量确认。由于这需要很长时间,10 分钟,RabbitMQ 端发生了一些事情,它说,嘿,这花费的时间太长了,让我们重新排队所有这些消息,即使我的消费者仍在成功处理它们。
我环顾四周,发现了一个叫做心跳的东西,但我找不到任何关于它是什么以及如何使用它的明确解释,如果我需要使用它的话。但听起来它可能与队列和消费者之间的通信有关,并且可能是在处理这些消息时不让所有这些消息重新排队的关键。
我尝试的另一件事是使用预取:1. 描述 here 。虽然这似乎不合适,因为我只有一个消费者。但这听起来很有希望,因为它看起来好像可以强制确认gem所有消息。
考虑到我们可以快速连续地将数百条消息放入队列,我是否应该考虑多个消费者?
这是我订阅队列的 rake 任务
task :subscribe_basic => :environment do |task_name|
begin # make sure any exception is logged
log = Rails.logger
routing_key = "send_letter"
tcp_connection_settings =
{:host=>"localhost",
:port=>5672,
:vhost=>"dev_vhost",
:user=>"dev_user",
:pass=>"abc123",
:timeout=>0.3,
:ssl=>false,
:on_tcp_connection_loss=>
handle_conn_loss,
:logging=>true}
begin
::AMQP.start(tcp_connection_settings) do |connection|
channel = ::AMQP::Channel.new(connection, :prefetch => 1)
binding.pry
channel.auto_recovery = true
cons = SendLetterConsumer.new channel, log
queue = channel.queue(routing_key, exclusive: false, durable: true)
consumer1 = AMQP::Consumer.new(channel, queue, nil, exclusive = false, no_ack = false)
consumer1.consume.on_delivery(&cons.method(:handle_message))
log.info "subscribed to queue #{routing_key}, config_key #{config_key} (#{Process.pid})"
Signal.trap 'INT' do # kill -s INT <pid> , kill -2 <pid>, Ctrl+C
log.info "#{task_name} stopping(#{Process.pid})..."
channel.close { EventMachine.stop } # otherwise segfault
end
end
rescue StandardError => ex
# 2015-03-20 02:52:49 UTC MQ raised EventMachine::ConnectionError: unable to resolve server address
log.error "MQ raised #{ex.class.name}: #{ex.message} Backtrace: #{ex.backtrace}"
end
rescue Exception => ex
log.error "#{ex.class.name}: #{ex.message} -- #{ex.backtrace.inspect}"
raise ex
end
end
这是我们用来处理消息的消费者代码(在上面的代码中调用:consumer1.consume.on_delivery(&cons.method(:handle_message))
):
def handle_message(metadata, payload)
logger.info "*** SendLetterConsumer#handle_message start #{Time.now}"
logger.info payload
begin
# {course_app: aCourseApplication, errors:[]}
# {course_app: aFaultyCourseApplication, errors: ['error1', 'error2']}
msg = JSON.parse(payload)
ca = CourseApplication.find(msg['course_application_id'])
am = AutomatedMessage.find(msg['automated_message_id'])
user_name = msg['user_name']
if am.present?
raise "Cannot send a letter for Automated message with id #{am.id} because it does not have an associated message template" if am.message_template.nil?
logger.info "attempt to send letter for Automated Message with id #{am.id}"
result = LetterSender::send_letter a_course_application: ca, a_message_template: am.message_template, user_name: user_name
elsif msg.message_template_id
mt = MessageTemplate.find(msg.message_template_id)
result = LetterSender::send_letter a_course_application: ca, a_message_template: mt, user_name: user_name
end
if result
metadata.ack #'ack'-ing will remove the message from the queue - do this even if we created a faultyCourseApp
else
logger.error "Could not ack for #{msg}"
end
rescue StandardError => e
logger.error "#{e.message} #{e.backtrace}"
# do not 'ack' - must be a programming mistake so leave message on queue - keep connection open to cont processing other messages
# fix bug and restart the rake task to redeliver the unacknowledged messages
end
logger.info "*** SendLetterConsumer#handle_message end #{Time.now}"
end
prefetch 确实是答案,但 doc I linked to above regarding this 说要使用以下配置:
channel = AMQP::Channel.new(connection, :prefetch => 1)
但这根本不起作用。
我不得不这样做
channel = AMQP::Channel.new(connection)
channel.prefetch(1)
现在它可以工作了,只发送一条消息并在发送下一条消息之前等待它被确认。
此解决方案在 rabbitmq 教程中描述 here,而不是 amqp gem。
那么,如果我只有一个具有预取功能的消费者,并且它无法确认消息,会发生什么情况。消息会开始堆积吗?
是
所以有 2 个消费者可能很好,但是这两个消费者可能都无法确认。
为了解决这个问题,我正在尝试拒绝并重新排队。因此,在我的消费者中,如果我没有点击确认消息的代码部分,我将使用 metadata.reject(:requeue=>true)
并将消息放回队列的前面。是的,没错,队列中的 "front" - 真可惜。这意味着消息仍然会堆积起来,因为相同的失败消息会不断地发送给一个消费者。
如上文前link所说"When there is only one consumer on a queue, make sure you do not create infinite message delivery loops by rejecting and requeueing a message from the same consumer over and over again."
为什么requeue不把它放在队列的末尾?那不是更好吗?您仍然会收到循环消息,但至少新消息会得到处理而不是堆积起来。
所以我尝试将预取设置为不止一个...两个。但同样的问题。一旦 2 条消息被拒绝并重新排队,我可怜的老消费者就会不断收到相同的消息,而不是收到它没有拒绝的消息,以便有机会处理积压的消息。
多个消费者怎么样?同样的问题。如果出现问题,我有 2 个消费者预取 x 消息和 metadata.reject(requeue:true)
它们。现在,如果前面的 2x 消息导致我的消费者出错,那么我会遇到同样的无限循环消息和备份消息的问题。如果在队列前端有少于 2x 的消息始终无法被确认,那么消费者将逐渐解决消息积压问题。
好像没有满意的解决办法
理想情况下,我希望我的预取消费者(由于最初的问题需要预取)能够不确认他们未能正确使用的消息,但也可以移动到队列中的下一条消息。换句话说,将坏消息留在未确认的消息集合中,而不是将它们放回队列中。问题是,对于预取,我必须拒绝它们,否则一切都会停止,我必须重新排队,否则我会丢失它们。
一种方法可能是:在我的消费者中,当重新传递的消息未能在代码中正确使用时,我将拒绝它但不会使用 metadata.reject()
重新排队,并以某种方式将此消息报告给开发人员,或将其保存在数据库中的失败消息 table 中,以便我们随后进行处理。 (重新传递标志 metadata.redelivered
参见 "At The Consumer" 部分中的 here)
如果 rabbitmq 提供一个重新交付计数,那就太好了——这样我就可以将不重新排队的截止日期设置得更高,但它似乎并没有这样做,它只提供了一个重新交付的标志。
我的另一个回答说预取可以解决问题,但引入了一个新问题,即预取必须拒绝并重新排队失败的消息,这会导致循环,因为 reject(requeue:true)
将其放在队列的前面,仅供再次使用。多个消费者会有所帮助,但您仍然会陷入循环。
因此,为了使用预取但将失败的消息放在队列的后面,我发现使用死信交换设置是可行的。参见 this article about it, though it is for C#, but you can see the general idea. Also see RabbitMQ doc about Dead Letter Exchanges。
一开始我没有理解它,所以这是我对在这种情况下使用死信交换的简短解释:
RabbitMq 不做延迟消息,所以想法是使用一个重试队列,并将消费者中失败的消息发布到这个重试队列上。反过来,这个重试队列会在一定时间后杀死它们,导致它们被放在主队列的末尾。
消费者尝试消费消息。
出了点问题,或者你发现了一个错误,所以你不确认 (metadata.ack
) 消息而是 metadata.reject(requeue:false)
并发布到重试队列。
使用此重试队列的死信交换配置会发生以下情况:
消息在重试队列中停留 x 时间(在参数 "x-message-ttl" 中创建重试队列时设置,见下文)然后 RabbitMq 将其杀死。
由于使用参数 "x-dead-letter-exchange" 和 "x-dead-letter-routing-key" 在重试队列上配置了死信交换设置(见下文),此消息会自动返回到主线程的背面队列。
这样做的一个好处是重试队列甚至不需要任何消费者。
这是我放入消费者中用于发布到重试队列的一些代码
def publish_to_retry_queue(msg:, metadata:)
@channel.queue("send_letter.retry", exclusive: false, persistent: true, durable: true,
arguments:{"x-dead-letter-exchange" => "dead_letter_exchange",
"x-dead-letter-routing-key" => "send_letter",
"x-message-ttl" => 25000})
metadata.reject(requeue: false)
res = @channel.default_exchange.publish(msg, routing_key: "send_letter.retry", headers: metadata.headers)
@logger.info "result from publishing to retry queue is"
@logger.info res
res
end
其中@channel 是主队列中的消费者正在使用的通道。
请注意,这要求您已经在 rabbitmq 上设置了名为 dead_letter_exchange 的交换,并从它添加了一个绑定到主队列,在本例中它是 send_letter 队列。
为什么当我的队列中有很多消息 (1200) 时,即使我的代码成功处理了消息并且 "acks" 我的消息仍被重新排队?
和
我该如何解决这个问题?
..
我有一个应用程序使用 rails amqp gem 来利用 RabbitMQ。我们将消息放在队列中,其中包含有关需要发送的电子邮件的信息,订阅者将其取下并发送。
有时数百条消息会快速连续地放入队列。
我们使用 acknowledgements 来确保消息不会丢失。
它工作得很好,直到最近我发现队列中有 1200 条消息并且没有被消费。
那么为什么我的消费者没有消费它们?
查看日志我发现是的,它已经消耗了它们并且发送了电子邮件。我重新启动了消费者,它重新消费了它们,这意味着我们向用户发送了多封相同的电子邮件。哎呀!但是我通过观察 RabbitMQ UI 注意到的是,当我重新启动消费者时,它立即从队列中取出了所有 1200 条消息。几分钟后,这些消息被重新排队,即使我的消费者仍在浏览它们并发送电子邮件。在我们的代码中,消费者会在每封电子邮件发送后确认消息(消息已处理)。
所以我对正在发生的事情的最佳猜测是,当队列中有很多消息时,消费者将它们全部取出,但不会单独确认每一个,而是等到所有消息都被处理完做一个质量确认。由于这需要很长时间,10 分钟,RabbitMQ 端发生了一些事情,它说,嘿,这花费的时间太长了,让我们重新排队所有这些消息,即使我的消费者仍在成功处理它们。
我环顾四周,发现了一个叫做心跳的东西,但我找不到任何关于它是什么以及如何使用它的明确解释,如果我需要使用它的话。但听起来它可能与队列和消费者之间的通信有关,并且可能是在处理这些消息时不让所有这些消息重新排队的关键。
我尝试的另一件事是使用预取:1. 描述 here 。虽然这似乎不合适,因为我只有一个消费者。但这听起来很有希望,因为它看起来好像可以强制确认gem所有消息。
考虑到我们可以快速连续地将数百条消息放入队列,我是否应该考虑多个消费者?
这是我订阅队列的 rake 任务
task :subscribe_basic => :environment do |task_name|
begin # make sure any exception is logged
log = Rails.logger
routing_key = "send_letter"
tcp_connection_settings =
{:host=>"localhost",
:port=>5672,
:vhost=>"dev_vhost",
:user=>"dev_user",
:pass=>"abc123",
:timeout=>0.3,
:ssl=>false,
:on_tcp_connection_loss=>
handle_conn_loss,
:logging=>true}
begin
::AMQP.start(tcp_connection_settings) do |connection|
channel = ::AMQP::Channel.new(connection, :prefetch => 1)
binding.pry
channel.auto_recovery = true
cons = SendLetterConsumer.new channel, log
queue = channel.queue(routing_key, exclusive: false, durable: true)
consumer1 = AMQP::Consumer.new(channel, queue, nil, exclusive = false, no_ack = false)
consumer1.consume.on_delivery(&cons.method(:handle_message))
log.info "subscribed to queue #{routing_key}, config_key #{config_key} (#{Process.pid})"
Signal.trap 'INT' do # kill -s INT <pid> , kill -2 <pid>, Ctrl+C
log.info "#{task_name} stopping(#{Process.pid})..."
channel.close { EventMachine.stop } # otherwise segfault
end
end
rescue StandardError => ex
# 2015-03-20 02:52:49 UTC MQ raised EventMachine::ConnectionError: unable to resolve server address
log.error "MQ raised #{ex.class.name}: #{ex.message} Backtrace: #{ex.backtrace}"
end
rescue Exception => ex
log.error "#{ex.class.name}: #{ex.message} -- #{ex.backtrace.inspect}"
raise ex
end
end
这是我们用来处理消息的消费者代码(在上面的代码中调用:consumer1.consume.on_delivery(&cons.method(:handle_message))
):
def handle_message(metadata, payload)
logger.info "*** SendLetterConsumer#handle_message start #{Time.now}"
logger.info payload
begin
# {course_app: aCourseApplication, errors:[]}
# {course_app: aFaultyCourseApplication, errors: ['error1', 'error2']}
msg = JSON.parse(payload)
ca = CourseApplication.find(msg['course_application_id'])
am = AutomatedMessage.find(msg['automated_message_id'])
user_name = msg['user_name']
if am.present?
raise "Cannot send a letter for Automated message with id #{am.id} because it does not have an associated message template" if am.message_template.nil?
logger.info "attempt to send letter for Automated Message with id #{am.id}"
result = LetterSender::send_letter a_course_application: ca, a_message_template: am.message_template, user_name: user_name
elsif msg.message_template_id
mt = MessageTemplate.find(msg.message_template_id)
result = LetterSender::send_letter a_course_application: ca, a_message_template: mt, user_name: user_name
end
if result
metadata.ack #'ack'-ing will remove the message from the queue - do this even if we created a faultyCourseApp
else
logger.error "Could not ack for #{msg}"
end
rescue StandardError => e
logger.error "#{e.message} #{e.backtrace}"
# do not 'ack' - must be a programming mistake so leave message on queue - keep connection open to cont processing other messages
# fix bug and restart the rake task to redeliver the unacknowledged messages
end
logger.info "*** SendLetterConsumer#handle_message end #{Time.now}"
end
prefetch 确实是答案,但 doc I linked to above regarding this 说要使用以下配置:
channel = AMQP::Channel.new(connection, :prefetch => 1)
但这根本不起作用。
我不得不这样做
channel = AMQP::Channel.new(connection)
channel.prefetch(1)
现在它可以工作了,只发送一条消息并在发送下一条消息之前等待它被确认。
此解决方案在 rabbitmq 教程中描述 here,而不是 amqp gem。
那么,如果我只有一个具有预取功能的消费者,并且它无法确认消息,会发生什么情况。消息会开始堆积吗?
是
所以有 2 个消费者可能很好,但是这两个消费者可能都无法确认。
为了解决这个问题,我正在尝试拒绝并重新排队。因此,在我的消费者中,如果我没有点击确认消息的代码部分,我将使用 metadata.reject(:requeue=>true)
并将消息放回队列的前面。是的,没错,队列中的 "front" - 真可惜。这意味着消息仍然会堆积起来,因为相同的失败消息会不断地发送给一个消费者。
如上文前link所说"When there is only one consumer on a queue, make sure you do not create infinite message delivery loops by rejecting and requeueing a message from the same consumer over and over again."
为什么requeue不把它放在队列的末尾?那不是更好吗?您仍然会收到循环消息,但至少新消息会得到处理而不是堆积起来。
所以我尝试将预取设置为不止一个...两个。但同样的问题。一旦 2 条消息被拒绝并重新排队,我可怜的老消费者就会不断收到相同的消息,而不是收到它没有拒绝的消息,以便有机会处理积压的消息。
多个消费者怎么样?同样的问题。如果出现问题,我有 2 个消费者预取 x 消息和 metadata.reject(requeue:true)
它们。现在,如果前面的 2x 消息导致我的消费者出错,那么我会遇到同样的无限循环消息和备份消息的问题。如果在队列前端有少于 2x 的消息始终无法被确认,那么消费者将逐渐解决消息积压问题。
好像没有满意的解决办法
理想情况下,我希望我的预取消费者(由于最初的问题需要预取)能够不确认他们未能正确使用的消息,但也可以移动到队列中的下一条消息。换句话说,将坏消息留在未确认的消息集合中,而不是将它们放回队列中。问题是,对于预取,我必须拒绝它们,否则一切都会停止,我必须重新排队,否则我会丢失它们。
一种方法可能是:在我的消费者中,当重新传递的消息未能在代码中正确使用时,我将拒绝它但不会使用 metadata.reject()
重新排队,并以某种方式将此消息报告给开发人员,或将其保存在数据库中的失败消息 table 中,以便我们随后进行处理。 (重新传递标志 metadata.redelivered
参见 "At The Consumer" 部分中的 here)
如果 rabbitmq 提供一个重新交付计数,那就太好了——这样我就可以将不重新排队的截止日期设置得更高,但它似乎并没有这样做,它只提供了一个重新交付的标志。
我的另一个回答说预取可以解决问题,但引入了一个新问题,即预取必须拒绝并重新排队失败的消息,这会导致循环,因为 reject(requeue:true)
将其放在队列的前面,仅供再次使用。多个消费者会有所帮助,但您仍然会陷入循环。
因此,为了使用预取但将失败的消息放在队列的后面,我发现使用死信交换设置是可行的。参见 this article about it, though it is for C#, but you can see the general idea. Also see RabbitMQ doc about Dead Letter Exchanges。
一开始我没有理解它,所以这是我对在这种情况下使用死信交换的简短解释:
RabbitMq 不做延迟消息,所以想法是使用一个重试队列,并将消费者中失败的消息发布到这个重试队列上。反过来,这个重试队列会在一定时间后杀死它们,导致它们被放在主队列的末尾。
消费者尝试消费消息。
出了点问题,或者你发现了一个错误,所以你不确认 (
metadata.ack
) 消息而是metadata.reject(requeue:false)
并发布到重试队列。
使用此重试队列的死信交换配置会发生以下情况:
消息在重试队列中停留 x 时间(在参数 "x-message-ttl" 中创建重试队列时设置,见下文)然后 RabbitMq 将其杀死。
由于使用参数 "x-dead-letter-exchange" 和 "x-dead-letter-routing-key" 在重试队列上配置了死信交换设置(见下文),此消息会自动返回到主线程的背面队列。
这样做的一个好处是重试队列甚至不需要任何消费者。
这是我放入消费者中用于发布到重试队列的一些代码
def publish_to_retry_queue(msg:, metadata:)
@channel.queue("send_letter.retry", exclusive: false, persistent: true, durable: true,
arguments:{"x-dead-letter-exchange" => "dead_letter_exchange",
"x-dead-letter-routing-key" => "send_letter",
"x-message-ttl" => 25000})
metadata.reject(requeue: false)
res = @channel.default_exchange.publish(msg, routing_key: "send_letter.retry", headers: metadata.headers)
@logger.info "result from publishing to retry queue is"
@logger.info res
res
end
其中@channel 是主队列中的消费者正在使用的通道。 请注意,这要求您已经在 rabbitmq 上设置了名为 dead_letter_exchange 的交换,并从它添加了一个绑定到主队列,在本例中它是 send_letter 队列。