Kafka如何在消息处理成功之前不移动偏移量
Kafka How to not move offset until message processing is succeeded
我们正在读取来自 Kafka 主题的消息。我的 (false) 印象是,通过设置 EnableAutoOffsetStore/enable.auto.offset.store = false,消费者可以选择何时移动偏移量。
我们是这样使用的
while (!cancellationToken.IsCancellationRequested)
{
try{
var consumeResult = kafkaConsumer.Consume(cancellationToken);
// process consumeResult.Message
kafkaConsumer.StoreOffset(consumeResult);
}
catch
{
// delay and re-try
}
}
这样我们可以在遇到异常时重新读取消息。
我们有 1 个生产者、1 个消费者、1 个主题、1 个分区(和 1 个组)
应该如何实现这种行为?
如果您在 process
中捕获异常,则提交永远不会发生。
您可以存储失败记录的分区+偏移量,然后Seek
返回到该偏移量。
或者,您可以跳过主处理循环中的偏移量并将该记录写入 dead-letter 主题并将不同的消费者写入“处理 again/differently”
我们正在读取来自 Kafka 主题的消息。我的 (false) 印象是,通过设置 EnableAutoOffsetStore/enable.auto.offset.store = false,消费者可以选择何时移动偏移量。
我们是这样使用的
while (!cancellationToken.IsCancellationRequested)
{
try{
var consumeResult = kafkaConsumer.Consume(cancellationToken);
// process consumeResult.Message
kafkaConsumer.StoreOffset(consumeResult);
}
catch
{
// delay and re-try
}
}
这样我们可以在遇到异常时重新读取消息。 我们有 1 个生产者、1 个消费者、1 个主题、1 个分区(和 1 个组)
应该如何实现这种行为?
如果您在 process
中捕获异常,则提交永远不会发生。
您可以存储失败记录的分区+偏移量,然后Seek
返回到该偏移量。
或者,您可以跳过主处理循环中的偏移量并将该记录写入 dead-letter 主题并将不同的消费者写入“处理 again/differently”