Kafka with Java:如何重新读取数据

Kafka with Java: how to re-read data

我在使用 kafka API 时遇到以下问题。我设置我的消费者:

props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, configuration.batchSize);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

然后

while(true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(200));
    try {
         //do some update in DB in a transaction
         consumer.commitSync();
    } catch (Exception e) {
    }

我想从 Kafka 读取数据,根据这些数据更新数据库。但是如果更新失败,我想重试直到它有效。所以我想将DB事务应用到kafka,i.a。如果我的数据库事务正常,则移动 kafka 指针,但如果失败,则从相同位置重试。

在我的代码中,

ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(

没有按预期工作,这意味着 "if kafka crashes, then restart from the commited position"。但是当我的数据库事务失败时,即使我不 commitAsync() 指针也会向前移动。

我的问题是:有没有简单的方法可以将kafka的指针位置反转,到上次poll的位置。

我已经注意到 API

public void seek(TopicPartition partition,
             long offset);

但这需要手动维护一个分区列表及其偏移量,我想还有更简单、更优雅的东西吗?

1) 由于 consumer.poll 在一个循环中,无论您是否提交偏移量,您都将继续向前移动偏移量。 Commit 仅在您重新启动组件时派上用场。即知道消费者应该从哪里开始消费。

2) 如果DB事务失败时需要移动到之前提交的offset,那么使用Kafka Consumer中的seek方法。 public void seek(TopicPartition分区,长偏移量)

3) 要提交各个分区的偏移量,您需要按照您提到的那样维护每个分区的偏移量。我不认为有任何其他方式。

您可能不需要在每次数据库事务失败时都查找先前提交的偏移量。您可能想要暂停您的消费者并重试几次,以指数方式增加等待时间。

但是要回答关于如何在每次轮询时移动到前一个偏移量的问题,请跟踪每个分区中第一条消息的偏移量,如果失败,在循环结束时,寻找到您跟踪的偏移量。