Rabbitmq预取理解
Rabbit mq prefetch undestanding
下面我明白了
prefetch 只是控制 broker 一次允许有多少消息在消费者处突出。当设置为 1 时,这意味着代理将发送 1 条消息,等待 ack,然后发送下一条。
但有关以下情况的问题:
假设预取为 200,我们有 2 个消费者闲置。经纪人收到 150 条消息,我认为经纪人会随机选择一条并发送所有 150 条消息?我认为是的,它不会在消费者之间共享。
假设一个消费者在 unack 中有 100 条消息,一个处于空闲状态,预取又是 200 条。现在我们又收到了 50 条消息,我认为代理会随机将这 50 条消息分配给任何一个吗?或者它不会给已经有 100 条消息尚未确认的消费者
如果 prefetch 是 200,一个消费者得到 200,listener 会阻止那个线程(spring rabbitmq listner 方法)发送 ack 直到处理完所有 200 吗?我认为它不会一个一个地发送ack,而是会等到所有预取的消息都处理完。换句话说,如果 prefetch 是 200,如果 broker 发送了 200 条消息,那么 broker 什么时候开始收到 ack?
如果有两个活跃的消费者,代理将公平地分发新消息(直到每个实例有 200 个未完成的消息)。
如果队列中有 150 条消息且没有消费者运行;第一个开始的消费者将(可能)得到所有 150 个,但当两者都是 运行 时,分配是公平的。
如果每个消费者有 200 条未完成的消息,则代理将在每个消息被确认时按需发送新消息。消费者线程并没有“阻塞”,只是代理不再发送消息。
默认情况下,spring 将一次确认每条消息。可以通过设置容器的 batchSize
属性 来更改此行为。例如如果设置为100,则每100条记录发送一次ack;这提高了性能,但增加了失败后重复交付的风险。在这种情况下,代理将在 ack 后发送最多 100 条新消息。
在旧版本中,batchSize
被称为 txSize
。
编辑
查看此示例;在最新版本中默认预取为 250。
@SpringBootApplication
public class So65201334Application {
public static void main(String[] args) {
SpringApplication.run(So65201334Application.class, args);
}
@RabbitListener(id = "foo", queues = "foo", autoStartup = "false")
@RabbitListener(id = "bar", queues = "foo", autoStartup = "false")
void listen(String in, @Header(AmqpHeaders.CONSUMER_TAG) String tag) throws InterruptedException {
System.out.println(tag);
Thread.sleep(240_000);
}
@Bean
public ApplicationRunner runner(RabbitTemplate template, RabbitListenerEndpointRegistry registry) {
return args -> {
for (int i = 0; i < 200; i++) {
template.convertAndSend("foo", "bar");
}
registry.getListenerContainer("foo").start();
System.out.println("Hit Enter to start the second listener and send more records");
System.in.read();
registry.getListenerContainer("bar").start();
Thread.sleep(2000);
for (int i = 0; i < 200; i++) {
template.convertAndSend("foo", "bar");
}
};
}
}
不出所料,所有 200 个都给了第一个消费者:
当第二个消费者启动时,记录被发送给两个消费者,而不是没有积压的消费者。分布现在看起来像这样:
当我将预取增加到 400 时,您可以看到新消息有 50% 发送给了每个消费者。
下面我明白了
prefetch 只是控制 broker 一次允许有多少消息在消费者处突出。当设置为 1 时,这意味着代理将发送 1 条消息,等待 ack,然后发送下一条。
但有关以下情况的问题:
假设预取为 200,我们有 2 个消费者闲置。经纪人收到 150 条消息,我认为经纪人会随机选择一条并发送所有 150 条消息?我认为是的,它不会在消费者之间共享。
假设一个消费者在 unack 中有 100 条消息,一个处于空闲状态,预取又是 200 条。现在我们又收到了 50 条消息,我认为代理会随机将这 50 条消息分配给任何一个吗?或者它不会给已经有 100 条消息尚未确认的消费者
如果 prefetch 是 200,一个消费者得到 200,listener 会阻止那个线程(spring rabbitmq listner 方法)发送 ack 直到处理完所有 200 吗?我认为它不会一个一个地发送ack,而是会等到所有预取的消息都处理完。换句话说,如果 prefetch 是 200,如果 broker 发送了 200 条消息,那么 broker 什么时候开始收到 ack?
如果有两个活跃的消费者,代理将公平地分发新消息(直到每个实例有 200 个未完成的消息)。
如果队列中有 150 条消息且没有消费者运行;第一个开始的消费者将(可能)得到所有 150 个,但当两者都是 运行 时,分配是公平的。
如果每个消费者有 200 条未完成的消息,则代理将在每个消息被确认时按需发送新消息。消费者线程并没有“阻塞”,只是代理不再发送消息。
默认情况下,spring 将一次确认每条消息。可以通过设置容器的 batchSize
属性 来更改此行为。例如如果设置为100,则每100条记录发送一次ack;这提高了性能,但增加了失败后重复交付的风险。在这种情况下,代理将在 ack 后发送最多 100 条新消息。
在旧版本中,batchSize
被称为 txSize
。
编辑
查看此示例;在最新版本中默认预取为 250。
@SpringBootApplication
public class So65201334Application {
public static void main(String[] args) {
SpringApplication.run(So65201334Application.class, args);
}
@RabbitListener(id = "foo", queues = "foo", autoStartup = "false")
@RabbitListener(id = "bar", queues = "foo", autoStartup = "false")
void listen(String in, @Header(AmqpHeaders.CONSUMER_TAG) String tag) throws InterruptedException {
System.out.println(tag);
Thread.sleep(240_000);
}
@Bean
public ApplicationRunner runner(RabbitTemplate template, RabbitListenerEndpointRegistry registry) {
return args -> {
for (int i = 0; i < 200; i++) {
template.convertAndSend("foo", "bar");
}
registry.getListenerContainer("foo").start();
System.out.println("Hit Enter to start the second listener and send more records");
System.in.read();
registry.getListenerContainer("bar").start();
Thread.sleep(2000);
for (int i = 0; i < 200; i++) {
template.convertAndSend("foo", "bar");
}
};
}
}
不出所料,所有 200 个都给了第一个消费者:
当第二个消费者启动时,记录被发送给两个消费者,而不是没有积压的消费者。分布现在看起来像这样:
当我将预取增加到 400 时,您可以看到新消息有 50% 发送给了每个消费者。