使用 Spark 将大量文件写入 s3 的最佳实践是什么

What is the best practice writing massive amount of files to s3 using Spark

我正在尝试使用 Spark 将大约 30k-60k 的 parquet 文件写入 s3,由于 s3 速率限制,这需要花费大量时间(40 多分钟)。 我想知道是否有最佳实践来做这样的事情。我听说将数据写入 HDFS,然后使用 s3-dist-cp 复制它可能会更快。我不明白为什么。由于 s3 速率限制,来自 HDFS 的副本不会花费相同的时间吗?

感谢您的帮助

这种方法没有任何问题,并且在大多数用例中都可以正常工作,但由于 S3 文件的编写方式,可能会有一些挑战。

需要理解的两个重要概念

  1. S3(对象存储)!= POSIX 文件系统:重命名操作:

    基于 POSIX 的文件系统中的文件重命名过程只是元数据 operation.Only 指针发生变化,文件在磁盘上保持原样。例如,我有一个文件 abc.txt,我想将其重命名为 xyz.txt,它是即时的和原子的。 xyz.txt 的最后修改时间戳与 abc.txt 的最后修改时间戳保持相同。 与在 AWS S3(对象存储)中一样,引擎盖下的文件重命名是一个副本,然后是删除操作。源文件首先被复制到目标文件,然后源文件是 deleted.So “aws s3 mv” 更改目标文件的最后修改时间戳不同于 POSIX 文件 system.The 这里的元数据是一个键值store 其中 key 是文件路径,value 是文件的内容,没有更改密钥这样的过程并立即完成。重命名过程取决于文件的大小。如果有目录重命名(为简单起见,S3 中没有所谓的目录,我们可以假设一组递归文件作为目录),那么它取决于目录中文件的数量以及每个文件的大小。所以简而言之,与普通文件系统相比,重命名在 S3 中是非常昂贵的操作。

  2. S3 一致性模型

    S3 自带 2 种一致性 a.read 写入后 b.eventual 一致性,在某些情况下会导致文件未找到 expectation.Files 添加且未列出或文件是否被删除从列表中删除。

Deep explanation:

Spark 利用 Hadoop 的“FileOutputCommitter”实现来写入数据。再次写入数据涉及多个步骤,在高级别暂存输出文件上,然后提交它们,即写入最终 files.Here 涉及重命名步骤,因为我之前谈到从暂存到最终 step.As 你知道一个 spark 作业分为多个阶段和任务集,由于分布式计算的性质,任务容易失败,因此还提供了由于系统故障或推测执行缓慢 运行ning 任务而重新启动相同任务的规定,并且这导致了任务提交和作业提交的概念 functions.Here 我们有两种现成的算法选项,以及作业和任务提交的完成方式,并且说过这不是一种算法比其他算法更好,而是基于我们提交数据的位置.

mapreduce.fileoutputcommitter.algorithm.version=1

  • commitTask将task生成的数据从task临时目录重命名为job临时目录

  • 当所有任务完成后,commitJob 将作业临时目录中的所有数据重命名为最终目标,并在最后创建 _SUCCESS 文件。

这里的驱动程序在最后完成 commitJob 的工作,因此像 S3 这样的对象存储可能需要更长的时间,因为很多任务临时文件正在排队等待重命名操作(虽然它不是串行的)并且写入性能不是 optimized.It 对于 HDFS 可能工作得很好,因为重命名并不昂贵,而且只是元数据 change.For AWS S3 在 commitJob 期间,文件的每个重命名操作都会打开对 AWS S3 的大量 API 调用,并可能导致如果文件数量很多,意外 API 调用关闭的问题。也未必。我在两个不同的时间 运行ning 看到了同一份工作的两个案例。

mapreduce.fileoutputcommitter.algorithm.version=2

  • commitTask 在任务完成后立即将任务生成的数据从任务临时目录直接移动到最终目的地。

  • commitJob 基本上写_SUCCESS 文件,并没有做太多。

从高层次上看,这看起来是优化的,但它有一个限制,即不能执行推测任务,而且如果任何任务由于数据损坏而失败,那么我们可能会在最终目的地留下残留数据,需要一个清理。因此,该算法不会提供 100% 的数据正确性,或者不适用于我们需要将数据以附加模式添加到现有 files.Even 的用例,如果这确保优化结果带有 risk.The 良好性能的理由基本上是因为与算法 1 相比,重命名操作的数量较少(仍然有重命名)。在这里我们可能会遇到文件未找到预期的问题,因为 commitTask 将文件写入临时路径并立即重命名它们,并且最终一致性问题的可能性很小。

最佳实践

以下是我认为我们在编写 spark 数据处理应用程序时可以使用的几个:

  • 如果您有可用的 HDFS 集群,则将数据从 Spark 写入 HDFS 并将其复制到 S3 以持久保存。 s3-dist-cp 可用于从 HDFS 到 S3 的数据复制 optimally.Here 我们可以避免所有重命名 operation.With AWS EMR 仅在计算期间 运行ning 然后终止到持久化结果这种方法看起来更可取。

  • 尽量避免写入文件并一次又一次地读取它,除非有文件的消费者,spark 以内存处理和谨慎数据着称 persistence/cache 内存将帮助优化应用程序的运行时间。