如何处理 (Apache Beam) 高 IO 瓶颈?
How to deal with (Apache Beam) high IO bottlenecks?
让我们举一个简单的例子,我有一个非常简单的光束管道,它只是从一个文件中读取数据并将数据转储到一个输出文件中。现在让我们考虑输入文件很大(大小为几 GB,这种文件类型通常无法在文本编辑器中打开)。由于 direct-runner 实现非常简单(它将整个输入集读入内存),它无法读取和输出那些巨大的文件(除非你为 java 分配了不切实际的大量内存虚拟机进程);所以我的问题是:“像 flink/spark/cloud 数据流这样的生产运行者如何处理这个 'huge dataset problem'? - 假设他们不会只是尝试将整个文件/数据集放入内存中?” -.
我希望生产运行器的实现需要“分部分或分批”工作(如 reading/processing/outputting 分部分),以避免在任何特定时间点尝试将庞大的数据集放入内存。有人可以分享他们对生产运行者如何处理这种“巨大数据”情况的反馈吗?
一般化,请注意这也适用于其他 input/output 机制,例如,如果我的输入是来自巨大数据库 table 的 PCollection(广义上说行大小和数量都很大) , 生产运行器的内部实现是否以某种方式将给定的输入 SQL 语句分成许多内部生成的子语句,每个子语句都采用较小的子集(例如,通过内部生成一个 count(-) 语句,然后是 N 个语句,每个语句都采用(count(-)/N) 个元素?直接运行者不会这样做,只会将给定的查询 1:1 传递给数据库),或者我作为开发人员的责任是“批量迭代”和划分问题,如果确实如此,这里的最佳实践是什么,即:为这个或多个设置一个管道?如果只有一个,则以某种方式将管道参数化为 read/write 分批?或者迭代一个简单的管道并在管道外部管理必要的元数据?
在此先致谢,如有任何反馈,我们将不胜感激!
编辑(反映大卫的反馈):
大卫,您的反馈非常有价值,绝对触及了我感兴趣的要点。有一个用于拆分源的工作发现阶段和阅读阶段以同时读取拆分分区绝对是我有兴趣听到的, 所以谢谢你给我指明了正确的方向。如果你不介意的话,我有几个小的跟进问题:
1 - 文章在“通用枚举器-reader 通信机制”部分指出以下内容:
"The SplitEnumerator and SourceReader are both user implemented class.
It is not rare that the implementation require some communication
between these two components. In order to facilitate such use cases [....]"
所以我的问题是,某些用户(即开发人员)提供的实现(特别是 SplitEnumerator 和 SourceReader)触发的“拆分 + 阅读行为”,或者我可以从中受益吗?自定义代码?
2 - 可能只是更深入地研究了上面的问题;如果我有 batch/bounded 工作负载(假设我使用的是 apache flink),并且我有兴趣处理原始 post 中描述的“大文件”,管道是否会“结束”盒子”(在幕后进行“工作准备阶段”拆分和并行读取),还是需要开发人员实施一些自定义代码?
提前感谢您提供所有宝贵的反馈!
请注意,当输入有界且事先已知时(即批处理工作负载而不是流式处理),这会更直接。
在 Flink 中,在设计时考虑了流式处理,这是通过将“工作发现”与“阅读”分开来完成的。单个 SplitEnumerator
运行一次并枚举要读取的块(splits/partitions),并将它们分配给并行读取器。在批处理情况下,拆分由一系列偏移量定义,而在流式处理情况下,每个拆分的结束偏移量设置为 LONG_MAX.
这在 FLIP-27: Refactor Source Interface 中有更详细的描述。
只是为了解决这个问题,这个问题的理由是要知道 apache beam - 当与生产运行器结合使用时 -(如 flink 或 spark 或 google 云数据流)是否提供了用于拆分工作的盒子机制 a.k.a reading/writing 操纵 - 巨大的文件(或一般的数据源)。上面 David Anderson 提供的评论在暗示 Apache flink 如何处理此工作流方面具有重要价值。
在这一点上,我已经实现了使用大文件(用于测试可能的 IO 瓶颈)和基于“beam on flink”的管道的解决方案,我可以确认,flink 将创建一个执行计划,其中包括拆分源,并以不会出现内存问题的方式划分工作。现在,当然可能存在稳定性/“IO 性能”受到损害的情况,但至少我可以确认在管道抽象背后执行的工作流,在执行任务时使用文件系统,避免将所有数据放入内存和从而避免琐碎的内存错误。结论:是的,“beam on flink”(可能还有 spark 和数据流)确实提供了适当的工作准备、工作拆分和文件系统使用,以便以有效的方式使用可用的易失性内存。
关于数据源的更新: 将 DB 作为数据源,Flink 不会(也不能 - 这不是微不足道的)optimize/split/distribute 与 DB 数据源相关的工作与优化从文件系统读取的方式相同。虽然仍然有从数据库中读取大量数据(记录)的方法,但实现细节需要由开发人员解决,而不是框架的责任。我发现这篇文章 (https://nl.devoteam.com/expert-view/querying-jdbc-database-in-parallel-with-google-dataflow-apache-beam/) 非常有助于解决从 Beam 中的数据库读取大量记录的问题(这篇文章使用了云数据流运行器,但我使用了 Flink,它工作得很好),拆分查询并分配处理。
让我们举一个简单的例子,我有一个非常简单的光束管道,它只是从一个文件中读取数据并将数据转储到一个输出文件中。现在让我们考虑输入文件很大(大小为几 GB,这种文件类型通常无法在文本编辑器中打开)。由于 direct-runner 实现非常简单(它将整个输入集读入内存),它无法读取和输出那些巨大的文件(除非你为 java 分配了不切实际的大量内存虚拟机进程);所以我的问题是:“像 flink/spark/cloud 数据流这样的生产运行者如何处理这个 'huge dataset problem'? - 假设他们不会只是尝试将整个文件/数据集放入内存中?” -.
我希望生产运行器的实现需要“分部分或分批”工作(如 reading/processing/outputting 分部分),以避免在任何特定时间点尝试将庞大的数据集放入内存。有人可以分享他们对生产运行者如何处理这种“巨大数据”情况的反馈吗?
一般化,请注意这也适用于其他 input/output 机制,例如,如果我的输入是来自巨大数据库 table 的 PCollection(广义上说行大小和数量都很大) , 生产运行器的内部实现是否以某种方式将给定的输入 SQL 语句分成许多内部生成的子语句,每个子语句都采用较小的子集(例如,通过内部生成一个 count(-) 语句,然后是 N 个语句,每个语句都采用(count(-)/N) 个元素?直接运行者不会这样做,只会将给定的查询 1:1 传递给数据库),或者我作为开发人员的责任是“批量迭代”和划分问题,如果确实如此,这里的最佳实践是什么,即:为这个或多个设置一个管道?如果只有一个,则以某种方式将管道参数化为 read/write 分批?或者迭代一个简单的管道并在管道外部管理必要的元数据?
在此先致谢,如有任何反馈,我们将不胜感激!
编辑(反映大卫的反馈):
大卫,您的反馈非常有价值,绝对触及了我感兴趣的要点。有一个用于拆分源的工作发现阶段和阅读阶段以同时读取拆分分区绝对是我有兴趣听到的, 所以谢谢你给我指明了正确的方向。如果你不介意的话,我有几个小的跟进问题:
1 - 文章在“通用枚举器-reader 通信机制”部分指出以下内容:
"The SplitEnumerator and SourceReader are both user implemented class. It is not rare that the implementation require some communication between these two components. In order to facilitate such use cases [....]"
所以我的问题是,某些用户(即开发人员)提供的实现(特别是 SplitEnumerator 和 SourceReader)触发的“拆分 + 阅读行为”,或者我可以从中受益吗?自定义代码?
2 - 可能只是更深入地研究了上面的问题;如果我有 batch/bounded 工作负载(假设我使用的是 apache flink),并且我有兴趣处理原始 post 中描述的“大文件”,管道是否会“结束”盒子”(在幕后进行“工作准备阶段”拆分和并行读取),还是需要开发人员实施一些自定义代码?
提前感谢您提供所有宝贵的反馈!
请注意,当输入有界且事先已知时(即批处理工作负载而不是流式处理),这会更直接。
在 Flink 中,在设计时考虑了流式处理,这是通过将“工作发现”与“阅读”分开来完成的。单个 SplitEnumerator
运行一次并枚举要读取的块(splits/partitions),并将它们分配给并行读取器。在批处理情况下,拆分由一系列偏移量定义,而在流式处理情况下,每个拆分的结束偏移量设置为 LONG_MAX.
这在 FLIP-27: Refactor Source Interface 中有更详细的描述。
只是为了解决这个问题,这个问题的理由是要知道 apache beam - 当与生产运行器结合使用时 -(如 flink 或 spark 或 google 云数据流)是否提供了用于拆分工作的盒子机制 a.k.a reading/writing 操纵 - 巨大的文件(或一般的数据源)。上面 David Anderson 提供的评论在暗示 Apache flink 如何处理此工作流方面具有重要价值。
在这一点上,我已经实现了使用大文件(用于测试可能的 IO 瓶颈)和基于“beam on flink”的管道的解决方案,我可以确认,flink 将创建一个执行计划,其中包括拆分源,并以不会出现内存问题的方式划分工作。现在,当然可能存在稳定性/“IO 性能”受到损害的情况,但至少我可以确认在管道抽象背后执行的工作流,在执行任务时使用文件系统,避免将所有数据放入内存和从而避免琐碎的内存错误。结论:是的,“beam on flink”(可能还有 spark 和数据流)确实提供了适当的工作准备、工作拆分和文件系统使用,以便以有效的方式使用可用的易失性内存。
关于数据源的更新: 将 DB 作为数据源,Flink 不会(也不能 - 这不是微不足道的)optimize/split/distribute 与 DB 数据源相关的工作与优化从文件系统读取的方式相同。虽然仍然有从数据库中读取大量数据(记录)的方法,但实现细节需要由开发人员解决,而不是框架的责任。我发现这篇文章 (https://nl.devoteam.com/expert-view/querying-jdbc-database-in-parallel-with-google-dataflow-apache-beam/) 非常有助于解决从 Beam 中的数据库读取大量记录的问题(这篇文章使用了云数据流运行器,但我使用了 Flink,它工作得很好),拆分查询并分配处理。