如何在 Spring AMQP 中使用 Ack 或 Nack

How to use Ack or Nack in Spring AMQP

我是 Spring AMQP 的新手。我有一个作为生产者的应用程序向另一个作为消费者的应用程序发送消息。

消费者收到消息后,我们将对数据进行验证。

如果数据正确,我们必须确认并且消息应该从队列中删除。 如果数据不正确,我们必须对数据进行 NACK(否定确认),以便在 RabbitMQ.

中重新排队

遇到了

**factory.setDefaultRequeueRejected(false);**(完全不会重新排队消息)

**factory.setDefaultRequeueRejected(true);**(异常时会重新排队消息)

但我会根据验证确认消息。然后它应该删除消息。如果 NACK,则重新排队消息。

我已在 RabbitMQ 网站上阅读

AMQP 规范定义了 basic.reject 方法,该方法允许客户端拒绝单独的已发送消息,指示代理丢弃它们或重新排队它们

如何实现上述场景?请提供一些例子。

我尝试了一个小程序

       logger.info("Job Queue Handler::::::::::" + new Date());
        try {

        }catch(Exception e){

            logger.info("Activity Object Not Found Exception so message should be Re-queued the Message::::::::::::::");

        }

        factory.setErrorHandler(new ConditionalRejectingErrorHandler(cause ->{
            return cause instanceof XMLException;
        }));

消息不会因不同的异常而重新排队 factory.setDefaultRequeueRejected(真)

09:46:38,854 ERROR [stderr] (SimpleAsyncTaskExecutor-1) org.activiti.engine.ActivitiObjectNotFoundException: no processes deployed with key 'WF89012'

09:46:39,102 INFO [com.example.bip.rabbitmq.handler.ErrorQueueHandler] (SimpleAsyncTaskExecutor-1) Received from Error Queue: {ERROR=Could not commit JPA transaction; nested exception is javax.persistence.RollbackException: Transaction marked as rollbackOnly}

参见 the documentation

默认情况下,(使用 defaultRequeueRejected=true)如果侦听器正常退出,容器将确认消息(导致消息被删除),或者如果侦听器抛出异常,则拒绝(并重新排队)消息。

如果侦听器(或错误处理程序)抛出 AmqpRejectAndDontRequeueException,默认行为将被覆盖并丢弃消息(或路由到 DLX/DLQ,如果已配置)- 容器调用 basicReject(false) 而不是 basicReject(true).

因此,如果您的验证失败,请抛出 AmqpRejectAndDontRequeueException。或者,使用自定义错误处理程序配置您的侦听器以将您的异常转换为 AmqpRejectAndDontRequeueException.

this answer中有描述。

如果你真的想为自己的确认负责,请将确认模式设置为 MANUAL 并使用 ChannelAwareMessageListener 如果你使用 @RabbitListener .

但是大多数人只是让容器来处理事情(一旦他们明白发生了什么)。通常,使用手动 acks 用于特殊用例,例如延迟 acks 或提前 acking。

编辑

我指出的答案有误(现已修复);你必须看看 ListenerExecutionFailedException 的原因。我刚刚对此进行了测试,它按预期工作...

@SpringBootApplication
public class So39530787Application {

    private static final String QUEUE = "So39530787";

    public static void main(String[] args) throws Exception {
        ConfigurableApplicationContext context = SpringApplication.run(So39530787Application.class, args);
        RabbitTemplate template = context.getBean(RabbitTemplate.class);
        template.convertAndSend(QUEUE, "foo");
        template.convertAndSend(QUEUE, "bar");
        template.convertAndSend(QUEUE, "baz");
        So39530787Application bean = context.getBean(So39530787Application.class);
        bean.latch.await(10, TimeUnit.SECONDS);
        System.out.println("Expect 1 foo:"  + bean.fooCount);
        System.out.println("Expect 3 bar:"  + bean.barCount);
        System.out.println("Expect 1 baz:"  + bean.bazCount);
        context.close();
    }

    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setErrorHandler(new ConditionalRejectingErrorHandler(
                t -> t instanceof ListenerExecutionFailedException && t.getCause() instanceof FooException));
        return factory;
    }

    @Bean
    public Queue queue() {
        return new Queue(QUEUE, false, false, true);
    }
    private int fooCount;

    private int barCount;

    private int bazCount;

    private final CountDownLatch latch = new CountDownLatch(5);

    @RabbitListener(queues = QUEUE)
    public void handle(String in) throws Exception {
        System.out.println(in);
        latch.countDown();
        if ("foo".equals(in) && ++this.fooCount < 3) {
            throw new FooException();
        }
        else if ("bar".equals(in) && ++this.barCount < 3) {
            throw new BarException();
        }
        else if ("baz".equals(in)) {
            this.bazCount++;
        }
    }

    @SuppressWarnings("serial")
    public static class FooException extends Exception { }

    @SuppressWarnings("serial")
    public static class BarException extends Exception { }

}

结果:

Expect 1 foo:1
Expect 3 bar:3
Expect 1 baz:1