Kafka 从提交失败异常中恢复

Kafka Recover from Commit Failed Exception

我有一个问题,我的提交失败,因为 poll() 太长(我不知道为什么会发生这种情况,没有消息,它只是 read/committing 在一个空队列中,而我的轮询间隔设置为小时)。然后当它再次点击 read() 时,由于某种原因它不会重新平衡。但是,这仅在我的代码在 bluemix 上为 运行 时才会发生,当我在本地重现异常时,下一个 read() 会导致重新平衡。

从 CommitFailedException 中恢复的正确方法是什么?我应该关闭()并重新创建我的消费者吗?还是调用 read() 应该重新平衡并让我继续?

@kyl 所以我相信使用默认的 kafka-java 客户端,消费者每 3 秒就会心跳一次,会话超时为 10 秒,所以你的消费者 应该 留下来在组内而不被取出并发生重新平衡。您的 CommitFailedException 中包含的消息是什么?我假设提交失败是因为您已被踢出。

其他几个问题:

  1. 您是否有多个消费者来来去去,and/or您是否有意使用消费者组而不是单个消费者?

  2. "my poll interval is set to hours"是什么意思?

  3. "committing on an empty queue" 是什么意思?

您能否分享您的消费者循环代码片段,因为这可能有助于更好地解释您在做什么

commitSync 方法将无限期地自动重试,因此如果您得到 CommitFailedException,那么它不是可重试的条件,再次调用 commit 不太可能有帮助。您收到此异常是因为您的消费者已被踢出消费者组。

如果您使用 commitAsync 提交偏移量,则重试 不会 自动进行,您可能会收到 RetriableCommitFailedException 指示潜在的瞬态错误您可以再次手动重试提交。听起来这不是您的情况,但为了完整回答,我将其包括在内。

一旦你的消费者被踢出组并且你得到这个 CommitFailedException 异常你可以继续调用 poll() 直到重新平衡完成并且你被允许回到消费者组(可能有比以前新的一组分区)并且它将继续。

如果您的应用程序不能容忍您正在接收的分区(以及密钥)在流中更改的情况,那么您应该实现一个重新平衡侦听器,它将在分区分配更改时调用。参见 http://kafka.apache.org/0110/javadoc/index.html?org/apache/kafka/clients/consumer/ConsumerRebalanceListener.html

如果您只是想解决偏移量每 24 小时过期的事实,那么除了定期调用 poll() 以保持偏移量之外,您还需要每天至少调用一次提交以保持偏移量最新消费者群体