如何防止消费被拒绝的消息
How to prevent from consuming a rejected message
我有一个消费者从队列中消费一条消息(手动确认),然后通过调用 Basic.Nack() 或 Basic.reject() 方法拒绝该消息。之后,消费者仍然消费那个被拒绝的消息。
如何防止同一消费者消费被拒绝的消息?相反,应该将被拒绝的消息重新传送给不同的消费者?
更新:
消息监听委托
public class MessageListenerDelegate implements ChannelAwareMessageListener {
private MessageListener messageListener;
@Override
public void onMessage(Message message, Channel channel) throws Exception {
}
// setter and getter for MessageListener
....
}
public interface MessageListener {
void onMessage(MessageResourceHolder holder) throws Exception;
}
在应用程序启动期间设置并行消费者并启动消费者
public void listen(String queueName, MessageListener messageListener) {
SimpleMessageListenerContainer listener = new SimpleMessageListenerContainer(connectionFactory);
listener.setMessageListener(new MessageListenerAdapter(new MessageListenerDelegate(messageListener)));
listener.addQueueNames(queueName);
listener.setConcurrentConsumers(5);
listener.setAcknowledgeMode(AcknowledgeMode.MANUAL);
listener.start();
}
现在监听队列,然后调用 Basic.Nack() 拒绝消费的消息
listen(queueName, new MessageListener() {
@Override
public void onMessage(MessageResourceHolder holder) throws IOException {
holder.getChannel().basicNack(holder.getMessageProperties().getDeliveryTag(), false, true);
}
});
消费者在很多线程中运行,所以它总是从队列中消费消息,包括被拒绝的消息
basicNack(holder.getMessageProperties().getDeliveryTag(), false, true);
通过将最后一个参数设置为 true
,您明确告诉 RabbitMQ 重新排队消息。
/**
* Reject one or several received messages.
*
* Supply the <code>deliveryTag</code> from the {@link com.rabbitmq.client.AMQP.Basic.GetOk}
* or {@link com.rabbitmq.client.AMQP.Basic.GetOk} method containing the message to be rejected.
* @see com.rabbitmq.client.AMQP.Basic.Nack
* @param deliveryTag the tag from the received {@link com.rabbitmq.client.AMQP.Basic.GetOk} or {@link com.rabbitmq.client.AMQP.Basic.Deliver}
* @param multiple true to reject all messages up to and including
* the supplied delivery tag; false to reject just the supplied
* delivery tag.
* @param requeue true if the rejected message(s) should be requeued rather
* than discarded/dead-lettered
* @throws java.io.IOException if an error is encountered
*/
void basicNack(long deliveryTag, boolean multiple, boolean requeue)
throws IOException;
我有一个消费者从队列中消费一条消息(手动确认),然后通过调用 Basic.Nack() 或 Basic.reject() 方法拒绝该消息。之后,消费者仍然消费那个被拒绝的消息。
如何防止同一消费者消费被拒绝的消息?相反,应该将被拒绝的消息重新传送给不同的消费者?
更新:
消息监听委托
public class MessageListenerDelegate implements ChannelAwareMessageListener {
private MessageListener messageListener;
@Override
public void onMessage(Message message, Channel channel) throws Exception {
}
// setter and getter for MessageListener
....
}
public interface MessageListener {
void onMessage(MessageResourceHolder holder) throws Exception;
}
在应用程序启动期间设置并行消费者并启动消费者
public void listen(String queueName, MessageListener messageListener) {
SimpleMessageListenerContainer listener = new SimpleMessageListenerContainer(connectionFactory);
listener.setMessageListener(new MessageListenerAdapter(new MessageListenerDelegate(messageListener)));
listener.addQueueNames(queueName);
listener.setConcurrentConsumers(5);
listener.setAcknowledgeMode(AcknowledgeMode.MANUAL);
listener.start();
}
现在监听队列,然后调用 Basic.Nack() 拒绝消费的消息
listen(queueName, new MessageListener() {
@Override
public void onMessage(MessageResourceHolder holder) throws IOException {
holder.getChannel().basicNack(holder.getMessageProperties().getDeliveryTag(), false, true);
}
});
消费者在很多线程中运行,所以它总是从队列中消费消息,包括被拒绝的消息
basicNack(holder.getMessageProperties().getDeliveryTag(), false, true);
通过将最后一个参数设置为 true
,您明确告诉 RabbitMQ 重新排队消息。
/**
* Reject one or several received messages.
*
* Supply the <code>deliveryTag</code> from the {@link com.rabbitmq.client.AMQP.Basic.GetOk}
* or {@link com.rabbitmq.client.AMQP.Basic.GetOk} method containing the message to be rejected.
* @see com.rabbitmq.client.AMQP.Basic.Nack
* @param deliveryTag the tag from the received {@link com.rabbitmq.client.AMQP.Basic.GetOk} or {@link com.rabbitmq.client.AMQP.Basic.Deliver}
* @param multiple true to reject all messages up to and including
* the supplied delivery tag; false to reject just the supplied
* delivery tag.
* @param requeue true if the rejected message(s) should be requeued rather
* than discarded/dead-lettered
* @throws java.io.IOException if an error is encountered
*/
void basicNack(long deliveryTag, boolean multiple, boolean requeue)
throws IOException;