如果消息未提交,Kafka Consumer 如何自动轮询

Kafka Consumer how to poll automatically if message is not committed

我有一个 Kafka 消费者,其中使用 HTTP POST 调用将消息传递到另一个应用程序。我还使用

手动提交偏移量

acknowledgment.acknowledge();

有一些 HTTP return 错误代码,我们忽略错误并提交偏移量,还有一些错误代码,我们不提交偏移量。问题是只有当我重新启动消费者时,kafka 消费者才会轮询未提交的消息。如果分区中有未提交的消息,无论如何我可以在哪里轮询消息?

有关解释,请参阅

要重新传送记录,您必须结合 SeekToCurrentErrorHandler 抛出异常,这会重新定位未处理的分区,以便在下一次轮询时再次获取它们。

或者您可以nack()确认以获得类似的结果。