Spark 镶木地板分区:大量文件

Spark parquet partitioning : Large number of files

我正在尝试利用 spark 分区。我正在尝试做类似

的事情
data.write.partitionBy("key").parquet("/location")

这里的问题是每个分区都会创建大量的 parquet 文件,如果我尝试从根目录读取,这会导致读取速度变慢。

为了避免这种情况,我尝试了

data.coalese(numPart).write.partitionBy("key").parquet("/location")

然而,这会在每个分区中创建 numPart 数量的 parquet 文件。 现在我的分区大小不同了。所以我理想情况下希望每个分区都有单独的合并。然而,这看起来并不容易。我需要访问所有分区合并到一定数量并存储在单独的位置。

如何使用分区来避免写完很多文件?

首先,我真的会避免使用 coalesce,因为这通常会在转换链中被进一步推高,并且可能会破坏您的工作的并行性(我在这里问过这个问题: )

为每个 parquet 分区写入 1 个文件真的很容易(参见 ):

data.repartition($"key").write.partitionBy("key").parquet("/location")

如果你想设置任意数量的文件(或大小相同的文件),你需要使用另一个可以使用的属性进一步重新分区你的数据(我不能告诉你这可能是什么)你的情况):

data.repartition($"key",$"another_key").write.partitionBy("key").parquet("/location")

another_key 可能是数据集的另一个属性,或者是对现有属性使用某些模数或舍入操作的派生属性。您甚至可以在 key 上使用 window 函数和 row_number,然后用

之类的东西四舍五入
data.repartition($"key",floor($"row_number"/N)*N).write.partitionBy("key").parquet("/location")

这会将您 N 条记录放入 1 个 parquet 文件

使用 orderBy

您还可以通过相应地对数据帧进行排序来控制文件数量而无需重新分区:

data.orderBy($"key").write.partitionBy("key").parquet("/location")

这将导致所有分区中的文件总数(至少,但不会超过)spark.sql.shuffle.partitions 个(默认为 200 个)。在 $key 之后添加第二个排序列甚至是有益的,因为 parquet 会记住数据帧的排序并相应地写入统计信息。例如,您可以通过 ID 订购:

data.orderBy($"key",$"id").write.partitionBy("key").parquet("/location")

这不会改变文件的数量,但是当您查询给定的 keyid 的 parquet 文件时,它会提高性能。参见例如https://www.slideshare.net/RyanBlue3/parquet-performance-tuning-the-missing-guide and https://db-blog.web.cern.ch/blog/luca-canali/2017-06-diving-spark-and-parquet-workloads-example

Spark 2.2+

从 Spark 2.2 开始,如果文件太大,您还可以使用新选项 maxRecordsPerFile 来限制每个文件的记录数 。如果你有 N 个分区,你仍然会得到至少 N 个文件,但是你可以将 1 个分区(任务)写入的文件拆分成更小的块:

df.write
.option("maxRecordsPerFile", 10000)
...

参见例如http://www.gatorsmile.io/anticipated-feature-in-spark-2-2-max-records-written-per-file/ and

这对我很有效:

data.repartition(n, "key").write.partitionBy("key").parquet("/location")

它在每个输出分区(目录)中生成 N 个文件,而且(有趣的是)比使用 coalesce 更快(同样,有趣的是,在我的数据集上) 比仅在输出上重新分区更快。

如果您使用的是 S3,我还建议在本地驱动器上执行所有操作(Spark 在写出期间会处理大量文件 creation/rename/deletion),一旦一切就绪,请使用 hadoop FileUtil(或者只是 aws cli) 复制所有内容:

import java.net.URI
import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}
// ...
  def copy(
          in : String,
          out : String,
          sparkSession: SparkSession
          ) = {
    FileUtil.copy(
      FileSystem.get(new URI(in), sparkSession.sparkContext.hadoopConfiguration),
      new Path(in),
      FileSystem.get(new URI(out), sparkSession.sparkContext.hadoopConfiguration),
      new Path(out),
      false,
      sparkSession.sparkContext.hadoopConfiguration
    )
  }

编辑:根据评论中的讨论:

您的数据集的分区列为 YEAR,但每个给定的 YEAR 中的数据量都大不相同。因此,一年可能有 1GB 的数据,但另一年可能有 100GB。

这是处理此问题的一种方法的伪代码:

val partitionSize = 10000 // Number of rows you want per output file.
val yearValues = df.select("YEAR").distinct
distinctGroupByValues.each((yearVal) -> {
  val subDf = df.filter(s"YEAR = $yearVal")
  val numPartitionsToUse = subDf.count / partitionSize
  subDf.repartition(numPartitionsToUse).write(outputPath + "/year=$yearVal")
})

但是,我实际上并不知道这会起作用。 Spark 可能会在读取每个列分区的可变数量的文件时遇到问题。

另一种方法是编写您自己的自定义分区程序,但我不知道其中涉及什么,所以我无法提供任何代码。

让我们用另一种方法扩展 Raphael Roth 的回答,该方法将为每个分区可以包含的文件数创建一个上限,:

import org.apache.spark.sql.functions.rand

df.repartition(numPartitions, $"some_col", rand)
  .write.partitionBy("some_col")
  .parquet("partitioned_lake")

这里的其他答案都很好但是有一些问题:

  • 依靠 maxRecordsPerFile 将大分区分解成较小的文件非常方便,但有两个注意事项:

    1. 如果您的分区列严重倾斜,按它们重新分区意味着可能会将最大数据分区的所有数据移动到单个 DataFrame 分区中。如果该 DataFrame 分区变得太大,仅此一项就可能使您的工作崩溃。

      举一个简单的例子,想象一下 repartition("country") 对一个 DataFrame 有什么影响,该数据框有 1 行代表世界上每个人。

    2. maxRecordsPerFile 将确保您的输出文件不超过一定的行数,但只有一个任务才能连续写出这些文件。一个任务将不得不处理整个数据分区,而不是能够用多个任务写出那个大数据分区。

  • repartition(numPartitions, $"some_col", rand) 是一个优雅的解决方案,但不能很好地处理小数据分区。它会为每个数据分区写出 numPartitions 个文件,即使它们很小。

    在很多情况下这可能不是问题,但如果您有一个大型数据湖,您就会知道写出许多小文件会随着时间的推移降低数据湖的性能。

因此,一种解决方案不适用于非常大的数据分区,而另一种解决方案不适用于非常小的数据分区。

我们需要的是一种根据数据分区的大小动态调整输出文件数量的方法。如果它非常大,我们需要很多文件。如果它很小,我们只需要几个文件,甚至一个文件。

解决方案是使用 repartition(..., rand) 扩展方法,并根据该数据分区所需的输出文件数量动态扩展 rand 的范围。

下面是 关于一个非常相似的问题的要点:

# In this example, `id` is a column in `skewed_data`.
partition_by_columns = ['id']
desired_rows_per_output_file = 10

partition_count = skewed_data.groupBy(partition_by_columns).count()

partition_balanced_data = (
    skewed_data
    .join(partition_count, on=partition_by_columns)
    .withColumn(
        'repartition_seed',
        (
            rand() * partition_count['count'] / desired_rows_per_output_file
        ).cast('int')
    )
    .repartition(*partition_by_columns, 'repartition_seed')
)

这将平衡输出文件的大小,无论分区倾斜如何,并且不会限制您的并行度或为小分区生成太多小文件。

如果您想自己 运行 此代码,我提供了 ,以及 DataFrame 分区正确平衡的证明。