C++ 卡夫卡客户端 (rdkafka)

C++ kafka client (rdkafka)

我正在使用来自 C++ rdkafka 。 问题是关于 RdKafka::KafkaConsumer。如何设置消费者从主题的开头开始?

P.S。 link 中的消费者示例基于 RdKafka::Consumer,由 "legacy only, use KafkaConsumer instead"

标记

提前致谢

这不是 "auto.offset.reset" 的工作方式。 "auto.offset.reset" 只有在没有有效的提交偏移量时才有效。流程如下:

  1. 启动消费者(重启或崩溃后)
  2. 寻找偏移量
    • 如果找到,从偏移处恢复
    • 如果没有找到,则根据auto.offset.reset设置偏移量。

如果您想在每次重启时阅读整个主题,实际上根本没有理由提交偏移量。提交偏移量的目的是知道你离开的地方,因为你想在重启后从这个偏移量恢复。

我用这个浪费了几个小时,答案是,通常情况下,RTFM :) 是的,你需要设置 属性 [=12= 的值]到smallest,但关键问题是——在哪里?

来自this link

The high-level Kafka consumer (KafkaConsumer in C++) will start consuming at the last committed offset by default, if there is no previously committed offset for the topic+partition and group it will fall back on the topic configuration property auto.offset.reset which defaults to latest, thus starting to consume at the end of the partition (only new messages will be consumed).

注意粗体字,我做错的是调用这个:

rd_kafka_conf_set(_conf_handle, key, val, _errstr, sizeof(_errstr));

而不是这个:

rd_kafka_topic_conf_set(_topic_conf_handle, key, val, _errstr, sizeof(_errstr));