Spring 批处理的 Cloud Stream 反序列化错误处理
Spring Cloud Stream deserialization error handling for Batch processing
我有一个关于处理批处理时 Spring Cloud Stream 中的反序列化异常的问题(即 batch-mode: true
)。
根据这里的文档,https://docs.spring.io/spring-kafka/docs/2.5.12.RELEASE/reference/html/#error-handling-deserializer,(查看 FailedFooProvider
的实现),看起来这个函数应该 return 是原始消息的子类。
这里的意图是 Foo 和 BadFoo 的列表将以原始 @StreamListener
方法结束,然后由代码(即我)将它们分类并单独处理?我怀疑是这种情况,因为我读到 ,因为它会重新提交整批。
如果是这种情况,如果应用程序通过不同的 @StreamListener
(比如 Foo 和 Bar)接收到不止一种消息类型怎么办。在这种情况下,价值函数 return 应该是什么类型?下面是说明第二个问题的伪代码?
@StreamListener
public void readFoos(List<Foo> foos) {
List<> badFoos = foos.stream()
.filter(f -> f instanceof BadFoo)
.map(f -> (BadFoo) f)
.collect(Collectors.toList());
// logic
}
@StreamListener
public void readBars(List<Bar> bars) {
// logic
}
// Updated to return Object and let apply() determine subclass
public class FailedFooProvider implements Function<FailedDeserializationInfo, Object> {
@Override
public Object apply(FailedDeserializationInfo info) {
if (info.getTopics().equals("foo-topic") {
return new BadFoo(info);
}
else if (info.getTopics().equals("bar-topic") {
return new BadBar(info);
}
}
}
是的,该列表将包含反序列化失败的函数结果;应用程序需要处理它们。
函数需要 return 与成功反序列化 return 相同的类型。
您不能对批处理侦听器使用条件。如果列表混合了 Foos 和 Bars,它们都将发送给同一个侦听器。
我有一个关于处理批处理时 Spring Cloud Stream 中的反序列化异常的问题(即 batch-mode: true
)。
根据这里的文档,https://docs.spring.io/spring-kafka/docs/2.5.12.RELEASE/reference/html/#error-handling-deserializer,(查看 FailedFooProvider
的实现),看起来这个函数应该 return 是原始消息的子类。
这里的意图是 Foo 和 BadFoo 的列表将以原始 @StreamListener
方法结束,然后由代码(即我)将它们分类并单独处理?我怀疑是这种情况,因为我读到
如果是这种情况,如果应用程序通过不同的 @StreamListener
(比如 Foo 和 Bar)接收到不止一种消息类型怎么办。在这种情况下,价值函数 return 应该是什么类型?下面是说明第二个问题的伪代码?
@StreamListener
public void readFoos(List<Foo> foos) {
List<> badFoos = foos.stream()
.filter(f -> f instanceof BadFoo)
.map(f -> (BadFoo) f)
.collect(Collectors.toList());
// logic
}
@StreamListener
public void readBars(List<Bar> bars) {
// logic
}
// Updated to return Object and let apply() determine subclass
public class FailedFooProvider implements Function<FailedDeserializationInfo, Object> {
@Override
public Object apply(FailedDeserializationInfo info) {
if (info.getTopics().equals("foo-topic") {
return new BadFoo(info);
}
else if (info.getTopics().equals("bar-topic") {
return new BadBar(info);
}
}
}
是的,该列表将包含反序列化失败的函数结果;应用程序需要处理它们。
函数需要 return 与成功反序列化 return 相同的类型。
您不能对批处理侦听器使用条件。如果列表混合了 Foos 和 Bars,它们都将发送给同一个侦听器。