max.poll.interval.ms set to Integer.MAX_VALUE by default

The Apache Kafka documentation states:

The internal Kafka Streams consumer max.poll.interval.ms default value was changed from 300000 to Integer.MAX_VALUE

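Since this value is used to detect when the processing time for a batch of records exceeds a given threshold, is there a reason for such an "unlimited" value?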

Does it allow applications to become unresponsive? Or does Kafka Streams have a different way to leave the consumer group when processing takes too long?

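In this context, Kafka Streams leverages the heartbeat functionality of the Kafka consumer client, which decouples heartbeats ("Is this app instance still alive?") from calls to poll(). The two main parameters are session.timeout.ms (for the heartbeat thread) and max.poll.interval.ms (for the processing thread); the difference between them is described in more detail in the Kafka documentation.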

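Heartbeats were introduced to allow an application instance to spend a lot of time processing records without being considered "not making progress" and thus "dead". For example, your app can spend a full minute doing heavy processing on a single record while still heartbeating to Kafka: "Hey, I'm still alive, and I am making progress. I'm just not done processing yet. Stay tuned."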
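
To make the decoupling concrete, here is a minimal Java sketch, assuming nothing from the Kafka client itself: a background thread "heartbeats" on a fixed schedule while the calling thread is stuck processing one slow record. The class `HeartbeatSketch` and its method are hypothetical and only model the idea; the real consumer's heartbeat thread also talks to the group coordinator, which this sketch does not do.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical model, NOT the Kafka client implementation: liveness signals
// come from a separate thread, so slow processing does not stop them.
class HeartbeatSketch {

    /**
     * Simulates processing one record for processingMillis while a separate
     * thread emits a "heartbeat" every heartbeatIntervalMillis.
     * Returns how many heartbeats were emitted during processing.
     */
    static int processWithHeartbeats(long processingMillis, long heartbeatIntervalMillis) {
        AtomicInteger heartbeats = new AtomicInteger();
        ScheduledExecutorService heartbeatThread = Executors.newSingleThreadScheduledExecutor();
        // Stand-in for "Hey, I'm still alive": we only count the beats.
        heartbeatThread.scheduleAtFixedRate(
                heartbeats::incrementAndGet, 0, heartbeatIntervalMillis, TimeUnit.MILLISECONDS);
        try {
            // Stand-in for expensive processing of a single record (no poll() happens here).
            Thread.sleep(processingMillis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
        } finally {
            heartbeatThread.shutdownNow(); // stop heartbeating once processing is over
        }
        return heartbeats.get();
    }

    public static void main(String[] args) {
        int beats = processWithHeartbeats(500, 100);
        System.out.println("Heartbeats sent while processing: " + beats);
    }
}
```

The exact count depends on scheduling, but it is several beats for half a second of "processing", which is the point: the instance keeps signalling liveness even though it has not returned to poll().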

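Of course, you can change max.poll.interval.ms from its default value (Integer.MAX_VALUE) to a lower setting if, for example, you do want your app instance to be considered "dead" when it takes longer than X seconds between polling records, that is, when processing the latest round of records takes longer than X seconds. Whether such a configuration makes sense depends on your specific use case; in most cases the default is a safe choice.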
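
A minimal sketch of such an override, written with plain `java.util.Properties` and string keys so it compiles without the Kafka jars. The `consumer.` prefix scopes a setting to the consumers Kafka Streams creates (it is what `StreamsConfig.consumerPrefix("max.poll.interval.ms")` produces). The application id, bootstrap servers, the 60-second limit and the 10-second session timeout are illustrative assumptions, not recommendations.

```java
import java.util.Properties;

// Sketch: Kafka Streams settings that lower max.poll.interval.ms.
// All concrete values below are hypothetical examples.
class StreamsTimeoutConfig {

    static Properties build(long maxPollIntervalMs) {
        Properties props = new Properties();
        props.setProperty("application.id", "my-streams-app");    // hypothetical id
        props.setProperty("bootstrap.servers", "localhost:9092"); // hypothetical broker
        // Applied to the consumers created by Kafka Streams (same as
        // StreamsConfig.consumerPrefix("max.poll.interval.ms")).
        props.setProperty("consumer.max.poll.interval.ms", Long.toString(maxPollIntervalMs));
        // Heartbeat-based liveness is governed separately by session.timeout.ms,
        // which must lie within the broker's group.min/max.session.timeout.ms range.
        props.setProperty("consumer.session.timeout.ms", "10000");
        return props;
    }

    public static void main(String[] args) {
        // Treat the instance as dead if more than 60 s pass between polls.
        Properties props = build(60_000L);
        props.forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

In a real application these properties would be passed to `new KafkaStreams(topology, props)`.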

session.timeout.ms: The timeout used to detect consumer failures when using Kafka's group management facility. The consumer sends periodic heartbeats to indicate its liveness to the broker. If no heartbeats are received by the broker before the expiration of this session timeout, then the broker will remove this consumer from the group and initiate a rebalance. Note that the value must be in the allowable range as configured in the broker configuration by group.min.session.timeout.ms and group.max.session.timeout.ms.

max.poll.interval.ms: The maximum delay between invocations of poll() when using consumer group management. This places an upper bound on the amount of time that the consumer can be idle before fetching more records. If poll() is not called before expiration of this timeout, then the consumer is considered failed and the group will rebalance in order to reassign the partitions to another member.
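
The two definitions above can be condensed into a simplified decision function. This is a hypothetical Java model (the class `LivenessModel` and its `Verdict` enum are not part of Kafka), assuming a single observer that knows the last heartbeat time and the last poll() time; in reality the session timeout is enforced by the broker and the poll-interval check by the consumer client.

```java
// Simplified, hypothetical model of the two failure checks quoted above.
// It is not the actual consumer or broker implementation.
class LivenessModel {

    enum Verdict { ALIVE, SESSION_EXPIRED, POLL_INTERVAL_EXCEEDED }

    static Verdict evaluate(long nowMs, long lastHeartbeatMs, long lastPollMs,
                            long sessionTimeoutMs, long maxPollIntervalMs) {
        // session.timeout.ms: no heartbeat received within the session timeout.
        if (nowMs - lastHeartbeatMs > sessionTimeoutMs) {
            return Verdict.SESSION_EXPIRED;
        }
        // max.poll.interval.ms: poll() not called within the allowed interval.
        if (nowMs - lastPollMs > maxPollIntervalMs) {
            return Verdict.POLL_INTERVAL_EXCEEDED;
        }
        return Verdict.ALIVE;
    }

    public static void main(String[] args) {
        long now = 600_000L;               // 10 minutes after the last poll()
        long lastHeartbeat = now - 3_000L; // heartbeat thread is still running
        long lastPoll = 0L;
        // Previous default of 300000 ms: the slow instance is considered failed.
        System.out.println(evaluate(now, lastHeartbeat, lastPoll, 10_000L, 300_000L));
        // prints POLL_INTERVAL_EXCEEDED
        // Kafka Streams default of Integer.MAX_VALUE: it stays in the group.
        System.out.println(evaluate(now, lastHeartbeat, lastPoll, 10_000L, Integer.MAX_VALUE));
        // prints ALIVE
    }
}
```

With heartbeats still arriving, only max.poll.interval.ms can evict a slow instance, which is why setting it to Integer.MAX_VALUE lets long-running processing continue while a crashed instance is still detected through session.timeout.ms.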