是否可以异步执行 dbutils io?
Is it possible to do dbutils io asynchronously?
我已经编写了一些代码(基于 )将分区数据写入 blob,其中大部分步骤都非常快。最慢的部分在于:我让 spark 为每个分区生成的单个 csv 文件的命名方式对用户很不友好,所以我做了一个简单的重命名操作来清理它们(并删除一些多余的文件)。这一步花费的时间比最初写入数据要长得多。
# Organize the data into a folders matching the specified partitions, with a single CSV per partition
from datetime import datetime
def one_file_per_partition(df, path, partitions, sort_within_partitions, VERBOSE = False):
    """Write ``df`` as one gzipped CSV per partition, then flatten Spark's output folders.

    Spark writes ``df`` under ``path`` partitioned by ``partitions`` (header row,
    gzip compression, overwrite mode). Every leaf partition folder is then
    collapsed by ``spark_output_to_single_file`` into a single file named
    ``<partition value>.csv.gz`` inside its parent folder.

    Args:
        df: DataFrame to write.
        path: Output location handed to the Spark CSV writer.
        partitions: Column names to partition by, outermost first.
        sort_within_partitions: Column names used to sort rows inside each partition.
        VERBOSE: When true, log every traversal step and every file move.
    """
    import os  # local import: the function must not depend on another cell having imported it

    extension = ".csv.gz"  # TODO: Support multiple extensions
    start = datetime.now()
    df.repartition(*partitions).sortWithinPartitions(*sort_within_partitions) \
        .write.partitionBy(*partitions).option("header", "true").option("compression", "gzip").mode("overwrite").csv(path)
    log(f"Wrote {get_df_name(df)} data partitioned by {partitions} and sorted by {sort_within_partitions} to:" +
        f"\n {path}\n Time taken: {(datetime.now() - start).total_seconds():,.2f} seconds")

    # Recursively traverse all partition subdirectories and rename + move the CSV to their root
    # TODO: This is very slow, it should be parallelizable
    def traverse(root, remaining_partitions):
        current_partition = remaining_partitions[0]
        deeper_partitions = remaining_partitions[1:]
        prefix = f"{current_partition}="
        if VERBOSE:
            log(f"Traversing partitions by {current_partition} within folder: {root}")
        for folder in list_subfolders(root):
            subdirectory = os.path.join(root, folder)
            if deeper_partitions:
                # Intermediate level: it only holds further partition folders and no
                # "part-" file, so it must not be collapsed into a single file.
                traverse(subdirectory, deeper_partitions)
                continue
            # Leaf level: strip the "<column>=" prefix so the file is named after the value.
            destination = os.path.join(root, folder[len(prefix):]) + extension
            if VERBOSE:
                log(f"Moving file\nFrom:{subdirectory}\n To:{destination}")
            spark_output_to_single_file(subdirectory, destination, VERBOSE)

    log("Cleaning up spark output directories...")
    start = datetime.now()
    traverse(path, partitions)
    log(f"Moving output files to their destination took {(datetime.now() - start).total_seconds():,.2f} seconds")
# Convert a single-file spark output folder into a single file at the specified location, and clean up superfluous artifacts
def spark_output_to_single_file(output_folder, destination_path, VERBOSE = False):
    """Collapse a single-file Spark output folder into one file at ``destination_path``.

    The folder must contain exactly one entry whose name starts with ``part-``.
    That entry is moved to ``destination_path`` and the folder, together with
    whatever is left in it, is then removed recursively.

    Args:
        output_folder: Folder Spark wrote its output into.
        destination_path: Final path for the single output file.
        VERBOSE: When true, log the destination after a successful move.

    Raises:
        FileNotFoundError: No ``part-`` entry exists in ``output_folder``.
        ValueError: More than one ``part-`` entry exists in ``output_folder``.
    """
    part_files = []
    for entry in dbutils.fs.ls(output_folder):
        if entry.name.startswith("part-"):
            part_files.append(entry)

    found = len(part_files)
    if found == 0:
        raise FileNotFoundError(
            f"Could not find any output files (prefixed with 'part-') in the specified spark output folder: {output_folder}")
    if found > 1:
        names = [entry.name for entry in part_files]
        raise ValueError(
            f"The specified spark folder has more than 1 output file in the specified spark output folder: {output_folder}\n"
            f"We found {found}: {names}\n"
            "This function should only be used for single-file spark outputs.")

    (only_file,) = part_files
    dbutils.fs.mv(only_file.path, destination_path)

    # Everything still in the folder is Spark bookkeeping output; remove it with the folder.
    dbutils.fs.rm(output_folder, recurse=True)
    if VERBOSE:
        log(f"Successfully wrote {destination_path}")
