快速处理主题和慢速处理主题 - Akka Kafka
Fast Processing Topic and Slow Processing Topic - Akka Kafka
我有一个问题,我需要优先处理一些事件,以便更早处理一些事件,比如在高优先级事件之后。这些事件来自一个来源,我需要根据事件类型优先级对流进行优先级排序,以便在高优先级或低优先级接收器中转发。我正在使用 kafka 和 akka kafka 流。所以主要问题是我在给定的时间点获得大量流量。这里的首选方案是什么?
首先要解决的是偏移量提交。因为处理不会按顺序进行,处理后commit offsets不能保证at-least-once(也不能保证at-most-once),因为下面的顺序是可能的(而且这个概率不能降为零):
- 在处理多个低优先级消息之前处理的高优先级消息的提交偏移量
- 流失败(或实例 运行 流停止,或其他)
- 流从上次提交的偏移量重新开始
- 低优先级的消息再也不会从 Kafka 中读取,因此永远不会被处理
这表明要么偏移量提交必须在重新排序之前发生,要么我们需要一个已处理但尚未提交的概念,直到处理完低优先级的消息。请注意,对于后一种选择,跟踪未提交的最大偏移量(可能有效的最简单策略)将不起作用,如果有任何可能在偏移序列中产生间隙这意味着无限保留和不压缩,我实际上建议提交处理前的偏移量,但是一旦处理逻辑保证它最终会处理消息。
Actor 和 Akka Persistence 的结合允许采用这种方法。粗略的概述是拥有一个持久的参与者(这非常适合事件溯源)并且基本上维护要处理的高优先级和低优先级消息列表。流从 Kafka 向 actor 发送带有消息的“询问”,actor 在收到消息时将消息分类为 high-/low-priority,假设该消息尚未被处理。消息(可能还有它的分类)作为事件持久化,参与者确认收到消息,并承诺通过为自己安排消息来完全处理“待处理”消息来处理它。确认完成请求,允许将偏移量提交给 Kafka。在收到消息(实际上是命令)以处理消息时,参与者选择要处理的 Kafka 消息(按优先级、年龄等)并坚持认为它已处理该消息(因此将其从“待处理”中移出到“已处理”),并且可能还会保留与其解释 Kafka 消息的方式相关的事件更新状态。在这种持久化之后,actor 向自己发送另一个命令来处理“to-process”消息。
然后通过后台进程使用“处理一条到进程的消息”命令定期 ping 该参与者来实现容错。
与流一样,这是一个每个分区一个逻辑线程的过程。您可能正在为每个物理 Kafka 分区复用多个分区的状态,在这种情况下,您可以拥有多个这样的参与者并从摄取流发送多个请求。如果这样做,周期性 ping 可能最好通过 Akka 持久性查询提供的流来完成,以获取所有持久性参与者的标识符。
请注意,此问题中的重新排序从根本上使它成为一个竞争,因此是不确定的:在此设计草图中,竞争是因为对于来自参与者 B 的消息 M1 和来自参与者 C 发送给参与者 A 的消息 M2 可能会收到以任何顺序(如果参与者 B 在发送消息 M1 之后向参与者 A 发送消息 M3,则 M3 将在 M1 之后到达,但可以在 M2 之前或之后到达)。在不同的设计中,竞争可能基于相对于 Kafka 使消息可供消费的延迟的处理速度。
我有一个问题,我需要优先处理一些事件,以便更早处理一些事件,比如在高优先级事件之后。这些事件来自一个来源,我需要根据事件类型优先级对流进行优先级排序,以便在高优先级或低优先级接收器中转发。我正在使用 kafka 和 akka kafka 流。所以主要问题是我在给定的时间点获得大量流量。这里的首选方案是什么?
首先要解决的是偏移量提交。因为处理不会按顺序进行,处理后commit offsets不能保证at-least-once(也不能保证at-most-once),因为下面的顺序是可能的(而且这个概率不能降为零):
- 在处理多个低优先级消息之前处理的高优先级消息的提交偏移量
- 流失败(或实例 运行 流停止,或其他)
- 流从上次提交的偏移量重新开始
- 低优先级的消息再也不会从 Kafka 中读取,因此永远不会被处理
这表明要么偏移量提交必须在重新排序之前发生,要么我们需要一个已处理但尚未提交的概念,直到处理完低优先级的消息。请注意,对于后一种选择,跟踪未提交的最大偏移量(可能有效的最简单策略)将不起作用,如果有任何可能在偏移序列中产生间隙这意味着无限保留和不压缩,我实际上建议提交处理前的偏移量,但是一旦处理逻辑保证它最终会处理消息。
Actor 和 Akka Persistence 的结合允许采用这种方法。粗略的概述是拥有一个持久的参与者(这非常适合事件溯源)并且基本上维护要处理的高优先级和低优先级消息列表。流从 Kafka 向 actor 发送带有消息的“询问”,actor 在收到消息时将消息分类为 high-/low-priority,假设该消息尚未被处理。消息(可能还有它的分类)作为事件持久化,参与者确认收到消息,并承诺通过为自己安排消息来完全处理“待处理”消息来处理它。确认完成请求,允许将偏移量提交给 Kafka。在收到消息(实际上是命令)以处理消息时,参与者选择要处理的 Kafka 消息(按优先级、年龄等)并坚持认为它已处理该消息(因此将其从“待处理”中移出到“已处理”),并且可能还会保留与其解释 Kafka 消息的方式相关的事件更新状态。在这种持久化之后,actor 向自己发送另一个命令来处理“to-process”消息。
然后通过后台进程使用“处理一条到进程的消息”命令定期 ping 该参与者来实现容错。
与流一样,这是一个每个分区一个逻辑线程的过程。您可能正在为每个物理 Kafka 分区复用多个分区的状态,在这种情况下,您可以拥有多个这样的参与者并从摄取流发送多个请求。如果这样做,周期性 ping 可能最好通过 Akka 持久性查询提供的流来完成,以获取所有持久性参与者的标识符。
请注意,此问题中的重新排序从根本上使它成为一个竞争,因此是不确定的:在此设计草图中,竞争是因为对于来自参与者 B 的消息 M1 和来自参与者 C 发送给参与者 A 的消息 M2 可能会收到以任何顺序(如果参与者 B 在发送消息 M1 之后向参与者 A 发送消息 M3,则 M3 将在 M1 之后到达,但可以在 M2 之前或之后到达)。在不同的设计中,竞争可能基于相对于 Kafka 使消息可供消费的延迟的处理速度。