Kafka Streams处理数据过程中出现的错误如何处理
How to handle errors occurring during the processing of data in Kafka Streams
我正在使用 Spring Cloud Stream Kafka Streams 编写 Java 应用程序。这是我正在使用的功能方法片段:
@Bean
public Function<KStream<String, String>, KStream<String, String>> process() {
return input ->
input.transform(
() ->
new Transformer<String, String, KeyValue<String, String>>() {
ProcessorContext context;
@Override
public void init(ProcessorContext context) {
this.context = context;
}
@Override
public void close() {}
@Override
public KeyValue<String, String> transform(String key, String value) {
String result = fetch_data_from_database(key, value);
return new KeyValue<>(key, result);
}
});
fetch_data_from_database() 可以抛出异常。
如果 fetch_from_database() 出现异常,如何停止处理入站 KStream(不应提交偏移量)并使其使用相同的偏移量数据重试处理?
在这种情况下,您需要自行重试逻辑。为此,您可以使用 Spring 的 RetryTemplate
。 详细介绍了如何在 Kafka Streams 中使用 RetryTemplate
。它不像您那样使用 low-level 处理器 API,但它是相同的想法。将您的数据库调用包装在重试模板中,并根据您的要求自定义重试。任何上游处理都将暂停,直到重试次数用完。
我正在使用 Spring Cloud Stream Kafka Streams 编写 Java 应用程序。这是我正在使用的功能方法片段:
@Bean
public Function<KStream<String, String>, KStream<String, String>> process() {
return input ->
input.transform(
() ->
new Transformer<String, String, KeyValue<String, String>>() {
ProcessorContext context;
@Override
public void init(ProcessorContext context) {
this.context = context;
}
@Override
public void close() {}
@Override
public KeyValue<String, String> transform(String key, String value) {
String result = fetch_data_from_database(key, value);
return new KeyValue<>(key, result);
}
});
fetch_data_from_database() 可以抛出异常。
如果 fetch_from_database() 出现异常,如何停止处理入站 KStream(不应提交偏移量)并使其使用相同的偏移量数据重试处理?
在这种情况下,您需要自行重试逻辑。为此,您可以使用 Spring 的 RetryTemplate
。 RetryTemplate
。它不像您那样使用 low-level 处理器 API,但它是相同的想法。将您的数据库调用包装在重试模板中,并根据您的要求自定义重试。任何上游处理都将暂停,直到重试次数用完。