Kafka enable.auto.commit 设置为 false 但轮询仍获取 "next" 消息

Kafka enable.auto.commit set to false but poll still fetch "next" messages

我想在我的消费者成功处理记录时告诉 Kafka,因此我通过将 enable.auto.commit 设置为 false 来关闭自动提交。我有两条关于我订阅的主题的消息,偏移量为零和一个,并创建了一个消费者,以便每次调用 poll 最多 return 一个 记录(通过将 max.poll.records 设置为 1)。

我现在打电话给 consumer.poll(5000) 并收到第一条消息,但我没有确认;我不叫 commitSynccommitAsync。如果我现在再次调用 consumer.poll(5000),使用相同的消费者,我希望得到与我刚刚阅读的完全相同的消息,但相反,我收到了第二条消息。

如何让 consumer.poll 继续发出相同的消息,直到我明确确认为止?

您所描述的是预期的行为。每次您调用 poll(),它都会 return 下一条消息。您提交的偏移量仅在连接新消费者时使用,因此它知道从哪里(重新)开始。

在 MessageHub 中,我们已将 session.timeout 设置为 30 秒。所以你需要稍微快点调用poll()来避免掉线。如果您的处理时间比这更长,那么我可以想到 2 个选项:

  • 使用 Kafka 0.10.2 并设置 max.poll.interval.ms 以告诉您的 Kafka 客户端在处理上一条记录时保持会话活动(无需调用 poll()) . (此功能是在 0.10.1 中添加的,但我们不支持该版本。0.10.2 可以使用,因为它能够与 0.10.0 代理一起使用)

  • 使用 seek() 返回到 poll 之后的前一个偏移量,因此它保持 returning 相同的记录。

希望对您有所帮助!