2022-04-22 20:36:45.313963 Wrote df_test data partitioned by ['Granularity', 'PORTINFOID'] and sorted by ['Rank'] to: /mnt/.../all_data_by_rank
Time taken: 19.31 seconds
2022-04-22 20:36:45.314020 Cleaning up spark output directories...
2022-04-22 20:37:42.583850 Moving output files to their destination took 57.27 seconds
问题在于 Databricks 上的所有 IO 都是通过 “dbutils” 完成的,它对已挂载的 blob 容器做了抽象,使这类操作变得非常容易。不过,我找不到任何关于使用该工具执行异步 IO 的资料。
有谁知道我该如何并行化这项操作?
最终的解决方案是放弃 dbutils(它不以任何方式支持并行),改用 os 模块提供的文件操作:
import os
from datetime import datetime
from pyspark.sql.types import StringType
# Recursively traverse all partition subdirectories and rename + move the outputs to their root
# NOTE: The code to do this sequentially is much simpler, but very slow. The complexity arises from parallelising the file operations
def spark_output_to_single_file_per_partition(root, partitions, output_extension, VERBOSE = False):
    """Rename each partition's Spark output file and delete Spark's leftover artifacts.

    The partition folder tree under ``root`` is scanned with ``dbutils.fs.ls``.
    In every folder visited, entries whose name starts with ``_`` are scheduled
    for deletion. A folder holding exactly one ``part-`` file has that file
    renamed to ``<parent folder>/<partition value><output_extension>`` and is
    itself removed afterwards. The renames and deletions are issued through
    ``DataFrame.foreach`` using ``os`` calls on ``/dbfs``-prefixed paths.

    Args:
        root: Folder Spark wrote the partitioned output to. Local paths are built
            as ``f"/dbfs{folder}"``, so this is expected to start with ``/``.
        partitions: Partition column names, outermost first.
        output_extension: Suffix appended to every renamed file, e.g. ``".csv.gz"``.
        VERBOSE: When true, log progress for every phase.

    Raises:
        Exception: A ``part-`` file sits directly in ``root``, or a folder holds
            more than one ``part-`` file.
    """
    if VERBOSE:
        log("Cleaning up spark output directories...")
    start = datetime.now()

    # Helper to recursively collect information from all partitions and flatten it into a single list
    def traverse_partitions(folder, remaining_partitions, fn_collect_info, current_partition = None):
        results = [fn_collect_info(folder, current_partition)]
        if remaining_partitions:
            for subfolder in list_subfolders(folder):
                results.extend(traverse_partitions(
                    os.path.join(folder, subfolder), remaining_partitions[1:], fn_collect_info, remaining_partitions[0]))
        return results

    # Get the path of files to rename or delete. Note: We must convert to OS paths because we cannot parallelize use of dbutils
    def find_files_to_rename_and_delete(folder, partition):
        names = [entry.name for entry in dbutils.fs.ls(folder)]
        part_files = [name for name in names if name.startswith("part-")]
        deletes = [f"/dbfs{folder}/{name}" for name in names if name.startswith("_")]
        # Report the number of part files, not the number of all directory entries.
        if part_files and partition is None:
            raise Exception(f"Found {len(part_files)} partition file(s) in the root location: {folder}. Have files already been moved?")
        if len(part_files) > 1:
            raise Exception(f"Expected at most one partition file, but found {len(part_files)} in location: {folder}")
        if not part_files:
            return (deletes, None)
        # The leaf-folders (containing partitions) should be deleted after the file is moved;
        # the trailing "/" marks the entry as a folder for the delete phase below.
        deletes.append(f"/dbfs{folder}/")
        # Strip "<column>=" from the leaf folder name only, so an identical substring
        # elsewhere in the path (root or ancestor folders) is left untouched.
        parent, leaf = os.path.split(folder.rstrip("/"))
        prefix = f"{partition}="
        if leaf.startswith(prefix):
            leaf = leaf[len(prefix):]
        source = f"/dbfs{folder}/{part_files[0]}"
        destination = f"/dbfs{os.path.join(parent, leaf)}{output_extension}"
        return (deletes, (source, destination))

    # Scan the file system to find all files and folders that need to be moved and deleted
    if VERBOSE:
        log("Collecting a list of files that need to be renamed and deleted...")
    actions = traverse_partitions(root, partitions, find_files_to_rename_and_delete)

    # Rename all files in parallel using spark executors
    renames = [rename for (_, rename) in actions if rename is not None]
    if VERBOSE:
        log(f"Renaming {len(renames)} partition files...")
    if renames:  # createDataFrame cannot infer a schema from an empty list
        spark.createDataFrame(renames, ['from', 'to']).foreach(lambda r: os.rename(r[0], r[1]))

    # Delete unwanted spark temp files and empty folders
    all_deletes = [path for (paths, _) in actions for path in paths]
    delete_files = [path for path in all_deletes if not path.endswith("/")]
    delete_folders = [path for path in all_deletes if path.endswith("/")]
    if VERBOSE:
        log(f"Deleting {len(delete_files)} spark outputs...")
    if delete_files:
        spark.createDataFrame(delete_files, StringType()).foreach(lambda r: os.remove(r[0]))
    # Folders go last: os.rmdir only succeeds once a folder has been emptied.
    if VERBOSE:
        log(f"Deleting {len(delete_folders)} empty folders...")
    if delete_folders:
        spark.createDataFrame(delete_folders, StringType()).foreach(lambda r: os.rmdir(r[0]))
    log(f"Moving output files to their destination and cleaning spark artifacts took {(datetime.now() - start).total_seconds():,.2f} seconds")
