在处理当前消息之前不要阅读下一条 Kafka 消息?
Don't read next Kafka message until current message is processed?
当使用 Confluent Kafka C# 库消费消息时,我只想在处理完当前消息后看到一条新消息(如果出现问题,我需要再次读取同一消息)。换句话说,在我明确告诉它改变之前,我不想改变偏移量。
为了尝试实现这一点,我在配置中停用了自动提交(就像在示例中一样):
{ "enable.auto.commit", false }
{ "auto.offset.reset", "smallest" }
然后我注释掉提交行:
while(true)
{
if (!consumer.Consume(out Message<string, string> msg, TimeSpan.FromMilliseconds(100)))
{
continue;
}
//I thought by removing this line, I would keep getting the same message (until I've processed the message and commited the offset)
//consumer.CommitAsync(msg).Result;
}
我希望通过不提交,我会在调用 Consume() 时不断收到相同的消息,但事实并非如此。即使我不提交,偏移量也会不断变化,每次消费我都会收到新消息。
请澄清我明显的误解?
调用CommitAsync
将您的偏移量保存到kafka存储的偏移量主题。因此,如果您要处理您的 Consumer 并创建一个新的,您将从上次提交的偏移量重新开始,或者如果您不提交偏移量,您将从偏移量 0 开始。
如果不提交偏移量但继续在您的应用中使用相同的消费者,那么偏移量仍将递增并由消费者保存在内存中。
虽然这是 Java 消费者 class 在 偏移量和消费者位置 部分下的文档,但它更详细地说明了偏移量和提交的功能. KafkaConsumer Docs
当使用 Confluent Kafka C# 库消费消息时,我只想在处理完当前消息后看到一条新消息(如果出现问题,我需要再次读取同一消息)。换句话说,在我明确告诉它改变之前,我不想改变偏移量。
为了尝试实现这一点,我在配置中停用了自动提交(就像在示例中一样):
{ "enable.auto.commit", false }
{ "auto.offset.reset", "smallest" }
然后我注释掉提交行:
while(true)
{
if (!consumer.Consume(out Message<string, string> msg, TimeSpan.FromMilliseconds(100)))
{
continue;
}
//I thought by removing this line, I would keep getting the same message (until I've processed the message and commited the offset)
//consumer.CommitAsync(msg).Result;
}
我希望通过不提交,我会在调用 Consume() 时不断收到相同的消息,但事实并非如此。即使我不提交,偏移量也会不断变化,每次消费我都会收到新消息。
请澄清我明显的误解?
调用CommitAsync
将您的偏移量保存到kafka存储的偏移量主题。因此,如果您要处理您的 Consumer 并创建一个新的,您将从上次提交的偏移量重新开始,或者如果您不提交偏移量,您将从偏移量 0 开始。
如果不提交偏移量但继续在您的应用中使用相同的消费者,那么偏移量仍将递增并由消费者保存在内存中。
虽然这是 Java 消费者 class 在 偏移量和消费者位置 部分下的文档,但它更详细地说明了偏移量和提交的功能. KafkaConsumer Docs