为什么consumer重启后会读取所有来自Kafka topic的消息?

Why does consumer read all messages from Kafka topic after restart?

我使用 confluent .net 客户端。 订阅者总是在重启(订阅者服务重启)后从 Kafka 主题中读取所有消息。 如何提交消费者已经实现的偏移量并从中读取? 也许一些消费者配置可以帮助...

您有两个选择:

  • 通过设置消费者属性 EnableAutoCommit = true 启用自动提交(消息在可配置的时间后提交,通常为 5 秒),或

  • 通过consumer.Commit(consumeResult)手动提交获取的偏移量。

GitHub 上显示了手动提交的示例。

这只是一个大胆的猜测,但是如何声明消费者的组 ID? 我见过一些使用这种随机分配的例子:

     ["group.id"] = Guid.NewGuid().ToString(),

如果您在每次启动消费者时声明一个 new/random group.id,这将导致在每次执行时注册一个新的消费者组,这涉及 auto.offset.reset 启动.

如果这个 属性 设置为“earliest”,那么每次启动消费者时(假设他们每次都有不同的 group.id),他们将从第一个可用的偏移量,就像你的情况一样,从头开始再次阅读所有消息。

如果此 属性 设置为“latest”,并且您的制作人当前未发送任何消息,您将无法阅读任何内容,这可能会造成一些混乱。

尝试设置一个固定的 group.id:开始消费,当消息在代理上仍然可用时停止进程,然后再次启动消费者,而不更改最后一个 group.id

这次,由于消费者组已经注册,auto.offset.reset 将被忽略,起始位置将由您提交的偏移量定义,默认情况下存储在一个名为 [=18= 的特殊主题上].