librdkafka C API Kafka 消费者没有正确读取所有消息
librdkafka C API Kafka Consumer doesn't read all messages correctly
我正在使用 librdkafka
C API 消费者(特别是使用 rd_kafka_consumer_poll
阅读,我之前确实调用过 rd_kafka_poll_set_consumer
)
我看到的问题是,在我的 google 测试中,我遵循
向kafka写入3条消息
init/start 卡夫卡消费者 (rd_kafka_consumer_poll
)
在rebalance_cb
中我将每个分区偏移设置为RD_KAFKA_OFFSET_STORED
并将它们分配给handle
此时我认为它应该读取 3 条消息,但它只读取最后一条消息,但令人惊讶的是每个分区的偏移量已经更新!
我是否遗漏了一些使用 Kafka 消费者的东西?
还有一个问题是,我最初认为存储的偏移量在 kafka broker 中,并且主题 + 消费者组 ID + 分区组合有唯一的偏移量。
所以我认为阅读同一主题的不同消费者群体应该有不同的偏移量。
然而,看起来并非如此。当使用不同的消费者群体时,我总是从相同的偏移量读取。
我怀疑这可能与偏移提交有关,但不确定在哪里解决这个问题。
有什么见解吗?
要查看的配置:auto.offset.reset
来自 Kakfa consumer documentation :
What to do when there is no initial offset in Kafka or if the current
offset does not exist any more on the server
Action to take when there is no initial offset in offset store or the
desired offset is out of range: 'smallest','earliest' - automatically
reset the offset to the smallest offset, 'largest','latest' -
automatically reset the offset to the largest offset, 'error' -
trigger an error which is retrieved by consuming messages and checking
'message->err'. Type: enum value
默认值为最新。
此外,
#define RD_KAFKA_OFFSET_STORED -1000
所以,您试图将分区偏移量设置为 -1000,这显然不是有效的偏移量。
显然,librdkafka 在这种情况下读取了最后一条消息(我没有检查代码)。
我正在使用 librdkafka
C API 消费者(特别是使用 rd_kafka_consumer_poll
阅读,我之前确实调用过 rd_kafka_poll_set_consumer
)
我看到的问题是,在我的 google 测试中,我遵循
向kafka写入3条消息
init/start 卡夫卡消费者 (
rd_kafka_consumer_poll
)在
rebalance_cb
中我将每个分区偏移设置为RD_KAFKA_OFFSET_STORED
并将它们分配给handle此时我认为它应该读取 3 条消息,但它只读取最后一条消息,但令人惊讶的是每个分区的偏移量已经更新!
我是否遗漏了一些使用 Kafka 消费者的东西?
还有一个问题是,我最初认为存储的偏移量在 kafka broker 中,并且主题 + 消费者组 ID + 分区组合有唯一的偏移量。
所以我认为阅读同一主题的不同消费者群体应该有不同的偏移量。
然而,看起来并非如此。当使用不同的消费者群体时,我总是从相同的偏移量读取。
我怀疑这可能与偏移提交有关,但不确定在哪里解决这个问题。
有什么见解吗?
要查看的配置:auto.offset.reset
来自 Kakfa consumer documentation :
What to do when there is no initial offset in Kafka or if the current offset does not exist any more on the server
Action to take when there is no initial offset in offset store or the desired offset is out of range: 'smallest','earliest' - automatically reset the offset to the smallest offset, 'largest','latest' - automatically reset the offset to the largest offset, 'error' - trigger an error which is retrieved by consuming messages and checking 'message->err'. Type: enum value
默认值为最新。
此外,
#define RD_KAFKA_OFFSET_STORED -1000
所以,您试图将分区偏移量设置为 -1000,这显然不是有效的偏移量。 显然,librdkafka 在这种情况下读取了最后一条消息(我没有检查代码)。