RabbitMQ批量消费消息解决方案

RabbitMQ bulk consuming messages solution

我将 RabbitMQ 用作不同消息的队列。当我使用来自 一个队列 两个不同的 消费者使用此消息时,我处理它们并将处理结果插入数据库:

def consumer_callback(self, channel, delivery_tag, properties, message):
    result = make_some_processing(message)
    insert_to_db(result)
    channel.basic_ack(delivery_tag)

我想批量使用队列中的消息,这将减少数据库负载。由于 RabbitMQ 不支持消费者批量读取消息,我将这样做:

some_messages_list = []
def consumer_callback(self, channel, delivery_tag, properties, message):
    some_messages_list.append({delivery_tag: message})
    if len(some_messages_list) > 1000:
        results_list = make_some_processing_bulk(some_messages_list)
        insert_to_db_bulk(results_list)
        
        for tag in some_messages_list:
            channel.basic_ack(tag)
        some_messages_list.clear()
  1. 消息在完全处理之前就在队列中
  2. 如果消费者跌倒或断开连接 - 消息保持安全

您如何看待该解决方案? 如果没问题,如果消费者跌倒,我如何重新获取所有未确认的消息?

这个解决方案我已经测试了几个月,可以说它非常好。直到 AMPQ 不提供批量消费的功能,我们必须使用这样的一些变通方法。

注意:如果您决定使用此解决方案,请注意与多个消费者(线程)并发消费,或使用一些 (我使用了 python threading.Lock 模块)来保证不会发生竞争条件。