Spring 云流kafka消费者错误处理和重试问题

Spring cloud stream kafka consumer error handling and retries issues

我在 spring 云流 kafka 活页夹的错误处理场景中需要帮助。我的应用程序有 java 8 个消费者,其绑定在 application.yaml 中指定。消费者写成:

@Bean
public Consumer<Message<Transaction>> doProcess() {

    return message -> {
        Transaction transaction = message.getPayload();
       
        if(true) {
            throw new RuntimeException("exception!! !!:)");
        }
       Acknowledgment acknowledgment = message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, 
       Acknowledgment.class);
       if (acknowledgment != null) {
           System.out.println("Acknowledgment provided");
           acknowledgment.acknowledge();
       }
  }
}

application.yaml:

spring.application.name: appname
spring.cloud.stream:
  function.definition: doProcess
  kafka:
    default.consumer:
      startOffset: latest
      useNativeDecoding: true
    bindings:
      input.consumer.autoCommitOffset: false

bindings:
  doProcess-in-0:
    destination: kafka.input.topic.name
    group: appGroup
    content-type: application/*+avro
    consumer:
      autoCommitOffset: false.

现在,我正在努力处理错误并遇到两个问题:

  1. 我正在尝试手动确认消息的消耗,而不是将 autoCommitOffset 设为 true。因此,当我将 autoCommitOffset 设置为 false 并测试错误场景时,面对奇怪的行为,每当抛出异常时,消息都会重试 'n' 次,并且即使在之后也会重试/重新传递失败的消息重新启动服务(如果重新启动是在 n 重试完成之前完成的)。一旦重试完成,即使在服务重新启动后也不会选择消息。这是否意味着,消费者在消息的 n re-trial/re-delivery 之后提交偏移量,这不应该是因为 autoCommitOffset 为 false。

    注意:我没有配置任何dlq。

  2. 我们需要编写自定义异常处理程序,我们可以在其中捕获异常(应用程序代码和框架中的错误)并通过 AWS 环境中的电子邮件向用户组发送通知。但是,我们无法找到任何可以捕获这两种类型异常的错误处理程序。类似于扩展 SeekToCurrentErrorHandler 或任何其他可以在错误事件上调用的侦听器。

编辑:

根据 Gary 提供的解决方案,我们可以使用以下 bean 来配置自定义错误处理程序:

@Bean
    public ListenerContainerCustomizer<AbstractMessageListenerContainer> MQLCC() {
        System.out.println(String.format("DEBUG: Bean %s has bean created.", "MQLCC"));
        return new ListenerContainerCustomizerCustom ();
    }

    private static class ListenerContainerCustomizerCustom implements ListenerContainerCustomizer<AbstractMessageListenerContainer> {
        @Override
        public void configure(AbstractMessageListenerContainer container, String destinationName, String group) {
            System.out.println(String.format("HELLO from container %s, destination: %s, group: %s", container, destinationName, group));
        }

    }

侦听器容器中默认的错误处理程序会重试10次,然后记录错误并丢弃记录;对于不同的行为,您需要配置自定义错误处理程序和恢复策略。使用 ListenerContainerCustomizer bean 来配置容器。

https://docs.spring.io/spring-kafka/docs/current/reference/html/#default-ehhttps://docs.spring.io/spring-kafka/docs/current/reference/html/#dead-letters

(3.2 及更高版本)

https://docs.spring.io/spring-kafka/docs/2.7.x/reference/html/#seek-to-currenthttps://docs.spring.io/spring-kafka/docs/2.7.x/reference/html/#dead-letters

对于早期版本。