Kafka如何在消息处理成功之前不移动偏移量

Kafka How to not move offset until message processing is succeeded

我们正在读取来自 Kafka 主题的消息。我的 (false) 印象是,通过设置 EnableAutoOffsetStore/enable.auto.offset.store = false,消费者可以选择何时移动偏移量。

我们是这样使用的

while (!cancellationToken.IsCancellationRequested)
{
    try{       
         var consumeResult = kafkaConsumer.Consume(cancellationToken);             
         // process consumeResult.Message
         kafkaConsumer.StoreOffset(consumeResult);
    }
    catch
    {
      // delay and re-try
    } 
}

这样我们可以在遇到异常时重新读取消息。 我们有 1 个生产者、1 个消费者、1 个主题、1 个分区(和 1 个组)

应该如何实现这种行为?

如果您在 process 中捕获异常,则提交永远不会发生。

您可以存储失败记录的分区+偏移量,然后Seek返回到该偏移量。

或者,您可以跳过主处理循环中的偏移量并将该记录写入 dead-letter 主题并将不同的消费者写入“处理 again/differently”