可能快照机制在 Apache Flink 中占用越来越多的内存
May snapshot mechanism spend more and more memory in Apache Flink
我正在学习 Flink 中快照机制的工作原理。
据我了解,JobManager 会以固定的间隔将屏障插入到每个数据源中,并且每个运算符都会在从其所有数据源接收到第 n 个屏障后进行快照。
如果我是对的,似乎这种机制在某些情况下可能会使用越来越多的内存。
这是一个例子:
表示有两个数据源:Source 1
和 Source 2
,以及一个运算符。
Source 1 -----\
------ Operator
Source 2 -----/
Source 1
正在生成整数流:1, 2, 3, 4, 5...
Source 2
正在生成字符流:a, b, c, d, e...
Operator 这样做:它需要来自 Source 1
的两个输入和来自 Source 2
的一个输入来生成输出:1a2, 3b4, 5c6, 7d8...
假设 JobManager 将屏障插入到两个数据源中,如下所示:
1, BARRIER A, 2, BARRIER B, 3, BARRIER C, 4, BARRIER D, 5...
a, BARRIER A, b, BARRIER B, c, BARRIER C, d, BARRIER D, 5...
现在让我们开始吧。
当Source 1
和Source 2
的两个"BARRIER A"进入Operator时,Flink会为Operator做一个快照,Operator的当前状态为1
并且a
,因为当BARRIER A进入Operator时,1
和a
已经在Operator中。
然后,当两个"BARRIER B"进入Operator时,Operator完成了第一个任务:生成1a2
,Flink会再做一次快照:NA
,b
。 NA
表示当前没有来自 Source 1
的新输入。
同时,每个快照都会存储到RAM、FS或RocksDB中(取决于我们如何配置Flink)。
如果我没看错的话,我觉得Flink在这个例子中会生成越来越多的快照。因为Source 1
的消费速度永远是Source 2
的两倍。
我是不是误会了什么?
有趣的思想实验。
如果您限制自己仅使用 Flink 的标准部分 API,则无法实现用户函数,该函数将从源 2 读取每个输入并从源 1 读取两个输入。实现时a CoProcessFunction
,例如,你受 Flink 运行时的支配,它会根据自己的内部逻辑从任一流提供事件。这两个流将相互竞争,可能 运行 在不同的线程甚至不同的进程中。当流汇聚时,如果来自两个输入的事件没有按照您希望的顺序提供,则必须将它们缓冲在 Flink 状态,直到您准备好处理它们。
这可能导致大量缓冲需求的常见情况是在实现事件时间连接时,其中一个流在时间戳方面远远领先于其他流(例如,加入外汇汇率的金融交易,如果汇率流滞后,则使用交易时有效的汇率)。但是这个缓冲可以在RocksDB中完成,而且不必对内存造成压力。
请注意,这种状态缓冲完全发生在您的应用程序中——Flink 没有灵活的网络缓冲区,可以在背压期间膨胀。
另一点是快照永远不会存储在本地文件系统或 RocksDB 中。如果您选择使用 RocksDB 状态后端,那么每个任务管理器的活动状态将存储在本地 RocksDB 实例中,但状态备份(快照)将存储在分布式文件系统中。
对于你描述的这种情况,
1, BARRIER A, 2, BARRIER B, 3, BARRIER C, 4, BARRIER D, 5...
a, BARRIER A, b, BARRIER B, c, BARRIER C, d, BARRIER D, 5...
这不会发生。没有什么可以安排这两个源以这种方式同步——它们将比该图显示的更独立地进行。由于 Flink 在管道阶段之间只有少量固定数量的网络缓冲,因此执行图中发生的任何背压都会迅速传播回一个或两个源。当这种情况发生时,背压源将无法将任何事件推送到管道中,直到背压减轻——但与此同时,另一个源可能会继续取得进展。 barrier 将由两个源大致同时独立地插入到两个流中,但是如果源 2 经常遇到背压(例如),它可能看起来更像这样:
1, BARRIER, A, 2, B, 3, BARRIER, C, 4, D, BARRIER, 5 ...
a, BARRIER, A, BARRIER, b, B, BARRIER, BARRIER, c ...
我正在学习 Flink 中快照机制的工作原理。
据我了解,JobManager 会以固定的间隔将屏障插入到每个数据源中,并且每个运算符都会在从其所有数据源接收到第 n 个屏障后进行快照。
如果我是对的,似乎这种机制在某些情况下可能会使用越来越多的内存。
这是一个例子:
表示有两个数据源:Source 1
和 Source 2
,以及一个运算符。
Source 1 -----\
------ Operator
Source 2 -----/
Source 1
正在生成整数流:1, 2, 3, 4, 5...
Source 2
正在生成字符流:a, b, c, d, e...
Operator 这样做:它需要来自 Source 1
的两个输入和来自 Source 2
的一个输入来生成输出:1a2, 3b4, 5c6, 7d8...
假设 JobManager 将屏障插入到两个数据源中,如下所示:
1, BARRIER A, 2, BARRIER B, 3, BARRIER C, 4, BARRIER D, 5...
a, BARRIER A, b, BARRIER B, c, BARRIER C, d, BARRIER D, 5...
现在让我们开始吧。
当Source 1
和Source 2
的两个"BARRIER A"进入Operator时,Flink会为Operator做一个快照,Operator的当前状态为1
并且a
,因为当BARRIER A进入Operator时,1
和a
已经在Operator中。
然后,当两个"BARRIER B"进入Operator时,Operator完成了第一个任务:生成1a2
,Flink会再做一次快照:NA
,b
。 NA
表示当前没有来自 Source 1
的新输入。
同时,每个快照都会存储到RAM、FS或RocksDB中(取决于我们如何配置Flink)。
如果我没看错的话,我觉得Flink在这个例子中会生成越来越多的快照。因为Source 1
的消费速度永远是Source 2
的两倍。
我是不是误会了什么?
有趣的思想实验。
如果您限制自己仅使用 Flink 的标准部分 API,则无法实现用户函数,该函数将从源 2 读取每个输入并从源 1 读取两个输入。实现时a CoProcessFunction
,例如,你受 Flink 运行时的支配,它会根据自己的内部逻辑从任一流提供事件。这两个流将相互竞争,可能 运行 在不同的线程甚至不同的进程中。当流汇聚时,如果来自两个输入的事件没有按照您希望的顺序提供,则必须将它们缓冲在 Flink 状态,直到您准备好处理它们。
这可能导致大量缓冲需求的常见情况是在实现事件时间连接时,其中一个流在时间戳方面远远领先于其他流(例如,加入外汇汇率的金融交易,如果汇率流滞后,则使用交易时有效的汇率)。但是这个缓冲可以在RocksDB中完成,而且不必对内存造成压力。
请注意,这种状态缓冲完全发生在您的应用程序中——Flink 没有灵活的网络缓冲区,可以在背压期间膨胀。
另一点是快照永远不会存储在本地文件系统或 RocksDB 中。如果您选择使用 RocksDB 状态后端,那么每个任务管理器的活动状态将存储在本地 RocksDB 实例中,但状态备份(快照)将存储在分布式文件系统中。
对于你描述的这种情况,
1, BARRIER A, 2, BARRIER B, 3, BARRIER C, 4, BARRIER D, 5...
a, BARRIER A, b, BARRIER B, c, BARRIER C, d, BARRIER D, 5...
这不会发生。没有什么可以安排这两个源以这种方式同步——它们将比该图显示的更独立地进行。由于 Flink 在管道阶段之间只有少量固定数量的网络缓冲,因此执行图中发生的任何背压都会迅速传播回一个或两个源。当这种情况发生时,背压源将无法将任何事件推送到管道中,直到背压减轻——但与此同时,另一个源可能会继续取得进展。 barrier 将由两个源大致同时独立地插入到两个流中,但是如果源 2 经常遇到背压(例如),它可能看起来更像这样:
1, BARRIER, A, 2, B, 3, BARRIER, C, 4, D, BARRIER, 5 ...
a, BARRIER, A, BARRIER, b, B, BARRIER, BARRIER, c ...