为什么consumer重启后会读取所有来自Kafka topic的消息?
Why does consumer read all messages from Kafka topic after restart?
我使用 confluent .net 客户端。
订阅者总是在重启(订阅者服务重启)后从 Kafka 主题中读取所有消息。
如何提交消费者已经实现的偏移量并从中读取?
也许一些消费者配置可以帮助...
您有两个选择:
通过设置消费者属性 EnableAutoCommit = true
启用自动提交(消息在可配置的时间后提交,通常为 5 秒),或
通过consumer.Commit(consumeResult)
手动提交获取的偏移量。
GitHub 上显示了手动提交的示例。
这只是一个大胆的猜测,但是如何声明消费者的组 ID?
我见过一些使用这种随机分配的例子:
["group.id"] = Guid.NewGuid().ToString(),
如果您在每次启动消费者时声明一个 new/random group.id
,这将导致在每次执行时注册一个新的消费者组,这涉及 auto.offset.reset
启动.
如果这个 属性 设置为“earliest
”,那么每次启动消费者时(假设他们每次都有不同的 group.id),他们将从第一个可用的偏移量,就像你的情况一样,从头开始再次阅读所有消息。
如果此 属性 设置为“latest
”,并且您的制作人当前未发送任何消息,您将无法阅读任何内容,这可能会造成一些混乱。
尝试设置一个固定的 group.id
:开始消费,当消息在代理上仍然可用时停止进程,然后再次启动消费者,而不更改最后一个 group.id
。
这次,由于消费者组已经注册,auto.offset.reset
将被忽略,起始位置将由您提交的偏移量定义,默认情况下存储在一个名为 [=18= 的特殊主题上].
我使用 confluent .net 客户端。 订阅者总是在重启(订阅者服务重启)后从 Kafka 主题中读取所有消息。 如何提交消费者已经实现的偏移量并从中读取? 也许一些消费者配置可以帮助...
您有两个选择:
通过设置消费者属性
EnableAutoCommit = true
启用自动提交(消息在可配置的时间后提交,通常为 5 秒),或通过
consumer.Commit(consumeResult)
手动提交获取的偏移量。
GitHub 上显示了手动提交的示例。
这只是一个大胆的猜测,但是如何声明消费者的组 ID? 我见过一些使用这种随机分配的例子:
["group.id"] = Guid.NewGuid().ToString(),
如果您在每次启动消费者时声明一个 new/random group.id
,这将导致在每次执行时注册一个新的消费者组,这涉及 auto.offset.reset
启动.
如果这个 属性 设置为“earliest
”,那么每次启动消费者时(假设他们每次都有不同的 group.id),他们将从第一个可用的偏移量,就像你的情况一样,从头开始再次阅读所有消息。
如果此 属性 设置为“latest
”,并且您的制作人当前未发送任何消息,您将无法阅读任何内容,这可能会造成一些混乱。
尝试设置一个固定的 group.id
:开始消费,当消息在代理上仍然可用时停止进程,然后再次启动消费者,而不更改最后一个 group.id
。
这次,由于消费者组已经注册,auto.offset.reset
将被忽略,起始位置将由您提交的偏移量定义,默认情况下存储在一个名为 [=18= 的特殊主题上].