Kafka Streams处理数据过程中出现的错误如何处理

How to handle errors occurring during the processing of data in Kafka Streams

我正在使用 Spring Cloud Stream Kafka Streams 编写 Java 应用程序。这是我正在使用的功能方法片段:

@Bean
public Function<KStream<String, String>, KStream<String, String>> process() {
    return input ->
        input.transform(
            () ->
                new Transformer<String, String, KeyValue<String, String>>() {

                  ProcessorContext context;

                  @Override
                  public void init(ProcessorContext context) {
                    this.context = context;
                  }

                  @Override
                  public void close() {}

                  @Override
                  public KeyValue<String, String> transform(String key, String value) {                       
                         String result = fetch_data_from_database(key, value);
                         return new KeyValue<>(key, result);
                  }
});

fetch_data_from_database() 可以抛出异常。

如果 fetch_from_database() 出现异常,如何停止处理入站 KStream(不应提交偏移量)并使其使用相同的偏移量数据重试处理?

在这种情况下,您需要自行重试逻辑。为此,您可以使用 Spring 的 RetryTemplate 详细介绍了如何在 Kafka Streams 中使用 RetryTemplate。它不像您那样使用 low-level 处理器 API,但它是相同的想法。将您的数据库调用包装在重试模板中,并根据您的要求自定义重试。任何上游处理都将暂停,直到重试次数用完。