最大轮询间隔和会话超时 ms |卡夫卡消费者活着

max poll interval and session timeout ms | kafka consumer alive

场景:

处理消息后手动提交偏移量。

session.timeout.ms: 10 秒

max.poll.interval.ms: 5 分钟

处理“poll()”中消耗的消息需要 6 分钟

时间线:

A(0 秒):应用启动 poll(),已使用消息并开始处理(需要 6 分钟)

B(3秒):发送心跳

C(6秒):发送另一个心跳

D(5 分钟):发送了另一个心跳 (5 * 60 % 3 = 0) 但是达到“max.poll.interval.ms”(5 分钟)

在“D”点将消费者:

  1. 发送“LeaveGroup request”来考虑此消费者“死亡”并重新平衡?

  2. 继续每 3 秒发送一次心跳?

如果点“1”是这样,那么

一个。考虑到其分区由于在“D”点重新平衡而发生更改,该消费者在完成 6 分钟的处理后将如何提交偏移量?

b。 “max.poll.interval.ms”是否应该根据预期的处理时间提前设置?

如果点“2”是这种情况,那么我们将永远不知道处理是否真的被阻止了?

谢谢。

从 Kafka 版本 0.10.1.0 开始,消费者心跳在后台线程中发送,这样客户端处理时间可以比会话超时更长,而不会导致消费者被视为死亡。

但是,max.poll.interval.ms 仍然设置了消费者调用 poll 方法的最大允许时间。

在您的情况下,如果处理时间为 6 分钟,则意味着在“d”点您的消费者将被视为已死亡。

您的担忧是正确的,因为消费者将 无法在 6 分钟后提交消息。您的消费者将获得 CommitFailedExcpetion(如 CommitFailedExcpetion.

上的另一个答案所述

总而言之,是的,如果您已经知道您的处理时间将超过 5 分钟的默认时间,则需要增加 max.poll.interval.ms 时间。

另一种选择是通过减少配置 max.poll.records 来限制 poll 期间获取的记录,该配置默认为 500,描述为:“单次调用中返回的最大记录数轮询 ()".