如何将消费者异常作为 ack 处理?

How to handle a consumer exception as an ack?

我正在尝试为消费者失败实施指数退避。为此,我有三个 DLX 队列:RETRY -> MAIN -> FAILED.

MAIN 拒绝的任何内容都会进入 FAILED,添加到 RETRY 的任何内容都会在每条消息 TTL 后进入 MAIN。消费者从 MAIN.

接收

我实现了 ErrorHandler 并将其设置在 SimpleRabbitListenerContainerFactory 上。此处理程序要么计算一个新的 TTL 并将消息发送到 RETRY 队列,要么抛出 AmqpRejectAndDontRequeueException 如果不可能或超过重试次数以将其 DLX 到 FAILED。问题是,我不知道如何删除原始消息。

据我所知,我必须确认它,但 Channel 在错误处理程序中不可用,并且没有其他可抛出的异常会触发确认。

如果我删除 MAIN -> FAILED DLX 并切换到手动将消息添加到 FAILED,那么如果这不起作用,我就丢失了消息。

@Override
public void handleError(Throwable t) {
  log.warn("Execution of Rabbit message listener failed.", t);

  try {
    queueForExponentialRetry(((ListenerExecutionFailedException) t).getFailedMessage());
    // what to do here?
  } catch (RuntimeException ex) {
    t.addSuppressed(ex);
    log.error("Not requeueing after failure", t);
    throw new AmqpRejectAndDontRequeueException(t);
  }
  // or here?
}

我想我马上就找到了答案。之前因为扔错地方错过了

@Override
public void handleError(Throwable t) {
  log.warn("Execution of Rabbit message listener failed.", t);

  try {
    queueForExponentialRetry(((ListenerExecutionFailedException) t).getFailedMessage());
  } catch (RuntimeException ex) {
    t.addSuppressed(ex);
    log.error("Not requeueing after failure", t);
    throw new AmqpRejectAndDontRequeueException(t);
  }

  throw new ImmediateAcknowledgeAmqpException("Queued for retry");
}

ImmediateAcknowledgeAmqpException

Special exception for listener implementations that want to signal that the current batch of messages should be acknowledged immediately (i.e. as soon as possible) without rollback, and without consuming any more messages within the current transaction.

这应该是安全的,因为我没有使用批次或交易,只有发布者 returns。


旁注:我还应该知道指数退避实际上不会正常工作:

While consumers never see expired messages, only when expired messages reach the head of a queue will they actually be discarded (or dead-lettered). When setting a per-queue TTL this is not a problem, since expired messages are always at the head of the queue. When setting per-message TTL however, expired messages can queue up behind non-expired ones until the latter are consumed or expired.