使用 Spring Kafka 框架时如何处理 errors/exceptions?
How to handler errors/exceptions while using Spring Kafka framework?
我无法找到如何为 spring kafka 消费者进行自定义错误处理。
我的要求是:
- 对于任何反序列化错误,只需将错误和消息写入数据库即可。
- 在
@KafkaListener
方法下执行有任何错误,重试3次,然后将错误和信息写入数据库。
从 spring 文档中,我发现,
对于 1,我将不得不使用 ErrorHandlingDeserializer
然后它将调用 @KafkaListener 错误处理程序。
对于 2,框架提供了 SeekToCurrentErrorHandler
来处理消息重试。
除了启用已配置的重试之外,我不明白在哪里可以添加代码以将 exception/message 写入数据库。
添加恢复器到 SeekToCurrentErrorHandler
new SeekToCurrentErrorHandler((rec, ex) -> {
Throwable cause = ex.getCause();
if (cause instanceof DeserializationException) {
...
}
else {
...
}, new FixedBackOff(2000L, 2L));
默认情况下,不重试反序列化异常;大多数其他人在调用恢复器之前重试。
我无法找到如何为 spring kafka 消费者进行自定义错误处理。
我的要求是:
- 对于任何反序列化错误,只需将错误和消息写入数据库即可。
- 在
@KafkaListener
方法下执行有任何错误,重试3次,然后将错误和信息写入数据库。
从 spring 文档中,我发现,
对于 1,我将不得不使用 ErrorHandlingDeserializer
然后它将调用 @KafkaListener 错误处理程序。
对于 2,框架提供了 SeekToCurrentErrorHandler
来处理消息重试。
除了启用已配置的重试之外,我不明白在哪里可以添加代码以将 exception/message 写入数据库。
添加恢复器到 SeekToCurrentErrorHandler
new SeekToCurrentErrorHandler((rec, ex) -> {
Throwable cause = ex.getCause();
if (cause instanceof DeserializationException) {
...
}
else {
...
}, new FixedBackOff(2000L, 2L));
默认情况下,不重试反序列化异常;大多数其他人在调用恢复器之前重试。