如何阻止高负载导致级联的 Flink 检查点故障

How to stop high load from leading to cascading Flink checkpoint failures

我会主动提出几点:

  1. 我是 Flink 的新手(使用它大约一个月了)
  2. 我正在使用 Kinesis Analytics(AWS 托管的 Flink 解决方案)。从各方面来看,这并没有真正限制 Flink 的多功能性或容错选项,但无论如何我都会说出来。

我们有一个相当直接的滑动 window 应用程序。键控流按特定键(例如 IP 地址)组织事件,然后在 ProcessorFunction 中处理它们。我们主要使用它来跟踪事物的数量。例如,过去 24 小时内特定 IP 地址的登录次数。每 30 秒我们计算 window 中每个键的事件,并将该值保存到外部数据存储。状态也会更新以反映 window 中的事件,以便旧事件过期并且不会占用内存。

有趣的是,基数不是问题。如果我们有 20 万人登录,在 24 小时内,一切都是完美的。当一个 IP 在 24 小时内登录 20 万次时,事情就开始变得棘手了。此时,检查点开始花费的时间越来越长。一个平均检查点需要 2-3 秒,但是对于这种用户行为,检查点开始需要 5 分钟,然后是 10 分钟,然后是 15 分钟,然后是 30 分钟,然后是 40 分钟,等等。

应用程序可以运行在这种情况下顺利运行一段时间,令人惊讶。可能需要 10 或 12 个小时。但是,检查点迟早会完全失败,然后我们的最大迭代器年龄开始飙升,并且没有处理新事件等。

此时我已经尝试了一些方法:

  1. 在问题上投入更多的精力(自动缩放也打开)
  2. 纠结于 CheckpointingInterval 和 MinimumPauseBetweenCheckpoints https://docs.aws.amazon.com/kinesisanalytics/latest/apiv2/API_CheckpointConfiguration.html
  3. 重构以减少我们存储的状态的足迹

(1) 并没有真正做太多。 (2) 这似乎有所帮助,但随后又出现了比我们之前看到的更大的流量峰值,这削弱了任何好处 (3) 目前还不清楚这是否有帮助。我认为与你想象的 Yelp 或 Airbnb 相比,我们的应用程序内存占用相当小,他们都使用 Flink 集群来处理大量应用程序,所以我无法想象我的状态真的有问题。

我会说我希望我们不必深入改变应用程序输出的期望。这个滑动 window 是一个非常有价值的数据。

编辑:有人问我的状态是什么 ValueState[FooState]

case class FooState(
                         entityType: String,
                         entityID: String,
                         events: List[BarStateEvent],
                         tableName: String,
                         baseFeatureName: String,
                       )

case class BarStateEvent(target: Double, eventID: String, timestamp: Long)

编辑: 我想强调用户 David Anderson 在评论中所说的话:

One approach sometimes used for implementing sliding windows is to use MapState, where the keys are the timestamps for the slices, and the values are lists of events.

这是必不可少的。对于任何其他试图走这条路的人,我找不到一个可行的解决方案,它不会将事件存储到某个时间片中。我的最终解决方案涉及将事件分桶成 30 秒的批次,然后按照 David 的建议将它们写入地图状态。这似乎可以解决问题。对于我们的高负载时期,检查点保持在 3mb,并且它们总是在一秒钟内完成。

如果您有一个 24 小时长的滑动 window,并且滑动 30 秒,那么每次登录都会分配给 2880 个单独的 windows。没错,Flink 的滑动windows make copys。在本例中为 24 * 60 * 2 份。

如果您只是简单地计算登录事件,那么在 windows 关闭之前不需要实际缓冲登录事件。您可以改用 ReduceFunction to perform incremental aggregation.

我的猜测是您没有利用此优化,因此当您有一个热键(IP 地址)时,处理该热键的实例具有不成比例的数据量,并且需要很长时间到检查站了。

另一方面,如果您已经在进行增量聚合,并且检查点与您描述的一样有问题,那么值得更深入地研究以尝试理解原因。

一种可能的补救措施是使用 ProcessFunction 实现您自己的滑动 windows。通过这样做,您可以避免单独维护 2880 windows,并使用更高效的数据结构。

编辑(基于更新的问题):

我认为问题是这样的:当使用 RocksDB 状态后端时,状态以序列化字节的形式存在。每次状态访问和更新都必须经过 ser/de。这意味着你的 List[BarStateEvent] 正在被反序列化,然后在你每次修改它时重新序列化。对于列表中有 200k 个事件的 IP 地址,这将非常昂贵。

您应该改为使用 ListStateMapState。这些状态类型针对 RocksDB 进行了优化。 RocksDB 状态后端可以附加到 ListState 而无需反序列化列表。使用 MapState,映射中的每个 key/value 对都是一个单独的 RocksDB 对象,允许高效查找和修改。

有时用于实现滑动 windows 的一种方法是使用 MapState,其中键是切片的时间戳,值是事件列表。在 Flink docs.

中有一个类似的例子(但有翻滚 windows)

或者,如果您的状态适合内存,您可以使用 FsStateBackend。然后你的所有状态都将是 JVM 堆上的对象,并且 ser/de 只会在检查点和恢复期间发挥作用。