Spring 云流+ spring 重试,如何添加发送到DLQ 的恢复回调和禁用逻辑?

Spring cloud stream + spring retry, How to add recovery callback and disable logic that send to DLQ?

我正在使用spring云流+ rabbit mq binder

在我的 @StreaListener 中,我想使用 RetryTemplate 对特定异常应用重试逻辑。在重试用尽或抛出不可重试错误后,我想添加一个恢复回调,它将带有错误消息的新记录保存到我的 Postgres 数据库中,并以消息结束(移至下一个)。 这是我到目前为止得到的:

  @StreamListener(Sink.INPUT)
  public void saveUser(User user) {    
    User user = userService.saveUser(user); //could throw exceptions
    log.info(">>>>>>User is created successfully: {}", user);
  }

  @StreamRetryTemplate
  public RetryTemplate myRetryTemplate() {
    RetryTemplate retryTemplate = new RetryTemplate();
    retryTemplate.setBackOffPolicy(new ExponentialBackOffPolicy());

    Map<Class<? extends Throwable>, Boolean> retryableExceptions = new HashMap<>();
    retryableExceptions.put(ConnectionException.class, true);
    retryTemplate.registerListener(new RetryListener() {
      @Override
      public <T, E extends Throwable> boolean open(RetryContext context,
        RetryCallback<T, E> callback) {
        return true;
      }

      @Override
      public <T, E extends Throwable> void close(RetryContext context, RetryCallback<T, E> callback,
        Throwable throwable) {
        //could add recovery logic here, like save error to db why sertain user was not saved
        log.info("retries exausted");
      }

      @Override
      public <T, E extends Throwable> void onError(RetryContext context,
        RetryCallback<T, E> callback, Throwable throwable) {
        log.error("Error on retry", throwable);
      }
    });

    retryTemplate.setRetryPolicy(
      new SimpleRetryPolicy(properties.getRetriesCount(), retryableExceptions, true));
    return retryTemplate;
  }

从属性来看,我只有这些(没有任何 dlq 配置)

spring.cloud.stream.bindings.input.destination = user-topic
spring.cloud.stream.bindings.input.group = user-consumer

重试失败后我得到了这个日志。

2020-06-01 20:05:58.674  INFO 18524 --- [idge-consumer-1] o.s.a.r.c.CachingConnectionFactory       : Attempting to connect to: [localhost:56722]
2020-06-01 20:05:58.685  INFO 18524 --- [idge-consumer-1] o.s.a.r.c.CachingConnectionFactory       : Created new connection: rabbitConnectionFactory.publisher#319c51b0:0/SimpleConnection@2a060201 [delegate=amqp://guest@127.0.0.1:56722/, localPort= 50728]
2020-06-01 20:05:58.697  INFO 18524 --- [idge-consumer-1] c.e.i.o.b.c.RetryConfiguration           : retry finish
2020-06-01 20:05:58.702 ERROR 18524 --- [127.0.0.1:56722] o.s.a.r.c.CachingConnectionFactory       : Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'DLX' in vhost '/', class-id=60, method-id=40)

触发RetryListener关闭方法后,我可以看到监听器试图连接到DLX,可能是为了发布一条错误消息。而且我不希望它每次都在日志中观察此错误消息。

所以我的问题是:

1) 在哪里为我的 retryTemplate 添加 RecoveryCalback?据说我可以在 RetryListener#close 方法中通过将错误保存到数据库来编写恢复逻辑,但显然应该有更合适的方法来做到这一点。

2) 如何配置 rabbit-mq 绑定器不向 DLQ 发送消息,也许我可以重写一些方法?目前,在重试次数耗尽(或出现不可重试错误)后,侦听器会尝试向 DLX 发送消息并记录无法找到它的错误。我不需要在我的应用程序范围内将任何消息发送到 dlq,我只需要将它保存到 DB。

目前没有提供自定义恢复回调的机制。

republishToDlq设置为false(以前是)。改为true,如果autoBindDlq为false(默认)则错误;我会为此打开一个问题。

然后,当重试次数耗尽时,将异常抛回给容器;您可以使用 ListenerContainerCustomizer 添加自定义 ErrorHandler.

但是,您获得的数据将有一个 ListenerExecutionFailed 异常,原始(未转换)Spring AMQP Message 在其 failedMessage 属性 ,不是您的 User 对象。

编辑

您可以向绑定的错误通道添加侦听器...

@SpringBootApplication
@EnableBinding(Sink.class)
public class So62137618Application {

    public static void main(String[] args) {
        SpringApplication.run(So62137618Application.class, args);
    }

    @StreamListener(Sink.INPUT)
    public void listen(String in) {
        System.out.println(in);
        throw new RuntimeException("test");
    }

    @ServiceActivator(inputChannel = "user-topic.user-consumer.errors")
    public void errors(String in) {
        System.out.println("Retries exhausted for " + new String((byte[]) in.getFailedMessage().getPayload()));
    }

}