Spring AMQP AcknowledgeMode.AUTO 运行缓慢
Spring AMQP AcknowledgeMode.AUTO works slow
我有一个生产者,它每秒向 RabbitMQ 发送 20 条消息,还有一个消费者,它应该以与生产消息相同的速度接收消息。
我必须满足一些条件:
- 每秒生成和使用 20 条消息。
- 保存生产订单。
- 消息不应该丢失(这就是我使用 AcknowledgeMode.AUTO 的原因)。
当我使用 Spring AMQP 实现 (org.springframework.amqp.rabbit) 时,我的消费者每秒最多处理 6 条消息。但是,如果我使用本机 AMQP 库 (com.rabbitmq.client),它会每秒处理所有 20 条消息,包括自动和手动确认。
问题是:
为什么 Spring 在消费者案例中的实施工作如此缓慢,我该如何解决这个问题?
如果我设置 prefetchCount(20) 它会根据需要工作,但我不能使用预取,因为它会在拒绝情况下破坏订单。
Spring amqp:
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory(rabbitMqServer);
connectionFactory.setUsername(rabbitMqUsername);
connectionFactory.setPassword(rabbitMqPassword);
return connectionFactory;
}
...
private SimpleMessageListenerContainer createContainer(Queue queue, Receiver receiver, AcknowledgeMode acknowledgeMode) {
SimpleMessageListenerContainer persistentListenerContainer = new SimpleMessageListenerContainer();
persistentListenerContainer.setConnectionFactory(connectionFactory());
persistentListenerContainer.setQueues(queue);
persistentListenerContainer.setMessageListener(receiver);
persistentListenerContainer.setAcknowledgeMode(AcknowledgeMode.AUTO);
return persistentListenerContainer;
}
...
@Override
public void onMessage(Message message) {saveToDb}
Spring AMQP(2.0 之前)默认预取为 1,正如您所说,即使在拒绝后也能保证顺序。
本机客户端默认不应用 basicQos()
,这实际上意味着它具有无限预取。
所以你不是在比较苹果和苹果。
尝试 channel.basicQos(1)
使用本机客户端,您应该会看到与默认 spring amqp 设置类似的结果。
编辑
当比较苹果与苹果时,我得到相似的结果 with/without 框架...
@SpringBootApplication
public class So47995535Application {
public static void main(String[] args) {
SpringApplication.run(So47995535Application.class, args).close();
}
private final CountDownLatch latch = new CountDownLatch(100);
private int nativeCount;
private int rlCount;
@Bean
public ApplicationRunner runner(ConnectionFactory factory, RabbitTemplate template,
SimpleMessageListenerContainer container) {
return args -> {
for (int i = 0; i < 100; i++) {
template.convertAndSend("foo", "foo" + i);
}
container.start();
Connection conn = factory.createConnection();
Channel channel = conn.createChannel(false);
channel.basicQos(1);
channel.basicConsume("foo", new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
byte[] body) throws IOException {
System.out.println("native " + new String(body));
channel.basicAck(envelope.getDeliveryTag(), false);
nativeCount++;
latch.countDown();
}
});
latch.await(60, TimeUnit.SECONDS);
System.out.println("Native: " + this.nativeCount + " LC: " + this.rlCount);
channel.close();
conn.close();
container.stop();
};
}
@Bean
public SimpleMessageListenerContainer container(ConnectionFactory connectionFactory) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
container.setQueueNames("foo");
container.setPrefetchCount(1);
container.setAutoStartup(false);
container.setMessageListener((MessageListener) m -> {
System.out.println("LC " + new String(m.getBody()));
this.rlCount++;
this.latch.countDown();
});
return container;
}
}
和
Native: 50 LC: 50
我有一个生产者,它每秒向 RabbitMQ 发送 20 条消息,还有一个消费者,它应该以与生产消息相同的速度接收消息。
我必须满足一些条件:
- 每秒生成和使用 20 条消息。
- 保存生产订单。
- 消息不应该丢失(这就是我使用 AcknowledgeMode.AUTO 的原因)。
当我使用 Spring AMQP 实现 (org.springframework.amqp.rabbit) 时,我的消费者每秒最多处理 6 条消息。但是,如果我使用本机 AMQP 库 (com.rabbitmq.client),它会每秒处理所有 20 条消息,包括自动和手动确认。
问题是:
为什么 Spring 在消费者案例中的实施工作如此缓慢,我该如何解决这个问题?
如果我设置 prefetchCount(20) 它会根据需要工作,但我不能使用预取,因为它会在拒绝情况下破坏订单。
Spring amqp:
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory(rabbitMqServer);
connectionFactory.setUsername(rabbitMqUsername);
connectionFactory.setPassword(rabbitMqPassword);
return connectionFactory;
}
...
private SimpleMessageListenerContainer createContainer(Queue queue, Receiver receiver, AcknowledgeMode acknowledgeMode) {
SimpleMessageListenerContainer persistentListenerContainer = new SimpleMessageListenerContainer();
persistentListenerContainer.setConnectionFactory(connectionFactory());
persistentListenerContainer.setQueues(queue);
persistentListenerContainer.setMessageListener(receiver);
persistentListenerContainer.setAcknowledgeMode(AcknowledgeMode.AUTO);
return persistentListenerContainer;
}
...
@Override
public void onMessage(Message message) {saveToDb}
Spring AMQP(2.0 之前)默认预取为 1,正如您所说,即使在拒绝后也能保证顺序。
本机客户端默认不应用 basicQos()
,这实际上意味着它具有无限预取。
所以你不是在比较苹果和苹果。
尝试 channel.basicQos(1)
使用本机客户端,您应该会看到与默认 spring amqp 设置类似的结果。
编辑
当比较苹果与苹果时,我得到相似的结果 with/without 框架...
@SpringBootApplication
public class So47995535Application {
public static void main(String[] args) {
SpringApplication.run(So47995535Application.class, args).close();
}
private final CountDownLatch latch = new CountDownLatch(100);
private int nativeCount;
private int rlCount;
@Bean
public ApplicationRunner runner(ConnectionFactory factory, RabbitTemplate template,
SimpleMessageListenerContainer container) {
return args -> {
for (int i = 0; i < 100; i++) {
template.convertAndSend("foo", "foo" + i);
}
container.start();
Connection conn = factory.createConnection();
Channel channel = conn.createChannel(false);
channel.basicQos(1);
channel.basicConsume("foo", new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
byte[] body) throws IOException {
System.out.println("native " + new String(body));
channel.basicAck(envelope.getDeliveryTag(), false);
nativeCount++;
latch.countDown();
}
});
latch.await(60, TimeUnit.SECONDS);
System.out.println("Native: " + this.nativeCount + " LC: " + this.rlCount);
channel.close();
conn.close();
container.stop();
};
}
@Bean
public SimpleMessageListenerContainer container(ConnectionFactory connectionFactory) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
container.setQueueNames("foo");
container.setPrefetchCount(1);
container.setAutoStartup(false);
container.setMessageListener((MessageListener) m -> {
System.out.println("LC " + new String(m.getBody()));
this.rlCount++;
this.latch.countDown();
});
return container;
}
}
和
Native: 50 LC: 50