RabbitMQ批量消费消息解决方案
RabbitMQ bulk consuming messages solution
我将 RabbitMQ 用作不同消息的队列。当我使用来自 一个队列 的 两个不同的 消费者使用此消息时,我处理它们并将处理结果插入数据库:
def consumer_callback(self, channel, delivery_tag, properties, message):
result = make_some_processing(message)
insert_to_db(result)
channel.basic_ack(delivery_tag)
我想批量使用队列中的消息,这将减少数据库负载。由于 RabbitMQ 不支持消费者批量读取消息,我将这样做:
some_messages_list = []
def consumer_callback(self, channel, delivery_tag, properties, message):
some_messages_list.append({delivery_tag: message})
if len(some_messages_list) > 1000:
results_list = make_some_processing_bulk(some_messages_list)
insert_to_db_bulk(results_list)
for tag in some_messages_list:
channel.basic_ack(tag)
some_messages_list.clear()
- 消息在完全处理之前就在队列中
- 如果消费者跌倒或断开连接 - 消息保持安全
您如何看待该解决方案?
如果没问题,如果消费者跌倒,我如何重新获取所有未确认的消息?
这个解决方案我已经测试了几个月,可以说它非常好。直到 AMPQ 不提供批量消费的功能,我们必须使用这样的一些变通方法。
注意:如果您决定使用此解决方案,请注意与多个消费者(线程)并发消费,或使用一些锁 (我使用了 python threading.Lock 模块)来保证不会发生竞争条件。
我将 RabbitMQ 用作不同消息的队列。当我使用来自 一个队列 的 两个不同的 消费者使用此消息时,我处理它们并将处理结果插入数据库:
def consumer_callback(self, channel, delivery_tag, properties, message):
result = make_some_processing(message)
insert_to_db(result)
channel.basic_ack(delivery_tag)
我想批量使用队列中的消息,这将减少数据库负载。由于 RabbitMQ 不支持消费者批量读取消息,我将这样做:
some_messages_list = []
def consumer_callback(self, channel, delivery_tag, properties, message):
some_messages_list.append({delivery_tag: message})
if len(some_messages_list) > 1000:
results_list = make_some_processing_bulk(some_messages_list)
insert_to_db_bulk(results_list)
for tag in some_messages_list:
channel.basic_ack(tag)
some_messages_list.clear()
- 消息在完全处理之前就在队列中
- 如果消费者跌倒或断开连接 - 消息保持安全
您如何看待该解决方案? 如果没问题,如果消费者跌倒,我如何重新获取所有未确认的消息?
这个解决方案我已经测试了几个月,可以说它非常好。直到 AMPQ 不提供批量消费的功能,我们必须使用这样的一些变通方法。
注意:如果您决定使用此解决方案,请注意与多个消费者(线程)并发消费,或使用一些锁 (我使用了 python threading.Lock 模块)来保证不会发生竞争条件。