Spark partition(ing) 如何处理 HDFS 中的文件?

How does Spark partition(ing) work on files in HDFS?

我正在使用 HDFS 的集群上使用 Apache Spark。据我了解,HDFS 正在数据节点上分发文件。因此,如果将 "file.txt" 放在文件系统上,它将被分成多个分区。 现在我打电话给

rdd = SparkContext().textFile("hdfs://.../file.txt") 

来自 Apache Spark。 rdd 现在是否自动与文件系统上的 "file.txt" 相同的分区? 当我调用

时会发生什么
rdd.repartition(x)

where x > 然后是 hdfs 使用的分区? Spark 会在物理上重新排列 hdfs 上的数据以在本地工作吗?

示例: 我在 HDFS 系统上放置了一个 30GB 的文本文件,该系统将其分布在 10 个节点上。 将火花 a) 使用相同的 10 个分区?和 b) 当我调用 repartition(1000)?

时,在集群中洗牌 30GB

当 Spark 从 HDFS 读取文件时,它会为单个输入拆分创建单个分区。输入拆分由用于读取此文件的 Hadoop InputFormat 设置。例如,如果您使用 textFile(),在 Hadoop 中将是 TextInputFormat,这将 return 为单个 HDFS 块创建一个分区(但分区之间的拆分将在线完成拆分,而不是确切的块拆分),除非你有一个压缩的文本文件。在压缩文件的情况下,您将获得单个文件的单个分区(因为压缩文本文件不可拆分)。

当您调用 rdd.repartition(x) 时,它将执行从 rdd 中的 N 个分区到您想要的 x 个分区的数据随机播放,分区会以循环方式完成。

如果您有一个 30GB 的未压缩文本文件存储在 HDFS 上,那么使用默认的 HDFS 块大小设置 (128MB) 它将存储在 235 个块中,这意味着您从该文件读取的 RDD 将有 235 个分区.当您调用 repartition(1000) 时,您的 RDD 将被标记为 重新分区 ,但实际上只有当您在此 RDD 之上执行操作时,它才会被洗牌到 1000 个分区(惰性执行概念)

添加到 @0x0FFF 如果它从 HDFS 中获取输入文件,它将像这样计算 rdd = SparkContext().textFile("hdfs://.../file.txt"),当您执行 rdd.getNumPatitions 时,将得到 Max(2, Number of HDFS block)。我做了很多实验,结果发现了这个。再次明确地,您可以执行 rdd = SparkContext().textFile("hdfs://.../file.txt", 400) 以获得 400 作为分区,甚至可以通过 rdd.repartition 进行重新分区或通过 rdd.coalesce(10)

减少到 10

Here is the snapshot of "How blocks in HDFS are loaded into Spark workers as partitions"

在此图像中,4 个 HDFS 块作为 Spark 分区加载到 3 个工作内存中


Example: I put a 30GB Textfile on the HDFS-System, which is distributing it on 10 nodes.

Will Spark

a) use the same 10 partitions?

Spark load same 10 HDFS bocks to workers memory as partitions. I assume block size of 30 GB file should be 3 GB to get 10 partitions/blocks (with default conf)

b) 当我调用 repartition(1000) 时跨集群随机移动 30GB?

Yes, Spark shuffle the data among the worker nodes in order to create 1000 partitions in workers memory.

注:

HDFS Block -> Spark partition   : One block can represent as One partition (by default)
Spark partition -> Workers      : Many/One partitions can present in One workers 

使用 spark-sql 读取非分桶 HDFS 文件(例如 parquet)时,DataFrame 分区数 df.rdd.getNumPartitions 取决于以下因素:

  • spark.default.parallelism(大致翻译为应用程序可用的#cores)
  • spark.sql.files.maxPartitionBytes(默认 128MB)
  • spark.sql.files.openCostInBytes(默认 4MB)

分区数量粗略估计为:

  • 如果您有足够的核心来并行读取所有数据,(即每 128MB 数据至少有一个核心)

    AveragePartitionSize ≈ min(4MB, TotalDataSize/#cores) NumberOfPartitions ≈ TotalDataSize/AveragePartitionSize

  • 如果您没有足够的内核,

    AveragePartitionSize ≈ 128MB NumberOfPartitions ≈ TotalDataSize/AveragePartitionSize

具体的计算有点复杂,可以在FileSourceScanExec的代码库中找到,参考here