如果消息未提交,Kafka Consumer 如何自动轮询
Kafka Consumer how to poll automatically if message is not committed
我有一个 Kafka 消费者,其中使用 HTTP POST 调用将消息传递到另一个应用程序。我还使用
手动提交偏移量
acknowledgment.acknowledge();
有一些 HTTP return 错误代码,我们忽略错误并提交偏移量,还有一些错误代码,我们不提交偏移量。问题是只有当我重新启动消费者时,kafka 消费者才会轮询未提交的消息。如果分区中有未提交的消息,无论如何我可以在哪里轮询消息?
有关解释,请参阅 。
要重新传送记录,您必须结合 SeekToCurrentErrorHandler
抛出异常,这会重新定位未处理的分区,以便在下一次轮询时再次获取它们。
或者您可以nack()
确认以获得类似的结果。
我有一个 Kafka 消费者,其中使用 HTTP POST 调用将消息传递到另一个应用程序。我还使用
手动提交偏移量acknowledgment.acknowledge();
有一些 HTTP return 错误代码,我们忽略错误并提交偏移量,还有一些错误代码,我们不提交偏移量。问题是只有当我重新启动消费者时,kafka 消费者才会轮询未提交的消息。如果分区中有未提交的消息,无论如何我可以在哪里轮询消息?
有关解释,请参阅
要重新传送记录,您必须结合 SeekToCurrentErrorHandler
抛出异常,这会重新定位未处理的分区,以便在下一次轮询时再次获取它们。
或者您可以nack()
确认以获得类似的结果。