librdkafka C API Kafka 消费者没有正确读取所有消息

librdkafka C API Kafka Consumer doesn't read all messages correctly

我正在使用 librdkafka C API 消费者(特别是使用 rd_kafka_consumer_poll 阅读,我之前确实调用过 rd_kafka_poll_set_consumer

我看到的问题是,在我的 google 测试中,我遵循

  1. 向kafka写入3条消息

  2. init/start 卡夫卡消费者 (rd_kafka_consumer_poll)

  3. rebalance_cb中我将每个分区偏移设置为RD_KAFKA_OFFSET_STORED并将它们分配给handle

  4. 此时我认为它应该读取 3 条消息,但它只读取最后一条消息,但令人惊讶的是每个分区的偏移量已经更新!

我是否遗漏了一些使用 Kafka 消费者的东西?

还有一个问题是,我最初认为存储的偏移量在 kafka broker 中,并且主题 + 消费者组 ID + 分区组合有唯一的偏移量。

所以我认为阅读同一主题的不同消费者群体应该有不同的偏移量。

然而,看起来并非如此。当使用不同的消费者群体时,我总是从相同的偏移量读取。

我怀疑这可能与偏移提交有关,但不确定在哪里解决这个问题。

有什么见解吗?

要查看的配置:auto.offset.reset

来自 Kakfa consumer documentation :

What to do when there is no initial offset in Kafka or if the current offset does not exist any more on the server

来自 librdkafka documentation :

Action to take when there is no initial offset in offset store or the desired offset is out of range: 'smallest','earliest' - automatically reset the offset to the smallest offset, 'largest','latest' - automatically reset the offset to the largest offset, 'error' - trigger an error which is retrieved by consuming messages and checking 'message->err'. Type: enum value

默认值为最新

此外,

#define RD_KAFKA_OFFSET_STORED -1000

所以,您试图将分区偏移量设置为 -1000,这显然不是有效的偏移量。 显然,librdkafka 在这种情况下读取了最后一条消息(我没有检查代码)。