Spring Kafka - "ErrorHandler threw an exception" 并丢失了一些记录

Spring Kafka - "ErrorHandler threw an exception" and lost some records

Consumer一次轮询 2 条记录,即:

    @Bean
    ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> config = Map.of(
                BOOTSTRAP_SERVERS_CONFIG, "localhost:9092",
                GROUP_ID_CONFIG, "my-consumers",
                AUTO_OFFSET_RESET_CONFIG, "earliest",
                MAX_POLL_RECORDS_CONFIG, 2);

        return new DefaultKafkaConsumerFactory<>(config, new StringDeserializer(), new StringDeserializer());
    }

ErrorHandler可能无法处理错误记录:

class MyListenerErrorHandler implements ContainerAwareErrorHandler {

    @Override
    public void handle(Exception thrownException,
                       List<ConsumerRecord<?, ?>> records,
                       Consumer<?, ?> consumer,
                       MessageListenerContainer container) {
        simulateBugInErrorHandling(records.get(0));
        skipFailedRecord(); // seek offset+1, which never happens
    }

    private void simulateBugInErrorHandling(ConsumerRecord<?, ?> record) {
        throw new NullPointerException(
                "DB transaction failed when saving info about failure on offset = " + record.offset());
    }
}

那么这种情况是可能的:

  1. 主题获得 3 条记录
  2. Consumer 一次轮询 2 条记录
  3. MessageListener 由于有效载荷错误无法处理第一条记录
  4. ErrorHandler 处理失败失败,自身抛出异常,例如由于一些临时问题
  5. 第三条记录得到处理
  6. 从不处理第二条记录(从不输入 MessageListener

ErrorHandler在上述情况下抛出异常时,如何确保没有未处理的记录?

我的目标是实现带延迟的有状态重试逻辑,但为了简洁起见,我省略了负责跟踪失败记录和延迟重试的代码。


我预计在 ErrorHandler 抛出异常后,应该不会跳过整批记录。但确实如此。

  1. 这是正确的行为吗?
  2. 我是否应该手动处理使用 Spring/Kafka 默认值的提交?
  3. 我应该使用不同的 ErrorHandler 还是 handle 方法? (我需要访问 Container 才能为延迟重试逻辑创建 pause();不能使用 Thread.sleep()

某种相关问题:https://github.com/spring-projects/spring-kafka/issues/1265

完整代码:https://github.com/ptomaszek/spring-kafka-error-handler

必须重新定位消费者(使用搜索),以便在失败后重新获取记录。

使用 DefaultErrorHandler(2.8.x 及更高版本)或早期版本的 SeekToCurrentErrorHandler

您可以添加重试选项和恢复器来处理失败的记录;默认情况下它只是记录。

https://docs.spring.io/spring-kafka/docs/current/reference/html/#default-eh

https://docs.spring.io/spring-kafka/docs/2.7.x/reference/html/#seek-to-current

您需要先进行查找(或在 finally 块中),然后才能抛出任何异常;如果错误处理程序抛出异常,容器不会提交偏移量。

Kafka维护2个偏移量——当前提交的偏移量和当前位置(消费者启动时设置为提交的偏移量)。下一次轮询总是 returns 上次轮询后的下一条记录。除非执行搜索。

默认错误处理程序捕获恢复程序抛出的任何异常,并确保当前(和后续)记录将由下一次轮询返回。参见 SeekUtils.doSeeks()