Spark SQL 数据存储生命周期

Spark SQL data storage life cycle

,我正在读取一个有数十亿条记录的配置单元table,由于磁盘利用率高导致作业失败,但是在添加AWS EBS卷后,作业运行 没有任何问题。虽然它解决了问题,但我几乎没有怀疑,我尝试做一些研究但找不到任何明确的答案。所以我的问题是?

当一个 spark SQL 读取一个 hive table 时,最初存储数据以供处理的地方以及数据的整个生命周期在存储方面是什么,如果我没有明确指定什么?以及如何添加 EBS 卷解决问题?

最初数据位于 HDFS/S3/etc 的 table 位置。如果数据不适合内存,Spark 会将数据溢出到本地存储。

阅读Apache Spark FAQ

Does my data need to fit in memory to use Spark?

No. Spark's operators spill data to disk if it does not fit in memory, allowing it to run well on any sized data. Likewise, cached datasets that do not fit in memory are either spilled to disk or recomputed on the fly when needed, as determined by the RDD's storage level.

每当 spark 从 hive 表中读取数据时,它都会将其存储在 RDD 中。有一点我想在这里澄清的是 hive 只是一个仓库所以它就像是 HDFS 之上的一层,当 spark 与 hive 交互时,hive 提供 spark hdfs loaction 存在的位置。

因此,Spark 从 HDFS 读取文件,它为单个输入拆分创建单个分区。输入拆分由 Hadoop 设置(无论用于读取此文件的 InputFormat 是什么。例如:如果您使用 textFile() 它将是 Hadoop 中的 TextInputFormat,这将 return 您为单个 HDFS 块的单个分区(注意:分区之间的分割将在行分割上完成,而不是精确的块分割),除非你有像 Avro/parquet.

这样的压缩文件格式

如果您手动添加 rdd.repartition(x) 它将执行从您在 rdd 中的 N 个分区到您想要的 x 个分区的数据洗牌,分区将在循环法的基础上完成。

如果您有一个 10GB 的未压缩文本文件存储在 HDFS 上,那么使用默认的 HDFS 块大小设置 (256MB) 它将存储在 40 个块中,这意味着您从该文件读取的 RDD 将有 40 个分区。当您调用 repartition(1000) 时,您的 RDD 将被标记为重新分区,但实际上只有当您在此 RDD 之上执行操作时,它才会被洗牌到 1000 个分区(惰性执行概念)

现在一切都取决于 spark 将如何处理数据,因为 Spark 正在执行延迟评估,在进行处理之前,spark 准备一个 DAG 以进行最佳处理。还有一点 spark 需要配置驱动程序内存、内核数、执行程序数等,如果配置不合适,作业将失败。

准备好 DAG 后,它就会开始处理数据。所以它把你的工作分成阶段,阶段又分成任务。每个任务将进一步使用特定的执行器、洗牌、分区。因此,在您处理数十亿条记录的情况下,您的配置可能不足以进行处理。还有一点,当我们说 spark 加载 RDD/Dataframe 中的数据时,它由 spark 管理,可以选择仅将数据保留在 memory/disk/memory 等 ref -storage_spark.

简而言之,

Hive-->HDFS--->SPARK>>RDD(存储依赖于它的惰性评估)。

您可以参考以下内容 link :

Spark 会读取数据,如果内存放不下,就会溢出到磁盘上。

注意几点:

  • 内存中的数据被压缩了,据我所知,你获得了大约 20%(例如,一个 100MB 的文件将只占用 80MB 的内存)。
  • 一旦您 read() 就会开始摄取,它不是 DAG 的一部分,您可以在 SQL 查询本身中限制摄取量。读操作由执行者完成。这个例子应该给你一个提示:https://github.com/jgperrin/net.jgp.books.spark.ch08/blob/master/src/main/java/net/jgp/books/spark/ch08/lab300_advanced_queries/MySQLWithWhereClauseToDatasetApp.java
  • 在最新版本的 Spark 中,您可以下推过滤器(例如,如果您在摄取后立即进行过滤,Spark 将知道并优化摄取),我认为这仅适用于 CSV、Avro 和 Parquet。对于数据库(包括 Hive),我推荐前面的示例。
  • 存储必须来自执行程序 seen/accessible,因此如果您有 EBS 卷,请确保它们来自集群 seen/accessible,其中 executors/workers 是 运行,与驱动程序所在的节点 运行.