Kafka >= 0.10.1 的 session.timeout.ms 和 max.poll.interval.ms 之间的差异

Difference between session.timeout.ms and max.poll.interval.ms for Kafka >= 0.10.1

我不清楚为什么我们需要 session.timeout.msmax.poll.interval.ms 以及我们什么时候使用其中之一或两者?似乎这两个设置都指示了协调器在假设它已经死之前等待从消费者那里获取心跳的时间上限。

此外,基于 KIP-62 的版本 0.10.1.0+ 表现如何?

在KIP-62之前,只有session.timeout.ms(即Kafka 0.10.0及更早版本)。 max.poll.interval.ms 通过 KIP-62 引入(Kafka 0.10.1 的一部分)。

KIP-62,通过后台心跳线程将心跳与对 poll() 的调用分离,允许比心跳间隔更长的处理时间(即两个连续 poll() 之间的时间)。

假设处理一条消息需要 1 分钟。如果心跳和轮询是耦合的(即在 KIP-62 之前),您需要将 session.timeout.ms 设置为大于 1 分钟,以防止消费者超时。但是,如果消费者死亡,检测失败的消费者也需要超过 1 分钟的时间。

KIP-62 将轮询和心跳解耦,允许在两个连续的轮询之间发送心跳。现在你有两个线程运行,心跳线程和处理线程,因此,KIP-62 为每个线程引入了超时。 session.timeout.ms 用于心跳线程,而 max.poll.interval.ms 用于处理线程。

假设,您设置了session.timeout.ms=30000,因此,消费者心跳线程必须在该时间到期之前向代理发送心跳。另一方面,如果处理单个消息需要 1 分钟,则可以将 max.poll.interval.ms 设置为大于一分钟,以便为处理线程提供更多时间来处理消息。

如果处理线程终止,需要 max.poll.interval.ms 才能检测到。但是,如果整个消费者死亡(并且一个死亡的处理线程很可能使整个消费者崩溃,包括心跳线程),检测它只需要 session.timeout.ms

这个想法是,即使处理本身需要很长时间,也可以快速检测到失败的消费者。

实施细节

新超时max.poll.interval.ms主要是客户端概念:如果poll()max.poll.interval.ms内没有被调用,心跳线程会检测到这种情况并发送离开组请求给经纪人。 -- max.poll.interval.ms 仍然与消费者组重新平衡相关:如果触发重新平衡,消费者有 max.poll.interval.ms 时间通过调用触发加入组的 poll() 客户端重新加入该组要求。