Is it possible to do dbutils io asynchronously?
I've written some code (based on ) to write partitioned data to blob storage, and most of it is pleasantly fast. The slowest part is that the one CSV file per partition that I have Spark generate is named in a very user-unfriendly way, so I do a simple rename operation to clean the files up (and delete some superfluous ones). This takes far longer than writing the data in the first place.
# Organize the data into folders matching the specified partitions, with a single CSV per partition
import os
from datetime import datetime

def one_file_per_partition(df, path, partitions, sort_within_partitions, VERBOSE = False):
    extension = ".csv.gz" # TODO: Support multiple extensions
    start = datetime.now()
    df.repartition(*partitions).sortWithinPartitions(*sort_within_partitions) \
        .write.partitionBy(*partitions).option("header", "true").option("compression", "gzip").mode("overwrite").csv(path)
    log(f"Wrote {get_df_name(df)} data partitioned by {partitions} and sorted by {sort_within_partitions} to:" +
        f"\n {path}\n Time taken: {(datetime.now() - start).total_seconds():,.2f} seconds")

    # Recursively traverse all partition subdirectories and rename + move the CSV to their root
    # TODO: This is very slow, it should be parallelizable
    def traverse(root, remaining_partitions):
        if VERBOSE: log(f"Traversing partitions by {remaining_partitions[0]} within folder: {root}")
        for folder in list_subfolders(root):
            subdirectory = os.path.join(root, folder)
            if(len(remaining_partitions) > 1):
                traverse(subdirectory, remaining_partitions[1:])
            else:
                destination = os.path.join(root, folder[len(f"{remaining_partitions[0]}="):]) + extension
                if VERBOSE: log(f"Moving file\nFrom:{subdirectory}\n  To:{destination}")
                spark_output_to_single_file(subdirectory, destination, VERBOSE)

    log(f"Cleaning up spark output directories...")
    start = datetime.now()
    traverse(path, partitions)
    log(f"Moving output files to their destination took {(datetime.now() - start).total_seconds():,.2f} seconds")

# Convert a single-file spark output folder into a single file at the specified location, and clean up superfluous artifacts
def spark_output_to_single_file(output_folder, destination_path, VERBOSE = False):
    output_files = [x for x in dbutils.fs.ls(output_folder) if x.name.startswith("part-")]
    if(len(output_files) == 0):
        raise FileNotFoundError(f"Could not find any output files (prefixed with 'part-') in the specified spark output folder: {output_folder}")
    if(len(output_files) > 1):
        raise ValueError(f"The specified spark folder has more than 1 output file in the specified spark output folder: {output_folder}\n" +
                         f"We found {len(output_files)}: {[x.name for x in output_files]}\n" +
                         f"This function should only be used for single-file spark outputs.")
    dbutils.fs.mv(output_files[0].path, destination_path)
    # Clean up all the other spark output generated to our temp folder
    dbutils.fs.rm(output_folder, recurse=True)
    if VERBOSE: log(f"Successfully wrote {destination_path}")
Here is an example of the output:
2022-04-22 20:36:45.313963 Wrote df_test data partitioned by ['Granularity', 'PORTINFOID'] and sorted by ['Rank'] to: /mnt/.../all_data_by_rank
Time taken: 19.31 seconds
2022-04-22 20:36:45.314020 Cleaning up spark output directories...
2022-04-22 20:37:42.583850 Moving output files to their destination took 57.27 seconds
I assume the reason it is slow is that I'm processing the folders sequentially; if I could simply do them in parallel it would be much faster.
The problem is that all IO on Databricks is done with "dbutils", which abstracts away the mounted blob containers and makes this sort of thing very easy. I just can't find any information about doing asynchronous IO with this utility, though.
Does anyone know how I could try to parallelise this activity?
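To make the question concrete: what I'm hoping for is some way to issue the dbutils.fs.mv calls concurrently instead of one at a time, roughly in the spirit of the sketch below. ThreadPoolExecutor is just an illustration of the shape I'm after, with a hypothetical move_all helper and an arbitrary worker count; whether dbutils actually tolerates being driven like this is exactly what I can't find documented.

from concurrent.futures import ThreadPoolExecutor

# Hypothetical helper: run the dbutils moves from a pool of driver-side threads.
# "moves" would be a list of (source, destination) path tuples.
def move_all(moves, max_workers = 16):
    with ThreadPoolExecutor(max_workers = max_workers) as executor:
        futures = [executor.submit(dbutils.fs.mv, src, dst) for src, dst in moves]
        for future in futures:
            future.result()  # surface any exception raised by a move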
The eventual solution was to abandon dbutils, which does not support parallelism in any way, and instead use os operations, which do:
import os
from datetime import datetime
from pyspark.sql.types import StringType

# Recursively traverse all partition subdirectories and rename + move the outputs to their root
# NOTE: The code to do this sequentially is much simpler, but very slow. The complexity arises from parallelising the file operations
def spark_output_to_single_file_per_partition(root, partitions, output_extension, VERBOSE = False):
    if VERBOSE: log(f"Cleaning up spark output directories...")
    start = datetime.now()

    # Helper to recursively collect information from all partitions and flatten it into a single list
    def traverse_partitions(root, partitions, fn_collect_info, currentPartition = None):
        results = [fn_collect_info(root, currentPartition)]
        return results if len(partitions) == 0 else results + \
            [result for subdir in [traverse_partitions(os.path.join(root, folder), partitions[1:], fn_collect_info, partitions[0])
                                   for folder in list_subfolders(root)] for result in subdir]

    # Get the path of files to rename or delete. Note: We must convert to OS paths because we cannot parallelize use of dbutils
    def find_files_to_rename_and_delete(folder, partition):
        files = [x.name for x in dbutils.fs.ls(folder)]
        renames = [x for x in files if x[0:5] == "part-"]
        deletes = [f"/dbfs{folder}/{x}" for x in files if x[0:1] == "_"]
        if len(renames) > 0 and partition is None: raise Exception(f"Found {len(files)} partition file(s) in the root location: {folder}. Have files already been moved?")
        elif len(renames) > 1: raise Exception(f"Expected at most one partition file, but found {len(files)} in location: {folder}")
        elif len(renames) == 1: deletes.append(f"/dbfs{folder}/") # The leaf-folders (containing partitions) should be deleted after the file is moved
        return (deletes, None if len(renames) == 0 else (f"/dbfs{folder}/{renames[0]}", f"/dbfs{folder.replace(partition + '=', '')}{output_extension}"))

    # Scan the file system to find all files and folders that need to be moved and deleted
    if VERBOSE: log(f"Collecting a list of files that need to be renamed and deleted...")
    actions = traverse_partitions(root, partitions, find_files_to_rename_and_delete)

    # Rename all files in parallel using spark executors
    renames = [rename for (deletes, rename) in actions if rename is not None]
    if VERBOSE: log(f"Renaming {len(renames)} partition files...")
    spark.createDataFrame(renames, ['from', 'to']).foreach(lambda r: os.rename(r[0], r[1]))

    # Delete unwanted spark temp files and empty folders
    deletes = [path for (deletes, rename) in actions for path in deletes]
    delete_files = [d for d in deletes if d[-1] != "/"]
    delete_folders = [d for d in deletes if d[-1] == "/"]
    if VERBOSE: log(f"Deleting {len(delete_files)} spark outputs...")
    spark.createDataFrame(delete_files, StringType()).foreach(lambda r: os.remove(r[0]))
    if VERBOSE: log(f"Deleting {len(delete_folders)} empty folders...")
    spark.createDataFrame(delete_folders, StringType()).foreach(lambda r: os.rmdir(r[0]))

    log(f"Moving output files to their destination and cleaning spark artifacts took {(datetime.now() - start).total_seconds():,.2f} seconds")
This lets you generate the partitioned data with user-friendly names, and cleans up all the spark temp files (_started..., _committed..., _SUCCESS) generated in the process.
Usage:
# Organize the data into folders matching the specified partitions, with a single CSV per partition
def dataframe_to_csv_gz_per_partition(df, path, partitions, sort_within_partitions, rename_spark_outputs = True, VERBOSE = False):
    start = datetime.now()

    # Write the actual data to disk using spark
    df.repartition(*partitions).sortWithinPartitions(*sort_within_partitions) \
        .write.partitionBy(*partitions).option("header", "true").option("compression", "gzip").mode("overwrite").csv(path)
    log(f"Wrote {get_df_name(df)} data partitioned by {partitions} and sorted by {sort_within_partitions} to:" +
        f"\n {path}\n Time taken: {(datetime.now() - start).total_seconds():,.2f} seconds")

    # Rename outputs and clean up
    spark_output_to_single_file_per_partition(path, partitions, ".csv.gz", VERBOSE)
For what it's worth, I also tried parallelising this with Pool, but the results were not as good. I haven't tried importing and using any library that can do asynchronous IO, which I imagine would perform best.
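For comparison, a Pool-based version of the rename step might look roughly like the sketch below (an illustration only, assuming the same (from, to) tuples built in the renames list above; rename_in_parallel and the thread count are hypothetical names chosen for the example):

import os
from multiprocessing.pool import ThreadPool

# Hypothetical sketch of a driver-side Pool approach: rename the /dbfs paths
# from a pool of threads instead of distributing the work to spark executors.
def rename_in_parallel(renames, threads = 32):
    # renames: list of (source, destination) /dbfs path tuples, as built above
    with ThreadPool(threads) as pool:
        pool.starmap(os.rename, renames)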