Why does a Kafka consumer take a long time to start consuming?

We start a Kafka consumer that listens on a topic which may not have been created yet (topic auto-creation is enabled, though).

Shortly afterwards, a producer publishes messages to that topic.

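However, it takes the consumer some time to notice: 5 minutes, to be exact. At that point the consumer revokes its partitions and rejoins the consumer group, and Kafka restabilizes the group. Judging by the timestamps in the consumer and Kafka logs, this process is initiated on the consumer side.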

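I assume this is expected behavior, but I would like to understand it. Is this actually a rebalance taking place (from 0 to 1 partitions)? Would it not happen if we created the topic beforehand?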

2017-02-01 08:36:45.692  INFO 7 --- [afka-consumer-1] o.a.k.c.c.internals.ConsumerCoordinator  : Revoking previously assigned partitions [] for group tps-kafka-partitioning
2017-02-01 08:36:45.692  INFO 7 --- [afka-consumer-1] o.s.k.l.KafkaMessageListenerContainer    : partitions revoked:[]
2017-02-01 08:36:45.693  INFO 7 --- [afka-consumer-1] o.a.k.c.c.internals.AbstractCoordinator  : (Re-)joining group tps-kafka-partitioning
2017-02-01 08:36:45.738  INFO 7 --- [afka-consumer-1] o.a.k.c.c.internals.AbstractCoordinator  : Successfully joined group tps-kafka-partitioning with generation 1
2017-02-01 08:36:45.747  INFO 7 --- [afka-consumer-1] o.a.k.c.c.internals.ConsumerCoordinator  : Setting newly assigned partitions [] for group tps-kafka-partitioning
2017-02-01 08:36:45.749  INFO 7 --- [afka-consumer-1] o.s.k.l.KafkaMessageListenerContainer    : partitions assigned:[]
2017-02-01 08:41:45.540  INFO 7 --- [afka-consumer-1] o.a.k.c.c.internals.ConsumerCoordinator  : Revoking previously assigned partitions [] for group tps-kafka-partitioning
2017-02-01 08:41:45.544  INFO 7 --- [afka-consumer-1] o.s.k.l.KafkaMessageListenerContainer    : partitions revoked:[]
2017-02-01 08:41:45.544  INFO 7 --- [afka-consumer-1] o.a.k.c.c.internals.AbstractCoordinator  : (Re-)joining group tps-kafka-partitioning

Kafka logs:

[2017-02-01 08:41:45,546] INFO [GroupCoordinator 1001]: Preparing to restabilize group tps-kafka-partitioning with old generation 1 (kafka.coordinator.GroupCoordinator)
[2017-02-01 08:41:45,546] INFO [GroupCoordinator 1001]: Stabilized group tps-kafka-partitioning generation 2 (kafka.coordinator.GroupCoordinator)
[2017-02-01 08:41:45,551] INFO [GroupCoordinator 1001]: Assignment received from leader for group tps-kafka-partitioning for generation 2 (kafka.coordinator.GroupCoordinator)
[2017-02-01 08:42:14,636] INFO [GroupCoordinator 1001]: Preparing to restabilize group tps-kafka-group-id with old generation 1 (kafka.coordinator.GroupCoordinator)
[2017-02-01 08:42:14,636] INFO [GroupCoordinator 1001]: Stabilized group tps-kafka-group-id generation 2 (kafka.coordinator.GroupCoordinator)

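This is probably due to the default value of the metadata.max.age.ms parameter, which controls how often the consumer forces a refresh of its topic metadata. The default is 300000 ms (5 minutes), which matches the delay you are seeing.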
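As a quick sanity check on that explanation, the gap between the two "Revoking previously assigned partitions" lines in the consumer log can be compared with the default refresh interval. This is only a sketch: the class and method names are made up for illustration, and the two timestamps are copied from the log in the question.

```java
import java.time.Duration;
import java.time.LocalDateTime;

// Hypothetical helper, not part of Kafka: compares the observed gap between
// the two rebalances in the question's log with the default metadata.max.age.ms.
class MetadataAgeCheck {
    // Default value of metadata.max.age.ms (5 minutes).
    static final long DEFAULT_METADATA_MAX_AGE_MS = 300_000L;

    // Milliseconds elapsed between two ISO-8601 local timestamps.
    static long gapMillis(String first, String second) {
        return Duration.between(LocalDateTime.parse(first),
                                LocalDateTime.parse(second)).toMillis();
    }

    public static void main(String[] args) {
        // Timestamps of the first and second "Revoking previously assigned partitions" lines.
        long gap = gapMillis("2017-02-01T08:36:45.692", "2017-02-01T08:41:45.540");
        System.out.println("observed gap (ms): " + gap);
        System.out.println("difference from default (ms): "
                + Math.abs(DEFAULT_METADATA_MAX_AGE_MS - gap));
    }
}
```

The gap comes out a fraction of a second short of five minutes, which is consistent with the metadata having first been fetched slightly before the initial group join was logged.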

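When you start a consumer on a topic that does not exist, the broker auto-creates the topic, but that takes a little while (leader election and so on). So when your consumer requests metadata for the topic, it gets a LEADER_NOT_AVAILABLE warning and cannot fetch any messages. Once the timeout mentioned above expires, the consumer refreshes its metadata, succeeds this time, and starts reading messages. This does not depend on the producer writing messages to the topic; it is purely a consumer-side matter.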

If you start your consumer with metadata.max.age.ms set to, say, 1000 ms, you should see a much shorter delay before messages are consumed.
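A minimal sketch of what that could look like, assuming plain string keys in a java.util.Properties object that is later handed to the consumer. The class name, the broker address, and the choice of String deserializers are placeholders; only the property keys are real Kafka consumer settings.

```java
import java.util.Properties;

// Hypothetical helper: builds consumer properties with a short metadata refresh interval.
class ConsumerConfigSketch {
    static Properties buildProps(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId);
        // Assumed String keys and values; swap in whatever deserializers you use.
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        // Refresh topic metadata every second instead of the default 300000 ms.
        props.setProperty("metadata.max.age.ms", "1000");
        return props;
    }

    public static void main(String[] args) {
        // "localhost:9092" is a placeholder broker address.
        Properties props = buildProps("localhost:9092", "tps-kafka-partitioning");
        // With kafka-clients on the classpath, these would be passed to
        // new KafkaConsumer<>(props); here they are only printed.
        props.forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

With Spring Kafka, the same key can go into the configuration map given to the consumer factory. A very low value means more frequent metadata requests to the brokers, so 1000 ms is probably better suited to testing than to production.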

Also, if you create the topic beforehand, or start the producer before the consumer, this behavior should not occur at all.