这样就可以生成分区数据,文件名对用户友好,并清除在此过程中生成的所有 spark 临时文件(_started...、_committed...、_SUCCESS)。
# Organize the data into a folders matching the specified partitions, with a single CSV per partition
def dataframe_to_csv_gz_per_partition(df, path, partitions, sort_within_partitions, rename_spark_outputs = True, VERBOSE = False):
    """Write ``df`` to ``path`` as gzipped CSV partitioned by ``partitions``.

    Args:
        df: DataFrame to write.
        path: Output location handed to the Spark CSV writer.
        partitions: Column names to partition by, outermost first.
        sort_within_partitions: Column names used to sort rows inside each partition.
        rename_spark_outputs: When true (the default), call
            ``spark_output_to_single_file_per_partition`` afterwards to rename the
            output files and clean up. When false, Spark's output is left untouched.
        VERBOSE: Forwarded to the rename/clean-up step.
    """
    start = datetime.now()

    # Write the actual data to disk using spark
    df.repartition(*partitions).sortWithinPartitions(*sort_within_partitions) \
        .write.partitionBy(*partitions).option("header", "true").option("compression", "gzip").mode("overwrite").csv(path)
    log(f"Wrote {get_df_name(df)} data partitioned by {partitions} and sorted by {sort_within_partitions} to:" +
        f"\n {path}\n Time taken: {(datetime.now() - start).total_seconds():,.2f} seconds")

    # Rename outputs and clean up (previously unconditional: the flag was accepted but never read)
    if rename_spark_outputs:
        spark_output_to_single_file_per_partition(path, partitions, ".csv.gz", VERBOSE)
