Spring 批处理的 Cloud Stream 反序列化错误处理

Spring Cloud Stream deserialization error handling for Batch processing

我有一个关于处理批处理时 Spring Cloud Stream 中的反序列化异常的问题(即 batch-mode: true)。

根据这里的文档,https://docs.spring.io/spring-kafka/docs/2.5.12.RELEASE/reference/html/#error-handling-deserializer,(查看 FailedFooProvider 的实现),看起来这个函数应该 return 是原始消息的子类。

这里的意图是 Foo 和 BadFoo 的列表将以原始 @StreamListener 方法结束,然后由代码(即我)将它们分类并单独处理?我怀疑是这种情况,因为我读到 ,因为它会重新提交整批。

如果是这种情况,如果应用程序通过不同的 @StreamListener(比如 Foo 和 Bar)接收到不止一种消息类型怎么办。在这种情况下,价值函数 return 应该是什么类型?下面是说明第二个问题的伪代码?

@StreamListener
public void readFoos(List<Foo> foos) {
    List<> badFoos = foos.stream()
        .filter(f -> f instanceof BadFoo)
        .map(f -> (BadFoo) f)
        .collect(Collectors.toList());
   // logic
}
@StreamListener
public void readBars(List<Bar> bars) {
  // logic
}


// Updated to return Object and let apply() determine subclass
public class FailedFooProvider implements Function<FailedDeserializationInfo, Object> {

  @Override
  public Object apply(FailedDeserializationInfo info) {
    if (info.getTopics().equals("foo-topic") {
        return new BadFoo(info);
    }
    else if (info.getTopics().equals("bar-topic") {
        return new BadBar(info);
    }
  }

}

是的,该列表将包含反序列化失败的函数结果;应用程序需要处理它们。

函数需要 return 与成功反序列化 return 相同的类型。

您不能对批处理侦听器使用条件。如果列表混合了 Foos 和 Bars,它们都将发送给同一个侦听器。