Rabbitmq预取理解

Rabbit mq prefetch undestanding

下面我明白了

prefetch 只是控制 broker 一次允许有多少消息在消费者处突出。当设置为 1 时,这意味着代理将发送 1 条消息,等待 ack,然后发送下一条。

但有关以下情况的问题:

  1. 假设预取为 200,我们有 2 个消费者闲置。经纪人收到 150 条消息,我认为经纪人会随机选择一条并发送所有 150 条消息?我认为是的,它不会在消费者之间共享。

  2. 假设一个消费者在 unack 中有 100 条消息,一个处于空闲状态,预取又是 200 条。现在我们又收到了 50 条消息,我认为代理会随机将这 50 条消息分配给任何一个吗?或者它不会给已经有 100 条消息尚未确认的消费者

  3. 如果 prefetch 是 200,一个消费者得到 200,listener 会阻止那个线程(spring rabbitmq listner 方法)发送 ack 直到处理完所有 200 吗?我认为它不会一个一个地发送ack,而是会等到所有预取的消息都处理完。换句话说,如果 prefetch 是 200,如果 broker 发送了 200 条消息,那么 broker 什么时候开始收到 ack?

如果有两个活跃的消费者,代理将公平地分发新消息(直到每个实例有 200 个未完成的消息)。

如果队列中有 150 条消息且没有消费者运行;第一个开始的消费者将(可能)得到所有 150 个,但当两者都是 运行 时,分配是公平的。

如果每个消费者有 200 条未完成的消息,则代理将在每个消息被确认时按需发送新消息。消费者线程并没有“阻塞”,只是代理不再发送消息。

默认情况下,spring 将一次确认每条消息。可以通过设置容器的 batchSize 属性 来更改此行为。例如如果设置为100,则每100条记录发送一次ack;这提高了性能,但增加了失败后重复交付的风险。在这种情况下,代理将在 ack 后发送最多 100 条新消息。

在旧版本中,batchSize 被称为 txSize

编辑

查看此示例;在最新版本中默认预取为 250。

@SpringBootApplication
public class So65201334Application {

    public static void main(String[] args) {
        SpringApplication.run(So65201334Application.class, args);
    }

    @RabbitListener(id = "foo", queues = "foo", autoStartup = "false")
    @RabbitListener(id = "bar", queues = "foo", autoStartup = "false")
    void listen(String in, @Header(AmqpHeaders.CONSUMER_TAG) String tag) throws InterruptedException {
        System.out.println(tag);
        Thread.sleep(240_000);
    }

    @Bean
    public ApplicationRunner runner(RabbitTemplate template, RabbitListenerEndpointRegistry registry) {
        return args -> {
            for (int i = 0; i < 200; i++) {
                template.convertAndSend("foo", "bar");
            }
            registry.getListenerContainer("foo").start();
            System.out.println("Hit Enter to start the second listener and send more records");
            System.in.read();
            registry.getListenerContainer("bar").start();
            Thread.sleep(2000);
            for (int i = 0; i < 200; i++) {
                template.convertAndSend("foo", "bar");
            }
        };
    }

}

不出所料,所有 200 个都给了第一个消费者:

当第二个消费者启动时,记录被发送给两个消费者,而不是没有积压的消费者。分布现在看起来像这样:

当我将预取增加到 400 时,您可以看到新消息有 50% 发送给了每个消费者。