Is it possible to do dbutils io asynchronously?

I've written some code (based on ) to write my partitioned data out to blob storage, and most of it is very fast. The slowest part is that the single CSV file I have spark generate per partition is named in a very user-unfriendly way, so I do a simple rename operation to clean them up (and delete some superfluous files). This takes far longer than writing the data in the first place.

# Organize the data into folders matching the specified partitions, with a single CSV per partition
import os
from datetime import datetime

def one_file_per_partition(df, path, partitions, sort_within_partitions, VERBOSE = False):
  extension = ".csv.gz" # TODO: Support multiple extensions
  start = datetime.now()
  df.repartition(*partitions).sortWithinPartitions(*sort_within_partitions) \
    .write.partitionBy(*partitions).option("header", "true").option("compression", "gzip").mode("overwrite").csv(path)
  log(f"Wrote {get_df_name(df)} data partitioned by {partitions} and sorted by {sort_within_partitions} to:" +
      f"\n  {path}\n  Time taken: {(datetime.now() - start).total_seconds():,.2f} seconds")

  # Recursively traverse all partition subdirectories and rename + move the CSVs to their root
  # TODO: This is very slow, it should be parallelizable
  def traverse(root, remaining_partitions):
    if VERBOSE: log(f"Traversing partitions by {remaining_partitions[0]} within folder: {root}")
    for folder in list_subfolders(root):
      subdirectory = os.path.join(root, folder)
      if(len(remaining_partitions) > 1):
        traverse(subdirectory, remaining_partitions[1:])
      else:
        destination = os.path.join(root, folder[len(f"{remaining_partitions[0]}="):]) + extension
        if VERBOSE: log(f"Moving file\nFrom:{subdirectory}\n  To:{destination}")
        spark_output_to_single_file(subdirectory, destination, VERBOSE)

  log(f"Cleaning up spark output directories...")
  start = datetime.now()
  traverse(path, partitions)
  log(f"Moving output files to their destination took {(datetime.now() - start).total_seconds():,.2f} seconds")

# Convert a single-file spark output folder into a single file at the specified location, and clean up superfluous artifacts
def spark_output_to_single_file(output_folder, destination_path, VERBOSE = False):
  output_files = [x for x in dbutils.fs.ls(output_folder) if x.name.startswith("part-")]
  if(len(output_files) == 0):
    raise FileNotFoundError(f"Could not find any output files (prefixed with 'part-') in the specified spark output folder: {output_folder}")
  if(len(output_files) > 1):
    raise ValueError(f"Found more than one output file in the specified spark output folder: {output_folder}\n" +
                     f"We found {len(output_files)}: {[x.name for x in output_files]}\n" +
                     f"This function should only be used for single-file spark outputs.")
  dbutils.fs.mv(output_files[0].path, destination_path)
  # Clean up all the other spark output generated to our temp folder
  dbutils.fs.rm(output_folder, recurse=True)
  if VERBOSE: log(f"Successfully wrote {destination_path}")

Here is some example output:

2022-04-22 20:36:45.313963 Wrote df_test data partitioned by ['Granularity', 'PORTINFOID'] and sorted by ['Rank'] to:
  /mnt/.../all_data_by_rank
  Time taken: 19.31 seconds
2022-04-22 20:36:45.314020 Cleaning up spark output directories...
2022-04-22 20:37:42.583850 Moving output files to their destination took 57.27 seconds

I believe the reason is that I'm processing the folders sequentially; if I could simply do this in parallel, it would be much faster.

The problem is that all IO on Databricks is done using dbutils, which abstracts away the mounted blob containers and makes this kind of thing very easy. However, I just can't find any information about doing async IO with this utility.
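
To make the ask concrete, the kind of thing I was hoping for would look roughly like the sketch below: collect the (source, destination) pairs up front, then fan the dbutils.fs.mv calls out over a driver-side thread pool. The move_all helper and worker count are purely illustrative, and whether dbutils tolerates concurrent calls like this at all is exactly what I can't find an answer to.

# Illustrative sketch only: fan dbutils.fs.mv calls out over a driver-side thread pool.
# Assumes the (source, destination) pairs have already been collected by traversing the partitions.
from concurrent.futures import ThreadPoolExecutor

def move_all(moves, max_workers = 16):
  # moves: list of (source, destination) tuples of dbutils-style paths
  with ThreadPoolExecutor(max_workers = max_workers) as pool:
    # Submit one dbutils.fs.mv per pair and block until they have all completed
    list(pool.map(lambda m: dbutils.fs.mv(m[0], m[1]), moves))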

Does anyone know how I could try to parallelize this activity?

The eventual solution was to abandon dbutils, which does not support parallelism in any way, and instead use os operations, which do:

import os
from datetime import datetime
from pyspark.sql.types import StringType

# Recursively traverse all partition subdirectories and rename + move the outputs to their root
# NOTE: The code to do this sequentially is much simpler, but very slow. The complexity arises from parallelising the file operations
def spark_output_to_single_file_per_partition(root, partitions, output_extension, VERBOSE = False):
  if VERBOSE: log(f"Cleaning up spark output directories...")
  start = datetime.now()
  # Helper to recursively collect information from all partitions and flatten it into a single list
  def traverse_partitions(root, partitions, fn_collect_info, currentPartition = None):
    results = [fn_collect_info(root, currentPartition)]
    return results if len(partitions) == 0 else results + \
      [result for subdir in [traverse_partitions(os.path.join(root, folder), partitions[1:], fn_collect_info, partitions[0]) for folder in list_subfolders(root)] for result in subdir]
  # Get the path of files to rename or delete. Note: We must convert to OS paths because we cannot parallelize use of dbutils
  def find_files_to_rename_and_delete(folder, partition):
    files = [x.name for x in dbutils.fs.ls(folder)]
    renames = [x for x in files if x[0:5] == "part-"]
    deletes = [f"/dbfs{folder}/{x}" for x in files if x[0:1] == "_"]
    if len(renames) > 0 and partition is None: raise Exception(f"Found {len(renames)} partition file(s) in the root location: {folder}. Have files already been moved?")
    elif len(renames) > 1: raise Exception(f"Expected at most one partition file, but found {len(renames)} in location: {folder}")
    elif len(renames) == 1: deletes.append(f"/dbfs{folder}/") # The leaf-folders (containing partitions) should be deleted after the file is moved
    return (deletes, None if len(renames) == 0 else (f"/dbfs{folder}/{renames[0]}", f"/dbfs{folder.replace(partition + '=', '')}{output_extension}"))
  # Scan the file system to find all files and folders that need to be moved and deleted
  if VERBOSE: log(f"Collecting a list of files that need to be renamed and deleted...")
  actions = traverse_partitions(root, partitions, find_files_to_rename_and_delete)
  # Rename all files in parallel using spark executors
  renames = [rename for (deletes, rename) in actions if rename is not None]
  if VERBOSE: log(f"Renaming {len(renames)} partition files...")
  spark.createDataFrame(renames, ['from', 'to']).foreach(lambda r: os.rename(r[0], r[1]))
  # Delete unwanted spark temp files and empty folders
  deletes = [path for (deletes, rename) in actions for path in deletes]
  delete_files = [d for d in deletes if d[-1] != "/"]
  delete_folders = [d for d in deletes if d[-1] == "/"]
  if VERBOSE: log(f"Deleting {len(delete_files)} spark outputs...")
  spark.createDataFrame(delete_files, StringType()).foreach(lambda r: os.remove(r[0]))
  if VERBOSE: log(f"Deleting {len(delete_folders)} empty folders...")
  spark.createDataFrame(delete_folders, StringType()).foreach(lambda r: os.rmdir(r[0]))
  log(f"Moving output files to their destination and cleaning spark artifacts took {(datetime.now() - start).total_seconds():,.2f} seconds")

This lets you generate your partitioned data with user-friendly names, and it cleans up all the spark temp files (_started..., _committed..., _SUCCESS) generated along the way.
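
The detail the code above leans on is that DBFS (including /mnt mounts) is also exposed through the local /dbfs FUSE path, so plain os calls can operate on the same files dbutils sees; that is why every path gets a /dbfs prefix before being handed to os.rename / os.remove. A minimal illustration, with a made-up path:

# The same file seen two ways (the path itself is hypothetical)
dbfs_path  = "/mnt/mycontainer/output/part-00000.csv.gz"  # what dbutils.fs.ls / dbutils.fs.mv use
local_path = f"/dbfs{dbfs_path}"                          # what os.rename / os.remove use
# If dbutils.fs.ls can see dbfs_path, then os.path.exists(local_path) should be True as well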

Usage:

# Organize the data into folders matching the specified partitions, with a single CSV per partition
def dataframe_to_csv_gz_per_partition(df, path, partitions, sort_within_partitions, rename_spark_outputs = True, VERBOSE = False):
  start = datetime.now()
  # Write the actual data to disk using spark
  df.repartition(*partitions).sortWithinPartitions(*sort_within_partitions) \
    .write.partitionBy(*partitions).option("header", "true").option("compression", "gzip").mode("overwrite").csv(path)
  log(f"Wrote {get_df_name(df)} data partitioned by {partitions} and sorted by {sort_within_partitions} to:" +
      f"\n  {path}\n  Time taken: {(datetime.now() - start).total_seconds():,.2f} seconds")
  # Rename outputs and clean up 
  spark_output_to_single_file_per_partition(path, partitions, ".csv.gz", VERBOSE)
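
For illustration, a call matching the example output earlier might look like the following; the source table and mount path are placeholders, and only the partition and sort columns come from that output:

# Hypothetical invocation (placeholder dataframe source and mount path)
df_test = spark.table("my_database.df_test")
dataframe_to_csv_gz_per_partition(
  df_test,
  "/mnt/mycontainer/all_data_by_rank",
  partitions = ["Granularity", "PORTINFOID"],
  sort_within_partitions = ["Rank"],
  VERBOSE = True)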

For what it's worth, I also tried parallelizing with Pool, but the results were not as good. I haven't tried importing and using any library that can do async io, which I imagine would perform best.
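
For reference, the Pool-based attempt mentioned above would have looked something like the sketch below; this is an assumption about the shape of that attempt (it may have used a process pool rather than a thread pool), not the exact code. Everything stays on the driver instead of being spread across the executors, which may be why it performed worse:

# Sketch of a driver-side pool over the same list of (from, to) /dbfs path pairs
# built by spark_output_to_single_file_per_partition above
import os
from multiprocessing.pool import ThreadPool

def rename_with_pool(renames, processes = 16):
  with ThreadPool(processes = processes) as pool:
    pool.starmap(os.rename, renames)  # each call renames one /dbfs path on the driver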