顺便一提,我也尝试过使用 Pool 进行并行化,但结果并不理想。我没有尝试导入并使用任何能够执行异步 IO 的库,不过我猜那样的性能会是最好的。
# Organize the data into a folders matching the specified partitions, with a single CSV per partition
from datetime import datetime
def one_file_per_partition(df, path, partitions, sort_within_partitions, VERBOSE = False):
    """Write ``df`` as one gzipped CSV per partition, then flatten Spark's output folders.

    Spark writes ``df`` under ``path`` partitioned by ``partitions`` (header row,
    gzip compression, overwrite mode). Every leaf partition folder is then
    collapsed by ``spark_output_to_single_file`` into a single file named
    ``<partition value>.csv.gz`` inside its parent folder.

    Args:
        df: DataFrame to write.
        path: Output location handed to the Spark CSV writer.
        partitions: Column names to partition by, outermost first.
        sort_within_partitions: Column names used to sort rows inside each partition.
        VERBOSE: When true, log every traversal step and every file move.
    """
    import os  # local import: the function must not depend on another cell having imported it

    extension = ".csv.gz"  # TODO: Support multiple extensions
    start = datetime.now()
    df.repartition(*partitions).sortWithinPartitions(*sort_within_partitions) \
        .write.partitionBy(*partitions).option("header", "true").option("compression", "gzip").mode("overwrite").csv(path)
    log(f"Wrote {get_df_name(df)} data partitioned by {partitions} and sorted by {sort_within_partitions} to:" +
        f"\n {path}\n Time taken: {(datetime.now() - start).total_seconds():,.2f} seconds")

    # Recursively traverse all partition subdirectories and rename + move the CSV to their root
    # TODO: This is very slow, it should be parallelizable
    def traverse(root, remaining_partitions):
        current_partition = remaining_partitions[0]
        deeper_partitions = remaining_partitions[1:]
        prefix = f"{current_partition}="
        if VERBOSE:
            log(f"Traversing partitions by {current_partition} within folder: {root}")
        for folder in list_subfolders(root):
            subdirectory = os.path.join(root, folder)
            if deeper_partitions:
                # Intermediate level: it only holds further partition folders and no
                # "part-" file, so it must not be collapsed into a single file.
                traverse(subdirectory, deeper_partitions)
                continue
            # Leaf level: strip the "<column>=" prefix so the file is named after the value.
            destination = os.path.join(root, folder[len(prefix):]) + extension
            if VERBOSE:
                log(f"Moving file\nFrom:{subdirectory}\n To:{destination}")
            spark_output_to_single_file(subdirectory, destination, VERBOSE)

    log("Cleaning up spark output directories...")
    start = datetime.now()
    traverse(path, partitions)
    log(f"Moving output files to their destination took {(datetime.now() - start).total_seconds():,.2f} seconds")
# Convert a single-file spark output folder into a single file at the specified location, and clean up superfluous artifacts
def spark_output_to_single_file(output_folder, destination_path, VERBOSE = False):
    """Collapse a single-file Spark output folder into one file at ``destination_path``.

    The folder must contain exactly one entry whose name starts with ``part-``.
    That entry is moved to ``destination_path`` and the folder, together with
    whatever is left in it, is then removed recursively.

    Args:
        output_folder: Folder Spark wrote its output into.
        destination_path: Final path for the single output file.
        VERBOSE: When true, log the destination after a successful move.

    Raises:
        FileNotFoundError: No ``part-`` entry exists in ``output_folder``.
        ValueError: More than one ``part-`` entry exists in ``output_folder``.
    """
    part_files = []
    for entry in dbutils.fs.ls(output_folder):
        if entry.name.startswith("part-"):
            part_files.append(entry)

    found = len(part_files)
    if found == 0:
        raise FileNotFoundError(
            f"Could not find any output files (prefixed with 'part-') in the specified spark output folder: {output_folder}")
    if found > 1:
        names = [entry.name for entry in part_files]
        raise ValueError(
            f"The specified spark folder has more than 1 output file in the specified spark output folder: {output_folder}\n"
            f"We found {found}: {names}\n"
            "This function should only be used for single-file spark outputs.")

    (only_file,) = part_files
    dbutils.fs.mv(only_file.path, destination_path)

    # Everything still in the folder is Spark bookkeeping output; remove it with the folder.
    dbutils.fs.rm(output_folder, recurse=True)
    if VERBOSE:
        log(f"Successfully wrote {destination_path}")
