How can I make the RabbitMQ PrefetchCount work?
Here is my RabbitListenerFactory:
    @Bean("workListenerFactory")
    public RabbitListenerContainerFactory<?> myFactory(ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory containerFactory = new SimpleRabbitListenerContainerFactory();
        containerFactory.setConnectionFactory(connectionFactory);
        // The listener acknowledges each message itself via channel.basicAck(...)
        containerFactory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        containerFactory.setDefaultRequeueRejected(true);
        // At most 2 unacknowledged messages may be outstanding per consumer
        containerFactory.setPrefetchCount(2);
        return containerFactory;
    }
Here is my consumer:
    @RabbitListener(queues = "test-todo-delete", containerFactory = "workListenerFactory")
    public void onAuditResult(Message message, Channel channel) throws IOException {
        String payload = new String(message.getBody(), StandardCharsets.UTF_8);
        log.info("get the message:{} TimeStamp:{}", payload, new Date());
        // multiple = true: acks this delivery tag and every earlier unacked one on the channel
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
    }
In the RabbitMQ client, the configuration looks fine (I sent the messages from the console):
But this is my actual log:
2022-01-06 16:16:25.051 [ INFO] [cTaskExecutor-1] [TID: N/A] [c.p.o.usersignon.server.mq.TestConsumer ] : get the message:{"A":"1"} TimeStamp:Thu Jan 06 16:16:25 CST 2022
2022-01-06 16:16:25.202 [ INFO] [cTaskExecutor-1] [TID: N/A] [c.p.o.usersignon.server.mq.TestConsumer ] : get the message:{"A":"1"} TimeStamp:Thu Jan 06 16:16:25 CST 2022
2022-01-06 16:16:25.353 [ INFO] [cTaskExecutor-1] [TID: N/A] [c.p.o.usersignon.server.mq.TestConsumer ] : get the message:{"A":"1"} TimeStamp:Thu Jan 06 16:16:25 CST 2022
2022-01-06 16:16:25.554 [ INFO] [cTaskExecutor-1] [TID: N/A] [c.p.o.usersignon.server.mq.TestConsumer ] : get the message:{"A":"1"} TimeStamp:Thu Jan 06 16:16:25 CST 2022
2022-01-06 16:16:25.746 [ INFO] [cTaskExecutor-1] [TID: N/A] [c.p.o.usersignon.server.mq.TestConsumer ] : get the message:{"A":"1"} TimeStamp:Thu Jan 06 16:16:25 CST 2022
2022-01-06 16:16:25.936 [ INFO] [cTaskExecutor-1] [TID: N/A] [c.p.o.usersignon.server.mq.TestConsumer ] : get the message:{"A":"1"} TimeStamp:Thu Jan 06 16:16:25 CST 2022
2022-01-06 16:16:26.159 [ INFO] [cTaskExecutor-1] [TID: N/A] [c.p.o.usersignon.server.mq.TestConsumer ] : get the message:{"A":"1"} TimeStamp:Thu Jan 06 16:16:26 CST 2022
2022-01-06 16:16:26.340 [ INFO] [cTaskExecutor-1] [TID: N/A] [c.p.o.usersignon.server.mq.TestConsumer ] : get the message:{"A":"1"} TimeStamp:Thu Jan 06 16:16:26 CST 2022
2022-01-06 16:16:26.535 [ INFO] [cTaskExecutor-1] [TID: N/A] [c.p.o.usersignon.server.mq.TestConsumer ] : get the message:{"A":"1"} TimeStamp:Thu Jan 06 16:16:26 CST 2022
2022-01-06 16:16:26.716 [ INFO] [cTaskExecutor-1] [TID: N/A] [c.p.o.usersignon.server.mq.TestConsumer ] : get the message:{"A":"1"} TimeStamp:Thu Jan 06 16:16:26 CST 2022
2022-01-06 16:16:26.900 [ INFO] [cTaskExecutor-1] [TID: N/A] [c.p.o.usersignon.server.mq.TestConsumer ] : get the message:{"A":"1"} TimeStamp:Thu Jan 06 16:16:26 CST 2022
2022-01-06 16:16:27.082 [ INFO] [cTaskExecutor-1] [TID: N/A] [c.p.o.usersignon.server.mq.TestConsumer ] : get the message:{"A":"1"} TimeStamp:Thu Jan 06 16:16:27 CST 2022
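Why isn't the output two messages per second?
Prefetch only determines how many unacknowledged messages can be outstanding at the client at any one time; as soon as you acknowledge a message, the next one is sent. It is a limit on in-flight messages, not a rate limit, and since the listener above acks every message immediately, deliveries keep flowing.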
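To make the point concrete, here is a minimal sketch in plain Java that models the prefetch window without any broker or Spring dependency. The class `PrefetchSimulation`, its `run` method, and the `Result` type are hypothetical names made up for illustration; they are not part of RabbitMQ or Spring AMQP, and the model ignores networking and timing entirely. It only shows that the window caps unacknowledged messages while every queued message is still handled as fast as the consumer acks.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical, simplified model of a broker applying a prefetch limit.
final class PrefetchSimulation {

    /** Outcome of one run: messages handled and the largest unacked window observed. */
    static final class Result {
        final int handled;
        final int maxUnacked;

        Result(int handled, int maxUnacked) {
            this.handled = handled;
            this.maxUnacked = maxUnacked;
        }
    }

    /**
     * Delivers {@code queued} messages to a single consumer that acks each
     * message right after handling it, as the listener in the question does.
     */
    static Result run(int queued, int prefetch) {
        if (prefetch < 1) {
            // Assumption of this sketch: only a positive, finite window is modelled.
            throw new IllegalArgumentException("prefetch must be >= 1");
        }
        Deque<Integer> brokerQueue = new ArrayDeque<>();
        for (int i = 1; i <= queued; i++) {
            brokerQueue.add(i);
        }
        Deque<Integer> unacked = new ArrayDeque<>(); // delivered but not yet acked
        int handled = 0;
        int maxUnacked = 0;

        while (!brokerQueue.isEmpty() || !unacked.isEmpty()) {
            // Broker side: keep delivering while the window has free slots.
            while (unacked.size() < prefetch && !brokerQueue.isEmpty()) {
                unacked.add(brokerQueue.poll());
            }
            maxUnacked = Math.max(maxUnacked, unacked.size());

            // Consumer side: handle the oldest delivery and ack it,
            // which frees a slot so the broker can send the next message.
            unacked.poll();
            handled++;
        }
        return new Result(handled, maxUnacked);
    }
}
```

Calling `PrefetchSimulation.run(12, 2)` in this model handles all 12 messages while never holding more than 2 unacknowledged ones. Nothing in the window introduces a delay; if a fixed rate such as two messages per second is wanted, it has to come from somewhere else, for example from pacing in the consumer itself.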