Spring AMQP – 是@RabbitListener 轮询吗?

Spring AMQP – Is a @RabbitListener Polling under the Hood?

总结

我想异步处理来自 AMQP/RabbitMQ 队列的消息。我为此实现了一个 @RabbitListener 方法(来自 spring-rabbit),但似乎这个侦听器实际上正在轮询我的队列。这是可以预料的吗?我本来希望 RabbitMQ 以某种方式通知监听器,而不必轮询。

如果这是预料之中的,我是否也可以通过 Spring AMQP 不轮询地异步使用消息?

我观察到的

当我发送一条消息时,它被收听者正确接收。我仍然看到连续的日志消息流,表明侦听器继续轮询空队列:

…
15:41:10.543 [pool-1-thread-3] DEBUG o.s.a.r.l.BlockingQueueConsumer - ConsumeOK : Consumer: tags=[{amq.ctag-bUsK4KQN6_QHzf8DoDC_ww=myQueue}], channel=Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.1.1:5672/,1), acknowledgeMode=MANUAL local queue size=0
15:41:10.544 [main] DEBUG o.s.a.r.c.CachingConnectionFactory - Creating cached Rabbit Channel from AMQChannel(amqp://guest@127.0.1.1:5672/,2)
15:41:10.545 [main] DEBUG o.s.amqp.rabbit.core.RabbitTemplate - Executing callback on RabbitMQ Channel: Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.1.1:5672/,2)
15:41:10.545 [main] DEBUG o.s.amqp.rabbit.core.RabbitTemplate - Publishing message on exchange [], routingKey = [myQueue]
Sent: Hello World
15:41:10.559 [pool-1-thread-4] DEBUG o.s.a.r.l.BlockingQueueConsumer - Storing delivery for Consumer: tags=[{amq.ctag-bUsK4KQN6_QHzf8DoDC_ww=myQueue}], channel=Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.1.1:5672/,1), acknowledgeMode=MANUAL local queue size=0
15:41:10.560 [SimpleAsyncTaskExecutor-1] DEBUG o.s.a.r.l.BlockingQueueConsumer - Received message: (Body:'Hello World'MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, appId=null, clusterId=null, type=null, correlationId=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=, receivedRoutingKey=myQueue, deliveryTag=1, messageCount=0])
15:41:10.571 [SimpleAsyncTaskExecutor-1] DEBUG o.s.a.r.l.a.MessagingMessageListenerAdapter - Processing [GenericMessage [payload=Hello World, headers={timestamp=1435844470571, id=018f39f6-ebca-aabf-7fe3-a095e959f65d, amqp_receivedRoutingKey=myQueue, amqp_deliveryMode=PERSISTENT, amqp_consumerQueue=myQueue, amqp_consumerTag=amq.ctag-bUsK4KQN6_QHzf8DoDC_ww, amqp_contentEncoding=UTF-8, contentType=text/plain, amqp_deliveryTag=1, amqp_redelivered=false}]]
Received: Hello World
15:41:10.579 [SimpleAsyncTaskExecutor-1] DEBUG o.s.a.r.l.BlockingQueueConsumer - Retrieving delivery for Consumer: tags=[{amq.ctag-bUsK4KQN6_QHzf8DoDC_ww=myQueue}], channel=Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.1.1:5672/,1), acknowledgeMode=MANUAL local queue size=0
15:41:11.579 [SimpleAsyncTaskExecutor-1] DEBUG o.s.a.r.l.BlockingQueueConsumer - Retrieving delivery for Consumer: tags=[{amq.ctag-bUsK4KQN6_QHzf8DoDC_ww=myQueue}], channel=Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.1.1:5672/,1), acknowledgeMode=MANUAL local queue size=0
15:41:12.583 [SimpleAsyncTaskExecutor-1] DEBUG o.s.a.r.l.BlockingQueueConsumer - Retrieving delivery for Consumer: tags=[{amq.ctag-bUsK4KQN6_QHzf8DoDC_ww=myQueue}], channel=Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.1.1:5672/,1), acknowledgeMode=MANUAL local queue size=0
…

最后一条日志消息基本上每秒无限重复。

我的测试代码

前两种方法可能是最有趣的部分;剩下的主要是Spring配置:

@Configuration
@EnableRabbit
public class MyTest {

    public static void main(String[] args) throws InterruptedException {
        try (ConfigurableApplicationContext appCtxt =
                new AnnotationConfigApplicationContext(MyTest.class)) {
            // send a test message
            RabbitTemplate template = appCtxt.getBean(RabbitTemplate.class);
            Queue queue = appCtxt.getBean(Queue.class);
            template.convertAndSend(queue.getName(), "Hello World");
            System.out.println("Sent: Hello World");

            // Now that the application with its message listeners is running,
            // block this thread forever; make sure, though, that the
            // application context can sanely be closed.
            appCtxt.registerShutdownHook();
            Object blockingObj = new Object();
            synchronized (blockingObj) {
                blockingObj.wait();
            }
        }
    }

    @RabbitListener(queues = "#{ @myQueue }")
    private void processHello(@Payload String msg,
            @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag, Channel channel)
            throws IOException {
        System.out.println("Received: " + msg);
        channel.basicAck(deliveryTag, false);
    }

    @Bean
    public RabbitTemplate rabbitTemplate() {
        return new RabbitTemplate(rabbitConnFactory());
    }

    @Bean
    public ConnectionFactory rabbitConnFactory() {
        return new CachingConnectionFactory();
    }

    @Bean
    public SimpleRabbitListenerContainerFactory
            rabbitListenerContainerFactory() {
        SimpleRabbitListenerContainerFactory result =
                new SimpleRabbitListenerContainerFactory();
        result.setConnectionFactory(rabbitConnFactory());
        result.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        return result;
    }

    @Bean
    public Queue myQueue() {
        return new Queue("myQueue", false);
    }

    @Bean
    public AmqpAdmin amqpAdmin() {
        return new RabbitAdmin(rabbitConnFactory());
    }
}

不是轮询rabbitmq;当消息从 rabbit 异步到达时,它被放置在消费者的内部队列中;将消息交给阻塞的监听线程,等待消息的到来。

您看到的 DEBUG 消息是在侦听器线程超时等待来自 rabbitmq 的新消息到达之后。

您可以增加 receiveTimeout 以减少日志,或者只是禁用 BlockingQueueConsumer 的 DEBUG 日志记录。

增加超时将使容器对容器 stop() 请求的响应变慢。

编辑:

回应您在下方的评论...

是的,我们可以中断线程,但它比这更复杂。当 txSize > 1.

时,接收超时也用于确认消息

假设您只想每 20 条消息(而不是每条消息)确认一次。人们这样做是为了提高高容量环境中的性能。 timeout也是用来ack的(txSize实际上是每n条消息或超时)。

现在,假设有 19 条消息到达,然后 none 持续 60 秒,您的超时时间为 30 秒。

这意味着这19条消息很长一段时间都没有被确认。默认配置下,第19条消息到达后1秒发送ack。

这个超时确实没有什么开销(我们简单地循环并再次等待),所以增加它是不寻常的。

此外,当容器在上下文关闭时停止时,人们一直在停止和启动容器。