Kafka中针对不同消费者的不同重试策略

Different retry strategies for different consumers in Kafka

我们正在处理这样一种情况,我们需要为同一应用程序中的不同消费者使用不同的重试策略。

请参考下图(简要架构图):

main_consumer 使用来自 main_topic 的负载并尝试将其发送到 API。如果 API 处理失败,我们将把这个失败的有效负载写入另一个名为 error_topic 的主题。有一个不同的消费者 (error_consumer)error_topic 消费并通过 3 次重试再次将有效负载发送到 API。如果仍然失败,则 error_consumer 将此有效载荷推送到 DLQ.

我们面临的问题:

我们需要 main_consumer 不重试失败,error_consumer 失败重试 3 次。我们将 maxAttempts 作为 main_consumer 的 1,将 maxAttempts 作为 error_consumer 的 3。但是使用此配置,main_consumer 重试 3 次,error_consumer 重试一次。它的工作与我们的预期完全相反。

P.S :我们尝试为两个消费者交换 maxAttempts(这是不合逻辑的),但没有成功。

下面是我们使用的Spring云流应用配置:

我们是 运行 具有以下两个配置文件的应用程序。

应用-main.yml

spring:
  cloud:
    stream:
      kafka:
        bindings:
          main-consumer-channel:
            consumer:
              autoCommitOffset: false
      bindings:
        main-consumer-channel:
          destination: main_topic
          consumer:
            maxAttempts: 1
            backOffInitialInterval: 5000
            backOffMultiplier: 2

应用程序错误-retry.yml

spring:
  cloud:
    stream:
      kafka:
        bindings:
          error-consumer-channel:
            consumer:
              autoCommitOffset: false
      bindings:
        error-consumer-channel:
          destination: error_topic
          consumer:
             maxAttempts: 3
             backOffInitialInterval: 5000
             backOffMultiplier: 2

根据 spring 文档 - https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/_configuration_options.html,maxAttempts 配置应在

"spring.cloud.stream.bindings.<channelName>.consumer."

在你的配置中,它看起来像在

"spring.cloud.stream.kafka.bindings.<channelName>.consumer."

MaxAttempts 似乎不是 kafka 绑定道具的有效配置 - https://github.com/spring-cloud/spring-cloud-stream-binder-kafka

这对我来说很好...

@SpringBootApplication
@EnableBinding(Inputs.class)
public class So57522645Application {

    public static void main(String[] args) {
        SpringApplication.run(So57522645Application.class, args);
    }

    @StreamListener("input1")
    public void listen1(String in) {
        System.out.println("main: " + in);
        throw new RuntimeException("fail");
    }

    @StreamListener("input2")
    public void listen2(String in) {
        System.out.println("error: " + in);
        throw new RuntimeException("fail");
    }

    @StreamListener("input3")
    public void listen3(String in) {
        System.out.println("final: " + in);
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<byte[], byte[]> template) {
        return args -> template.send("main", "test".getBytes());
    }

}

interface Inputs {

    @Input
    MessageChannel input1();

    @Input
    MessageChannel input2();

    @Input
    MessageChannel input3();

}
spring:
  cloud:
    stream:
      bindings:
        input1:
          consumer:
            max-attempts: 1
          destination: main
          group: grp1
        input2:
          consumer:
            max-attempts: 3
          destination: error.main.grp1
          group: grp2
        input3:
          destination: error.error.main.grp1.grp2
          group: grp3
      kafka:
        bindings:
          input1:
            consumer:
              enable-dlq: true
          input2:
            consumer:
              enable-dlq: true

main: test
error: test
error: test
error: test
final: test