Databricks pyspark并行解压缩多个文件

Databricks pyspark parallelize unzipping multiple files

我正在尝试在 Databricks 的 pyspark 中并行解压缩存储在 s3 中的文件。在 for 循环中解压是这样的:

file_list = [(file.path, file.name) for file in dbutils.fs.ls(data_path) if os.path.basename(file.path).endswith(".zip")] # data_path is taken as a parameter
file_names = [ff[1] for ff in file_list]
for ff in file_list:
  dbutils.fs.cp(ff[0], "/FileStore/tmp/" + ff[1])
  cmd = 'unzip /dbfs/FileStore/tmp/' + ff[1]
  os.system(cmd)
  dbutils.fs.cp("file:/databricks/driver/" + ff[1], data_path)
  dbutils.fs.rm("file:/databricks/driver/" + ff[1])

我正在尝试并行化解压缩部分。所以在将文件复制到 "/FileStore/tmp/" 后,我是 运行:

unzips = [file[1] for file in file_list]
def f(x):
  os.system('unzip /dbfs/FileStore/tmp/' + x)
sc.parallelize(unzips).foreach(f)

作业运行但文件未解压到任何地方。

只需添加 -d 选项即可将输出放入 DBFS,如下所示:

def f(x):
  os.system('unzip -o -d /dbfs/FileStore/tmp-output/ /dbfs/FileStore/tmp/' + x)

如有必要(例如,存档中没有目录),为每个 运行 添加更多唯一目录,例如,向其附加 x