消费者 'group_name' 组永远重新平衡

Consumer 'group_name' group is rebalancing forever

我正在使用 Kafka:2.11-1.0.1。该应用程序包含主题 'X' 的并发 = 5 且分区 = 5.

的消费者

当应用程序重新启动并且在分配分区之前在主题 'X' 上发布消息时, 主题 'X' 的 5 个消费者找到组协调器并向组协调器发送加入组请求。期望得到组协调员的回复,但没有收到回复。

我检查了 Kafka 服务器日志,但找不到相关日志,发现 DEBUG 日志级别。

当我运行描述消费者组命令时,观察结果如下:

  1. 消费者群体正在重新平衡
  2. 有些滞后的老消费者
  3. 具有一些随机名称的新消费者。随着时间的推移,新的消费者数量正在增加。

有关主题 'X' 的新消息已发布,但消费者未收到。

heartbeat 和 session.time.out 设置为默认值。

如果在主题 'X' 及其使用者的分区分配之前发布消息,则会出现此问题。

我的疑问是:为什么重新平衡没有完成,新消费者开始消费新产生的消息?

应用程序在消费者组中有以下消费者

  • 消费者 A 收听 Topic1。 Topic1 有 1 个分区。 max.poll.interval.time.ms=这个消费者 4 小时。
  • 消费者 B 收听主题 2。 Topic2 有 5 个分区。 消费者 B 并发 = 5。 max.poll.interval.time.ms=此消费者 1 小时。

应用程序重启时发生了什么,如果其中一个主题已经发布了消息

  • 当应用重启一个消费者实例(consumerA1)时 创建并订阅 topic1。 ConsumerA1 找到组坐标(GC)并发送加入组请求。
  • ConsumerA1 得到 GC 的响应,变成leader.Till这一步没有其他消费者初始化。
  • ConsumerA1分配分区并向GC发送SyncGroup请求。新的 作业生成发生。这样第一次rebalance就完成了。
  • 关于topic1的消息已经发布,consumerA1获取这条消息 并开始处理。完成此消息的处理 重要的时间。 (假设 2 小时)
  • 现在5个consumer实例一一初始化,全部订阅topic2。这些消费者找到 GC 并发送加入组请求。 但是 GC 不响应它们。
  • 当 consumerA1 向 GC 发送心跳时,GC 响应说重新平衡正在进行,但 consumerA1 没有撤销分区,因为它正在处理消息。
  • 根据Rebalancing协议(Nice article on rebalancing),GC等待所有消费者发送加入组请求。在这种情况下,GC 等待来自 consumerA1 的加入组请求。最长等待时间为 max.poll.interval.time.ms,即在本例中为 4 小时。

根本原因:

Group Coordinator 在应用程序重启后没有等待所有消费者初始化,因此首先发生了不必要的重新平衡,因此 consumerA1 从分区中获取消息并开始处理它。

解法: 为了避免这种不必要的初始重新平衡,kafka 提供了一种配置,其中组协调器等待消费者加入新的消费者组。 Documentation

group.initial.rebalance.delay.ms

检查了我的 kafka server.properties ,它被设置为 0。 尝试使用默认值,即 3 秒。 避免了初始重新平衡,GC 在应用程序重启时等待 3 秒,此时所有其他消费者 initialized.All 发送加入组请求,因为所有 GC 都收到来自所有消费者的请求。 GC 立即响应,重新平衡已成功完成。