2022-04-22 20:36:45.313963 Wrote df_test data partitioned by ['Granularity', 'PORTINFOID'] and sorted by ['Rank'] to: /mnt/.../all_data_by_rank
Time taken: 19.31 seconds
2022-04-22 20:36:45.314020 Cleaning up spark output directories...
2022-04-22 20:37:42.583850 Moving output files to their destination took 57.27 seconds
问题在于 Databricks 上的所有 IO 都是通过 “dbutils” 完成的,它对已挂载的 blob 容器做了抽象,使这类操作变得非常容易。不过,我找不到任何关于使用该工具执行异步 IO 的资料。
有谁知道我该如何并行化这项操作?
最终的解决方案是放弃 dbutils(它不以任何方式支持并行),改用 os 模块提供的文件操作:
import os
from datetime import datetime
from pyspark.sql.types import StringType
# Recursively traverse all partition subdirectories and rename + move the outputs to their root
# NOTE: The code to do this sequentially is much simpler, but very slow. The complexity arises from parallelising the file operations
def spark_output_to_single_file_per_partition(root, partitions, output_extension, VERBOSE = False):
    """Rename each partition's Spark output file and delete Spark's leftover artifacts.

    The partition folder tree under ``root`` is scanned with ``dbutils.fs.ls``.
    In every folder visited, entries whose name starts with ``_`` are scheduled
    for deletion. A folder holding exactly one ``part-`` file has that file
    renamed to ``<parent folder>/<partition value><output_extension>`` and is
    itself removed afterwards. The renames and deletions are issued through
    ``DataFrame.foreach`` using ``os`` calls on ``/dbfs``-prefixed paths.

    Args:
        root: Folder Spark wrote the partitioned output to. Local paths are built
            as ``f"/dbfs{folder}"``, so this is expected to start with ``/``.
        partitions: Partition column names, outermost first.
        output_extension: Suffix appended to every renamed file, e.g. ``".csv.gz"``.
        VERBOSE: When true, log progress for every phase.

    Raises:
        Exception: A ``part-`` file sits directly in ``root``, or a folder holds
            more than one ``part-`` file.
    """
    if VERBOSE:
        log("Cleaning up spark output directories...")
    start = datetime.now()

    # Helper to recursively collect information from all partitions and flatten it into a single list
    def traverse_partitions(folder, remaining_partitions, fn_collect_info, current_partition = None):
        results = [fn_collect_info(folder, current_partition)]
        if remaining_partitions:
            for subfolder in list_subfolders(folder):
                results.extend(traverse_partitions(
                    os.path.join(folder, subfolder), remaining_partitions[1:], fn_collect_info, remaining_partitions[0]))
        return results

    # Get the path of files to rename or delete. Note: We must convert to OS paths because we cannot parallelize use of dbutils
    def find_files_to_rename_and_delete(folder, partition):
        names = [entry.name for entry in dbutils.fs.ls(folder)]
        part_files = [name for name in names if name.startswith("part-")]
        deletes = [f"/dbfs{folder}/{name}" for name in names if name.startswith("_")]
        # Report the number of part files, not the number of all directory entries.
        if part_files and partition is None:
            raise Exception(f"Found {len(part_files)} partition file(s) in the root location: {folder}. Have files already been moved?")
        if len(part_files) > 1:
            raise Exception(f"Expected at most one partition file, but found {len(part_files)} in location: {folder}")
        if not part_files:
            return (deletes, None)
        # The leaf-folders (containing partitions) should be deleted after the file is moved;
        # the trailing "/" marks the entry as a folder for the delete phase below.
        deletes.append(f"/dbfs{folder}/")
        # Strip "<column>=" from the leaf folder name only, so an identical substring
        # elsewhere in the path (root or ancestor folders) is left untouched.
        parent, leaf = os.path.split(folder.rstrip("/"))
        prefix = f"{partition}="
        if leaf.startswith(prefix):
            leaf = leaf[len(prefix):]
        source = f"/dbfs{folder}/{part_files[0]}"
        destination = f"/dbfs{os.path.join(parent, leaf)}{output_extension}"
        return (deletes, (source, destination))

    # Scan the file system to find all files and folders that need to be moved and deleted
    if VERBOSE:
        log("Collecting a list of files that need to be renamed and deleted...")
    actions = traverse_partitions(root, partitions, find_files_to_rename_and_delete)

    # Rename all files in parallel using spark executors
    renames = [rename for (_, rename) in actions if rename is not None]
    if VERBOSE:
        log(f"Renaming {len(renames)} partition files...")
    if renames:  # createDataFrame cannot infer a schema from an empty list
        spark.createDataFrame(renames, ['from', 'to']).foreach(lambda r: os.rename(r[0], r[1]))

    # Delete unwanted spark temp files and empty folders
    all_deletes = [path for (paths, _) in actions for path in paths]
    delete_files = [path for path in all_deletes if not path.endswith("/")]
    delete_folders = [path for path in all_deletes if path.endswith("/")]
    if VERBOSE:
        log(f"Deleting {len(delete_files)} spark outputs...")
    if delete_files:
        spark.createDataFrame(delete_files, StringType()).foreach(lambda r: os.remove(r[0]))
    # Folders go last: os.rmdir only succeeds once a folder has been emptied.
    if VERBOSE:
        log(f"Deleting {len(delete_folders)} empty folders...")
    if delete_folders:
        spark.createDataFrame(delete_folders, StringType()).foreach(lambda r: os.rmdir(r[0]))
    log(f"Moving output files to their destination and cleaning spark artifacts took {(datetime.now() - start).total_seconds():,.2f} seconds")
