Apache Flink 检查点卡住了
Apache Flink checkpointing stuck
我们运行正在处理一个 ListState 介于 300GB 和 400GB 之间的作业,有时列表可以增长到几千个。在我们的用例中,每个项目都必须有自己的 TTL,因此我们为这个 ListState 的每个新项目创建一个新的计时器,在 S3 上有一个 RocksDB 后端。
目前大约有 140 多个计时器(将在 event.timestamp + 40 天 时触发)。
我们的问题是作业的检查点突然卡住,或者非常慢(比如几个小时后 1%),直到最终超时。它通常在一段非常简单的代码上停止(flink 仪表板显示 0/12 (0%)
,而前几行显示 12/12 (100%)
):
[...]
val myStream = env.addSource(someKafkaConsumer)
.rebalance
.map(new CounterMapFunction[ControlGroup]("source.kafkaconsumer"))
.uid("src_kafka_stream")
.name("some_name")
myStream.process(new MonitoringProcessFunction()).uid("monitoring_uuid").name(monitoring_name)
.getSideOutput(outputTag)
.keyBy(_.name)
.addSink(sink)
[...]
更多信息:
- AT_LEAST_ONCE 检查点模式似乎比 EXACTLY_ONCE
更容易卡住
- 几个月前,状态上升到 1.5TB 的数据,我认为数十亿个计时器没有任何问题。
- 机器上的
- RAM、CPU 和网络 运行 两个任务管理器看起来都很正常
state.backend.rocksdb.thread.num = 4
- 第一个事件发生在我们收到大量事件(大约几分钟内)的时候,但上一个事件没有。
- 所有事件均来自 Kafka 主题。
- 在 AT_LEAST_ONCE 检查点模式下,作业仍然 运行 并且正常消耗。
这是我们第二次遇到这种情况,拓扑 运行 非常好,每天有几百万个事件,突然停止检查点。我们不知道是什么原因造成的。
谁能想到什么会突然导致检查点卡住?
一些想法:
如果您有许多定时器都或多或少同时触发,这种定时器风暴将阻止任何其他事情发生——任务将循环调用 onTimer 直到没有更多定时器被触发,在此期间他们的输入队列将被忽略,检查点障碍将不会进行。
如果这是您遇到问题的原因,您可以为您的计时器添加一些随机抖动,这样事件风暴以后就不会变成计时器风暴。重新组织要使用的东西 State TTL 可能是另一种选择。
如果堆上有很多计时器,这会导致非常高的 GC 开销。这不一定会使工作失败,但会使检查点不稳定。在这种情况下,将计时器移到 RocksDB 中可能会有所帮助。
此外:由于您使用的是 RocksDB,从 ListState 切换到 MapState,以时间为关键,可以让您删除单个条目,而不必在每次更新后重新序列化整个列表。 (对于 RocksDB,MapState 中的每个 key/value 对都是一个单独的 RocksDB 对象。)以这种方式提高清理效率可能是最好的补救措施。
我们运行正在处理一个 ListState 介于 300GB 和 400GB 之间的作业,有时列表可以增长到几千个。在我们的用例中,每个项目都必须有自己的 TTL,因此我们为这个 ListState 的每个新项目创建一个新的计时器,在 S3 上有一个 RocksDB 后端。
目前大约有 140 多个计时器(将在 event.timestamp + 40 天 时触发)。
我们的问题是作业的检查点突然卡住,或者非常慢(比如几个小时后 1%),直到最终超时。它通常在一段非常简单的代码上停止(flink 仪表板显示 0/12 (0%)
,而前几行显示 12/12 (100%)
):
[...]
val myStream = env.addSource(someKafkaConsumer)
.rebalance
.map(new CounterMapFunction[ControlGroup]("source.kafkaconsumer"))
.uid("src_kafka_stream")
.name("some_name")
myStream.process(new MonitoringProcessFunction()).uid("monitoring_uuid").name(monitoring_name)
.getSideOutput(outputTag)
.keyBy(_.name)
.addSink(sink)
[...]
更多信息:
- AT_LEAST_ONCE 检查点模式似乎比 EXACTLY_ONCE 更容易卡住
- 几个月前,状态上升到 1.5TB 的数据,我认为数十亿个计时器没有任何问题。
- 机器上的
- RAM、CPU 和网络 运行 两个任务管理器看起来都很正常
state.backend.rocksdb.thread.num = 4
- 第一个事件发生在我们收到大量事件(大约几分钟内)的时候,但上一个事件没有。
- 所有事件均来自 Kafka 主题。
- 在 AT_LEAST_ONCE 检查点模式下,作业仍然 运行 并且正常消耗。
这是我们第二次遇到这种情况,拓扑 运行 非常好,每天有几百万个事件,突然停止检查点。我们不知道是什么原因造成的。
谁能想到什么会突然导致检查点卡住?
一些想法:
如果您有许多定时器都或多或少同时触发,这种定时器风暴将阻止任何其他事情发生——任务将循环调用 onTimer 直到没有更多定时器被触发,在此期间他们的输入队列将被忽略,检查点障碍将不会进行。
如果这是您遇到问题的原因,您可以为您的计时器添加一些随机抖动,这样事件风暴以后就不会变成计时器风暴。重新组织要使用的东西 State TTL 可能是另一种选择。
如果堆上有很多计时器,这会导致非常高的 GC 开销。这不一定会使工作失败,但会使检查点不稳定。在这种情况下,将计时器移到 RocksDB 中可能会有所帮助。
此外:由于您使用的是 RocksDB,从 ListState 切换到 MapState,以时间为关键,可以让您删除单个条目,而不必在每次更新后重新序列化整个列表。 (对于 RocksDB,MapState 中的每个 key/value 对都是一个单独的 RocksDB 对象。)以这种方式提高清理效率可能是最好的补救措施。