Spark 重新分区执行器

Spark Repartition Executors

我有一个大约 100GB 的数据源,我正在尝试使用日期列对其进行分区。

为了避免分区内出现小块,我添加了一个 repartition(5),每个分区内最多有 5 个文件:

df.repartition(5).write.orc("path")

我的问题是,在我分配的 30 个执行者中,实际上只有 5 个 运行。最后我得到了我想要的(每个分区内有5个文件),但是由于只有5个执行者运行,执行时间非常长。

你对我如何让它更快有什么建议吗?

您可以使用重新分区和 partitionBy 来解决问题。 有两种方法可以解决这个问题。

假设您需要按日期列进行分区

df.repartition(5, 'dateColumn').write.partitionBy('dateColumn').parquet(path)

在这种情况下,使用的执行器数量将等于 5 * distinct(dateColumn),您的所有日期将包含 5 个文件。

另一种方法是将数据重新分区 3 倍于执行程序数量,然后使用 maxRecordsPerFile 保存数据,这将创建大小相等的文件,但您将失去对创建文件数量的控制

df.repartition(60).write.option('maxRecordsPerFile',200000).partitionBy('dateColumn').parquet(path)

Spark 可以 运行 为 RDD 或数据帧的每个分区执行 1 个并发任务(最多为集群中的核心数)。如果您的集群有 30 个核心,您应该至少有 30 个分区。另一方面,单个分区通常不应包含超过 128MB,单个随机块不能大于 2GB(请参阅 SPARK-6235)。 由于您想减少执行时间,因此最好增加分区数量,并在作业结束时减少特定作业的分区数量。 为了在分区之间更好地(均匀地)分配数据,最好使用散列分区程序。

我简单地修复了它:

df.repartition($"dateColumn").write.partitionBy("dateColumn").orc(path)

并分配与我将在输出中拥有的分区数量相同数量的执行程序。

谢谢大家