这样就可以生成分区数据,文件名对用户友好,并清除在此过程中生成的所有 spark 临时文件(_started...、_committed...、_SUCCESS)。
# Organize the data into a folders matching the specified partitions, with a single CSV per partition
def dataframe_to_csv_gz_per_partition(df, path, partitions, sort_within_partitions, rename_spark_outputs = True, VERBOSE = False):
    """Write ``df`` to ``path`` as gzipped CSV partitioned by ``partitions``.

    Args:
        df: DataFrame to write.
        path: Output location handed to the Spark CSV writer.
        partitions: Column names to partition by, outermost first.
        sort_within_partitions: Column names used to sort rows inside each partition.
        rename_spark_outputs: When true (the default), call
            ``spark_output_to_single_file_per_partition`` afterwards to rename the
            output files and clean up. When false, Spark's output is left untouched.
        VERBOSE: Forwarded to the rename/clean-up step.
    """
    start = datetime.now()

    # Write the actual data to disk using spark
    df.repartition(*partitions).sortWithinPartitions(*sort_within_partitions) \
        .write.partitionBy(*partitions).option("header", "true").option("compression", "gzip").mode("overwrite").csv(path)
    log(f"Wrote {get_df_name(df)} data partitioned by {partitions} and sorted by {sort_within_partitions} to:" +
        f"\n {path}\n Time taken: {(datetime.now() - start).total_seconds():,.2f} seconds")

    # Rename outputs and clean up (previously unconditional: the flag was accepted but never read)
    if rename_spark_outputs:
        spark_output_to_single_file_per_partition(path, partitions, ".csv.gz", VERBOSE)
顺便一提,我也尝试过使用 Pool 进行并行化,但结果并不理想。我没有尝试导入并使用任何能够执行异步 IO 的库,不过我猜那样的性能会是最好的。