处理kafka消息需要很长时间

Processing kafka messages taking long time

我有一个 Python 进程(或者更确切地说,一组进程 运行 在消费者组中并行)根据来自特定主题的 Kafka 消息输入来处理数据。通常每条消息都会被快速处理,但有时,根据消息的内容,可能需要很长时间(几分钟)。在这种情况下,Kafka broker 断开客户端与组的连接并启动重新平衡。我可以将 session_timeout_ms 设置为一个非常大的值,但它会超过 10 分钟,这意味着如果客户端死亡,集群将在 10 分钟内无法正确重新平衡。这似乎是个坏主意。此外,大多数消息(大约 98%)都很快,因此只为 1-2% 的消息支付这样的惩罚似乎是一种浪费。 OTOH,大消息足够频繁导致大量重新平衡并消耗大量性能(因为当组重新平衡时,什么都没有完成,然后 "dead" 客户端再次重新加入并导致另一次重新平衡) .

所以,我想知道,有没有其他方法可以处理需要很长时间才能处理的消息?有没有办法手动发起心跳告诉broker"it's ok, I am alive, I'm just working on the message"?我认为 Python 客户端(我使用 kafka-python 1.4.7)应该为我做这件事,但它似乎没有发生。此外, API 似乎根本没有单独的 "heartbeat" 功能。据我所知,调用 poll() 实际上会让我收到下一条消息 - 虽然我什至没有完成当前消息,并且还会弄乱 Kafka 消费者的迭代器 API,这非常方便在 Python 中使用。

以防万一,Kafka集群是Confluent,如果我没记错的话是2.3版。

在Kafka中,0.10.1+ Kafka轮询和会话心跳是相互解耦的。 你可以得到解释

max.poll.interval.ms 超时前允许消费者实例完成处理的时间意味着如果处理时间超过 max.poll.interval.ms 消费者组将假定的时间它从消费者组中删除并调用重新平衡。

增加此值将增加预期轮询之间的间隔,从而使消费者有更多时间处理从轮询(长)返回的一批记录。 但与此同时,它也会延迟组重新平衡,因为消费者只会在调用轮询时加入重新平衡。

session.timeout.ms 是用于识别消费者是否还活着并在定义的时间间隔 (heartbeat.interval.ms) 发送心跳的超时时间。一般来说,经验法则是 heartbeat.interval.ms 应该是会话超时的 1/3,因此在网络故障的情况下,消费者在会话超时之前最多可以错过 3 次心跳。

  1. session.timeout.ms:低值有助于更快地检测到故障。

  2. max.poll.interval.ms:较大的值会降低因处理时间增加而导致失败的风险,但会增加重新平衡时间。

注意:Consumer Group消耗的大量partition和topic也会影响整体rebalance时间

另一种方法,如果您真的想摆脱重新平衡,您可以使用分区分配在每个消费者实例上手动分配分区。在这种情况下,每个消费者实例都将 运行 独立地分配给自己的分区。但在那种情况下,您将无法利用重新平衡功能自动分配分区。