队列最大长度或 TTL 使用获取但不使用

Queue max-length or TTL work with get but not consume

我看到很多帖子询问有关限制队列长度的问题。在我对 Pika 和 RabbitMQ 的实验中,如果我用 arguments={'x-message-ttl': 1000, 'x-max-length': 2, 'x-overflow': 'drop-head'} 声明队列,甚至在生成消息属性时将 expiration='1000' 添加到消息属性中,我可以看到这三个都分别有助于从队列中删除消息.我的目标是确保消费者只收到最新的信息。

但是,正如此处指出的:RabbitMQ messages delivered after x-message-ttl has expired,我只能使用 basic_get 而不是 basic_consume

来让它工作

basic_get 好像是拉消息,每次都发送一个请求。我需要能够等待服务器推送消息,而不是轮询它。消费者不是正确的方法吗?消费者使用 x-message-ttlx-max-length(我试过 basic_qos(prefetch_count=1))的要求是什么?

问题是在您链接的代码中 autoAck 设置为 True。如果您将 autoAck 设置为 True,当消息在您的缓冲区中时,它被视为已收到,并且 RabbitMQ 将向您发送新消息。

这基本上意味着您 Thread.sleep(300) 所做的只是延迟消息显示的时间。消息仍在接收并放入缓冲区,更糟糕的是,如果您的应用程序崩溃而缓冲区中仍有消息,缓冲区中的所有消息都会丢失。

如果您在 other-hand 上关闭了 autoAck,并且您将 prefetch_count 设置为 1。在第一条消息被标记为已确认之前,RabbitMQ 不会向消费者发送新消息。

channel.basicConsume(queueName, false, new DefaultConsumer(channel) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
            System.out.println(Thread.currentThread().toString() + " - " + new String(body));
            try {
                Thread.sleep(300);
            } finally {
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        }
    });

这要慢很多,但也更安全,并且会确保您只收到您可以处理的消息。