Spring cloud stream Kafka streams binder 如果在处理步骤中发生故障,如何重试处理消息?

How to make Spring cloud stream Kafka streams binder retry processing a message if a failure occurs during the processing step?

我正在使用 Spring Cloud Stream 开发 Kafka Streams。在消息处理应用程序中,它可能会产生错误。所以消息不应该提交并重试。

我的申请方法-

@Bean
public Function<KStream<Object, String>, KStream<String, Long>> process() {
return (input) -> {
KStream<Object, String> kt = input.flatMapValues(v -> Arrays.asList(v.toUpperCase().split("\W+")));
KGroupedStream<String, String> kgt =kt.map((k, v) -> new KeyValue<>(v, v)).groupByKey(Grouped.with(Serdes.String(), Serdes.String()));
KTable<Windowed<String>, Long> ktable = kgt.windowedBy(TimeWindows.of(500)).count();
KStream<String, WordCount> kst =ktable.toStream().map((k,v) -> {
WordCount wc = new WordCount();
wc.setWord(k.key());
wc.setCount(v);
wc.setStart(new Date(k.window().start()));
wc.setEnd(new Date(k.window().end()));

dao.insert(wc);

return new KeyValue<>(k.key(),wc);
});
return kst.map((k,v) -> new KeyValue<>(k, v.getCount()));
};
}

这里如果DAO插入方法失败,消息不应该被发布到输出主题,并且应该重试相同消息的处理。

我们如何配置 kafka streams binder 来做到这一点?。非常感谢有关此的任何帮助。

Spring Cloud Stream Kafka Streams binder 本身不在您的业务逻辑执行中提供此类重试机制。但是,解决此用例的一种方法可能是将您的关键调用(在本例中为 dao.insert())包装在您在本地定义的 RetryTemplate 中。这是一个可能的实现,它使用 1 秒的退避策略重试 10 次。如果您正在尝试此解决方案,请确保从您的主要业务逻辑中提取与 RetryTemplate 相关的公共代码。我没试过这个,但它应该有效。

KStream<String, WordCount> kst =ktable.toStream().map((k,v) -> {
  WordCount wc = new WordCount();
  ...

  org.springframework.retry.support.RetryTemplate retryTemplate = new 
   RetryTemplate();

  RetryPolicy retryPolicy = new SimpleRetryPolicy(10);
  FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
  backOffPolicy.setBackOffPeriod(1000);

  retryTemplate.setBackOffPolicy(backOffPolicy);
  retryTemplate.setRetryPolicy(retryPolicy);

  retryTemplate.execute(context -> {
    try {
      dao.insert(wc);
    }
    catch (Exception e) {
      throw new IllegalStateException(..);
   }
  });

  return new KeyValue<>(k.key(),wc);
});

重试dao插入操作10次后,如果仍然失败,将抛出异常终止应用程序,此时不会提交偏移量。重新启动时,在解决了潜在问题后,您的应用程序仍应从该偏移量继续。