保存到分区 parquet 文件时实现并发
Achieve concurrency when saving to a partitioned parquet file
使用 partitionBy
将 dataframe
写入 parquet
时:
df.write.partitionBy("col1","col2","col3").parquet(path)
我希望每个正在写入的分区都是由一个单独的任务独立完成的,并且与分配给当前 spark 作业的工作人员数量并行。
然而实际上只有一个 worker/task 运行ning 在写入镶木地板时一次。一名工作人员循环遍历每个分区并连续写出 .parquet
文件。为什么会这样 - 有没有办法在这个 spark.write.parquet
操作中强制并发?
以下是不是我想看的(应该是700%+
..)
从这个其他post我也试过在前面加repartition
df.repartition("col1","col2","col3").write.partitionBy("col1","col2","col3").parquet(path)
不幸的是,这没有效果:仍然只有一名工人..
注意:我运行宁在 local
模式 local[8]
并且看到 other 火花操作 运行多达八个并发工作人员,使用高达 750% 的 CPU。
简而言之,从单个任务写入多个输出文件不是并行的,但假设您有多个任务(多个输入拆分),每个任务都将在一个 worker 上获得自己的核心。
写出分区数据的目的不是并行化您的写入操作。 Spark 已经通过一次同时写出多个任务来做到这一点。目标只是优化未来的读取操作,您只需要保存数据的一个分区。
Spark 中写入分区的逻辑旨在将前一阶段的所有记录写出到目的地时仅读取一次。我相信部分设计选择也是为了防止出现以下情况
一个分区键有很多很多值。
编辑:Spark 2.x 方法
在 Spark 2.x 中,它按分区键对每个任务中的记录进行排序,然后遍历它们,一次写入一个文件句柄。我假设他们这样做是为了确保如果您的分区键中有很多不同的值,他们永远不会打开大量文件句柄。
供参考,排序如下:
向下滚动一点,您会看到它调用 write(iter.next())
遍历每一行。
这里是实际的写作(一次一个 file/partition 键):
在那里你可以看到它一次只保存一个打开的文件句柄。
编辑:Spark 1.x 方法
spark 1.x 对给定任务所做的是遍历所有记录,每当它遇到一个新的输出分区时打开一个新的文件句柄,它以前没有见过这个任务。然后它立即将记录写入该文件句柄并转到下一个。这意味着在处理单个任务时的任何给定时间,它最多可以为该任务打开 N 个文件句柄,其中 N 是输出分区的最大数量。为了更清楚,这里有一些 python 伪代码来展示总体思路:
# To write out records in a single InputSplit/Task
handles = {}
for row in input_split:
partition_path = determine_output_path(row, partition_keys)
if partition_path not in handles:
handles[partition_path] = open(partition_path, 'w')
handles[partition_path].write(row)
上面写出记录的策略有一个警告。在 spark 1.x 中,设置 spark.sql.sources.maxConcurrentWrites
对每个任务可以打开的掩码文件句柄设置了上限。达到该值后,Spark 将改为按分区键对数据进行排序,因此它可以遍历记录,一次写出一个文件。
使用 partitionBy
将 dataframe
写入 parquet
时:
df.write.partitionBy("col1","col2","col3").parquet(path)
我希望每个正在写入的分区都是由一个单独的任务独立完成的,并且与分配给当前 spark 作业的工作人员数量并行。
然而实际上只有一个 worker/task 运行ning 在写入镶木地板时一次。一名工作人员循环遍历每个分区并连续写出 .parquet
文件。为什么会这样 - 有没有办法在这个 spark.write.parquet
操作中强制并发?
以下是不是我想看的(应该是700%+
..)
从这个其他post我也试过在前面加repartition
df.repartition("col1","col2","col3").write.partitionBy("col1","col2","col3").parquet(path)
不幸的是,这没有效果:仍然只有一名工人..
注意:我运行宁在 local
模式 local[8]
并且看到 other 火花操作 运行多达八个并发工作人员,使用高达 750% 的 CPU。
简而言之,从单个任务写入多个输出文件不是并行的,但假设您有多个任务(多个输入拆分),每个任务都将在一个 worker 上获得自己的核心。
写出分区数据的目的不是并行化您的写入操作。 Spark 已经通过一次同时写出多个任务来做到这一点。目标只是优化未来的读取操作,您只需要保存数据的一个分区。
Spark 中写入分区的逻辑旨在将前一阶段的所有记录写出到目的地时仅读取一次。我相信部分设计选择也是为了防止出现以下情况 一个分区键有很多很多值。
编辑:Spark 2.x 方法
在 Spark 2.x 中,它按分区键对每个任务中的记录进行排序,然后遍历它们,一次写入一个文件句柄。我假设他们这样做是为了确保如果您的分区键中有很多不同的值,他们永远不会打开大量文件句柄。
供参考,排序如下:
向下滚动一点,您会看到它调用 write(iter.next())
遍历每一行。
这里是实际的写作(一次一个 file/partition 键):
在那里你可以看到它一次只保存一个打开的文件句柄。
编辑:Spark 1.x 方法
spark 1.x 对给定任务所做的是遍历所有记录,每当它遇到一个新的输出分区时打开一个新的文件句柄,它以前没有见过这个任务。然后它立即将记录写入该文件句柄并转到下一个。这意味着在处理单个任务时的任何给定时间,它最多可以为该任务打开 N 个文件句柄,其中 N 是输出分区的最大数量。为了更清楚,这里有一些 python 伪代码来展示总体思路:
# To write out records in a single InputSplit/Task
handles = {}
for row in input_split:
partition_path = determine_output_path(row, partition_keys)
if partition_path not in handles:
handles[partition_path] = open(partition_path, 'w')
handles[partition_path].write(row)
上面写出记录的策略有一个警告。在 spark 1.x 中,设置 spark.sql.sources.maxConcurrentWrites
对每个任务可以打开的掩码文件句柄设置了上限。达到该值后,Spark 将改为按分区键对数据进行排序,因此它可以遍历记录,一次写出一个文件。