Kafka消费者设计用于处理多实例的海量数据

Kafka consumer design to process huge volume of data with multi instance

我正在尝试设计 Kafka 消费者,但我在如何设计流程方面遇到了障碍。我在考虑两个选择:

1.  Process records directly from Kafka.
2.  Staging table write from Kafka and process records.

方法 1: 处理来自 Kafka 的关键消息:

•   Read messages one at a time from Kafka & if no records to process break the loop (configurable messages to process)
•   Execute business rules.
•   Apply changes to consumer database.
•   Update Kafka offset to read after processing message.
•   Insert into staging table (used for PD guide later on)

上述方法的问题:

•   Is it OK to subscribe to a partition and keep the lock open on Kafka partition until configurable messages are processed
    and then apply business rules, apply changes to database. All happens in the same process, any performance issues doing this way ?
•   Is it OK to manually commit the offset to Kafka? (Performance issues with manual offset commit).

方法 2: 暂存 table 从 Kafka 写入并处理记录

Process 1: Consuming events from Kafka and put in staging table.
Process 2: Reading staging table (configurable rows), execute business rules, apply consumer database changes 
& update the status of processed records in staging table. (we may have multiple process to do this step)

我发现这种方法有很多缺点:

•   We are missing the advantage of offset handling provided by Kafka and we are doing manual update of processed records in staging table.
•   Locking & Blocking on staging tables for multi instance, as we are trying to insert & do updates after processing in the same staging table 
    (note: I can design separate tables and move this data there and process them but that could is introducing multiple processes again.

我如何设计具有多实例消费者和要处理的海量数据的 Kafka,哪种设计合适,从 Kafka 读取数据并处理消息或将其暂存到 table并编写另一个作业来处理这些消息?

使用数据并相应地更新 table 的第一种方法听起来是正确的方法。

Kafka 保证

至少一次:您可能会收到两次相同的消息。
这意味着您需要消息是幂等的 -> 将金额设置为 x,而不是将金额添加到先前的值。

顺序(每个分区):Kafka 承诺您使用消息的顺序与生成消息的顺序相同 - 每个分区。就像每个分区一个队列。
如果当你说“执行业务规则”时你还需要读取以前的写入,这意味着你需要一个一个地处理它们。

如何定义分区

如果您定义一个分区,您将不会遇到冲突问题,但您将只有一个消费者并且无法扩展。
如果您任意定义多个分区,那么您可能会丢失顺序。
为什么这是个问题?
您需要根据您的业务模型定义分区: 例如,假设每条消息都会更新某个用户的数据库。当你处理一条消息时,你想读取用户行,检查一些字段,然后根据该字段更新(或不更新)。
这意味着如果您通过用户 ID 定义分区 -> (用户 ID % 分区数)
您保证不会在同一用户的两次更新之间出现竞争条件,并且您可以扩展到多个 machines/processes/threads。每个消费者负责一组用户,但始终是相同的用户。

消费者的设计取决于您的用例。 如果有其他下游进程需要相同的数据并且有连接到您的 kafka 集群的限制。在这种情况下,分期 table 是个好主意。

我认为在您的情况下,稍作改动的方法 1 是一个不错的方法。 但是,如果主题中没有新消息,则无需中断循环。 此外,还有一个消费者 属性 可帮助配置您希望在单个请求中从 kafka 轮询的记录数(默认为 500),如果每条消息都需要很长时间,您可能希望将其更改为较小的数量处理(以避免超时或不需要的重新分区问题)。

既然你提到数据量很大,如果处理顺序对你来说不重要,我建议你有更多的分区来实现并发。通过创建实例数不超过主题分区数的消费者组,可以实现并发。 (如果消费者实例数大于分区数,额外的实例将是理想的)

如果顺序很重要,生产者最好发送具有相同消息密钥的逻辑分组消息,以便所有具有相同密钥的消息都位于同一分区中。

关于偏移量提交,如果你同步提交每条消息到kafka你肯定会对性能产生影响。通常在偏移量中为每个消耗的记录批次提交。例如轮询 500 条记录-> 处理-> 提交这批记录。 但是,如果您需要为每条消息发送提交,您可能需要选择异步提交。

此外,当分区被分配给消费者组实例时,它不会锁定分区。其他消费组可以订阅同一个主题,并发消费消息。

这就是我认为我们可以在不担心消息丢失的情况下获得最佳吞吐量的方式-

  1. 最大化分区数。
  2. 部署消费者(最多分区数,如果您的消费者可以毫无问题地运行多线程,则更少。)
  3. 从每个消费者中单线程读取(使用自动偏移提交)并将消息放入阻塞队列中,您可以根据每个消费者中实际处理线程的数量来控制该队列。
  4. 如果处理失败,您可以重试成功或将消息放入死信队列。不要忘记执行关闭连接以处理已使用的消息。
  5. 如果你想确保排序,比如一个接一个地处理具有相同键的事件,或者来自单个分区的任何其他因素,你可以使用确定性执行器。我在 Java 中编写了一个基本的 ExecutorService,它可以以确定的方式执行多条消息,而不会影响逻辑上独立事件的多线程。 Link- https://github.com/mukulbansal93/deterministic-threading

回答您的问题-

  1. Is it ok to subscribe to a partition and keep the lock open on Kafka partition until configurable messages are processed and then apply business rules, apply changes to database. All happens in the same process, any performance issues doing this way? 由于您正在批量处理,因此我在这里没有发现太多性能问题。但是,有可能您使用的一条消息花费了很长时间,而其他消息却在进行处理。在这种情况下,您将无法读取来自 Kafka 的其他消息,从而导致性能瓶颈。
  2. Is it ok to manually commit the offset to Kafka? (Performance issues with manual offset commit). 这肯定是吞吐量最低的方法,因为偏移量提交是一项昂贵的操作。