Kafka 中手动偏移量管理的用例

Usecases for manual offset management in Kafka

我正在尝试在 Java 上实施 Kafka 消费者。

假设消费者包含一些可能抛出异常的消息处理逻辑。在那种情况下,消费者应该休眠一段时间并重新处理最后一条消息。

我的想法是使用手动偏移量管理:失败时不会提交偏移量,因此消费者可能会从旧偏移量读取。

在测试过程中,我发现尽管未提交偏移量,但消息实际上只被读取了一次。仅在应用程序重新启动时才考虑上次提交的偏移量。

我的问题是:

KafkaConsumer 将最新的偏移量 保存在内存中 ,因此,如果发生异常(并且您从中恢复)并且您想每秒读取一条消息一次,您需要在第二次轮询之前使用 seek()

提交偏移量 "only" 在那里,以在客户端关闭或崩溃时保留偏移量(即,偏移量可靠地存储与内存中)。在客户端启动时,获取最新提交的偏移量,并且客户端只使用它自己的内存偏移量。

手动偏移管理很有用,如果你想 "bundle" 偏移提交与一些其他操作(例如,另一个系统中的第二个 "commit" 必须与提交的 Kafka 偏移同步) .