Spring Kafka - "ErrorHandler threw an exception" 并丢失了一些记录
Spring Kafka - "ErrorHandler threw an exception" and lost some records
Consumer
一次轮询 2 条记录,即:
@Bean
ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> config = Map.of(
BOOTSTRAP_SERVERS_CONFIG, "localhost:9092",
GROUP_ID_CONFIG, "my-consumers",
AUTO_OFFSET_RESET_CONFIG, "earliest",
MAX_POLL_RECORDS_CONFIG, 2);
return new DefaultKafkaConsumerFactory<>(config, new StringDeserializer(), new StringDeserializer());
}
和ErrorHandler
可能无法处理错误记录:
class MyListenerErrorHandler implements ContainerAwareErrorHandler {
@Override
public void handle(Exception thrownException,
List<ConsumerRecord<?, ?>> records,
Consumer<?, ?> consumer,
MessageListenerContainer container) {
simulateBugInErrorHandling(records.get(0));
skipFailedRecord(); // seek offset+1, which never happens
}
private void simulateBugInErrorHandling(ConsumerRecord<?, ?> record) {
throw new NullPointerException(
"DB transaction failed when saving info about failure on offset = " + record.offset());
}
}
那么这种情况是可能的:
- 主题获得 3 条记录
Consumer
一次轮询 2 条记录
MessageListener
由于有效载荷错误无法处理第一条记录
ErrorHandler
处理失败失败,自身抛出异常,例如由于一些临时问题
- 第三条记录得到处理
- 从不处理第二条记录(从不输入
MessageListener
)
当ErrorHandler
在上述情况下抛出异常时,如何确保没有未处理的记录?
我的目标是实现带延迟的有状态重试逻辑,但为了简洁起见,我省略了负责跟踪失败记录和延迟重试的代码。
我预计在 ErrorHandler
抛出异常后,应该不会跳过整批记录。但确实如此。
- 这是正确的行为吗?
- 我是否应该手动处理使用 Spring/Kafka 默认值的提交?
- 我应该使用不同的
ErrorHandler
还是 handle
方法? (我需要访问 Container
才能为延迟重试逻辑创建 pause()
;不能使用 Thread.sleep()
)
某种相关问题:https://github.com/spring-projects/spring-kafka/issues/1265
完整代码:https://github.com/ptomaszek/spring-kafka-error-handler
必须重新定位消费者(使用搜索),以便在失败后重新获取记录。
使用 DefaultErrorHandler
(2.8.x 及更高版本)或早期版本的 SeekToCurrentErrorHandler
。
您可以添加重试选项和恢复器来处理失败的记录;默认情况下它只是记录。
https://docs.spring.io/spring-kafka/docs/current/reference/html/#default-eh
https://docs.spring.io/spring-kafka/docs/2.7.x/reference/html/#seek-to-current
您需要先进行查找(或在 finally 块中),然后才能抛出任何异常;如果错误处理程序抛出异常,容器不会提交偏移量。
Kafka维护2个偏移量——当前提交的偏移量和当前位置(消费者启动时设置为提交的偏移量)。下一次轮询总是 returns 上次轮询后的下一条记录。除非执行搜索。
默认错误处理程序捕获恢复程序抛出的任何异常,并确保当前(和后续)记录将由下一次轮询返回。参见 SeekUtils.doSeeks()
。
Consumer
一次轮询 2 条记录,即:
@Bean
ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> config = Map.of(
BOOTSTRAP_SERVERS_CONFIG, "localhost:9092",
GROUP_ID_CONFIG, "my-consumers",
AUTO_OFFSET_RESET_CONFIG, "earliest",
MAX_POLL_RECORDS_CONFIG, 2);
return new DefaultKafkaConsumerFactory<>(config, new StringDeserializer(), new StringDeserializer());
}
和ErrorHandler
可能无法处理错误记录:
class MyListenerErrorHandler implements ContainerAwareErrorHandler {
@Override
public void handle(Exception thrownException,
List<ConsumerRecord<?, ?>> records,
Consumer<?, ?> consumer,
MessageListenerContainer container) {
simulateBugInErrorHandling(records.get(0));
skipFailedRecord(); // seek offset+1, which never happens
}
private void simulateBugInErrorHandling(ConsumerRecord<?, ?> record) {
throw new NullPointerException(
"DB transaction failed when saving info about failure on offset = " + record.offset());
}
}
那么这种情况是可能的:
- 主题获得 3 条记录
Consumer
一次轮询 2 条记录MessageListener
由于有效载荷错误无法处理第一条记录ErrorHandler
处理失败失败,自身抛出异常,例如由于一些临时问题- 第三条记录得到处理
- 从不处理第二条记录(从不输入
MessageListener
)
当ErrorHandler
在上述情况下抛出异常时,如何确保没有未处理的记录?
我的目标是实现带延迟的有状态重试逻辑,但为了简洁起见,我省略了负责跟踪失败记录和延迟重试的代码。
我预计在 ErrorHandler
抛出异常后,应该不会跳过整批记录。但确实如此。
- 这是正确的行为吗?
- 我是否应该手动处理使用 Spring/Kafka 默认值的提交?
- 我应该使用不同的
ErrorHandler
还是handle
方法? (我需要访问Container
才能为延迟重试逻辑创建pause()
;不能使用Thread.sleep()
)
某种相关问题:https://github.com/spring-projects/spring-kafka/issues/1265
完整代码:https://github.com/ptomaszek/spring-kafka-error-handler
必须重新定位消费者(使用搜索),以便在失败后重新获取记录。
使用 DefaultErrorHandler
(2.8.x 及更高版本)或早期版本的 SeekToCurrentErrorHandler
。
您可以添加重试选项和恢复器来处理失败的记录;默认情况下它只是记录。
https://docs.spring.io/spring-kafka/docs/current/reference/html/#default-eh
https://docs.spring.io/spring-kafka/docs/2.7.x/reference/html/#seek-to-current
您需要先进行查找(或在 finally 块中),然后才能抛出任何异常;如果错误处理程序抛出异常,容器不会提交偏移量。
Kafka维护2个偏移量——当前提交的偏移量和当前位置(消费者启动时设置为提交的偏移量)。下一次轮询总是 returns 上次轮询后的下一条记录。除非执行搜索。
默认错误处理程序捕获恢复程序抛出的任何异常,并确保当前(和后续)记录将由下一次轮询返回。参见 SeekUtils.doSeeks()
。