Spring cloud stream Kafka streams binder 如果在处理步骤中发生故障,如何重试处理消息?
How to make Spring cloud stream Kafka streams binder retry processing a message if a failure occurs during the processing step?
我正在使用 Spring Cloud Stream 开发 Kafka Streams。在消息处理应用程序中,它可能会产生错误。所以消息不应该提交并重试。
我的申请方法-
@Bean
public Function<KStream<Object, String>, KStream<String, Long>> process() {
return (input) -> {
KStream<Object, String> kt = input.flatMapValues(v -> Arrays.asList(v.toUpperCase().split("\W+")));
KGroupedStream<String, String> kgt =kt.map((k, v) -> new KeyValue<>(v, v)).groupByKey(Grouped.with(Serdes.String(), Serdes.String()));
KTable<Windowed<String>, Long> ktable = kgt.windowedBy(TimeWindows.of(500)).count();
KStream<String, WordCount> kst =ktable.toStream().map((k,v) -> {
WordCount wc = new WordCount();
wc.setWord(k.key());
wc.setCount(v);
wc.setStart(new Date(k.window().start()));
wc.setEnd(new Date(k.window().end()));
dao.insert(wc);
return new KeyValue<>(k.key(),wc);
});
return kst.map((k,v) -> new KeyValue<>(k, v.getCount()));
};
}
这里如果DAO插入方法失败,消息不应该被发布到输出主题,并且应该重试相同消息的处理。
我们如何配置 kafka streams binder 来做到这一点?。非常感谢有关此的任何帮助。
Spring Cloud Stream Kafka Streams binder 本身不在您的业务逻辑执行中提供此类重试机制。但是,解决此用例的一种方法可能是将您的关键调用(在本例中为 dao.insert()
)包装在您在本地定义的 RetryTemplate
中。这是一个可能的实现,它使用 1 秒的退避策略重试 10 次。如果您正在尝试此解决方案,请确保从您的主要业务逻辑中提取与 RetryTemplate 相关的公共代码。我没试过这个,但它应该有效。
KStream<String, WordCount> kst =ktable.toStream().map((k,v) -> {
WordCount wc = new WordCount();
...
org.springframework.retry.support.RetryTemplate retryTemplate = new
RetryTemplate();
RetryPolicy retryPolicy = new SimpleRetryPolicy(10);
FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
backOffPolicy.setBackOffPeriod(1000);
retryTemplate.setBackOffPolicy(backOffPolicy);
retryTemplate.setRetryPolicy(retryPolicy);
retryTemplate.execute(context -> {
try {
dao.insert(wc);
}
catch (Exception e) {
throw new IllegalStateException(..);
}
});
return new KeyValue<>(k.key(),wc);
});
重试dao插入操作10次后,如果仍然失败,将抛出异常终止应用程序,此时不会提交偏移量。重新启动时,在解决了潜在问题后,您的应用程序仍应从该偏移量继续。
我正在使用 Spring Cloud Stream 开发 Kafka Streams。在消息处理应用程序中,它可能会产生错误。所以消息不应该提交并重试。
我的申请方法-
@Bean
public Function<KStream<Object, String>, KStream<String, Long>> process() {
return (input) -> {
KStream<Object, String> kt = input.flatMapValues(v -> Arrays.asList(v.toUpperCase().split("\W+")));
KGroupedStream<String, String> kgt =kt.map((k, v) -> new KeyValue<>(v, v)).groupByKey(Grouped.with(Serdes.String(), Serdes.String()));
KTable<Windowed<String>, Long> ktable = kgt.windowedBy(TimeWindows.of(500)).count();
KStream<String, WordCount> kst =ktable.toStream().map((k,v) -> {
WordCount wc = new WordCount();
wc.setWord(k.key());
wc.setCount(v);
wc.setStart(new Date(k.window().start()));
wc.setEnd(new Date(k.window().end()));
dao.insert(wc);
return new KeyValue<>(k.key(),wc);
});
return kst.map((k,v) -> new KeyValue<>(k, v.getCount()));
};
}
这里如果DAO插入方法失败,消息不应该被发布到输出主题,并且应该重试相同消息的处理。
我们如何配置 kafka streams binder 来做到这一点?。非常感谢有关此的任何帮助。
Spring Cloud Stream Kafka Streams binder 本身不在您的业务逻辑执行中提供此类重试机制。但是,解决此用例的一种方法可能是将您的关键调用(在本例中为 dao.insert()
)包装在您在本地定义的 RetryTemplate
中。这是一个可能的实现,它使用 1 秒的退避策略重试 10 次。如果您正在尝试此解决方案,请确保从您的主要业务逻辑中提取与 RetryTemplate 相关的公共代码。我没试过这个,但它应该有效。
KStream<String, WordCount> kst =ktable.toStream().map((k,v) -> {
WordCount wc = new WordCount();
...
org.springframework.retry.support.RetryTemplate retryTemplate = new
RetryTemplate();
RetryPolicy retryPolicy = new SimpleRetryPolicy(10);
FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
backOffPolicy.setBackOffPeriod(1000);
retryTemplate.setBackOffPolicy(backOffPolicy);
retryTemplate.setRetryPolicy(retryPolicy);
retryTemplate.execute(context -> {
try {
dao.insert(wc);
}
catch (Exception e) {
throw new IllegalStateException(..);
}
});
return new KeyValue<>(k.key(),wc);
});
重试dao插入操作10次后,如果仍然失败,将抛出异常终止应用程序,此时不会提交偏移量。重新启动时,在解决了潜在问题后,您的应用程序仍应从该偏移量继续。