队列最大长度或 TTL 使用获取但不使用
Queue max-length or TTL work with get but not consume
我看到很多帖子询问有关限制队列长度的问题。在我对 Pika 和 RabbitMQ 的实验中,如果我用 arguments={'x-message-ttl': 1000, 'x-max-length': 2, 'x-overflow': 'drop-head'}
声明队列,甚至在生成消息属性时将 expiration='1000'
添加到消息属性中,我可以看到这三个都分别有助于从队列中删除消息.我的目标是确保消费者只收到最新的信息。
但是,正如此处指出的:RabbitMQ messages delivered after x-message-ttl has expired,我只能使用 basic_get
而不是 basic_consume
。
来让它工作
basic_get
好像是拉消息,每次都发送一个请求。我需要能够等待服务器推送消息,而不是轮询它。消费者不是正确的方法吗?消费者使用 x-message-ttl
或 x-max-length
(我试过 basic_qos(prefetch_count=1)
)的要求是什么?
问题是在您链接的代码中 autoAck
设置为 True
。如果您将 autoAck
设置为 True
,当消息在您的缓冲区中时,它被视为已收到,并且 RabbitMQ 将向您发送新消息。
这基本上意味着您 Thread.sleep(300)
所做的只是延迟消息显示的时间。消息仍在接收并放入缓冲区,更糟糕的是,如果您的应用程序崩溃而缓冲区中仍有消息,缓冲区中的所有消息都会丢失。
如果您在 other-hand 上关闭了 autoAck
,并且您将 prefetch_count
设置为 1
。在第一条消息被标记为已确认之前,RabbitMQ 不会向消费者发送新消息。
channel.basicConsume(queueName, false, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println(Thread.currentThread().toString() + " - " + new String(body));
try {
Thread.sleep(300);
} finally {
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
});
这要慢很多,但也更安全,并且会确保您只收到您可以处理的消息。
我看到很多帖子询问有关限制队列长度的问题。在我对 Pika 和 RabbitMQ 的实验中,如果我用 arguments={'x-message-ttl': 1000, 'x-max-length': 2, 'x-overflow': 'drop-head'}
声明队列,甚至在生成消息属性时将 expiration='1000'
添加到消息属性中,我可以看到这三个都分别有助于从队列中删除消息.我的目标是确保消费者只收到最新的信息。
但是,正如此处指出的:RabbitMQ messages delivered after x-message-ttl has expired,我只能使用 basic_get
而不是 basic_consume
。
basic_get
好像是拉消息,每次都发送一个请求。我需要能够等待服务器推送消息,而不是轮询它。消费者不是正确的方法吗?消费者使用 x-message-ttl
或 x-max-length
(我试过 basic_qos(prefetch_count=1)
)的要求是什么?
问题是在您链接的代码中 autoAck
设置为 True
。如果您将 autoAck
设置为 True
,当消息在您的缓冲区中时,它被视为已收到,并且 RabbitMQ 将向您发送新消息。
这基本上意味着您 Thread.sleep(300)
所做的只是延迟消息显示的时间。消息仍在接收并放入缓冲区,更糟糕的是,如果您的应用程序崩溃而缓冲区中仍有消息,缓冲区中的所有消息都会丢失。
如果您在 other-hand 上关闭了 autoAck
,并且您将 prefetch_count
设置为 1
。在第一条消息被标记为已确认之前,RabbitMQ 不会向消费者发送新消息。
channel.basicConsume(queueName, false, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println(Thread.currentThread().toString() + " - " + new String(body));
try {
Thread.sleep(300);
} finally {
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
});
这要慢很多,但也更安全,并且会确保您只收到您可以处理的消息。