为什么新消费者组的 kafka 消费者 (0.10.0.0) 确实看到 old/previously 发布的消息?
Why kafka consumer (0.10.0.0) for new consumer group does see old/previously published messages?
我有一个生产者,可以在名为 'mytopic' 的主题上发布消息,这很好。我在 2 个不同的消费者组中有 2 个消费者正在收听这些消息。我按以下顺序启动了这两个消费者和生产者。
1) 在组中启动消费者 1 'group1'
2) 启动生产者发布数百条消息
一段时间后,我检查了消费者 1 的偏移量,正如我所料:
/opt/kafka_2.11-0.10.0.0/bin/kafka-consumer-offset-checker.sh --zookeeper localhost:2181 --topic mytopic --group group1
输出:
Group Topic Pid Offset logSize Lag Owner
group1 mytopic 0 30230 36942 6712 none
3) 现在我在组 'group2' 中启动消费者 2 以收听相同的消息,但它在每次 poll() 调用时返回 0 条消息。
这个消费者的偏移量检查显示它的偏移量与 logSize 相同。
/opt/kafka_2.11-0.10.0.0/bin/kafka-consumer-offset-checker.sh --zookeeper localhost:2181 --topic mytopic --group group2
输出:
Group Topic Pid Offset logSize Lag Owner
group2 mytopic 0 36942 36942 0 none
具有新消费者组的任何其他消费者都会遇到同样的问题。为什么消费者在消息发布后加入了一个新的消费者组,却看不到旧消息,即使该主题上存在消息(即没有被删除)?
您需要在消费者配置中将参数设置 auto.offset.reset
更改为值 "earliest"
-- 默认值为 "latest"
告诉新消费者在当前结束时开始消费日志。
我有一个生产者,可以在名为 'mytopic' 的主题上发布消息,这很好。我在 2 个不同的消费者组中有 2 个消费者正在收听这些消息。我按以下顺序启动了这两个消费者和生产者。
1) 在组中启动消费者 1 'group1' 2) 启动生产者发布数百条消息
一段时间后,我检查了消费者 1 的偏移量,正如我所料:
/opt/kafka_2.11-0.10.0.0/bin/kafka-consumer-offset-checker.sh --zookeeper localhost:2181 --topic mytopic --group group1
输出:
Group Topic Pid Offset logSize Lag Owner
group1 mytopic 0 30230 36942 6712 none
3) 现在我在组 'group2' 中启动消费者 2 以收听相同的消息,但它在每次 poll() 调用时返回 0 条消息。 这个消费者的偏移量检查显示它的偏移量与 logSize 相同。
/opt/kafka_2.11-0.10.0.0/bin/kafka-consumer-offset-checker.sh --zookeeper localhost:2181 --topic mytopic --group group2
输出:
Group Topic Pid Offset logSize Lag Owner
group2 mytopic 0 36942 36942 0 none
具有新消费者组的任何其他消费者都会遇到同样的问题。为什么消费者在消息发布后加入了一个新的消费者组,却看不到旧消息,即使该主题上存在消息(即没有被删除)?
您需要在消费者配置中将参数设置 auto.offset.reset
更改为值 "earliest"
-- 默认值为 "latest"
告诉新消费者在当前结束时开始消费日志。