如何防止消费被拒绝的消息

How to prevent from consuming a rejected message

我有一个消费者从队列中消费一条消息(手动确认),然后通过调用 Basic.Nack() 或 Basic.reject() 方法拒绝该消息。之后,消费者仍然消费那个被拒绝的消息。

如何防止同一消费者消费被拒绝的消息?相反,应该将被拒绝的消息重新传送给不同的消费者?

更新:

消息监听委托

public class MessageListenerDelegate implements ChannelAwareMessageListener {
private MessageListener messageListener;

@Override
public void onMessage(Message message, Channel channel) throws Exception {

}

// setter and getter for MessageListener
....

}

public interface MessageListener {
void onMessage(MessageResourceHolder holder) throws Exception;

}

在应用程序启动期间设置并行消费者并启动消费者

 
public void listen(String queueName, MessageListener messageListener) {
    SimpleMessageListenerContainer listener = new SimpleMessageListenerContainer(connectionFactory);
    listener.setMessageListener(new MessageListenerAdapter(new MessageListenerDelegate(messageListener)));
    listener.addQueueNames(queueName);
    listener.setConcurrentConsumers(5);
    listener.setAcknowledgeMode(AcknowledgeMode.MANUAL);
    listener.start();
}

现在监听队列,然后调用 Basic.Nack() 拒绝消费的消息


listen(queueName, new MessageListener() {

            @Override
            public void onMessage(MessageResourceHolder holder) throws IOException {
                    holder.getChannel().basicNack(holder.getMessageProperties().getDeliveryTag(), false, true);

            }
        }); 

消费者在很多线程中运行,所以它总是从队列中消费消息,包括被拒绝的消息

basicNack(holder.getMessageProperties().getDeliveryTag(), false, true);

通过将最后一个参数设置为 true,您明确告诉 RabbitMQ 重新排队消息。

/**
 * Reject one or several received messages.
 *
 * Supply the <code>deliveryTag</code> from the {@link com.rabbitmq.client.AMQP.Basic.GetOk}
 * or {@link com.rabbitmq.client.AMQP.Basic.GetOk} method containing the message to be rejected.
 * @see com.rabbitmq.client.AMQP.Basic.Nack
 * @param deliveryTag the tag from the received {@link com.rabbitmq.client.AMQP.Basic.GetOk} or {@link com.rabbitmq.client.AMQP.Basic.Deliver}
 * @param multiple true to reject all messages up to and including
 * the supplied delivery tag; false to reject just the supplied
 * delivery tag.
 * @param requeue true if the rejected message(s) should be requeued rather
 * than discarded/dead-lettered
 * @throws java.io.IOException if an error is encountered
 */
void basicNack(long deliveryTag, boolean multiple, boolean requeue)
        throws IOException;