rabbitmq如何同步消费者

How to synchronize consumers with rabbitmq

我遇到以下问题: 我有一个 RabbitMQ 集群、一个消息生产者和一个消费者集群(用于高可用性)。 每当消费者收到消息时,它都会根据消息内容和 运行 生成另一个进程。这个过程很长,大约需要30分钟。

我必须确保一次处理一条消息。但是,消费者比一个多,所以如果队列中有 2 条消息,一个消费者获取一条消息,第二个消费者获取另一条消息,并且它们是并行处理的。

供参考:每个消费者驻留在不同的机器上。

在 RabbitMQ 级别是否有任何机制允许我等待消费下一条消息直到上一条消息被确认?或者我是否必须在服务器之间开发一些锁定机制?

我只是引用了 rabbitmq 文档中的几点,我认为它们能够满足您的要求。

  1. 第一个参考 - https://www.rabbitmq.com/consumer-priority.html

Normally, active consumers connected to a queue receive messages from it in a round-robin fashion. When consumer priorities are in use, messages are delivered round-robin if multiple active consumers exist with the same high priority.

我假设您所有的消费者都具有相同的优先级,因此消息将平均分配给所有活跃的消费者。

  1. 第二个参考 - https://www.rabbitmq.com/consumer-prefetch.html

the basic.qos method to allow you to limit the number of unacknowledged messages on a channel (or connection) when consuming

正如您所提到的,您将在一台机器上拥有单个消费者,这更容易。

只需为每个消费者设置 1 个消费者预取限制。因此,在需要确认之前,服务器只会向消费者发送一条消息。并在您的消息完全处理后发送基本确认。

Channel channel = ...;
Consumer consumer = ...;
channel.basicQos(1); // Per consumer limit

Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

                System.out.println("received message");
                // process the message .. time consuming

                // after processing send the basic ack so that next message can be received from queue
                channel.basicAck(envelope.getDeliveryTag(), false);
        };

 channel.basicConsume("my-queue", false, consumer);

希望对您有所帮助。

更新-

只是添加更多描述 - 当您使用

channel.basicQos(x);

Rabbitmq 将推送(如果在队列中可用,当然是在遵守优先级和循环法等之后)最大数量的未确认消息到通道上的每个消费者。这意味着通道上的每个消费者不会有超过 x 条未确认的消息,也就是说消费者可以在任何给定时刻同时处理最多 x 条消息。一旦消费者发回 ack,就可以将下一条消息推送给它。如果消费者觉得无法处理消息,也可以发送 nack。在这种情况下,消息将被重新排队,并且重新排队的消息可能会根据优先级、循环法等进入队列中的任何消费者。

每个通道可以有多个消费者。所以,当你使用

channel.basicQos(x, true);

限制 x 适用于整个频道,而不是频道上的 single/each 消费者。

在您的情况下,每个渠道上只有一个消费者。所以通道限制实际上对你的情况没有任何影响。

更多更新 -

一台机器通过连接连接到RabbitMQ。一个连接可以有多个通道。一个频道可以有多个消费者。所以从逻辑上讲,可以有不同的机器连接到 RabbitMQ,并且有多个通道和消费者在同一个队列上监听。您可以同时为通道(使用 channel.basicQos(x, true))和通道内的消费者(使用 channel.basicQos(x, false))设置 QOS 限制。限制 0 表示无限制。显然这些限制适用于通道实例。驻留在不同通道实例(在同一台机器或不同机器上)的所有消费者都将有自己的限制(默认或通过 QOS 方法明确设置)。

正如@vsoni 所解释的一般,我们可以使用基本的 QOS,这将允许每个消费者一次使用 X 条消息,并且更精确地 每个通道 。由于通道是 "lightweight connections that share a single TCP connection"(参见 here) then, according to my knowledge, it is impossible to make a QOS between two consumers running on two different machines. My goal was to not send send a message to second consumer while the first one is still consuming the previous one. I only managed to achieve it by external locking (for development purposes I made a simple file lock and 3 consumers on one machine) but for production I will probably use a DynamoDB locking。其他选项是使用 Zookeeper、etcd 或类似软件的分布式锁定。